From 80590abc0f37ad9dae7a282c0cd06568f6d7c15a Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 13 Mar 2022 16:29:58 +0200 Subject: [PATCH 1/3] Implement wait_for on top on timeout() and TaskGroup() --- Lib/asyncio/tasks.py | 62 +++++++------------------------------------- 1 file changed, 10 insertions(+), 52 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 059143fb9086fb..945de2ee1932dd 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -24,6 +24,8 @@ from . import events from . import exceptions from . import futures +from . import taskgroups +from . import timeouts from .coroutines import _is_coroutine # Helper to generate new task names @@ -418,6 +420,11 @@ def _release_waiter(waiter, *args): waiter.set_result(None) +async def _wait_for(fut, timeout): + async with timeouts.timeout(timeout): + return await fut + + async def wait_for(fut, timeout): """Wait for the single Future or coroutine to complete, with timeout. @@ -431,62 +438,13 @@ async def wait_for(fut, timeout): This function is a coroutine. """ - loop = events.get_running_loop() if timeout is None: return await fut - if timeout <= 0: - fut = ensure_future(fut, loop=loop) - - if fut.done(): - return fut.result() - - await _cancel_and_wait(fut, loop=loop) - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - - waiter = loop.create_future() - timeout_handle = loop.call_later(timeout, _release_waiter, waiter) - cb = functools.partial(_release_waiter, waiter) - - fut = ensure_future(fut, loop=loop) - fut.add_done_callback(cb) - - try: - # wait until the future completes or the timeout - try: - await waiter - except exceptions.CancelledError: - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - raise - - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - # In case task cancellation failed with some - # exception, we should re-raise it - # See https://bugs.python.org/issue40607 - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - finally: - timeout_handle.cancel() + async with taskgroups.TaskGroup() as tg: + subtask = tg.create_task(_wait_for(fut, timeout)) + return await subtask async def _wait(fs, timeout, return_when, loop): From 5e258912603210f94ade518783a87561a5d83c6b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 13 Mar 2022 16:59:25 +0200 Subject: [PATCH 2/3] TaskGroup is not required here --- Lib/asyncio/tasks.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 945de2ee1932dd..616273b0b9b08b 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -24,7 +24,6 @@ from . import events from . import exceptions from . import futures -from . import taskgroups from . import timeouts from .coroutines import _is_coroutine @@ -420,11 +419,6 @@ def _release_waiter(waiter, *args): waiter.set_result(None) -async def _wait_for(fut, timeout): - async with timeouts.timeout(timeout): - return await fut - - async def wait_for(fut, timeout): """Wait for the single Future or coroutine to complete, with timeout. @@ -439,12 +433,12 @@ async def wait_for(fut, timeout): This function is a coroutine. """ - if timeout is None: - return await fut + if not futures.isfuture(fut): + # wrap a coroutine + fut = create_task(fut) - async with taskgroups.TaskGroup() as tg: - subtask = tg.create_task(_wait_for(fut, timeout)) - return await subtask + async with timeouts.timeout(timeout): + return await fut async def _wait(fs, timeout, return_when, loop): From a7007258bc028835a92b3b638f3db61ad8c5e01a Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 16 Mar 2022 23:37:04 +0200 Subject: [PATCH 3/3] Cancel the inner task --- Lib/asyncio/tasks.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 140539a924a033..ef5c0f9669abbb 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -444,12 +444,11 @@ async def wait_for(fut, timeout): This function is a coroutine. """ - if not futures.isfuture(fut): - # wrap a coroutine - fut = create_task(fut) + async def inner(): + async with timeouts.timeout(timeout): + return await fut - async with timeouts.timeout(timeout): - return await fut + return await create_task(inner()) async def _wait(fs, timeout, return_when, loop):