-
Notifications
You must be signed in to change notification settings - Fork 58
Durable Entities #184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Durable Entities #184
Changes from 9 commits
0249145
87b7640
e24f8a6
4f27d2b
cf8d86a
402b9ad
14844b3
e69b2b5
98db559
0247d32
b2c3707
b513535
023fd70
65188f2
6c25a78
1881257
a30f63b
33fb7ca
7f350a2
69ec826
f047698
59e7708
dc90330
0d9a327
9c48eb0
4bb82dc
5105a54
4d6f22b
af482d2
523de06
0f8679b
d95c56d
4241369
85192e5
274d4c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from .models import DurableEntityContext | ||
from .models.entities import OperationResult, EntityState | ||
from datetime import datetime | ||
from typing import Callable, Any, List, Dict | ||
|
||
|
||
class Entity: | ||
"""Durable Entity Class. | ||
|
||
Responsible for execuitng the user-defined entity function. | ||
""" | ||
|
||
def __init__(self, entity_func: Callable[[DurableEntityContext], None]): | ||
"""Create a new entity for the user-defined entity. | ||
|
||
Responsible for executing the user-defined entity function | ||
|
||
Parameters | ||
---------- | ||
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]] | ||
The user defined entity function | ||
""" | ||
self.fn: Callable[[DurableEntityContext], None] = entity_func | ||
|
||
def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str: | ||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Handle the execution of the user-defined entity function. | ||
|
||
Loops over the batch, which serves to specify inputs to the entity, | ||
and collects results and generates a final state, which are returned. | ||
|
||
Parameters | ||
---------- | ||
context: DurableEntityContext | ||
The entity context of the entity, which the user interacts with as their Durable API | ||
|
||
Returns | ||
------- | ||
str | ||
A JSON-formatted string representing the output state, results, and exceptions for the | ||
entity execution. | ||
""" | ||
response = EntityState(results=[], signals=[]) | ||
for packet in batch: | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result: Any = None | ||
is_error: bool = False | ||
start_time: datetime = datetime.now() | ||
|
||
try: | ||
# populate context | ||
context._operation = packet["name"] | ||
context._input = packet["input"] | ||
self.fn(context) | ||
result = context._result | ||
|
||
except Exception as e: | ||
is_error = True | ||
result = str(e) | ||
|
||
duration: int = context._elapsed_milliseconds_since(start_time) | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
operation_result = OperationResult( | ||
is_error=is_error, | ||
duration=duration, | ||
result=result | ||
) | ||
response.results.append(operation_result) | ||
|
||
response.state = context._state | ||
response.entity_exists = context._exists | ||
return response.to_json_string() | ||
|
||
@classmethod | ||
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]: | ||
"""Create an instance of the entity class. | ||
|
||
Parameters | ||
---------- | ||
fn (Callable[[DurableEntityContext], None]): [description] | ||
|
||
Returns | ||
------- | ||
Callable[[Any], str] | ||
Handle function of the newly created entity client | ||
""" | ||
# TODO: review types here! | ||
def handle(context) -> str: | ||
# TODO: this requires some commenting, where do we need to get this from the body | ||
context_body = getattr(context, "body", None) | ||
if context_body is None: | ||
context_body = context | ||
ctx, batch = DurableEntityContext.from_json(context_body) | ||
return Entity(fn).handle(ctx, batch) | ||
return handle |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
from typing import Optional, Any, Dict, Tuple, List, Callable | ||
from azure.functions._durable_functions import _deserialize_custom_object | ||
from datetime import datetime | ||
import json | ||
|
||
|
||
class DurableEntityContext: | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Context of the durable entity context. | ||
|
||
Describes the API used to specify durable entity user code. | ||
""" | ||
|
||
def __init__(self, | ||
name: str, | ||
key: str, | ||
exists: bool, | ||
state: Any): | ||
"""Context of the durable entity context. | ||
|
||
Describes the API used to specify durable entity user code. | ||
|
||
Parameters | ||
---------- | ||
name: str | ||
The name of the Durable Entity | ||
key: str | ||
The key of the Durable Entity | ||
exists: bool | ||
Flag to determine if the entity exists | ||
state: Any | ||
The internal state of the Durable Entity | ||
""" | ||
self._entity_name: str = name | ||
self._entity_key: str = key | ||
|
||
self._exists: bool = exists | ||
self._is_newly_constructed: bool = False | ||
|
||
self._state: Any = state | ||
self._input: Any = None | ||
self._operation: Optional[str] = None | ||
self._result: Any = None | ||
|
||
@property | ||
def entity_name(self) -> str: | ||
"""Get the name of the Entity. | ||
|
||
Returns | ||
------- | ||
str | ||
The name of the entity | ||
""" | ||
return self._entity_name | ||
|
||
@property | ||
def entity_key(self) -> str: | ||
"""Get the Entity key. | ||
|
||
Returns | ||
------- | ||
str | ||
The entity key | ||
""" | ||
return self._entity_key | ||
|
||
@property | ||
def operation_name(self) -> Optional[str]: | ||
"""Get the current operation name. | ||
|
||
Returns | ||
------- | ||
Optional[str] | ||
The current operation name | ||
""" | ||
# TODO: Maybe we should raise an | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# exception if _operation is None | ||
return self._operation | ||
|
||
@property | ||
def is_newly_constructed(self) -> bool: | ||
"""Determine if the Entity was newly constructed. | ||
|
||
Returns | ||
------- | ||
bool | ||
True if the Entity was newly constructed. False otherwise. | ||
""" | ||
# TODO: not updating this atm | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this even gets updated when C# handles a batch either, so this may not be necessary. We should consult Sebastian about this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a use for a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll be tracking this here: #222 |
||
return self._is_newly_constructed | ||
|
||
@classmethod | ||
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]: | ||
"""Instantiate a DurableEntityContext from a JSON-formatted string. | ||
|
||
Parameters | ||
---------- | ||
json_string: str | ||
A JSON-formatted string, returned by the durable-extension, | ||
which represents the entity context | ||
|
||
Returns | ||
------- | ||
DurableEntityContext | ||
The DurableEntityContext originated from the input string | ||
""" | ||
json_dict = json.loads(json_str) | ||
json_dict["name"] = json_dict["self"]["name"] | ||
json_dict["key"] = json_dict["self"]["key"] | ||
json_dict.pop("self") | ||
|
||
serialized_state = json_dict["state"] | ||
if serialized_state is not None: | ||
json_dict["state"] = json.loads(serialized_state, | ||
ConnorMcMahon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
object_hook=_deserialize_custom_object) | ||
batch = json_dict.pop("batch") | ||
return cls(**json_dict), batch | ||
|
||
def set_state(self, state: Any) -> None: | ||
"""Set the state of the entity. | ||
|
||
Parameter | ||
--------- | ||
state: Any | ||
The new state of the entity | ||
""" | ||
# TODO: enable serialization of custom types | ||
self._exists = True | ||
self._state = json.dumps(state) | ||
|
||
def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any: | ||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Get the current state of this entity. | ||
|
||
Parameters | ||
---------- | ||
initializer: Optional[Callable[[], Any]] | ||
A 0-argument function to provide an initial state. Defaults to None. | ||
|
||
Returns | ||
------- | ||
Any | ||
The current state of the entity | ||
""" | ||
state = self._state | ||
# TODO: some weird errs here with None states | ||
if state is not None: | ||
return state | ||
elif initializer: | ||
# TODO: ensure this is a fucntion | ||
state = initializer() | ||
return state | ||
|
||
def get_input(self) -> Any: | ||
"""Get the input for this operation. | ||
|
||
Returns | ||
------- | ||
Any | ||
The input for the current operation | ||
""" | ||
input_ = None | ||
req_input = self._input | ||
req_input = json.loads(req_input) | ||
input_ = None if req_input is None else self.from_json_util(req_input) | ||
return input_ | ||
|
||
def set_result(self, result: Any) -> None: | ||
"""Set the result (return value) of the entity. | ||
|
||
Paramaters | ||
---------- | ||
result: Any | ||
The result / return value for the entity | ||
""" | ||
self._exists = True | ||
self._result = result | ||
|
||
def destruct_on_exit(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how we handle this in JS or C#, but there is the question of how to handle subsequent operations after this within a batch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's check-in with the team then 😉 What are the semantics for a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll be tracking this here: #222 |
||
"""Delete this entity after the operation completes.""" | ||
self._exists = False | ||
self._state = None | ||
|
||
def _elapsed_milliseconds_since(self, start_time: datetime) -> int: | ||
"""Calculate the elapsed time, in milliseconds, from the start_time to the present. | ||
|
||
Parameters | ||
---------- | ||
start_time: datetime | ||
The timestamp of when the entity began processing a batched request. | ||
|
||
Returns | ||
------- | ||
int | ||
The time, in millseconds, from start_time to now | ||
""" | ||
end_time = datetime.now() | ||
time_diff = end_time - start_time | ||
elapsed_time = int(time_diff.total_seconds() * 1000) | ||
return elapsed_time | ||
|
||
def from_json_util(self, json_str: str) -> Any: | ||
"""Load an arbitrary datatype from its JSON representation. | ||
|
||
The Out-of-proc SDK has a special JSON encoding strategy | ||
to enable arbitrary datatypes to be serialized. This utility | ||
loads a JSON with the assumption that it follows that encoding | ||
method. | ||
|
||
Parameters | ||
---------- | ||
json_str: str | ||
A JSON-formatted string, from durable-extension | ||
|
||
Returns | ||
------- | ||
Any: | ||
The original datatype that was serialized | ||
""" | ||
# TODO: this should be a util elsewhere, since we use it alot | ||
return json.loads(json_str, object_hook=_deserialize_custom_object) |
Uh oh!
There was an error while loading. Please reload this page.