Skip to content

Added Retry Context for python worker #909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion azure_functions_worker/bindings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from .tracecontext import TraceContext
from .retrycontext import RetryContext
from .context import Context
from .meta import check_input_type_annotation
from .meta import check_output_type_annotation
Expand All @@ -16,6 +17,6 @@
'is_trigger_binding',
'check_input_type_annotation', 'check_output_type_annotation',
'has_implicit_output',
'from_incoming_proto', 'to_outgoing_proto', 'TraceContext',
'from_incoming_proto', 'to_outgoing_proto', 'TraceContext', 'RetryContext',
'to_outgoing_param_binding'
)
14 changes: 12 additions & 2 deletions azure_functions_worker/bindings/context.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from . import TraceContext
from . import RetryContext


class Context:

def __init__(self, func_name: str, func_dir: str,
invocation_id: str, trace_context: TraceContext) -> None:
def __init__(self,
func_name: str,
func_dir: str,
invocation_id: str,
trace_context: TraceContext,
retry_context: RetryContext) -> None:
self.__func_name = func_name
self.__func_dir = func_dir
self.__invocation_id = invocation_id
self.__trace_context = trace_context
self.__retry_context = retry_context

@property
def invocation_id(self) -> str:
Expand All @@ -27,3 +33,7 @@ def function_directory(self) -> str:
@property
def trace_context(self) -> TraceContext:
return self.__trace_context

@property
def retry_context(self) -> RetryContext:
return self.__retry_context
31 changes: 31 additions & 0 deletions azure_functions_worker/bindings/retrycontext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from . import rpcexception


class RetryContext:
"""Check https://docs.microsoft.com/en-us/azure/azure-functions/
functions-bindings-error-pages?tabs=python#retry-policies-preview"""

def __init__(self,
retry_count: int,
max_retry_count: int,
rpc_exception: rpcexception.RpcException) -> None:
self.__retry_count = retry_count
self.__max_retry_count = max_retry_count
self.__rpc_exception = rpc_exception

@property
def retry_count(self) -> int:
"""Gets the current retry count from retry-context"""
return self.__retry_count

@property
def max_retry_count(self) -> int:
"""Gets the max retry count from retry-context"""
return self.__max_retry_count

@property
def exception(self) -> rpcexception.RpcException:
return self.__rpc_exception
25 changes: 25 additions & 0 deletions azure_functions_worker/bindings/rpcexception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.


class RpcException:

def __init__(self,
source: str,
stack_trace: str,
message: str) -> None:
self.__source = source
self.__stack_trace = stack_trace
self.__message = message

@property
def source(self) -> str:
return self.__source

@property
def stack_trace(self) -> str:
return self.__stack_trace

@property
def message(self) -> str:
return self.__message
33 changes: 22 additions & 11 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@


class DispatcherMeta(type):

__current_dispatcher__ = None

@property
Expand All @@ -62,7 +61,6 @@ def current(mcls):


class Dispatcher(metaclass=DispatcherMeta):

_GRPC_STOP_RESPONSE = object()

def __init__(self, loop: BaseEventLoop, host: str, port: int,
Expand Down Expand Up @@ -346,13 +344,9 @@ async def _handle__function_load_request(self, req):

async def _handle__invocation_request(self, req):
invoc_request = req.invocation_request

invocation_id = invoc_request.invocation_id
function_id = invoc_request.function_id
trace_context = bindings.TraceContext(
invoc_request.trace_context.trace_parent,
invoc_request.trace_context.trace_state,
invoc_request.trace_context.attributes)

# Set the current `invocation_id` to the current task so
# that our logging handler can find it.
current_task = _CURRENT_TASK(self._loop)
Expand Down Expand Up @@ -392,8 +386,7 @@ async def _handle__invocation_request(self, req):
pytype=pb_type_info.pytype,
shmem_mgr=self._shmem_mgr)

fi_context = bindings.Context(
fi.name, fi.directory, invocation_id, trace_context)
fi_context = self._get_context(invoc_request, fi.name, fi.directory)
if fi.requires_context:
args['context'] = fi_context

Expand Down Expand Up @@ -470,7 +463,7 @@ async def _handle__function_environment_reload_request(self, req):
# Import before clearing path cache so that the default
# azure.functions modules is available in sys.modules for
# customer use
import azure.functions # NoQA
import azure.functions # NoQA

# Append function project root to module finding sys.path
if func_env_reload_request.function_app_directory:
Expand Down Expand Up @@ -556,6 +549,25 @@ async def _handle__close_shared_memory_resources_request(self, req):
request_id=self.request_id,
close_shared_memory_resources_response=response)

@staticmethod
def _get_context(invoc_request: protos.InvocationRequest, name: str,
directory: str) -> bindings.Context:
""" For more information refer: https://aka.ms/azfunc-invocation-context
"""
trace_context = bindings.TraceContext(
invoc_request.trace_context.trace_parent,
invoc_request.trace_context.trace_state,
invoc_request.trace_context.attributes)

