From 0a194cb68fc81c78a9b0a1ecace7139c9ed8529a Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 13 Jan 2023 08:38:09 -0600 Subject: [PATCH] Fail activity worker on broken executor Fixes #245 --- README.md | 3 ++ temporalio/worker/_activity.py | 37 ++++++++++++++++++++++-- temporalio/worker/_worker.py | 4 ++- tests/worker/test_activity.py | 53 ++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7f3eb3e51..562414fa0 100644 --- a/README.md +++ b/README.md @@ -938,6 +938,9 @@ Note, all calls from an activity to functions in the `temporalio.activity` packa activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still function in the new threads. +If any activity ever throws a `concurrent.futures.BrokenExecutor`, the failure is considered unrecoverable and the worker +will fail and shut down. + ###### Synchronous Multithreaded Activities If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index c1c45a827..e2c6ae7a3 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -16,7 +16,18 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Tuple, Type, Union +from typing import ( + Any, + Callable, + Dict, + Iterator, + NoReturn, + Optional, + Sequence, + Tuple, + Type, + Union, +) import google.protobuf.duration_pb2 import google.protobuf.timestamp_pb2 @@ -64,6 +75,7 @@ def __init__( self._running_activities: Dict[bytes, _RunningActivity] = {} self._data_converter = data_converter self._interceptors = interceptors + self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue() # Lazily created on first activity 
self._worker_shutdown_event: Optional[ temporalio.activity._CompositeEvent @@ -111,11 +123,26 @@ def __init__( self._activities[defn.name] = defn async def run(self) -> None: + # Create a task that fails when we get a failure on the queue + async def raise_from_queue() -> NoReturn: + raise await self._fail_worker_exception_queue.get() + + exception_task = asyncio.create_task(raise_from_queue()) + # Continually poll for activity work while True: try: # Poll for a task - task = await self._bridge_worker().poll_activity_task() + poll_task = asyncio.create_task( + self._bridge_worker().poll_activity_task() + ) + await asyncio.wait([poll_task, exception_task], return_when=asyncio.FIRST_COMPLETED) # type: ignore + # If exception for failing the worker happened, raise it. + # Otherwise, the poll succeeded. + if exception_task.done(): + poll_task.cancel() + await exception_task + task = await poll_task if task.HasField("start"): # Cancelled event and sync field will be updated inside @@ -131,8 +158,10 @@ async def run(self) -> None: else: raise RuntimeError(f"Unrecognized activity task: {task}") except temporalio.bridge.worker.PollShutdownError: + exception_task.cancel() return except Exception as err: + exception_task.cancel() raise RuntimeError("Activity worker failed") from err async def shutdown(self, after_graceful_timeout: timedelta) -> None: @@ -465,6 +494,10 @@ async def _run_activity( await self._data_converter.encode_failure( err, completion.result.failed.failure ) + + # For broken executors, we have to fail the entire worker + if isinstance(err, concurrent.futures.BrokenExecutor): + self._fail_worker_exception_queue.put_nowait(err) except Exception as inner_err: temporalio.activity.logger.exception( f"Exception handling failed, original error: {err}" diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 643b66bb2..56cef3e1c 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -93,7 +93,9 @@ def __init__( 
activity_executor: Concurrent executor to use for non-async activities. This is required if any activities are non-async. If this is a :py:class:`concurrent.futures.ProcessPoolExecutor`, - all non-async activities must be picklable. + all non-async activities must be picklable. Note, a broken + executor failure from this executor will cause the worker to + fail and shut down. workflow_task_executor: Thread pool executor for workflow tasks. If this is not present, a new :py:class:`concurrent.futures.ThreadPoolExecutor` will be diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 4905963ae..0c06fdd43 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -3,10 +3,13 @@ import logging import logging.handlers import multiprocessing +import os import queue +import signal import threading import time import uuid +from concurrent.futures.process import BrokenProcessPool from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Callable, List, NoReturn, Optional, Sequence @@ -919,6 +922,56 @@ async def test_sync_activity_process_worker_shutdown_graceful( assert "Worker graceful shutdown" == await handle.result() +@activity.defn +def kill_my_process() -> str: + os.kill(os.getpid(), getattr(signal, "SIGKILL", -9)) + return "does not get here" + + +async def test_sync_activity_process_executor_crash( + client: Client, worker: ExternalWorker +): + act_task_queue = str(uuid.uuid4()) + with concurrent.futures.ProcessPoolExecutor() as executor: + act_worker = Worker( + client, + task_queue=act_task_queue, + activities=[kill_my_process], + activity_executor=executor, + graceful_shutdown_timeout=timedelta(seconds=2), + shared_state_manager=_default_shared_state_manager, + ) + act_worker_task = asyncio.create_task(act_worker.run()) + + # Confirm workflow failure with broken pool + with pytest.raises(WorkflowFailureError) as workflow_err: + await client.execute_workflow( 
"kitchen_sink", + KSWorkflowParams( + actions=[ + KSAction( + execute_activity=KSExecuteActivityAction( + name="kill_my_process", + task_queue=act_task_queue, + heartbeat_timeout_ms=30000, + ) + ) + ] + ), + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + assert isinstance(workflow_err.value.cause, ActivityError) + assert isinstance(workflow_err.value.cause.cause, ApplicationError) + assert workflow_err.value.cause.cause.type == "BrokenProcessPool" + + # Also confirm that activity worker fails unrecoverably + with pytest.raises(RuntimeError) as worker_err: + await asyncio.wait_for(act_worker_task, 10) + assert str(worker_err.value) == "Activity worker failed" + assert isinstance(worker_err.value.__cause__, BrokenProcessPool) + + class AsyncActivityWrapper: def __init__(self) -> None: self._info: Optional[activity.Info] = None