diff --git a/package-lock.json b/package-lock.json index c42a99ec6..d64ebb2ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30,7 +30,7 @@ "@types/nock": "^10.0.3", "@types/node": "^14.18.24", "@types/node-fetch": "^3.0.3", - "@types/sinon": "^7.0.13", + "@types/sinon": "^9.0.11", "@typescript-eslint/eslint-plugin": "^5.33.1", "@typescript-eslint/parser": "^5.33.1", "api-extractor-model-me": "^0.1.1", @@ -56,7 +56,7 @@ "prettier": "^2.7.1", "protobufjs-cli": "^1.1.1", "semver": "^7.3.5", - "sinon": "^7.3.2", + "sinon": "^9.2.4", "ts-node": "^10.4.0", "typescript": "^4.3.5", "yargs": "^15.3.1" @@ -1043,31 +1043,30 @@ "type-detect": "4.0.8" } }, - "node_modules/@sinonjs/formatio": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/@sinonjs/formatio/-/formatio-3.2.2.tgz", - "integrity": "sha512-B8SEsgd8gArBLMD6zpRw3juQ2FVSsmdd7qlevyDqzS9WTCtvF55/gAL+h6gue8ZvPYcdiPdvueM/qm//9XzyTQ==", + "node_modules/@sinonjs/fake-timers": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-6.0.1.tgz", + "integrity": "sha512-MZPUxrmFubI36XS1DI3qmI0YdN1gks62JtFZvxR67ljjSNCeK6U08Zx4msEWOXuofgqUt6zPHSi1H9fbjR/NRA==", "dev": true, "dependencies": { - "@sinonjs/commons": "^1", - "@sinonjs/samsam": "^3.1.0" + "@sinonjs/commons": "^1.7.0" } }, "node_modules/@sinonjs/samsam": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/@sinonjs/samsam/-/samsam-3.3.3.tgz", - "integrity": "sha512-bKCMKZvWIjYD0BLGnNrxVuw4dkWCYsLqFOUWw8VgKF/+5Y+mE7LfHWPIYoDXowH+3a9LsWDMo0uAP8YDosPvHQ==", + "version": "5.3.1", + "resolved": "https://registry.npmjs.org/@sinonjs/samsam/-/samsam-5.3.1.tgz", + "integrity": "sha512-1Hc0b1TtyfBu8ixF/tpfSHTVWKwCBLY4QJbkgnE7HcwyvT2xArDxb4K7dMgqRm3szI+LJbzmW/s4xxEhv6hwDg==", "dev": true, "dependencies": { - "@sinonjs/commons": "^1.3.0", - "array-from": "^2.1.1", - "lodash": "^4.17.15" + "@sinonjs/commons": "^1.6.0", + "lodash.get": "^4.4.2", + "type-detect": "^4.0.8" } }, "node_modules/@sinonjs/text-encoding": { - "version": "0.7.2", - "resolved": "https://registry.npmjs.org/@sinonjs/text-encoding/-/text-encoding-0.7.2.tgz", - "integrity": "sha512-sXXKG+uL9IrKqViTtao2Ws6dy0znu9sOaP1di/jKGW1M6VssO8vlpXCQcpZ+jisQ1tTFAC5Jo/EOzFbggBagFQ==", + "version": "0.7.3", + "resolved": "https://registry.npmjs.org/@sinonjs/text-encoding/-/text-encoding-0.7.3.tgz", + "integrity": "sha512-DE427ROAphMQzU4ENbliGYrBSYPXF+TtLg9S8vzeA+OF4ZKzoDdzfL8sxuMUGS/lgRhM6j1URSk9ghf7Xo1tyA==", "dev": true }, "node_modules/@tootallnate/once": { @@ -1320,9 +1319,18 @@ } }, "node_modules/@types/sinon": { - "version": "7.5.2", - "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-7.5.2.tgz", - "integrity": "sha512-T+m89VdXj/eidZyejvmoP9jivXgBDdkOSBVQjU9kF349NEx10QdPNGxHeZUaj1IlJ32/ewdyXJjnJxyxJroYwg==", + "version": "9.0.11", + "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-9.0.11.tgz", + "integrity": "sha512-PwP4UY33SeeVKodNE37ZlOsR9cReypbMJOhZ7BVE0lB+Hix3efCOxiJWiE5Ia+yL9Cn2Ch72EjFTRze8RZsNtg==", + "dev": true, + "dependencies": { + "@types/sinonjs__fake-timers": "*" + } + }, + "node_modules/@types/sinonjs__fake-timers": { + "version": "8.1.5", + "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.5.tgz", + "integrity": "sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==", "dev": true }, "node_modules/@types/tough-cookie": { @@ -1790,12 +1798,6 @@ "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz", "integrity": "sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==" }, - "node_modules/array-from": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/array-from/-/array-from-2.1.1.tgz", - "integrity": "sha512-GQTc6Uupx1FCavi5mPzBvVT7nEOeWMmUA9P95wpfpW1XwMSKs+KaymD5C2Up7KAUKg/mYwbsUYzdZWcoajlNZg==", - "dev": true - }, "node_modules/array-union": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/array-union/-/array-union-2.1.0.tgz", @@ -4397,12 +4399,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/lolex": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/lolex/-/lolex-4.2.0.tgz", - "integrity": "sha512-gKO5uExCXvSm6zbF562EvM+rd1kQDnB9AZBbiQVzf1ZmdDpxUSvpnAaVOP83N/31mRK8Ml8/VE8DMvsAZQ+7wg==", - "dev": true - }, "node_modules/long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", @@ -4873,31 +4869,22 @@ } }, "node_modules/nise": { - "version": "1.5.3", - "resolved": "https://registry.npmjs.org/nise/-/nise-1.5.3.tgz", - "integrity": "sha512-Ymbac/94xeIrMf59REBPOv0thr+CJVFMhrlAkW/gjCIE58BGQdCj0x7KRCb3yz+Ga2Rz3E9XXSvUyyxqqhjQAQ==", + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/nise/-/nise-4.1.0.tgz", + "integrity": "sha512-eQMEmGN/8arp0xsvGoQ+B1qvSkR73B1nWSCh7nOt5neMCtwcQVYQGdzQMhcNscktTsWB54xnlSQFzOAPJD8nXA==", "dev": true, "dependencies": { - "@sinonjs/formatio": "^3.2.1", + "@sinonjs/commons": "^1.7.0", + "@sinonjs/fake-timers": "^6.0.0", "@sinonjs/text-encoding": "^0.7.1", "just-extend": "^4.0.2", - "lolex": "^5.0.1", "path-to-regexp": "^1.7.0" } }, - "node_modules/nise/node_modules/lolex": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/lolex/-/lolex-5.1.2.tgz", - "integrity": "sha512-h4hmjAvHTmd+25JSwrtTIuwbKdwg5NzZVRMLn9saij4SZaepCrTCxPr35H/3bjwfMJtN+t3CX8672UIkglz28A==", - "dev": true, - "dependencies": { - "@sinonjs/commons": "^1.7.0" - } - }, "node_modules/nise/node_modules/path-to-regexp": { - "version": "1.8.0", - "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-1.8.0.tgz", - "integrity": "sha512-n43JRhlUKUAlibEJhPeir1ncUID16QnEjNpwzNdO3Lm4ywrBpBZ5oLD0I6br9evr1Y9JTqwRtAh7JLoOzAQdVA==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-1.9.0.tgz", + "integrity": "sha512-xIp7/apCFJuUHdDLWe8O1HIkb0kQrOMb/0u6FXQjemHn/ii5LrIzU6bdECnsiTF/GjZkMEKg1xdiZwNqDYlZ6g==", "dev": true, "dependencies": { "isarray": "0.0.1" @@ -5945,50 +5932,33 @@ } }, "node_modules/sinon": { - "version": "7.5.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-7.5.0.tgz", - "integrity": "sha512-AoD0oJWerp0/rY9czP/D6hDTTUYGpObhZjMpd7Cl/A6+j0xBE+ayL/ldfggkBXUs0IkvIiM1ljM8+WkOc5k78Q==", + "version": "9.2.4", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-9.2.4.tgz", + "integrity": "sha512-zljcULZQsJxVra28qIAL6ow1Z9tpattkCTEJR4RBP3TGc00FcttsP5pK284Nas5WjMZU5Yzy3kAIp3B3KRf5Yg==", + "deprecated": "16.1.1", "dev": true, "dependencies": { - "@sinonjs/commons": "^1.4.0", - "@sinonjs/formatio": "^3.2.1", - "@sinonjs/samsam": "^3.3.3", - "diff": "^3.5.0", - "lolex": "^4.2.0", - "nise": "^1.5.2", - "supports-color": "^5.5.0" + "@sinonjs/commons": "^1.8.1", + "@sinonjs/fake-timers": "^6.0.1", + "@sinonjs/samsam": "^5.3.1", + "diff": "^4.0.2", + "nise": "^4.0.4", + "supports-color": "^7.1.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/sinon" } }, "node_modules/sinon/node_modules/diff": { - "version": "3.5.0", - "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", - "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", "dev": true, "engines": { "node": ">=0.3.1" } }, - "node_modules/sinon/node_modules/has-flag": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-3.0.0.tgz", - "integrity": "sha512-sKJf1+ceQBr4SMkvQnBDNDtf4TXpVhVGateu0t918bl30FnbE2m4vNLX+VWe/dpjlb+HugGYzW7uQXH98HPEYw==", - "dev": true, - "engines": { - "node": ">=4" - } - }, - "node_modules/sinon/node_modules/supports-color": { - "version": "5.5.0", - "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", - "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", - "dev": true, - "dependencies": { - "has-flag": "^3.0.0" - }, - "engines": { - "node": ">=4" - } - }, "node_modules/slash": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/slash/-/slash-3.0.0.tgz", diff --git a/package.json b/package.json index dc45b8819..d7a6c0eb7 100644 --- a/package.json +++ b/package.json @@ -287,7 +287,7 @@ "@types/nock": "^10.0.3", "@types/node": "^14.18.24", "@types/node-fetch": "^3.0.3", - "@types/sinon": "^7.0.13", + "@types/sinon": "^9.0.11", "@typescript-eslint/eslint-plugin": "^5.33.1", "@typescript-eslint/parser": "^5.33.1", "api-extractor-model-me": "^0.1.1", @@ -313,7 +313,7 @@ "prettier": "^2.7.1", "protobufjs-cli": "^1.1.1", "semver": "^7.3.5", - "sinon": "^7.3.2", + "sinon": "^9.2.4", "ts-node": "^10.4.0", "typescript": "^4.3.5", "yargs": "^15.3.1" diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 55f698575..ef77a6fa6 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -764,7 +764,7 @@ describe("onCallHandler", () => { "application/json", {}, { accept: "text/event-stream" } - ) as any; + ); const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, @@ -776,7 +776,7 @@ describe("onCallHandler", () => { "gcfv2" ); - const resp = await runHandler(fn, mockReq); + const resp = await runHandler(fn, mockReq as any); const data = [`data: {"message":"hello"}`, `data: {"result":"world"}`]; expect(resp.body).to.equal([...data, ""].join("\n")); }); @@ -787,7 +787,7 @@ describe("onCallHandler", () => { "application/json", {}, { accept: "text/event-stream" } - ) as any; + ); const fn = https.onCallHandler( { cors: { origin: true, methods: "POST" }, @@ -798,7 +798,7 @@ describe("onCallHandler", () => { "gcfv2" ); - const resp = await runHandler(fn, mockReq); + const resp = await runHandler(fn, mockReq as any); const data = [`data: {"error":{"message":"INTERNAL","status":"INTERNAL"}}`]; expect(resp.body).to.equal([...data, ""].join("\n")); }); @@ -827,6 +827,127 @@ describe("onCallHandler", () => { }, }); }); + + it("stops processing when client disconnects", async () => { + const mockReq = mockRequest( + { message: "test abort" }, + "application/json", + {}, + { accept: "text/event-stream" } + ) as any; + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + }, + (req, resp) => { + resp.write("initial message"); + mockReq.emit("close"); + resp.write("should not be sent"); + return "done"; + }, + "gcfv2" + ); + + const resp = await runHandler(fn, mockReq); + + expect(resp.body).to.equal(`data: {"message":"initial message"}\n`); + }); + + describe("Heartbeats", () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + }); + + it("sends heartbeat messages at specified interval", async () => { + const mockReq = mockRequest( + { message: "test heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: 5, + }, + async () => { + // Simulate long-running operation + await new Promise((resolve) => setTimeout(resolve, 11_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + await clock.tickAsync(11_000); + const resp = await handlerPromise; + const data = [": ping", ": ping", `data: {"result":"done"}`]; + expect(resp.body).to.equal([...data, ""].join("\n")); + }); + + it("doesn't send heartbeat messages if user writes data", async () => { + const mockReq = mockRequest( + { message: "test heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: 5, + }, + async (resp, res) => { + await new Promise((resolve) => setTimeout(resolve, 3_000)); + res.write("hello"); + await new Promise((resolve) => setTimeout(resolve, 3_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + await clock.tickAsync(10_000); + const resp = await handlerPromise; + const data = [`data: {"message":"hello"}`, `data: {"result":"done"}`]; + expect(resp.body).to.equal([...data, ""].join("\n")); + }); + + it("respects null heartbeatSeconds option", async () => { + const mockReq = mockRequest( + { message: "test no heartbeat" }, + "application/json", + {}, + { accept: "text/event-stream" } + ); + + const fn = https.onCallHandler( + { + cors: { origin: true, methods: "POST" }, + heartbeatSeconds: null, + }, + async () => { + await new Promise((resolve) => setTimeout(resolve, 31_000)); + return "done"; + }, + "gcfv2" + ); + + const handlerPromise = runHandler(fn, mockReq as any); + await clock.tickAsync(31_000); + const resp = await handlerPromise; + expect(resp.body).to.equal('data: {"result":"done"}\n'); + }); + }); }); }); diff --git a/spec/fixtures/mockrequest.ts b/spec/fixtures/mockrequest.ts index c85ea36ed..c27f8e2cd 100644 --- a/spec/fixtures/mockrequest.ts +++ b/spec/fixtures/mockrequest.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from 'node:stream'; + import * as jwt from 'jsonwebtoken'; import * as jwkToPem from 'jwk-to-pem'; import * as nock from 'nock'; @@ -5,14 +7,14 @@ import * as mockJWK from '../fixtures/credential/jwk.json'; import * as mockKey from '../fixtures/credential/key.json'; // MockRequest mocks an https.Request. -export class MockRequest { +export class MockRequest extends EventEmitter { public method: 'POST' | 'GET' | 'OPTIONS' = 'POST'; constructor( readonly body: any, readonly headers: { [name: string]: string } ) { - // This block intentionally left blank. + super() } public header(name: string): string { diff --git a/spec/helper.ts b/spec/helper.ts index 544061b0b..5c8ca0c76 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -52,6 +52,10 @@ export function runHandler( private headers: { [name: string]: string } = {}; private callback: () => void; + constructor() { + request.on("close", () => this.end()); + } + public status(code: number) { this.statusCode = code; return this; @@ -82,6 +86,7 @@ export function runHandler( public write(writeBody: any) { this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody; + return true; } public end() { @@ -89,8 +94,8 @@ export function runHandler( } public on(event: string, callback: () => void) { - if (event !== "finish") { - throw new Error("MockResponse only implements the finish event"); + if (event !== "finish" && event !== "close") { + throw new Error("MockResponse only implements close and finish event"); } this.callback = callback; } diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index d798a2579..83eaba433 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -40,6 +40,8 @@ const JWT_REGEX = /^[a-zA-Z0-9\-_=]+?\.[a-zA-Z0-9\-_=]+?\.([a-zA-Z0-9\-_=]+)?$/; export const CALLABLE_AUTH_HEADER = "x-callable-context-auth"; /** @internal */ export const ORIGINAL_AUTH_HEADER = "x-original-auth"; +/** @internal */ +export const DEFAULT_HEARTBEAT_SECONDS = 30; /** An express request with the wire format representation of the request body. */ export interface Request extends express.Request { @@ -146,8 +148,21 @@ export interface CallableRequest { * to allow writing partial, streaming responses back to the client. */ export interface CallableProxyResponse { + /** + * Writes a chunk of the response body to the client. This method can be called + * multiple times to stream data progressively. + */ write: express.Response["write"]; + /** + * Indicates whether the client has requested and can handle streaming responses. + * This should be checked before attempting to stream data to avoid compatibility issues. + */ acceptsStreaming: boolean; + /** + * An AbortSignal that is triggered when the client disconnects or the + * request is terminated prematurely. + */ + signal: AbortSignal; } /** @@ -692,6 +707,13 @@ export interface CallableOptions { cors: cors.CorsOptions; enforceAppCheck?: boolean; consumeAppCheckToken?: boolean; + /** + * Time in seconds between sending heartbeat messages to keep the connection + * alive. Set to `null` to disable heartbeats. + * + * Defaults to 30 seconds. + */ + heartbeatSeconds?: number | null; } /** @internal */ @@ -722,6 +744,36 @@ function wrapOnCallHandler( version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { return async (req: Request, res: express.Response): Promise => { + const abortController = new AbortController(); + let heartbeatInterval: NodeJS.Timeout | null = null; + + const heartbeatSeconds = + options.heartbeatSeconds === undefined ? DEFAULT_HEARTBEAT_SECONDS : options.heartbeatSeconds; + + const clearScheduledHeartbeat = () => { + if (heartbeatInterval) { + clearTimeout(heartbeatInterval); + heartbeatInterval = null; + } + }; + + const scheduleHeartbeat = () => { + clearScheduledHeartbeat(); + if (!abortController.signal.aborted) { + heartbeatInterval = setTimeout(() => { + if (!abortController.signal.aborted) { + res.write(": ping\n"); + scheduleHeartbeat(); + } + }, heartbeatSeconds * 1000); + } + }; + + res.on("close", () => { + clearScheduledHeartbeat(); + abortController.abort(); + }); + try { if (!isValidRequest(req)) { logger.error("Invalid request, unable to process."); @@ -797,52 +849,75 @@ function wrapOnCallHandler( ...context, data, }; - // TODO: set up optional heartbeat + const responseProxy: CallableProxyResponse = { write(chunk): boolean { - if (acceptsStreaming) { - const formattedData = encodeSSE({ message: chunk }); - return res.write(formattedData); - } // if client doesn't accept sse-protocol, response.write() is no-op. + if (!acceptsStreaming) { + return false; + } + // if connection is already closed, response.write() is no-op. + if (abortController.signal.aborted) { + return false; + } + const formattedData = encodeSSE({ message: chunk }); + const wrote = res.write(formattedData); + // Reset heartbeat timer after successful write + if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { + scheduleHeartbeat(); + } + return wrote; }, acceptsStreaming, + signal: abortController.signal, }; if (acceptsStreaming) { // SSE always responds with 200 res.status(200); + + if (heartbeatSeconds !== null && heartbeatSeconds > 0) { + scheduleHeartbeat(); + } } // For some reason the type system isn't picking up that the handler // is a one argument function. result = await (handler as any)(arg, responseProxy); + clearScheduledHeartbeat(); } - - // Encode the result as JSON to preserve types like Dates. - result = encode(result); - - // If there was some result, encode it in the body. - const responseBody: HttpResponseBody = { result }; - if (acceptsStreaming) { - res.write(encodeSSE(responseBody)); - res.end(); + if (!abortController.signal.aborted) { + // Encode the result as JSON to preserve types like Dates. + result = encode(result); + // If there was some result, encode it in the body. + const responseBody: HttpResponseBody = { result }; + if (acceptsStreaming) { + res.write(encodeSSE(responseBody)); + res.end(); + } else { + res.status(200).send(responseBody); + } } else { - res.status(200).send(responseBody); + res.end(); } } catch (err) { - let httpErr = err; - if (!(err instanceof HttpsError)) { - // This doesn't count as an 'explicit' error. - logger.error("Unhandled error", err); - httpErr = new HttpsError("internal", "INTERNAL"); - } - - const { status } = httpErr.httpErrorCode; - const body = { error: httpErr.toJSON() }; - if (version === "gcfv2" && req.header("accept") === "text/event-stream") { - res.send(encodeSSE(body)); + if (!abortController.signal.aborted) { + let httpErr = err; + if (!(err instanceof HttpsError)) { + // This doesn't count as an 'explicit' error. + logger.error("Unhandled error", err); + httpErr = new HttpsError("internal", "INTERNAL"); + } + const { status } = httpErr.httpErrorCode; + const body = { error: httpErr.toJSON() }; + if (version === "gcfv2" && req.header("accept") === "text/event-stream") { + res.send(encodeSSE(body)); + } else { + res.status(status).send(body); + } } else { - res.status(status).send(body); + res.end(); } + } finally { + clearScheduledHeartbeat(); } }; } diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 0c5fd2ba0..321c31765 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -198,6 +198,14 @@ export interface CallableOptions extends HttpsOptions { * further decisions, such as requiring additional security checks or rejecting the request. */ consumeAppCheckToken?: boolean; + + /** + * Time in seconds between sending heartbeat messages to keep the connection + * alive. Set to `null` to disable heartbeats. + * + * Defaults to 30 seconds. + */ + heartbeatSeconds?: number | null; } /** @@ -387,6 +395,7 @@ export function onCall>( cors: { origin, methods: "POST" }, enforceAppCheck: opts.enforceAppCheck ?? options.getGlobalOptions().enforceAppCheck, consumeAppCheckToken: opts.consumeAppCheckToken, + heartbeatSeconds: opts.heartbeatSeconds, }, fixedLen, "gcfv2"