Skip to content

Commit 4256847

Browse files
authored
[3.12] gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (GH-125492) (#125599)
A deadlock could occur when `ProcessPoolExecutor` shut down at the same time that a queueing thread was handling an error while processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the lock-ordering deadlock in which the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, and the queueing thread is simultaneously attempting to acquire the `_shutdown_lock` while closing the `_ThreadWakeup`. (cherry picked from commit 760872e)
1 parent cbd50a4 commit 4256847

File tree

2 files changed

+24
-30
lines changed

2 files changed

+24
-30
lines changed

Lib/concurrent/futures/process.py

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,31 @@
6868
class _ThreadWakeup:
6969
def __init__(self):
7070
self._closed = False
71+
self._lock = threading.Lock()
7172
self._reader, self._writer = mp.Pipe(duplex=False)
7273

7374
def close(self):
74-
# Please note that we do not take the shutdown lock when
75+
# Please note that we do not take self._lock when
7576
# calling clear() (to avoid deadlocking) so this method can
7677
# only be called safely from the same thread as all calls to
77-
# clear() even if you hold the shutdown lock. Otherwise we
78+
# clear() even if you hold the lock. Otherwise we
7879
# might try to read from the closed pipe.
79-
if not self._closed:
80-
self._closed = True
81-
self._writer.close()
82-
self._reader.close()
80+
with self._lock:
81+
if not self._closed:
82+
self._closed = True
83+
self._writer.close()
84+
self._reader.close()
8385

8486
def wakeup(self):
85-
if not self._closed:
86-
self._writer.send_bytes(b"")
87+
with self._lock:
88+
if not self._closed:
89+
self._writer.send_bytes(b"")
8790

8891
def clear(self):
89-
if not self._closed:
90-
while self._reader.poll():
91-
self._reader.recv_bytes()
92+
if self._closed:
93+
raise RuntimeError('operation on closed _ThreadWakeup')
94+
while self._reader.poll():
95+
self._reader.recv_bytes()
9296

9397

9498
def _python_exit():
@@ -167,10 +171,8 @@ def __init__(self, work_id, fn, args, kwargs):
167171

168172
class _SafeQueue(Queue):
169173
"""Safe Queue set exception to the future object linked to a job"""
170-
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
171-
thread_wakeup):
174+
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
172175
self.pending_work_items = pending_work_items
173-
self.shutdown_lock = shutdown_lock
174176
self.thread_wakeup = thread_wakeup
175177
super().__init__(max_size, ctx=ctx)
176178

@@ -179,8 +181,7 @@ def _on_queue_feeder_error(self, e, obj):
179181
tb = format_exception(type(e), e, e.__traceback__)
180182
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
181183
work_item = self.pending_work_items.pop(obj.work_id, None)
182-
with self.shutdown_lock:
183-
self.thread_wakeup.wakeup()
184+
self.thread_wakeup.wakeup()
184185
# work_item can be None if another process terminated. In this
185186
# case, the executor_manager_thread fails all work_items
186187
# with BrokenProcessPool
@@ -305,12 +306,10 @@ def __init__(self, executor):
305306
# will wake up the queue management thread so that it can terminate
306307
# if there is no pending work item.
307308
def weakref_cb(_,
308-
thread_wakeup=self.thread_wakeup,
309-
shutdown_lock=self.shutdown_lock):
309+
thread_wakeup=self.thread_wakeup):
310310
mp.util.debug('Executor collected: triggering callback for'
311311
' QueueManager wakeup')
312-
with shutdown_lock:
313-
thread_wakeup.wakeup()
312+
thread_wakeup.wakeup()
314313

315314
self.executor_reference = weakref.ref(executor, weakref_cb)
316315

@@ -438,11 +437,6 @@ def wait_result_broken_or_wakeup(self):
438437
elif wakeup_reader in ready:
439438
is_broken = False
440439

441-
# No need to hold the _shutdown_lock here because:
442-
# 1. we're the only thread to use the wakeup reader
443-
# 2. we're also the only thread to call thread_wakeup.close()
444-
# 3. we want to avoid a possible deadlock when both reader and writer
445-
# would block (gh-105829)
446440
self.thread_wakeup.clear()
447441

448442
return result_item, is_broken, cause
@@ -740,10 +734,9 @@ def __init__(self, max_workers=None, mp_context=None,
740734
# as it could result in a deadlock if a worker process dies with the
741735
# _result_queue write lock still acquired.
742736
#
743-
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
744-
# .wakeup(). Care must also be taken to not call clear or close from
745-
# more than one thread since _ThreadWakeup.clear() is not protected by
746-
# the _shutdown_lock
737+
# Care must be taken to only call clear and close from the
738+
# executor_manager_thread, since _ThreadWakeup.clear() is not protected
739+
# by a lock.
747740
self._executor_manager_thread_wakeup = _ThreadWakeup()
748741

749742
# Create communication channels for the executor
@@ -754,7 +747,6 @@ def __init__(self, max_workers=None, mp_context=None,
754747
self._call_queue = _SafeQueue(
755748
max_size=queue_size, ctx=self._mp_context,
756749
pending_work_items=self._pending_work_items,
757-
shutdown_lock=self._shutdown_lock,
758750
thread_wakeup=self._executor_manager_thread_wakeup)
759751
# Killed worker processes can produce spurious "broken pipe"
760752
# tracebacks in the queue's own worker thread. But we detect killed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down
2+
concurrently with an error when feeding a job to a worker process.

0 commit comments

Comments
 (0)