-
-
Notifications
You must be signed in to change notification settings - Fork 32.4k
gh-74956: Add tests for async generator behaviour #111212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6656265
a34aa8e
9ffc96d
32c5595
94fb0e3
7d3b041
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1695,6 +1695,32 @@ async def run(): | |
|
||
self.loop.run_until_complete(run()) | ||
|
||
def test_send_exhausted(self): | ||
""" | ||
Test the behaviour of an exhausted async generator | ||
""" | ||
|
||
async def run(): | ||
# test exhausted generator | ||
async def gen(): | ||
yield 1 | ||
|
||
g = gen() | ||
[f async for f in g] | ||
|
||
# an exhausted generator behaves like this: | ||
|
||
with self.assertRaises(StopAsyncIteration): | ||
r = await g.asend(None) | ||
with self.assertRaises(StopAsyncIteration): | ||
r = await g.__anext__() | ||
|
||
# The following behaviour may be undefined: | ||
r = await g.athrow(EOFError) | ||
assert r is None | ||
|
||
self.loop.run_until_complete(run()) | ||
|
||
|
||
class TestUnawaitedWarnings(unittest.TestCase): | ||
def test_asend(self): | ||
|
@@ -1728,6 +1754,110 @@ async def gen(): | |
gc_collect() | ||
|
||
|
||
class TestIssueGH74956(unittest.TestCase): | ||
# simultanous use of generator by different coroutines is not | ||
# allowed. | ||
# https://github.com/python/cpython/issues/74956 | ||
|
||
def setUp(self): | ||
self.loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(None) | ||
|
||
def tearDown(self): | ||
self.loop.close() | ||
self.loop = None | ||
asyncio.set_event_loop_policy(None) | ||
|
||
def test_simultaneous_asend(self): | ||
""" | ||
Verify that simultaneous use of generator by different coroutines is not | ||
permitted. We use Tasks to achieve this, where one task is suspended | ||
in a `asyncio.sleep()` call inside the generator (during an `asend()` call), | ||
and the other task attempts | ||
to do an `asend()`, (or `athrow()`, or `aclose()`) on the generator. | ||
""" | ||
|
||
async def run_collision(op, *args): | ||
# Two tasks are created and scheduled. The first will sleep inside the | ||
# `asend()` and the other will then attempt a second operation and fail. | ||
|
||
async def consumer(): | ||
while True: | ||
# task fa will sleep here, and another task will try to iterate | ||
# the generator | ||
await asyncio.sleep(0) | ||
if (yield) is None: | ||
break | ||
|
||
# create and start the generator | ||
agenerator = consumer() | ||
await agenerator.asend(None) | ||
|
||
# start the first asend() task | ||
fa = asyncio.create_task(agenerator.asend("A")) | ||
|
||
# start the second task, which should fail (asend, athrow, aclose) | ||
method = getattr(agenerator, op) | ||
fb = asyncio.create_task(method(*args)) | ||
|
||
# first asend should succeed | ||
await fa | ||
|
||
# second operation should fail | ||
with self.assertRaises(RuntimeError) as err: | ||
await fb | ||
assert "already running" in str(err.exception) | ||
Comment on lines
+1803
to
+1809
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this order is well-defined. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the current event loop, yes it is well defined. Tasks are scheduled in the order they are created. First task runs until it hits asend is slightly more complex than send and await hooked together. It needs to have its own "await" implementation. These tests are adapted from tests in my library. |
||
|
||
# cleanup partially run generator | ||
with self.assertRaises(StopAsyncIteration): | ||
await agenerator.asend(None) # close it | ||
|
||
async def run(): | ||
# try different combinations of asend, athrow, aclose | ||
# which are clashing with an asend which is already running | ||
# (and awaiting sleep(0)) | ||
for op, args in [("asend", ["A"]), ("athrow", [EOFError]), ("aclose", [])]: | ||
await run_collision(op, *args) | ||
|
||
self.loop.run_until_complete(run()) | ||
|
||
def test_ag_running(self): | ||
""" | ||
Verify that as_running transitions correctly in | ||
an async generator | ||
""" | ||
state = 0 | ||
|
||
async def agen(): | ||
nonlocal state | ||
state = 1 | ||
await asyncio.sleep(0) | ||
state = 2 | ||
value = yield "foo" | ||
state = value | ||
|
||
a = agen() | ||
assert a.ag_running is False | ||
# start it running | ||
coro = a.asend(None) | ||
assert state == 0 | ||
coro.send(None) | ||
assert state == 1 | ||
assert a.ag_running is True | ||
|
||
# wake it from sleep and have it yield | ||
with self.assertRaises(StopIteration) as v: | ||
coro.send(None) | ||
assert v.exception.value == "foo" | ||
assert state == 2 | ||
assert a.ag_running is False | ||
|
||
# finish it | ||
coro = a.asend("bar") | ||
self.assertRaises(StopAsyncIteration, coro.send, None) | ||
assert a.ag_running is False | ||
assert state == "bar" | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who can validate that behaviour is expected?
Maybe @1st1 ?