Skip to content

Signal Entity #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion azure/durable_functions/models/DurableEntityContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ def set_state(self, state: Any) -> None:
"""
# TODO: enable serialization of custom types
self._exists = True
self._state = json.dumps(state)

# should only serialize the state at the end of a batch
self._state = state

def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
"""Get the current state of this entity.
Expand Down
23 changes: 22 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from .utils.entity_utils import EntityId
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task, \
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task, \
signal_entity_task
from azure.functions._durable_functions import _deserialize_custom_object


Expand Down Expand Up @@ -380,6 +381,26 @@ def call_entity(self, entityId: EntityId,
"""
return call_entity_task(self.histories, entityId, operationName, operationInput)

def signal_entity(self, entityId: EntityId,
operationName: str, operationInput: Optional[Any] = None):
"""Send a signal operation to Durable Entity given some input.
Parameters
----------
entityId: EntityId
The ID of the entity to call
operationName: str
The operation to execute
operationInput: Optional[Any]
The input for tne operation, defaults to None.
Returns
-------
Task
A Task of the entity signal
"""
return signal_entity_task(self, self.histories, entityId, operationName, operationInput)

@property
def will_continue_as_new(self) -> bool:
"""Return true if continue_as_new was called."""
Expand Down
1 change: 1 addition & 0 deletions azure/durable_functions/models/actions/ActionType.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ class ActionType(IntEnum):
WAIT_FOR_EXTERNAL_EVENT: int = 6
CALL_ENTITY = 7
CALL_HTTP: int = 8
SIGNAL_ENTITY: int = 9
47 changes: 47 additions & 0 deletions azure/durable_functions/models/actions/SignalEntityAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Any, Dict

from .Action import Action
from .ActionType import ActionType
from ..utils.json_utils import add_attrib
from json import dumps
from azure.functions._durable_functions import _serialize_custom_object
from ..utils.entity_utils import EntityId


class SignalEntityAction(Action):
"""Defines the structure of the Signal Entity object.

Provides the information needed by the durable extension to be able to signal an entity
"""

def __init__(self, entity_id: EntityId, operation: str, input_=None):
self.entity_id: EntityId = entity_id

# Validating that EntityId exists before trying to parse its instanceId
if not self.entity_id:
raise ValueError("entity_id cannot be empty")

self.instance_id: str = EntityId.get_scheduler_id(entity_id)
self.operation: str = operation
self.input_: str = dumps(input_, default=_serialize_custom_object)

@property
def action_type(self) -> int:
"""Get the type of action this class represents."""
return ActionType.SIGNAL_ENTITY

def to_json(self) -> Dict[str, Any]:
"""Convert object into a json dictionary.

Returns
-------
Dict[str, Any]
The instance of the class converted into a json dictionary
"""
json_dict: Dict[str, Any] = {}
add_attrib(json_dict, self, "action_type", "actionType")
add_attrib(json_dict, self, 'instance_id', 'instanceId')
add_attrib(json_dict, self, 'operation', 'operation')
add_attrib(json_dict, self, 'input_', 'input')

return json_dict
2 changes: 1 addition & 1 deletion azure/durable_functions/models/entities/EntityState.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def to_json(self) -> Dict[str, Any]:
serialized_results = list(map(lambda x: x.to_json(), self.results))

json_dict["entityExists"] = self.entity_exists
json_dict["entityState"] = self.state
json_dict["entityState"] = json.dumps(self.state)
json_dict["results"] = serialized_results
json_dict["signals"] = self.signals
return json_dict
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
from .call_http import call_http
from .create_timer import create_timer_task
from .call_entity import call_entity_task
from .signal_entity import signal_entity_task

__all__ = [
'call_activity_task',
'call_activity_with_retry_task',
'call_sub_orchestrator_task',
'call_sub_orchestrator_with_retry_task',
'call_entity_task',
'signal_entity_task',
'call_http',
'continue_as_new',
'new_uuid',
Expand Down
8 changes: 7 additions & 1 deletion azure/durable_functions/tasks/call_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ..models.utils.entity_utils import EntityId
from ..models.entities.RequestMessage import RequestMessage
from ..models.entities.ResponseMessage import ResponseMessage
import json


def call_entity_task(
Expand Down Expand Up @@ -64,7 +65,12 @@ def call_entity_task(
if event_raised is not None:
response = parse_history_event(event_raised)
response = ResponseMessage.from_dict(response)
result = response.result

# TODO: json.loads inside parse_history_event is not recursvie
# investigate if response.result is used elsewhere,
# which probably requires another deserialization
result = json.loads(response.result)

return Task(
is_completed=True,
is_faulted=False,
Expand Down
46 changes: 46 additions & 0 deletions azure/durable_functions/tasks/signal_entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import List, Any, Optional
from ..models.actions.SignalEntityAction import SignalEntityAction
from ..models.history import HistoryEvent, HistoryEventType
from .task_utilities import set_processed, find_event
from ..models.utils.entity_utils import EntityId


def signal_entity_task(
context,
state: List[HistoryEvent],
entity_id: EntityId,
operation_name: str = "",
input_: Optional[Any] = None):
"""Signal a entity operation.

It the action hasn't been scheduled, it appends the action.
If the action has been scheduled, no ops.

Parameters
----------
state: List[HistoryEvent]
The list of history events to search over to determine the
current state of the callEntity Task.
entity_id: EntityId
An identifier for the entity to call.
operation_name: str
The name of the operation the entity needs to execute.
input_: Any
The JSON-serializable input to pass to the activity function.
"""
new_action = SignalEntityAction(entity_id, operation_name, input_)
scheduler_id = EntityId.get_scheduler_id(entity_id=entity_id)

hist_type = HistoryEventType.EVENT_SENT
extra_constraints = {
"InstanceId": scheduler_id,
"Name": "op"
}

event_sent = find_event(state, hist_type, extra_constraints)
set_processed([event_sent])

if event_sent:
return

context.actions.append([new_action])
2 changes: 1 addition & 1 deletion tests/models/test_DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ async def test_wait_or_response_check_status_response(binding_string):


@pytest.mark.asyncio
async def test_wait_or_response_check_status_response(binding_string):
async def test_wait_or_response_null_request(binding_string):
status = dict(createdTime=TEST_CREATED_TIME,
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
runtimeStatus="Running")
Expand Down
38 changes: 33 additions & 5 deletions tests/orchestrator/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,39 @@
from azure.durable_functions.models.OrchestratorState import OrchestratorState
from azure.durable_functions.models.actions.CallEntityAction \
import CallEntityAction
from azure.durable_functions.models.actions.SignalEntityAction \
import SignalEntityAction
from tests.test_utils.testClasses import SerializableClass
import azure.durable_functions as df
from typing import Any

def generator_function(context):
def generator_function_call_entity(context):
outputs = []
entityId = df.EntityId("Counter", "myCounter")
x = yield context.call_entity(entityId, "add", 3)

outputs.append(x)
return outputs

def generator_function_signal_entity(context):
outputs = []
entityId = df.EntityId("Counter", "myCounter")
context.signal_entity(entityId, "add", 3)
x = yield context.call_entity(entityId, "get")

outputs.append(x)
return outputs

def base_expected_state(output=None) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output)


def add_call_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any):
action = CallEntityAction(entity_id=id_, operation=op, input_=input_)
state.actions.append([action])

def add_signal_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any):
action = SignalEntityAction(entity_id=id_, operation=op, input_=input_)
state.actions.append([action])

def add_call_entity_completed_events(
context_builder: ContextBuilder, op: str, instance_id=str, input_=None):
Expand All @@ -38,22 +50,38 @@ def test_call_entity_sent():

entityId = df.EntityId("Counter", "myCounter")
result = get_orchestration_state_result(
context_builder, generator_function)
context_builder, generator_function_call_entity)

expected_state = base_expected_state()
add_call_entity_action(expected_state, entityId, "add", 3)
expected = expected_state.to_json()

#assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_signal_entity_sent():
context_builder = ContextBuilder('test_simple_function')

entityId = df.EntityId("Counter", "myCounter")
result = get_orchestration_state_result(
context_builder, generator_function_signal_entity)

expected_state = base_expected_state()
add_signal_entity_action(expected_state, entityId, "add", 3)
add_call_entity_action(expected_state, entityId, "get", None)
expected = expected_state.to_json()

#assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)


def test_call_entity_raised():
entityId = df.EntityId("Counter", "myCounter")
context_builder = ContextBuilder('test_simple_function')
add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId),3)
add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId), 3)

result = get_orchestration_state_result(
context_builder, generator_function)
context_builder, generator_function_call_entity)

expected_state = base_expected_state(
[3]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_utils/ContextBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def add_event_raised_event(self, name:str, id_: int, input_=None, timestamp=None
event = self.get_base_event(HistoryEventType.EVENT_RAISED, id_=id_, timestamp=timestamp)
event.Name = name
if is_entity:
event.Input = json.dumps({ "result": input_ })
event.Input = json.dumps({ "result": json.dumps(input_) })
else:
event.Input = input_
# event.timestamp = timestamp
Expand Down