Skip to content

Add PYTHON_THREADPOOL_THREAD_COUNT app setting #744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
# Debug Flags
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"

# Feature Flags (app settings)
# Python Specific Feature Flags and App Settings
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stefanushinardi - Could you please check if you want to change the name?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - putting my reasoning here for future reference: it makes sense to put the PYTHON_* prefix for this as without it users might get confused and use this config for other languages


# Setting Defaults
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT = 1
PYTHON_THREADPOOL_THREAD_COUNT_MIN = 1
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32

# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
70 changes: 48 additions & 22 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
from . import protos
from . import constants

from .constants import CONSOLE_LOG_PREFIX
from .constants import (
CONSOLE_LOG_PREFIX,
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX
)
from .logging import error_logger, logger, is_system_log_category
from .logging import enable_console_logging, disable_console_logging
from .utils.common import get_app_setting
from .utils.tracing import marshall_exception_trace
from .utils.wrappers import disable_feature_by
from asyncio import BaseEventLoop
Expand Down Expand Up @@ -62,24 +69,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,

self._old_task_factory = None

# A thread-pool for synchronous function calls. We limit
# the number of threads to 1 so that one Python worker can
# only run one synchronous function in parallel. This is
# because synchronous code in Python is rarely designed with
# concurrency in mind, so we don't want to allow users to
# have races in their synchronous functions. Moreover,
# because of the GIL in CPython, it rarely makes sense to
# use threads (unless the code is IO bound, but we have
# async support for that.)
self._sync_call_tp = concurrent.futures.ThreadPoolExecutor(
max_workers=1)

self._grpc_connect_timeout = grpc_connect_timeout
# We allow the customer to change synchronous thread pool count by
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
self._sync_call_tp: concurrent.futures.Executor = (
concurrent.futures.ThreadPoolExecutor(
max_workers=self._sync_tp_max_workers))

self._grpc_connect_timeout: float = grpc_connect_timeout
# This is set to -1 by default to remove the limitation on msg size
self._grpc_max_msg_len = grpc_max_msg_len
self._grpc_max_msg_len: int = grpc_max_msg_len
self._grpc_resp_queue: queue.Queue = queue.Queue()
self._grpc_connected_fut = loop.create_future()
self._grpc_thread = threading.Thread(
self._grpc_thread: threading.Thread = threading.Thread(
name='grpc-thread', target=self.__poll_grpc)

@classmethod
Expand All @@ -89,7 +91,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
disp._grpc_thread.start()
await disp._grpc_connected_fut
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
logger.info('Successfully opened gRPC channel to %s:%s '
'with sync threadpool max workers set to %s',
host, port, disp._sync_tp_max_workers)
return disp

async def dispatch_forever(self):
Expand Down Expand Up @@ -130,13 +134,13 @@ async def dispatch_forever(self):
try:
await forever
finally:
logger.warn('Detaching gRPC logging due to exception.')
logger.warning('Detaching gRPC logging due to exception.')
logging_handler.flush()
root_logger.removeHandler(logging_handler)

# Reenable console logging when there's an exception
enable_console_logging()
logger.warn('Switched to console logging due to exception.')
logger.warning('Switched to console logging due to exception.')
finally:
DispatcherMeta.__current_dispatcher__ = None

Expand Down Expand Up @@ -210,8 +214,8 @@ def _serialize_exception(exc: Exception):
try:
message = f'{type(exc).__name__}: {exc}'
except Exception:
message = (f'Unhandled exception in function. '
f'Could not serialize original exception message.')
message = ('Unhandled exception in function. '
'Could not serialize original exception message.')

try:
stack_trace = marshall_exception_trace(exc)
Expand Down Expand Up @@ -475,7 +479,29 @@ def _change_cwd(self, new_cwd: str):
os.chdir(new_cwd)
logger.info('Changing current working directory to %s', new_cwd)
else:
logger.warn('Directory %s is not found when reloading', new_cwd)
logger.warning('Directory %s is not found when reloading', new_cwd)

def _get_sync_tp_max_workers(self) -> int:
def tp_max_workers_validator(value: str) -> bool:
try:
int_value = int(value)
except ValueError:
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an '
'integer')
return False

if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
'to a value between 1 and 32')
return False

return True

return int(get_app_setting(
setting=PYTHON_THREADPOOL_THREAD_COUNT,
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
validator=tp_max_workers_validator))

def __run_sync_func(self, invocation_id, func, params):
# This helper exists because we need to access the current
Expand Down
44 changes: 44 additions & 0 deletions azure_functions_worker/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Optional, Callable
import os


Expand All @@ -15,3 +16,46 @@ def is_envvar_true(env_key: str) -> bool:
return False

return is_true_like(os.environ[env_key])


def get_app_setting(
setting: str,
default_value: Optional[str] = None,
validator: Optional[Callable[[str], bool]] = None
) -> Optional[str]:
"""Returns the application setting from environment variable.

Parameters
----------
setting: str
The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION)

default_value: Optional[str]
The expected return value when the application setting is not found,
or the app setting does not pass the validator.

validator: Optional[Callable[[str], bool]]
A function accepts the app setting value and should return True when
the app setting value is acceptable.

Returns
-------
Optional[str]
A string value that is set in the application setting
"""
app_setting_value = os.getenv(setting)

# If an app setting is not configured, we return the default value
if app_setting_value is None:
return default_value

# If there's no validator, we should return the app setting value directly
if validator is None:
return app_setting_value

# If the app setting is set with a validator,
# On True, should return the app setting value
# On False, should return the default value
if validator(app_setting_value):
return app_setting_value
return default_value
13 changes: 13 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import azure.functions as func


def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
result = {
'function_directory': context.function_directory,
'function_name': context.function_name
}
return func.HttpResponse(body=json.dumps(result),
mimetype='application/json')
15 changes: 15 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
125 changes: 125 additions & 0 deletions tests/unittests/test_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
from unittest.mock import patch
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT


class TestDispatcher(testutils.AsyncTestCase):
dispatcher_funcs_dir = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions'

def setUp(self):
self._pre_env = dict(os.environ)

def tearDown(self):
os.environ.clear()
os.environ.update(self._pre_env)

async def test_dispatcher_sync_threadpool_default_worker(self):
'''Test if the sync threadpool has maximum worker count set to 1
by default
'''
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool count is set to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

async def test_dispatcher_sync_threadpool_set_worker(self):
'''Test if the sync threadpool maximum worker can be set
'''
# Configure thread pool max worker
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool count is set to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 5)

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_invalid_worker_count(
self,
mock_logger
):
'''Test when sync threadpool maximum worker is set to an invalid value,
the host should fallback to default value 1
'''
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer')

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_below_min_setting(
self,
mock_logger
):
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
'1 and 32')

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_exceed_max_setting(
self,
mock_logger
):
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
'1 and 32')

async def _check_if_function_is_ok(self, host):
# Ensure the function can be properly loaded
func_id, load_r = await host.load_function('show_context')
self.assertEqual(load_r.response.function_id, func_id)
self.assertEqual(load_r.response.result.status,
protos.StatusResult.Success)

# Ensure the function can be properly invoked
invoke_id, call_r = await host.invoke_function(
'show_context', [
protos.ParameterBinding(
name='req',
data=protos.TypedData(
http=protos.RpcHttp(
method='GET'
)
)
)
])
self.assertIsNotNone(invoke_id)
self.assertEqual(call_r.response.result.status,
protos.StatusResult.Success)
Loading