From 8f440a246f787db4d8a96f2789a55ee866fcbcc7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Elfstr=C3=B6m?=
Date: Sat, 26 Aug 2023 12:26:47 +0200
Subject: [PATCH 1/7] gh-105829: Add test to demonstrate deadlock

If the management thread does not clear the wakeup pipe fast enough,
the wakeup code will block while holding the shutdown lock, causing a
deadlock.

https://github.com/python/cpython/issues/105829
---
 Lib/test/test_concurrent_futures.py | 47 +++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 39dbe234e765e8..516365b4f08ddc 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -19,6 +19,7 @@
 import threading
 import time
 import unittest
+import unittest.mock
 import weakref
 from pickle import PicklingError

@@ -1389,6 +1390,52 @@ def test_crash_big_data(self):
         with self.assertRaises(BrokenProcessPool):
             list(executor.map(_crash_with_data, [data] * 10))

+    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
+        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
+        # fill up and block. See: https://github.com/python/cpython/issues/105829
+
+        # Lots of cargo culting while writing this test, apologies if
+        # something is really stupid...
+
+        self.executor.shutdown(wait=True)
+
+        if not hasattr(signal, 'alarm'):
+            raise unittest.SkipTest(
+                "Tested platform does not support the alarm signal")
+
+        def timeout(_signum, _frame):
+            raise RuntimeError("timed out while submitting jobs?")
+
+        thread_run = futures.process._ExecutorManagerThread.run
+        def mock_run(self):
+            # Delay thread startup so the wakeup pipe can fill up and block
+            time.sleep(5)
+            thread_run(self)
+
+        # Should be support.PIPE_MAX_SIZE but it is way too
+        # pessimistic here, would take too long. Assume 64k pipe
+        # buffer and add some margin...
+        job_num = 65536 * 2
+        job_data = range(job_num)
+        with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run):
+            with self.executor_type(max_workers=2,
+                                    mp_context=self.get_context()) as executor:
+                self.executor = executor # Allow clean up in fail_on_deadlock
+
+                # Need to use sigalarm for timeout detection because
+                # Executor.submit is not guarded by any timeout (both
+                # self._work_ids.put(self._queue_count) and
+                # self._executor_manager_thread_wakeup.wakeup() might
+                # timeout, maybe more?). In this specific case it was
+                # the wakeup call that deadlocked on a blocking pipe.
+                old_handler = signal.signal(signal.SIGALRM, timeout)
+                try:
+                    signal.alarm(int(support.LONG_TIMEOUT))
+                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
+                finally:
+                    signal.alarm(0)
+                    signal.signal(signal.SIGALRM, old_handler)
+

 create_executor_tests(ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,
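Note: the failure mode this test targets can be reduced to a few lines. The
following standalone sketch (illustrative only, not part of the patch; all
names are made up) shows how a bounded pipe plus a shared lock deadlocks:
the writer blocks on a full pipe while holding the lock that the reader
needs in order to drain it.

    import os
    import threading

    lock = threading.Lock()
    r, w = os.pipe()   # bounded kernel buffer, like the wakeup pipe

    def manager():
        # The reader takes the lock before draining, mirroring the
        # clear() call in _ExecutorManagerThread.
        while True:
            with lock:
                os.read(r, 1)

    def submitter():
        # The writer writes while holding the lock, mirroring the
        # wakeup() call in ProcessPoolExecutor.submit(). Once the pipe
        # buffer fills, os.write() blocks with the lock still held and
        # the reader can never acquire it: deadlock.
        with lock:
            while True:
                os.write(w, b"\0")

    # Starting both would hang the process:
    #   threading.Thread(target=manager, daemon=True).start()
    #   submitter()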
From bbb2a630b6f72f6770b2434435d91f92e930c65f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Elfstr=C3=B6m?=
Date: Sat, 26 Aug 2023 13:03:37 +0200
Subject: [PATCH 2/7] gh-105829: Fix concurrent.futures.ProcessPoolExecutor
 deadlock

This fixes issue #105829,
https://github.com/python/cpython/issues/105829

The _ExecutorManagerThread wake-up code could deadlock if the wake-up
pipe filled up and blocked.

The relevant code looked like this:

    class _ThreadWakeup:
        def wakeup(self):
            if not self._closed:
                self._writer.send_bytes(b"")

        def clear(self):
            if not self._closed:
                while self._reader.poll():
                    self._reader.recv_bytes()

    class ProcessPoolExecutor(_base.Executor):
        def submit(self, fn, /, *args, **kwargs):
            with self._shutdown_lock:
                ...
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

    class _ExecutorManagerThread(threading.Thread):
        def wait_result_broken_or_wakeup(self):
            ...
            with self.shutdown_lock:
                self.thread_wakeup.clear()

The shutdown_lock must be taken for both reads and writes of the
wake-up pipe. If a read or a write of the pipe blocks, the code will
deadlock. It looks like reads can't block (a poll() is done before
doing any reads), but writes have no protection against blocking. If
the _ExecutorManagerThread cannot keep up and clear the wake-up pipe,
it will fill up and block. This seems to have been rather easy to
trigger in the real world once the number of tasks reached 100000 or
so.

With this change we make the writes to the wake-up pipe non-blocking.
If the pipe would block, we simply skip the write. This should be OK
since the root of the problem is that both reader and writer must hold
the shutdown_lock when accessing the pipe. That implies we don't need
to send anything when the pipe is full: the reader can't be reading it
concurrently, so it will eventually wake up due to the data already in
the pipe.
---
 Lib/concurrent/futures/process.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 301207f59de37a..03fcd05c6fe251 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -69,6 +69,7 @@ class _ThreadWakeup:
     def __init__(self):
         self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
+        os.set_blocking(self._writer.fileno(), False)

     def close(self):
         if not self._closed:
@@ -78,7 +79,17 @@ def close(self):

     def wakeup(self):
         if not self._closed:
-            self._writer.send_bytes(b"")
+            try:
+                self._writer.send_bytes(b"")
+            except BlockingIOError:
+                # Assuming BlockingIOError is only raised when there
+                # is data in the pipe then we can skip the wake-up
+                # here because we are holding the shutdown_lock and
+                # the clear() call is also protected by this
+                # lock. This means the reader will wake up again (or
+                # is already awake) due to the existing data in the
+                # pipe.
+                pass

     def clear(self):
         if not self._closed:
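Note: the behaviour this fix relies on is easy to demonstrate in isolation.
A rough sketch (assuming a Unix pipe; not code from the patch): once a pipe
is switched to non-blocking, writing to a full buffer raises
BlockingIOError instead of hanging, and the bytes already in the pipe are
enough to wake the reader.

    import os

    r, w = os.pipe()
    os.set_blocking(w, False)

    written = 0
    try:
        while True:
            written += os.write(w, b"\0")   # fills the pipe buffer
    except BlockingIOError:
        # Pipe is full; skipping the write is safe because the pending
        # bytes will still wake any reader that polls the pipe.
        pass
    print(f"pipe buffer capacity: {written} bytes")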
From 47a6b75b9760d9c287586b383ced61965418e153 Mon Sep 17 00:00:00 2001
From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com>
Date: Sat, 26 Aug 2023 12:35:40 +0000
Subject: [PATCH 3/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?=
 =?UTF-8?q?rb=5Fit.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst

diff --git a/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst
new file mode 100644
index 00000000000000..eaa2a5a4330e28
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst
@@ -0,0 +1 @@
+Fix concurrent.futures.ProcessPoolExecutor deadlock

From 995a14c00bbf615c19fc0037c33dab00067df000 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Elfstr=C3=B6m?=
Date: Sat, 2 Sep 2023 22:01:16 +0200
Subject: [PATCH 4/7] fixup! gh-105829: Add test to demonstrate deadlock

Try to reduce the size of the pipe to make the test faster. The
solution only works on Unix-like platforms; we fall back on a
hardcoded size for other platforms.
---
 Lib/test/test_concurrent_futures.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 516365b4f08ddc..98615d8cd0ffba 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -1412,16 +1412,29 @@ def mock_run(self):
             time.sleep(5)
             thread_run(self)

-        # Should be support.PIPE_MAX_SIZE but it is way too
-        # pessimistic here, would take too long. Assume 64k pipe
-        # buffer and add some margin...
-        job_num = 65536 * 2
-        job_data = range(job_num)
+        def adjust_and_check_jobs_needed_to_block_pipe(connection):
+            try:
+                # Try to reduce pipe size to speed up test. Only works on Unix systems
+                import fcntl
+                from fcntl import F_SETPIPE_SZ
+                pipe_size = fcntl.fcntl(connection.fileno(), F_SETPIPE_SZ, 1024)
+            except ImportError:
+                # Assume 64k pipe if we fail, makes test take longer
+                pipe_size = 65536
+
+            # We send 4 bytes per job (one zero sized bytes object)
+            return pipe_size // 4 + 100 # Add some margin
+
         with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run):
             with self.executor_type(max_workers=2,
                                     mp_context=self.get_context()) as executor:
                 self.executor = executor # Allow clean up in fail_on_deadlock

+                # Try to speed up the test by reducing the size of the wakeup pipe
+                job_num = adjust_and_check_jobs_needed_to_block_pipe(
+                    executor._executor_manager_thread_wakeup._writer)
+                job_data = range(job_num)
+
                 # Need to use sigalarm for timeout detection because
                 # Executor.submit is not guarded by any timeout (both
                 # self._work_ids.put(self._queue_count) and
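Note: the pipe-shrinking trick in patch 4 can be tried on its own. A
minimal sketch (F_SETPIPE_SZ is Linux-specific; the fcntl module has
exposed it since Python 3.10, and the kernel rounds the request up to a
page):

    import fcntl
    import os

    r, w = os.pipe()
    # Request a 1 KiB pipe; the kernel rounds up to the page size and
    # fcntl() returns the capacity actually set.
    actual = fcntl.fcntl(w, fcntl.F_SETPIPE_SZ, 1024)
    print(f"pipe capacity is now {actual} bytes")  # typically 4096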
From 5d4bf8c1afc9cceb70dead63f4b1313192d813a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Elfstr=C3=B6m?=
Date: Sun, 10 Sep 2023 19:43:39 +0200
Subject: [PATCH 5/7] fixup! gh-105829: Fix concurrent.futures.ProcessPoolExecutor
 deadlock

This reverts the previous fix and instead opts to remove the locking
completely when clearing the wakeup pipe. We can do this because
clear() and close() are both called from the same thread and nowhere
else.

In this version of the fix, the call to ProcessPoolExecutor.submit can
still block on the wakeup pipe if it happens to fill up. This should
not be an issue, as there are already other cases where the submit
call can block, and a full wakeup pipe implies that a lot of work
items are already queued up.

Co-authored-by: Antoine Pitrou
---
 Lib/concurrent/futures/process.py | 31 ++++++++++++++++---------------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 03fcd05c6fe251..7df9e16972c57d 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -69,9 +69,13 @@ class _ThreadWakeup:
     def __init__(self):
         self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
-        os.set_blocking(self._writer.fileno(), False)

     def close(self):
+        # Please note that we do not take the shutdown lock when
+        # calling clear() (to avoid deadlocking) so this method can
+        # only be called safely from the same thread as all calls to
+        # clear() even if you hold the shutdown lock. Otherwise we
+        # might try to read from the closed pipe.
         if not self._closed:
             self._closed = True
             self._writer.close()
@@ -79,17 +83,7 @@ def close(self):

     def wakeup(self):
         if not self._closed:
-            try:
-                self._writer.send_bytes(b"")
-            except BlockingIOError:
-                # Assuming BlockingIOError is only raised when there
-                # is data in the pipe then we can skip the wake-up
-                # here because we are holding the shutdown_lock and
-                # the clear() call is also protected by this
-                # lock. This means the reader will wake up again (or
-                # is already awake) due to the existing data in the
-                # pipe.
-                pass
+            self._writer.send_bytes(b"")

     def clear(self):
         if not self._closed:
@@ -437,8 +431,12 @@ def wait_result_broken_or_wakeup(self):
         elif wakeup_reader in ready:
             is_broken = False

-        with self.shutdown_lock:
-            self.thread_wakeup.clear()
+        # No need to hold the _shutdown_lock here because:
+        # 1. we're the only thread to use the wakeup reader
+        # 2. we're also the only thread to call thread_wakeup.close()
+        # 3. we want to avoid a possible deadlock when both reader and writer
+        #    would block (gh-105829)
+        self.thread_wakeup.clear()

         return result_item, is_broken, cause

@@ -717,7 +715,10 @@ def __init__(self, max_workers=None, mp_context=None,
         # as it could result in a deadlock if a worker process dies with the
         # _result_queue write lock still acquired.
         #
-        # _shutdown_lock must be locked to access _ThreadWakeup.
+        # _shutdown_lock must be locked to access _ThreadWakeup.close() and
+        # .wakeup(). Care must also be taken to not call clear or close from
+        # more than one thread since _ThreadWakeup.clear() is not protected by
+        # the _shutdown_lock.
         self._executor_manager_thread_wakeup = _ThreadWakeup()

         # Create communication channels for the executor
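Note: why clear() is safe without the lock can be seen from the drain loop
itself. A sketch (not patch code) using the same Connection API:

    from multiprocessing import Pipe

    reader, writer = Pipe(duplex=False)
    writer.send_bytes(b"")
    writer.send_bytes(b"")

    # poll() only reports bytes that are already buffered, so the
    # recv_bytes() below can never block -- provided no other thread
    # consumes from `reader`, which is exactly the single-caller
    # invariant this commit establishes for the manager thread.
    while reader.poll():
        reader.recv_bytes()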
From 6104264f5e41ee63e67a068648f827033d6050c0 Mon Sep 17 00:00:00 2001
From: Chris Withers
Date: Fri, 15 Sep 2023 07:53:51 +0100
Subject: [PATCH 6/7] use faulthandler for better failure feedback

Co-authored-by: Thomas Moreau
---
 Lib/test/test_concurrent_futures/test_deadlock.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
index 442dfa79cc5ba5..ce410532ca47c5 100644
--- a/Lib/test/test_concurrent_futures/test_deadlock.py
+++ b/Lib/test/test_concurrent_futures/test_deadlock.py
@@ -257,6 +257,9 @@ def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
                 "Tested platform does not support the alarm signal")

         def timeout(_signum, _frame):
+            import faulthandler
+            faulthandler.dump_traceback()
+
             raise RuntimeError("timed out while submitting jobs?")

         thread_run = futures.process._ExecutorManagerThread.run
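Note: the faulthandler call turns a bare timeout into a diagnosis, since
faulthandler.dump_traceback() prints the current stack of every thread and
shows exactly where the submit call is stuck. A minimal sketch of the
watchdog pattern (illustrative, not the test itself):

    import faulthandler
    import signal

    def watchdog(signum, frame):
        # Dump all thread stacks before failing, so a hang points at the
        # blocked frame instead of just saying "timed out".
        faulthandler.dump_traceback()
        raise RuntimeError("timed out")

    signal.signal(signal.SIGALRM, watchdog)   # SIGALRM is Unix-only
    signal.alarm(30)
    try:
        pass  # ... the work being guarded ...
    finally:
        signal.alarm(0)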
From a713ff8ca1af49a180712c318270e447bd388b58 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Elfstr=C3=B6m?=
Date: Tue, 19 Sep 2023 21:38:18 +0200
Subject: [PATCH 7/7] fixup! gh-105829: Add test to demonstrate deadlock

Change test strategy. We now force the main thread to block during the
wake-up call by mocking the wake-up object and artificially limiting
it to a single wake-up before blocking. This allows us to reduce some
timeouts, lower the number of tasks, and cut the total runtime of the
test. It should also guarantee a blocking main thread on all
platforms, regardless of any pipe buffer sizes.

The drawback is that the test is now a bit opinionated about how we
fix this issue (i.e. just making the wake-up pipe non-blocking would
not satisfy this test even though it is a valid fix for the issue).
---
 .../test_concurrent_futures/test_deadlock.py | 42 +++++++++++--------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
index ce410532ca47c5..e78842b63f7400 100644
--- a/Lib/test/test_concurrent_futures/test_deadlock.py
+++ b/Lib/test/test_concurrent_futures/test_deadlock.py
@@ -1,4 +1,5 @@
 import contextlib
+import queue
 import signal
 import sys
 import time
@@ -6,7 +7,7 @@
 import unittest.mock
 from pickle import PicklingError
 from concurrent import futures
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

 from test import support

@@ -265,30 +266,35 @@ def timeout(_signum, _frame):
         thread_run = futures.process._ExecutorManagerThread.run
         def mock_run(self):
             # Delay thread startup so the wakeup pipe can fill up and block
-            time.sleep(5)
+            time.sleep(3)
             thread_run(self)

-        def adjust_and_check_jobs_needed_to_block_pipe(connection):
-            try:
-                # Try to reduce pipe size to speed up test. Only works on Unix systems
-                import fcntl
-                from fcntl import F_SETPIPE_SZ
-                pipe_size = fcntl.fcntl(connection.fileno(), F_SETPIPE_SZ, 1024)
-            except ImportError:
-                # Assume 64k pipe if we fail, makes test take longer
-                pipe_size = 65536
+        class MockWakeup(_ThreadWakeup):
+            """Mock wakeup object to force the wakeup to block"""
+            def __init__(self):
+                super().__init__()
+                self._dummy_queue = queue.Queue(maxsize=1)

-            # We send 4 bytes per job (one zero sized bytes object)
-            return pipe_size // 4 + 100 # Add some margin
+            def wakeup(self):
+                self._dummy_queue.put(None, block=True)
+                super().wakeup()

-        with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run):
+            def clear(self):
+                try:
+                    while True:
+                        self._dummy_queue.get_nowait()
+                except queue.Empty:
+                    super().clear()
+
+        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
+                                         'run', mock_run),
+              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
+                                  MockWakeup)):
             with self.executor_type(max_workers=2,
                                     mp_context=self.get_context()) as executor:
                 self.executor = executor # Allow clean up in fail_on_deadlock

-                # Try to speed up the test by reducing the size of the wakeup pipe
-                job_num = adjust_and_check_jobs_needed_to_block_pipe(
-                    executor._executor_manager_thread_wakeup._writer)
+                job_num = 100
                 job_data = range(job_num)

@@ -299,7 +305,7 @@ def adjust_and_check_jobs_needed_to_block_pipe(connection):
                 # the wakeup call that deadlocked on a blocking pipe.
                 old_handler = signal.signal(signal.SIGALRM, timeout)
                 try:
-                    signal.alarm(int(support.LONG_TIMEOUT))
+                    signal.alarm(int(self.TIMEOUT))
                     self.assertEqual(job_num, len(list(executor.map(int, job_data))))
                 finally:
                     signal.alarm(0)
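Note: MockWakeup forces determinism with a one-slot queue: the first
wakeup() succeeds, and the second blocks until clear() drains the queue,
guaranteeing the main thread blocks regardless of pipe buffer size. A tiny
sketch of that mechanism in isolation (illustrative names):

    import queue
    import threading

    gate = queue.Queue(maxsize=1)
    gate.put(None)                         # first "wakeup" fills the slot

    blocked = threading.Thread(target=gate.put, args=(None,))
    blocked.start()                        # second "wakeup" blocks: queue full

    gate.get()                             # a "clear" drains one slot...
    blocked.join()                         # ...and the blocked wakeup finishes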