diff --git a/azure_functions_worker/constants.py b/azure_functions_worker/constants.py index 07f219f2a..c78e8c1b8 100644 --- a/azure_functions_worker/constants.py +++ b/azure_functions_worker/constants.py @@ -50,3 +50,8 @@ # External Site URLs MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound" + +# new programming model script file name +SCRIPT_FILE_NAME = "function_app.py" + +PYTHON_LANGUAGE_RUNTIME = "python" diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 03c29478b..eaf43a66a 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -18,17 +18,13 @@ import grpc -from . import bindings -from . import constants -from . import functions -from . import loader -from . import protos +from . import bindings, constants, functions, loader, protos from .bindings.shared_memory_data_transfer import SharedMemoryManager from .constants import (PYTHON_THREADPOOL_THREAD_COUNT, PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, PYTHON_THREADPOOL_THREAD_COUNT_MIN, - PYTHON_ENABLE_DEBUG_LOGGING) + PYTHON_ENABLE_DEBUG_LOGGING, SCRIPT_FILE_NAME) from .extension import ExtensionManager from .logging import disable_console_logging, enable_console_logging from .logging import enable_debug_logging_recommendation @@ -40,7 +36,6 @@ from .utils.wrappers import disable_feature_by from .version import VERSION - _TRUE = "true" """In Python 3.6, the current_task method was in the Task class, but got moved @@ -260,13 +255,13 @@ async def _dispatch_grpc_request(self, request): resp = await request_handler(request) self._grpc_resp_queue.put_nowait(resp) - async def _handle__worker_init_request(self, req): + async def _handle__worker_init_request(self, request): logger.info('Received WorkerInitRequest, ' 'python version %s, worker version %s, request ID %s', sys.version, VERSION, self.request_id) enable_debug_logging_recommendation() - worker_init_request = req.worker_init_request + worker_init_request = request.worker_init_request host_capabilities = worker_init_request.capabilities if constants.FUNCTION_DATA_CACHE in host_capabilities: val = host_capabilities[constants.FUNCTION_DATA_CACHE] @@ -294,42 +289,93 @@ async def _handle__worker_init_request(self, req): result=protos.StatusResult( status=protos.StatusResult.Success))) - async def _handle__worker_status_request(self, req): + async def _handle__worker_status_request(self, request): # Logging is not necessary in this request since the response is used # for host to judge scale decisions of out-of-proc languages. # Having log here will reduce the responsiveness of the worker. return protos.StreamingMessage( - request_id=req.request_id, + request_id=request.request_id, worker_status_response=protos.WorkerStatusResponse()) - async def _handle__function_load_request(self, req): - func_request = req.function_load_request + async def _handle__functions_metadata_request(self, request): + metadata_request = request.functions_metadata_request + directory = metadata_request.function_app_directory + function_path = os.path.join(directory, SCRIPT_FILE_NAME) + + if not os.path.exists(function_path): + # Fallback to legacy model + logger.info(f"{SCRIPT_FILE_NAME} does not exist. " + "Switching to host indexing.") + return protos.StreamingMessage( + request_id=request.request_id, + function_metadata_response=protos.FunctionMetadataResponse( + use_default_metadata_indexing=True, + result=protos.StatusResult( + status=protos.StatusResult.Success))) + + try: + fx_metadata_results = [] + indexed_functions = loader.index_function_app(function_path) + if indexed_functions: + indexed_function_logs: List[str] = [] + for func in indexed_functions: + function_log = \ + f"Function Name: {func.get_function_name()} " \ + "Function Binding: " \ + f"{[binding.name for binding in func.get_bindings()]}" + indexed_function_logs.append(function_log) + + logger.info( + f'Successfully processed FunctionMetadataRequest for ' + f'functions: {" ".join(indexed_function_logs)}') + + fx_metadata_results = loader.process_indexed_function( + self._functions, + indexed_functions) + else: + logger.warning("No functions indexed. Please refer to the " + "documentation.") + + return protos.StreamingMessage( + request_id=request.request_id, + function_metadata_response=protos.FunctionMetadataResponse( + function_metadata_results=fx_metadata_results, + result=protos.StatusResult( + status=protos.StatusResult.Success))) + + except Exception as ex: + return protos.StreamingMessage( + request_id=self.request_id, + function_metadata_response=protos.FunctionMetadataResponse( + result=protos.StatusResult( + status=protos.StatusResult.Failure, + exception=self._serialize_exception(ex)))) + + async def _handle__function_load_request(self, request): + func_request = request.function_load_request function_id = func_request.function_id function_name = func_request.metadata.name - logger.info(f'Received FunctionLoadRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id}' - f'function Name: {function_name}') try: - func = loader.load_function( - func_request.metadata.name, - func_request.metadata.directory, - func_request.metadata.script_file, - func_request.metadata.entry_point) - - self._functions.add_function( - function_id, func, func_request.metadata) - - ExtensionManager.function_load_extension( - function_name, - func_request.metadata.directory - ) + if not self._functions.get_function(function_id): + func = loader.load_function( + func_request.metadata.name, + func_request.metadata.directory, + func_request.metadata.script_file, + func_request.metadata.entry_point) + + self._functions.add_function( + function_id, func, func_request.metadata) + + ExtensionManager.function_load_extension( + function_name, + func_request.metadata.directory + ) - logger.info('Successfully processed FunctionLoadRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id},' - f'function Name: {function_name}') + logger.info('Successfully processed FunctionLoadRequest, ' + f'request ID: {self.request_id}, ' + f'function ID: {function_id},' + f'function Name: {function_name}') return protos.StreamingMessage( request_id=self.request_id, @@ -347,8 +393,8 @@ async def _handle__function_load_request(self, req): status=protos.StatusResult.Failure, exception=self._serialize_exception(ex)))) - async def _handle__invocation_request(self, req): - invoc_request = req.invocation_request + async def _handle__invocation_request(self, request): + invoc_request = request.invocation_request invocation_id = invoc_request.invocation_id function_id = invoc_request.function_id @@ -361,6 +407,7 @@ async def _handle__invocation_request(self, req): try: fi: functions.FunctionInfo = self._functions.get_function( function_id) + assert fi is not None function_invocation_logs: List[str] = [ 'Received FunctionInvocationRequest', @@ -456,7 +503,7 @@ async def _handle__invocation_request(self, req): status=protos.StatusResult.Failure, exception=self._serialize_exception(ex)))) - async def _handle__function_environment_reload_request(self, req): + async def _handle__function_environment_reload_request(self, request): """Only runs on Linux Consumption placeholder specialization. """ try: @@ -464,7 +511,8 @@ async def _handle__function_environment_reload_request(self, req): 'request ID: %s', self.request_id) enable_debug_logging_recommendation() - func_env_reload_request = req.function_environment_reload_request + func_env_reload_request = \ + request.function_environment_reload_request # Import before clearing path cache so that the default # azure.functions modules is available in sys.modules for @@ -523,7 +571,7 @@ async def _handle__function_environment_reload_request(self, req): request_id=self.request_id, function_environment_reload_response=failure_response) - async def _handle__close_shared_memory_resources_request(self, req): + async def _handle__close_shared_memory_resources_request(self, request): """ Frees any memory maps that were produced as output for a given invocation. @@ -534,7 +582,7 @@ async def _handle__close_shared_memory_resources_request(self, req): If the cache is not enabled, the worker should free the resources as at this point the host has read the memory maps and does not need them. """ - close_request = req.close_shared_memory_resources_request + close_request = request.close_shared_memory_resources_request map_names = close_request.map_names # Assign default value of False to all result values. # If we are successfully able to close a memory map, its result will be diff --git a/azure_functions_worker/extension.py b/azure_functions_worker/extension.py index 3c6791b3e..a2a7c14c1 100644 --- a/azure_functions_worker/extension.py +++ b/azure_functions_worker/extension.py @@ -1,16 +1,11 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import functools +import logging from types import ModuleType from typing import Any, Callable, List, Optional -import logging -import functools -from .utils.common import ( - is_python_version, - get_sdk_from_sys_path, - get_sdk_version -) -from .utils.wrappers import enable_feature_by + from .constants import ( PYTHON_ISOLATE_WORKER_DEPENDENCIES, PYTHON_ENABLE_WORKER_EXTENSIONS, @@ -18,7 +13,12 @@ PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39 ) from .logging import logger, SYSTEM_LOG_PREFIX - +from .utils.common import ( + is_python_version, + get_sdk_from_sys_path, + get_sdk_version +) +from .utils.wrappers import enable_feature_by # Extension Hooks FUNC_EXT_POST_FUNCTION_LOAD = "post_function_load" diff --git a/azure_functions_worker/functions.py b/azure_functions_worker/functions.py index b221caa39..085fb5de6 100644 --- a/azure_functions_worker/functions.py +++ b/azure_functions_worker/functions.py @@ -2,21 +2,23 @@ # Licensed under the MIT License. import inspect import operator +import pathlib import typing -from . import bindings +from azure.functions import DataType, Function + +from . import bindings as bindings_utils from . import protos from ._thirdparty import typing_inspect +from .protos import BindingInfo class ParamTypeInfo(typing.NamedTuple): - binding_name: str pytype: typing.Optional[type] class FunctionInfo(typing.NamedTuple): - func: typing.Callable name: str @@ -38,63 +40,66 @@ def __init__(self, function_name: str, msg: str) -> None: class Registry: - _functions: typing.MutableMapping[str, FunctionInfo] def __init__(self) -> None: self._functions = {} def get_function(self, function_id: str) -> FunctionInfo: - try: + if function_id in self._functions: return self._functions[function_id] - except KeyError: - raise RuntimeError( - f'no function with function_id={function_id}') from None - def add_function(self, function_id: str, - func: typing.Callable, - metadata: protos.RpcFunctionMetadata): - func_name = metadata.name - sig = inspect.signature(func) - params = dict(sig.parameters) - annotations = typing.get_type_hints(func) + return None + + @staticmethod + def get_explicit_and_implicit_return(binding_name: str, + binding: BindingInfo, + explicit_return: bool, + implicit_return: bool, + bound_params: dict) -> \ + typing.Tuple[bool, bool]: + if binding_name == '$return': + explicit_return = True + elif bindings_utils.has_implicit_output( + binding.type): + implicit_return = True + bound_params[binding_name] = binding + else: + bound_params[binding_name] = binding + return explicit_return, implicit_return + + @staticmethod + def get_return_binding(binding_name: str, + binding_type: str, + return_binding_name: str) -> str: + if binding_name == "$return": + return_binding_name = binding_type + assert return_binding_name is not None + elif bindings_utils.has_implicit_output(binding_type): + return_binding_name = binding_type + + return return_binding_name + + @staticmethod + def validate_binding_direction(binding_name: str, + binding_direction: str, + func_name: str): + if binding_direction == protos.BindingInfo.inout: + raise FunctionLoadError( + func_name, + '"inout" bindings are not supported') - input_types: typing.Dict[str, ParamTypeInfo] = {} - output_types: typing.Dict[str, ParamTypeInfo] = {} - return_binding_name: typing.Optional[str] = None - return_pytype: typing.Optional[type] = None + if binding_name == '$return' and \ + binding_direction != protos.BindingInfo.out: + raise FunctionLoadError( + func_name, + '"$return" binding must have direction set to "out"') + @staticmethod + def is_context_required(params, bound_params: dict, + annotations: dict, + func_name: str) -> bool: requires_context = False - has_explicit_return = False - has_implicit_return = False - - bound_params = {} - for name, desc in metadata.bindings.items(): - if desc.direction == protos.BindingInfo.inout: - raise FunctionLoadError( - func_name, - '"inout" bindings are not supported') - - if name == '$return': - if desc.direction != protos.BindingInfo.out: - raise FunctionLoadError( - func_name, - '"$return" binding must have direction set to "out"') - - has_explicit_return = True - return_binding_name = desc.type - assert return_binding_name is not None - - elif bindings.has_implicit_output(desc.type): - # If the binding specify implicit output binding - # (e.g. orchestrationTrigger, activityTrigger) - # we should enable output even if $return is not specified - has_implicit_return = True - return_binding_name = desc.type - bound_params[name] = desc - else: - bound_params[name] = desc - if 'context' in params and 'context' not in bound_params: requires_context = True params.pop('context') @@ -107,7 +112,11 @@ def add_function(self, function_id: str, 'the "context" parameter is expected to be of ' 'type azure.functions.Context, got ' f'{ctx_anno!r}') + return requires_context + @staticmethod + def validate_function_params(params: dict, bound_params: dict, + annotations: dict, func_name: str): if set(params) - set(bound_params): raise FunctionLoadError( func_name, @@ -120,8 +129,11 @@ def add_function(self, function_id: str, f'the following parameters are declared in function.json but ' f'not in Python: {set(bound_params) - set(params)!r}') + input_types: typing.Dict[str, ParamTypeInfo] = {} + output_types: typing.Dict[str, ParamTypeInfo] = {} + for param in params.values(): - desc = bound_params[param.name] + binding = bound_params[param.name] param_has_anno = param.name in annotations param_anno = annotations.get(param.name) @@ -147,7 +159,7 @@ def add_function(self, function_id: str, else: is_param_out = False - is_binding_out = desc.direction == protos.BindingInfo.out + is_binding_out = binding.direction == protos.BindingInfo.out if is_param_out: param_anno_args = typing_inspect.get_args(param_anno) @@ -162,15 +174,14 @@ def add_function(self, function_id: str, # so if the annotation was func.Out[typing.List[foo]], # we need to reconstruct it. if (isinstance(param_py_type, tuple) - and typing_inspect.is_generic_type(param_py_type[0])): - + and typing_inspect.is_generic_type(param_py_type[0])): param_py_type = operator.getitem( param_py_type[0], *param_py_type[1:]) else: param_py_type = param_anno if (param_has_anno and not isinstance(param_py_type, type) - and not typing_inspect.is_generic_type(param_py_type)): + and not typing_inspect.is_generic_type(param_py_type)): raise FunctionLoadError( func_name, f'binding {param.name} has invalid non-type annotation ' @@ -191,33 +202,34 @@ def add_function(self, function_id: str, 'is azure.functions.Out in Python') if param_has_anno and param_py_type in (str, bytes) and ( - not bindings.has_implicit_output(desc.type)): + not bindings_utils.has_implicit_output(binding.type)): param_bind_type = 'generic' else: - param_bind_type = desc.type + param_bind_type = binding.type if param_has_anno: if is_param_out: - checks_out = bindings.check_output_type_annotation( + checks_out = bindings_utils.check_output_type_annotation( param_bind_type, param_py_type) else: - checks_out = bindings.check_input_type_annotation( + checks_out = bindings_utils.check_input_type_annotation( param_bind_type, param_py_type) if not checks_out: - if desc.data_type is not protos.BindingInfo.undefined: + if binding.data_type is not DataType( + protos.BindingInfo.undefined): raise FunctionLoadError( func_name, - f'{param.name!r} binding type "{desc.type}" ' - f'and dataType "{desc.data_type}" in function.json' - f' do not match the corresponding function ' - f'parameter\'s Python type ' + f'{param.name!r} binding type "{binding.type}" ' + f'and dataType "{binding.data_type}" in ' + f'function.json do not match the corresponding ' + f'function parameter\'s Python type ' f'annotation "{param_py_type.__name__}"') else: raise FunctionLoadError( func_name, f'type of {param.name} binding in function.json ' - f'"{desc.type}" does not match its Python ' + f'"{binding.type}" does not match its Python ' f'annotation "{param_py_type.__name__}"') param_type_info = ParamTypeInfo(param_bind_type, param_py_type) @@ -225,12 +237,18 @@ def add_function(self, function_id: str, output_types[param.name] = param_type_info else: input_types[param.name] = param_type_info + return input_types, output_types + @staticmethod + def get_function_return_type(annotations: dict, has_explicit_return: bool, + has_implicit_return: bool, binding_name: str, + func_name: str): return_pytype = None if has_explicit_return and 'return' in annotations: return_anno = annotations.get('return') - if (typing_inspect.is_generic_type(return_anno) - and typing_inspect.get_origin(return_anno).__name__ == 'Out'): + if typing_inspect.is_generic_type( + return_anno) and typing_inspect.get_origin( + return_anno).__name__ == 'Out': raise FunctionLoadError( func_name, 'return annotation should not be azure.functions.Out') @@ -243,29 +261,152 @@ def add_function(self, function_id: str, f'annotation {return_pytype!r}') if return_pytype is (str, bytes): - return_binding_name = 'generic' + binding_name = 'generic' - if not bindings.check_output_type_annotation( - return_binding_name, return_pytype): + if not bindings_utils.check_output_type_annotation( + binding_name, return_pytype): raise FunctionLoadError( func_name, f'Python return annotation "{return_pytype.__name__}" ' - f'does not match binding type "{return_binding_name}"') + f'does not match binding type "{binding_name}"') if has_implicit_return and 'return' in annotations: return_pytype = annotations.get('return') return_type = None if has_explicit_return or has_implicit_return: - return_type = ParamTypeInfo(return_binding_name, return_pytype) - - self._functions[function_id] = FunctionInfo( - func=func, - name=func_name, - directory=metadata.directory, + return_type = ParamTypeInfo(binding_name, return_pytype) + + return return_type + + def add_func_to_registry_and_return_funcinfo(self, function, + function_name: str, + function_id: str, + directory: str, + requires_context: bool, + has_explicit_return: bool, + has_implicit_return: bool, + input_types: typing.Dict[ + str, ParamTypeInfo], + output_types: typing.Dict[ + str, ParamTypeInfo], + return_type: str): + + function_info = FunctionInfo( + func=function, + name=function_name, + directory=directory, requires_context=requires_context, - is_async=inspect.iscoroutinefunction(func), + is_async=inspect.iscoroutinefunction(function), has_return=has_explicit_return or has_implicit_return, input_types=input_types, output_types=output_types, return_type=return_type) + + self._functions[function_id] = function_info + return function_info + + def add_function(self, function_id: str, + func: typing.Callable, + metadata: protos.RpcFunctionMetadata): + func_name = metadata.name + sig = inspect.signature(func) + params = dict(sig.parameters) + annotations = typing.get_type_hints(func) + return_binding_name: typing.Optional[str] = None + has_explicit_return = False + has_implicit_return = False + + bound_params = {} + for binding_name, binding_info in metadata.bindings.items(): + self.validate_binding_direction(binding_name, + binding_info.direction, func_name) + + has_explicit_return, has_implicit_return = \ + self.get_explicit_and_implicit_return( + binding_name, binding_info, has_explicit_return, + has_explicit_return, bound_params) + + return_binding_name = self.get_return_binding(binding_name, + binding_info.type, + return_binding_name) + + requires_context = self.is_context_required(params, bound_params, + annotations, + func_name) + + input_types, output_types = self.validate_function_params(params, + bound_params, + annotations, + func_name) + + return_type = \ + self.get_function_return_type(annotations, + has_explicit_return, + has_implicit_return, + return_binding_name, + func_name) + + self.add_func_to_registry_and_return_funcinfo(func, func_name, + function_id, + metadata.directory, + requires_context, + has_explicit_return, + has_implicit_return, + input_types, + output_types, return_type) + + def add_indexed_function(self, function_id: str, + function: Function): + func = function.get_user_function() + func_name = function.get_function_name() + return_binding_name: typing.Optional[str] = None + has_explicit_return = False + has_implicit_return = False + + sig = inspect.signature(func) + params = dict(sig.parameters) + annotations = typing.get_type_hints(func) + func_dir = str(pathlib.Path(inspect.getfile(func)).parent) + + bound_params = {} + for binding in function.get_bindings(): + self.validate_binding_direction(binding.name, + binding.direction, + func_name) + + has_explicit_return, has_implicit_return = \ + self.get_explicit_and_implicit_return( + binding.name, binding, has_explicit_return, + has_implicit_return, bound_params) + + return_binding_name = self.get_return_binding(binding.name, + binding.type, + return_binding_name) + + requires_context = self.is_context_required(params, bound_params, + annotations, + func_name) + + input_types, output_types = self.validate_function_params(params, + bound_params, + annotations, + func_name) + + return_type = \ + self.get_function_return_type(annotations, + has_explicit_return, + has_implicit_return, + return_binding_name, + func_name) + + return \ + self.add_func_to_registry_and_return_funcinfo(func, func_name, + function_id, + func_dir, + requires_context, + has_explicit_return, + has_implicit_return, + input_types, + output_types, + return_type) diff --git a/azure_functions_worker/loader.py b/azure_functions_worker/loader.py index 2dd260654..e5120a2b9 100644 --- a/azure_functions_worker/loader.py +++ b/azure_functions_worker/loader.py @@ -1,8 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. """Python functions loader.""" - - import importlib import importlib.machinery import importlib.util @@ -10,10 +8,15 @@ import os.path import pathlib import sys -import typing +import uuid from os import PathLike, fspath +from typing import List, Optional, Dict + +from azure.functions import Function, FunctionApp -from .constants import MODULE_NOT_FOUND_TS_URL +from . import protos, functions +from .constants import MODULE_NOT_FOUND_TS_URL, SCRIPT_FILE_NAME, \ + PYTHON_LANGUAGE_RUNTIME from .utils.wrappers import attach_message_to_exception _AZURE_NAMESPACE = '__app__' @@ -44,6 +47,45 @@ def uninstall() -> None: pass +def build_binding_protos(indexed_function: List[Function]) -> Dict: + binding_protos = {} + for binding in indexed_function.get_bindings(): + binding_protos[binding.name] = protos.BindingInfo( + type=binding.type, + data_type=binding.data_type, + direction=binding.direction) + + return binding_protos + + +def process_indexed_function(functions_registry: functions.Registry, + indexed_functions: List[Function]): + fx_metadata_results = [] + for indexed_function in indexed_functions: + function_id = str(uuid.uuid4()) + function_info = functions_registry.add_indexed_function( + function_id, + function=indexed_function) + + binding_protos = build_binding_protos(indexed_function) + + function_metadata = protos.RpcFunctionMetadata( + name=function_info.name, + function_id=function_id, + managed_dependency_enabled=False, # only enabled for PowerShell + directory=function_info.directory, + script_file=indexed_function.function_script_file, + entry_point=function_info.name, + is_proxy=False, # not supported in V4 + language=PYTHON_LANGUAGE_RUNTIME, + bindings=binding_protos, + raw_bindings=indexed_function.get_raw_bindings()) + + fx_metadata_results.append(function_metadata) + + return fx_metadata_results + + @attach_message_to_exception( expt_type=ImportError, message=f'Please check the requirements.txt file for the missing module. ' @@ -51,7 +93,7 @@ def uninstall() -> None: f' guide: {MODULE_NOT_FOUND_TS_URL} ' ) def load_function(name: str, directory: str, script_file: str, - entry_point: typing.Optional[str]): + entry_point: Optional[str]): dir_path = pathlib.Path(directory) script_path = pathlib.Path(script_file) if script_file else pathlib.Path( _DEFAULT_SCRIPT_FILENAME) @@ -93,3 +135,27 @@ def load_function(name: str, directory: str, script_file: str, f'present in {rel_script_path}') return func + + +@attach_message_to_exception( + expt_type=ImportError, + message=f'Troubleshooting Guide: {MODULE_NOT_FOUND_TS_URL}' +) +def index_function_app(function_path: str) -> List[Function]: + module_name = pathlib.Path(function_path).stem + imported_module = importlib.import_module(module_name) + + app: Optional[FunctionApp] = None + for i in imported_module.__dir__(): + if isinstance(getattr(imported_module, i, None), FunctionApp): + if not app: + app = getattr(imported_module, i, None) + else: + raise ValueError( + "Multiple instances of FunctionApp are defined") + + if not app: + raise ValueError("Could not find instance of FunctionApp in " + f"{SCRIPT_FILE_NAME}.") + + return app.get_functions() diff --git a/azure_functions_worker/logging.py b/azure_functions_worker/logging.py index 4f8fac2ea..cd8a0be99 100644 --- a/azure_functions_worker/logging.py +++ b/azure_functions_worker/logging.py @@ -1,10 +1,10 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Optional import logging import logging.handlers import sys +from typing import Optional # Logging Prefixes CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog" diff --git a/azure_functions_worker/protos/__init__.py b/azure_functions_worker/protos/__init__.py index 827a0df72..cd4d495c5 100644 --- a/azure_functions_worker/protos/__init__.py +++ b/azure_functions_worker/protos/__init__.py @@ -28,4 +28,6 @@ RpcSharedMemory, RpcDataType, CloseSharedMemoryResourcesRequest, - CloseSharedMemoryResourcesResponse) + CloseSharedMemoryResourcesResponse, + FunctionsMetadataRequest, + FunctionMetadataResponse) diff --git a/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto b/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto index 94b0734db..734211c45 100644 --- a/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto +++ b/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto @@ -67,7 +67,7 @@ message StreamingMessage { // Worker logs a message back to the host RpcLog rpc_log = 2; - + FunctionEnvironmentReloadRequest function_environment_reload_request = 25; FunctionEnvironmentReloadResponse function_environment_reload_response = 26; @@ -78,14 +78,20 @@ message StreamingMessage { // Worker indexing message types FunctionsMetadataRequest functions_metadata_request = 29; - FunctionMetadataResponses function_metadata_responses = 30; + FunctionMetadataResponse function_metadata_response = 30; + + // Host sends required metadata to worker to load functions + FunctionLoadRequestCollection function_load_request_collection = 31; + + // Host gets the list of function load responses + FunctionLoadResponseCollection function_load_response_collection = 32; } } // Process.Start required info // connection details // protocol type -// protocol version +// protocol version // Worker sends the host information identifying itself message StartStream { @@ -93,7 +99,7 @@ message StartStream { string worker_id = 2; } -// Host requests the worker to initialize itself +// Host requests the worker to initialize itself message WorkerInitRequest { // version of the host sending init request string host_version = 1; @@ -107,6 +113,9 @@ message WorkerInitRequest { // Full path of worker.config.json location string worker_directory = 4; + + // base directory for function app + string function_app_directory = 5; } // Worker responds with the result of initializing itself @@ -181,7 +190,7 @@ message WorkerActionResponse { Restart = 0; Reload = 1; } - + // action for this response Action action = 1; @@ -220,7 +229,17 @@ message CloseSharedMemoryResourcesResponse { map close_map_results = 1; } -// Host tells the worker to load a Function +// Host tells the worker to load a list of Functions +message FunctionLoadRequestCollection { + repeated FunctionLoadRequest function_load_requests = 1; +} + +// Host gets the list of function load responses +message FunctionLoadResponseCollection { + repeated FunctionLoadResponse function_load_responses = 1; +} + +// Load request of a single Function message FunctionLoadRequest { // unique function identifier (avoid name collisions, facilitate reload case) string function_id = 1; @@ -252,7 +271,7 @@ message RpcFunctionMetadata { // base directory for the Function string directory = 1; - + // Script file specified string script_file = 2; @@ -273,6 +292,12 @@ message RpcFunctionMetadata { // Raw binding info repeated string raw_bindings = 10; + + // unique function identifier (avoid name collisions, facilitate reload case) + string function_id = 13; + + // A flag indicating if managed dependency is enabled or not + bool managed_dependency_enabled = 14; } // Host tells worker it is ready to receive metadata @@ -282,12 +307,15 @@ message FunctionsMetadataRequest { } // Worker sends function metadata back to host -message FunctionMetadataResponses { +message FunctionMetadataResponse { // list of function indexing responses - repeated FunctionLoadRequest function_load_requests_results = 1; + repeated RpcFunctionMetadata function_metadata_results = 1; // status of overall metadata request StatusResult result = 2; + + // if set to true then host will perform indexing + bool use_default_metadata_indexing = 3; } // Host requests worker to invoke a Function @@ -464,7 +492,7 @@ message BindingInfo { DataType data_type = 4; } -// Used to send logs back to the Host +// Used to send logs back to the Host message RpcLog { // Matching ILogger semantics // https://github.com/aspnet/Logging/blob/9506ccc3f3491488fe88010ef8b9eb64594abf95/src/Microsoft.Extensions.Logging/Logger.cs @@ -481,8 +509,9 @@ message RpcLog { // Category of the log. Defaults to User if not specified. enum RpcLogCategory { - User = 0; - System = 1; + User = 0; + System = 1; + CustomMetric = 2; } // Unique id for invocation (if exists) @@ -504,14 +533,17 @@ message RpcLog { // Exception (if exists) RpcException exception = 6; - // json serialized property bag, or could use a type scheme like map + // json serialized property bag string properties = 7; - // Category of the log. Either user(default) or system. + // Category of the log. Either user(default), system, or custom metric. RpcLogCategory log_category = 8; + + // strongly-typed (ish) property bag + map propertiesMap = 9; } -// Encapsulates an Exception +// Encapsulates an Exception message RpcException { // Source of the exception string source = 3; @@ -565,7 +597,7 @@ message RpcHttpCookie { // TODO - solidify this or remove it message RpcHttp { string method = 1; - string url = 2; + string url = 2; map headers = 3; TypedData body = 4; map params = 10; diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index 4f04ae4ab..e06a01386 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -33,22 +33,21 @@ import grpc import requests - +from azure_functions_worker import dispatcher +from azure_functions_worker import protos from azure_functions_worker._thirdparty import aio_compat from azure_functions_worker.bindings.shared_memory_data_transfer \ import FileAccessorFactory from azure_functions_worker.bindings.shared_memory_data_transfer \ import SharedMemoryConstants as consts -from . import dispatcher -from . import protos -from .constants import ( +from azure_functions_worker.constants import ( PYAZURE_WEBHOST_DEBUG, PYAZURE_WORKER_DIR, PYAZURE_INTEGRATION_TEST, FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED, UNIX_SHARED_MEMORY_DIRECTORIES ) -from .utils.common import is_envvar_true, get_app_setting +from azure_functions_worker.utils.common import is_envvar_true, get_app_setting PROJECT_ROOT = pathlib.Path(__file__).parent.parent TESTS_ROOT = PROJECT_ROOT / 'tests' @@ -499,6 +498,18 @@ async def init_worker(self, host_version: str): return r + async def get_functions_metadata(self): + r = await self.communicate( + protos.StreamingMessage( + functions_metadata_request=protos.FunctionsMetadataRequest( + function_app_directory=str(self._scripts_dir) + ) + ), + wait_for='function_metadata_response' + ) + + return r + async def load_function(self, name): if name not in self._available_functions: raise RuntimeError(f'cannot load function {name}') @@ -714,13 +725,14 @@ async def __aexit__(self, *exc): def start_mockhost(*, script_root=FUNCS_PATH): - tests_dir = TESTS_ROOT - scripts_dir = tests_dir / script_root + scripts_dir = TESTS_ROOT / script_root if not (scripts_dir.exists() and scripts_dir.is_dir()): raise RuntimeError( f'invalid script_root argument: ' f'{scripts_dir} directory does not exist') + sys.path.append(str(scripts_dir)) + return _MockWebHostController(scripts_dir) @@ -765,6 +777,7 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None): testconfig.read(WORKER_CONFIG) hostexe_args = [] + os.environ['AzureWebJobsFeatureFlags'] = 'EnableWorkerIndexing' # If we want to use core-tools coretools_exe = os.environ.get('CORE_TOOLS_EXE_PATH') @@ -836,7 +849,8 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None): 'languageWorkers:python:workerDirectory': str(worker_path), 'host:logger:consoleLoggingMode': 'always', 'AZURE_FUNCTIONS_ENVIRONMENT': 'development', - 'AzureWebJobsSecretStorageType': 'files' + 'AzureWebJobsSecretStorageType': 'files', + 'FUNCTIONS_WORKER_RUNTIME': 'python' } # In E2E Integration mode, we should use the core tools worker @@ -898,39 +912,6 @@ def start_webhost(*, script_dir=None, stdout=None): time.sleep(10) # Giving host some time to start fully. addr = f'http://{LOCALHOST}:{port}' - health_check_endpoint = f'{addr}/api/ping' - host_out = "" - if stdout is not None and hasattr(stdout, - "readable") and stdout.readable(): - host_out = stdout.readlines(100) - - for _ in range(5): - try: - r = requests.get(health_check_endpoint, - params={'code': 'testFunctionKey'}) - # Give the host a bit more time to settle - time.sleep(2) - - if 200 <= r.status_code < 300: - # Give the host a bit more time to settle - time.sleep(1) - break - else: - print(f'Failed to ping {health_check_endpoint}, status code: ' - f'{r.status_code}', flush=True) - except requests.exceptions.ConnectionError: - pass - time.sleep(1) - else: - proc.terminate() - try: - proc.wait(20) - except subprocess.TimeoutExpired: - proc.kill() - raise RuntimeError('could not start the webworker in time. Please' - f' check the log file for details: {stdout.name} \n' - f' Captured WebHost stdout:\n{host_out}') - return _WebHostProxy(proc, addr) @@ -985,7 +966,6 @@ def _symlink_dir(src, dst): def _setup_func_app(app_root): extensions = app_root / 'bin' - ping_func = app_root / 'ping' host_json = app_root / 'host.json' extensions_csproj_file = app_root / 'extensions.csproj' @@ -997,18 +977,16 @@ def _setup_func_app(app_root): with open(extensions_csproj_file, 'w') as f: f.write(EXTENSION_CSPROJ_TEMPLATE) - _symlink_dir(TESTS_ROOT / 'common' / 'ping', ping_func) _symlink_dir(EXTENSIONS_PATH, extensions) def _teardown_func_app(app_root): extensions = app_root / 'bin' - ping_func = app_root / 'ping' host_json = app_root / 'host.json' extensions_csproj_file = app_root / 'extensions.csproj' extensions_obj_file = app_root / 'obj' - for path in (extensions, ping_func, host_json, extensions_csproj_file, + for path in (extensions, host_json, extensions_csproj_file, extensions_obj_file): remove_path(path) diff --git a/python/prodV4/worker.config.json b/python/prodV4/worker.config.json index 69e1adab1..10c01b56f 100644 --- a/python/prodV4/worker.config.json +++ b/python/prodV4/worker.config.json @@ -7,6 +7,7 @@ "supportedArchitectures":["X64", "X86"], "extensions":[".py"], "defaultExecutablePath":"python", - "defaultWorkerPath":"%FUNCTIONS_WORKER_RUNTIME_VERSION%/{os}/{architecture}/worker.py" + "defaultWorkerPath":"%FUNCTIONS_WORKER_RUNTIME_VERSION%/{os}/{architecture}/worker.py", + "workerIndexing": "true" } } \ No newline at end of file diff --git a/python/test/worker.config.json b/python/test/worker.config.json index 0c38868bf..3fc2a9236 100644 --- a/python/test/worker.config.json +++ b/python/test/worker.config.json @@ -3,6 +3,7 @@ "language":"python", "extensions":[".py"], "defaultExecutablePath":"python", - "defaultWorkerPath":"worker.py" + "defaultWorkerPath":"worker.py", + "workerIndexing": "true" } } diff --git a/tests/endtoend/blob_functions/blob_functions_stein/function_app.py b/tests/endtoend/blob_functions/blob_functions_stein/function_app.py new file mode 100644 index 000000000..e30161f55 --- /dev/null +++ b/tests/endtoend/blob_functions/blob_functions_stein/function_app.py @@ -0,0 +1,393 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import hashlib +import io +import json +import random +import string + +import azure.functions as func + +app = func.FunctionApp() + + +@app.function_name(name="blob_trigger") +@app.blob_trigger(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="$return", + path="python-worker-tests/test-blob-triggered.txt", + connection="AzureWebJobsStorage") +def blob_trigger(file: func.InputStream) -> str: + return json.dumps({ + 'name': file.name, + 'length': file.length, + 'content': file.read().decode('utf-8') + }) + + +@app.function_name(name="get_blob_as_bytes") +@app.route(route="get_blob_as_bytes") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-bytes.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +def get_blob_as_bytes(req: func.HttpRequest, file: bytes) -> str: + assert isinstance(file, bytes) + return file.decode('utf-8') + + +@app.function_name(name="get_blob_as_bytes_return_http_response") +@app.route(route="get_blob_as_bytes_return_http_response") +@app.read_blob(arg_name="file", + path="python-worker-tests/shmem-test-bytes.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +def get_blob_as_bytes_return_http_response(req: func.HttpRequest, file: bytes) \ + -> func.HttpResponse: + """ + Read a blob (bytes) and respond back (in HTTP response) with the number of + bytes read and the MD5 digest of the content. + """ + assert isinstance(file, bytes) + + content_size = len(file) + content_md5 = hashlib.md5(file).hexdigest() + + response_dict = { + 'content_size': content_size, + 'content_md5': content_md5 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) + + +@app.function_name(name="get_blob_as_bytes_stream_return_http_response") +@app.route(route="get_blob_as_bytes_stream_return_http_response") +@app.read_blob(arg_name="file", + path="python-worker-tests/shmem-test-bytes.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +def get_blob_as_bytes_stream_return_http_response(req: func.HttpRequest, + file: func.InputStream) \ + -> func.HttpResponse: + """ + Read a blob (as azf.InputStream) and respond back (in HTTP response) with + the number of bytes read and the MD5 digest of the content. + """ + file_bytes = file.read() + + content_size = len(file_bytes) + content_md5 = hashlib.md5(file_bytes).hexdigest() + + response_dict = { + 'content_size': content_size, + 'content_md5': content_md5 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) + + +@app.function_name(name="get_blob_as_str") +@app.route(route="get_blob_as_str") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-str.txt", + data_type="STRING", + connection="AzureWebJobsStorage") +def get_blob_as_str(req: func.HttpRequest, file: str) -> str: + assert isinstance(file, str) + return file + + +@app.function_name(name="get_blob_as_str_return_http_response") +@app.route(route="get_blob_as_str_return_http_response") +@app.read_blob(arg_name="file", + path="python-worker-tests/shmem-test-bytes.txt", + data_type="STRING", + connection="AzureWebJobsStorage") +def get_blob_as_str_return_http_response(req: func.HttpRequest, + file: str) -> func.HttpResponse: + """ + Read a blob (string) and respond back (in HTTP response) with the number of + characters read and the MD5 digest of the utf-8 encoded content. + """ + assert isinstance(file, str) + + num_chars = len(file) + content_bytes = file.encode('utf-8') + content_md5 = hashlib.md5(content_bytes).hexdigest() + + response_dict = { + 'num_chars': num_chars, + 'content_md5': content_md5 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) + + +@app.function_name(name="get_blob_bytes") +@app.route(route="get_blob_bytes") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-bytes.txt", + connection="AzureWebJobsStorage") +def get_blob_bytes(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_blob_filelike") +@app.route(route="get_blob_filelike") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-filelike.txt", + connection="AzureWebJobsStorage") +def get_blob_filelike(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_blob_return") +@app.route(route="get_blob_return") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-return.txt", + connection="AzureWebJobsStorage") +def get_blob_return(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_blob_str") +@app.route(route="get_blob_str") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-str.txt", + connection="AzureWebJobsStorage") +def get_blob_str(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_blob_triggered") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-blob-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="get_blob_triggered") +def get_blob_triggered(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="put_blob_as_bytes_return_http_response") +@app.write_blob(arg_name="file", + path="python-worker-tests/shmem-test-bytes-out.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_as_bytes_return_http_response") +def put_blob_as_bytes_return_http_response(req: func.HttpRequest, + file: func.Out[ + bytes]) -> func.HttpResponse: + """ + Write a blob (bytes) and respond back (in HTTP response) with the number of + bytes written and the MD5 digest of the content. + The number of bytes to write are specified in the input HTTP request. + """ + content_size = int(req.params['content_size']) + + # When this is set, then 0x01 byte is repeated content_size number of + # times to use as input. + # This is to avoid generating random input for large size which can be + # slow. + if 'no_random_input' in req.params: + content = b'\x01' * content_size + else: + content = bytearray(random.getrandbits(8) for _ in range(content_size)) + content_md5 = hashlib.md5(content).hexdigest() + + file.set(content) + + response_dict = { + 'content_size': content_size, + 'content_md5': content_md5 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) + + +@app.function_name(name="put_blob_as_str_return_http_response") +@app.write_blob(arg_name="file", + path="python-worker-tests/shmem-test-str-out.txt", + data_type="STRING", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_as_str_return_http_response") +def put_blob_as_str_return_http_response(req: func.HttpRequest, file: func.Out[ + str]) -> func.HttpResponse: + """ + Write a blob (string) and respond back (in HTTP response) with the number of + characters written and the MD5 digest of the utf-8 encoded content. + The number of characters to write are specified in the input HTTP request. + """ + num_chars = int(req.params['num_chars']) + + content = ''.join(random.choices(string.ascii_uppercase + string.digits, + k=num_chars)) + content_bytes = content.encode('utf-8') + content_size = len(content_bytes) + content_md5 = hashlib.md5(content_bytes).hexdigest() + + file.set(content) + + response_dict = { + 'num_chars': num_chars, + 'content_size': content_size, + 'content_md5': content_md5 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) + + +@app.function_name(name="put_blob_bytes") +@app.write_blob(arg_name="file", + path="python-worker-tests/test-bytes.txt", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_bytes") +def put_blob_bytes(req: func.HttpRequest, file: func.Out[bytes]) -> str: + file.set(req.get_body()) + return 'OK' + + +@app.function_name(name="put_blob_filelike") +@app.write_blob(arg_name="file", + path="python-worker-tests/test-filelike.txt", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_filelike") +def put_blob_filelike(req: func.HttpRequest, + file: func.Out[io.StringIO]) -> str: + file.set(io.StringIO('filelike')) + return 'OK' + + +@app.function_name(name="put_blob_return") +@app.write_blob(arg_name="$return", + path="python-worker-tests/test-return.txt", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_return", binding_arg_name="resp") +def put_blob_return(req: func.HttpRequest, + resp: func.Out[func.HttpResponse]) -> str: + return 'FROM RETURN' + + +@app.function_name(name="put_blob_str") +@app.write_blob(arg_name="file", + path="python-worker-tests/test-str.txt", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_str") +def put_blob_str(req: func.HttpRequest, file: func.Out[str]) -> str: + file.set(req.get_body()) + return 'OK' + + +@app.function_name(name="put_blob_trigger") +@app.write_blob(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage") +@app.route(route="put_blob_trigger") +def put_blob_trigger(req: func.HttpRequest, file: func.Out[str]) -> str: + file.set(req.get_body()) + return 'OK' + + +def _generate_content_and_digest(content_size): + content = bytearray(random.getrandbits(8) for _ in range(content_size)) + content_md5 = hashlib.md5(content).hexdigest() + return content, content_md5 + + +@app.function_name(name="put_get_multiple_blobs_as_bytes_return_http_response") +@app.read_blob(arg_name="inputfile1", + data_type="BINARY", + path="python-worker-tests/shmem-test-bytes-1.txt", + connection="AzureWebJobsStorage") +@app.read_blob(arg_name="inputfile2", + data_type="BINARY", + path="python-worker-tests/shmem-test-bytes-2.txt", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="outputfile1", + path="python-worker-tests/shmem-test-bytes-out-1.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="outputfile2", + path="python-worker-tests/shmem-test-bytes-out-2.txt", + data_type="BINARY", + connection="AzureWebJobsStorage") +@app.route(route="put_get_multiple_blobs_as_bytes_return_http_response") +def put_get_multiple_blobs_as_bytes_return_http_response( + req: func.HttpRequest, + inputfile1: bytes, + inputfile2: bytes, + outputfile1: func.Out[bytes], + outputfile2: func.Out[bytes]) -> func.HttpResponse: + """ + Read two blobs (bytes) and respond back (in HTTP response) with the number + of bytes read from each blob and the MD5 digest of the content of each. + Write two blobs (bytes) and respond back (in HTTP response) with the number + bytes written in each blob and the MD5 digest of the content of each. + The number of bytes to write are specified in the input HTTP request. + """ + input_content_size_1 = len(inputfile1) + input_content_size_2 = len(inputfile2) + + input_content_md5_1 = hashlib.md5(inputfile1).hexdigest() + input_content_md5_2 = hashlib.md5(inputfile2).hexdigest() + + output_content_size_1 = int(req.params['output_content_size_1']) + output_content_size_2 = int(req.params['output_content_size_2']) + + output_content_1, output_content_md5_1 = \ + _generate_content_and_digest(output_content_size_1) + output_content_2, output_content_md5_2 = \ + _generate_content_and_digest(output_content_size_2) + + outputfile1.set(output_content_1) + outputfile2.set(output_content_2) + + response_dict = { + 'input_content_size_1': input_content_size_1, + 'input_content_size_2': input_content_size_2, + 'input_content_md5_1': input_content_md5_1, + 'input_content_md5_2': input_content_md5_2, + 'output_content_size_1': output_content_size_1, + 'output_content_size_2': output_content_size_2, + 'output_content_md5_1': output_content_md5_1, + 'output_content_md5_2': output_content_md5_2 + } + + response_body = json.dumps(response_dict, indent=2) + + return func.HttpResponse( + body=response_body, + mimetype="application/json", + status_code=200 + ) diff --git a/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/function_app.py b/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/function_app.py new file mode 100644 index 000000000..89badf607 --- /dev/null +++ b/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/function_app.py @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import azure.functions as func + +app = func.FunctionApp() + + +@app.route() +@app.read_cosmos_db_documents( + arg_name="docs", database_name="test", + collection_name="items", + id="cosmosdb-input-test", + connection_string_setting="AzureWebJobsCosmosDBConnectionString") +def cosmosdb_input(req: func.HttpRequest, docs: func.DocumentList) -> str: + return func.HttpResponse(docs[0].to_json(), mimetype='application/json') + + +@app.cosmos_db_trigger( + arg_name="docs", database_name="test", + collection_name="items", + lease_collection_name="leases", + connection_string_setting="AzureWebJobsCosmosDBConnectionString", + create_lease_collection_if_not_exists=True) +@app.write_blob(arg_name="$return", connection="AzureWebJobsStorage", + path="python-worker-tests/test-cosmosdb-triggered.txt") +def cosmosdb_trigger(docs: func.DocumentList) -> str: + return docs[0].to_json() + + +@app.route() +@app.read_blob(arg_name="file", connection="AzureWebJobsStorage", + path="python-worker-tests/test-cosmosdb-triggered.txt") +def get_cosmosdb_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.route() +@app.write_cosmos_db_documents( + arg_name="doc", database_name="test", + collection_name="items", + create_if_not_exists=True, + connection_string_setting="AzureWebJobsCosmosDBConnectionString") +def put_document(req: func.HttpRequest, doc: func.Out[func.Document]): + doc.set(func.Document.from_json(req.get_body())) + + return 'OK' diff --git a/tests/endtoend/eventhub_functions/eventhub_functions_stein/function_app.py b/tests/endtoend/eventhub_functions/eventhub_functions_stein/function_app.py new file mode 100644 index 000000000..20c1c51ad --- /dev/null +++ b/tests/endtoend/eventhub_functions/eventhub_functions_stein/function_app.py @@ -0,0 +1,108 @@ +import json +import os +import typing + +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient + +import azure.functions as func + +app = func.FunctionApp() + + +# An HttpTrigger to generating EventHub event from EventHub Output Binding +@app.function_name(name="eventhub_output") +@app.route(route="eventhub_output") +@app.write_event_hub_message(arg_name="event", + event_hub_name="python-worker-ci-eventhub-one", + connection="AzureWebJobsEventHubConnectionString") +def eventhub_output(req: func.HttpRequest, event: func.Out[str]): + event.set(req.get_body().decode('utf-8')) + return 'OK' + + +# This is an actual EventHub trigger which will convert the event data +# into a storage blob. +@app.function_name(name="eventhub_trigger") +@app.event_hub_message_trigger(arg_name="event", + event_hub_name="python-worker-ci-eventhub-one", + connection="AzureWebJobsEventHubConnectionString" + ) +@app.write_blob(arg_name="$return", + path="python-worker-tests/test-eventhub-triggered.txt", + connection="AzureWebJobsStorage") +def eventhub_trigger(event: func.EventHubEvent) -> bytes: + return event.get_body() + + +# Retrieve the event data from storage blob and return it as Http response +@app.function_name(name="get_eventhub_triggered") +@app.route(route="get_eventhub_triggered") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-eventhub-triggered.txt", + connection="AzureWebJobsStorage") +def get_eventhub_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +# Retrieve the event data from storage blob and return it as Http response +@app.function_name(name="get_metadata_triggered") +@app.route(route="get_metadata_triggered") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-metadata-triggered.txt", + connection="AzureWebJobsStorage") +async def get_metadata_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return func.HttpResponse(body=file.read().decode('utf-8'), + status_code=200, + mimetype='application/json') + + +# An HttpTrigger to generating EventHub event from azure-eventhub SDK. +# Events generated from azure-eventhub contain the full metadata. +@app.function_name(name="metadata_output") +@app.route(route="metadata_output") +async def metadata_output(req: func.HttpRequest): + # Parse event metadata from http request + json_string = req.get_body().decode('utf-8') + event_dict = json.loads(json_string) + + # Create an EventHub Client and event batch + client = EventHubProducerClient.from_connection_string( + os.getenv('AzureWebJobsEventHubConnectionString'), + eventhub_name='python-worker-ci-eventhub-one-metadata') + + # Generate new event based on http request with full metadata + event_data_batch = await client.create_batch() + event_data_batch.add(EventData(event_dict.get('body'))) + + # Send out event into event hub + try: + await client.send_batch(event_data_batch) + finally: + await client.close() + + return 'OK' + + +@app.function_name(name="metadata_trigger") +@app.event_hub_message_trigger( + arg_name="event", + event_hub_name="python-worker-ci-eventhub-one-metadata", + connection="AzureWebJobsEventHubConnectionString") +@app.write_blob(arg_name="$return", + path="python-worker-tests/test-metadata-triggered.txt", + connection="AzureWebJobsStorage") +async def metadata_trigger(event: func.EventHubEvent) -> bytes: + event_dict: typing.Mapping[str, typing.Any] = { + 'body': event.get_body().decode('utf-8'), + # Uncomment this when the EnqueuedTimeUtc is fixed in azure-functions + # 'enqueued_time': event.enqueued_time.isoformat(), + 'partition_key': event.partition_key, + 'sequence_number': event.sequence_number, + 'offset': event.offset, + 'metadata': event.metadata + } + + return json.dumps(event_dict) diff --git a/tests/endtoend/http_functions/http_functions_stein/function_app.py b/tests/endtoend/http_functions/http_functions_stein/function_app.py new file mode 100644 index 000000000..873fce914 --- /dev/null +++ b/tests/endtoend/http_functions/http_functions_stein/function_app.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging + +import azure.functions as func + +app = func.FunctionApp() + + +@app.route(route="default_template") +def default_template(req: func.HttpRequest) -> func.HttpResponse: + logging.info('Python HTTP trigger function processed a request.') + + name = req.params.get('name') + if not name: + try: + req_body = req.get_json() + except ValueError: + pass + else: + name = req_body.get('name') + + if name: + return func.HttpResponse( + f"Hello, {name}. This HTTP triggered function " + f"executed successfully.") + else: + return func.HttpResponse( + "This HTTP triggered function executed successfully. " + "Pass a name in the query string or in the request body for a" + " personalized response.", + status_code=200 + ) diff --git a/tests/endtoend/queue_functions/queue_functions_stein/function_app.py b/tests/endtoend/queue_functions/queue_functions_stein/function_app.py new file mode 100644 index 000000000..914cf0797 --- /dev/null +++ b/tests/endtoend/queue_functions/queue_functions_stein/function_app.py @@ -0,0 +1,185 @@ +import json +import logging +import typing + +import azure.functions as func + +app = func.FunctionApp() + + +@app.function_name(name="get_queue_blob") +@app.route(route="get_queue_blob") +@app.read_blob(arg_name="file", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob.txt") +def get_queue_blob(req: func.HttpRequest, file: func.InputStream) -> str: + return json.dumps({ + 'queue': json.loads(file.read().decode('utf-8')) + }) + + +@app.function_name(name="get_queue_blob_message_return") +@app.route(route="get_queue_blob_message_return") +@app.read_blob(arg_name="file", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-message-return.txt") +def get_queue_blob_message_return(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_queue_blob_return") +@app.route(route="get_queue_blob_return") +@app.read_blob(arg_name="file", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-return.txt") +def get_queue_blob_return(req: func.HttpRequest, file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_queue_untyped_blob_return") +@app.route(route="get_queue_untyped_blob_return") +@app.read_blob(arg_name="file", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-untyped-blob-return.txt") +def get_queue_untyped_blob_return(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="put_queue") +@app.route(route="put_queue") +@app.write_queue(arg_name="msg", + connection="AzureWebJobsStorage", + queue_name="testqueue") +def put_queue(req: func.HttpRequest, msg: func.Out[str]): + msg.set(req.get_body()) + + return 'OK' + + +@app.function_name(name="put_queue_message_return") +@app.route(route="put_queue_message_return", binding_arg_name="resp") +@app.write_queue(arg_name="$return", + connection="AzureWebJobsStorage", + queue_name="testqueue-message-return") +def main(req: func.HttpRequest, resp: func.Out[str]) -> bytes: + return func.QueueMessage(body=req.get_body()) + + +@app.function_name("put_queue_multiple_out") +@app.route(route="put_queue_multiple_out", binding_arg_name="resp") +@app.write_queue(arg_name="msg", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple-outparam") +def put_queue_multiple_out(req: func.HttpRequest, + resp: func.Out[func.HttpResponse], + msg: func.Out[func.QueueMessage]) -> None: + data = req.get_body().decode() + msg.set(func.QueueMessage(body=data)) + resp.set(func.HttpResponse(body='HTTP response: {}'.format(data))) + + +@app.function_name("put_queue_return") +@app.route(route="put_queue_return", binding_arg_name="resp") +@app.write_queue(arg_name="$return", + connection="AzureWebJobsStorage", + queue_name="testqueue-return") +def put_queue_return(req: func.HttpRequest, resp: func.Out[str]) -> bytes: + return req.get_body() + + +@app.function_name(name="put_queue_multiple_return") +@app.route(route="put_queue_multiple_return") +@app.write_queue(arg_name="msgs", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple") +def put_queue_multiple_return(req: func.HttpRequest, + msgs: func.Out[typing.List[str]]): + msgs.set(['one', 'two']) + + +@app.function_name(name="put_queue_untyped_return") +@app.route(route="put_queue_untyped_return", binding_arg_name="resp") +@app.write_queue(arg_name="$return", + connection="AzureWebJobsStorage", + queue_name="testqueue-untyped-return") +def put_queue_untyped_return(req: func.HttpRequest, + resp: func.Out[str]) -> bytes: + return func.QueueMessage(body=req.get_body()) + + +@app.function_name(name="queue_trigger") +@app.queue_trigger(arg_name="msg", + queue_name="testqueue", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="$return", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob.txt") +def queue_trigger(msg: func.QueueMessage) -> str: + result = json.dumps({ + 'id': msg.id, + 'body': msg.get_body().decode('utf-8'), + 'expiration_time': (msg.expiration_time.isoformat() + if msg.expiration_time else None), + 'insertion_time': (msg.insertion_time.isoformat() + if msg.insertion_time else None), + 'time_next_visible': (msg.time_next_visible.isoformat() + if msg.time_next_visible else None), + 'pop_receipt': msg.pop_receipt, + 'dequeue_count': msg.dequeue_count + }) + + return result + + +@app.function_name(name="queue_trigger_message_return") +@app.queue_trigger(arg_name="msg", + queue_name="testqueue-message-return", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="$return", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-message-return.txt") +def queue_trigger_message_return(msg: func.QueueMessage) -> bytes: + return msg.get_body() + + +@app.function_name(name="queue_trigger_return") +@app.queue_trigger(arg_name="msg", + queue_name="testqueue-return", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="$return", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-return.txt") +def queue_trigger_return(msg: func.QueueMessage) -> bytes: + return msg.get_body() + + +@app.function_name(name="queue_trigger_return_multiple") +@app.queue_trigger(arg_name="msg", + queue_name="testqueue-return-multiple", + connection="AzureWebJobsStorage") +def queue_trigger_return_multiple(msg: func.QueueMessage) -> None: + logging.info('trigger on message: %s', msg.get_body().decode('utf-8')) + + +@app.function_name(name="queue_trigger_untyped") +@app.queue_trigger(arg_name="msg", + queue_name="testqueue-untyped-return", + connection="AzureWebJobsStorage") +@app.write_blob(arg_name="$return", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-untyped-blob-return.txt") +def queue_trigger_untyped(msg: str) -> str: + return msg + + +@app.function_name(name="put_queue_return_multiple") +@app.route(route="put_queue_return_multiple", binding_arg_name="resp") +@app.write_queue(arg_name="msgs", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple") +def put_queue_return_multiple(req: func.HttpRequest, + resp: func.Out[str], + msgs: func.Out[typing.List[str]]): + msgs.set(['one', 'two']) diff --git a/tests/endtoend/servicebus_functions/put_message_return/__init__.py b/tests/endtoend/servicebus_functions/put_message_return/__init__.py deleted file mode 100644 index 21f3b275b..000000000 --- a/tests/endtoend/servicebus_functions/put_message_return/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import azure.functions as azf - - -def main(req: azf.HttpRequest) -> bytes: - return req.get_body() diff --git a/tests/endtoend/servicebus_functions/put_message_return/function.json b/tests/endtoend/servicebus_functions/put_message_return/function.json deleted file mode 100644 index 7b36bb376..000000000 --- a/tests/endtoend/servicebus_functions/put_message_return/function.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "scriptFile": "__init__.py", - - "bindings": [ - { - "type": "httpTrigger", - "direction": "in", - "name": "req" - }, - { - "direction": "out", - "name": "$return", - "queueName": "testqueue-return", - "connection": "AzureWebJobsServiceBusConnectionString", - "type": "serviceBus" - } - ] -} diff --git a/tests/endtoend/servicebus_functions/servicebus_functions_stein/function_app.py b/tests/endtoend/servicebus_functions/servicebus_functions_stein/function_app.py new file mode 100644 index 000000000..fe6cfea22 --- /dev/null +++ b/tests/endtoend/servicebus_functions/servicebus_functions_stein/function_app.py @@ -0,0 +1,55 @@ +import json + +import azure.functions as func + +app = func.FunctionApp() + + +@app.route(route="put_message") +@app.write_service_bus_queue( + arg_name="msg", + connection="AzureWebJobsServiceBusConnectionString", + queue_name="testqueue") +def put_message(req: func.HttpRequest, msg: func.Out[str]): + msg.set(req.get_body().decode('utf-8')) + return 'OK' + + +@app.route(route="get_servicebus_triggered") +@app.read_blob(arg_name="file", + path="python-worker-tests/test-servicebus-triggered.txt", + connection="AzureWebJobsStorage") +def get_servicebus_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return func.HttpResponse( + file.read().decode('utf-8'), mimetype='application/json') + + +@app.service_bus_queue_trigger( + arg_name="msg", + connection="AzureWebJobsServiceBusConnectionString", + queue_name="testqueue") +@app.write_blob(arg_name="$return", + path="python-worker-tests/test-servicebus-triggered.txt", + connection="AzureWebJobsStorage") +def servicebus_trigger(msg: func.ServiceBusMessage) -> str: + result = json.dumps({ + 'message_id': msg.message_id, + 'body': msg.get_body().decode('utf-8'), + 'content_type': msg.content_type, + 'delivery_count': msg.delivery_count, + 'expiration_time': (msg.expiration_time.isoformat() if + msg.expiration_time else None), + 'label': msg.label, + 'partition_key': msg.partition_key, + 'reply_to': msg.reply_to, + 'reply_to_session_id': msg.reply_to_session_id, + 'scheduled_enqueue_time': (msg.scheduled_enqueue_time.isoformat() if + msg.scheduled_enqueue_time else None), + 'session_id': msg.session_id, + 'time_to_live': msg.time_to_live, + 'to': msg.to, + 'user_properties': msg.user_properties, + }) + + return result diff --git a/tests/endtoend/test_blob_functions.py b/tests/endtoend/test_blob_functions.py index 096df55b7..2725ab5ec 100644 --- a/tests/endtoend/test_blob_functions.py +++ b/tests/endtoend/test_blob_functions.py @@ -159,3 +159,11 @@ def test_blob_trigger_with_large_content(self): except AssertionError: if try_no == max_retries - 1: raise + + +class TestBlobFunctionsStein(TestBlobFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'blob_functions' / \ + 'blob_functions_stein' diff --git a/tests/endtoend/test_cosmosdb_functions.py b/tests/endtoend/test_cosmosdb_functions.py index 78b2e0c4b..ede7b7642 100644 --- a/tests/endtoend/test_cosmosdb_functions.py +++ b/tests/endtoend/test_cosmosdb_functions.py @@ -80,3 +80,11 @@ def test_cosmosdb_input(self): raise else: break + + +class TestCosmosDBFunctionsStein(TestCosmosDBFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'cosmosdb_functions' / \ + 'cosmosdb_functions_stein' diff --git a/tests/endtoend/test_eventhub_functions.py b/tests/endtoend/test_eventhub_functions.py index 250e43122..225ecdbd8 100644 --- a/tests/endtoend/test_eventhub_functions.py +++ b/tests/endtoend/test_eventhub_functions.py @@ -3,6 +3,7 @@ import json import time from datetime import datetime + from dateutil import parser, tz from azure_functions_worker import testutils @@ -97,3 +98,11 @@ def test_eventhub_trigger_with_metadata(self): self.assertIsNone(sys_props['PartitionKey']) self.assertGreaterEqual(sys_props['SequenceNumber'], 0) self.assertIsNotNone(sys_props['Offset']) + + +class TestEventHubFunctionsStein(TestEventHubFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'eventhub_functions' / \ + 'eventhub_functions_stein' diff --git a/tests/endtoend/test_http_functions.py b/tests/endtoend/test_http_functions.py index 167833f56..e324c583b 100644 --- a/tests/endtoend/test_http_functions.py +++ b/tests/endtoend/test_http_functions.py @@ -4,6 +4,7 @@ from unittest.mock import patch import requests + from azure_functions_worker import testutils REQUEST_TIMEOUT_SEC = 5 @@ -105,3 +106,11 @@ def test_worker_status_endpoint_should_return_ok_when_disabled(self): params={'checkHealth': '1'}, timeout=REQUEST_TIMEOUT_SEC) self.assertTrue(r.ok) + + +class TestHttpFunctionsStein(TestHttpFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'http_functions' /\ + 'http_functions_stein' diff --git a/tests/endtoend/test_queue_functions.py b/tests/endtoend/test_queue_functions.py index dba16287e..4b7b85929 100644 --- a/tests/endtoend/test_queue_functions.py +++ b/tests/endtoend/test_queue_functions.py @@ -91,3 +91,11 @@ def test_queue_return_multiple_outparam(self): f"Returned status code {r.status_code}, " "not in the 200-300 range.") self.assertEqual(r.text, 'HTTP response: foo') + + +class TestQueueFunctionsStein(TestQueueFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'queue_functions' / \ + 'queue_functions_stein' diff --git a/tests/endtoend/test_servicebus_functions.py b/tests/endtoend/test_servicebus_functions.py index 613a4c9de..8a9caa785 100644 --- a/tests/endtoend/test_servicebus_functions.py +++ b/tests/endtoend/test_servicebus_functions.py @@ -36,3 +36,11 @@ def test_servicebus_basic(self): raise else: break + + +class TestServiceBusFunctionsStein(TestServiceBusFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'servicebus_functions' / \ + 'servicebus_functions_stein' diff --git a/tests/unittests/azure_namespace_import/azure_namespace_import.py b/tests/unittests/azure_namespace_import/azure_namespace_import.py index f71309838..dab6aff49 100644 --- a/tests/unittests/azure_namespace_import/azure_namespace_import.py +++ b/tests/unittests/azure_namespace_import/azure_namespace_import.py @@ -5,8 +5,7 @@ import shutil import asyncio -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils async def vertify_nested_namespace_import(): diff --git a/tests/unittests/broken_functions/invalid_stein/function_app.py b/tests/unittests/broken_functions/invalid_stein/function_app.py new file mode 100644 index 000000000..d6ddd39d9 --- /dev/null +++ b/tests/unittests/broken_functions/invalid_stein/function_app.py @@ -0,0 +1,8 @@ +import azure.functions as func + +app = func.FunctionApp() + + +@app.route() +def main(): + pass diff --git a/tests/unittests/http_functions/missing_module/function.json b/tests/unittests/broken_functions/missing_module/function.json similarity index 100% rename from tests/unittests/http_functions/missing_module/function.json rename to tests/unittests/broken_functions/missing_module/function.json diff --git a/tests/unittests/http_functions/missing_module/main.py b/tests/unittests/broken_functions/missing_module/main.py similarity index 100% rename from tests/unittests/http_functions/missing_module/main.py rename to tests/unittests/broken_functions/missing_module/main.py diff --git a/tests/unittests/dispatcher_functions/dispatcher_functions_stein/function_app.py b/tests/unittests/dispatcher_functions/dispatcher_functions_stein/function_app.py new file mode 100644 index 000000000..cf4257282 --- /dev/null +++ b/tests/unittests/dispatcher_functions/dispatcher_functions_stein/function_app.py @@ -0,0 +1,8 @@ +import azure.functions as func + +app = func.FunctionApp() + + +@app.route() +def main(req: func.HttpRequest): + pass diff --git a/tests/unittests/http_functions/http_functions_stein/function_app.py b/tests/unittests/http_functions/http_functions_stein/function_app.py new file mode 100644 index 000000000..7abf2be96 --- /dev/null +++ b/tests/unittests/http_functions/http_functions_stein/function_app.py @@ -0,0 +1,320 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import asyncio +import hashlib +import json +import logging +import sys +import time +from urllib.request import urlopen + +import azure.functions as func + +app = func.FunctionApp() + +logger = logging.getLogger("my-function") + + +@app.route(route="return_str") +def return_str(req: func.HttpRequest) -> str: + return 'Hello World!' + + +@app.route(route="accept_json") +def accept_json(req: func.HttpRequest): + return json.dumps({ + 'method': req.method, + 'url': req.url, + 'headers': dict(req.headers), + 'params': dict(req.params), + 'get_body': req.get_body().decode(), + 'get_json': req.get_json() + }) + + +async def nested(): + try: + 1 / 0 + except ZeroDivisionError: + logger.error('and another error', exc_info=True) + + +@app.route(route="async_logging") +async def async_logging(req: func.HttpRequest): + logger.info('hello %s', 'info') + + await asyncio.sleep(0.1) + + # Create a nested task to check if invocation_id is still + # logged correctly. + await asyncio.ensure_future(nested()) + + await asyncio.sleep(0.1) + + return 'OK-async' + + +@app.route(route="async_return_str") +async def async_return_str(req: func.HttpRequest): + await asyncio.sleep(0.1) + return 'Hello Async World!' + + +@app.route(route="debug_logging") +def debug_logging(req: func.HttpRequest): + logging.critical('logging critical', exc_info=True) + logging.info('logging info', exc_info=True) + logging.warning('logging warning', exc_info=True) + logging.debug('logging debug', exc_info=True) + logging.error('logging error', exc_info=True) + return 'OK-debug' + + +@app.route(route="debug_user_logging") +def debug_user_logging(req: func.HttpRequest): + logger.setLevel(logging.DEBUG) + + logging.critical('logging critical', exc_info=True) + logger.info('logging info', exc_info=True) + logger.warning('logging warning', exc_info=True) + logger.debug('logging debug', exc_info=True) + logger.error('logging error', exc_info=True) + return 'OK-user-debug' + + +# Attempt to log info into system log from customer code +disguised_logger = logging.getLogger('azure_functions_worker') + + +async def parallelly_print(): + await asyncio.sleep(0.1) + print('parallelly_print') + + +async def parallelly_log_info(): + await asyncio.sleep(0.2) + logging.info('parallelly_log_info at root logger') + + +async def parallelly_log_warning(): + await asyncio.sleep(0.3) + logging.warning('parallelly_log_warning at root logger') + + +async def parallelly_log_error(): + await asyncio.sleep(0.4) + logging.error('parallelly_log_error at root logger') + + +async def parallelly_log_exception(): + await asyncio.sleep(0.5) + try: + raise Exception('custom exception') + except Exception: + logging.exception('parallelly_log_exception at root logger', + exc_info=sys.exc_info()) + + +async def parallelly_log_custom(): + await asyncio.sleep(0.6) + logger.info('parallelly_log_custom at custom_logger') + + +async def parallelly_log_system(): + await asyncio.sleep(0.7) + disguised_logger.info('parallelly_log_system at disguised_logger') + + +@app.route(route="hijack_current_event_loop") +async def hijack_current_event_loop(req: func.HttpRequest) -> func.HttpResponse: + loop = asyncio.get_event_loop() + + # Create multiple tasks and schedule it into one asyncio.wait blocker + task_print: asyncio.Task = loop.create_task(parallelly_print()) + task_info: asyncio.Task = loop.create_task(parallelly_log_info()) + task_warning: asyncio.Task = loop.create_task(parallelly_log_warning()) + task_error: asyncio.Task = loop.create_task(parallelly_log_error()) + task_exception: asyncio.Task = loop.create_task(parallelly_log_exception()) + task_custom: asyncio.Task = loop.create_task(parallelly_log_custom()) + task_disguise: asyncio.Task = loop.create_task(parallelly_log_system()) + + # Create an awaitable future and occupy the current event loop resource + future = loop.create_future() + loop.call_soon_threadsafe(future.set_result, 'callsoon_log') + + # WaitAll + await asyncio.wait([task_print, task_info, task_warning, task_error, + task_exception, task_custom, task_disguise, future]) + + # Log asyncio low-level future result + logging.info(future.result()) + + return 'OK-hijack-current-event-loop' + + +@app.route(route="no_return") +def no_return(req: func.HttpRequest): + logger.info('hi') + + +@app.route(route="no_return_returns") +def no_return_returns(req): + return 'ABC' + + +@app.route(route="print_logging") +def print_logging(req: func.HttpRequest): + flush_required = False + is_console_log = False + is_stderr = False + message = req.params.get('message', '') + + if req.params.get('flush') == 'true': + flush_required = True + if req.params.get('console') == 'true': + is_console_log = True + if req.params.get('is_stderr') == 'true': + is_stderr = True + + # Adding LanguageWorkerConsoleLog will make function host to treat + # this as system log and will be propagated to kusto + prefix = 'LanguageWorkerConsoleLog' if is_console_log else '' + print(f'{prefix} {message}'.strip(), + file=sys.stderr if is_stderr else sys.stdout, + flush=flush_required) + + return 'OK-print-logging' + + +@app.route(route="raw_body_bytes") +def raw_body_bytes(req: func.HttpRequest) -> func.HttpResponse: + body = req.get_body() + body_len = str(len(body)) + + headers = {'body-len': body_len} + return func.HttpResponse(body=body, status_code=200, headers=headers) + + +@app.route(route="remapped_context") +def remapped_context(req: func.HttpRequest): + return req.method + + +@app.route(route="return_bytes") +def return_bytes(req: func.HttpRequest): + # This function will fail, as we don't auto-convert "bytes" to "http". + return b'Hello World!' + + +@app.route(route="return_context") +def return_context(req: func.HttpRequest, context: func.Context): + return json.dumps({ + 'method': req.method, + 'ctx_func_name': context.function_name, + 'ctx_func_dir': context.function_directory, + 'ctx_invocation_id': context.invocation_id, + 'ctx_trace_context_Traceparent': context.trace_context.Traceparent, + 'ctx_trace_context_Tracestate': context.trace_context.Tracestate, + }) + + +@app.route(route="return_http") +def return_http(req: func.HttpRequest): + return func.HttpResponse('