retry_context = bindings.RetryContext(
invoc_request.retry_context.retry_count,
invoc_request.retry_context.max_retry_count,
invoc_request.retry_context.exception)

return bindings.Context(
name, directory, invoc_request.invocation_id,
trace_context, retry_context)

@disable_feature_by(constants.PYTHON_ROLLBACK_CWD_PATH)
def _change_cwd(self, new_cwd: str):
if os.path.exists(new_cwd):
Expand Down Expand Up @@ -702,7 +714,6 @@ def emit(self, record: LogRecord) -> None:


class ContextEnabledTask(asyncio.Task):

AZURE_INVOCATION_ID = '__azure_function_invocation_id__'

def __init__(self, coro, loop):
Expand Down
59 changes: 51 additions & 8 deletions azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,25 @@ message StreamingMessage {
// Ask the worker to close any open shared memory resources for a given invocation
CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;

// Worker indexing message types
FunctionsMetadataRequest functions_metadata_request = 29;
FunctionMetadataResponses function_metadata_responses = 30;
}
}

// Process.Start required info
// connection details
// protocol type
// protocol version
// protocol version

// Worker sends the host information identifying itself
message StartStream {
// id of the worker
string worker_id = 2;
}

// Host requests the worker to initialize itself
// Host requests the worker to initialize itself
message WorkerInitRequest {
// version of the host sending init request
string host_version = 1;
Expand Down Expand Up @@ -177,7 +181,7 @@ message WorkerActionResponse {
Restart = 0;
Reload = 1;
}

// action for this response
Action action = 1;

Expand Down Expand Up @@ -248,7 +252,7 @@ message RpcFunctionMetadata {

// base directory for the Function
string directory = 1;

// Script file specified
string script_file = 2;

Expand All @@ -260,6 +264,30 @@ message RpcFunctionMetadata {

// Is set to true for proxy
bool is_proxy = 7;

// Function indexing status
StatusResult status = 8;

// Function language
string language = 9;

// Raw binding info
repeated string raw_bindings = 10;
}

// Host tells worker it is ready to receive metadata
message FunctionsMetadataRequest {
// base directory for function app
string function_app_directory = 1;
}

// Worker sends function metadata back to host
message FunctionMetadataResponses {
// list of function indexing responses
repeated FunctionLoadRequest function_load_requests_results = 1;

// status of overall metadata request
StatusResult result = 2;
}

// Host requests worker to invoke a Function
Expand All @@ -278,6 +306,9 @@ message InvocationRequest {

// Populates activityId, tracestate and tags from host
RpcTraceContext trace_context = 5;

// Current retry context
RetryContext retry_context = 6;
}

// Host sends ActivityId, traceStateString and Tags from host
Expand All @@ -292,6 +323,18 @@ message RpcTraceContext {
map<string, string> attributes = 3;
}

// Host sends retry context for a function invocation
message RetryContext {
// Current retry count
int32 retry_count = 1;

// Max retry count
int32 max_retry_count = 2;

// Exception that caused the retry
RpcException exception = 3;
}

// Host requests worker to cancel invocation
message InvocationCancel {
// Unique id for invocation
Expand Down Expand Up @@ -421,7 +464,7 @@ message BindingInfo {
DataType data_type = 4;
}

// Used to send logs back to the Host
// Used to send logs back to the Host
message RpcLog {
// Matching ILogger semantics
// https://github.com/aspnet/Logging/blob/9506ccc3f3491488fe88010ef8b9eb64594abf95/src/Microsoft.Extensions.Logging/Logger.cs
Expand Down Expand Up @@ -468,7 +511,7 @@ message RpcLog {
RpcLogCategory log_category = 8;
}

// Encapsulates an Exception
// Encapsulates an Exception
message RpcException {
// Source of the exception
string source = 3;
Expand Down Expand Up @@ -522,7 +565,7 @@ message RpcHttpCookie {
// TODO - solidify this or remove it
message RpcHttp {
string method = 1;
string url = 2;
string url = 2;
map<string,string> headers = 3;
TypedData body = 4;
map<string,string> params = 10;
Expand All @@ -535,4 +578,4 @@ message RpcHttp {
map<string,NullableString> nullable_headers = 20;
map<string,NullableString> nullable_params = 21;
map<string,NullableString> nullable_query = 22;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"scriptFile": "main.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
],
"retry": {
"strategy": "exponentialBackoff",
"maxRetryCount": 3,
"minimumInterval": "00:00:01",
"maximumInterval": "00:00:05"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions
import logging

logger = logging.getLogger('my function')


def main(req: azure.functions.HttpRequest, context: azure.functions.Context):
logger.info(f'Current retry count: {context.retry_context.retry_count}')
logger.info(f'Max retry count: {context.retry_context.max_retry_count}')

raise Exception("Testing retries")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"scriptFile": "main.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
],
"retry": {
"strategy": "fixedDelay",
"maxRetryCount": 3,
"delayInterval": "00:00:01"
}
}
Loading