Skip to content

Merging dev into master (post 1.1.6) #750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ celerybeat-schedule

# virtualenv (.venv/.venv36/.venv37/.venv38)
.venv*
venv/
venv*/
ENV/
py3env/

Expand Down
8 changes: 7 additions & 1 deletion azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
# Debug Flags
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"

# Feature Flags (app settings)
# Python Specific Feature Flags and App Settings
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"

# Setting Defaults
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT = 1
PYTHON_THREADPOOL_THREAD_COUNT_MIN = 1
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32

# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
72 changes: 49 additions & 23 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
from . import protos
from . import constants

from .constants import CONSOLE_LOG_PREFIX
from .constants import (
CONSOLE_LOG_PREFIX,
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX
)
from .logging import error_logger, logger, is_system_log_category
from .logging import enable_console_logging, disable_console_logging
from .utils.common import get_app_setting
from .utils.tracing import marshall_exception_trace
from .utils.wrappers import disable_feature_by
from asyncio import BaseEventLoop
Expand Down Expand Up @@ -62,24 +69,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,

self._old_task_factory = None

# A thread-pool for synchronous function calls. We limit
# the number of threads to 1 so that one Python worker can
# only run one synchronous function in parallel. This is
# because synchronous code in Python is rarely designed with
# concurrency in mind, so we don't want to allow users to
# have races in their synchronous functions. Moreover,
# because of the GIL in CPython, it rarely makes sense to
# use threads (unless the code is IO bound, but we have
# async support for that.)
self._sync_call_tp = concurrent.futures.ThreadPoolExecutor(
max_workers=1)

self._grpc_connect_timeout = grpc_connect_timeout
# We allow the customer to change synchronous thread pool count by
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
self._sync_call_tp: concurrent.futures.Executor = (
concurrent.futures.ThreadPoolExecutor(
max_workers=self._sync_tp_max_workers))

self._grpc_connect_timeout: float = grpc_connect_timeout
# This is set to -1 by default to remove the limitation on msg size
self._grpc_max_msg_len = grpc_max_msg_len
self._grpc_max_msg_len: int = grpc_max_msg_len
self._grpc_resp_queue: queue.Queue = queue.Queue()
self._grpc_connected_fut = loop.create_future()
self._grpc_thread = threading.Thread(
self._grpc_thread: threading.Thread = threading.Thread(
name='grpc-thread', target=self.__poll_grpc)

@classmethod
Expand All @@ -89,7 +91,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
disp._grpc_thread.start()
await disp._grpc_connected_fut
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
logger.info('Successfully opened gRPC channel to %s:%s '
'with sync threadpool max workers set to %s',
host, port, disp._sync_tp_max_workers)
return disp

async def dispatch_forever(self):
Expand Down Expand Up @@ -122,21 +126,21 @@ async def dispatch_forever(self):
# established, should use it for system and user logs
logging_handler = AsyncLoggingHandler()
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(logging_handler)
logger.info('Switched to gRPC logging.')
logging_handler.flush()

try:
await forever
finally:
logger.warn('Detaching gRPC logging due to exception.')
logger.warning('Detaching gRPC logging due to exception.')
logging_handler.flush()
root_logger.removeHandler(logging_handler)

# Reenable console logging when there's an exception
enable_console_logging()
logger.warn('Switched to console logging due to exception.')
logger.warning('Switched to console logging due to exception.')
finally:
DispatcherMeta.__current_dispatcher__ = None

Expand Down Expand Up @@ -210,8 +214,8 @@ def _serialize_exception(exc: Exception):
try:
message = f'{type(exc).__name__}: {exc}'
except Exception:
message = (f'Unhandled exception in function. '
f'Could not serialize original exception message.')
message = ('Unhandled exception in function. '
'Could not serialize original exception message.')

try:
stack_trace = marshall_exception_trace(exc)
Expand Down Expand Up @@ -475,7 +479,29 @@ def _change_cwd(self, new_cwd: str):
os.chdir(new_cwd)
logger.info('Changing current working directory to %s', new_cwd)
else:
logger.warn('Directory %s is not found when reloading', new_cwd)
logger.warning('Directory %s is not found when reloading', new_cwd)

def _get_sync_tp_max_workers(self) -> int:
    """Return the max worker count for the synchronous-call thread pool.

    Reads the PYTHON_THREADPOOL_THREAD_COUNT app setting; when the setting
    is missing or fails validation, falls back to
    PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT (1).

    Returns
    -------
    int
        A value between PYTHON_THREADPOOL_THREAD_COUNT_MIN and
        PYTHON_THREADPOOL_THREAD_COUNT_MAX, or the default.
    """
    def tp_max_workers_validator(value: str) -> bool:
        # Accept only integers inside the [MIN, MAX] range; log why a
        # value was rejected so misconfiguration is visible to the user.
        try:
            int_value = int(value)
        except ValueError:
            logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an '
                           'integer')
            return False

        if not (PYTHON_THREADPOOL_THREAD_COUNT_MIN
                <= int_value
                <= PYTHON_THREADPOOL_THREAD_COUNT_MAX):
            # Derive the bounds from the constants so this message can
            # never drift from the actual validation range.
            logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
                           'to a value between '
                           f'{PYTHON_THREADPOOL_THREAD_COUNT_MIN} and '
                           f'{PYTHON_THREADPOOL_THREAD_COUNT_MAX}')
            return False

        return True

    return int(get_app_setting(
        setting=PYTHON_THREADPOOL_THREAD_COUNT,
        default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
        validator=tp_max_workers_validator))

def __run_sync_func(self, invocation_id, func, params):
# This helper exists because we need to access the current
Expand Down
26 changes: 17 additions & 9 deletions azure_functions_worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@

import argparse

from ._thirdparty import aio_compat
from . import dispatcher
from . import logging
from ._thirdparty import aio_compat
from .logging import error_logger, logger


def parse_args():
parser = argparse.ArgumentParser(
description='Python Azure Functions Worker')
parser.add_argument('--host')
parser.add_argument('--port', type=int)
parser.add_argument('--workerId', dest='worker_id')
parser.add_argument('--requestId', dest='request_id')
parser.add_argument('--host',
help="host address")
parser.add_argument('--port', type=int,
help='id for the requests')
parser.add_argument('--workerId', dest='worker_id',
help='id for the worker')
parser.add_argument('--requestId', dest='request_id',
help='log destination: stdout, stderr, '
'syslog, or a file path')
parser.add_argument('--log-level', type=str, default='INFO',
choices=['TRACE', 'INFO', 'WARNING', 'ERROR'],)
choices=['TRACE', 'INFO', 'WARNING', 'ERROR'],
help="log level: 'TRACE', 'INFO', 'WARNING', "
"or 'ERROR'")
parser.add_argument('--log-to', type=str, default=None,
help='log destination: stdout, stderr, '
'syslog, or a file path')
Expand All @@ -45,8 +52,9 @@ def main():


async def start_async(host, port, worker_id, request_id):
    """Connect a Dispatcher to the Functions host and serve requests forever.

    Parameters mirror the worker command-line arguments: gRPC host address,
    port, worker id and request id.
    """
    # Connect exactly once, using keyword arguments for clarity; the
    # duplicated positional call left over from the merge would have
    # opened a second gRPC channel.
    disp = await dispatcher.Dispatcher.connect(host=host, port=port,
                                               worker_id=worker_id,
                                               request_id=request_id,
                                               connect_timeout=5.0)

    await disp.dispatch_forever()
18 changes: 12 additions & 6 deletions azure_functions_worker/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def wrapper(self, *args, __meth__=test_case,
# Trim off host output timestamps
host_output = getattr(self, 'host_out', '')
output_lines = host_output.splitlines()
ts_re = r"^\[\d+\/\d+\/\d+ \d+\:\d+\:\d+ (A|P)M\]"
ts_re = r"^\[\d+\/\d+\/\d+ \d+\:\d+\:\d+.*(A|P)*M*\]"
output = list(map(
lambda s: re.sub(ts_re, '', s).strip(),
output_lines))
Expand All @@ -171,6 +171,11 @@ class WebHostTestCase(unittest.TestCase, metaclass=WebHostTestCaseMeta):
In addition to automatically starting up a WebHost instance,
this test case class logs WebHost stdout/stderr in case
a unit test fails.

You can write two sets of tests: test_* and check_log_* tests.

test_ABC - Unittest
check_log_ABC - Check logs generated during the execution of test_ABC.
"""
host_stdout_logger = logging.getLogger('webhosttests')

Expand Down Expand Up @@ -728,7 +733,7 @@ def call(*args, **kwargs):
return decorate


def _remove_path(path):
def remove_path(path):
if path.is_symlink():
path.unlink()
elif path.is_dir():
Expand All @@ -738,7 +743,7 @@ def _remove_path(path):


def _symlink_dir(src, dst):
_remove_path(dst)
remove_path(dst)

if ON_WINDOWS:
shutil.copytree(str(src), str(dst))
Expand All @@ -751,8 +756,9 @@ def _setup_func_app(app_root):
ping_func = app_root / 'ping'
host_json = app_root / 'host.json'

with open(host_json, 'w') as f:
f.write(HOST_JSON_TEMPLATE)
if not os.path.isfile(host_json):
with open(host_json, 'w') as f:
f.write(HOST_JSON_TEMPLATE)

_symlink_dir(TESTS_ROOT / 'common' / 'ping', ping_func)
_symlink_dir(EXTENSIONS_PATH, extensions)
Expand All @@ -764,7 +770,7 @@ def _teardown_func_app(app_root):
host_json = app_root / 'host.json'

for path in (extensions, ping_func, host_json):
_remove_path(path)
remove_path(path)


def _main():
Expand Down
44 changes: 44 additions & 0 deletions azure_functions_worker/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Optional, Callable
import os


Expand All @@ -15,3 +16,46 @@ def is_envvar_true(env_key: str) -> bool:
return False

return is_true_like(os.environ[env_key])


def get_app_setting(
        setting: str,
        default_value: Optional[str] = None,
        validator: Optional[Callable[[str], bool]] = None
) -> Optional[str]:
    """Look up an application setting from the process environment.

    Parameters
    ----------
    setting: str
        The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION)

    default_value: Optional[str]
        Returned when the setting is absent, or when a validator is given
        and rejects the configured value.

    validator: Optional[Callable[[str], bool]]
        Predicate over the raw setting value; True means the value is
        acceptable.

    Returns
    -------
    Optional[str]
        The configured value when present (and valid), otherwise the
        default value.
    """
    configured = os.getenv(setting)

    # Absent setting -> default; no validator -> accept whatever is set;
    # validator present -> accept only when it approves the value.
    if configured is None:
        return default_value
    if validator is None or validator(configured):
        return configured
    return default_value
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
from setuptools.command import develop


# TODO: change this to something more stable when available.
# TODO: Change this to something more stable when available.
# TODO: Change this to use 3.x
WEBHOST_URL = (
'https://github.com/Azure/azure-functions-host/releases/download/'
'https://github.com/Azure/azure-functions-host/releases/download'
'/v2.0.14361/Functions.Binaries.2.0.14361.no-runtime.zip'
)

Expand Down Expand Up @@ -258,7 +259,7 @@ def run(self):

setup(
name='azure-functions-worker',
version='1.1.5',
version='1.1.6',
description='Python Language Worker for Azure Functions Host',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down Expand Up @@ -286,7 +287,7 @@ def run(self):
],
extras_require={
'dev': [
'azure-functions==1.3.1',
'azure-functions==1.4.0',
'azure-eventhub~=5.1.0',
'python-dateutil~=2.8.1',
'flake8~=3.7.9',
Expand Down
13 changes: 13 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import azure.functions as func


def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    """Return the invocation context's directory and function name as JSON."""
    payload = json.dumps({
        'function_directory': context.function_directory,
        'function_name': context.function_name
    })
    return func.HttpResponse(body=payload,
                             mimetype='application/json')
15 changes: 15 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
15 changes: 15 additions & 0 deletions tests/unittests/http_functions/debug_logging/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "main.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Loading