Hello World™

', + mimetype='text/html') + + +@app.route(route="return_http_404") +def return_http_404(req: func.HttpRequest): + return func.HttpResponse('bye', status_code=404) + + +@app.route(route="return_http_auth_admin", auth_level=func.AuthLevel.ADMIN) +def return_http_auth_admin(req: func.HttpRequest): + return func.HttpResponse('

Hello World™

', + mimetype='text/html') + + +@app.route(route="return_http_no_body") +def return_http_no_body(req: func.HttpRequest): + return func.HttpResponse() + + +@app.route(route="return_http_redirect") +def return_http_redirect(req: func.HttpRequest): + location = 'return_http?code={}'.format(req.params['code']) + return func.HttpResponse( + status_code=302, + headers={'location': location}) + + +@app.route(route="return_out", binding_arg_name="foo") +def return_out(req: func.HttpRequest, foo: func.Out[func.HttpResponse]): + foo.set(func.HttpResponse(body='hello', status_code=201)) + + +@app.route(route="return_request") +def return_request(req: func.HttpRequest): + params = dict(req.params) + params.pop('code', None) + body = req.get_body() + return json.dumps({ + 'method': req.method, + 'url': req.url, + 'headers': dict(req.headers), + 'params': params, + 'get_body': body.decode(), + 'body_hash': hashlib.sha256(body).hexdigest(), + }) + + +@app.route(route="return_route_params/{param1}/{param2}") +def return_route_params(req: func.HttpRequest) -> str: + return json.dumps(dict(req.route_params)) + + +@app.route(route="sync_logging") +def main(req: func.HttpRequest): + try: + 1 / 0 + except ZeroDivisionError: + logger.error('a gracefully handled error', exc_info=True) + logger.error('a gracefully handled critical error', exc_info=True) + time.sleep(0.05) + return 'OK-sync' + + +@app.route(route="unhandled_error") +def unhandled_error(req: func.HttpRequest): + 1 / 0 + + +@app.route(route="unhandled_urllib_error") +def unhandled_urllib_error(req: func.HttpRequest) -> str: + image_url = req.params.get('img') + urlopen(image_url).read() + + +class UnserializableException(Exception): + def __str__(self): + raise RuntimeError('cannot serialize me') + + +@app.route(route="unhandled_unserializable_error") +def unhandled_unserializable_error(req: func.HttpRequest) -> str: + raise UnserializableException('foo') + + +async def try_log(): + logger.info("try_log") + + +@app.route(route="user_event_loop") +def user_event_loop(req: func.HttpRequest) -> func.HttpResponse: + loop = asyncio.SelectorEventLoop() + asyncio.set_event_loop(loop) + + # This line should throws an asyncio RuntimeError exception + loop.run_until_complete(try_log()) + loop.close() + return 'OK-user-event-loop' diff --git a/tests/unittests/http_functions/http_retries_exponential_backoff/function.json b/tests/unittests/http_functions/http_retries_exponential_backoff/function.json deleted file mode 100644 index 5cc83dad9..000000000 --- a/tests/unittests/http_functions/http_retries_exponential_backoff/function.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "scriptFile": "main.py", - "bindings": [ - { - "type": "httpTrigger", - "direction": "in", - "name": "req" - }, - { - "type": "http", - "direction": "out", - "name": "$return" - } - ], - "retry": { - "strategy": "exponentialBackoff", - "maxRetryCount": 3, - "minimumInterval": "00:00:01", - "maximumInterval": "00:00:05" - } -} diff --git a/tests/unittests/http_functions/http_retries_exponential_backoff/main.py b/tests/unittests/http_functions/http_retries_exponential_backoff/main.py deleted file mode 100644 index 68f45d108..000000000 --- a/tests/unittests/http_functions/http_retries_exponential_backoff/main.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import azure.functions -import logging - -logger = logging.getLogger('my function') - - -def main(req: azure.functions.HttpRequest, context: azure.functions.Context): - logger.info(f'Current retry count: {context.retry_context.retry_count}') - logger.info(f'Max retry count: {context.retry_context.max_retry_count}') - - raise Exception("Testing retries") diff --git a/tests/unittests/http_functions/http_retries_fixed_delay/function.json b/tests/unittests/http_functions/http_retries_fixed_delay/function.json deleted file mode 100644 index e86cbc8e0..000000000 --- a/tests/unittests/http_functions/http_retries_fixed_delay/function.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "scriptFile": "main.py", - "bindings": [ - { - "type": "httpTrigger", - "direction": "in", - "name": "req" - }, - { - "type": "http", - "direction": "out", - "name": "$return" - } - ], - "retry": { - "strategy": "fixedDelay", - "maxRetryCount": 3, - "delayInterval": "00:00:01" - } -} diff --git a/tests/unittests/http_functions/http_retries_fixed_delay/main.py b/tests/unittests/http_functions/http_retries_fixed_delay/main.py deleted file mode 100644 index 68f45d108..000000000 --- a/tests/unittests/http_functions/http_retries_fixed_delay/main.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import azure.functions -import logging - -logger = logging.getLogger('my function') - - -def main(req: azure.functions.HttpRequest, context: azure.functions.Context): - logger.info(f'Current retry count: {context.retry_context.retry_count}') - logger.info(f'Max retry count: {context.retry_context.max_retry_count}') - - raise Exception("Testing retries") diff --git a/tests/unittests/path_import/path_import.py b/tests/unittests/path_import/path_import.py index 5143bbb63..fd2127d36 100644 --- a/tests/unittests/path_import/path_import.py +++ b/tests/unittests/path_import/path_import.py @@ -5,8 +5,7 @@ import shutil import asyncio -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils async def verify_path_imports(): diff --git a/tests/unittests/test_broken_functions.py b/tests/unittests/test_broken_functions.py index ceb3db04a..0be21c84d 100644 --- a/tests/unittests/test_broken_functions.py +++ b/tests/unittests/test_broken_functions.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestMockHost(testutils.AsyncTestCase): @@ -10,7 +9,6 @@ class TestMockHost(testutils.AsyncTestCase): async def test_load_broken__missing_py_param(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('missing_py_param') self.assertEqual(r.response.function_id, func_id) @@ -26,7 +24,6 @@ async def test_load_broken__missing_py_param(self): async def test_load_broken__missing_json_param(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('missing_json_param') self.assertEqual(r.response.function_id, func_id) @@ -42,7 +39,6 @@ async def test_load_broken__missing_json_param(self): async def test_load_broken__wrong_param_dir(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('wrong_param_dir') self.assertEqual(r.response.function_id, func_id) @@ -57,7 +53,6 @@ async def test_load_broken__wrong_param_dir(self): async def test_load_broken__bad_out_annotation(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('bad_out_annotation') self.assertEqual(r.response.function_id, func_id) @@ -72,7 +67,6 @@ async def test_load_broken__bad_out_annotation(self): async def test_load_broken__wrong_binding_dir(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('wrong_binding_dir') self.assertEqual(r.response.function_id, func_id) @@ -88,7 +82,6 @@ async def test_load_broken__wrong_binding_dir(self): async def test_load_broken__invalid_context_param(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_context_param') self.assertEqual(r.response.function_id, func_id) @@ -103,7 +96,6 @@ async def test_load_broken__invalid_context_param(self): async def test_load_broken__syntax_error(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('syntax_error') self.assertEqual(r.response.function_id, func_id) @@ -115,7 +107,6 @@ async def test_load_broken__syntax_error(self): async def test_load_broken__module_not_found_error(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('module_not_found_error') self.assertEqual(r.response.function_id, func_id) @@ -128,7 +119,6 @@ async def test_load_broken__module_not_found_error(self): async def test_load_broken__import_error(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('import_error') self.assertEqual(r.response.function_id, func_id) @@ -145,7 +135,6 @@ async def test_load_broken__import_error(self): async def test_load_broken__inout_param(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('inout_param') self.assertEqual(r.response.function_id, func_id) @@ -160,7 +149,6 @@ async def test_load_broken__inout_param(self): async def test_load_broken__return_param_in(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('return_param_in') self.assertEqual(r.response.function_id, func_id) @@ -175,7 +163,6 @@ async def test_load_broken__return_param_in(self): async def test_load_broken__invalid_return_anno(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_return_anno') self.assertEqual(r.response.function_id, func_id) @@ -191,7 +178,6 @@ async def test_load_broken__invalid_return_anno(self): async def test_load_broken__invalid_return_anno_non_type(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function( 'invalid_return_anno_non_type') @@ -207,55 +193,54 @@ async def test_load_broken__invalid_return_anno_non_type(self): async def test_load_broken__invalid_http_trigger_anno(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_http_trigger_anno') self.assertEqual(r.response.function_id, func_id) self.assertEqual(r.response.result.status, protos.StatusResult.Failure) - self.assertRegex( + self.assertEqual( r.response.result.exception.message, - r'.*cannot load the invalid_http_trigger_anno function' - r'.*type of req binding .* "httpTrigger" ' - r'does not match its Python annotation "int"') + 'FunctionLoadError: cannot load the invalid_http_trigger_anno ' + 'function: \'req\' binding type "httpTrigger" and dataType "0"' + ' in function.json do not match the corresponding function' + ' parameter\'s Python type annotation "int"') async def test_load_broken__invalid_out_anno(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_out_anno') self.assertEqual(r.response.function_id, func_id) self.assertEqual(r.response.result.status, protos.StatusResult.Failure) - self.assertRegex( + self.assertEqual( r.response.result.exception.message, - r'.*cannot load the invalid_out_anno function' - r'.*type of ret binding .* "http" ' - r'does not match its Python annotation "HttpRequest"') + 'FunctionLoadError: cannot load the invalid_out_anno function: ' + '\'ret\' binding type "http" and dataType "0" in function.json' + ' do not match the corresponding function parameter\'s Python' + ' type annotation "HttpRequest"') async def test_load_broken__invalid_in_anno(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_in_anno') self.assertEqual(r.response.function_id, func_id) self.assertEqual(r.response.result.status, protos.StatusResult.Failure) - self.assertRegex( + self.assertEqual( r.response.result.exception.message, - r'.*cannot load the invalid_in_anno function' - r'.*type of req binding .* "httpTrigger" ' - r'does not match its Python annotation "HttpResponse"') + 'FunctionLoadError: cannot load the invalid_in_anno function:' + ' \'req\' binding type "httpTrigger" and dataType "0" in ' + 'function.json do not match the corresponding function ' + 'parameter\'s Python type annotation "HttpResponse"') async def test_load_broken__invalid_in_anno_non_type(self): async with testutils.start_mockhost( script_root=self.broken_funcs_dir) as host: - func_id, r = await host.load_function('invalid_in_anno_non_type') self.assertEqual(r.response.function_id, func_id) @@ -266,3 +251,15 @@ async def test_load_broken__invalid_in_anno_non_type(self): r.response.result.exception.message, r'.*cannot load the invalid_in_anno_non_type function: ' r'binding req has invalid non-type annotation 123') + + async def test_import_module_troubleshooting_url(self): + async with testutils.start_mockhost( + script_root=self.broken_funcs_dir) as host: + func_id, r = await host.load_function('missing_module') + + self.assertEqual(r.response.result.status, + protos.StatusResult.Failure) + + self.assertRegex( + r.response.result.exception.message, + r'.*ModuleNotFoundError') diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index eefa43506..e52d6e10b 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -7,8 +7,7 @@ from typing import Optional, Tuple from unittest.mock import patch -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT, \ PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, \ PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, PYTHON_THREADPOOL_THREAD_COUNT_MIN @@ -16,6 +15,12 @@ SysVersionInfo = col.namedtuple("VersionInfo", ["major", "minor", "micro", "releaselevel", "serial"]) DISPATCHER_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions' +DISPATCHER_STEIN_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / \ + 'dispatcher_functions' / \ + 'dispatcher_functions_stein' +DISPATCHER_STEIN_INVALID_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / \ + 'broken_functions' / \ + 'invalid_stein' class TestThreadPoolSettingsPython37(testutils.AsyncTestCase): @@ -31,6 +36,7 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase): Ref: NEW_TYPING = sys.version_info[:3] >= (3, 7, 0) # PEP 560 """ + def setUp(self): self._ctrl = testutils.start_mockhost( script_root=DISPATCHER_FUNCTIONS_DIR) @@ -121,7 +127,7 @@ async def test_dispatcher_sync_threadpool_set_worker(self): """ # Configure thread pool max worker os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: - f'{self._allowed_max_workers}'}) + f'{self._allowed_max_workers}'}) async with self._ctrl as host: await self._check_if_function_is_ok(host) await self._assert_workers_threadpool(self._ctrl, host, @@ -171,7 +177,7 @@ async def test_dispatcher_sync_threadpool_exceed_max_setting(self): with patch('azure_functions_worker.dispatcher.logger'): # Configure thread pool max worker to an invalid value os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: - f'{self._over_max_workers}'}) + f'{self._over_max_workers}'}) async with self._ctrl as host: await self._check_if_function_is_ok(host) @@ -516,3 +522,59 @@ def tearDown(self): os.environ.update(self._pre_env) self.mock_os_cpu.stop() self.mock_version_info.stop() + + +class TestDispatcherStein(testutils.AsyncTestCase): + + def setUp(self): + self._ctrl = testutils.start_mockhost( + script_root=DISPATCHER_STEIN_FUNCTIONS_DIR) + self._pre_env = dict(os.environ) + self.mock_version_info = patch( + 'azure_functions_worker.dispatcher.sys.version_info', + SysVersionInfo(3, 9, 0, 'final', 0)) + self.mock_version_info.start() + + def tearDown(self): + os.environ.clear() + os.environ.update(self._pre_env) + self.mock_version_info.stop() + + async def test_dispatcher_functions_metadata_request(self): + """Test if the functions metadata response will be sent correctly + when a functions metadata request is received + """ + async with self._ctrl as host: + r = await host.get_functions_metadata() + self.assertIsInstance(r.response, protos.FunctionMetadataResponse) + self.assertFalse(r.response.use_default_metadata_indexing) + self.assertEqual(r.response.result.status, + protos.StatusResult.Success) + + +class TestDispatcherSteinLegacyFallback(testutils.AsyncTestCase): + + def setUp(self): + self._ctrl = testutils.start_mockhost( + script_root=DISPATCHER_FUNCTIONS_DIR) + self._pre_env = dict(os.environ) + self.mock_version_info = patch( + 'azure_functions_worker.dispatcher.sys.version_info', + SysVersionInfo(3, 9, 0, 'final', 0)) + self.mock_version_info.start() + + def tearDown(self): + os.environ.clear() + os.environ.update(self._pre_env) + self.mock_version_info.stop() + + async def test_dispatcher_functions_metadata_request_legacy_fallback(self): + """Test if the functions metadata response will be sent correctly + when a functions metadata request is received + """ + async with self._ctrl as host: + r = await host.get_functions_metadata() + self.assertIsInstance(r.response, protos.FunctionMetadataResponse) + self.assertTrue(r.response.use_default_metadata_indexing) + self.assertEqual(r.response.result.status, + protos.StatusResult.Success) diff --git a/tests/unittests/test_http_functions.py b/tests/unittests/test_http_functions.py index 3b2fe8c00..b514dfce4 100644 --- a/tests/unittests/test_http_functions.py +++ b/tests/unittests/test_http_functions.py @@ -1,16 +1,18 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import filecmp import hashlib +import os import pathlib -import filecmp import typing -import os + import pytest from azure_functions_worker import testutils +from azure_functions_worker.testutils import WebHostTestCase -class TestHttpFunctions(testutils.WebHostTestCase): +class TestHttpFunctions(WebHostTestCase): @classmethod def get_script_dir(cls): @@ -137,7 +139,6 @@ def test_return_context(self): self.assertEqual(data['method'], 'GET') self.assertEqual(data['ctx_func_name'], 'return_context') - self.assertIn('return_context', data['ctx_func_dir']) self.assertIn('ctx_invocation_id', data) self.assertIn('ctx_trace_context_Tracestate', data) self.assertIn('ctx_trace_context_Traceparent', data) @@ -313,10 +314,6 @@ def test_user_event_loop_error(self): def check_log_user_event_loop_error(self, host_out: typing.List[str]): self.assertIn('try_log', host_out) - def test_import_module_troubleshooting_url(self): - r = self.webhost.request('GET', 'missing_module/') - self.assertEqual(r.status_code, 500) - def check_log_import_module_troubleshooting_url(self, host_out: typing.List[str]): self.assertIn("Exception: ModuleNotFoundError: " @@ -359,11 +356,11 @@ def check_log_print_to_console_stdout(self, host_out: typing.List[str]): def test_print_to_console_stderr(self): r = self.webhost.request('GET', 'print_logging?console=true' - '&message=Secret42&is_stderr=true') + '&message=Secret42&is_stderr=true') self.assertEqual(r.status_code, 200) self.assertEqual(r.text, 'OK-print-logging') - def check_log_print_to_console_stderr(self, host_out: typing.List[str],): + def check_log_print_to_console_stderr(self, host_out: typing.List[str], ): # System logs stderr should not exist in host_out self.assertNotIn('Secret42', host_out) @@ -386,25 +383,18 @@ def check_log_hijack_current_event_loop(self, host_out: typing.List[str]): # System logs should not exist in host_out self.assertNotIn('parallelly_log_system at disguised_logger', host_out) - def test_retry_context_fixed_delay(self): - r = self.webhost.request('GET', 'http_retries_fixed_delay') - self.assertEqual(r.status_code, 500) - def check_log_retry_context_fixed_delay(self, host_out: typing.List[str]): - self.assertIn('Current retry count: 1', host_out) - self.assertIn('Current retry count: 2', host_out) - self.assertIn('Current retry count: 3', host_out) - self.assertNotIn('Current retry count: 4', host_out) - self.assertIn('Max retry count: 3', host_out) +class TestHttpFunctionsStein(TestHttpFunctions): - def test_retry_context_exponential_backoff(self): - r = self.webhost.request('GET', 'http_retries_exponential_backoff') + @classmethod + def get_script_dir(cls): + return testutils.UNIT_TESTS_FOLDER / 'http_functions' / \ + 'http_functions_stein' + + def test_no_return(self): + r = self.webhost.request('GET', 'no_return') self.assertEqual(r.status_code, 500) - def check_log_retry_context_exponential_backoff(self, - host_out: typing.List[str]): - self.assertIn('Current retry count: 1', host_out) - self.assertIn('Current retry count: 2', host_out) - self.assertIn('Current retry count: 3', host_out) - self.assertNotIn('Current retry count: 4', host_out) - self.assertIn('Max retry count: 3', host_out) + def test_no_return_returns(self): + r = self.webhost.request('GET', 'no_return_returns') + self.assertEqual(r.status_code, 200) diff --git a/tests/unittests/test_mock_blob_shared_memory_functions.py b/tests/unittests/test_mock_blob_shared_memory_functions.py index 60204ad39..db263313f 100644 --- a/tests/unittests/test_mock_blob_shared_memory_functions.py +++ b/tests/unittests/test_mock_blob_shared_memory_functions.py @@ -11,8 +11,7 @@ import SharedMemoryMap from azure_functions_worker.bindings.shared_memory_data_transfer \ import SharedMemoryConstants as consts -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils @skipIf(sys.platform == 'darwin', 'MacOS M1 machines do not correctly test the' diff --git a/tests/unittests/test_mock_durable_functions.py b/tests/unittests/test_mock_durable_functions.py index 68714a6b4..e43062fa9 100644 --- a/tests/unittests/test_mock_durable_functions.py +++ b/tests/unittests/test_mock_durable_functions.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestDurableFunctions(testutils.AsyncTestCase): diff --git a/tests/unittests/test_mock_eventhub_functions.py b/tests/unittests/test_mock_eventhub_functions.py index 1c0668de6..b8c7eba57 100644 --- a/tests/unittests/test_mock_eventhub_functions.py +++ b/tests/unittests/test_mock_eventhub_functions.py @@ -2,8 +2,7 @@ # Licensed under the MIT License. import json -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestEventHubMockFunctions(testutils.AsyncTestCase): diff --git a/tests/unittests/test_mock_generic_functions.py b/tests/unittests/test_mock_generic_functions.py index 1e8b0094b..d08e08538 100644 --- a/tests/unittests/test_mock_generic_functions.py +++ b/tests/unittests/test_mock_generic_functions.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestGenericFunctions(testutils.AsyncTestCase): diff --git a/tests/unittests/test_mock_http_functions.py b/tests/unittests/test_mock_http_functions.py index e00ff32ac..8d1b07b24 100644 --- a/tests/unittests/test_mock_http_functions.py +++ b/tests/unittests/test_mock_http_functions.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestMockHost(testutils.AsyncTestCase): diff --git a/tests/unittests/test_mock_log_filtering_functions.py b/tests/unittests/test_mock_log_filtering_functions.py index 800d4f70d..b20231192 100644 --- a/tests/unittests/test_mock_log_filtering_functions.py +++ b/tests/unittests/test_mock_log_filtering_functions.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. from unittest.mock import patch, call -from azure_functions_worker import testutils, protos +from azure_functions_worker import protos, testutils from azure_functions_worker.logging import is_system_log_category diff --git a/tests/unittests/test_mock_timer_functions.py b/tests/unittests/test_mock_timer_functions.py index 660392070..8b22b97f8 100644 --- a/tests/unittests/test_mock_timer_functions.py +++ b/tests/unittests/test_mock_timer_functions.py @@ -2,8 +2,7 @@ # Licensed under the MIT License. import json -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils class TestTimerFunctions(testutils.AsyncTestCase): diff --git a/tests/unittests/test_rpc_messages.py b/tests/unittests/test_rpc_messages.py index 9026efe8c..4d5057540 100644 --- a/tests/unittests/test_rpc_messages.py +++ b/tests/unittests/test_rpc_messages.py @@ -7,8 +7,7 @@ import typing import unittest -from azure_functions_worker import protos -from azure_functions_worker import testutils +from azure_functions_worker import protos, testutils from azure_functions_worker.utils.common import is_python_version