diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index a49630112af510..ea2722d736f040 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -1695,6 +1695,32 @@ async def run(): self.loop.run_until_complete(run()) + def test_send_exhausted(self): + """ + Test the behaviour of an exhausted async generator + """ + + async def run(): + # test exhausted generator + async def gen(): + yield 1 + + g = gen() + [f async for f in g] + + # an exhausted generator behaves like this: + + with self.assertRaises(StopAsyncIteration): + r = await g.asend(None) + with self.assertRaises(StopAsyncIteration): + r = await g.__anext__() + + # The following behaviour may be undefined: + r = await g.athrow(EOFError) + assert r is None + + self.loop.run_until_complete(run()) + class TestUnawaitedWarnings(unittest.TestCase): def test_asend(self): @@ -1728,6 +1754,110 @@ async def gen(): gc_collect() +class TestIssueGH74956(unittest.TestCase): + # simultanous use of generator by different coroutines is not + # allowed. + # https://github.com/python/cpython/issues/74956 + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + + def tearDown(self): + self.loop.close() + self.loop = None + asyncio.set_event_loop_policy(None) + + def test_simultaneous_asend(self): + """ + Verify that simultaneous use of generator by different coroutines is not + permitted. We use Tasks to achieve this, where one task is suspended + in a `asyncio.sleep()` call inside the generator (during an `asend()` call), + and the other task attempts + to do an `asend()`, (or `athrow()`, or `aclose()`) on the generator. + """ + + async def run_collision(op, *args): + # Two tasks are created and scheduled. The first will sleep inside the + # `asend()` and the other will then attempt a second operation and fail. + + async def consumer(): + while True: + # task fa will sleep here, and another task will try to iterate + # the generator + await asyncio.sleep(0) + if (yield) is None: + break + + # create and start the generator + agenerator = consumer() + await agenerator.asend(None) + + # start the first asend() task + fa = asyncio.create_task(agenerator.asend("A")) + + # start the second task, which should fail (asend, athrow, aclose) + method = getattr(agenerator, op) + fb = asyncio.create_task(method(*args)) + + # first asend should succeed + await fa + + # second operation should fail + with self.assertRaises(RuntimeError) as err: + await fb + assert "already running" in str(err.exception) + + # cleanup partially run generator + with self.assertRaises(StopAsyncIteration): + await agenerator.asend(None) # close it + + async def run(): + # try different combinations of asend, athrow, aclose + # which are clashing with an asend which is already running + # (and awaiting sleep(0)) + for op, args in [("asend", ["A"]), ("athrow", [EOFError]), ("aclose", [])]: + await run_collision(op, *args) + + self.loop.run_until_complete(run()) + + def test_ag_running(self): + """ + Verify that as_running transitions correctly in + an async generator + """ + state = 0 + + async def agen(): + nonlocal state + state = 1 + await asyncio.sleep(0) + state = 2 + value = yield "foo" + state = value + + a = agen() + assert a.ag_running is False + # start it running + coro = a.asend(None) + assert state == 0 + coro.send(None) + assert state == 1 + assert a.ag_running is True + + # wake it from sleep and have it yield + with self.assertRaises(StopIteration) as v: + coro.send(None) + assert v.exception.value == "foo" + assert state == 2 + assert a.ag_running is False + + # finish it + coro = a.asend("bar") + self.assertRaises(StopAsyncIteration, coro.send, None) + assert a.ag_running is False + assert state == "bar" + if __name__ == "__main__": unittest.main()