
Enable Eager Workflow Start #430

Merged · 4 commits · Nov 27, 2023

72 changes: 34 additions & 38 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default.

26 changes: 25 additions & 1 deletion temporalio/client.py
@@ -287,6 +287,7 @@ async def start_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> WorkflowHandle[SelfType, ReturnType]:
...

@@ -317,6 +318,7 @@ async def start_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> WorkflowHandle[SelfType, ReturnType]:
...

@@ -349,6 +351,7 @@ async def start_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> WorkflowHandle[SelfType, ReturnType]:
...

@@ -381,6 +384,7 @@ async def start_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> WorkflowHandle[Any, Any]:
...

@@ -411,6 +415,7 @@ async def start_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
stack_level: int = 2,
) -> WorkflowHandle[Any, Any]:
"""Start a workflow and return its handle.
@@ -445,6 +450,10 @@ async def start_workflow(
rpc_metadata: Headers used on the RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for the RPC call.
request_eager_start: Potentially reduce the latency to start this workflow by
encouraging the server to start it on a local worker running with
this same client.
This is currently experimental.

Returns:
A workflow handle to the started workflow.
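
For callers, the new flag is just another keyword argument on start_workflow (and on execute_workflow below). A minimal usage sketch, not part of this PR: GreetingWorkflow, the workflow ID, and the task queue are placeholders, and eager dispatch only applies when a worker polling that task queue shares this same client.

from temporalio.client import Client, WorkflowHandle

async def start_greeting(client: Client, task_queue: str) -> WorkflowHandle:
    # GreetingWorkflow is a placeholder; any registered workflow works the
    # same way as long as a worker on this client serves its task queue.
    return await client.start_workflow(
        GreetingWorkflow.run,
        "Temporal",
        id="greeting-workflow-id",   # placeholder workflow ID
        task_queue=task_queue,       # queue served by a worker on this client
        request_eager_start=True,    # experimental: ask for eager dispatch
    )
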
@@ -492,6 +501,7 @@ async def start_workflow(
ret_type=result_type,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
request_eager_start=request_eager_start,
)
)

@@ -521,6 +531,7 @@ async def execute_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> ReturnType:
...

@@ -551,6 +562,7 @@ async def execute_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> ReturnType:
...

@@ -583,6 +595,7 @@ async def execute_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> ReturnType:
...

@@ -615,6 +628,7 @@ async def execute_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> Any:
...

@@ -645,6 +659,7 @@ async def execute_workflow(
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
) -> Any:
"""Start a workflow and wait for completion.

@@ -674,6 +689,7 @@ async def execute_workflow(
start_signal_args=start_signal_args,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
request_eager_start=request_eager_start,
stack_level=3,
)
).result()
@@ -1082,6 +1098,7 @@ def __init__(
self._result_run_id = result_run_id
self._first_execution_run_id = first_execution_run_id
self._result_type = result_type
self.__temporal_eagerly_started = False

@property
def id(self) -> str:
@@ -4282,6 +4299,7 @@ class StartWorkflowInput:
ret_type: Optional[Type]
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]
request_eager_start: bool


@dataclass
@@ -4751,6 +4769,8 @@ async def start_workflow(
)
else:
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
req.request_eager_execution = input.request_eager_start

req.namespace = self._client.namespace
req.workflow_id = input.id
req.workflow_type.name = input.workflow
@@ -4794,6 +4814,7 @@ async def start_workflow(
temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse,
]
first_execution_run_id = None
eagerly_started = False
try:
if isinstance(
req,
@@ -4813,6 +4834,7 @@
timeout=input.rpc_timeout,
)
first_execution_run_id = resp.run_id
eagerly_started = resp.HasField("eager_workflow_task")
except RPCError as err:
# If the status is ALREADY_EXISTS and the details can be extracted
# as already started, use a different exception
@@ -4826,13 +4848,15 @@
)
else:
raise
return WorkflowHandle(
handle: WorkflowHandle[Any, Any] = WorkflowHandle(
self._client,
req.workflow_id,
result_run_id=resp.run_id,
first_execution_run_id=first_execution_run_id,
result_type=input.ret_type,
)
setattr(handle, "__temporal_eagerly_started", eagerly_started)
return handle

async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
await self._client.workflow_service.request_cancel_workflow_execution(
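
Whether the server actually honored the eager request is recorded on the returned handle via the private __temporal_eagerly_started attribute set above. A short sketch of reading it back, assuming handle came from a start_workflow call with request_eager_start=True; the getattr default is defensive and not part of the PR:

# The flag is best-effort: the server may decline the eager request, in which
# case the workflow still starts normally and its first task goes through the
# regular task queue.
eagerly_started = bool(getattr(handle, "__temporal_eagerly_started", False))
if not eagerly_started:
    print("Workflow started, but not eagerly")
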
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -81,6 +81,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
"system.forceSearchAttributesCacheRefreshOnRead=true",
"--dynamic-config-value",
f"limit.historyCount.suggestContinueAsNew={CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT}",
"--dynamic-config-value",
"system.enableEagerWorkflowStart=true",
]
)
elif env_type == "time-skipping":
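
Eager workflow start is gated server-side, which is why the test fixture above passes the dynamic config value to the dev server. A rough equivalent when starting a local environment directly, assuming temporalio.testing's WorkflowEnvironment.start_local and its dev_server_extra_args parameter:

from temporalio.testing import WorkflowEnvironment

async def eager_enabled_env() -> WorkflowEnvironment:
    # Without system.enableEagerWorkflowStart=true the server ignores
    # request_eager_start and workflows start through the normal path.
    return await WorkflowEnvironment.start_local(
        dev_server_extra_args=[
            "--dynamic-config-value",
            "system.enableEagerWorkflowStart=true",
        ],
    )
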
15 changes: 15 additions & 0 deletions tests/worker/test_workflow.py
@@ -114,6 +114,21 @@ async def test_workflow_hello(client: Client):
assert result == "Hello, Temporal!"


async def test_workflow_hello_eager(client: Client):
async with new_worker(client, HelloWorkflow) as worker:
handle = await client.start_workflow(
HelloWorkflow.run,
"Temporal",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
request_eager_start=True,
task_timeout=timedelta(hours=1), # hang if retry needed
)
assert handle.__temporal_eagerly_started
result = await handle.result()
assert result == "Hello, Temporal!"


@activity.defn
async def multi_param_activity(param1: int, param2: str) -> str:
return f"param1: {param1}, param2: {param2}"