Skip to content

Durable Entities #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0249145
created helper classes
davidmrdavid Aug 9, 2020
87b7640
merge and clean-up
davidmrdavid Aug 9, 2020
e24f8a6
typechecking entity-context
davidmrdavid Aug 9, 2020
4f27d2b
moving entity-context
davidmrdavid Aug 9, 2020
cf8d86a
Durable entities are functional, linted, typechecked
davidmrdavid Aug 25, 2020
402b9ad
added simple test
davidmrdavid Aug 25, 2020
14844b3
added sample, missing test
davidmrdavid Aug 25, 2020
e69b2b5
removed leftover print statement
davidmrdavid Aug 25, 2020
98db559
hidden object references are removed
davidmrdavid Aug 27, 2020
0247d32
PR feedback
davidmrdavid Sep 11, 2020
b2c3707
merge
davidmrdavid Sep 11, 2020
b513535
minor change in CallEntityAction to overcomposate for GitHub acting up
davidmrdavid Sep 11, 2020
023fd70
linting
davidmrdavid Sep 11, 2020
65188f2
Rewind API (#163)
davidmrdavid Sep 14, 2020
6c25a78
Implemented out-of-proc error reporting schema (#196)
davidmrdavid Sep 17, 2020
1881257
support signal entity
wenhzha Oct 6, 2020
a30f63b
unit test and fixes in test infra
wenhzha Oct 6, 2020
33fb7ca
lint
wenhzha Oct 12, 2020
7f350a2
lint and fix test
wenhzha Oct 12, 2020
69ec826
Merge pull request #206 from Azure/wenhzha/signal-entity
wenhzha Oct 13, 2020
f047698
Added missing 'kind' field in TokenSource's serialized form (#209)
davidmrdavid Oct 19, 2020
59e7708
disabling schema validation (#212)
davidmrdavid Oct 20, 2020
dc90330
Add Client Signal Entity (#216)
wenhzha Oct 29, 2020
0d9a327
Create link for sample folder
kemurayama Nov 11, 2020
9c48eb0
Merge branch 'dajusto/entities' of https://github.com/Azure/azure-fun…
davidmrdavid Nov 12, 2020
4bb82dc
added entitycontext, fixed sample
davidmrdavid Nov 13, 2020
5105a54
serialize custom types
davidmrdavid Nov 13, 2020
4d6f22b
sample uses signal
davidmrdavid Nov 13, 2020
af482d2
Serialize input for CallActivityWithRetryAction (#225)
carlvitzthum Nov 18, 2020
523de06
Merge branch 'dev' into dev
kemurayama Nov 19, 2020
0f8679b
Merge pull request #219 from kemurayama/dev
davidmrdavid Nov 19, 2020
d95c56d
Replace datetime.strptime for find_task_timer_created() (#228)
kemurayama Dec 2, 2020
4241369
Merge branch 'dajusto/entities' of https://github.com/Azure/azure-fun…
davidmrdavid Dec 3, 2020
85192e5
PR feedback
davidmrdavid Dec 3, 2020
274d4c0
not handling internal exceptions, deferring to extension
davidmrdavid Dec 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Note: Conda based environments are not yet supported in Azure Functions.
### Setting up durable-py debugging


1. Git clone your fork and use any starter sample from this [folder] in your fork (https://github.com/Azure/azure-functions-durable-python/tree/dev/samples/) and open this folder in your VS Code editor.
1. Git clone your fork and use any starter sample from this [folder](https://github.com/Azure/azure-functions-durable-python/tree/dev/samples/) in your fork and open this folder in your VS Code editor.

2. Initialize this folder as an Azure Functions project using the VS Code Extension using these [instructions](https://docs.microsoft.com/en-us/azure/azure-functions/functions-create-first-function-vs-code?pivots=programming-language-python). This step will create a Python virtual environment if one doesn't exist already.

Expand Down
6 changes: 6 additions & 0 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
Exposes the different API components intended for public consumption
"""
from .orchestrator import Orchestrator
from .entity import Entity
from .models.utils.entity_utils import EntityId
from .models.DurableOrchestrationClient import DurableOrchestrationClient
from .models.DurableOrchestrationContext import DurableOrchestrationContext
from .models.DurableEntityContext import DurableEntityContext
from .models.RetryOptions import RetryOptions
from .models.TokenSource import ManagedIdentityTokenSource

__all__ = [
'Orchestrator',
'Entity',
'EntityId',
'DurableOrchestrationClient',
'DurableEntityContext',
'DurableOrchestrationContext',
'ManagedIdentityTokenSource',
'RetryOptions'
Expand Down
119 changes: 119 additions & 0 deletions azure/durable_functions/entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime
from typing import Callable, Any, List, Dict

class InternalEntityException(Exception):
pass

class Entity:
"""Durable Entity Class.

Responsible for executing the user-defined entity function.
"""

def __init__(self, entity_func: Callable[[DurableEntityContext], None]):
"""Create a new entity for the user-defined entity.

Responsible for executing the user-defined entity function

Parameters
----------
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]]
The user defined entity function
"""
self.fn: Callable[[DurableEntityContext], None] = entity_func

def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str:
"""Handle the execution of the user-defined entity function.

Loops over the batch, which serves to specify inputs to the entity,
and collects results and generates a final state, which are returned.

Parameters
----------
context: DurableEntityContext
The entity context of the entity, which the user interacts with as their Durable API

Returns
-------
str
A JSON-formatted string representing the output state, results, and exceptions for the
entity execution.
"""
response = EntityState(results=[], signals=[])
for operation_data in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now()

try:
# populate context
operation = operation_data["name"]
if operation is None:
raise InternalEntityException("Durable Functions Internal Error: Entity operation was missing a name field")
context._operation = operation
context._input = operation_data["input"]
self.fn(context)
result = context._result

except InternalEntityException as e:
raise e

except Exception as e:
is_error = True
result = str(e)

duration: int = self._elapsed_milliseconds_since(start_time)
operation_result = OperationResult(
is_error=is_error,
duration=duration,
result=result
)
response.results.append(operation_result)

response.state = context._state
response.entity_exists = context._exists
return response.to_json_string()

@classmethod
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]:
"""Create an instance of the entity class.

Parameters
----------
fn (Callable[[DurableEntityContext], None]): [description]

Returns
-------
Callable[[Any], str]
Handle function of the newly created entity client
"""
def handle(context) -> str:
# It is not clear when the context JSON would be found
# inside a "body"-key, but this pattern matches the
# orchestrator implementation, so we keep it for safety.
context_body = getattr(context, "body", None)
if context_body is None:
context_body = context
ctx, batch = DurableEntityContext.from_json(context_body)
return Entity(fn).handle(ctx, batch)
return handle

def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
"""Calculate the elapsed time, in milliseconds, from the start_time to the present.

Parameters
----------
start_time: datetime
The timestamp of when the entity began processing a batched request.

Returns
-------
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now()
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time
200 changes: 200 additions & 0 deletions azure/durable_functions/models/DurableEntityContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from typing import Optional, Any, Dict, Tuple, List, Callable
from azure.functions._durable_functions import _deserialize_custom_object
import json


class DurableEntityContext:
"""Context of the durable entity context.

Describes the API used to specify durable entity user code.
"""

def __init__(self,
name: str,
key: str,
exists: bool,
state: Any):
"""Context of the durable entity context.

Describes the API used to specify durable entity user code.

Parameters
----------
name: str
The name of the Durable Entity
key: str
The key of the Durable Entity
exists: bool
Flag to determine if the entity exists
state: Any
The internal state of the Durable Entity
"""
self._entity_name: str = name
self._entity_key: str = key

self._exists: bool = exists
self._is_newly_constructed: bool = False

self._state: Any = state
self._input: Any = None
self._operation: Optional[str] = None
self._result: Any = None

@property
def entity_name(self) -> str:
"""Get the name of the Entity.

Returns
-------
str
The name of the entity
"""
return self._entity_name

@property
def entity_key(self) -> str:
"""Get the Entity key.

Returns
-------
str
The entity key
"""
return self._entity_key

@property
def operation_name(self) -> Optional[str]:
"""Get the current operation name.

Returns
-------
Optional[str]
The current operation name
"""
if self._operation is None:
raise Exception("Entity operation is unassigned")
return self._operation

@property
def is_newly_constructed(self) -> bool:
"""Determine if the Entity was newly constructed.

Returns
-------
bool
True if the Entity was newly constructed. False otherwise.
"""
# This is not updated at the moment, as its semantics are unclear
return self._is_newly_constructed

@classmethod
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]:
"""Instantiate a DurableEntityContext from a JSON-formatted string.

Parameters
----------
json_string: str
A JSON-formatted string, returned by the durable-extension,
which represents the entity context

Returns
-------
DurableEntityContext
The DurableEntityContext originated from the input string
"""
json_dict = json.loads(json_str)
json_dict["name"] = json_dict["self"]["name"]
json_dict["key"] = json_dict["self"]["key"]
json_dict.pop("self")

serialized_state = json_dict["state"]
if serialized_state is not None:
json_dict["state"] = from_json_util(serialized_state)

batch = json_dict.pop("batch")
return cls(**json_dict), batch

def set_state(self, state: Any) -> None:
"""Set the state of the entity.

Parameter
---------
state: Any
The new state of the entity
"""
self._exists = True

# should only serialize the state at the end of the batch
self._state = state

def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
"""Get the current state of this entity.

Parameters
----------
initializer: Optional[Callable[[], Any]]
A 0-argument function to provide an initial state. Defaults to None.

Returns
-------
Any
The current state of the entity
"""
state = self._state
if state is not None:
return state
elif initializer:
if not callable(initializer):
raise Exception("initializer argument needs to be a callable function")
state = initializer()
return state

def get_input(self) -> Any:
"""Get the input for this operation.

Returns
-------
Any
The input for the current operation
"""
input_ = None
req_input = self._input
req_input = json.loads(req_input)
input_ = None if req_input is None else from_json_util(req_input)
return input_

def set_result(self, result: Any) -> None:
"""Set the result (return value) of the entity.

Paramaters
----------
result: Any
The result / return value for the entity
"""
self._exists = True
self._result = result

def destruct_on_exit(self) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how we handle this in JS or C#, but there is the question of how to handle subsequent operations after this within a batch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's check-in with the team then 😉
@cgillum , @sebastianburckhardt:

What are the semantics for a destructed entity (via destructOnExit) when there are still operations to perform on a batch?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be tracking this here: #222

"""Delete this entity after the operation completes."""
self._exists = False
self._state = None

def from_json_util(self, json_str: str) -> Any:
"""Load an arbitrary datatype from its JSON representation.

The Out-of-proc SDK has a special JSON encoding strategy
to enable arbitrary datatypes to be serialized. This utility
loads a JSON with the assumption that it follows that encoding
method.

Parameters
----------
json_str: str
A JSON-formatted string, from durable-extension

Returns
-------
Any:
The original datatype that was serialized
"""
return json.loads(json_str, object_hook=_deserialize_custom_object)
Loading