diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index b7a2cac7f57015..27bba27ad5cee8 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -108,20 +108,25 @@ class BrokenThreadPool(_base.BrokenExecutor): Raised when a worker thread in a ThreadPoolExecutor failed initializing. """ +class RejectedExecutionException(Exception): + """ + Raised when the work queue is full. + """ + class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ - def __init__(self, max_workers=None, thread_name_prefix='', + def __init__(self, max_workers=None, thread_name_prefix='',queue_size=0, initializer=None, initargs=()): """Initializes a new ThreadPoolExecutor instance. - Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. + queue_size: The size limit for work queue. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. """ @@ -141,7 +146,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', raise TypeError("initializer must be a callable") self._max_workers = max_workers - self._work_queue = queue.SimpleQueue() + self._work_queue = queue.Queue(queue_size) self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False @@ -166,7 +171,10 @@ def submit(self, fn, /, *args, **kwargs): f = _base.Future() w = _WorkItem(f, fn, args, kwargs) - self._work_queue.put(w) + try: + self._work_queue.put(w) + except queue.Full: + raise RejectedExecutionException() self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__