diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 1442124a..390e0891 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -13,6 +13,7 @@ from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings from .utils.http_utils import get_async_request, post_async_request, delete_async_request +from .utils.entity_utils import EntityId from azure.functions._durable_functions import _serialize_custom_object @@ -457,6 +458,57 @@ async def wait_for_completion_or_create_check_status_response( else: return self.create_check_status_response(request, instance_id) + async def signal_entity(self, entityId: EntityId, operation_name: str, + operation_input: Optional[Any] = None, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Signals an entity to perform an operation. + + Parameters + ---------- + entityId : EntityId + The EntityId of the targeted entity to perform operation. + operation_name: str + The name of the operation. + operation_input: Optional[Any] + The content for the operation. + task_hub_name: Optional[str] + The task hub name of the target entity. + connection_name: Optional[str] + The name of the connection string associated with [task_hub_name]. + + Raises + ------ + Exception: + When the signal entity call failed with an unexpected status code + + Returns + ------- + None + """ + options = RpcManagementOptions(operation_name=operation_name, + connection_name=connection_name, + task_hub_name=task_hub_name, + entity_Id=entityId) + + request_url = options.to_url(self._orchestration_bindings.rpc_base_url) + response = await self._post_async_request( + request_url, + json.dumps(operation_input) if operation_input else None) + + switch_statement = { + 202: lambda: None # signal accepted + } + + has_error_message = switch_statement.get( + response[0], + lambda: f"The operation failed with an unexpected status code {response[0]}") + + error_message = has_error_message() + + if error_message: + raise Exception(error_message) + @staticmethod def _create_http_response( status_code: int, body: Union[str, Any]) -> func.HttpResponse: diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index c16c508f..b41d1493 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -4,6 +4,8 @@ from azure.durable_functions.constants import DATETIME_STRING_FORMAT from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus +from .utils.entity_utils import EntityId + class RpcManagementOptions: """Class used to collect the options for getting orchestration status.""" @@ -12,7 +14,9 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None, connection_name: str = None, show_history: bool = None, show_history_output: bool = None, created_time_from: datetime = None, created_time_to: datetime = None, - runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None): + runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None, + operation_name: str = None, + entity_Id: EntityId = None): self._instance_id = instance_id self._task_hub_name = task_hub_name self._connection_name = connection_name @@ -22,6 +26,8 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None, self._created_time_to = created_time_to self._runtime_status = runtime_status self._show_input = show_input + self.operation_name = operation_name + self.entity_Id = entity_Id @staticmethod def _add_arg(query: List[str], name: str, value: Any): @@ -55,7 +61,10 @@ def to_url(self, base_url: Optional[str]) -> str: if base_url is None: raise ValueError("orchestration bindings has not RPC base url") - url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" + if self.entity_Id: + url = f'{base_url}{EntityId.get_entity_id_url_path(self.entity_Id)}' + else: + url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" query: List[str] = [] @@ -66,6 +75,7 @@ def to_url(self, base_url: Optional[str]) -> str: self._add_arg(query, 'showHistoryOutput', self._show_history_output) self._add_date_arg(query, 'createdTimeFrom', self._created_time_from) self._add_date_arg(query, 'createdTimeTo', self._created_time_to) + self._add_arg(query, 'op', self.operation_name) if self._runtime_status is not None and len(self._runtime_status) > 0: runtime_status = ",".join(r.value for r in self._runtime_status) self._add_arg(query, 'runtimeStatus', runtime_status) diff --git a/azure/durable_functions/models/utils/entity_utils.py b/azure/durable_functions/models/utils/entity_utils.py index 0ff33dd4..f5669323 100644 --- a/azure/durable_functions/models/utils/entity_utils.py +++ b/azure/durable_functions/models/utils/entity_utils.py @@ -69,6 +69,17 @@ def get_entity_id(scheduler_id: str) -> 'EntityId': [name, key] = components return EntityId(name, key) + @staticmethod + def get_entity_id_url_path(entity_id: 'EntityId') -> str: + """Print the the entity url path. + + Returns + ------- + str: + A url path of the EntityId + """ + return f'entities/{entity_id.name}/{entity_id.key}' + def __str__(self) -> str: """Print the string representation of this EntityId.