-
Notifications
You must be signed in to change notification settings - Fork 58
Durable Entities #184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Durable Entities #184
Changes from all commits
0249145
87b7640
e24f8a6
4f27d2b
cf8d86a
402b9ad
14844b3
e69b2b5
98db559
0247d32
b2c3707
b513535
023fd70
65188f2
6c25a78
1881257
a30f63b
33fb7ca
7f350a2
69ec826
f047698
59e7708
dc90330
0d9a327
9c48eb0
4bb82dc
5105a54
4d6f22b
af482d2
523de06
0f8679b
d95c56d
4241369
85192e5
274d4c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
from .models import DurableEntityContext | ||
from .models.entities import OperationResult, EntityState | ||
from datetime import datetime | ||
from typing import Callable, Any, List, Dict | ||
|
||
class InternalEntityException(Exception): | ||
pass | ||
|
||
class Entity: | ||
"""Durable Entity Class. | ||
|
||
Responsible for executing the user-defined entity function. | ||
""" | ||
|
||
def __init__(self, entity_func: Callable[[DurableEntityContext], None]): | ||
"""Create a new entity for the user-defined entity. | ||
|
||
Responsible for executing the user-defined entity function | ||
|
||
Parameters | ||
---------- | ||
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]] | ||
The user defined entity function | ||
""" | ||
self.fn: Callable[[DurableEntityContext], None] = entity_func | ||
|
||
def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str: | ||
"""Handle the execution of the user-defined entity function. | ||
|
||
Loops over the batch, which serves to specify inputs to the entity, | ||
and collects results and generates a final state, which are returned. | ||
|
||
Parameters | ||
---------- | ||
context: DurableEntityContext | ||
The entity context of the entity, which the user interacts with as their Durable API | ||
|
||
Returns | ||
------- | ||
str | ||
A JSON-formatted string representing the output state, results, and exceptions for the | ||
entity execution. | ||
""" | ||
response = EntityState(results=[], signals=[]) | ||
for operation_data in batch: | ||
result: Any = None | ||
is_error: bool = False | ||
start_time: datetime = datetime.now() | ||
|
||
try: | ||
# populate context | ||
operation = operation_data["name"] | ||
if operation is None: | ||
raise InternalEntityException("Durable Functions Internal Error: Entity operation was missing a name field") | ||
context._operation = operation | ||
context._input = operation_data["input"] | ||
self.fn(context) | ||
result = context._result | ||
|
||
except InternalEntityException as e: | ||
raise e | ||
|
||
except Exception as e: | ||
is_error = True | ||
result = str(e) | ||
|
||
duration: int = self._elapsed_milliseconds_since(start_time) | ||
operation_result = OperationResult( | ||
is_error=is_error, | ||
duration=duration, | ||
result=result | ||
) | ||
response.results.append(operation_result) | ||
|
||
response.state = context._state | ||
response.entity_exists = context._exists | ||
return response.to_json_string() | ||
|
||
@classmethod | ||
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]: | ||
"""Create an instance of the entity class. | ||
|
||
Parameters | ||
---------- | ||
fn (Callable[[DurableEntityContext], None]): [description] | ||
|
||
Returns | ||
------- | ||
Callable[[Any], str] | ||
Handle function of the newly created entity client | ||
""" | ||
def handle(context) -> str: | ||
# It is not clear when the context JSON would be found | ||
# inside a "body"-key, but this pattern matches the | ||
# orchestrator implementation, so we keep it for safety. | ||
context_body = getattr(context, "body", None) | ||
if context_body is None: | ||
context_body = context | ||
ctx, batch = DurableEntityContext.from_json(context_body) | ||
return Entity(fn).handle(ctx, batch) | ||
return handle | ||
|
||
def _elapsed_milliseconds_since(self, start_time: datetime) -> int: | ||
"""Calculate the elapsed time, in milliseconds, from the start_time to the present. | ||
|
||
Parameters | ||
---------- | ||
start_time: datetime | ||
The timestamp of when the entity began processing a batched request. | ||
|
||
Returns | ||
------- | ||
int | ||
The time, in millseconds, from start_time to now | ||
""" | ||
end_time = datetime.now() | ||
time_diff = end_time - start_time | ||
elapsed_time = int(time_diff.total_seconds() * 1000) | ||
return elapsed_time |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
from typing import Optional, Any, Dict, Tuple, List, Callable | ||
from azure.functions._durable_functions import _deserialize_custom_object | ||
import json | ||
|
||
|
||
class DurableEntityContext: | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Context of the durable entity context. | ||
|
||
Describes the API used to specify durable entity user code. | ||
""" | ||
|
||
def __init__(self, | ||
name: str, | ||
key: str, | ||
exists: bool, | ||
state: Any): | ||
"""Context of the durable entity context. | ||
|
||
Describes the API used to specify durable entity user code. | ||
|
||
Parameters | ||
---------- | ||
name: str | ||
The name of the Durable Entity | ||
key: str | ||
The key of the Durable Entity | ||
exists: bool | ||
Flag to determine if the entity exists | ||
state: Any | ||
The internal state of the Durable Entity | ||
""" | ||
self._entity_name: str = name | ||
self._entity_key: str = key | ||
|
||
self._exists: bool = exists | ||
self._is_newly_constructed: bool = False | ||
|
||
self._state: Any = state | ||
self._input: Any = None | ||
self._operation: Optional[str] = None | ||
self._result: Any = None | ||
|
||
@property | ||
def entity_name(self) -> str: | ||
"""Get the name of the Entity. | ||
|
||
Returns | ||
------- | ||
str | ||
The name of the entity | ||
""" | ||
return self._entity_name | ||
|
||
@property | ||
def entity_key(self) -> str: | ||
"""Get the Entity key. | ||
|
||
Returns | ||
------- | ||
str | ||
The entity key | ||
""" | ||
return self._entity_key | ||
|
||
@property | ||
def operation_name(self) -> Optional[str]: | ||
"""Get the current operation name. | ||
|
||
Returns | ||
------- | ||
Optional[str] | ||
The current operation name | ||
""" | ||
if self._operation is None: | ||
raise Exception("Entity operation is unassigned") | ||
return self._operation | ||
|
||
@property | ||
def is_newly_constructed(self) -> bool: | ||
"""Determine if the Entity was newly constructed. | ||
|
||
Returns | ||
------- | ||
bool | ||
True if the Entity was newly constructed. False otherwise. | ||
""" | ||
# This is not updated at the moment, as its semantics are unclear | ||
return self._is_newly_constructed | ||
|
||
@classmethod | ||
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]: | ||
"""Instantiate a DurableEntityContext from a JSON-formatted string. | ||
|
||
Parameters | ||
---------- | ||
json_string: str | ||
A JSON-formatted string, returned by the durable-extension, | ||
which represents the entity context | ||
|
||
Returns | ||
------- | ||
DurableEntityContext | ||
The DurableEntityContext originated from the input string | ||
""" | ||
json_dict = json.loads(json_str) | ||
json_dict["name"] = json_dict["self"]["name"] | ||
json_dict["key"] = json_dict["self"]["key"] | ||
json_dict.pop("self") | ||
|
||
serialized_state = json_dict["state"] | ||
if serialized_state is not None: | ||
json_dict["state"] = from_json_util(serialized_state) | ||
|
||
batch = json_dict.pop("batch") | ||
return cls(**json_dict), batch | ||
|
||
def set_state(self, state: Any) -> None: | ||
"""Set the state of the entity. | ||
|
||
Parameter | ||
--------- | ||
state: Any | ||
The new state of the entity | ||
""" | ||
self._exists = True | ||
|
||
# should only serialize the state at the end of the batch | ||
self._state = state | ||
|
||
def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any: | ||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Get the current state of this entity. | ||
|
||
Parameters | ||
---------- | ||
initializer: Optional[Callable[[], Any]] | ||
A 0-argument function to provide an initial state. Defaults to None. | ||
|
||
Returns | ||
------- | ||
Any | ||
The current state of the entity | ||
""" | ||
state = self._state | ||
if state is not None: | ||
return state | ||
elif initializer: | ||
if not callable(initializer): | ||
raise Exception("initializer argument needs to be a callable function") | ||
state = initializer() | ||
return state | ||
|
||
def get_input(self) -> Any: | ||
"""Get the input for this operation. | ||
|
||
Returns | ||
------- | ||
Any | ||
The input for the current operation | ||
""" | ||
input_ = None | ||
req_input = self._input | ||
req_input = json.loads(req_input) | ||
input_ = None if req_input is None else from_json_util(req_input) | ||
return input_ | ||
|
||
def set_result(self, result: Any) -> None: | ||
"""Set the result (return value) of the entity. | ||
|
||
Paramaters | ||
---------- | ||
result: Any | ||
The result / return value for the entity | ||
""" | ||
self._exists = True | ||
self._result = result | ||
|
||
def destruct_on_exit(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how we handle this in JS or C#, but there is the question of how to handle subsequent operations after this within a batch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's check-in with the team then 😉 What are the semantics for a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll be tracking this here: #222 |
||
"""Delete this entity after the operation completes.""" | ||
self._exists = False | ||
self._state = None | ||
|
||
def from_json_util(self, json_str: str) -> Any: | ||
"""Load an arbitrary datatype from its JSON representation. | ||
|
||
The Out-of-proc SDK has a special JSON encoding strategy | ||
to enable arbitrary datatypes to be serialized. This utility | ||
loads a JSON with the assumption that it follows that encoding | ||
method. | ||
|
||
Parameters | ||
---------- | ||
json_str: str | ||
A JSON-formatted string, from durable-extension | ||
|
||
Returns | ||
------- | ||
Any: | ||
The original datatype that was serialized | ||
""" | ||
return json.loads(json_str, object_hook=_deserialize_custom_object) |
Uh oh!
There was an error while loading. Please reload this page.