Skip to content

Commit dc90330

Browse files
authored
Add Client Signal Entity (#216)
* add client signal entity support * modify the formating
1 parent 69ec826 commit dc90330

File tree

3 files changed

+75
-2
lines changed

3 files changed

+75
-2
lines changed

azure/durable_functions/models/DurableOrchestrationClient.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
1414
from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings
1515
from .utils.http_utils import get_async_request, post_async_request, delete_async_request
16+
from .utils.entity_utils import EntityId
1617
from azure.functions._durable_functions import _serialize_custom_object
1718

1819

@@ -457,6 +458,57 @@ async def wait_for_completion_or_create_check_status_response(
457458
else:
458459
return self.create_check_status_response(request, instance_id)
459460

461+
async def signal_entity(self, entityId: EntityId, operation_name: str,
462+
operation_input: Optional[Any] = None,
463+
task_hub_name: Optional[str] = None,
464+
connection_name: Optional[str] = None) -> None:
465+
"""Signals an entity to perform an operation.
466+
467+
Parameters
468+
----------
469+
entityId : EntityId
470+
The EntityId of the targeted entity to perform operation.
471+
operation_name: str
472+
The name of the operation.
473+
operation_input: Optional[Any]
474+
The content for the operation.
475+
task_hub_name: Optional[str]
476+
The task hub name of the target entity.
477+
connection_name: Optional[str]
478+
The name of the connection string associated with [task_hub_name].
479+
480+
Raises
481+
------
482+
Exception:
483+
When the signal entity call failed with an unexpected status code
484+
485+
Returns
486+
-------
487+
None
488+
"""
489+
options = RpcManagementOptions(operation_name=operation_name,
490+
connection_name=connection_name,
491+
task_hub_name=task_hub_name,
492+
entity_Id=entityId)
493+
494+
request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
495+
response = await self._post_async_request(
496+
request_url,
497+
json.dumps(operation_input) if operation_input else None)
498+
499+
switch_statement = {
500+
202: lambda: None # signal accepted
501+
}
502+
503+
has_error_message = switch_statement.get(
504+
response[0],
505+
lambda: f"The operation failed with an unexpected status code {response[0]}")
506+
507+
error_message = has_error_message()
508+
509+
if error_message:
510+
raise Exception(error_message)
511+
460512
@staticmethod
461513
def _create_http_response(
462514
status_code: int, body: Union[str, Any]) -> func.HttpResponse:

azure/durable_functions/models/RpcManagementOptions.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
55
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
66

7+
from .utils.entity_utils import EntityId
8+
79

810
class RpcManagementOptions:
911
"""Class used to collect the options for getting orchestration status."""
@@ -12,7 +14,9 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None,
1214
connection_name: str = None, show_history: bool = None,
1315
show_history_output: bool = None, created_time_from: datetime = None,
1416
created_time_to: datetime = None,
15-
runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None):
17+
runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None,
18+
operation_name: str = None,
19+
entity_Id: EntityId = None):
1620
self._instance_id = instance_id
1721
self._task_hub_name = task_hub_name
1822
self._connection_name = connection_name
@@ -22,6 +26,8 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None,
2226
self._created_time_to = created_time_to
2327
self._runtime_status = runtime_status
2428
self._show_input = show_input
29+
self.operation_name = operation_name
30+
self.entity_Id = entity_Id
2531

2632
@staticmethod
2733
def _add_arg(query: List[str], name: str, value: Any):
@@ -55,7 +61,10 @@ def to_url(self, base_url: Optional[str]) -> str:
5561
if base_url is None:
5662
raise ValueError("orchestration bindings has not RPC base url")
5763

58-
url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}"
64+
if self.entity_Id:
65+
url = f'{base_url}{EntityId.get_entity_id_url_path(self.entity_Id)}'
66+
else:
67+
url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}"
5968

6069
query: List[str] = []
6170

@@ -66,6 +75,7 @@ def to_url(self, base_url: Optional[str]) -> str:
6675
self._add_arg(query, 'showHistoryOutput', self._show_history_output)
6776
self._add_date_arg(query, 'createdTimeFrom', self._created_time_from)
6877
self._add_date_arg(query, 'createdTimeTo', self._created_time_to)
78+
self._add_arg(query, 'op', self.operation_name)
6979
if self._runtime_status is not None and len(self._runtime_status) > 0:
7080
runtime_status = ",".join(r.value for r in self._runtime_status)
7181
self._add_arg(query, 'runtimeStatus', runtime_status)

azure/durable_functions/models/utils/entity_utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,17 @@ def get_entity_id(scheduler_id: str) -> 'EntityId':
6969
[name, key] = components
7070
return EntityId(name, key)
7171

72+
@staticmethod
73+
def get_entity_id_url_path(entity_id: 'EntityId') -> str:
74+
"""Print the the entity url path.
75+
76+
Returns
77+
-------
78+
str:
79+
A url path of the EntityId
80+
"""
81+
return f'entities/{entity_id.name}/{entity_id.key}'
82+
7283
def __str__(self) -> str:
7384
"""Print the string representation of this EntityId.
7485

0 commit comments

Comments
 (0)