From 8e22fe7edfb3978fdd79ec441e179f5b413d8df5 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 13 Dec 2024 13:33:23 -0800 Subject: [PATCH 1/5] Update streaming callable API --- spec/common/providers/https.spec.ts | 12 ++--- spec/helper.ts | 4 +- src/common/providers/https.ts | 75 ++++++++++++++++++----------- src/v2/providers/https.ts | 39 ++++++++++----- 4 files changed, 83 insertions(+), 47 deletions(-) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index ef77a6fa6..86e1b8696 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -770,7 +770,7 @@ describe("onCallHandler", () => { cors: { origin: true, methods: "POST" }, }, (req, resp) => { - resp.write("hello"); + resp.sendChunk("hello"); return "world"; }, "gcfv2" @@ -840,10 +840,10 @@ describe("onCallHandler", () => { { cors: { origin: true, methods: "POST" }, }, - (req, resp) => { - resp.write("initial message"); - mockReq.emit("close"); - resp.write("should not be sent"); + async (req, resp) => { + await resp.sendChunk("initial message"); + await mockReq.emit("close"); + await resp.sendChunk("should not be sent"); return "done"; }, "gcfv2" @@ -908,7 +908,7 @@ describe("onCallHandler", () => { }, async (resp, res) => { await new Promise((resolve) => setTimeout(resolve, 3_000)); - res.write("hello"); + res.sendChunk("hello"); await new Promise((resolve) => setTimeout(resolve, 3_000)); return "done"; }, diff --git a/spec/helper.ts b/spec/helper.ts index 5c8ca0c76..8415c245a 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -84,8 +84,10 @@ export function runHandler( } } - public write(writeBody: any) { + public write(writeBody: any, cb: () => void = () => {}) { this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody; + // N.B. setImmediate breaks sinon. + setImmediate(cb); return true; } diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 24300cf9d..1e8016ab9 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -141,23 +141,30 @@ export interface CallableRequest { * The raw request handled by the callable. */ rawRequest: Request; + + /** + * Whether this is a streaming request. + * Code can be optimized by not trying to generate a stream of chunks to + * call response.sendChunk on if request.acceptsStreaming is false. + * It is always safe, however, to call response.sendChunk as this will + * noop if acceptsStreaming is false. + */ + acceptsStreaming: boolean; } /** - * CallableProxyResponse exposes subset of express.Response object - * to allow writing partial, streaming responses back to the client. + * CallableProxyResponse allows streaming response chunks and listening to signals + * triggered in events such as a disconnect. */ -export interface CallableProxyResponse { +export interface CallableResponse { /** * Writes a chunk of the response body to the client. This method can be called * multiple times to stream data progressively. + * Returns a promise of whether the data was written. This can be false, for example, + * if the request was not a streaming request. Rejects if there is a network error. */ - write: express.Response["write"]; - /** - * Indicates whether the client has requested and can handle streaming responses. - * This should be checked before attempting to stream data to avoid compatibility issues. - */ - acceptsStreaming: boolean; + sendChunk: (chunk: T) => Promise; + /** * An AbortSignal that is triggered when the client disconnects or the * request is terminated prematurely. @@ -586,13 +593,9 @@ async function checkTokens( auth: "INVALID", }; - await Promise.all([ - Promise.resolve().then(async () => { - verifications.auth = await checkAuthToken(req, ctx); - }), - Promise.resolve().then(async () => { - verifications.app = await checkAppCheckToken(req, ctx, options); - }), + [verifications.auth, verifications.app] = await Promise.all([ + checkAuthToken(req, ctx), + checkAppCheckToken(req, ctx, options), ]); const logPayload = { @@ -697,9 +700,9 @@ async function checkAppCheckToken( } type v1CallableHandler = (data: any, context: CallableContext) => any | Promise; -type v2CallableHandler = ( +type v2CallableHandler = ( request: CallableRequest, - response?: CallableProxyResponse + response?: CallableResponse ) => Res; /** @internal **/ @@ -718,9 +721,9 @@ export interface CallableOptions { } /** @internal */ -export function onCallHandler( +export function onCallHandler( options: CallableOptions, - handler: v1CallableHandler | v2CallableHandler, + handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { const wrapped = wrapOnCallHandler(options, handler, version); @@ -739,9 +742,9 @@ function encodeSSE(data: unknown): string { } /** @internal */ -function wrapOnCallHandler( +function wrapOnCallHandler( options: CallableOptions, - handler: v1CallableHandler | v2CallableHandler, + handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { return async (req: Request, res: express.Response): Promise => { @@ -855,27 +858,41 @@ function wrapOnCallHandler( const arg: CallableRequest = { ...context, data, + acceptsStreaming, }; - const responseProxy: CallableProxyResponse = { - write(chunk): boolean { + const responseProxy: CallableResponse = { + sendChunk(chunk: Stream): Promise { // if client doesn't accept sse-protocol, response.write() is no-op. if (!acceptsStreaming) { - return false; + return Promise.resolve(false); } // if connection is already closed, response.write() is no-op. if (abortController.signal.aborted) { - return false; + return Promise.resolve(false); } const formattedData = encodeSSE({ message: chunk }); - const wrote = res.write(formattedData); + let resolve: (wrote: boolean) => void; + let reject: (err: Error) => void; + const p = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + const wrote = res.write(formattedData, (error) => { + if (error) { + reject(error); + return; + } + resolve(wrote); + }); + // Reset heartbeat timer after successful write if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { scheduleHeartbeat(); } - return wrote; + + return p; }, - acceptsStreaming, signal: abortController.signal, }; if (acceptsStreaming) { diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 0a5b3e8c3..204449f7a 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -33,7 +33,7 @@ import { isDebugFeatureEnabled } from "../../common/debug"; import { ResetValue } from "../../common/options"; import { CallableRequest, - CallableProxyResponse, + CallableResponse, FunctionsErrorCode, HttpsError, onCallHandler, @@ -258,12 +258,17 @@ export type HttpsFunction = (( /** * Creates a callable method for clients to call using a Firebase SDK. */ -export interface CallableFunction extends HttpsFunction { +export interface CallableFunction extends HttpsFunction { /** Executes the handler function with the provided data as input. Used for unit testing. * @param data - An input for the handler function. * @returns The output of the handler function. */ - run(data: CallableRequest): Return; + run(request: CallableRequest): Return; + + stream( + request: CallableRequest, + response: CallableResponse + ): { stream: AsyncIterator; output: Return }; } /** @@ -387,22 +392,22 @@ export function onRequest( * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall>( +export function onCall, Stream = string>( opts: CallableOptions, - handler: (request: CallableRequest, response?: CallableProxyResponse) => Return -): CallableFunction ? Return : Promise>; + handler: (request: CallableRequest, response?: CallableResponse) => Return +): CallableFunction ? Return : Promise, Stream>; /** * Declares a callable method for clients to call using a Firebase SDK. * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall>( - handler: (request: CallableRequest, response?: CallableProxyResponse) => Return +export function onCall, Stream = string>( + handler: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise>; -export function onCall>( +export function onCall, Stream = string>( optsOrHandler: CallableOptions | ((request: CallableRequest) => Return), - handler?: (request: CallableRequest, response?: CallableProxyResponse) => Return + handler?: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise> { let opts: CallableOptions; if (arguments.length === 1) { @@ -421,7 +426,8 @@ export function onCall>( } // fix the length of handler to make the call to handler consistent - const fixedLen = (req: CallableRequest, resp?: CallableProxyResponse) => handler(req, resp); + const fixedLen = (req: CallableRequest, resp?: CallableResponse) => + handler(req, resp); let func: any = onCallHandler( { cors: { origin, methods: "POST" }, @@ -474,6 +480,17 @@ export function onCall>( callableTrigger: {}, }; + // TODO: in the next major version, do auth/appcheck in these helper methods too. func.run = withInit(handler); + func.stream = () => { + return { + stream: { + next(): Promise> { + return Promise.reject("Coming soon"); + }, + }, + output: Promise.reject("Coming soon"), + }; + }; return func; } From 58b0dff6adcf04c292a1fd5a022fc1746211456d Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 13 Dec 2024 13:43:23 -0800 Subject: [PATCH 2/5] Fix linter error --- spec/helper.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spec/helper.ts b/spec/helper.ts index 8415c245a..04829be38 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -84,10 +84,11 @@ export function runHandler( } } - public write(writeBody: any, cb: () => void = () => {}) { + public write(writeBody: any, cb?: () => void) { this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody; - // N.B. setImmediate breaks sinon. - setImmediate(cb); + if (cb) { + setImmediate(cb); + } return true; } From 15cb32e7c0357575371ff0d059388bd020ac7831 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 13 Dec 2024 14:00:11 -0800 Subject: [PATCH 3/5] Stream type defaults to unknown --- src/common/providers/https.ts | 6 +++--- src/v2/providers/https.ts | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 1e8016ab9..a6989f130 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -156,7 +156,7 @@ export interface CallableRequest { * CallableProxyResponse allows streaming response chunks and listening to signals * triggered in events such as a disconnect. */ -export interface CallableResponse { +export interface CallableResponse { /** * Writes a chunk of the response body to the client. This method can be called * multiple times to stream data progressively. @@ -721,7 +721,7 @@ export interface CallableOptions { } /** @internal */ -export function onCallHandler( +export function onCallHandler( options: CallableOptions, handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" @@ -742,7 +742,7 @@ function encodeSSE(data: unknown): string { } /** @internal */ -function wrapOnCallHandler( +function wrapOnCallHandler( options: CallableOptions, handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 204449f7a..41b7cc19a 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -258,7 +258,7 @@ export type HttpsFunction = (( /** * Creates a callable method for clients to call using a Firebase SDK. */ -export interface CallableFunction extends HttpsFunction { +export interface CallableFunction extends HttpsFunction { /** Executes the handler function with the provided data as input. Used for unit testing. * @param data - An input for the handler function. * @returns The output of the handler function. @@ -392,7 +392,7 @@ export function onRequest( * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall, Stream = string>( +export function onCall, Stream = unknown>( opts: CallableOptions, handler: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise, Stream>; @@ -402,10 +402,10 @@ export function onCall, Stream = string>( * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall, Stream = string>( +export function onCall, Stream = unknown>( handler: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise>; -export function onCall, Stream = string>( +export function onCall, Stream = unknown>( optsOrHandler: CallableOptions | ((request: CallableRequest) => Return), handler?: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise> { From 702719d4bf2cec439b1f9018546778fd0b268721 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 13 Dec 2024 14:36:32 -0800 Subject: [PATCH 4/5] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 605aabbf3..2446e5f49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,2 @@ - Add an authPolicy callback to CallableOptions for reusable auth middleware as well as helper auth policies (#1650) +- Multiple breaking changes to the not-yet-announced streaming feature for Callable Functions (#1652) From d087d9fb3e9ad4c74ea12cc4dc91371a77183a7f Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Sat, 14 Dec 2024 10:44:08 -0800 Subject: [PATCH 5/5] Format fix --- src/v2/providers/https.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 41b7cc19a..e59ce9704 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -426,8 +426,7 @@ export function onCall, Stream = unknown>( } // fix the length of handler to make the call to handler consistent - const fixedLen = (req: CallableRequest, resp?: CallableResponse) => - handler(req, resp); + const fixedLen = (req: CallableRequest, resp?: CallableResponse) => handler(req, resp); let func: any = onCallHandler( { cors: { origin, methods: "POST" },