From e37d14417d902fb7a632e7d44364a2dc6daa83c4 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 16 Apr 2025 11:11:14 -0700 Subject: [PATCH 1/2] Expose poller autoscaling options --- temporalio/bridge/src/worker.rs | 53 +++++++++++++++++++----- temporalio/bridge/worker.py | 26 +++++++++++- temporalio/worker/__init__.py | 6 +++ temporalio/worker/_replayer.py | 8 +++- temporalio/worker/_worker.py | 71 +++++++++++++++++++++++++++++---- tests/worker/test_worker.py | 24 +++++++++++ 6 files changed, 166 insertions(+), 22 deletions(-) diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index d9a6487b6..db3cd3cac 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -15,9 +15,8 @@ use temporal_sdk_core::api::errors::PollError; use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; use temporal_sdk_core_api::errors::WorkflowErrorType; use temporal_sdk_core_api::worker::{ - PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, - SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait, - SlotSupplierPermit, + SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, + SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; @@ -49,9 +48,9 @@ pub struct WorkerConfig { identity_override: Option, max_cached_workflows: usize, tuner: TunerHolder, - max_concurrent_workflow_task_polls: usize, + workflow_task_poller_behavior: PollerBehavior, nonsticky_to_sticky_poll_ratio: f32, - max_concurrent_activity_task_polls: usize, + activity_task_poller_behavior: PollerBehavior, no_remote_activities: bool, sticky_queue_schedule_to_start_timeout_millis: u64, max_heartbeat_throttle_interval_millis: u64, @@ -63,6 +62,42 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail_for_types: HashSet, } +#[derive(FromPyObject)] +pub struct PollerBehaviorSimpleMaximum { + pub maximum: usize, +} + +#[derive(FromPyObject)] +pub struct PollerBehaviorAutoscaling { + pub minimum: usize, + pub maximum: usize, + pub initial: usize, +} + +/// Recreates [temporal_sdk_core_api::worker::PollerBehavior] +#[derive(FromPyObject)] +pub enum PollerBehavior { + SimpleMaximum(PollerBehaviorSimpleMaximum), + Autoscaling(PollerBehaviorAutoscaling), +} + +impl From for temporal_sdk_core_api::worker::PollerBehavior { + fn from(value: PollerBehavior) -> Self { + match value { + PollerBehavior::SimpleMaximum(simple) => { + temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.maximum) + } + PollerBehavior::Autoscaling(auto) => { + temporal_sdk_core_api::worker::PollerBehavior::Autoscaling { + minimum: auto.minimum, + maximum: auto.maximum, + initial: auto.initial, + } + } + } + } +} + /// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy] #[derive(FromPyObject)] pub enum WorkerVersioningStrategy { @@ -626,14 +661,10 @@ fn convert_worker_config( .versioning_strategy(converted_versioning_strategy) .client_identity_override(conf.identity_override) .max_cached_workflows(conf.max_cached_workflows) - .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum( - conf.max_concurrent_workflow_task_polls, - )) + .workflow_task_poller_behavior(conf.workflow_task_poller_behavior) .tuner(Arc::new(converted_tuner)) .nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio) - .activity_task_poller_behavior(PollerBehavior::SimpleMaximum( - conf.max_concurrent_activity_task_polls, - )) + .activity_task_poller_behavior(conf.activity_task_poller_behavior) .no_remote_activities(conf.no_remote_activities) .sticky_queue_schedule_to_start_timeout(Duration::from_millis( conf.sticky_queue_schedule_to_start_timeout_millis, diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 3ef78ec76..419bebb05 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -48,9 +48,9 @@ class WorkerConfig: identity_override: Optional[str] max_cached_workflows: int tuner: TunerHolder - max_concurrent_workflow_task_polls: int + workflow_task_poller_behavior: PollerBehavior nonsticky_to_sticky_poll_ratio: float - max_concurrent_activity_task_polls: int + activity_task_poller_behavior: PollerBehavior no_remote_activities: bool sticky_queue_schedule_to_start_timeout_millis: int max_heartbeat_throttle_interval_millis: int @@ -62,6 +62,28 @@ class WorkerConfig: nondeterminism_as_workflow_fail_for_types: Set[str] +@dataclass +class PollerBehaviorSimpleMaximum: + """Python representation of the Rust struct for simple poller behavior.""" + + maximum: int + + +@dataclass +class PollerBehaviorAutoscaling: + """Python representation of the Rust struct for autoscaling poller behavior.""" + + minimum: int + maximum: int + initial: int + + +PollerBehavior: TypeAlias = Union[ + PollerBehaviorSimpleMaximum, + PollerBehaviorAutoscaling, +] + + @dataclass class WorkerDeploymentVersion: """Python representation of the Rust struct for configuring a worker deployment version.""" diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 96e37d3b3..bd053fcf2 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -43,6 +43,9 @@ WorkflowSlotInfo, ) from ._worker import ( + PollerBehavior, + PollerBehaviorAutoscaling, + PollerBehaviorSimpleMaximum, Worker, WorkerConfig, WorkerDeploymentConfig, @@ -65,6 +68,9 @@ "ReplayerConfig", "WorkflowReplayResult", "WorkflowReplayResults", + "PollerBehavior", + "PollerBehaviorSimpleMaximum", + "PollerBehaviorAutoscaling", # Interceptor base classes "Interceptor", "ActivityInboundInterceptor", diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 238d64ace..964184196 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -242,9 +242,7 @@ def on_eviction_hook( 1 ), ), - max_concurrent_workflow_task_polls=1, nonsticky_to_sticky_poll_ratio=1, - max_concurrent_activity_task_polls=1, no_remote_activities=True, sticky_queue_schedule_to_start_timeout_millis=1000, max_heartbeat_throttle_interval_millis=1000, @@ -255,6 +253,12 @@ def on_eviction_hook( versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone( build_id=self._config["build_id"] or load_default_build_id(), ), + workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( + 1 + ), + activity_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( + 1 + ), ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index f0b446111..c4c4696fa 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -18,10 +18,11 @@ Optional, Sequence, Type, + Union, cast, ) -from typing_extensions import TypedDict +from typing_extensions import TypeAlias, TypedDict import temporalio.activity import temporalio.api.common.v1 @@ -48,6 +49,48 @@ logger = logging.getLogger(__name__) +@dataclass +class PollerBehaviorSimpleMaximum: + """A poller behavior that will attempt to poll as long as a slot is available, up to the + provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. + """ + + maximum: int = 5 + + def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: + return temporalio.bridge.worker.PollerBehaviorSimpleMaximum( + maximum=self.maximum + ) + + +@dataclass +class PollerBehaviorAutoscaling: + """A poller behavior that will automatically scale the number of pollers based on feedback + from the server. A slot must be available before beginning polling. + """ + + minimum: int = 1 + """At least this many poll calls will always be attempted (assuming slots are available).""" + maximum: int = 100 + """At most this many poll calls will ever be open at once. Must be >= `minimum`.""" + initial: int = 5 + """This many polls will be attempted initially before scaling kicks in. Must be between + `minimum` and `maximum`.""" + + def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: + return temporalio.bridge.worker.PollerBehaviorAutoscaling( + minimum=self.minimum, + maximum=self.maximum, + initial=self.initial, + ) + + +PollerBehavior: TypeAlias = Union[ + PollerBehaviorSimpleMaximum, + PollerBehaviorAutoscaling, +] + + class Worker: """Worker to process workflows and/or activities. @@ -76,9 +119,9 @@ def __init__( max_concurrent_activities: Optional[int] = None, max_concurrent_local_activities: Optional[int] = None, tuner: Optional[WorkerTuner] = None, - max_concurrent_workflow_task_polls: int = 5, + max_concurrent_workflow_task_polls: Optional[int] = None, nonsticky_to_sticky_poll_ratio: float = 0.2, - max_concurrent_activity_task_polls: int = 5, + max_concurrent_activity_task_polls: Optional[int] = None, no_remote_activities: bool = False, sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10), max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60), @@ -94,6 +137,12 @@ def __init__( use_worker_versioning: bool = False, disable_safe_workflow_eviction: bool = False, deployment_config: Optional[WorkerDeploymentConfig] = None, + workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( + maximum=5 + ), + activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( + maximum=5 + ), ) -> None: """Create a worker to process workflows and/or activities. @@ -156,6 +205,8 @@ def __init__( max_concurrent_workflow_task_polls: Maximum number of concurrent poll workflow task requests we will perform at a time on this worker's task queue. + + WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls * this number = the number of max pollers that will be allowed for the nonsticky queue when sticky tasks are enabled. If both @@ -166,6 +217,8 @@ def __init__( max_concurrent_activity_task_polls: Maximum number of concurrent poll activity task requests we will perform at a time on this worker's task queue. + + WARNING: Deprecated, use ``activity_task_poller_behavior`` instead no_remote_activities: If true, this worker will only handle workflow tasks and local activities, it will not poll for activity tasks. sticky_queue_schedule_to_start_timeout: How long a workflow task is @@ -231,6 +284,8 @@ def __init__( deployment_config: Deployment config for the worker. Exclusive with `build_id` and `use_worker_versioning`. WARNING: This is an experimental feature and may change in the future. + workflow_task_poller_behavior: Specify the behavior of workflow task polling + activity_task_poller_behavior: Specify the behavior of activity task polling """ if not activities and not workflows: raise ValueError("At least one activity or workflow must be specified") @@ -408,9 +463,7 @@ def __init__( identity_override=identity, max_cached_workflows=max_cached_workflows, tuner=bridge_tuner, - max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls, nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, - max_concurrent_activity_task_polls=max_concurrent_activity_task_polls, # We have to disable remote activities if a user asks _or_ if we # are not running an activity worker at all. Otherwise shutdown # will not proceed properly. @@ -440,6 +493,8 @@ def __init__( else set() ), versioning_strategy=versioning_strategy, + workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(), + activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(), ), ) @@ -696,9 +751,9 @@ class WorkerConfig(TypedDict, total=False): max_concurrent_activities: Optional[int] max_concurrent_local_activities: Optional[int] tuner: Optional[WorkerTuner] - max_concurrent_workflow_task_polls: int + max_concurrent_workflow_task_polls: Optional[int] nonsticky_to_sticky_poll_ratio: float - max_concurrent_activity_task_polls: int + max_concurrent_activity_task_polls: Optional[int] no_remote_activities: bool sticky_queue_schedule_to_start_timeout: timedelta max_heartbeat_throttle_interval: timedelta @@ -714,6 +769,8 @@ class WorkerConfig(TypedDict, total=False): use_worker_versioning: bool disable_safe_workflow_eviction: bool deployment_config: Optional[WorkerDeploymentConfig] + workflow_task_poller_behavior: PollerBehavior + activity_task_poller_behavior: PollerBehavior @dataclass diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 76eb6a238..934d2c621 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -27,6 +27,7 @@ CustomSlotSupplier, FixedSizeSlotSupplier, LocalActivitySlotInfo, + PollerBehaviorAutoscaling, ResourceBasedSlotConfig, ResourceBasedSlotSupplier, ResourceBasedTunerConfig, @@ -919,6 +920,29 @@ async def test_workflows_can_use_default_versioning_behavior( ) +async def test_can_run_autoscaling_polling_worker( + client: Client, env: WorkflowEnvironment +): + async with new_worker( + client, + WaitOnSignalWorkflow, + activities=[say_hello], + workflow_task_poller_behavior=PollerBehaviorAutoscaling(), + activity_task_poller_behavior=PollerBehaviorAutoscaling(), + ) as w: + + async def do_workflow(): + wf = await client.start_workflow( + WaitOnSignalWorkflow.run, + id=f"resource-based-{uuid.uuid4()}", + task_queue=w.task_queue, + ) + await wf.signal(WaitOnSignalWorkflow.my_signal, "finish") + await wf.result() + + await asyncio.gather(*[do_workflow() for _ in range(20)]) + + async def wait_until_worker_deployment_visible( client: Client, version: WorkerDeploymentVersion ) -> DescribeWorkerDeploymentResponse: From 3b133f235c6fac7386cc85f94b62cc98f54720de Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 16 Apr 2025 16:18:36 -0700 Subject: [PATCH 2/2] Review comments --- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/worker.rs | 4 ++-- temporalio/bridge/worker.py | 2 +- temporalio/worker/_worker.py | 28 ++++++++++++++++++---- tests/worker/test_worker.py | 42 ++++++++++++++++++++++++++++++--- 5 files changed, 66 insertions(+), 12 deletions(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index e18982ec7..1cbca021d 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit e18982ec72be62e357a5ea418b1670c8b2fee55f +Subproject commit 1cbca021d29763c52129730644a95d6f8cf68931 diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index db3cd3cac..85833f440 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -64,7 +64,7 @@ pub struct WorkerConfig { #[derive(FromPyObject)] pub struct PollerBehaviorSimpleMaximum { - pub maximum: usize, + pub simple_maximum: usize, } #[derive(FromPyObject)] @@ -85,7 +85,7 @@ impl From for temporal_sdk_core_api::worker::PollerBehavior { fn from(value: PollerBehavior) -> Self { match value { PollerBehavior::SimpleMaximum(simple) => { - temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.maximum) + temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) } PollerBehavior::Autoscaling(auto) => { temporal_sdk_core_api::worker::PollerBehavior::Autoscaling { diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 419bebb05..8c9d2e9e0 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -66,7 +66,7 @@ class WorkerConfig: class PollerBehaviorSimpleMaximum: """Python representation of the Rust struct for simple poller behavior.""" - maximum: int + simple_maximum: int @dataclass diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index c4c4696fa..786203dca 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -49,7 +49,7 @@ logger = logging.getLogger(__name__) -@dataclass +@dataclass(frozen=True) class PollerBehaviorSimpleMaximum: """A poller behavior that will attempt to poll as long as a slot is available, up to the provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. @@ -59,11 +59,11 @@ class PollerBehaviorSimpleMaximum: def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: return temporalio.bridge.worker.PollerBehaviorSimpleMaximum( - maximum=self.maximum + simple_maximum=self.maximum ) -@dataclass +@dataclass(frozen=True) class PollerBehaviorAutoscaling: """A poller behavior that will automatically scale the number of pollers based on feedback from the server. A slot must be available before beginning polling. @@ -201,11 +201,16 @@ def __init__( ``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and ``max_concurrent_local_activities`` arguments. + Defaults to fixed-size 100 slots for each slot kind if unset and none of the + max_* arguments are provided. + WARNING: This argument is experimental max_concurrent_workflow_task_polls: Maximum number of concurrent poll workflow task requests we will perform at a time on this worker's task queue. + If set, will override any value passed to ``workflow_task_poller_behavior``. + WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls * this number = the number of max pollers that will be allowed for @@ -218,6 +223,8 @@ def __init__( poll activity task requests we will perform at a time on this worker's task queue. + If set, will override any value passed to ``activity_task_poller_behavior``. + WARNING: Deprecated, use ``activity_task_poller_behavior`` instead no_remote_activities: If true, this worker will only handle workflow tasks and local activities, it will not poll for activity tasks. @@ -284,8 +291,10 @@ def __init__( deployment_config: Deployment config for the worker. Exclusive with `build_id` and `use_worker_versioning`. WARNING: This is an experimental feature and may change in the future. - workflow_task_poller_behavior: Specify the behavior of workflow task polling - activity_task_poller_behavior: Specify the behavior of activity task polling + workflow_task_poller_behavior: Specify the behavior of workflow task polling. + Defaults to a 5-poller maximum. + activity_task_poller_behavior: Specify the behavior of activity task polling. + Defaults to a 5-poller maximum. """ if not activities and not workflows: raise ValueError("At least one activity or workflow must be specified") @@ -448,6 +457,15 @@ def __init__( build_id=build_id ) + if max_concurrent_workflow_task_polls: + workflow_task_poller_behavior = PollerBehaviorSimpleMaximum( + maximum=max_concurrent_workflow_task_polls + ) + if max_concurrent_activity_task_polls: + activity_task_poller_behavior = PollerBehaviorSimpleMaximum( + maximum=max_concurrent_activity_task_polls + ) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 934d2c621..c66f3b145 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -6,6 +6,7 @@ import uuid from datetime import timedelta from typing import Any, Awaitable, Callable, Optional, Sequence +from urllib.request import urlopen import temporalio.api.enums.v1 import temporalio.worker._worker @@ -20,6 +21,7 @@ ) from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType from temporalio.common import RawValue, VersioningBehavior +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.service import RPCError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -42,7 +44,12 @@ WorkflowSlotInfo, ) from temporalio.workflow import VersioningIntent -from tests.helpers import assert_eventually, new_worker, worker_versioning_enabled +from tests.helpers import ( + assert_eventually, + find_free_port, + new_worker, + worker_versioning_enabled, +) # Passing through because Python 3.9 has an import bug at # https://github.com/python/cpython/issues/91351 @@ -923,13 +930,42 @@ async def test_workflows_can_use_default_versioning_behavior( async def test_can_run_autoscaling_polling_worker( client: Client, env: WorkflowEnvironment ): + # Create new runtime with Prom server + prom_addr = f"127.0.0.1:{find_free_port()}" + runtime = Runtime( + telemetry=TelemetryConfig( + metrics=PrometheusConfig(bind_address=prom_addr), + ) + ) + client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=runtime, + ) + async with new_worker( client, WaitOnSignalWorkflow, activities=[say_hello], - workflow_task_poller_behavior=PollerBehaviorAutoscaling(), - activity_task_poller_behavior=PollerBehaviorAutoscaling(), + workflow_task_poller_behavior=PollerBehaviorAutoscaling(initial=2), + activity_task_poller_behavior=PollerBehaviorAutoscaling(initial=2), ) as w: + # Give pollers a beat to start + await asyncio.sleep(0.3) + + with urlopen(url=f"http://{prom_addr}/metrics") as f: + prom_str: str = f.read().decode("utf-8") + prom_lines = prom_str.splitlines() + matches = [line for line in prom_lines if "temporal_num_pollers" in line] + activity_pollers = [l for l in matches if "activity_task" in l] + assert len(activity_pollers) == 1 + assert activity_pollers[0].endswith("2") + workflow_pollers = [l for l in matches if "workflow_task" in l] + assert len(workflow_pollers) == 2 + # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on + # initialization timing. + assert workflow_pollers[0].endswith("2") or workflow_pollers[0].endswith("1") + assert workflow_pollers[1].endswith("2") or workflow_pollers[1].endswith("1") async def do_workflow(): wf = await client.start_workflow(