Skip to content

refactor: move http server #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"check:types": "tsc --noEmit --project tsconfig.json",
"reformat": "prettier --write .",
"generate": "./scripts/generate.sh",
"test": "vitest --coverage"
"test": "vitest --run --coverage"
},
"license": "Apache-2.0",
"devDependencies": {
Expand Down
61 changes: 24 additions & 37 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,23 @@
#!/usr/bin/env node

import logger, { LogId } from "./common/logger.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { config } from "./common/config.js";
import { Session } from "./common/session.js";
import { Server } from "./server.js";
import { packageInfo } from "./common/packageInfo.js";
import { Telemetry } from "./telemetry/telemetry.js";
import { createStdioTransport } from "./transports/stdio.js";
import { createHttpTransport } from "./transports/streamableHttp.js";
import { StdioRunner } from "./transports/stdio.js";
import { StreamableHttpRunner } from "./transports/streamableHttp.js";

try {
const session = new Session({
apiBaseUrl: config.apiBaseUrl,
apiClientId: config.apiClientId,
apiClientSecret: config.apiClientSecret,
});

const transport = config.transport === "stdio" ? createStdioTransport() : await createHttpTransport();

const telemetry = Telemetry.create(session, config);

const mcpServer = new McpServer({
name: packageInfo.mcpServerName,
version: packageInfo.version,
});

const server = new Server({
mcpServer,
session,
telemetry,
userConfig: config,
});
async function main() {
const transportRunner = config.transport === "stdio" ? new StdioRunner() : new StreamableHttpRunner();

const shutdown = () => {
logger.info(LogId.serverCloseRequested, "server", `Server close requested`);

server
transportRunner
.close()
.then(() => {
logger.info(LogId.serverClosed, "server", `Server closed successfully`);
process.exit(0);
})
.catch((err: unknown) => {
const error = err instanceof Error ? err : new Error(String(err));
logger.error(LogId.serverCloseFailure, "server", `Error closing server: ${error.message}`);
.catch((error: unknown) => {
logger.error(LogId.serverCloseFailure, "server", `Error closing server: ${error as string}`);
process.exit(1);
});
};
Expand All @@ -54,8 +27,22 @@ try {
process.once("SIGTERM", shutdown);
process.once("SIGQUIT", shutdown);

await server.connect(transport);
} catch (error: unknown) {
try {
await transportRunner.start();
} catch (error: unknown) {
logger.emergency(LogId.serverStartFailure, "server", `Fatal error running server: ${error as string}`);
try {
await transportRunner.close();
logger.error(LogId.serverClosed, "server", "Server closed");
} catch (error: unknown) {
logger.error(LogId.serverCloseFailure, "server", `Error closing server: ${error as string}`);
} finally {
process.exit(1);
}
}
}

main().catch((error: unknown) => {
logger.emergency(LogId.serverStartFailure, "server", `Fatal error running server: ${error as string}`);
process.exit(1);
}
});
34 changes: 34 additions & 0 deletions src/transports/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { config } from "../common/config.js";
import { packageInfo } from "../common/packageInfo.js";
import { Server } from "../server.js";
import { Session } from "../common/session.js";
import { Telemetry } from "../telemetry/telemetry.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

export abstract class TransportRunnerBase {
protected setupServer(): Server {
const session = new Session({
apiBaseUrl: config.apiBaseUrl,
apiClientId: config.apiClientId,
apiClientSecret: config.apiClientSecret,
});

const telemetry = Telemetry.create(session, config);

const mcpServer = new McpServer({
name: packageInfo.mcpServerName,
version: packageInfo.version,
});

return new Server({
mcpServer,
session,
telemetry,
userConfig: config,
});
}

abstract start(): Promise<void>;

abstract close(): Promise<void>;
}
24 changes: 24 additions & 0 deletions src/transports/stdio.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logger, { LogId } from "../common/logger.js";
import { Server } from "../server.js";
import { TransportRunnerBase } from "./base.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "@modelcontextprotocol/sdk/types.js";
import { EJSON } from "bson";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
Expand Down Expand Up @@ -45,3 +48,24 @@ export function createStdioTransport(): StdioServerTransport {

return server;
}

export class StdioRunner extends TransportRunnerBase {
private server: Server | undefined;

async start() {
try {
this.server = this.setupServer();

const transport = createStdioTransport();

await this.server.connect(transport);
} catch (error: unknown) {
logger.emergency(LogId.serverStartFailure, "server", `Fatal error running server: ${error as string}`);
process.exit(1);
}
}

async close(): Promise<void> {
await this.server?.close();
}
}
156 changes: 75 additions & 81 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,84 +1,92 @@
import express from "express";
import http from "http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";

import { TransportRunnerBase } from "./base.js";
import { config } from "../common/config.js";
import logger, { LogId } from "../common/logger.js";

const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
const JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED = -32601;

export async function createHttpTransport(): Promise<StreamableHTTPServerTransport> {
const app = express();
app.enable("trust proxy"); // needed for reverse proxy support
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
function promiseHandler(
fn: (req: express.Request, res: express.Response, next: express.NextFunction) => Promise<void>
) {
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
fn(req, res, next).catch(next);
};
}

const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;

app.post("/mcp", async (req: express.Request, res: express.Response) => {
try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
}
});
async start() {
const app = express();
app.enable("trust proxy"); // needed for reverse proxy support
app.use(express.urlencoded({ extended: true }));
app.use(express.json());

app.post(
"/mcp",
promiseHandler(async (req: express.Request, res: express.Response) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});

const server = this.setupServer();

app.get("/mcp", async (req: express.Request, res: express.Response) => {
try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
await server.connect(transport);

res.on("close", () => {
Promise.all([transport.close(), server.close()]).catch((error: unknown) => {
logger.error(
LogId.streamableHttpTransportCloseFailure,
"streamableHttpTransport",
`Error closing server: ${error instanceof Error ? error.message : String(error)}`
);
});
});

try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
}
})
);

app.get("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
}
});
});

app.delete("/mcp", async (req: express.Request, res: express.Response) => {
try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
app.delete("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
}
});
});

try {
const server = await new Promise<http.Server>((resolve, reject) => {
this.httpServer = await new Promise<http.Server>((resolve, reject) => {
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
if (err) {
reject(err);
Expand All @@ -93,31 +101,17 @@ export async function createHttpTransport(): Promise<StreamableHTTPServerTranspo
"streamableHttpTransport",
`Server started on http://${config.httpHost}:${config.httpPort}`
);
}

transport.onclose = () => {
logger.info(LogId.streamableHttpTransportCloseRequested, "streamableHttpTransport", `Closing server`);
server.close((err?: Error) => {
async close(): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.httpServer?.close((err) => {
if (err) {
logger.error(
LogId.streamableHttpTransportCloseFailure,
"streamableHttpTransport",
`Error closing server: ${err.message}`
);
reject(err);
return;
}
logger.info(LogId.streamableHttpTransportCloseSuccess, "streamableHttpTransport", `Server closed`);
resolve();
});
};

return transport;
} catch (error: unknown) {
const err = error instanceof Error ? error : new Error(String(error));
logger.info(
LogId.streamableHttpTransportStartFailure,
"streamableHttpTransport",
`Error starting server: ${err.message}`
);

throw err;
});
}
}
Loading
Loading