Skip to content

Commit 69ec826

Browse files
authored
Merge pull request #206 from Azure/wenhzha/signal-entity
Signal Entity
2 parents 023fd70 + 7f350a2 commit 69ec826

File tree

11 files changed

+164
-11
lines changed

11 files changed

+164
-11
lines changed

azure/durable_functions/models/DurableEntityContext.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ def set_state(self, state: Any) -> None:
124124
"""
125125
# TODO: enable serialization of custom types
126126
self._exists = True
127-
self._state = json.dumps(state)
127+
128+
# should only serialize the state at the end of a batch
129+
self._state = state
128130

129131
def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
130132
"""Get the current state of this entity.

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
from .utils.entity_utils import EntityId
1313
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
1414
wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task, \
15-
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task
15+
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task, \
16+
signal_entity_task
1617
from azure.functions._durable_functions import _deserialize_custom_object
1718

1819

@@ -380,6 +381,26 @@ def call_entity(self, entityId: EntityId,
380381
"""
381382
return call_entity_task(self.histories, entityId, operationName, operationInput)
382383

384+
def signal_entity(self, entityId: EntityId,
385+
operationName: str, operationInput: Optional[Any] = None):
386+
"""Send a signal operation to Durable Entity given some input.
387+
388+
Parameters
389+
----------
390+
entityId: EntityId
391+
The ID of the entity to call
392+
operationName: str
393+
The operation to execute
394+
operationInput: Optional[Any]
395+
The input for tne operation, defaults to None.
396+
397+
Returns
398+
-------
399+
Task
400+
A Task of the entity signal
401+
"""
402+
return signal_entity_task(self, self.histories, entityId, operationName, operationInput)
403+
383404
@property
384405
def will_continue_as_new(self) -> bool:
385406
"""Return true if continue_as_new was called."""

azure/durable_functions/models/actions/ActionType.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ class ActionType(IntEnum):
1313
WAIT_FOR_EXTERNAL_EVENT: int = 6
1414
CALL_ENTITY = 7
1515
CALL_HTTP: int = 8
16+
SIGNAL_ENTITY: int = 9
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from typing import Any, Dict
2+
3+
from .Action import Action
4+
from .ActionType import ActionType
5+
from ..utils.json_utils import add_attrib
6+
from json import dumps
7+
from azure.functions._durable_functions import _serialize_custom_object
8+
from ..utils.entity_utils import EntityId
9+
10+
11+
class SignalEntityAction(Action):
12+
"""Defines the structure of the Signal Entity object.
13+
14+
Provides the information needed by the durable extension to be able to signal an entity
15+
"""
16+
17+
def __init__(self, entity_id: EntityId, operation: str, input_=None):
18+
self.entity_id: EntityId = entity_id
19+
20+
# Validating that EntityId exists before trying to parse its instanceId
21+
if not self.entity_id:
22+
raise ValueError("entity_id cannot be empty")
23+
24+
self.instance_id: str = EntityId.get_scheduler_id(entity_id)
25+
self.operation: str = operation
26+
self.input_: str = dumps(input_, default=_serialize_custom_object)
27+
28+
@property
29+
def action_type(self) -> int:
30+
"""Get the type of action this class represents."""
31+
return ActionType.SIGNAL_ENTITY
32+
33+
def to_json(self) -> Dict[str, Any]:
34+
"""Convert object into a json dictionary.
35+
36+
Returns
37+
-------
38+
Dict[str, Any]
39+
The instance of the class converted into a json dictionary
40+
"""
41+
json_dict: Dict[str, Any] = {}
42+
add_attrib(json_dict, self, "action_type", "actionType")
43+
add_attrib(json_dict, self, 'instance_id', 'instanceId')
44+
add_attrib(json_dict, self, 'operation', 'operation')
45+
add_attrib(json_dict, self, 'input_', 'input')
46+
47+
return json_dict

azure/durable_functions/models/entities/EntityState.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def to_json(self) -> Dict[str, Any]:
5555
serialized_results = list(map(lambda x: x.to_json(), self.results))
5656

5757
json_dict["entityExists"] = self.entity_exists
58-
json_dict["entityState"] = self.state
58+
json_dict["entityState"] = json.dumps(self.state)
5959
json_dict["results"] = serialized_results
6060
json_dict["signals"] = self.signals
6161
return json_dict

azure/durable_functions/tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
from .call_http import call_http
1313
from .create_timer import create_timer_task
1414
from .call_entity import call_entity_task
15+
from .signal_entity import signal_entity_task
1516

1617
__all__ = [
1718
'call_activity_task',
1819
'call_activity_with_retry_task',
1920
'call_sub_orchestrator_task',
2021
'call_sub_orchestrator_with_retry_task',
2122
'call_entity_task',
23+
'signal_entity_task',
2224
'call_http',
2325
'continue_as_new',
2426
'new_uuid',

azure/durable_functions/tasks/call_entity.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from ..models.utils.entity_utils import EntityId
99
from ..models.entities.RequestMessage import RequestMessage
1010
from ..models.entities.ResponseMessage import ResponseMessage
11+
import json
1112

1213

1314
def call_entity_task(
@@ -64,7 +65,12 @@ def call_entity_task(
6465
if event_raised is not None:
6566
response = parse_history_event(event_raised)
6667
response = ResponseMessage.from_dict(response)
67-
result = response.result
68+
69+
# TODO: json.loads inside parse_history_event is not recursvie
70+
# investigate if response.result is used elsewhere,
71+
# which probably requires another deserialization
72+
result = json.loads(response.result)
73+
6874
return Task(
6975
is_completed=True,
7076
is_faulted=False,
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import List, Any, Optional
2+
from ..models.actions.SignalEntityAction import SignalEntityAction
3+
from ..models.history import HistoryEvent, HistoryEventType
4+
from .task_utilities import set_processed, find_event
5+
from ..models.utils.entity_utils import EntityId
6+
7+
8+
def signal_entity_task(
9+
context,
10+
state: List[HistoryEvent],
11+
entity_id: EntityId,
12+
operation_name: str = "",
13+
input_: Optional[Any] = None):
14+
"""Signal a entity operation.
15+
16+
It the action hasn't been scheduled, it appends the action.
17+
If the action has been scheduled, no ops.
18+
19+
Parameters
20+
----------
21+
state: List[HistoryEvent]
22+
The list of history events to search over to determine the
23+
current state of the callEntity Task.
24+
entity_id: EntityId
25+
An identifier for the entity to call.
26+
operation_name: str
27+
The name of the operation the entity needs to execute.
28+
input_: Any
29+
The JSON-serializable input to pass to the activity function.
30+
"""
31+
new_action = SignalEntityAction(entity_id, operation_name, input_)
32+
scheduler_id = EntityId.get_scheduler_id(entity_id=entity_id)
33+
34+
hist_type = HistoryEventType.EVENT_SENT
35+
extra_constraints = {
36+
"InstanceId": scheduler_id,
37+
"Name": "op"
38+
}
39+
40+
event_sent = find_event(state, hist_type, extra_constraints)
41+
set_processed([event_sent])
42+
43+
if event_sent:
44+
return
45+
46+
context.actions.append([new_action])

tests/models/test_DurableOrchestrationClient.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ async def test_wait_or_response_check_status_response(binding_string):
492492

493493

494494
@pytest.mark.asyncio
495-
async def test_wait_or_response_check_status_response(binding_string):
495+
async def test_wait_or_response_null_request(binding_string):
496496
status = dict(createdTime=TEST_CREATED_TIME,
497497
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
498498
runtimeStatus="Running")

tests/orchestrator/test_entity.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,39 @@
44
from azure.durable_functions.models.OrchestratorState import OrchestratorState
55
from azure.durable_functions.models.actions.CallEntityAction \
66
import CallEntityAction
7+
from azure.durable_functions.models.actions.SignalEntityAction \
8+
import SignalEntityAction
79
from tests.test_utils.testClasses import SerializableClass
810
import azure.durable_functions as df
911
from typing import Any
1012

11-
def generator_function(context):
13+
def generator_function_call_entity(context):
1214
outputs = []
1315
entityId = df.EntityId("Counter", "myCounter")
1416
x = yield context.call_entity(entityId, "add", 3)
1517

1618
outputs.append(x)
1719
return outputs
1820

21+
def generator_function_signal_entity(context):
22+
outputs = []
23+
entityId = df.EntityId("Counter", "myCounter")
24+
context.signal_entity(entityId, "add", 3)
25+
x = yield context.call_entity(entityId, "get")
26+
27+
outputs.append(x)
28+
return outputs
1929

2030
def base_expected_state(output=None) -> OrchestratorState:
2131
return OrchestratorState(is_done=False, actions=[], output=output)
2232

23-
2433
def add_call_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any):
2534
action = CallEntityAction(entity_id=id_, operation=op, input_=input_)
2635
state.actions.append([action])
2736

37+
def add_signal_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any):
38+
action = SignalEntityAction(entity_id=id_, operation=op, input_=input_)
39+
state.actions.append([action])
2840

2941
def add_call_entity_completed_events(
3042
context_builder: ContextBuilder, op: str, instance_id=str, input_=None):
@@ -38,22 +50,38 @@ def test_call_entity_sent():
3850

3951
entityId = df.EntityId("Counter", "myCounter")
4052
result = get_orchestration_state_result(
41-
context_builder, generator_function)
53+
context_builder, generator_function_call_entity)
4254

4355
expected_state = base_expected_state()
4456
add_call_entity_action(expected_state, entityId, "add", 3)
4557
expected = expected_state.to_json()
4658

4759
#assert_valid_schema(result)
4860
assert_orchestration_state_equals(expected, result)
61+
62+
def test_signal_entity_sent():
63+
context_builder = ContextBuilder('test_simple_function')
64+
65+
entityId = df.EntityId("Counter", "myCounter")
66+
result = get_orchestration_state_result(
67+
context_builder, generator_function_signal_entity)
68+
69+
expected_state = base_expected_state()
70+
add_signal_entity_action(expected_state, entityId, "add", 3)
71+
add_call_entity_action(expected_state, entityId, "get", None)
72+
expected = expected_state.to_json()
73+
74+
#assert_valid_schema(result)
75+
assert_orchestration_state_equals(expected, result)
76+
4977

5078
def test_call_entity_raised():
5179
entityId = df.EntityId("Counter", "myCounter")
5280
context_builder = ContextBuilder('test_simple_function')
53-
add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId),3)
81+
add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId), 3)
5482

5583
result = get_orchestration_state_result(
56-
context_builder, generator_function)
84+
context_builder, generator_function_call_entity)
5785

5886
expected_state = base_expected_state(
5987
[3]

0 commit comments

Comments
 (0)