Skip to content

feat: add client idle timeout [MCP-57] #383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: MCP-42
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/dist/index.js",
"args": ["--transport", "http", "--loggers", "stderr", "mcp"],
"runtimeExecutable": "npm",
"runtimeArgs": ["start"],
"preLaunchTask": "tsc: build - tsconfig.build.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
Expand Down
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,20 +302,22 @@ The MongoDB MCP Server can be configured using multiple methods, with the follow

### Configuration Options

| CLI Option | Environment Variable | Default | Description |
| ------------------ | --------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |
| CLI Option | Environment Variable | Default | Description |
| ----------------------- | --------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |
| `idleTimeoutMs` | `MDB_MCP_IDLE_TIMEOUT_MS` | 600000 | Idle timeout for a client to disconnect (only applies to http transport). |
| `notificationTimeoutMs` | `MDB_MCP_NOTIFICATION_TIMEOUT_MS` | 540000 | Notification timeout for a client to be aware of diconnect (only applies to http transport). |

#### Logger Options

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"type": "module",
"scripts": {
"start": "node dist/index.js --transport http",
"start": "node dist/index.js --transport http --loggers stderr mcp",
"prepare": "npm run build",
"build:clean": "rm -rf dist",
"build:compile": "tsc --project tsconfig.build.json",
Expand Down
4 changes: 4 additions & 0 deletions src/common/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export interface UserConfig {
httpPort: number;
httpHost: string;
loggers: Array<"stderr" | "disk" | "mcp">;
idleTimeoutMs: number;
notificationTimeoutMs: number;
}

const defaults: UserConfig = {
Expand All @@ -47,6 +49,8 @@ const defaults: UserConfig = {
httpPort: 3000,
httpHost: "127.0.0.1",
loggers: ["disk", "mcp"],
idleTimeoutMs: 600000, // 10 minutes
notificationTimeoutMs: 540000, // 9 minutes
};

export const config = {
Expand Down
5 changes: 3 additions & 2 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ export const LogId = {

streamableHttpTransportStarted: mongoLogId(1_006_001),
streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002),
streamableHttpTransportRequestFailure: mongoLogId(1_006_003),
streamableHttpTransportCloseFailure: mongoLogId(1_006_004),
streamableHttpTransportSessionCloseNotification: mongoLogId(1_006_003),
streamableHttpTransportRequestFailure: mongoLogId(1_006_004),
streamableHttpTransportCloseFailure: mongoLogId(1_006_005),
} as const;

export abstract class LoggerBase {
Expand Down
128 changes: 112 additions & 16 deletions src/common/sessionStore.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,132 @@
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import logger, { LogId } from "./logger.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import logger, { LogId, McpLogger } from "./logger.js";

class TimeoutManager {
private timeoutId?: NodeJS.Timeout;
public onerror?: (error: unknown) => void;

constructor(
private readonly callback: () => Promise<void> | void,
private readonly timeoutMS: number
) {
if (timeoutMS <= 0) {
throw new Error("timeoutMS must be greater than 0");
}
this.reset();
}

clear() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = undefined;
}
}

private async runCallback() {
if (this.callback) {
try {
await this.callback();
} catch (error: unknown) {
this.onerror?.(error);
}
}
}

reset() {
this.clear();
this.timeoutId = setTimeout(() => {
void this.runCallback().finally(() => {
this.timeoutId = undefined;
});
}, this.timeoutMS);
}
}

export class SessionStore {
private sessions: { [sessionId: string]: StreamableHTTPServerTransport } = {};
private sessions: {
[sessionId: string]: {
mcpServer: McpServer;
transport: StreamableHTTPServerTransport;
abortTimeout: TimeoutManager;
notificationTimeout: TimeoutManager;
};
} = {};

constructor(
private readonly idleTimeoutMS: number,
private readonly notificationTimeoutMS: number
) {
if (idleTimeoutMS <= 0) {
throw new Error("idleTimeoutMS must be greater than 0");
}
if (notificationTimeoutMS <= 0) {
throw new Error("notificationTimeoutMS must be greater than 0");
}
if (idleTimeoutMS <= notificationTimeoutMS) {
throw new Error("idleTimeoutMS must be greater than notificationTimeoutMS");
}
}

getSession(sessionId: string): StreamableHTTPServerTransport | undefined {
return this.sessions[sessionId];
this.resetTimeout(sessionId);
return this.sessions[sessionId]?.transport;
}

private resetTimeout(sessionId: string): void {
const session = this.sessions[sessionId];
if (!session) {
return;
}

session.abortTimeout.reset();

session.notificationTimeout.reset();
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport): void {
private sendNotification(sessionId: string): void {
const session = this.sessions[sessionId];
if (!session) {
return;
}
const logger = new McpLogger(session.mcpServer);
logger.info(
LogId.streamableHttpTransportSessionCloseNotification,
"sessionStore",
"Session is about to be closed due to inactivity"
);
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport, mcpServer: McpServer): void {
if (this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} already exists`);
}
this.sessions[sessionId] = transport;
const abortTimeout = new TimeoutManager(async () => {
const logger = new McpLogger(mcpServer);
logger.info(
LogId.streamableHttpTransportSessionCloseNotification,
"sessionStore",
"Session closed due to inactivity"
);

await this.closeSession(sessionId);
}, this.idleTimeoutMS);
const notificationTimeout = new TimeoutManager(
() => this.sendNotification(sessionId),
this.notificationTimeoutMS
);
this.sessions[sessionId] = { mcpServer, transport, abortTimeout, notificationTimeout };
}

async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
if (!this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} not found`);
}
this.sessions[sessionId].abortTimeout.clear();
this.sessions[sessionId].notificationTimeout.clear();
if (closeTransport) {
const transport = this.sessions[sessionId];
if (!transport) {
throw new Error(`Session ${sessionId} not found`);
}
try {
await transport.close();
await this.sessions[sessionId].transport.close();
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
Expand All @@ -38,11 +139,6 @@ export class SessionStore {
}

async closeAllSessions(): Promise<void> {
await Promise.all(
Object.values(this.sessions)
.filter((transport) => transport !== undefined)
.map((transport) => transport.close())
);
this.sessions = {};
await Promise.all(Object.keys(this.sessions).map((sessionId) => this.closeSession(sessionId)));
}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { StdioRunner } from "./transports/stdio.js";
import { StreamableHttpRunner } from "./transports/streamableHttp.js";

async function main() {
const transportRunner = config.transport === "stdio" ? new StdioRunner() : new StreamableHttpRunner();
const transportRunner = config.transport === "stdio" ? new StdioRunner(config) : new StreamableHttpRunner(config);

const shutdown = () => {
logger.info(LogId.serverCloseRequested, "server", `Server close requested`);
Expand Down
14 changes: 7 additions & 7 deletions src/transports/base.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { config } from "../common/config.js";
import { UserConfig } from "../common/config.js";
import { packageInfo } from "../common/packageInfo.js";
import { Server } from "../server.js";
import { Session } from "../common/session.js";
import { Telemetry } from "../telemetry/telemetry.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

export abstract class TransportRunnerBase {
protected setupServer(): Server {
protected setupServer(userConfig: UserConfig): Server {
const session = new Session({
apiBaseUrl: config.apiBaseUrl,
apiClientId: config.apiClientId,
apiClientSecret: config.apiClientSecret,
apiBaseUrl: userConfig.apiBaseUrl,
apiClientId: userConfig.apiClientId,
apiClientSecret: userConfig.apiClientSecret,
});

const telemetry = Telemetry.create(session, config);
const telemetry = Telemetry.create(session, userConfig);

const mcpServer = new McpServer({
name: packageInfo.mcpServerName,
Expand All @@ -24,7 +24,7 @@ export abstract class TransportRunnerBase {
mcpServer,
session,
telemetry,
userConfig: config,
userConfig,
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/transports/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { TransportRunnerBase } from "./base.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "@modelcontextprotocol/sdk/types.js";
import { EJSON } from "bson";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { UserConfig } from "../common/config.js";

// This is almost a copy of ReadBuffer from @modelcontextprotocol/sdk
// but it uses EJSON.parse instead of JSON.parse to handle BSON types
Expand Down Expand Up @@ -52,9 +53,13 @@ export function createStdioTransport(): StdioServerTransport {
export class StdioRunner extends TransportRunnerBase {
private server: Server | undefined;

constructor(private userConfig: UserConfig) {
super();
}

async start() {
try {
this.server = this.setupServer();
this.server = this.setupServer(this.userConfig);

const transport = createStdioTransport();

Expand Down
17 changes: 11 additions & 6 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import http from "http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { TransportRunnerBase } from "./base.js";
import { config } from "../common/config.js";
import { UserConfig } from "../common/config.js";
import logger, { LogId } from "../common/logger.js";
import { randomUUID } from "crypto";
import { SessionStore } from "../common/sessionStore.js";
Expand Down Expand Up @@ -38,7 +38,12 @@ function promiseHandler(

export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;
private sessionStore: SessionStore = new SessionStore();
private sessionStore: SessionStore;

constructor(private userConfig: UserConfig) {
super();
this.sessionStore = new SessionStore(this.userConfig.idleTimeoutMs, this.userConfig.notificationTimeoutMs);
}

async start() {
const app = express();
Expand Down Expand Up @@ -101,11 +106,11 @@ export class StreamableHttpRunner extends TransportRunnerBase {
return;
}

const server = this.setupServer();
const server = this.setupServer(this.userConfig);
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID().toString(),
onsessioninitialized: (sessionId) => {
this.sessionStore.setSession(sessionId, transport);
this.sessionStore.setSession(sessionId, transport, server.mcpServer);
},
onsessionclosed: async (sessionId) => {
try {
Expand Down Expand Up @@ -140,7 +145,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
app.delete("/mcp", promiseHandler(handleRequest));

this.httpServer = await new Promise<http.Server>((resolve, reject) => {
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
const result = app.listen(this.userConfig.httpPort, this.userConfig.httpHost, (err?: Error) => {
if (err) {
reject(err);
return;
Expand All @@ -152,7 +157,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
logger.info(
LogId.streamableHttpTransportStarted,
"streamableHttpTransport",
`Server started on http://${config.httpHost}:${config.httpPort}`
`Server started on http://${this.userConfig.httpHost}:${this.userConfig.httpPort}`
);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/transports/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("StreamableHttpRunner", () => {
oldLoggers = config.loggers;
config.telemetry = "disabled";
config.loggers = ["stderr"];
runner = new StreamableHttpRunner();
runner = new StreamableHttpRunner(config);
await runner.start();
});

Expand Down
Loading