Skip to content

gh-74956: Add tests for async generator behaviour #111212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions Lib/test/test_asyncgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,32 @@ async def run():

self.loop.run_until_complete(run())

def test_send_exhausted(self):
"""
Test the behaviour of an exhausted async generator
"""

async def run():
# test exhausted generator
async def gen():
yield 1

g = gen()
[f async for f in g]

# an exhausted generator behaves like this:

with self.assertRaises(StopAsyncIteration):
r = await g.asend(None)
with self.assertRaises(StopAsyncIteration):
r = await g.__anext__()

# The following behaviour may be undefined:
r = await g.athrow(EOFError)
assert r is None
Comment on lines +1718 to +1720
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who can validate that behaviour is expected?
Maybe @1st1 ?


self.loop.run_until_complete(run())


class TestUnawaitedWarnings(unittest.TestCase):
def test_asend(self):
Expand Down Expand Up @@ -1728,6 +1754,110 @@ async def gen():
gc_collect()


class TestIssueGH74956(unittest.TestCase):
# simultanous use of generator by different coroutines is not
# allowed.
# https://github.com/python/cpython/issues/74956

def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)

def tearDown(self):
self.loop.close()
self.loop = None
asyncio.set_event_loop_policy(None)

def test_simultaneous_asend(self):
"""
Verify that simultaneous use of generator by different coroutines is not
permitted. We use Tasks to achieve this, where one task is suspended
in a `asyncio.sleep()` call inside the generator (during an `asend()` call),
and the other task attempts
to do an `asend()`, (or `athrow()`, or `aclose()`) on the generator.
"""

async def run_collision(op, *args):
# Two tasks are created and scheduled. The first will sleep inside the
# `asend()` and the other will then attempt a second operation and fail.

async def consumer():
while True:
# task fa will sleep here, and another task will try to iterate
# the generator
await asyncio.sleep(0)
if (yield) is None:
break

# create and start the generator
agenerator = consumer()
await agenerator.asend(None)

# start the first asend() task
fa = asyncio.create_task(agenerator.asend("A"))

# start the second task, which should fail (asend, athrow, aclose)
method = getattr(agenerator, op)
fb = asyncio.create_task(method(*args))

# first asend should succeed
await fa

# second operation should fail
with self.assertRaises(RuntimeError) as err:
await fb
assert "already running" in str(err.exception)
Comment on lines +1803 to +1809
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this order is well-defined.
I guess that asend is internally synchronous send + await, so maybe?

Copy link
Contributor Author

@kristjanvalur kristjanvalur Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current event loop, yes it is well defined. Tasks are scheduled in the order they are created. First task runs until it hits sleep(), at which point the second task runs. It is a strictly FIFO system.

asend is slightly more complex than send and await hooked together. It needs to have its own "await" implementation.
If you are interested, my implementation of the async generator protocol using my Monitor pattern is here:

https://github.com/kristjanvalur/py-asynkit/blob/a39db445f4f33fb83f69d05ed6abf3839e0855ad/src/asynkit/monitor.py#L329

These tests are adapted from tests in my library.


# cleanup partially run generator
with self.assertRaises(StopAsyncIteration):
await agenerator.asend(None) # close it

async def run():
# try different combinations of asend, athrow, aclose
# which are clashing with an asend which is already running
# (and awaiting sleep(0))
for op, args in [("asend", ["A"]), ("athrow", [EOFError]), ("aclose", [])]:
await run_collision(op, *args)

self.loop.run_until_complete(run())

def test_ag_running(self):
"""
Verify that as_running transitions correctly in
an async generator
"""
state = 0

async def agen():
nonlocal state
state = 1
await asyncio.sleep(0)
state = 2
value = yield "foo"
state = value

a = agen()
assert a.ag_running is False
# start it running
coro = a.asend(None)
assert state == 0
coro.send(None)
assert state == 1
assert a.ag_running is True

# wake it from sleep and have it yield
with self.assertRaises(StopIteration) as v:
coro.send(None)
assert v.exception.value == "foo"
assert state == 2
assert a.ag_running is False

# finish it
coro = a.asend("bar")
self.assertRaises(StopAsyncIteration, coro.send, None)
assert a.ag_running is False
assert state == "bar"


if __name__ == "__main__":
unittest.main()