Skip to content

Commit 67fe2e3

Browse files
authored
Setting PTPTC to None for Py3.9 only (#791)
* Setting PTPTC to None for Py3.9 only - Added tests for verifying that the default is None only for Py3.9 and not for others. - Using `asynctest` rather than `unittest`. - Adding a skipIf for Python 3.9 tests to make sure they only run for Python 3.9 platform.
1 parent 3f5e62a commit 67fe2e3

File tree

4 files changed

+185
-162
lines changed

4 files changed

+185
-162
lines changed

azure_functions_worker/dispatcher.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import threading
1717
from asyncio import BaseEventLoop
1818
from logging import LogRecord
19-
from typing import Optional, List
19+
from typing import List, Optional
2020

2121
import grpc
2222

@@ -75,11 +75,13 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
7575

7676
self._old_task_factory = None
7777

78-
# We allow the customer to change synchronous thread pool count by
79-
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
80-
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
78+
# We allow the customer to change synchronous thread pool max worker
79+
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
80+
# For 3.[6|7|8] The default value is 1.
81+
# For 3.9, we don't set this value by default but we honor incoming
82+
# the app setting.
8183
self._sync_call_tp: concurrent.futures.Executor = (
82-
self._create_sync_call_tp(self._sync_tp_max_workers)
84+
self._create_sync_call_tp(self._get_sync_tp_max_workers())
8385
)
8486

8587
self._grpc_connect_timeout: float = grpc_connect_timeout
@@ -90,6 +92,15 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
9092
self._grpc_thread: threading.Thread = threading.Thread(
9193
name='grpc-thread', target=self.__poll_grpc)
9294

95+
def get_sync_tp_workers_set(self):
96+
"""We don't know the exact value of the threadcount set for the Python
97+
3.9 scenarios (as we'll start passing only None by default), and we
98+
need to get that information.
99+
100+
Ref: concurrent.futures.thread.ThreadPoolExecutor.__init__._max_workers
101+
"""
102+
return self._sync_call_tp._max_workers
103+
93104
@classmethod
94105
async def connect(cls, host: str, port: int, worker_id: str,
95106
request_id: str, connect_timeout: float):
@@ -325,7 +336,8 @@ async def _handle__invocation_request(self, req):
325336
]
326337
if not fi.is_async:
327338
function_invocation_logs.append(
328-
f'sync threadpool max workers: {self._sync_tp_max_workers}'
339+
f'sync threadpool max workers: '
340+
f'{self.get_sync_tp_workers_set()}'
329341
)
330342
logger.info(', '.join(function_invocation_logs))
331343

@@ -434,9 +446,8 @@ async def _handle__function_environment_reload_request(self, req):
434446

435447
# Apply PYTHON_THREADPOOL_THREAD_COUNT
436448
self._stop_sync_call_tp()
437-
self._sync_tp_max_workers = self._get_sync_tp_max_workers()
438449
self._sync_call_tp = (
439-
self._create_sync_call_tp(self._sync_tp_max_workers)
450+
self._create_sync_call_tp(self._get_sync_tp_max_workers())
440451
)
441452

442453
# Reload package namespaces for customer's libraries
@@ -501,7 +512,8 @@ def _stop_sync_call_tp(self):
501512
self._sync_call_tp.shutdown()
502513
self._sync_call_tp = None
503514

504-
def _get_sync_tp_max_workers(self) -> int:
515+
@staticmethod
516+
def _get_sync_tp_max_workers() -> Optional[int]:
505517
def tp_max_workers_validator(value: str) -> bool:
506518
try:
507519
int_value = int(value)
@@ -511,20 +523,27 @@ def tp_max_workers_validator(value: str) -> bool:
511523
return False
512524

513525
if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
514-
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
526+
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
515527
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
516-
'to a value between 1 and 32')
528+
'to a value between 1 and 32. '
529+
'Reverting to default value for max_workers')
517530
return False
518531

519532
return True
520533

