Skip to content

Commit 46a0505

Browse files
Durable Entities (#184)
Co-authored-by: Wenonah Zhang <[email protected]> Co-authored-by: wenhzha <[email protected]>
1 parent d95c56d commit 46a0505

39 files changed

+1791
-17
lines changed

azure/durable_functions/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,20 @@
33
Exposes the different API components intended for public consumption
44
"""
55
from .orchestrator import Orchestrator
6+
from .entity import Entity
7+
from .models.utils.entity_utils import EntityId
68
from .models.DurableOrchestrationClient import DurableOrchestrationClient
79
from .models.DurableOrchestrationContext import DurableOrchestrationContext
10+
from .models.DurableEntityContext import DurableEntityContext
811
from .models.RetryOptions import RetryOptions
912
from .models.TokenSource import ManagedIdentityTokenSource
1013

1114
__all__ = [
1215
'Orchestrator',
16+
'Entity',
17+
'EntityId',
1318
'DurableOrchestrationClient',
19+
'DurableEntityContext',
1420
'DurableOrchestrationContext',
1521
'ManagedIdentityTokenSource',
1622
'RetryOptions'

azure/durable_functions/entity.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
from .models import DurableEntityContext
2+
from .models.entities import OperationResult, EntityState
3+
from datetime import datetime
4+
from typing import Callable, Any, List, Dict
5+
6+
class InternalEntityException(Exception):
7+
pass
8+
9+
class Entity:
10+
"""Durable Entity Class.
11+
12+
Responsible for executing the user-defined entity function.
13+
"""
14+
15+
def __init__(self, entity_func: Callable[[DurableEntityContext], None]):
16+
"""Create a new entity for the user-defined entity.
17+
18+
Responsible for executing the user-defined entity function
19+
20+
Parameters
21+
----------
22+
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]]
23+
The user defined entity function
24+
"""
25+
self.fn: Callable[[DurableEntityContext], None] = entity_func
26+
27+
def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str:
28+
"""Handle the execution of the user-defined entity function.
29+
30+
Loops over the batch, which serves to specify inputs to the entity,
31+
and collects results and generates a final state, which are returned.
32+
33+
Parameters
34+
----------
35+
context: DurableEntityContext
36+
The entity context of the entity, which the user interacts with as their Durable API
37+
38+
Returns
39+
-------
40+
str
41+
A JSON-formatted string representing the output state, results, and exceptions for the
42+
entity execution.
43+
"""
44+
response = EntityState(results=[], signals=[])
45+
for operation_data in batch:
46+
result: Any = None
47+
is_error: bool = False
48+
start_time: datetime = datetime.now()
49+
50+
try:
51+
# populate context
52+
operation = operation_data["name"]
53+
if operation is None:
54+
raise InternalEntityException("Durable Functions Internal Error: Entity operation was missing a name field")
55+
context._operation = operation
56+
context._input = operation_data["input"]
57+
self.fn(context)
58+
result = context._result
59+
60+
except InternalEntityException as e:
61+
raise e
62+
63+
except Exception as e:
64+
is_error = True
65+
result = str(e)
66+
67+
duration: int = self._elapsed_milliseconds_since(start_time)
68+
operation_result = OperationResult(
69+
is_error=is_error,
70+
duration=duration,
71+
result=result
72+
)
73+
response.results.append(operation_result)
74+
75+
response.state = context._state
76+
response.entity_exists = context._exists
77+
return response.to_json_string()
78+
79+
@classmethod
80+
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]:
81+
"""Create an instance of the entity class.
82+
83+
Parameters
84+
----------
85+
fn (Callable[[DurableEntityContext], None]): [description]
86+
87+
Returns
88+
-------
89+
Callable[[Any], str]
90+
Handle function of the newly created entity client
91+
"""
92+
def handle(context) -> str:
93+
# It is not clear when the context JSON would be found
94+
# inside a "body"-key, but this pattern matches the
95+
# orchestrator implementation, so we keep it for safety.
96+
context_body = getattr(context, "body", None)
97+
if context_body is None:
98+
context_body = context
99+
ctx, batch = DurableEntityContext.from_json(context_body)
100+
return Entity(fn).handle(ctx, batch)
101+
return handle
102+
103+
def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
104+
"""Calculate the elapsed time, in milliseconds, from the start_time to the present.
105+
106+
Parameters
107+
----------
108+
start_time: datetime
109+
The timestamp of when the entity began processing a batched request.
110+
111+
Returns
112+
-------
113+
int
114+
The time, in millseconds, from start_time to now
115+
"""
116+
end_time = datetime.now()
117+
time_diff = end_time - start_time
118+
elapsed_time = int(time_diff.total_seconds() * 1000)
119+
return elapsed_time
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
from typing import Optional, Any, Dict, Tuple, List, Callable
2+
from azure.functions._durable_functions import _deserialize_custom_object
3+
import json
4+
5+
6+
class DurableEntityContext:
7+
"""Context of the durable entity context.
8+
9+
Describes the API used to specify durable entity user code.
10+
"""
11+
12+
def __init__(self,
13+
name: str,
14+
key: str,
15+
exists: bool,
16+
state: Any):
17+
"""Context of the durable entity context.
18+
19+
Describes the API used to specify durable entity user code.
20+
21+
Parameters
22+
----------
23+
name: str
24+
The name of the Durable Entity
25+
key: str
26+
The key of the Durable Entity
27+
exists: bool
28+
Flag to determine if the entity exists
29+
state: Any
30+
The internal state of the Durable Entity
31+
"""
32+
self._entity_name: str = name
33+
self._entity_key: str = key
34+
35+
self._exists: bool = exists
36+
self._is_newly_constructed: bool = False
37+
38+
self._state: Any = state
39+
self._input: Any = None
40+
self._operation: Optional[str] = None
41+
self._result: Any = None
42+
43+
@property
44+
def entity_name(self) -> str:
45+
"""Get the name of the Entity.
46+
47+
Returns
48+
-------
49+
str
50+
The name of the entity
51+
"""
52+
return self._entity_name
53+
54+
@property
55+
def entity_key(self) -> str:
56+
"""Get the Entity key.
57+
58+
Returns
59+
-------
60+
str
61+
The entity key
62+
"""
63+
return self._entity_key
64+
65+
@property
66+
def operation_name(self) -> Optional[str]:
67+
"""Get the current operation name.
68+
69+
Returns
70+
-------
71+
Optional[str]
72+
The current operation name
73+
"""
74+
if self._operation is None:
75+
raise Exception("Entity operation is unassigned")
76+
return self._operation
77+
78+
@property
79+
def is_newly_constructed(self) -> bool:
80+
"""Determine if the Entity was newly constructed.
81+
82+
Returns
83+
-------
84+
bool
85+
True if the Entity was newly constructed. False otherwise.
86+
"""
87+
# This is not updated at the moment, as its semantics are unclear
88+
return self._is_newly_constructed
89+
90+
@classmethod
91+
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]:
92+
"""Instantiate a DurableEntityContext from a JSON-formatted string.
93+
94+
Parameters
95+
----------
96+
json_string: str
97+
A JSON-formatted string, returned by the durable-extension,
98+
which represents the entity context
99+
100+
Returns
101+
-------
102+
DurableEntityContext
103+
The DurableEntityContext originated from the input string
104+
"""
105+
json_dict = json.loads(json_str)
106+
json_dict["name"] = json_dict["self"]["name"]
107+
json_dict["key"] = json_dict["self"]["key"]
108+
json_dict.pop("self")
109+
110+
serialized_state = json_dict["state"]
111+
if serialized_state is not None:
112+
json_dict["state"] = from_json_util(serialized_state)
113+
114+
batch = json_dict.pop("batch")
115+
return cls(**json_dict), batch
116+
117+
def set_state(self, state: Any) -> None:
118+
"""Set the state of the entity.
119+
120+
Parameter
121+
---------
122+
state: Any
123+
The new state of the entity
124+
"""
125+
self._exists = True
126+
127+
# should only serialize the state at the end of the batch
128+
self._state = state
129+
130+
def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
131+
"""Get the current state of this entity.
132+
133+
Parameters
134+
----------
135+
initializer: Optional[Callable[[], Any]]
136+
A 0-argument function to provide an initial state. Defaults to None.
137+
138+
Returns
139+
-------
140+
Any
141+
The current state of the entity
142+
"""
143+
state = self._state
144+
if state is not None:
145+
return state
146+
elif initializer:
147+
if not callable(initializer):
148+
raise Exception("initializer argument needs to be a callable function")
149+
state = initializer()
150+
return state
151+
152+
def get_input(self) -> Any:
153+
"""Get the input for this operation.
154+
155+
Returns
156+
-------
157+
Any
158+
The input for the current operation
159+
"""
160+
input_ = None
161+
req_input = self._input
162+
req_input = json.loads(req_input)
163+
input_ = None if req_input is None else from_json_util(req_input)
164+
return input_
165+
166+
def set_result(self, result: Any) -> None:
167+
"""Set the result (return value) of the entity.
168+
169+
Paramaters
170+
----------
171+
result: Any
172+
The result / return value for the entity
173+
"""
174+
self._exists = True
175+
self._result = result
176+
177+
def destruct_on_exit(self) -> None:
178+
"""Delete this entity after the operation completes."""
179+
self._exists = False
180+
self._state = None
181+
182+
def from_json_util(self, json_str: str) -> Any:
183+
"""Load an arbitrary datatype from its JSON representation.
184+
185+
The Out-of-proc SDK has a special JSON encoding strategy
186+
to enable arbitrary datatypes to be serialized. This utility
187+
loads a JSON with the assumption that it follows that encoding
188+
method.
189+
190+
Parameters
191+
----------
192+
json_str: str
193+
A JSON-formatted string, from durable-extension
194+
195+
Returns
196+
-------
197+
Any:
198+
The original datatype that was serialized
199+
"""
200+
return json.loads(json_str, object_hook=_deserialize_custom_object)

0 commit comments

Comments
 (0)