Skip to content

Setting PTPTC to None for Py3.9 only #791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import threading
from asyncio import BaseEventLoop
from logging import LogRecord
from typing import Optional, List
from typing import List, Optional

import grpc

Expand Down Expand Up @@ -75,11 +75,13 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,

self._old_task_factory = None

# We allow the customer to change synchronous thread pool count by
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
# We allow the customer to change synchronous thread pool max worker
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
# For 3.[6|7|8] The default value is 1.
# For 3.9, we don't set this value by default but we honor incoming
# the app setting.
self._sync_call_tp: concurrent.futures.Executor = (
self._create_sync_call_tp(self._sync_tp_max_workers)
self._create_sync_call_tp(self._get_sync_tp_max_workers())
)

self._grpc_connect_timeout: float = grpc_connect_timeout
Expand All @@ -90,6 +92,15 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._grpc_thread: threading.Thread = threading.Thread(
name='grpc-thread', target=self.__poll_grpc)

def get_sync_tp_workers_set(self):
"""We don't know the exact value of the threadcount set for the Python
3.9 scenarios (as we'll start passing only None by default), and we
need to get that information.

Ref: concurrent.futures.thread.ThreadPoolExecutor.__init__._max_workers
"""
return self._sync_call_tp._max_workers

@classmethod
async def connect(cls, host: str, port: int, worker_id: str,
request_id: str, connect_timeout: float):
Expand Down Expand Up @@ -325,7 +336,8 @@ async def _handle__invocation_request(self, req):
]
if not fi.is_async:
function_invocation_logs.append(
f'sync threadpool max workers: {self._sync_tp_max_workers}'
f'sync threadpool max workers: '
f'{self.get_sync_tp_workers_set()}'
)
logger.info(', '.join(function_invocation_logs))

Expand Down Expand Up @@ -434,9 +446,8 @@ async def _handle__function_environment_reload_request(self, req):

# Apply PYTHON_THREADPOOL_THREAD_COUNT
self._stop_sync_call_tp()
self._sync_tp_max_workers = self._get_sync_tp_max_workers()
self._sync_call_tp = (
self._create_sync_call_tp(self._sync_tp_max_workers)
self._create_sync_call_tp(self._get_sync_tp_max_workers())
)

# Reload package namespaces for customer's libraries
Expand Down Expand Up @@ -501,7 +512,8 @@ def _stop_sync_call_tp(self):
self._sync_call_tp.shutdown()
self._sync_call_tp = None

def _get_sync_tp_max_workers(self) -> int:
@staticmethod
def _get_sync_tp_max_workers() -> Optional[int]:
def tp_max_workers_validator(value: str) -> bool:
try:
int_value = int(value)
Expand All @@ -511,20 +523,27 @@ def tp_max_workers_validator(value: str) -> bool:
return False

if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
'to a value between 1 and 32')
'to a value between 1 and 32. '
'Reverting to default value for max_workers')
return False

return True

return int(get_app_setting(
setting=PYTHON_THREADPOOL_THREAD_COUNT,
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
validator=tp_max_workers_validator))
# Starting Python 3.9, worker won't be putting a limit on the
# max_workers count in the created threadpool.
default_value = None if sys.version_info.minor == 9 \
else f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}'
max_workers = get_app_setting(setting=PYTHON_THREADPOOL_THREAD_COUNT,
default_value=default_value,
validator=tp_max_workers_validator)

# We can box the app setting as int for earlier python versions.
return int(max_workers) if max_workers else None

def _create_sync_call_tp(
self, max_worker: int) -> concurrent.futures.Executor:
self, max_worker: Optional[int]) -> concurrent.futures.Executor:
"""Create a thread pool executor with max_worker. This is a wrapper
over ThreadPoolExecutor constructor. Consider calling this method after
_stop_sync_call_tp() to ensure only 1 synchronous thread pool is
Expand Down
14 changes: 7 additions & 7 deletions azure_functions_worker/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import unittest
import uuid

import asynctest as asynctest
import grpc
import requests

Expand Down Expand Up @@ -122,7 +123,7 @@ def wrapper(*args, **kwargs):
return wrapper


class AsyncTestCase(unittest.TestCase, metaclass=AsyncTestCaseMeta):
class AsyncTestCase(asynctest.TestCase, metaclass=AsyncTestCaseMeta):
pass


Expand Down Expand Up @@ -319,13 +320,12 @@ def __init__(self, loop, scripts_dir):
self._connected_fut = loop.create_future()
self._in_queue = queue.Queue()
self._out_aqueue = asyncio.Queue(loop=self._loop)
self._threadpool = concurrent.futures.ThreadPoolExecutor(
max_workers=1)
self._threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
self._server = grpc.server(self._threadpool)
self._servicer = _MockWebHostServicer(self)

protos.add_FunctionRpcServicer_to_server(self._servicer, self._server)
self._port = self._server.add_insecure_port(f'{LOCALHOST}:0')

self._worker_id = self.make_id()
self._request_id = self.make_id()

Expand Down Expand Up @@ -487,10 +487,10 @@ async def __aenter__(self) -> _MockWebHost:

await self._host.start()

self._worker = await dispatcher. \
self._worker = await dispatcher.\
Dispatcher.connect(LOCALHOST, self._host._port,
self._host.worker_id,
self._host.request_id, connect_timeout=5.0)
self._host.worker_id, self._host.request_id,
connect_timeout=5.0)

self._worker_task = loop.create_task(self._worker.dispatch_forever())

Expand Down
11 changes: 6 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ def run(self):
'License :: OSI Approved :: MIT License',
'Intended Audience :: Developers',
'Programming Language :: Python :: 3',
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: Microsoft :: Windows',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
Expand Down Expand Up @@ -308,7 +308,8 @@ def run(self):
'pytest-xdist',
'pytest-randomly',
'pytest-instafail',
'pytest-rerunfailures'
'pytest-rerunfailures',
'asynctest'
]
},
include_package_data=True,
Expand Down
Loading