Skip to content

Commit 825f0b0

Browse files
authored
Add PYTHON_THREADPOOL_THREAD_COUNT app setting (#744)
* Add PYTHON_THREADPOOL_THREAD_COUNT app setting * Add unittest cases to check if the worker count is set * Add validator in get_app_setting * Fix nits
1 parent 8744c89 commit 825f0b0

File tree

7 files changed

+323
-23
lines changed

7 files changed

+323
-23
lines changed

azure_functions_worker/constants.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
# Debug Flags
1414
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"
1515

16-
# Feature Flags (app settings)
16+
# Python Specific Feature Flags and App Settings
1717
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
18+
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
19+
20+
# Setting Defaults
21+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT = 1
22+
PYTHON_THREADPOOL_THREAD_COUNT_MIN = 1
23+
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32
1824

1925
# External Site URLs
2026
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"

azure_functions_worker/dispatcher.py

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@
2323
from . import protos
2424
from . import constants
2525

26-
from .constants import CONSOLE_LOG_PREFIX
26+
from .constants import (
27+
CONSOLE_LOG_PREFIX,
28+
PYTHON_THREADPOOL_THREAD_COUNT,
29+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
30+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
31+
PYTHON_THREADPOOL_THREAD_COUNT_MAX
32+
)
2733
from .logging import error_logger, logger, is_system_log_category
2834
from .logging import enable_console_logging, disable_console_logging
35+
from .utils.common import get_app_setting
2936
from .utils.tracing import marshall_exception_trace
3037
from .utils.wrappers import disable_feature_by
3138
from asyncio import BaseEventLoop
@@ -62,24 +69,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
6269

6370
self._old_task_factory = None
6471

65-
# A thread-pool for synchronous function calls. We limit
66-
# the number of threads to 1 so that one Python worker can
67-
# only run one synchronous function in parallel. This is
68-
# because synchronous code in Python is rarely designed with
69-
# concurrency in mind, so we don't want to allow users to
70-
# have races in their synchronous functions. Moreover,
71-
# because of the GIL in CPython, it rarely makes sense to
72-
# use threads (unless the code is IO bound, but we have
73-
# async support for that.)
74-
self._sync_call_tp = concurrent.futures.ThreadPoolExecutor(
75-
max_workers=1)
76-
77-
self._grpc_connect_timeout = grpc_connect_timeout
72+
# We allow the customer to change synchronous thread pool count by
73+
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
74+
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
75+
self._sync_call_tp: concurrent.futures.Executor = (
76+
concurrent.futures.ThreadPoolExecutor(
77+
max_workers=self._sync_tp_max_workers))
78+
79+
self._grpc_connect_timeout: float = grpc_connect_timeout
7880
# This is set to -1 by default to remove the limitation on msg size
79-
self._grpc_max_msg_len = grpc_max_msg_len
81+
self._grpc_max_msg_len: int = grpc_max_msg_len
8082
self._grpc_resp_queue: queue.Queue = queue.Queue()
8183
self._grpc_connected_fut = loop.create_future()
82-
self._grpc_thread = threading.Thread(
84+
self._grpc_thread: threading.Thread = threading.Thread(
8385
name='grpc-thread', target=self.__poll_grpc)
8486

8587
@classmethod
@@ -89,7 +91,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
8991
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
9092
disp._grpc_thread.start()
9193
await disp._grpc_connected_fut
92-
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
94+
logger.info('Successfully opened gRPC channel to %s:%s '
95+
'with sync threadpool max workers set to %s',
96+
host, port, disp._sync_tp_max_workers)
9397
return disp
9498

9599
async def dispatch_forever(self):
@@ -130,13 +134,13 @@ async def dispatch_forever(self):
130134
try:
131135
await forever
132136
finally:
133-
logger.warn('Detaching gRPC logging due to exception.')
137+
logger.warning('Detaching gRPC logging due to exception.')
134138
logging_handler.flush()
135139
root_logger.removeHandler(logging_handler)
136140

137141
# Reenable console logging when there's an exception
138142
enable_console_logging()
139-
logger.warn('Switched to console logging due to exception.')
143+
logger.warning('Switched to console logging due to exception.')
140144
finally:
141145
DispatcherMeta.__current_dispatcher__ = None
142146

@@ -210,8 +214,8 @@ def _serialize_exception(exc: Exception):
210214
try:
211215
message = f'{type(exc).__name__}: {exc}'
212216
except Exception:
213-
message = (f'Unhandled exception in function. '
214-
f'Could not serialize original exception message.')
217+
message = ('Unhandled exception in function. '
218+
'Could not serialize original exception message.')
215219

216220
try:
217221
stack_trace = marshall_exception_trace(exc)
@@ -475,7 +479,29 @@ def _change_cwd(self, new_cwd: str):
475479
os.chdir(new_cwd)
476480
logger.info('Changing current working directory to %s', new_cwd)
477481
else:
478-
logger.warn('Directory %s is not found when reloading', new_cwd)
482+
logger.warning('Directory %s is not found when reloading', new_cwd)
483+
484+
def _get_sync_tp_max_workers(self) -> int:
485+
def tp_max_workers_validator(value: str) -> bool:
486+
try:
487+
int_value = int(value)
488+
except ValueError:
489+
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an '
490+
'integer')
491+
return False
492+
493+
if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
494+
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
495+
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
496+
'to a value between 1 and 32')
497+
return False
498+
499+
return True
500+
501+
return int(get_app_setting(
502+
setting=PYTHON_THREADPOOL_THREAD_COUNT,
503+
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
504+
validator=tp_max_workers_validator))
479505

