Skip to content

Python Worker Extension Interface (worker) #815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 1, 2021
2 changes: 2 additions & 0 deletions azure_functions_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

__version__ = '1.1.10'
25 changes: 23 additions & 2 deletions azure_functions_worker/bindings/tracecontext.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Dict


class TraceContext:
"""Check https://www.w3.org/TR/trace-context/ for more information"""

def __init__(self, trace_parent: str,
trace_state: str, attributes: dict) -> None:
trace_state: str, attributes: Dict[str, str]) -> None:
self.__trace_parent = trace_parent
self.__trace_state = trace_state
self.__attributes = attributes

@property
def Tracestate(self) -> str:
"""Get trace state from trace-context (deprecated)."""
return self.__trace_state

@property
def Traceparent(self) -> str:
"""Get trace parent from trace-context (deprecated)."""
return self.__trace_parent

@property
def Attributes(self) -> Dict[str, str]:
"""Get trace-context attributes (deprecated)."""
return self.__attributes

@property
def trace_state(self) -> str:
"""Get trace state from trace-context"""
return self.__trace_state

@property
def trace_parent(self) -> str:
"""Get trace parent from trace-context"""
return self.__trace_parent

@property
def Attributes(self) -> str:
def attributes(self) -> Dict[str, str]:
"""Get trace-context attributes"""
return self.__attributes
6 changes: 3 additions & 3 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

# Prefixes
CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog"

# Capabilities
RAW_HTTP_BODY_BYTES = "RawHttpBodyBytes"
TYPED_DATA_COLLECTION = "TypedDataCollection"
Expand All @@ -26,6 +23,7 @@
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
PYTHON_ISOLATE_WORKER_DEPENDENCIES = "PYTHON_ISOLATE_WORKER_DEPENDENCIES"
PYTHON_ENABLE_WORKER_EXTENSIONS = "PYTHON_ENABLE_WORKER_EXTENSIONS"
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED = \
"FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED"
"""
Expand All @@ -40,6 +38,8 @@
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT = False
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT_39 = True
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT = False
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39 = True

# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
39 changes: 29 additions & 10 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@

import grpc

from . import __version__
from . import bindings
from . import constants
from . import functions
from . import loader
from . import protos
from .constants import (CONSOLE_LOG_PREFIX, PYTHON_THREADPOOL_THREAD_COUNT,
from .constants import (PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
PYTHON_THREADPOOL_THREAD_COUNT_MIN)
from .logging import disable_console_logging, enable_console_logging
from .logging import error_logger, is_system_log_category, logger
from .logging import (logger, error_logger, is_system_log_category,
CONSOLE_LOG_PREFIX)
from .extension import ExtensionManager
from .utils.common import get_app_setting
from .utils.tracing import marshall_exception_trace
from .utils.dependency import DependencyManager
Expand Down Expand Up @@ -255,8 +258,9 @@ async def _dispatch_grpc_request(self, request):
self._grpc_resp_queue.put_nowait(resp)

async def _handle__worker_init_request(self, req):
logger.info('Received WorkerInitRequest, request ID %s',
self.request_id)
logger.info('Received WorkerInitRequest, '
'python version %s, worker version %s, request ID %s',
sys.version, __version__, self.request_id)

capabilities = {
constants.RAW_HTTP_BODY_BYTES: _TRUE,
Expand Down Expand Up @@ -304,6 +308,11 @@ async def _handle__function_load_request(self, req):
self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
Expand Down Expand Up @@ -373,20 +382,24 @@ async def _handle__invocation_request(self, req):
pytype=pb_type_info.pytype,
shmem_mgr=self._shmem_mgr)

fi_context = bindings.Context(
fi.name, fi.directory, invocation_id, trace_context)
if fi.requires_context:
args['context'] = bindings.Context(
fi.name, fi.directory, invocation_id, trace_context)
args['context'] = fi_context

if fi.output_types:
for name in fi.output_types:
args[name] = bindings.Out()

if fi.is_async:
call_result = await fi.func(**args)
call_result = await self._run_async_func(
fi_context, fi.func, args
)
else:
call_result = await self._loop.run_in_executor(
self._sync_call_tp,
self.__run_sync_func, invocation_id, fi.func, args)
self._run_sync_func,
invocation_id, fi_context, fi.func, args)
if call_result is not None and not fi.has_return:
raise RuntimeError(f'function {fi.name!r} without a $return '
'binding returned a non-None value')
Expand Down Expand Up @@ -582,15 +595,21 @@ def _create_sync_call_tp(
max_workers=max_worker
)

def __run_sync_func(self, invocation_id, func, params):
def _run_sync_func(self, invocation_id, context, func, params):
# This helper exists because we need to access the current
# invocation_id from ThreadPoolExecutor's threads.
_invocation_id_local.v = invocation_id
try:
return func(**params)
return ExtensionManager.get_sync_invocation_wrapper(context,
func)(params)
finally:
_invocation_id_local.v = None

async def _run_async_func(self, context, func, params):
return await ExtensionManager.get_async_invocation_wrapper(
context, func, params
)

def __poll_grpc(self):
options = []
if self._grpc_max_msg_len:
Expand Down
Loading