Skip to content

Supporting PyStein programming model in the Worker #965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 49 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8a2ea94
Supporting Metadata Indexing request.
vrdmr Aug 3, 2021
0e69f69
Testing new Programming Model Indexing
vrdmr Aug 8, 2021
a5f41be
Fix
vrdmr Aug 9, 2021
719cca1
Raw Bindings string correction and load check
vrdmr Aug 9, 2021
d3f02af
Aliasing function name
vrdmr Aug 10, 2021
5ce82f7
Merge branch 'dev' into vameru/support-prog-model-indexing
vrdmr Nov 9, 2021
92c37c9
update proto file and worker config
vrdmr Nov 15, 2021
69c6ccc
Reflecting new protobuf changes
vrdmr Nov 19, 2021
195304f
Merge remote-tracking branch 'Azure/dev' into vameru/support-prog-mod…
vrdmr Dec 3, 2021
dc6431f
Initial commit for new programming model
gavin-aguiar Feb 1, 2022
bbeeace
Casting binding direction
gavin-aguiar Feb 2, 2022
57f1528
Refactored for binding direction and datatype
gavin-aguiar Feb 3, 2022
9e55ab7
Pulling in new changes
gavin-aguiar Feb 7, 2022
6f41570
Added loader tests
gavin-aguiar Feb 7, 2022
6304eb9
Added new tests
gavin-aguiar Feb 11, 2022
d1dd89f
Added unit and e2e test
gavin-aguiar Feb 16, 2022
7712112
Minor fixes/Fixed flake8
gavin-aguiar Feb 16, 2022
0786cae
Refactoring and addressing comments
gavin-aguiar Feb 23, 2022
34def6a
Added fallback to legacy
gavin-aguiar Feb 25, 2022
8fdd764
Updated protobuf file
gavin-aguiar Mar 1, 2022
3ca6c06
Minor fixes and refactoring
gavin-aguiar Mar 3, 2022
09ed69f
Added e2e and unit tests
gavin-aguiar Mar 14, 2022
5f6748f
Merging with current dev
gavin-aguiar Mar 14, 2022
d6f569b
Updated worker.config.json
gavin-aguiar Mar 16, 2022
e645c71
Fixed worker.config.json
gavin-aguiar Mar 17, 2022
053b46f
Unit Test fixes
gavin-aguiar Mar 17, 2022
cddb51b
unit test fixes
gavin-aguiar Mar 18, 2022
5ca0b39
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Mar 18, 2022
58698b9
Fixed blob tests
gavin-aguiar Mar 18, 2022
70b889b
Service bus test fix
gavin-aguiar Mar 18, 2022
bd6fa98
Added dispatcher unit tests
gavin-aguiar Mar 21, 2022
73eb469
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Mar 21, 2022
852160b
Fixed 3.10 tests
gavin-aguiar Apr 4, 2022
c30d409
unit test fixes
gavin-aguiar Apr 4, 2022
7336381
Unit Test fixes
gavin-aguiar Apr 4, 2022
f74f010
Reverting previous commit
gavin-aguiar Apr 12, 2022
1217794
Resolving conflicts
gavin-aguiar Apr 12, 2022
aafd082
Addressing comments
gavin-aguiar Apr 25, 2022
bca60c9
Change auth_level typo
gavin-aguiar Apr 25, 2022
dd711f0
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Apr 25, 2022
1563d7e
Removing auth level from http functions
gavin-aguiar Apr 25, 2022
42bb311
Removed authlevel from queue functions
gavin-aguiar Apr 26, 2022
30a9622
Updated on_queue_change to queue trigger
gavin-aguiar Apr 26, 2022
c66d06f
Fixed flake8 tests
gavin-aguiar Apr 26, 2022
db402b5
Updated on_blob_change to blob_trigger
gavin-aguiar Apr 26, 2022
776e054
Fixed flake8 tests
gavin-aguiar Apr 26, 2022
9d25cee
Updated trigger names for tests
gavin-aguiar Apr 27, 2022
9c98261
Addressed comments
gavin-aguiar Apr 27, 2022
55845e5
Merge branch 'dev' into gaaguiar/new-prg-model
gavin-aguiar Apr 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@

# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"

# new programming model script file name
SCRIPT_FILE_NAME = "function_app.py"

PYTHON_LANGUAGE_RUNTIME = "python"
128 changes: 88 additions & 40 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@

import grpc

from . import bindings
from . import constants
from . import functions
from . import loader
from . import protos
from . import bindings, constants, functions, loader, protos
from .bindings.shared_memory_data_transfer import SharedMemoryManager
from .constants import (PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX_37,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_ENABLE_DEBUG_LOGGING)
PYTHON_ENABLE_DEBUG_LOGGING, SCRIPT_FILE_NAME)
from .extension import ExtensionManager
from .logging import disable_console_logging, enable_console_logging
from .logging import enable_debug_logging_recommendation
Expand All @@ -40,7 +36,6 @@
from .utils.wrappers import disable_feature_by
from .version import VERSION


_TRUE = "true"

