Skip to content

feat: add session management for streamableHttp [MCP-52] #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/dist/index.js",
"args": ["--transport", "http", "--loggers", "stderr", "mcp"],
"preLaunchTask": "tsc: build - tsconfig.build.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
Expand Down
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,20 @@ The MongoDB MCP Server can be configured using multiple methods, with the follow

### Configuration Options

| Option | Default | Description |
| ------------------ | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | see note\* | Folder to store logs. |
| `disabledTools` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | 3000 | Port number. |
| `httpHost` | 127.0.0.1 | Host to bind the http server. |
| CLI Option | Environment Variable | Default | Description |
| ------------------ | --------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |

#### Logger Options

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
},
"type": "module",
"scripts": {
"start": "node dist/index.js --transport http",
"prepare": "npm run build",
"build:clean": "rm -rf dist",
"build:compile": "tsc --project tsconfig.build.json",
Expand Down
9 changes: 3 additions & 6 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ export const LogId = {
toolUpdateFailure: mongoLogId(1_005_001),

streamableHttpTransportStarted: mongoLogId(1_006_001),
streamableHttpTransportStartFailure: mongoLogId(1_006_002),
streamableHttpTransportSessionInitialized: mongoLogId(1_006_003),
streamableHttpTransportRequestFailure: mongoLogId(1_006_004),
streamableHttpTransportCloseRequested: mongoLogId(1_006_005),
streamableHttpTransportCloseSuccess: mongoLogId(1_006_006),
streamableHttpTransportCloseFailure: mongoLogId(1_006_007),
streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002),
streamableHttpTransportRequestFailure: mongoLogId(1_006_003),
streamableHttpTransportCloseFailure: mongoLogId(1_006_004),
} as const;

export abstract class LoggerBase {
Expand Down
48 changes: 48 additions & 0 deletions src/common/sessionStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import logger, { LogId } from "./logger.js";

export class SessionStore {
private sessions: { [sessionId: string]: StreamableHTTPServerTransport } = {};

getSession(sessionId: string): StreamableHTTPServerTransport | undefined {
return this.sessions[sessionId];
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport): void {
if (this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} already exists`);
}
this.sessions[sessionId] = transport;
}

async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
if (!this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} not found`);
}
if (closeTransport) {
const transport = this.sessions[sessionId];
if (!transport) {
throw new Error(`Session ${sessionId} not found`);
}
try {
await transport.close();
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
"streamableHttpTransport",
`Error closing transport ${sessionId}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
delete this.sessions[sessionId];
}

async closeAllSessions(): Promise<void> {
await Promise.all(
Object.values(this.sessions)
.filter((transport) => transport !== undefined)
.map((transport) => transport.close())
);
this.sessions = {};
}
}
9 changes: 5 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async function main() {
transportRunner
.close()
.then(() => {
logger.info(LogId.serverClosed, "server", `Server closed`);
process.exit(0);
})
.catch((error: unknown) => {
Expand All @@ -22,10 +23,10 @@ async function main() {
});
};

process.once("SIGINT", shutdown);
process.once("SIGABRT", shutdown);
process.once("SIGTERM", shutdown);
process.once("SIGQUIT", shutdown);
process.on("SIGINT", shutdown);
process.on("SIGABRT", shutdown);
process.on("SIGTERM", shutdown);
process.on("SIGQUIT", shutdown);

try {
await transportRunner.start();
Expand Down
168 changes: 112 additions & 56 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,90 +1,143 @@
import express from "express";
import http from "http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { TransportRunnerBase } from "./base.js";
import { config } from "../common/config.js";
import logger, { LogId } from "../common/logger.js";
import { randomUUID } from "crypto";
import { SessionStore } from "../common/sessionStore.js";

const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
const JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED = -32601;
const JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED = -32001;
const JSON_RPC_ERROR_CODE_SESSION_ID_INVALID = -32002;
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32003;
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32004;

function promiseHandler(
fn: (req: express.Request, res: express.Response, next: express.NextFunction) => Promise<void>
) {
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
fn(req, res, next).catch(next);
fn(req, res, next).catch((error) => {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
});
};
}

export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;
private sessionStore: SessionStore = new SessionStore();

async start() {
const app = express();
app.enable("trust proxy"); // needed for reverse proxy support
app.use(express.urlencoded({ extended: true }));
app.use(express.json());

const handleRequest = async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"];
if (!sessionId) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
message: `session id is required`,
},
});
return;
}
if (typeof sessionId !== "string") {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
message: `session id is invalid`,
},
});
return;
}
const transport = this.sessionStore.getSession(sessionId);
if (!transport) {
res.status(404).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
message: `session not found`,
},
});
return;
}
await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
promiseHandler(async (req: express.Request, res: express.Response) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
const sessionId = req.headers["mcp-session-id"];
if (sessionId) {
await handleRequest(req, res);
return;
}

const server = this.setupServer();
if (!isInitializeRequest(req.body)) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
message: `invalid request`,
},
});
return;
}

await server.connect(transport);
const server = this.setupServer();
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID().toString(),
onsessioninitialized: (sessionId) => {
this.sessionStore.setSession(sessionId, transport);
},
onsessionclosed: async (sessionId) => {
try {
await this.sessionStore.closeSession(sessionId, false);
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
"streamableHttpTransport",
`Error closing session: ${error instanceof Error ? error.message : String(error)}`
);
}
},
});

res.on("close", () => {
Promise.all([transport.close(), server.close()]).catch((error: unknown) => {
transport.onclose = () => {
server.close().catch((error) => {
logger.error(
LogId.streamableHttpTransportCloseFailure,
"streamableHttpTransport",
`Error closing server: ${error instanceof Error ? error.message : String(error)}`
);
});
});
};

try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
}
await server.connect(transport);

await transport.handleRequest(req, res, req.body);
})
);

app.get("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
});

app.delete("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
});
app.get("/mcp", promiseHandler(handleRequest));
app.delete("/mcp", promiseHandler(handleRequest));

this.httpServer = await new Promise<http.Server>((resolve, reject) => {
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
Expand All @@ -104,14 +157,17 @@ export class StreamableHttpRunner extends TransportRunnerBase {
}

async close(): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.httpServer?.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
await Promise.all([
this.sessionStore.closeAllSessions(),
new Promise<void>((resolve, reject) => {
this.httpServer?.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
}),
]);
}
}
Loading
Loading