From 944ed127285438139755bfba1fa4a0546bd62adf Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 30 Oct 2023 10:05:56 -0700 Subject: [PATCH 1/2] Remove interceptable poll update --- temporalio/client.py | 108 ++++++++++++++++--------------------------- tests/test_client.py | 7 --- 2 files changed, 39 insertions(+), 76 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 3ee537788..1d1f4753c 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -4007,16 +4007,45 @@ async def result( self._result_type, ) - return await self._client._impl.poll_workflow_update( - PollWorkflowUpdateInput( - self.workflow_id, - self.workflow_run_id, - self.id, - timeout, - self._result_type, - rpc_metadata, - rpc_timeout, - ) + req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest( + namespace=self._client.namespace, + update_ref=temporalio.api.update.v1.UpdateRef( + workflow_execution=temporalio.api.common.v1.WorkflowExecution( + workflow_id=self.workflow_id, + run_id=self.workflow_run_id or "", + ), + update_id=self.id, + ), + identity=self._client.identity, + wait_policy=temporalio.api.update.v1.WaitPolicy( + lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED + ), + ) + + async def poll_loop(): + # Continue polling as long as we have either an empty response, or an *rpc* timeout + while True: + try: + res = await self._client.workflow_service.poll_workflow_execution_update( + req, + retry=True, + metadata=rpc_metadata, + timeout=rpc_timeout, + ) + if res.HasField("outcome"): + return await _update_outcome_to_result( + res.outcome, + self.id, + self._client.data_converter, + self._result_type, + ) + except RPCError as err: + if err.status != RPCStatusCode.DEADLINE_EXCEEDED: + raise + + # Wait for at most the *overall* timeout + return await asyncio.wait_for( + poll_loop(), timeout.total_seconds() if timeout else None ) @@ -4241,19 +4270,6 @@ class StartWorkflowUpdateInput: rpc_timeout: Optional[timedelta] -@dataclass -class PollWorkflowUpdateInput: - """Input for :py:meth:`OutboundInterceptor.poll_workflow_update`.""" - - workflow_id: str - run_id: Optional[str] - update_id: str - timeout: Optional[timedelta] - ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] - rpc_timeout: Optional[timedelta] - - @dataclass class HeartbeatAsyncActivityInput: """Input for :py:meth:`OutboundInterceptor.heartbeat_async_activity`.""" @@ -4504,10 +4520,6 @@ async def start_workflow_update( """Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call.""" return await self.next.start_workflow_update(input) - async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any: - """May be called when calling :py:meth:`WorkflowUpdateHandle.result`.""" - return await self.next.poll_workflow_update(input) - ### Async activity calls async def heartbeat_async_activity( @@ -4885,48 +4897,6 @@ async def start_workflow_update( return update_handle - async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any: - req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest( - namespace=self._client.namespace, - update_ref=temporalio.api.update.v1.UpdateRef( - workflow_execution=temporalio.api.common.v1.WorkflowExecution( - workflow_id=input.workflow_id, - run_id=input.run_id or "", - ), - update_id=input.update_id, - ), - identity=self._client.identity, - wait_policy=temporalio.api.update.v1.WaitPolicy( - lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED - ), - ) - - async def poll_loop(): - # Continue polling as long as we have either an empty response, or an *rpc* timeout - while True: - try: - res = await self._client.workflow_service.poll_workflow_execution_update( - req, - retry=True, - metadata=input.rpc_metadata, - timeout=input.rpc_timeout, - ) - if res.HasField("outcome"): - return await _update_outcome_to_result( - res.outcome, - input.update_id, - self._client.data_converter, - input.ret_type, - ) - except RPCError as err: - if err.status != RPCStatusCode.DEADLINE_EXCEEDED: - raise - - # Wait for at most the *overall* timeout - return await asyncio.wait_for( - poll_loop(), input.timeout.total_seconds() if input.timeout else None - ) - ### Async activity calls async def heartbeat_async_activity( diff --git a/tests/test_client.py b/tests/test_client.py index 1772cc51e..7fb26b85d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -36,7 +36,6 @@ Client, Interceptor, OutboundInterceptor, - PollWorkflowUpdateInput, QueryWorkflowInput, RPCError, RPCStatusCode, @@ -456,12 +455,6 @@ async def start_workflow_update( self._parent.traces.append(("start_workflow_update", input)) return await super().start_workflow_update(input) - async def poll_workflow_update( - self, input: PollWorkflowUpdateInput - ) -> WorkflowUpdateHandle[Any]: - self._parent.traces.append(("poll_workflow_update", input)) - return await super().poll_workflow_update(input) - async def test_interceptor(client: Client, worker: ExternalWorker): # Create new client from existing client but with a tracing interceptor From 4752f157d68173c5e24dedce88cdaa1f650afabe Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 30 Oct 2023 10:07:20 -0700 Subject: [PATCH 2/2] Remove timeout from wf handle result --- temporalio/client.py | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 1d1f4753c..170f58aaf 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -3979,7 +3979,6 @@ def workflow_run_id(self) -> Optional[str]: async def result( self, *, - timeout: Optional[timedelta] = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: @@ -3988,7 +3987,6 @@ async def result( specified. Args: - timeout: Optional timeout specifying maximum wait time for the result. rpc_metadata: Headers used on the RPC call. Keys here override client-level RPC metadata keys. rpc_timeout: Optional RPC deadline to set for the RPC call. If this elapses, the poll is retried until the overall timeout has been reached. @@ -4022,31 +4020,27 @@ async def result( ), ) - async def poll_loop(): - # Continue polling as long as we have either an empty response, or an *rpc* timeout - while True: - try: - res = await self._client.workflow_service.poll_workflow_execution_update( + # Continue polling as long as we have either an empty response, or an *rpc* timeout + while True: + try: + res = ( + await self._client.workflow_service.poll_workflow_execution_update( req, retry=True, metadata=rpc_metadata, timeout=rpc_timeout, ) - if res.HasField("outcome"): - return await _update_outcome_to_result( - res.outcome, - self.id, - self._client.data_converter, - self._result_type, - ) - except RPCError as err: - if err.status != RPCStatusCode.DEADLINE_EXCEEDED: - raise - - # Wait for at most the *overall* timeout - return await asyncio.wait_for( - poll_loop(), timeout.total_seconds() if timeout else None - ) + ) + if res.HasField("outcome"): + return await _update_outcome_to_result( + res.outcome, + self.id, + self._client.data_converter, + self._result_type, + ) + except RPCError as err: + if err.status != RPCStatusCode.DEADLINE_EXCEEDED: + raise class WorkflowFailureError(temporalio.exceptions.TemporalError):