480506
def __run_sync_func(self, invocation_id, func, params):
481507
# This helper exists because we need to access the current

azure_functions_worker/utils/common.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3+
from typing import Optional, Callable
34
import os
45

56

@@ -15,3 +16,46 @@ def is_envvar_true(env_key: str) -> bool:
1516
return False
1617

1718
return is_true_like(os.environ[env_key])
19+
20+
21+
def get_app_setting(
22+
setting: str,
23+
default_value: Optional[str] = None,
24+
validator: Optional[Callable[[str], bool]] = None
25+
) -> Optional[str]:
26+
"""Returns the application setting from environment variable.
27+
28+
Parameters
29+
----------
30+
setting: str
31+
The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION)
32+
33+
default_value: Optional[str]
34+
The expected return value when the application setting is not found,
35+
or the app setting does not pass the validator.
36+
37+
validator: Optional[Callable[[str], bool]]
38+
A function accepts the app setting value and should return True when
39+
the app setting value is acceptable.
40+
41+
Returns
42+
-------
43+
Optional[str]
44+
A string value that is set in the application setting
45+
"""
46+
app_setting_value = os.getenv(setting)
47+
48+
# If an app setting is not configured, we return the default value
49+
if app_setting_value is None:
50+
return default_value
51+
52+
# If there's no validator, we should return the app setting value directly
53+
if validator is None:
54+
return app_setting_value
55+
56+
# If the app setting is set with a validator,
57+
# On True, should return the app setting value
58+
# On False, should return the default value
59+
if validator(app_setting_value):
60+
return app_setting_value
61+
return default_value
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
import azure.functions as func
5+
6+
7+
def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
8+
result = {
9+
'function_directory': context.function_directory,
10+
'function_name': context.function_name
11+
}
12+
return func.HttpResponse(body=json.dumps(result),
13+
mimetype='application/json')
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "req"
8+
},
9+
{
10+
"type": "http",
11+
"direction": "out",
12+
"name": "$return"
13+
}
14+
]
15+
}

tests/unittests/test_dispatcher.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import os
4+
from unittest.mock import patch
5+
from azure_functions_worker import protos
6+
from azure_functions_worker import testutils
7+
from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT
8+
9+
10+
class TestDispatcher(testutils.AsyncTestCase):
11+
dispatcher_funcs_dir = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions'
12+
13+
def setUp(self):
14+
self._pre_env = dict(os.environ)
15+
16+
def tearDown(self):
17+
os.environ.clear()
18+
os.environ.update(self._pre_env)
19+
20+
async def test_dispatcher_sync_threadpool_default_worker(self):
21+
'''Test if the sync threadpool has maximum worker count set to 1
22+
by default
23+
'''
24+
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)
25+
26+
async with ctrl as host:
27+
await self._check_if_function_is_ok(host)
28+
29+
# Ensure the dispatcher sync threadpool count is set to 1
30+
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)
31+
32+
async def test_dispatcher_sync_threadpool_set_worker(self):
33+
'''Test if the sync threadpool maximum worker can be set
34+
'''
35+
# Configure thread pool max worker
36+
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'})
37+
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)
38+
39+
async with ctrl as host:
40+
await self._check_if_function_is_ok(host)
41+
42+
# Ensure the dispatcher sync threadpool count is set to 1
43+
self.assertEqual(ctrl._worker._sync_tp_max_workers, 5)
44+
45+
@patch('azure_functions_worker.dispatcher.logger')
46+
async def test_dispatcher_sync_threadpool_invalid_worker_count(
47+
self,
48+
mock_logger
49+
):
50+
'''Test when sync threadpool maximum worker is set to an invalid value,
51+
the host should fallback to default value 1
52+
'''
53+
# Configure thread pool max worker to an invalid value
54+
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'})
55+
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)
56+
57+
async with ctrl as host:
58+
await self._check_if_function_is_ok(host)
59+
60+
# Ensure the dispatcher sync threadpool should fallback to 1
61+
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)
62+
63+
mock_logger.warning.assert_any_call(
64+
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer')
65+
66+
@patch('azure_functions_worker.dispatcher.logger')
67+
async def test_dispatcher_sync_threadpool_below_min_setting(
68+
self,
69+
mock_logger
70+
):
71+
# Configure thread pool max worker to an invalid value
72+
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'})
73+
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)
74+
75+
async with ctrl as host:
76+
await self._check_if_function_is_ok(host)
77+
78+
# Ensure the dispatcher sync threadpool should fallback to 1
79+
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)
80+
81+
mock_logger.warning.assert_any_call(
82+
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
83+
'1 and 32')
84+
85+
@patch('azure_functions_worker.dispatcher.logger')
86+
async def test_dispatcher_sync_threadpool_exceed_max_setting(
87+
self,
88+
mock_logger
89+
):
90+
# Configure thread pool max worker to an invalid value
91+
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'})
92+
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)
93+
94+
async with ctrl as host:
95+
await self._check_if_function_is_ok(host)
96+
97+
# Ensure the dispatcher sync threadpool should fallback to 1
98+
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)
99+
100+
mock_logger.warning.assert_any_call(
101+
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
102+
'1 and 32')
103+
104+
async def _check_if_function_is_ok(self, host):
105+
# Ensure the function can be properly loaded
106+
func_id, load_r = await host.load_function('show_context')
107+
self.assertEqual(load_r.response.function_id, func_id)
108+
self.assertEqual(load_r.response.result.status,
109+
protos.StatusResult.Success)
110+
111+
# Ensure the function can be properly invoked
112+
invoke_id, call_r = await host.invoke_function(
113+
'show_context', [
114+
protos.ParameterBinding(
115+
name='req',
116+
data=protos.TypedData(
117+
http=protos.RpcHttp(
118+
method='GET'
119+
)
120+
)
121+
)
122+
])
123+
self.assertIsNotNone(invoke_id)
124+
self.assertEqual(call_r.response.result.status,
125+
protos.StatusResult.Success)

0 commit comments

Comments
 (0)