521-
return int(get_app_setting(
522-
setting=PYTHON_THREADPOOL_THREAD_COUNT,
523-
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
524-
validator=tp_max_workers_validator))
534+
# Starting Python 3.9, worker won't be putting a limit on the
535+
# max_workers count in the created threadpool.
536+
default_value = None if sys.version_info.minor == 9 \
537+
else f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}'
538+
max_workers = get_app_setting(setting=PYTHON_THREADPOOL_THREAD_COUNT,
539+
default_value=default_value,
540+
validator=tp_max_workers_validator)
541+
542+
# We can box the app setting as int for earlier python versions.
543+
return int(max_workers) if max_workers else None
525544

526545
def _create_sync_call_tp(
527-
self, max_worker: int) -> concurrent.futures.Executor:
546+
self, max_worker: Optional[int]) -> concurrent.futures.Executor:
528547
"""Create a thread pool executor with max_worker. This is a wrapper
529548
over ThreadPoolExecutor constructor. Consider calling this method after
530549
_stop_sync_call_tp() to ensure only 1 synchronous thread pool is

azure_functions_worker/testutils.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import unittest
3030
import uuid
3131

32+
import asynctest as asynctest
3233
import grpc
3334
import requests
3435

@@ -122,7 +123,7 @@ def wrapper(*args, **kwargs):
122123
return wrapper
123124

124125

125-
class AsyncTestCase(unittest.TestCase, metaclass=AsyncTestCaseMeta):
126+
class AsyncTestCase(asynctest.TestCase, metaclass=AsyncTestCaseMeta):
126127
pass
127128

128129

@@ -319,13 +320,12 @@ def __init__(self, loop, scripts_dir):
319320
self._connected_fut = loop.create_future()
320321
self._in_queue = queue.Queue()
321322
self._out_aqueue = asyncio.Queue(loop=self._loop)
322-
self._threadpool = concurrent.futures.ThreadPoolExecutor(
323-
max_workers=1)
323+
self._threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
324324
self._server = grpc.server(self._threadpool)
325325
self._servicer = _MockWebHostServicer(self)
326+
326327
protos.add_FunctionRpcServicer_to_server(self._servicer, self._server)
327328
self._port = self._server.add_insecure_port(f'{LOCALHOST}:0')
328-
329329
self._worker_id = self.make_id()
330330
self._request_id = self.make_id()
331331

@@ -487,10 +487,10 @@ async def __aenter__(self) -> _MockWebHost:
487487

488488
await self._host.start()
489489

490-
self._worker = await dispatcher. \
490+
self._worker = await dispatcher.\
491491
Dispatcher.connect(LOCALHOST, self._host._port,
492-
self._host.worker_id,
493-
self._host.request_id, connect_timeout=5.0)
492+
self._host.worker_id, self._host.request_id,
493+
connect_timeout=5.0)
494494

495495
self._worker_task = loop.create_task(self._worker.dispatch_forever())
496496

setup.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ def run(self):
272272
'License :: OSI Approved :: MIT License',
273273
'Intended Audience :: Developers',
274274
'Programming Language :: Python :: 3',
275-
"Programming Language :: Python :: 3.6",
276-
"Programming Language :: Python :: 3.7",
277-
"Programming Language :: Python :: 3.8",
278-
"Programming Language :: Python :: 3.9",
275+
'Programming Language :: Python :: 3.6',
276+
'Programming Language :: Python :: 3.7',
277+
'Programming Language :: Python :: 3.8',
278+
'Programming Language :: Python :: 3.9',
279279
'Operating System :: Microsoft :: Windows',
280280
'Operating System :: POSIX',
281281
'Operating System :: MacOS :: MacOS X',
@@ -308,7 +308,8 @@ def run(self):
308308
'pytest-xdist',
309309
'pytest-randomly',
310310
'pytest-instafail',
311-
'pytest-rerunfailures'
311+
'pytest-rerunfailures',
312+
'asynctest'
312313
]
313314
},
314315
include_package_data=True,

0 commit comments

Comments
 (0)