From 7d3acf7a3dc6acd57dc45666696b1b239d1b2af0 Mon Sep 17 00:00:00 2001 From: Filipe Constantinov Menezes Date: Mon, 21 Jul 2025 14:25:05 +0100 Subject: [PATCH 1/2] chore: address comments from #361 --- src/common/logger.ts | 5 +++-- src/common/sessionStore.ts | 41 ++++++++++++++++++++++---------------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/src/common/logger.ts b/src/common/logger.ts index 7ed1a9ac..0c2fd726 100644 --- a/src/common/logger.ts +++ b/src/common/logger.ts @@ -44,8 +44,9 @@ export const LogId = { streamableHttpTransportStarted: mongoLogId(1_006_001), streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002), streamableHttpTransportSessionCloseNotification: mongoLogId(1_006_003), - streamableHttpTransportRequestFailure: mongoLogId(1_006_004), - streamableHttpTransportCloseFailure: mongoLogId(1_006_005), + streamableHttpTransportSessionCloseNotificationFailure: mongoLogId(1_006_004), + streamableHttpTransportRequestFailure: mongoLogId(1_006_005), + streamableHttpTransportCloseFailure: mongoLogId(1_006_006), } as const; export abstract class LoggerBase { diff --git a/src/common/sessionStore.ts b/src/common/sessionStore.ts index 9ad9d9bb..81458e44 100644 --- a/src/common/sessionStore.ts +++ b/src/common/sessionStore.ts @@ -1,12 +1,12 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import logger, { LogId, McpLogger } from "./logger.js"; +import logger, { LogId, LoggerBase, McpLogger } from "./logger.js"; import { TimeoutManager } from "./timeoutManager.js"; export class SessionStore { private sessions: { [sessionId: string]: { - mcpServer: McpServer; + logger: LoggerBase; transport: StreamableHTTPServerTransport; abortTimeout: TimeoutManager; notificationTimeout: TimeoutManager; @@ -47,10 +47,14 @@ export class SessionStore { private sendNotification(sessionId: string): void { const session = this.sessions[sessionId]; if (!session) { + logger.warning( + LogId.streamableHttpTransportSessionCloseNotificationFailure, + "sessionStore", + `session ${sessionId} not found, no notification delivered` + ); return; } - const logger = new McpLogger(session.mcpServer); - logger.info( + session.logger.info( LogId.streamableHttpTransportSessionCloseNotification, "sessionStore", "Session is about to be closed due to inactivity" @@ -58,35 +62,38 @@ export class SessionStore { } setSession(sessionId: string, transport: StreamableHTTPServerTransport, mcpServer: McpServer): void { - if (this.sessions[sessionId]) { + const session = this.sessions[sessionId]; + if (session) { throw new Error(`Session ${sessionId} already exists`); } const abortTimeout = new TimeoutManager(async () => { - const logger = new McpLogger(mcpServer); - logger.info( - LogId.streamableHttpTransportSessionCloseNotification, - "sessionStore", - "Session closed due to inactivity" - ); + if (this.sessions[sessionId]) { + this.sessions[sessionId].logger.info( + LogId.streamableHttpTransportSessionCloseNotification, + "sessionStore", + "Session closed due to inactivity" + ); - await this.closeSession(sessionId); + await this.closeSession(sessionId); + } }, this.idleTimeoutMS); const notificationTimeout = new TimeoutManager( () => this.sendNotification(sessionId), this.notificationTimeoutMS ); - this.sessions[sessionId] = { mcpServer, transport, abortTimeout, notificationTimeout }; + this.sessions[sessionId] = { logger: new McpLogger(mcpServer), transport, abortTimeout, notificationTimeout }; } async closeSession(sessionId: string, closeTransport: boolean = true): Promise { - if (!this.sessions[sessionId]) { + const session = this.sessions[sessionId]; + if (!session) { throw new Error(`Session ${sessionId} not found`); } - this.sessions[sessionId].abortTimeout.clear(); - this.sessions[sessionId].notificationTimeout.clear(); + session.abortTimeout.clear(); + session.notificationTimeout.clear(); if (closeTransport) { try { - await this.sessions[sessionId].transport.close(); + await session.transport.close(); } catch (error) { logger.error( LogId.streamableHttpTransportSessionCloseFailure, From 88eb47ae60a407d85d12be1d67ceb6ea0f21add4 Mon Sep 17 00:00:00 2001 From: Filipe Constantinov Menezes Date: Mon, 21 Jul 2025 14:36:57 +0100 Subject: [PATCH 2/2] more comments --- src/common/managedTimeout.ts | 27 ++++++++ src/common/sessionStore.ts | 18 +++--- src/common/timeoutManager.ts | 63 ------------------- ...Manager.test.ts => managedTimeout.test.ts} | 34 ++++------ 4 files changed, 47 insertions(+), 95 deletions(-) create mode 100644 src/common/managedTimeout.ts delete mode 100644 src/common/timeoutManager.ts rename tests/unit/common/{timeoutManager.test.ts => managedTimeout.test.ts} (61%) diff --git a/src/common/managedTimeout.ts b/src/common/managedTimeout.ts new file mode 100644 index 00000000..9309947e --- /dev/null +++ b/src/common/managedTimeout.ts @@ -0,0 +1,27 @@ +export interface ManagedTimeout { + cancel: () => void; + restart: () => void; +} + +export function setManagedTimeout(callback: () => Promise | void, timeoutMS: number): ManagedTimeout { + let timeoutId: NodeJS.Timeout | undefined = setTimeout(() => { + void callback(); + }, timeoutMS); + + function cancel() { + clearTimeout(timeoutId); + timeoutId = undefined; + } + + function restart() { + cancel(); + timeoutId = setTimeout(() => { + void callback(); + }, timeoutMS); + } + + return { + cancel, + restart, + }; +} diff --git a/src/common/sessionStore.ts b/src/common/sessionStore.ts index 81458e44..e37358fc 100644 --- a/src/common/sessionStore.ts +++ b/src/common/sessionStore.ts @@ -1,15 +1,15 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import logger, { LogId, LoggerBase, McpLogger } from "./logger.js"; -import { TimeoutManager } from "./timeoutManager.js"; +import { ManagedTimeout, setManagedTimeout } from "./managedTimeout.js"; export class SessionStore { private sessions: { [sessionId: string]: { logger: LoggerBase; transport: StreamableHTTPServerTransport; - abortTimeout: TimeoutManager; - notificationTimeout: TimeoutManager; + abortTimeout: ManagedTimeout; + notificationTimeout: ManagedTimeout; }; } = {}; @@ -39,9 +39,9 @@ export class SessionStore { return; } - session.abortTimeout.reset(); + session.abortTimeout.restart(); - session.notificationTimeout.reset(); + session.notificationTimeout.restart(); } private sendNotification(sessionId: string): void { @@ -66,7 +66,7 @@ export class SessionStore { if (session) { throw new Error(`Session ${sessionId} already exists`); } - const abortTimeout = new TimeoutManager(async () => { + const abortTimeout = setManagedTimeout(async () => { if (this.sessions[sessionId]) { this.sessions[sessionId].logger.info( LogId.streamableHttpTransportSessionCloseNotification, @@ -77,7 +77,7 @@ export class SessionStore { await this.closeSession(sessionId); } }, this.idleTimeoutMS); - const notificationTimeout = new TimeoutManager( + const notificationTimeout = setManagedTimeout( () => this.sendNotification(sessionId), this.notificationTimeoutMS ); @@ -89,8 +89,8 @@ export class SessionStore { if (!session) { throw new Error(`Session ${sessionId} not found`); } - session.abortTimeout.clear(); - session.notificationTimeout.clear(); + session.abortTimeout.cancel(); + session.notificationTimeout.cancel(); if (closeTransport) { try { await session.transport.close(); diff --git a/src/common/timeoutManager.ts b/src/common/timeoutManager.ts deleted file mode 100644 index 03161dfc..00000000 --- a/src/common/timeoutManager.ts +++ /dev/null @@ -1,63 +0,0 @@ -/** - * A class that manages timeouts for a callback function. - * It is used to ensure that a callback function is called after a certain amount of time. - * If the callback function is not called after the timeout, it will be called with an error. - */ -export class TimeoutManager { - private timeoutId?: NodeJS.Timeout; - - /** - * A callback function that is called when the timeout is reached. - */ - public onerror?: (error: unknown) => void; - - /** - * Creates a new TimeoutManager. - * @param callback - A callback function that is called when the timeout is reached. - * @param timeoutMS - The timeout in milliseconds. - */ - constructor( - private readonly callback: () => Promise | void, - private readonly timeoutMS: number - ) { - if (timeoutMS <= 0) { - throw new Error("timeoutMS must be greater than 0"); - } - this.reset(); - } - - /** - * Clears the timeout. - */ - clear() { - if (this.timeoutId) { - clearTimeout(this.timeoutId); - this.timeoutId = undefined; - } - } - - /** - * Runs the callback function. - */ - private async runCallback() { - if (this.callback) { - try { - await this.callback(); - } catch (error: unknown) { - this.onerror?.(error); - } - } - } - - /** - * Resets the timeout. - */ - reset() { - this.clear(); - this.timeoutId = setTimeout(() => { - void this.runCallback().finally(() => { - this.timeoutId = undefined; - }); - }, this.timeoutMS); - } -} diff --git a/tests/unit/common/timeoutManager.test.ts b/tests/unit/common/managedTimeout.test.ts similarity index 61% rename from tests/unit/common/timeoutManager.test.ts rename to tests/unit/common/managedTimeout.test.ts index a0cc5b30..d51c4b13 100644 --- a/tests/unit/common/timeoutManager.test.ts +++ b/tests/unit/common/managedTimeout.test.ts @@ -1,7 +1,7 @@ import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; -import { TimeoutManager } from "../../../src/common/timeoutManager.js"; +import { setManagedTimeout } from "../../../src/common/managedTimeout.js"; -describe("TimeoutManager", () => { +describe("setManagedTimeout", () => { beforeAll(() => { vi.useFakeTimers(); }); @@ -13,7 +13,7 @@ describe("TimeoutManager", () => { it("calls the timeout callback", () => { const callback = vi.fn(); - new TimeoutManager(callback, 1000); + setManagedTimeout(callback, 1000); vi.advanceTimersByTime(1000); expect(callback).toHaveBeenCalled(); @@ -22,10 +22,10 @@ describe("TimeoutManager", () => { it("does not call the timeout callback if the timeout is cleared", () => { const callback = vi.fn(); - const timeoutManager = new TimeoutManager(callback, 1000); + const timeout = setManagedTimeout(callback, 1000); vi.advanceTimersByTime(500); - timeoutManager.clear(); + timeout.cancel(); vi.advanceTimersByTime(500); expect(callback).not.toHaveBeenCalled(); @@ -34,44 +34,32 @@ describe("TimeoutManager", () => { it("does not call the timeout callback if the timeout is reset", () => { const callback = vi.fn(); - const timeoutManager = new TimeoutManager(callback, 1000); + const timeout = setManagedTimeout(callback, 1000); vi.advanceTimersByTime(500); - timeoutManager.reset(); + timeout.restart(); vi.advanceTimersByTime(500); expect(callback).not.toHaveBeenCalled(); }); - it("calls the onerror callback", () => { - const onerrorCallback = vi.fn(); - - const tm = new TimeoutManager(() => { - throw new Error("test"); - }, 1000); - tm.onerror = onerrorCallback; - - vi.advanceTimersByTime(1000); - expect(onerrorCallback).toHaveBeenCalled(); - }); - describe("if timeout is reset", () => { it("does not call the timeout callback within the timeout period", () => { const callback = vi.fn(); - const timeoutManager = new TimeoutManager(callback, 1000); + const timeout = setManagedTimeout(callback, 1000); vi.advanceTimersByTime(500); - timeoutManager.reset(); + timeout.restart(); vi.advanceTimersByTime(500); expect(callback).not.toHaveBeenCalled(); }); it("calls the timeout callback after the timeout period", () => { const callback = vi.fn(); - const timeoutManager = new TimeoutManager(callback, 1000); + const timeout = setManagedTimeout(callback, 1000); vi.advanceTimersByTime(500); - timeoutManager.reset(); + timeout.restart(); vi.advanceTimersByTime(1000); expect(callback).toHaveBeenCalled(); });