From d2273003286bf25ec83c88ad6f0507e7592333df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9ry=20Ogam?= Date: Tue, 15 Mar 2022 19:20:50 +0100 Subject: [PATCH 1/4] Fix a BrokenPipeError when a multiprocessing.Queue is garbage collected --- Lib/multiprocessing/queues.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index a2901814876d6c..ff3747ec4943b9 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -165,12 +165,16 @@ def _start_thread(self): debug('Queue._start_thread()') # Start thread which transfers data from buffer to pipe + # Pass a reference to the reader end of the pipe to the thread + # to prevent the garbage collector from closing it before the + # thread has sent all buffered data to the writer end of the + # pipe, thereby avoiding a BrokenPipeError self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error, self._sem), + self._on_queue_feeder_error, self._sem, self._reader), name='QueueFeederThread' ) self._thread.daemon = True @@ -212,7 +216,7 @@ def _finalize_close(buffer, notempty): @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror, queue_sem): + onerror, queue_sem, reader): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release From 9950aaa248e9189677727e9d3ff6e1928af880d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9ry=20Ogam?= Date: Tue, 26 Apr 2022 09:41:55 +0200 Subject: [PATCH 2/4] Close self._reader from the queue thread instead of the main thread --- Lib/multiprocessing/queues.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index ff3747ec4943b9..9b53206b939778 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -139,13 +139,10 @@ def put_nowait(self, obj): def close(self): self._closed = True - try: - self._reader.close() - finally: - close = self._close - if close: - self._close = None - close() + close = self._close + if close: + self._close = None + close() def join_thread(self): debug('Queue.join_thread()') @@ -165,16 +162,13 @@ def _start_thread(self): debug('Queue._start_thread()') # Start thread which transfers data from buffer to pipe - # Pass a reference to the reader end of the pipe to the thread - # to prevent the garbage collector from closing it before the - # thread has sent all buffered data to the writer end of the - # pipe, thereby avoiding a BrokenPipeError self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error, self._sem, self._reader), + self._wlock, self._reader.close, self._writer.close, + self._ignore_epipe, self._on_queue_feeder_error, + self._sem), name='QueueFeederThread' ) self._thread.daemon = True @@ -215,8 +209,8 @@ def _finalize_close(buffer, notempty): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror, queue_sem, reader): + def _feed(buffer, notempty, send_bytes, writelock, reader_close, + writer_close, ignore_epipe, onerror, queue_sem, reader): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -242,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, obj = bpopleft() if obj is sentinel: debug('feeder thread got sentinel -- exiting') - close() + reader_close() + writer_close() return # serialize the data before acquiring the lock From cf7d8ec85ac5d8a1533350fa6d497db7eecb1c88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9ry=20Ogam?= Date: Tue, 26 Apr 2022 09:43:17 +0200 Subject: [PATCH 3/4] Update queues.py --- Lib/multiprocessing/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 9b53206b939778..f37f114a968871 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -210,7 +210,7 @@ def _finalize_close(buffer, notempty): @staticmethod def _feed(buffer, notempty, send_bytes, writelock, reader_close, - writer_close, ignore_epipe, onerror, queue_sem, reader): + writer_close, ignore_epipe, onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release From d8e8957414cd79471fc97be5a6350f8238de06e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9ry=20Ogam?= Date: Tue, 26 Apr 2022 19:37:33 +0200 Subject: [PATCH 4/4] Create 2022-04-26-19-01-13.bpo-47029.qkT42X.rst --- .../next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst new file mode 100644 index 00000000000000..cc054673338f0b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst @@ -0,0 +1,4 @@ +Always close the read end of the pipe used by :class:`multiprocessing.Queue` +*after* the last write of buffered data to the write end of the pipe to avoid +:exc:`BrokenPipeError` at garbage collection and at +:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.