Skip to content

Durable Entities #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0249145
created helper classes
davidmrdavid Aug 9, 2020
87b7640
merge and clean-up
davidmrdavid Aug 9, 2020
e24f8a6
typechecking entity-context
davidmrdavid Aug 9, 2020
4f27d2b
moving entity-context
davidmrdavid Aug 9, 2020
cf8d86a
Durable entities are functional, linted, typechecked
davidmrdavid Aug 25, 2020
402b9ad
added simple test
davidmrdavid Aug 25, 2020
14844b3
added sample, missing test
davidmrdavid Aug 25, 2020
e69b2b5
removed leftover print statement
davidmrdavid Aug 25, 2020
98db559
hidden object references are removed
davidmrdavid Aug 27, 2020
0247d32
PR feedback
davidmrdavid Sep 11, 2020
b2c3707
merge
davidmrdavid Sep 11, 2020
b513535
minor change in CallEntityAction to overcomposate for GitHub acting up
davidmrdavid Sep 11, 2020
023fd70
linting
davidmrdavid Sep 11, 2020
65188f2
Rewind API (#163)
davidmrdavid Sep 14, 2020
6c25a78
Implemented out-of-proc error reporting schema (#196)
davidmrdavid Sep 17, 2020
1881257
support signal entity
wenhzha Oct 6, 2020
a30f63b
unit test and fixes in test infra
wenhzha Oct 6, 2020
33fb7ca
lint
wenhzha Oct 12, 2020
7f350a2
lint and fix test
wenhzha Oct 12, 2020
69ec826
Merge pull request #206 from Azure/wenhzha/signal-entity
wenhzha Oct 13, 2020
f047698
Added missing 'kind' field in TokenSource's serialized form (#209)
davidmrdavid Oct 19, 2020
59e7708
disabling schema validation (#212)
davidmrdavid Oct 20, 2020
dc90330
Add Client Signal Entity (#216)
wenhzha Oct 29, 2020
0d9a327
Create link for sample folder
kemurayama Nov 11, 2020
9c48eb0
Merge branch 'dajusto/entities' of https://github.com/Azure/azure-fun…
davidmrdavid Nov 12, 2020
4bb82dc
added entitycontext, fixed sample
davidmrdavid Nov 13, 2020
5105a54
serialize custom types
davidmrdavid Nov 13, 2020
4d6f22b
sample uses signal
davidmrdavid Nov 13, 2020
af482d2
Serialize input for CallActivityWithRetryAction (#225)
carlvitzthum Nov 18, 2020
523de06
Merge branch 'dev' into dev
kemurayama Nov 19, 2020
0f8679b
Merge pull request #219 from kemurayama/dev
davidmrdavid Nov 19, 2020
d95c56d
Replace datetime.strptime for find_task_timer_created() (#228)
kemurayama Dec 2, 2020
4241369
Merge branch 'dajusto/entities' of https://github.com/Azure/azure-fun…
davidmrdavid Dec 3, 2020
85192e5
PR feedback
davidmrdavid Dec 3, 2020
274d4c0
not handling internal exceptions, deferring to extension
davidmrdavid Dec 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
Exposes the different API components intended for public consumption
"""
from .orchestrator import Orchestrator
from .entity import Entity
from .models.utils.entity_utils import EntityId
from .models.DurableOrchestrationClient import DurableOrchestrationClient
from .models.DurableOrchestrationContext import DurableOrchestrationContext
from .models.DurableEntityContext import DurableEntityContext
from .models.RetryOptions import RetryOptions
from .models.TokenSource import ManagedIdentityTokenSource

__all__ = [
'Orchestrator',
'Entity',
'EntityId',
'DurableOrchestrationClient',
'DurableEntityContext',
'DurableOrchestrationContext',
'ManagedIdentityTokenSource',
'RetryOptions'
Expand Down
92 changes: 92 additions & 0 deletions azure/durable_functions/entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime
from typing import Callable, Any, List, Dict


class Entity:
"""Durable Entity Class.

Responsible for execuitng the user-defined entity function.
"""

def __init__(self, entity_func: Callable[[DurableEntityContext], None]):
"""Create a new entity for the user-defined entity.

Responsible for executing the user-defined entity function

Parameters
----------
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]]
The user defined entity function
"""
self.fn: Callable[[DurableEntityContext], None] = entity_func

def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str:
"""Handle the execution of the user-defined entity function.

Loops over the batch, which serves to specify inputs to the entity,
and collects results and generates a final state, which are returned.

Parameters
----------
context: DurableEntityContext
The entity context of the entity, which the user interacts with as their Durable API

Returns
-------
str
A JSON-formatted string representing the output state, results, and exceptions for the
entity execution.
"""
response = EntityState(results=[], signals=[])
for packet in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now()

try:
# populate context
context._operation = packet["name"]
context._input = packet["input"]
self.fn(context)
result = context._result

except Exception as e:
is_error = True
result = str(e)

duration: int = context._elapsed_milliseconds_since(start_time)
operation_result = OperationResult(
is_error=is_error,
duration=duration,
result=result
)
response.results.append(operation_result)

response.state = context._state
response.entity_exists = context._exists
return response.to_json_string()

@classmethod
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]:
"""Create an instance of the entity class.

Parameters
----------
fn (Callable[[DurableEntityContext], None]): [description]

Returns
-------
Callable[[Any], str]
Handle function of the newly created entity client
"""
# TODO: review types here!
def handle(context) -> str:
# TODO: this requires some commenting, where do we need to get this from the body
context_body = getattr(context, "body", None)
if context_body is None:
context_body = context
ctx, batch = DurableEntityContext.from_json(context_body)
return Entity(fn).handle(ctx, batch)
return handle
219 changes: 219 additions & 0 deletions azure/durable_functions/models/DurableEntityContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
from typing import Optional, Any, Dict, Tuple, List, Callable
from azure.functions._durable_functions import _deserialize_custom_object
from datetime import datetime
import json


class DurableEntityContext:
"""Context of the durable entity context.

Describes the API used to specify durable entity user code.
"""

def __init__(self,
name: str,
key: str,
exists: bool,
state: Any):
"""Context of the durable entity context.

Describes the API used to specify durable entity user code.

Parameters
----------
name: str
The name of the Durable Entity
key: str
The key of the Durable Entity
exists: bool
Flag to determine if the entity exists
state: Any
The internal state of the Durable Entity
"""
self._entity_name: str = name
self._entity_key: str = key

self._exists: bool = exists
self._is_newly_constructed: bool = False

self._state: Any = state
self._input: Any = None
self._operation: Optional[str] = None
self._result: Any = None

@property
def entity_name(self) -> str:
"""Get the name of the Entity.

Returns
-------
str
The name of the entity
"""
return self._entity_name

@property
def entity_key(self) -> str:
"""Get the Entity key.

Returns
-------
str
The entity key
"""
return self._entity_key

@property
def operation_name(self) -> Optional[str]:
"""Get the current operation name.

Returns
-------
Optional[str]
The current operation name
"""
# TODO: Maybe we should raise an
# exception if _operation is None
return self._operation

@property
def is_newly_constructed(self) -> bool:
"""Determine if the Entity was newly constructed.

Returns
-------
bool
True if the Entity was newly constructed. False otherwise.
"""
# TODO: not updating this atm

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this even gets updated when C# handles a batch either, so this may not be necessary. We should consult Sebastian about this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sebastianburckhardt:

Is there a use for a isNewlyConstructed flag for entities? Or can it be safely removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be tracking this here: #222

return self._is_newly_constructed

@classmethod
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]:
"""Instantiate a DurableEntityContext from a JSON-formatted string.

Parameters
----------
json_string: str
A JSON-formatted string, returned by the durable-extension,
which represents the entity context

Returns
-------
DurableEntityContext
The DurableEntityContext originated from the input string
"""
json_dict = json.loads(json_str)
json_dict["name"] = json_dict["self"]["name"]
json_dict["key"] = json_dict["self"]["key"]
json_dict.pop("self")

serialized_state = json_dict["state"]
if serialized_state is not None:
json_dict["state"] = json.loads(serialized_state,
object_hook=_deserialize_custom_object)
batch = json_dict.pop("batch")
return cls(**json_dict), batch

def set_state(self, state: Any) -> None:
"""Set the state of the entity.

Parameter
---------
state: Any
The new state of the entity
"""
# TODO: enable serialization of custom types
self._exists = True
self._state = json.dumps(state)

def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
"""Get the current state of this entity.

Parameters
----------
initializer: Optional[Callable[[], Any]]
A 0-argument function to provide an initial state. Defaults to None.

Returns
-------
Any
The current state of the entity
"""
state = self._state
# TODO: some weird errs here with None states
if state is not None:
return state
elif initializer:
# TODO: ensure this is a fucntion
state = initializer()
return state

def get_input(self) -> Any:
"""Get the input for this operation.

Returns
-------
Any
The input for the current operation
"""
input_ = None
req_input = self._input
req_input = json.loads(req_input)
input_ = None if req_input is None else self.from_json_util(req_input)
return input_

def set_result(self, result: Any) -> None:
"""Set the result (return value) of the entity.

Paramaters
----------
result: Any
The result / return value for the entity
"""
self._exists = True
self._result = result

def destruct_on_exit(self) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how we handle this in JS or C#, but there is the question of how to handle subsequent operations after this within a batch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's check-in with the team then 😉
@cgillum , @sebastianburckhardt:

What are the semantics for a destructed entity (via destructOnExit) when there are still operations to perform on a batch?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be tracking this here: #222

"""Delete this entity after the operation completes."""
self._exists = False
self._state = None

def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
"""Calculate the elapsed time, in milliseconds, from the start_time to the present.

Parameters
----------
start_time: datetime
The timestamp of when the entity began processing a batched request.

Returns
-------
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now()
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time

def from_json_util(self, json_str: str) -> Any:
"""Load an arbitrary datatype from its JSON representation.

The Out-of-proc SDK has a special JSON encoding strategy
to enable arbitrary datatypes to be serialized. This utility
loads a JSON with the assumption that it follows that encoding
method.

Parameters
----------
json_str: str
A JSON-formatted string, from durable-extension

Returns
-------
Any:
The original datatype that was serialized
"""
# TODO: this should be a util elsewhere, since we use it alot
return json.loads(json_str, object_hook=_deserialize_custom_object)
23 changes: 22 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from .actions import Action
from ..models.Task import Task
from ..models.TokenSource import TokenSource
from .utils.entity_utils import EntityId
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task, \
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task
from azure.functions._durable_functions import _deserialize_custom_object


Expand Down Expand Up @@ -359,6 +360,26 @@ def function_context(self) -> FunctionContext:
"""
return self._function_context

def call_entity(self, entityId: EntityId,
operationName: str, operationInput: Optional[Any] = None):
"""Get the result of Durable Entity operation given some input.

Parameters
----------
entityId: EntityId
The ID of the entity to call
operationName: str
The operation to execute
operationInput: Optional[Any]
The input for tne operation, defaults to None.

Returns
-------
Task
A Task of the entity call
"""
return call_entity_task(self.histories, entityId, operationName, operationInput)

@property
def will_continue_as_new(self) -> bool:
"""Return true if continue_as_new was called."""
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from .TaskSet import TaskSet
from .DurableHttpRequest import DurableHttpRequest
from .TokenSource import ManagedIdentityTokenSource
from .DurableEntityContext import DurableEntityContext

__all__ = [
'DurableOrchestrationBindings',
'DurableOrchestrationClient',
'DurableEntityContext',
'DurableOrchestrationContext',
'DurableHttpRequest',
'ManagedIdentityTokenSource',
Expand Down
1 change: 1 addition & 0 deletions azure/durable_functions/models/actions/ActionType.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ class ActionType(IntEnum):
CONTINUE_AS_NEW: int = 4
CREATE_TIMER: int = 5
WAIT_FOR_EXTERNAL_EVENT: int = 6
CALL_ENTITY = 7
CALL_HTTP: int = 8
Loading