diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d5ba39e3d71774..fd0f8331162f19 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -304,18 +304,6 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, fs - done) - -def _result_or_cancel(fut, timeout=None): - try: - try: - return fut.result(timeout) - finally: - fut.cancel() - finally: - # Break a reference cycle with the exception in self._exception - del fut - - class Future(object): """Represents the result of an asynchronous computation.""" @@ -625,23 +613,35 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # before the first iterator value is required. def result_iterator(): try: - # reverse to keep finishing order + # reverse so that the next (FIFO) future is on the right fs.reverse() + # careful not to keep references to futures or results while fs: + # wait for the next result + if timeout is None: + fs[-1].result() + else: + fs[-1].result(end_time - time.monotonic()) + + # buffer next task if ( buffersize and (executor := executor_weakref()) and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) - # Careful not to keep a reference to the popped future - if timeout is None: - yield _result_or_cancel(fs.pop()) - else: - yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + + # yield the awaited result + yield fs.pop()._result + finally: + # break the reference cycle with fs[-1]._exception's traceback + if fs: + fs.pop().cancel() + for future in fs: future.cancel() + return result_iterator() def shutdown(self, wait=True, *, cancel_futures=False): diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index d88c34d1c8c8e4..2f46d561d0128f 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -3,7 +3,9 @@ import time import weakref from concurrent import futures -from operator import add +from contextlib import suppress +from functools import partial +from operator import add, truediv from test import support from test.support import Py_GIL_DISABLED @@ -143,6 +145,29 @@ def test_map_buffersize_when_buffer_is_full(self): msg="should have fetched only `buffersize` elements from `ints`.", ) + def test_map_buffersize_when_error(self): + ints = [1, 2, 3, 0, 4, 5, 6] + index_of_zero = ints.index(0) + ints_iter = iter(ints) + buffersize = 2 + reciprocal = partial(truediv, 1) + results = [] + with suppress(ZeroDivisionError): + for result in self.executor.map( + reciprocal, ints_iter, buffersize=buffersize + ): + results.append(result) + self.assertEqual( + len(results), + index_of_zero, + msg="should have mapped until reaching the zero.", + ) + self.assertEqual( + len(results) + buffersize + len(list(ints_iter)), + len(ints), + msg="ints should be either processed, or buffered, or not fetched.", + ) + def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes