Skip to content

Commit c93e44f

Browse files
authored
Merging dev into master (post 1.1.6) (#750)
* Add PYTHON_THREADPOOL_THREAD_COUNT app setting (#744) * Adding support for debug logs in executed functions. (#745) * Bumping up the version to 1.1.6 (#748)
1 parent 94e18e3 commit c93e44f

File tree

21 files changed

+563
-44
lines changed

21 files changed

+563
-44
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ celerybeat-schedule
8484

8585
# virtualenv (.venv/.venv36/.venv37/.venv38)
8686
.venv*
87-
venv/
87+
venv*/
8888
ENV/
8989
py3env/
9090

azure_functions_worker/constants.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
# Debug Flags
1414
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"
1515

16-
# Feature Flags (app settings)
16+
# Python Specific Feature Flags and App Settings
1717
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
18+
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
19+
20+
# Setting Defaults
21+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT = 1
22+
PYTHON_THREADPOOL_THREAD_COUNT_MIN = 1
23+
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32
1824

1925
# External Site URLs
2026
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"

azure_functions_worker/dispatcher.py

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@
2323
from . import protos
2424
from . import constants
2525

26-
from .constants import CONSOLE_LOG_PREFIX
26+
from .constants import (
27+
CONSOLE_LOG_PREFIX,
28+
PYTHON_THREADPOOL_THREAD_COUNT,
29+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
30+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
31+
PYTHON_THREADPOOL_THREAD_COUNT_MAX
32+
)
2733
from .logging import error_logger, logger, is_system_log_category
2834
from .logging import enable_console_logging, disable_console_logging
35+
from .utils.common import get_app_setting
2936
from .utils.tracing import marshall_exception_trace
3037
from .utils.wrappers import disable_feature_by
3138
from asyncio import BaseEventLoop
@@ -62,24 +69,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
6269

6370
self._old_task_factory = None
6471

65-
# A thread-pool for synchronous function calls. We limit
66-
# the number of threads to 1 so that one Python worker can
67-
# only run one synchronous function in parallel. This is
68-
# because synchronous code in Python is rarely designed with
69-
# concurrency in mind, so we don't want to allow users to
70-
# have races in their synchronous functions. Moreover,
71-
# because of the GIL in CPython, it rarely makes sense to
72-
# use threads (unless the code is IO bound, but we have
73-
# async support for that.)
74-
self._sync_call_tp = concurrent.futures.ThreadPoolExecutor(
75-
max_workers=1)
76-
77-
self._grpc_connect_timeout = grpc_connect_timeout
72+
# We allow the customer to change synchronous thread pool count by
73+
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
74+
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
75+
self._sync_call_tp: concurrent.futures.Executor = (
76+
concurrent.futures.ThreadPoolExecutor(
77+
max_workers=self._sync_tp_max_workers))
78+
79+
self._grpc_connect_timeout: float = grpc_connect_timeout
7880
# This is set to -1 by default to remove the limitation on msg size
79-
self._grpc_max_msg_len = grpc_max_msg_len
81+
self._grpc_max_msg_len: int = grpc_max_msg_len
8082
self._grpc_resp_queue: queue.Queue = queue.Queue()
8183
self._grpc_connected_fut = loop.create_future()
82-
self._grpc_thread = threading.Thread(
84+
self._grpc_thread: threading.Thread = threading.Thread(
8385
name='grpc-thread', target=self.__poll_grpc)
8486

8587
@classmethod
@@ -89,7 +91,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
8991
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
9092
disp._grpc_thread.start()
9193
await disp._grpc_connected_fut
92-
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
94+
logger.info('Successfully opened gRPC channel to %s:%s '
95+
'with sync threadpool max workers set to %s',
96+
host, port, disp._sync_tp_max_workers)
9397
return disp
9498

9599
async def dispatch_forever(self):
@@ -122,21 +126,21 @@ async def dispatch_forever(self):
122126
# established, should use it for system and user logs
123127
logging_handler = AsyncLoggingHandler()
124128
root_logger = logging.getLogger()
125-
root_logger.setLevel(logging.INFO)
129+
root_logger.setLevel(logging.DEBUG)
126130
root_logger.addHandler(logging_handler)
127131
logger.info('Switched to gRPC logging.')
128132
logging_handler.flush()
129133

130134
try:
131135
await forever
132136
finally:
133-
logger.warn('Detaching gRPC logging due to exception.')
137+
logger.warning('Detaching gRPC logging due to exception.')
134138
logging_handler.flush()
135139
root_logger.removeHandler(logging_handler)
136140

137141
# Reenable console logging when there's an exception
138142
enable_console_logging()
139-
logger.warn('Switched to console logging due to exception.')
143+
logger.warning('Switched to console logging due to exception.')
140144
finally:
141145
DispatcherMeta.__current_dispatcher__ = None
142146

@@ -210,8 +214,8 @@ def _serialize_exception(exc: Exception):
210214
try:
211215
message = f'{type(exc).__name__}: {exc}'
212216
except Exception:
213-
message = (f'Unhandled exception in function. '
214-
f'Could not serialize original exception message.')
217+
message = ('Unhandled exception in function. '
218+
'Could not serialize original exception message.')
215219

216220
try:
217221
stack_trace = marshall_exception_trace(exc)
@@ -475,7 +479,29 @@ def _change_cwd(self, new_cwd: str):
475479
os.chdir(new_cwd)
476480
logger.info('Changing current working directory to %s', new_cwd)
477481
else:
478-
logger.warn('Directory %s is not found when reloading', new_cwd)
482+
logger.warning('Directory %s is not found when reloading', new_cwd)
483+
484+
def _get_sync_tp_max_workers(self) -> int:
485+
def tp_max_workers_validator(value: str) -> bool:
486+
try:
487+
int_value = int(value)
488+
except ValueError:
489+
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an '
490+
'integer')
491+
return False
492+
493+
if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
494+
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
495+
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
496+
'to a value between 1 and 32')
497+
return False
498+
499+
return True
500+
501+
return int(get_app_setting(
502+
setting=PYTHON_THREADPOOL_THREAD_COUNT,
503+
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
504+
validator=tp_max_workers_validator))
479505

480506
def __run_sync_func(self, invocation_id, func, params):
481507
# This helper exists because we need to access the current

azure_functions_worker/main.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55

66
import argparse
77

8-
from ._thirdparty import aio_compat
98
from . import dispatcher
109
from . import logging
10+
from ._thirdparty import aio_compat
1111
from .logging import error_logger, logger
1212

1313

1414
def parse_args():
1515
parser = argparse.ArgumentParser(
1616
description='Python Azure Functions Worker')
17-
parser.add_argument('--host')
18-
parser.add_argument('--port', type=int)
19-
parser.add_argument('--workerId', dest='worker_id')
20-
parser.add_argument('--requestId', dest='request_id')
17+
parser.add_argument('--host',
18+
help="host address")
19+
parser.add_argument('--port', type=int,
20+
help='id for the requests')
21+
parser.add_argument('--workerId', dest='worker_id',
22+
help='id for the worker')
23+
parser.add_argument('--requestId', dest='request_id',
24+
help='log destination: stdout, stderr, '
25+
'syslog, or a file path')
2126
parser.add_argument('--log-level', type=str, default='INFO',
22-
choices=['TRACE', 'INFO', 'WARNING', 'ERROR'],)
27+
choices=['TRACE', 'INFO', 'WARNING', 'ERROR'],
28+
help="log level: 'TRACE', 'INFO', 'WARNING', "
29+
"or 'ERROR'")
2330
parser.add_argument('--log-to', type=str, default=None,
2431
help='log destination: stdout, stderr, '
2532
'syslog, or a file path')
@@ -45,8 +52,9 @@ def main():
4552

4653

4754
async def start_async(host, port, worker_id, request_id):
48-
disp = await dispatcher.Dispatcher.connect(
49-
host, port, worker_id, request_id,
50-
connect_timeout=5.0)
55+
disp = await dispatcher.Dispatcher.connect(host=host, port=port,
56+
worker_id=worker_id,
57+
request_id=request_id,
58+
connect_timeout=5.0)
5159

5260
await disp.dispatch_forever()

azure_functions_worker/testutils.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def wrapper(self, *args, __meth__=test_case,
148148
# Trim off host output timestamps
149149
host_output = getattr(self, 'host_out', '')
150150
output_lines = host_output.splitlines()
151-
ts_re = r"^\[\d+\/\d+\/\d+ \d+\:\d+\:\d+ (A|P)M\]"
151+
ts_re = r"^\[\d+\/\d+\/\d+ \d+\:\d+\:\d+.*(A|P)*M*\]"
152152
output = list(map(
153153
lambda s: re.sub(ts_re, '', s).strip(),
154154
output_lines))
@@ -171,6 +171,11 @@ class WebHostTestCase(unittest.TestCase, metaclass=WebHostTestCaseMeta):
171171
In addition to automatically starting up a WebHost instance,
172172
this test case class logs WebHost stdout/stderr in case
173173
a unit test fails.
174+
175+
You can write two sets of test - test_* and check_log_* tests.
176+
177+
test_ABC - Unittest
178+
check_log_ABC - Check logs generated during the execution of test_ABC.
174179
"""
175180
host_stdout_logger = logging.getLogger('webhosttests')
176181

@@ -728,7 +733,7 @@ def call(*args, **kwargs):
728733
return decorate
729734

730735

731-
def _remove_path(path):
736+
def remove_path(path):
732737
if path.is_symlink():
733738
path.unlink()
734739
elif path.is_dir():
@@ -738,7 +743,7 @@ def _remove_path(path):
738743

739744

740745
def _symlink_dir(src, dst):
741-
_remove_path(dst)
746+
remove_path(dst)
742747

743748
if ON_WINDOWS:
744749
shutil.copytree(str(src), str(dst))
@@ -751,8 +756,9 @@ def _setup_func_app(app_root):
751756
ping_func = app_root / 'ping'
752757
host_json = app_root / 'host.json'
753758

754-
with open(host_json, 'w') as f:
755-
f.write(HOST_JSON_TEMPLATE)
759+
if not os.path.isfile(host_json):
760+
with open(host_json, 'w') as f:
761+
f.write(HOST_JSON_TEMPLATE)
756762

757763
_symlink_dir(TESTS_ROOT / 'common' / 'ping', ping_func)
758764
_symlink_dir(EXTENSIONS_PATH, extensions)
@@ -764,7 +770,7 @@ def _teardown_func_app(app_root):
764770
host_json = app_root / 'host.json'
765771

766772
for path in (extensions, ping_func, host_json):
767-
_remove_path(path)
773+
remove_path(path)
768774

769775

770776
def _main():

azure_functions_worker/utils/common.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3+
from typing import Optional, Callable
34
import os
45

56

@@ -15,3 +16,46 @@ def is_envvar_true(env_key: str) -> bool:
1516
return False
1617

1718
return is_true_like(os.environ[env_key])
19+
20+
21+
def get_app_setting(
22+
setting: str,
23+
default_value: Optional[str] = None,
24+
validator: Optional[Callable[[str], bool]] = None
25+
) -> Optional[str]:
26+
"""Returns the application setting from environment variable.
27+
28+
Parameters
29+
----------
30+
setting: str
31+
The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION)
32+
33+
default_value: Optional[str]
34+
The expected return value when the application setting is not found,
35+
or the app setting does not pass the validator.
36+
37+
validator: Optional[Callable[[str], bool]]
38+
A function accepts the app setting value and should return True when
39+
the app setting value is acceptable.
40+
41+
Returns
42+
-------
43+
Optional[str]
44+
A string value that is set in the application setting
45+
"""
46+
app_setting_value = os.getenv(setting)
47+
48+
# If an app setting is not configured, we return the default value
49+
if app_setting_value is None:
50+
return default_value
51+
52+
# If there's no validator, we should return the app setting value directly
53+
if validator is None:
54+
return app_setting_value
55+
56+
# If the app setting is set with a validator,
57+
# On True, should return the app setting value
58+
# On False, should return the default value
59+
if validator(app_setting_value):
60+
return app_setting_value
61+
return default_value

setup.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
from setuptools.command import develop
1717

1818

19-
# TODO: change this to something more stable when available.
19+
# TODO: Change this to something more stable when available.
20+
# TODO: Change this to use 3.x
2021
WEBHOST_URL = (
21-
'https://github.com/Azure/azure-functions-host/releases/download/'
22+
'https://github.com/Azure/azure-functions-host/releases/download'
2223
'/v2.0.14361/Functions.Binaries.2.0.14361.no-runtime.zip'
2324
)
2425

@@ -258,7 +259,7 @@ def run(self):
258259

259260
setup(
260261
name='azure-functions-worker',
261-
version='1.1.5',
262+
version='1.1.6',
262263
description='Python Language Worker for Azure Functions Host',
263264
long_description=long_description,
264265
long_description_content_type='text/markdown',
@@ -286,7 +287,7 @@ def run(self):
286287
],
287288
extras_require={
288289
'dev': [
289-
'azure-functions==1.3.1',
290+
'azure-functions==1.4.0',
290291
'azure-eventhub~=5.1.0',
291292
'python-dateutil~=2.8.1',
292293
'flake8~=3.7.9',
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
import azure.functions as func
5+
6+
7+
def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
8+
result = {
9+
'function_directory': context.function_directory,
10+
'function_name': context.function_name
11+
}
12+
return func.HttpResponse(body=json.dumps(result),
13+
mimetype='application/json')
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "req"
8+
},
9+
{
10+
"type": "http",
11+
"direction": "out",
12+
"name": "$return"
13+
}
14+
]
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"scriptFile": "main.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "req"
8+
},
9+
{
10+
"type": "http",
11+
"direction": "out",
12+
"name": "$return"
13+
}
14+
]
15+
}

0 commit comments

Comments
 (0)