Skip to content

Handling multi worker scenerio for pystein #1142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 60 additions & 44 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,30 +312,7 @@ async def _handle__functions_metadata_request(self, request):
status=protos.StatusResult.Success)))

try:
indexed_functions = loader.index_function_app(function_path)
logger.info('Indexed function app and found %s functions',
len(indexed_functions))

fx_metadata_results = []
if indexed_functions:
indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = "Function Name: {}, Function Binding: {}" \
.format(func.get_function_name(),
[(binding.type, binding.name) for binding in
func.get_bindings()])
indexed_function_logs.append(function_log)

logger.info(
'Successfully processed FunctionMetadataRequest for '
'functions: %s', " ".join(indexed_function_logs))

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)
else:
logger.warning("No functions indexed. Please refer to "
"aka.ms/pythonprogrammingmodel for more info.")
fx_metadata_results = self.index_functions(function_path)

return protos.StreamingMessage(
request_id=request.request_id,
Expand All @@ -355,33 +332,48 @@ async def _handle__functions_metadata_request(self, request):
async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_name = func_request.metadata.name
function_metadata = func_request.metadata
function_name = function_metadata.name
function_path = os.path.join(function_metadata.directory,
SCRIPT_FILE_NAME)

logger.info(
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
'function_name: %s,', self.request_id, function_id, function_name)

try:
if not self._functions.get_function(function_id):
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
'request ID: %s, '
'function ID: %s,'
'function Name: %s', self.request_id, function_id,
function_name)
if function_metadata.properties.get("worker_indexed", False) \
or os.path.exists(function_path):
# This is for the second worker and above where the worker
# indexing is enabled and load request is called without
# calling the metadata request. In this case we index the
# function and update the workers registry
logger.info(f"Indexing function {function_name} in the "
f"load request")
_ = self.index_functions(function_path)
else:
# legacy function
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
'request ID: %s, '
'function ID: %s,'
'function Name: %s', self.request_id,
function_id,
function_name)

return protos.StreamingMessage(
request_id=self.request_id,
Expand Down Expand Up @@ -577,6 +569,30 @@ async def _handle__function_environment_reload_request(self, request):
request_id=self.request_id,
function_environment_reload_response=failure_response)

def index_functions(self, function_path: str):
indexed_functions = loader.index_function_app(function_path)
logger.info('Indexed function app and found %s functions',
len(indexed_functions))

if indexed_functions:
indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = "Function Name: {}, Function Binding: {}" \
.format(func.get_function_name(),
[(binding.type, binding.name) for binding in
func.get_bindings()])
indexed_function_logs.append(function_log)

logger.info(
'Successfully processed FunctionMetadataRequest for '
'functions: %s', " ".join(indexed_function_logs))

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)

return fx_metadata_results

async def _handle__close_shared_memory_resources_request(self, request):
"""
Frees any memory maps that were produced as output for a given
Expand Down
8 changes: 6 additions & 2 deletions azure_functions_worker/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import operator
import pathlib
import typing
import uuid

from . import bindings as bindings_utils
from . import protos
Expand All @@ -21,6 +22,7 @@ class FunctionInfo(typing.NamedTuple):

name: str
directory: str
function_id: str
requires_context: bool
is_async: bool
has_return: bool
Expand Down Expand Up @@ -311,6 +313,7 @@ def add_func_to_registry_and_return_funcinfo(self, function,
func=function,
name=function_name,
directory=directory,
function_id=function_id,
requires_context=requires_context,
is_async=inspect.iscoroutinefunction(function),
has_return=has_explicit_return or has_implicit_return,
Expand Down Expand Up @@ -371,11 +374,12 @@ def add_function(self, function_id: str,
input_types,
output_types, return_type)

def add_indexed_function(self, function_id: str,
function):
def add_indexed_function(self, function):
func = function.get_user_function()
func_name = function.get_function_name()
func_type = function.http_type
function_id = str(uuid.uuid5(namespace=uuid.NAMESPACE_OID,
name=func_name))
return_binding_name: typing.Optional[str] = None
has_explicit_return = False
has_implicit_return = False
Expand Down
8 changes: 3 additions & 5 deletions azure_functions_worker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os.path
import pathlib
import sys
import uuid
from os import PathLike, fspath
from typing import Optional, Dict

Expand Down Expand Up @@ -65,24 +64,23 @@ def process_indexed_function(functions_registry: functions.Registry,
indexed_functions):
fx_metadata_results = []
for indexed_function in indexed_functions:
function_id = str(uuid.uuid4())
function_info = functions_registry.add_indexed_function(
function_id,
function=indexed_function)

binding_protos = build_binding_protos(indexed_function)

function_metadata = protos.RpcFunctionMetadata(
name=function_info.name,
function_id=function_id,
function_id=function_info.function_id,
managed_dependency_enabled=False, # only enabled for PowerShell
directory=function_info.directory,
script_file=indexed_function.function_script_file,
entry_point=function_info.name,
is_proxy=False, # not supported in V4
language=PYTHON_LANGUAGE_RUNTIME,
bindings=binding_protos,
raw_bindings=indexed_function.get_raw_bindings())
raw_bindings=indexed_function.get_raw_bindings(),
properties={"worker_indexed": "True"})

fx_metadata_results.append(function_metadata)

Expand Down
Loading