"""In Python 3.6, the current_task method was in the Task class, but got moved
Expand Down Expand Up @@ -260,13 +255,13 @@ async def _dispatch_grpc_request(self, request):
resp = await request_handler(request)
self._grpc_resp_queue.put_nowait(resp)

async def _handle__worker_init_request(self, req):
async def _handle__worker_init_request(self, request):
logger.info('Received WorkerInitRequest, '
'python version %s, worker version %s, request ID %s',
sys.version, VERSION, self.request_id)
enable_debug_logging_recommendation()

worker_init_request = req.worker_init_request
worker_init_request = request.worker_init_request
host_capabilities = worker_init_request.capabilities
if constants.FUNCTION_DATA_CACHE in host_capabilities:
val = host_capabilities[constants.FUNCTION_DATA_CACHE]
Expand Down Expand Up @@ -294,42 +289,93 @@ async def _handle__worker_init_request(self, req):
result=protos.StatusResult(
status=protos.StatusResult.Success)))

async def _handle__worker_status_request(self, req):
async def _handle__worker_status_request(self, request):
# Logging is not necessary in this request since the response is used
# for host to judge scale decisions of out-of-proc languages.
# Having log here will reduce the responsiveness of the worker.
return protos.StreamingMessage(
request_id=req.request_id,
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())

async def _handle__function_load_request(self, req):
func_request = req.function_load_request
async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory
function_path = os.path.join(directory, SCRIPT_FILE_NAME)

if not os.path.exists(function_path):
# Fallback to legacy model
logger.info(f"{SCRIPT_FILE_NAME} does not exist. "
"Switching to host indexing.")
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=True,
result=protos.StatusResult(
status=protos.StatusResult.Success)))

try:
fx_metadata_results = []
indexed_functions = loader.index_function_app(function_path)
if indexed_functions:
indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = \
f"Function Name: {func.get_function_name()} " \
"Function Binding: " \
f"{[binding.name for binding in func.get_bindings()]}"
indexed_function_logs.append(function_log)

logger.info(
f'Successfully processed FunctionMetadataRequest for '
f'functions: {" ".join(indexed_function_logs)}')

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)
else:
logger.warning("No functions indexed. Please refer to the "
"documentation.")

return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
function_metadata_results=fx_metadata_results,
result=protos.StatusResult(
status=protos.StatusResult.Success)))

except Exception as ex:
return protos.StreamingMessage(
request_id=self.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
result=protos.StatusResult(
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))

async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_name = func_request.metadata.name

logger.info(f'Received FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id}'
f'function Name: {function_name}')
try:
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)
if not self._functions.get_function(function_id):
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')
logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')

return protos.StreamingMessage(
request_id=self.request_id,
Expand All @@ -347,8 +393,8 @@ async def _handle__function_load_request(self, req):
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))

async def _handle__invocation_request(self, req):
invoc_request = req.invocation_request
async def _handle__invocation_request(self, request):
invoc_request = request.invocation_request
invocation_id = invoc_request.invocation_id
function_id = invoc_request.function_id

Expand All @@ -361,6 +407,7 @@ async def _handle__invocation_request(self, req):
try:
fi: functions.FunctionInfo = self._functions.get_function(
function_id)
assert fi is not None

function_invocation_logs: List[str] = [
'Received FunctionInvocationRequest',
Expand Down Expand Up @@ -456,15 +503,16 @@ async def _handle__invocation_request(self, req):
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))

async def _handle__function_environment_reload_request(self, req):
async def _handle__function_environment_reload_request(self, request):
"""Only runs on Linux Consumption placeholder specialization.
"""
try:
logger.info('Received FunctionEnvironmentReloadRequest, '
'request ID: %s', self.request_id)
enable_debug_logging_recommendation()

func_env_reload_request = req.function_environment_reload_request
func_env_reload_request = \
request.function_environment_reload_request

# Import before clearing path cache so that the default
# azure.functions modules is available in sys.modules for
Expand Down Expand Up @@ -523,7 +571,7 @@ async def _handle__function_environment_reload_request(self, req):
request_id=self.request_id,
function_environment_reload_response=failure_response)

async def _handle__close_shared_memory_resources_request(self, req):
async def _handle__close_shared_memory_resources_request(self, request):
"""
Frees any memory maps that were produced as output for a given
invocation.
Expand All @@ -534,7 +582,7 @@ async def _handle__close_shared_memory_resources_request(self, req):
If the cache is not enabled, the worker should free the resources as at
this point the host has read the memory maps and does not need them.
"""
close_request = req.close_shared_memory_resources_request
close_request = request.close_shared_memory_resources_request
map_names = close_request.map_names
# Assign default value of False to all result values.
# If we are successfully able to close a memory map, its result will be
Expand Down
18 changes: 9 additions & 9 deletions azure_functions_worker/extension.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import functools
import logging
from types import ModuleType
from typing import Any, Callable, List, Optional
import logging
import functools
from .utils.common import (
is_python_version,
get_sdk_from_sys_path,
get_sdk_version
)
from .utils.wrappers import enable_feature_by

from .constants import (
PYTHON_ISOLATE_WORKER_DEPENDENCIES,
PYTHON_ENABLE_WORKER_EXTENSIONS,
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT,
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39
)
from .logging import logger, SYSTEM_LOG_PREFIX

from .utils.common import (
is_python_version,
get_sdk_from_sys_path,
get_sdk_version
)
from .utils.wrappers import enable_feature_by

# Extension Hooks
FUNC_EXT_POST_FUNCTION_LOAD = "post_function_load"
Expand Down
Loading