Skip to content

Add RawValue support for non-converted Payloads #1664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions packages/common/src/converter/payload-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ export function mapFromPayloads<K extends string, T = unknown>(
) as Record<K, T>;
}

export declare const rawPayloadTypeBrand: unique symbol;
/**
* RawValue is a wrapper over a payload.
* A payload that belongs to a RawValue is special in that it bypasses user-defined payload converters,
* instead using the default payload converter. The payload still undergoes codec conversion.
*/
export class RawValue<T = unknown> {
private readonly _payload: Payload;
private readonly [rawPayloadTypeBrand]: T = undefined as T;

constructor(value: T, payloadConverter: PayloadConverter = defaultPayloadConverter) {
this._payload = payloadConverter.toPayload(value);
}

get payload(): Payload {
return this._payload;
}
}

export interface PayloadConverterWithEncoding {
/**
* Converts a value to a {@link Payload}.
Expand Down Expand Up @@ -143,6 +162,9 @@ export class CompositePayloadConverter implements PayloadConverter {
* Returns the first successful result, throws {@link ValueError} if there is no converter that can handle the value.
*/
public toPayload<T>(value: T): Payload {
if (value instanceof RawValue) {
return value.payload;
}
for (const converter of this.converters) {
const result = converter.toPayload(value);
if (result !== undefined) {
Expand All @@ -160,6 +182,7 @@ export class CompositePayloadConverter implements PayloadConverter {
if (payload.metadata === undefined || payload.metadata === null) {
throw new ValueError('Missing payload metadata');
}

const encoding = decode(payload.metadata[METADATA_ENCODING_KEY]);
const converter = this.converterByEncoding.get(encoding);
if (converter === undefined) {
Expand Down
26 changes: 26 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
ActivityCancellationType,
ApplicationFailure,
defineSearchAttributeKey,
RawValue,
SearchAttributePair,
SearchAttributeType,
TypedSearchAttributes,
Expand Down Expand Up @@ -1340,6 +1341,31 @@ test('can register search attributes to dev server', async (t) => {
await env.teardown();
});

export async function rawValueWorkflow(value: unknown): Promise<RawValue> {
const { rawValueActivity } = workflow.proxyActivities({ startToCloseTimeout: '10s' });
return await rawValueActivity(new RawValue(value));
}

test('workflow and activity can receive/return RawValue', async (t) => {
const { executeWorkflow, createWorker } = helpers(t);
const worker = await createWorker({
activities: {
async rawValueActivity(value: unknown): Promise<RawValue> {
return new RawValue(value);
},
},
});

await worker.runUntil(async () => {
const testValue = 'test';
const rawValue = new RawValue(testValue);
const res = await executeWorkflow(rawValueWorkflow, {
args: [rawValue],
});
t.deepEqual(res, testValue);
});
});

export async function ChildWorkflowInfo(): Promise<workflow.RootWorkflowInfo | undefined> {
let blocked = true;
workflow.setHandler(unblockSignal, () => {
Expand Down
21 changes: 12 additions & 9 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
WorkflowUpdateValidatorType,
mapFromPayloads,
fromPayloadsAtIndex,
RawValue,
WorkflowFunctionWithOptions,
VersioningBehavior,
WorkflowDefinitionOptions,
Expand All @@ -41,13 +42,13 @@ import {
DefaultSignalHandler,
StackTraceSDKInfo,
StackTraceFileSlice,
EnhancedStackTrace,
StackTraceFileLocation,
WorkflowInfo,
WorkflowCreateOptionsInternal,
ActivationCompletion,
DefaultUpdateHandler,
DefaultQueryHandler,
EnhancedStackTrace,
} from './interfaces';
import { type SinkCall } from './sinks';
import { untrackPromise } from './stack-helpers';
Expand Down Expand Up @@ -263,17 +264,19 @@ export class Activator implements ActivationHandler {
'__stack_trace',
{
handler: () => {
return this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n');
return new RawValue<string>(
this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n')
);
},
description: 'Returns a sensible stack trace.',
},
],
[
'__enhanced_stack_trace',
{
handler: (): EnhancedStackTrace => {
handler: (): RawValue => {
const { sourceMap } = this;
const sdk: StackTraceSDKInfo = { name: 'typescript', version: pkg.version };
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
Expand All @@ -293,15 +296,15 @@ export class Activator implements ActivationHandler {
}
}
}
return { sdk, stacks, sources };
return new RawValue<EnhancedStackTrace>({ sdk, stacks, sources });
},
description: 'Returns a stack trace annotated with source information.',
},
],
[
'__temporal_workflow_metadata',
{
handler: (): temporal.api.sdk.v1.IWorkflowMetadata => {
handler: (): RawValue => {
const workflowType = this.info.workflowType;
const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({
name,
Expand All @@ -315,14 +318,14 @@ export class Activator implements ActivationHandler {
name,
description: value.description,
}));
return {
return new RawValue<temporal.api.sdk.v1.IWorkflowMetadata>({
definition: {
type: workflowType,
queryDefinitions,
signalDefinitions,
updateDefinitions,
},
};
});
},
description: 'Returns metadata associated with this workflow.',
},
Expand Down
Loading