From c2abb046df077e8e37ee13f0c7130762cb106155 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 11:23:37 +0100 Subject: [PATCH 1/7] gh-95166: cancel map waited on future on timeout --- Lib/concurrent/futures/_base.py | 16 ++++++++++++++-- Lib/test/test_concurrent_futures.py | 25 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d7e7e41967cc21..6742a07753c921 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, fs - done) + +def _result_or_cancel(fut, timeout=None): + try: + try: + return fut.result(timeout) + finally: + fut.cancel() + finally: + # Break a reference cycle with the exception in self._exception + del fut + + class Future(object): """Represents the result of an asynchronous computation.""" @@ -604,9 +616,9 @@ def result_iterator(): while fs: # Careful not to keep a reference to the popped future if timeout is None: - yield fs.pop().result() + yield _result_or_cancel(fs.pop()) else: - yield fs.pop().result(end_time - time.monotonic()) + yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) finally: for future in fs: future.cancel() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e294bd3a0957c7..0d45675d625371 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -932,6 +932,31 @@ def submit(pool): with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: workers.submit(tuple) + def test_executor_map_current_future_cancel(self): + stop_event = threading.Event() + log = [] + + def log_n_wait(ident): + log.append(f"{ident=} started") + try: + stop_event.wait() + finally: + log.append(f"{ident=} stopped") + + with self.executor_type(max_workers=1) as pool: + fut = pool.submit(log_n_wait, ident="first") + try: + with contextlib.closing( + pool.map(print_n_wait, ["second", "third"], timeout=0) + ) as gen: + with self.assertRaises(TimeoutError): + next(gen) + finally: + stop_event.set() + + self.assertListEqual(log, ["ident='first started'", "ident='first' stopped"]) + fut.result() + class ProcessPoolExecutorTest(ExecutorTest): From b305705dd956348d2d94f77fcaf8f2d71e64de79 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 23 Jul 2022 10:42:08 +0000 Subject: [PATCH 2/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst diff --git a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst new file mode 100644 index 00000000000000..3291e0fd68eb59 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst @@ -0,0 +1 @@ +:meth:`concurrent.futures.Executor.map` now cancels the currently waiting on future on an error - eg TimeoutError or KeyboardInterrupt From 5638c0a8a136b325cccbad1e4197c447563f9986 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 11:47:02 +0100 Subject: [PATCH 3/7] Update Lib/test/test_concurrent_futures.py --- Lib/test/test_concurrent_futures.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 0d45675d625371..f768ca1a8dc17e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -947,15 +947,14 @@ def log_n_wait(ident): fut = pool.submit(log_n_wait, ident="first") try: with contextlib.closing( - pool.map(print_n_wait, ["second", "third"], timeout=0) + pool.map(log_n_wait, ["second", "third"], timeout=0) ) as gen: with self.assertRaises(TimeoutError): next(gen) finally: stop_event.set() - + fut.result() self.assertListEqual(log, ["ident='first started'", "ident='first' stopped"]) - fut.result() class ProcessPoolExecutorTest(ExecutorTest): From 030a7cc1801a7b68c2ab12c39a2a3e064978c9a5 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 12:15:10 +0100 Subject: [PATCH 4/7] Update Lib/test/test_concurrent_futures.py --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index f768ca1a8dc17e..9fa239236f2ad0 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -954,7 +954,7 @@ def log_n_wait(ident): finally: stop_event.set() fut.result() - self.assertListEqual(log, ["ident='first started'", "ident='first' stopped"]) + self.assertListEqual(log, ["ident='first started", "ident='first' stopped"]) class ProcessPoolExecutorTest(ExecutorTest): From f54eadffe2c73db54deba4db47d603f005ca7164 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 12:18:14 +0100 Subject: [PATCH 5/7] Apply suggestions from code review --- Lib/test/test_concurrent_futures.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 9fa239236f2ad0..cc381ca0f79413 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -944,6 +944,7 @@ def log_n_wait(ident): log.append(f"{ident=} stopped") with self.executor_type(max_workers=1) as pool: + # submit work to saturate the pool fut = pool.submit(log_n_wait, ident="first") try: with contextlib.closing( @@ -954,6 +955,8 @@ def log_n_wait(ident): finally: stop_event.set() fut.result() + # ident='second' is cancelled as a result of raising a TimeoutError + # ident='third' is cancelled because it remained in the collection of futures self.assertListEqual(log, ["ident='first started", "ident='first' stopped"]) From 2b75518ac08c41c3f6c891a6044d0dcddffeb68b Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 15:09:25 +0100 Subject: [PATCH 6/7] Update Lib/test/test_concurrent_futures.py --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index cc381ca0f79413..fe9fdc4f44d37b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -957,7 +957,7 @@ def log_n_wait(ident): fut.result() # ident='second' is cancelled as a result of raising a TimeoutError # ident='third' is cancelled because it remained in the collection of futures - self.assertListEqual(log, ["ident='first started", "ident='first' stopped"]) + self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) class ProcessPoolExecutorTest(ExecutorTest): From f5b7372f458d0c085b99b494cbd92eae29d6646e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 24 Jul 2022 15:42:35 +0100 Subject: [PATCH 7/7] Update Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> --- .../next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst index 3291e0fd68eb59..34b017078436d2 100644 --- a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst +++ b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst @@ -1 +1 @@ -:meth:`concurrent.futures.Executor.map` now cancels the currently waiting on future on an error - eg TimeoutError or KeyboardInterrupt +Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.