From 0b7c9a61135a952b8017b71732acb0583e770863 Mon Sep 17 00:00:00 2001 From: Wenonah Zhang Date: Wed, 28 Oct 2020 14:50:02 -0700 Subject: [PATCH 1/2] add client signal entity support --- .../models/DurableOrchestrationClient.py | 50 +++++++++++++++++++ .../models/RpcManagementOptions.py | 15 ++++-- .../models/utils/entity_utils.py | 13 +++++ 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 1442124a..039ae6ba 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -13,6 +13,7 @@ from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings from .utils.http_utils import get_async_request, post_async_request, delete_async_request +from .utils.entity_utils import EntityId from azure.functions._durable_functions import _serialize_custom_object @@ -457,6 +458,55 @@ async def wait_for_completion_or_create_check_status_response( else: return self.create_check_status_response(request, instance_id) + async def signal_entity(self, entityId: EntityId, operation_name: str, + operation_input: Optional[Any] = None, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Signals an entity to perform an operation + + Parameters + ---------- + entityId : EntityId + The EntityId of the targeted entity to perform operation. + operation_name: str + The name of the operation. + operation_input: Optional[Any] + The content for the operation. + task_hub_name: Optional[str] + The task hub name of the target entity. + connection_name: Optional[str] + The name of the connection string associated with [task_hub_name]. + + Raises + ------ + Exception: + When the signal entity call failed with an unexpected status code + + Returns + ------- + None + """ + options = RpcManagementOptions(operation_name=operation_name, + connection_name=connection_name, + task_hub_name=task_hub_name, + entity_Id=entityId) + + request_url = options.to_url(self._orchestration_bindings.rpc_base_url) + response = await self._post_async_request(request_url, json.dumps(operation_input) if operation_input else None) + + switch_statement = { + 202: lambda: None # signal accepted + } + + has_error_message = switch_statement.get( + response[0], + lambda: f"The operation failed with an unexpected status code {response[0]}") + + error_message = has_error_message() + + if error_message: + raise Exception(error_message) + @staticmethod def _create_http_response( status_code: int, body: Union[str, Any]) -> func.HttpResponse: diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index c16c508f..308c8ade 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -4,6 +4,7 @@ from azure.durable_functions.constants import DATETIME_STRING_FORMAT from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus +from .utils.entity_utils import EntityId class RpcManagementOptions: """Class used to collect the options for getting orchestration status.""" @@ -12,7 +13,9 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None, connection_name: str = None, show_history: bool = None, show_history_output: bool = None, created_time_from: datetime = None, created_time_to: datetime = None, - runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None): + runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None, + operation_name : str = None, + entity_Id: EntityId = None): self._instance_id = instance_id self._task_hub_name = task_hub_name self._connection_name = connection_name @@ -22,6 +25,8 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None, self._created_time_to = created_time_to self._runtime_status = runtime_status self._show_input = show_input + self.operation_name = operation_name + self.entity_Id = entity_Id @staticmethod def _add_arg(query: List[str], name: str, value: Any): @@ -33,7 +38,7 @@ def _add_date_arg(query: List[str], name: str, value: Optional[datetime]): if value: date_as_string = value.strftime(DATETIME_STRING_FORMAT) RpcManagementOptions._add_arg(query, name, date_as_string) - + def to_url(self, base_url: Optional[str]) -> str: """Get the url based on the options selected. @@ -55,7 +60,10 @@ def to_url(self, base_url: Optional[str]) -> str: if base_url is None: raise ValueError("orchestration bindings has not RPC base url") - url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" + if self.entity_Id: + url = f'{base_url}{EntityId.get_entity_id_url_path(self.entity_Id)}' + else: + url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" query: List[str] = [] @@ -66,6 +74,7 @@ def to_url(self, base_url: Optional[str]) -> str: self._add_arg(query, 'showHistoryOutput', self._show_history_output) self._add_date_arg(query, 'createdTimeFrom', self._created_time_from) self._add_date_arg(query, 'createdTimeTo', self._created_time_to) + self._add_arg(query, 'op', self.operation_name) if self._runtime_status is not None and len(self._runtime_status) > 0: runtime_status = ",".join(r.value for r in self._runtime_status) self._add_arg(query, 'runtimeStatus', runtime_status) diff --git a/azure/durable_functions/models/utils/entity_utils.py b/azure/durable_functions/models/utils/entity_utils.py index 0ff33dd4..9cc9531d 100644 --- a/azure/durable_functions/models/utils/entity_utils.py +++ b/azure/durable_functions/models/utils/entity_utils.py @@ -69,6 +69,19 @@ def get_entity_id(scheduler_id: str) -> 'EntityId': [name, key] = components return EntityId(name, key) + @staticmethod + def get_entity_id_url_path(entity_id: 'EntityId') -> str: + + """Print the the entity url path. + + Returns + ------- + str: + A url path of the EntityId + """ + + return f'entities/{entity_id.name}/{entity_id.key}' + def __str__(self) -> str: """Print the string representation of this EntityId. From 036cc1c7b59f4eaf9611c7f68c4409ef85f90b42 Mon Sep 17 00:00:00 2001 From: Wenonah Zhang Date: Wed, 28 Oct 2020 15:54:07 -0700 Subject: [PATCH 2/2] modify the formating --- .../models/DurableOrchestrationClient.py | 20 ++++++++++--------- .../models/RpcManagementOptions.py | 5 +++-- .../models/utils/entity_utils.py | 2 -- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 039ae6ba..390e0891 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -459,10 +459,10 @@ async def wait_for_completion_or_create_check_status_response( return self.create_check_status_response(request, instance_id) async def signal_entity(self, entityId: EntityId, operation_name: str, - operation_input: Optional[Any] = None, - task_hub_name: Optional[str] = None, - connection_name: Optional[str] = None) -> None: - """Signals an entity to perform an operation + operation_input: Optional[Any] = None, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Signals an entity to perform an operation. Parameters ---------- @@ -487,12 +487,14 @@ async def signal_entity(self, entityId: EntityId, operation_name: str, None """ options = RpcManagementOptions(operation_name=operation_name, - connection_name=connection_name, - task_hub_name=task_hub_name, - entity_Id=entityId) + connection_name=connection_name, + task_hub_name=task_hub_name, + entity_Id=entityId) request_url = options.to_url(self._orchestration_bindings.rpc_base_url) - response = await self._post_async_request(request_url, json.dumps(operation_input) if operation_input else None) + response = await self._post_async_request( + request_url, + json.dumps(operation_input) if operation_input else None) switch_statement = { 202: lambda: None # signal accepted @@ -501,7 +503,7 @@ async def signal_entity(self, entityId: EntityId, operation_name: str, has_error_message = switch_statement.get( response[0], lambda: f"The operation failed with an unexpected status code {response[0]}") - + error_message = has_error_message() if error_message: diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index 308c8ade..b41d1493 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -6,6 +6,7 @@ from .utils.entity_utils import EntityId + class RpcManagementOptions: """Class used to collect the options for getting orchestration status.""" @@ -14,7 +15,7 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None, show_history_output: bool = None, created_time_from: datetime = None, created_time_to: datetime = None, runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None, - operation_name : str = None, + operation_name: str = None, entity_Id: EntityId = None): self._instance_id = instance_id self._task_hub_name = task_hub_name @@ -38,7 +39,7 @@ def _add_date_arg(query: List[str], name: str, value: Optional[datetime]): if value: date_as_string = value.strftime(DATETIME_STRING_FORMAT) RpcManagementOptions._add_arg(query, name, date_as_string) - + def to_url(self, base_url: Optional[str]) -> str: """Get the url based on the options selected. diff --git a/azure/durable_functions/models/utils/entity_utils.py b/azure/durable_functions/models/utils/entity_utils.py index 9cc9531d..f5669323 100644 --- a/azure/durable_functions/models/utils/entity_utils.py +++ b/azure/durable_functions/models/utils/entity_utils.py @@ -71,7 +71,6 @@ def get_entity_id(scheduler_id: str) -> 'EntityId': @staticmethod def get_entity_id_url_path(entity_id: 'EntityId') -> str: - """Print the the entity url path. Returns @@ -79,7 +78,6 @@ def get_entity_id_url_path(entity_id: 'EntityId') -> str: str: A url path of the EntityId """ - return f'entities/{entity_id.name}/{entity_id.key}' def __str__(self) -> str: