From 766050b00f080c0d090ee182887dd05fe5d2bcd4 Mon Sep 17 00:00:00 2001 From: "Varad Meru [gmail]" Date: Thu, 10 Dec 2020 00:22:47 -0800 Subject: [PATCH 1/4] Setting PTPTC to None for Py3.9 only - Made the changes for Python 3.9 only. - Added tests for verifying that the default is None only for Py3.9 and not for others. --- azure_functions_worker/dispatcher.py | 51 ++++-- azure_functions_worker/testutils.py | 11 +- tests/unittests/test_dispatcher.py | 261 +++++++++++++-------------- 3 files changed, 166 insertions(+), 157 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 16f4da348..268361509 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -16,7 +16,7 @@ import threading from asyncio import BaseEventLoop from logging import LogRecord -from typing import Optional, List +from typing import List, Optional import grpc @@ -75,11 +75,13 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, self._old_task_factory = None - # We allow the customer to change synchronous thread pool count by - # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1. - self._sync_tp_max_workers: int = self._get_sync_tp_max_workers() + # We allow the customer to change synchronous thread pool max worker + # count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting. + # For 3.[6|7|8] The default value is 1. + # For 3.9, we don't set this value by default but we honor incoming + # the app setting. self._sync_call_tp: concurrent.futures.Executor = ( - self._create_sync_call_tp(self._sync_tp_max_workers) + self._create_sync_call_tp(self._get_sync_tp_max_workers()) ) self._grpc_connect_timeout: float = grpc_connect_timeout @@ -90,6 +92,15 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, self._grpc_thread: threading.Thread = threading.Thread( name='grpc-thread', target=self.__poll_grpc) + def get_sync_tp_workers_set(self): + """We don't know the exact value of the threadcount set for the Python + 3.9 scenarios (as we'll start passing only None by default), and we + need to get that information. + + Ref: concurrent.futures.thread.ThreadPoolExecutor.__init__._max_workers + """ + return self._sync_call_tp._max_workers + @classmethod async def connect(cls, host: str, port: int, worker_id: str, request_id: str, connect_timeout: float): @@ -325,7 +336,8 @@ async def _handle__invocation_request(self, req): ] if not fi.is_async: function_invocation_logs.append( - f'sync threadpool max workers: {self._sync_tp_max_workers}' + f'sync threadpool max workers: ' + f'{self.get_sync_tp_workers_set()}' ) logger.info(', '.join(function_invocation_logs)) @@ -434,9 +446,8 @@ async def _handle__function_environment_reload_request(self, req): # Apply PYTHON_THREADPOOL_THREAD_COUNT self._stop_sync_call_tp() - self._sync_tp_max_workers = self._get_sync_tp_max_workers() self._sync_call_tp = ( - self._create_sync_call_tp(self._sync_tp_max_workers) + self._create_sync_call_tp(self._get_sync_tp_max_workers()) ) # Reload package namespaces for customer's libraries @@ -501,7 +512,8 @@ def _stop_sync_call_tp(self): self._sync_call_tp.shutdown() self._sync_call_tp = None - def _get_sync_tp_max_workers(self) -> int: + @staticmethod + def _get_sync_tp_max_workers() -> Optional[int]: def tp_max_workers_validator(value: str) -> bool: try: int_value = int(value) @@ -511,20 +523,27 @@ def tp_max_workers_validator(value: str) -> bool: return False if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or ( - int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX): + int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX): logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set ' - 'to a value between 1 and 32') + 'to a value between 1 and 32. ' + 'Reverting to default value for max_workers') return False return True - return int(get_app_setting( - setting=PYTHON_THREADPOOL_THREAD_COUNT, - default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}', - validator=tp_max_workers_validator)) + # Starting Python 3.9, worker won't be putting a limit on the + # max_workers count in the created threadpool. + default_value = None if sys.version_info.minor == 9 \ + else f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}' + max_workers = get_app_setting(setting=PYTHON_THREADPOOL_THREAD_COUNT, + default_value=default_value, + validator=tp_max_workers_validator) + + # We can box the app setting as int for earlier python versions. + return int(max_workers) if max_workers else None def _create_sync_call_tp( - self, max_worker: int) -> concurrent.futures.Executor: + self, max_worker: Optional[int]) -> concurrent.futures.Executor: """Create a thread pool executor with max_worker. This is a wrapper over ThreadPoolExecutor constructor. Consider calling this method after _stop_sync_call_tp() to ensure only 1 synchronous thread pool is diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index c32ae467e..ca065b251 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -319,13 +319,12 @@ def __init__(self, loop, scripts_dir): self._connected_fut = loop.create_future() self._in_queue = queue.Queue() self._out_aqueue = asyncio.Queue(loop=self._loop) - self._threadpool = concurrent.futures.ThreadPoolExecutor( - max_workers=1) + self._threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=1) self._server = grpc.server(self._threadpool) self._servicer = _MockWebHostServicer(self) + protos.add_FunctionRpcServicer_to_server(self._servicer, self._server) self._port = self._server.add_insecure_port(f'{LOCALHOST}:0') - self._worker_id = self.make_id() self._request_id = self.make_id() @@ -487,10 +486,10 @@ async def __aenter__(self) -> _MockWebHost: await self._host.start() - self._worker = await dispatcher. \ + self._worker = await dispatcher.\ Dispatcher.connect(LOCALHOST, self._host._port, - self._host.worker_id, - self._host.request_id, connect_timeout=5.0) + self._host.worker_id, self._host.request_id, + connect_timeout=5.0) self._worker_task = loop.create_task(self._worker.dispatch_forever()) diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index df259353b..5cc52f6c6 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -1,55 +1,70 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Tuple +import collections as col import os +from typing import Optional, Tuple from unittest.mock import patch + from azure_functions_worker import protos from azure_functions_worker import testutils -from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT +from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT, \ + PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT + +SysVersionInfo = col.namedtuple("VersionInfo", ["major", "minor", "micro", + "releaselevel", "serial"]) +DISPATCHER_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions' + +class TestThreadPoolSettingsPython37(testutils.AsyncTestCase): + """Base test class for testing thread pool settings for sync threadpool + worker count. This class specifically sets sys.version_info to return as + Python 3.7 and extended classes change this value and other platform + specific values to test the behavior across the different python versions. -class TestDispatcher(testutils.AsyncTestCase): - dispatcher_funcs_dir = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions' + - Why not python 3.6? + - In Azure.Functions (library), the typing_inspect module imports specific + modules which are not available on systems where Python 3.7+ is installed. + Ref: + NEW_TYPING = sys.version_info[:3] >= (3, 7, 0) # PEP 560 + """ def setUp(self): + self._ctrl = testutils.start_mockhost( + script_root=DISPATCHER_FUNCTIONS_DIR) + self._default_workers: Optional[ + int] = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT self._pre_env = dict(os.environ) + self.mock_version_info = patch( + 'azure_functions_worker.dispatcher.sys.version_info', + SysVersionInfo(3, 7, 0, 'final', 0)) + self.mock_version_info.start() def tearDown(self): os.environ.clear() os.environ.update(self._pre_env) + self.mock_version_info.stop() async def test_dispatcher_sync_threadpool_default_worker(self): - """Test if the sync threadpool has maximum worker count set to 1 - by default + """Test if the sync threadpool has maximum worker count set the + correct default value """ - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: - await self._check_if_function_is_ok(host) - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) + async with self._ctrl as host: + # await self._check_if_function_is_ok(host) + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) async def test_dispatcher_sync_threadpool_set_worker(self): """Test if the sync threadpool maximum worker can be set """ # Configure thread pool max worker os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'}) - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: - await self._check_if_function_is_ok(host) - self.assertEqual(ctrl._worker._sync_tp_max_workers, 5) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function + async with self._ctrl as host: await self._check_if_function_is_ok(host) + await self._assert_workers_threadpool(self._ctrl, host, 5) async def test_dispatcher_sync_threadpool_invalid_worker_count(self): """Test when sync threadpool maximum worker is set to an invalid value, - the host should fallback to default value 1 + the host should fallback to default value """ # The @patch decorator does not work as expected and will suppress # any assertion failures in the async test cases. @@ -58,19 +73,11 @@ async def test_dispatcher_sync_threadpool_invalid_worker_count(self): with patch('azure_functions_worker.dispatcher.logger') as mock_logger: # Configure thread pool max worker to an invalid value os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'}) - ctrl = testutils.start_mockhost( - script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: - await self._check_if_function_is_ok(host) - - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - # Check if the dispatcher still function + async with self._ctrl as host: await self._check_if_function_is_ok(host) - + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) mock_logger.warning.assert_any_call( f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') @@ -78,96 +85,65 @@ async def test_dispatcher_sync_threadpool_below_min_setting(self): """Test if the sync threadpool will pick up default value when the setting is below minimum """ - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: # Configure thread pool max worker to an invalid value os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'}) - ctrl = testutils.start_mockhost( - script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) - - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) - + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' - 'between 1 and 32') + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32. ' + 'Reverting to default value for max_workers') async def test_dispatcher_sync_threadpool_exceed_max_setting(self): """Test if the sync threadpool will pick up default value when the setting is above maximum """ - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: # Configure thread pool max worker to an invalid value os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'}) - ctrl = testutils.start_mockhost( - script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) mock_logger.warning.assert_any_call( f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' - 'between 1 and 32') + 'between 1 and 32. ' + 'Reverting to default value for max_workers') async def test_dispatcher_sync_threadpool_in_placeholder(self): """Test if the sync threadpool will pick up app setting in placeholder mode (Linux Consumption) """ - - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) # Reload environment variable on specialization await host.reload_environment(environment={ PYTHON_THREADPOOL_THREAD_COUNT: '3' }) - - # Ensure the dispatcher sync threadpool should update to 3 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 3) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) + # Ensure the dispatcher sync threadpool should fallback to 1 + await self._assert_workers_threadpool(self._ctrl, host, 3) async def test_dispatcher_sync_threadpool_in_placeholder_invalid(self): """Test if the sync threadpool will use the default setting when the app setting is invalid """ - - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) # Reload environment variable on specialization await host.reload_environment(environment={ PYTHON_THREADPOOL_THREAD_COUNT: 'invalid' }) - - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) # Check warning message mock_logger.warning.assert_any_call( @@ -177,39 +153,29 @@ async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): """Test if the sync threadpool will use the default setting when the app setting is above maximum """ - - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) # Reload environment variable on specialization await host.reload_environment(environment={ PYTHON_THREADPOOL_THREAD_COUNT: '33' }) + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) - - # Check warning message mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' - 'between 1 and 32') + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a ' + f'value ' + 'between 1 and 32. ' + 'Reverting to default value for max_workers') async def test_dispatcher_sync_threadpool_in_placeholder_below_min(self): """Test if the sync threadpool will use the default setting when the app setting is below minimum """ - - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: + async with self._ctrl as host: await self._check_if_function_is_ok(host) # Reload environment variable on specialization @@ -217,24 +183,19 @@ async def test_dispatcher_sync_threadpool_in_placeholder_below_min(self): PYTHON_THREADPOOL_THREAD_COUNT: '0' }) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) - self.assertIsNotNone(ctrl._worker._sync_call_tp) - - # Check if the dispatcher still function - await self._check_if_function_is_ok(host) + await self._assert_workers_threadpool(self._ctrl, host, + self._default_workers) - # Check warning message mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' - 'between 1 and 32') + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a ' + f'value ' + 'between 1 and 32. ' + 'Reverting to default value for max_workers') async def test_sync_invocation_request_log(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: - request_id: str = ctrl._worker._request_id + async with self._ctrl as host: + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = await self._check_if_function_is_ok(host) mock_logger.info.assert_any_call( @@ -243,15 +204,13 @@ async def test_sync_invocation_request_log(self): f'function ID: {func_id}, ' f'invocation ID: {invoke_id}, ' 'function type: sync, ' - 'sync threadpool max workers: 1' + f'sync threadpool max workers: {self._default_workers}' ) async def test_async_invocation_request_log(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: - request_id: str = ctrl._worker._request_id + async with self._ctrl as host: + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = ( await self._check_if_async_function_is_ok(host) ) @@ -265,12 +224,11 @@ async def test_async_invocation_request_log(self): ) async def test_sync_invocation_request_log_threads(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'}) with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: - request_id: str = ctrl._worker._request_id + async with self._ctrl as host: + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = await self._check_if_function_is_ok(host) mock_logger.info.assert_any_call( @@ -283,12 +241,11 @@ async def test_sync_invocation_request_log_threads(self): ) async def test_async_invocation_request_log_threads(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '4'}) with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: - request_id: str = ctrl._worker._request_id + async with self._ctrl as host: + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = ( await self._check_if_async_function_is_ok(host) ) @@ -302,15 +259,13 @@ async def test_async_invocation_request_log_threads(self): ) async def test_sync_invocation_request_log_in_placeholder_threads(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: + async with self._ctrl as host: await host.reload_environment(environment={ PYTHON_THREADPOOL_THREAD_COUNT: '5' }) - request_id: str = ctrl._worker._request_id + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = await self._check_if_function_is_ok(host) mock_logger.info.assert_any_call( @@ -323,15 +278,13 @@ async def test_sync_invocation_request_log_in_placeholder_threads(self): ) async def test_async_invocation_request_log_in_placeholder_threads(self): - ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - with patch('azure_functions_worker.dispatcher.logger') as mock_logger: - async with ctrl as host: + async with self._ctrl as host: await host.reload_environment(environment={ PYTHON_THREADPOOL_THREAD_COUNT: '5' }) - request_id: str = ctrl._worker._request_id + request_id: str = self._ctrl._worker._request_id func_id, invoke_id = ( await self._check_if_async_function_is_ok(host) ) @@ -344,9 +297,19 @@ async def test_async_invocation_request_log_in_placeholder_threads(self): 'function type: async' ) + async def _assert_workers_threadpool(self, ctrl, host, expected_worker_count): + self.assertIsNotNone(ctrl._worker._sync_call_tp) + self.assertEqual(ctrl._worker.get_sync_tp_workers_set(), + expected_worker_count) + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + async def _check_if_function_is_ok(self, host) -> Tuple[str, str]: # Ensure the function can be properly loaded func_id, load_r = await host.load_function('show_context') + print("%%%%%%%%%%%") + print(load_r) + print("%%%%%%%%%%%") self.assertEqual(load_r.response.function_id, func_id) self.assertEqual(load_r.response.result.status, protos.StatusResult.Success) @@ -367,7 +330,7 @@ async def _check_if_function_is_ok(self, host) -> Tuple[str, str]: self.assertEqual(call_r.response.result.status, protos.StatusResult.Success) - return (func_id, invoke_id) + return func_id, invoke_id async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: # Ensure the function can be properly loaded @@ -392,4 +355,32 @@ async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: self.assertEqual(call_r.response.result.status, protos.StatusResult.Success) - return (func_id, invoke_id) + return func_id, invoke_id + + +class TestThreadPoolSettingsPython38(TestThreadPoolSettingsPython37): + def setUp(self): + super(TestThreadPoolSettingsPython38, self).setUp() + self.mock_version_info = patch( + 'azure_functions_worker.dispatcher.sys.version_info', + SysVersionInfo(3, 8, 0, 'final', 0)) + self.mock_version_info.start() + + def tearDown(self): + self.mock_version_info.stop() + super(TestThreadPoolSettingsPython38, self).tearDown() + + +class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): + def setUp(self): + super(TestThreadPoolSettingsPython39, self).setUp() + self._default_workers: Optional[ + int] = min(32, (os.cpu_count() or 1) + 4) + self.mock_version_info = patch( + 'azure_functions_worker.dispatcher.sys.version_info', + SysVersionInfo(3, 9, 0, 'final', 0)) + self.mock_version_info.start() + + def tearDown(self): + self.mock_version_info.stop() + super(TestThreadPoolSettingsPython39, self).tearDown() From d2e4cbc90c589b659199ef023343d94b38f5523b Mon Sep 17 00:00:00 2001 From: "Varad Meru [gmail]" Date: Thu, 10 Dec 2020 01:38:10 -0800 Subject: [PATCH 2/4] Using asynctest rather than unittest. --- azure_functions_worker/testutils.py | 3 ++- setup.py | 11 ++++++----- tests/unittests/test_dispatcher.py | 18 +++++++++++------- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index ca065b251..64444cf58 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -29,6 +29,7 @@ import unittest import uuid +import asynctest as asynctest import grpc import requests @@ -122,7 +123,7 @@ def wrapper(*args, **kwargs): return wrapper -class AsyncTestCase(unittest.TestCase, metaclass=AsyncTestCaseMeta): +class AsyncTestCase(asynctest.TestCase, metaclass=AsyncTestCaseMeta): pass diff --git a/setup.py b/setup.py index 56e0fce93..1097734bc 100644 --- a/setup.py +++ b/setup.py @@ -272,10 +272,10 @@ def run(self): 'License :: OSI Approved :: MIT License', 'Intended Audience :: Developers', 'Programming Language :: Python :: 3', - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', 'Operating System :: Microsoft :: Windows', 'Operating System :: POSIX', 'Operating System :: MacOS :: MacOS X', @@ -308,7 +308,8 @@ def run(self): 'pytest-xdist', 'pytest-randomly', 'pytest-instafail', - 'pytest-rerunfailures' + 'pytest-rerunfailures', + 'asynctest' ] }, include_package_data=True, diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 5cc52f6c6..895273002 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -307,9 +307,6 @@ async def _assert_workers_threadpool(self, ctrl, host, expected_worker_count): async def _check_if_function_is_ok(self, host) -> Tuple[str, str]: # Ensure the function can be properly loaded func_id, load_r = await host.load_function('show_context') - print("%%%%%%%%%%%") - print(load_r) - print("%%%%%%%%%%%") self.assertEqual(load_r.response.function_id, func_id) self.assertEqual(load_r.response.result.status, protos.StatusResult.Success) @@ -367,20 +364,27 @@ def setUp(self): self.mock_version_info.start() def tearDown(self): + os.environ.clear() + os.environ.update(self._pre_env) self.mock_version_info.stop() - super(TestThreadPoolSettingsPython38, self).tearDown() class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): def setUp(self): super(TestThreadPoolSettingsPython39, self).setUp() - self._default_workers: Optional[ - int] = min(32, (os.cpu_count() or 1) + 4) + + # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 + self._default_workers: Optional[int] = 6 + self.mock_os_cpu = patch( + 'azure_functions_worker.dispatcher.os.cpu_count', 2) + self.mock_version_info = patch( 'azure_functions_worker.dispatcher.sys.version_info', SysVersionInfo(3, 9, 0, 'final', 0)) self.mock_version_info.start() def tearDown(self): + os.environ.clear() + os.environ.update(self._pre_env) + self.mock_os_cpu.stop() self.mock_version_info.stop() - super(TestThreadPoolSettingsPython39, self).tearDown() From 669905980018a39b41cfa3391f752852795be947 Mon Sep 17 00:00:00 2001 From: "Varad Meru [gmail]" Date: Thu, 10 Dec 2020 10:57:13 -0800 Subject: [PATCH 3/4] nit fixes; mock start and flake8 errors. --- tests/unittests/test_dispatcher.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 895273002..357a60421 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -93,9 +93,8 @@ async def test_dispatcher_sync_threadpool_below_min_setting(self): await self._assert_workers_threadpool(self._ctrl, host, self._default_workers) mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' - 'between 1 and 32. ' - 'Reverting to default value for max_workers') + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32. Reverting to default value for max_workers') async def test_dispatcher_sync_threadpool_exceed_max_setting(self): """Test if the sync threadpool will pick up default value when the @@ -297,7 +296,8 @@ async def test_async_invocation_request_log_in_placeholder_threads(self): 'function type: async' ) - async def _assert_workers_threadpool(self, ctrl, host, expected_worker_count): + async def _assert_workers_threadpool(self, ctrl, host, + expected_worker_count): self.assertIsNotNone(ctrl._worker._sync_call_tp) self.assertEqual(ctrl._worker.get_sync_tp_workers_set(), expected_worker_count) @@ -373,10 +373,12 @@ class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): def setUp(self): super(TestThreadPoolSettingsPython39, self).setUp() + self.mock_os_cpu = patch( + 'azure_functions_worker.dispatcher.os.cpu_count', + return_value=2) + self.mock_os_cpu.start() # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 self._default_workers: Optional[int] = 6 - self.mock_os_cpu = patch( - 'azure_functions_worker.dispatcher.os.cpu_count', 2) self.mock_version_info = patch( 'azure_functions_worker.dispatcher.sys.version_info', From a2bc850bdacf4ff19e17ca4529d2c71fb4dabe7b Mon Sep 17 00:00:00 2001 From: "Varad Meru [gmail]" Date: Thu, 10 Dec 2020 15:03:57 -0800 Subject: [PATCH 4/4] Adding a skipIf for Python 3.9 tests --- tests/unittests/test_dispatcher.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 357a60421..f22d583eb 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -2,6 +2,8 @@ # Licensed under the MIT License. import collections as col import os +import sys +import unittest from typing import Optional, Tuple from unittest.mock import patch @@ -369,13 +371,17 @@ def tearDown(self): self.mock_version_info.stop() +@unittest.skipIf(sys.version_info.minor != 9, + "Run the tests only for Python 3.9. In other platforms, " + "as the default passed is None, the cpu_count determines the " + "number of max_workers and we cannot mock the os.cpu_count() " + "in the concurrent.futures.ThreadPoolExecutor") class TestThreadPoolSettingsPython39(TestThreadPoolSettingsPython37): def setUp(self): super(TestThreadPoolSettingsPython39, self).setUp() self.mock_os_cpu = patch( - 'azure_functions_worker.dispatcher.os.cpu_count', - return_value=2) + 'os.cpu_count', return_value=2) self.mock_os_cpu.start() # 6 - based on 2 cores - min(32, (os.cpu_count() or 1) + 4) - 2 + 4 self._default_workers: Optional[int] = 6