Skip to content

Add Client Signal Entity #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings
from .utils.http_utils import get_async_request, post_async_request, delete_async_request
from .utils.entity_utils import EntityId
from azure.functions._durable_functions import _serialize_custom_object


Expand Down Expand Up @@ -457,6 +458,57 @@ async def wait_for_completion_or_create_check_status_response(
else:
return self.create_check_status_response(request, instance_id)

async def signal_entity(self, entityId: EntityId, operation_name: str,
operation_input: Optional[Any] = None,
task_hub_name: Optional[str] = None,
connection_name: Optional[str] = None) -> None:
"""Signals an entity to perform an operation.

Parameters
----------
entityId : EntityId
The EntityId of the targeted entity to perform operation.
operation_name: str
The name of the operation.
operation_input: Optional[Any]
The content for the operation.
task_hub_name: Optional[str]
The task hub name of the target entity.
connection_name: Optional[str]
The name of the connection string associated with [task_hub_name].

Raises
------
Exception:
When the signal entity call failed with an unexpected status code

Returns
-------
None
"""
options = RpcManagementOptions(operation_name=operation_name,
connection_name=connection_name,
task_hub_name=task_hub_name,
entity_Id=entityId)

request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
response = await self._post_async_request(
request_url,
json.dumps(operation_input) if operation_input else None)

switch_statement = {
202: lambda: None # signal accepted
}

has_error_message = switch_statement.get(
response[0],
lambda: f"The operation failed with an unexpected status code {response[0]}")

error_message = has_error_message()

if error_message:
raise Exception(error_message)

@staticmethod
def _create_http_response(
status_code: int, body: Union[str, Any]) -> func.HttpResponse:
Expand Down
14 changes: 12 additions & 2 deletions azure/durable_functions/models/RpcManagementOptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus

from .utils.entity_utils import EntityId


class RpcManagementOptions:
"""Class used to collect the options for getting orchestration status."""
Expand All @@ -12,7 +14,9 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None,
connection_name: str = None, show_history: bool = None,
show_history_output: bool = None, created_time_from: datetime = None,
created_time_to: datetime = None,
runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None):
runtime_status: List[OrchestrationRuntimeStatus] = None, show_input: bool = None,
operation_name: str = None,
entity_Id: EntityId = None):
self._instance_id = instance_id
self._task_hub_name = task_hub_name
self._connection_name = connection_name
Expand All @@ -22,6 +26,8 @@ def __init__(self, instance_id: str = None, task_hub_name: str = None,
self._created_time_to = created_time_to
self._runtime_status = runtime_status
self._show_input = show_input
self.operation_name = operation_name
self.entity_Id = entity_Id

@staticmethod
def _add_arg(query: List[str], name: str, value: Any):
Expand Down Expand Up @@ -55,7 +61,10 @@ def to_url(self, base_url: Optional[str]) -> str:
if base_url is None:
raise ValueError("orchestration bindings has not RPC base url")

url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}"
if self.entity_Id:
url = f'{base_url}{EntityId.get_entity_id_url_path(self.entity_Id)}'
else:
url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}"

query: List[str] = []

Expand All @@ -66,6 +75,7 @@ def to_url(self, base_url: Optional[str]) -> str:
self._add_arg(query, 'showHistoryOutput', self._show_history_output)
self._add_date_arg(query, 'createdTimeFrom', self._created_time_from)
self._add_date_arg(query, 'createdTimeTo', self._created_time_to)
self._add_arg(query, 'op', self.operation_name)
if self._runtime_status is not None and len(self._runtime_status) > 0:
runtime_status = ",".join(r.value for r in self._runtime_status)
self._add_arg(query, 'runtimeStatus', runtime_status)
Expand Down
11 changes: 11 additions & 0 deletions azure/durable_functions/models/utils/entity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ def get_entity_id(scheduler_id: str) -> 'EntityId':
[name, key] = components
return EntityId(name, key)

@staticmethod
def get_entity_id_url_path(entity_id: 'EntityId') -> str:
"""Print the the entity url path.

Returns
-------
str:
A url path of the EntityId
"""
return f'entities/{entity_id.name}/{entity_id.key}'

def __str__(self) -> str:
"""Print the string representation of this EntityId.

Expand Down