Description
Bug report
Bug description:
Earlier discussion: #109710 (comment)
This issue is specific to some systems with kqueue. I can reproduce it on FreeBSD, and it can reportedly also be reproduced on older macOS versions (10.11) and on OpenBSD/NetBSD (with a different error code). A current version of macOS (13) is not affected.
Minimal snippet to explain the root issue:
```python
import os, selectors

sel = selectors.DefaultSelector()
read_fd, write_fd = os.pipe()
# Close one end of the pipe
os.close(read_fd)
# Register the other end of the pipe
sel.register(write_fd, selectors.EVENT_WRITE)
```
Running this code on Linux or macOS will not raise any exception: sel.register() succeeds, and a subsequent sel.select() reports the FD as writable. An actual write will, of course, raise a BrokenPipeError. On FreeBSD, sel.register() raises directly.
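For comparison, here is my own small extension of the snippet above, showing how the same situation plays out on Linux/macOS (nothing here is new API; it only spells out the behaviour described in the previous paragraph):

```python
import os
import selectors

sel = selectors.DefaultSelector()
read_fd, write_fd = os.pipe()
os.close(read_fd)

# On FreeBSD this line raises BrokenPipeError; on Linux/macOS it succeeds.
sel.register(write_fd, selectors.EVENT_WRITE)

# On Linux/macOS the FD is immediately reported as writable...
print(sel.select(timeout=0))

# ...and only the actual write raises.
try:
    os.write(write_fd, b"ping")
except BrokenPipeError as exc:
    print("write failed:", exc)
```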
asyncio does not account for this platform difference. The reproducer below is a bit contrived because it needs to trigger a race condition [1]:
```python
import asyncio
from itertools import count
import os
from threading import Thread


class MyPipeProtocol(asyncio.BaseProtocol):
    def __init__(self):
        self.is_connection_lost = False

    def connection_lost(self, exc):
        self.is_connection_lost = True


async def broken_pipe_repro():
    read_fd, write_fd = os.pipe()
    os.set_blocking(write_fd, False)
    write_file = open(write_fd, "wb")
    loop = asyncio.get_running_loop()
    transport, proto = await loop.connect_write_pipe(MyPipeProtocol, write_file)
    # Pass one end of the pipe to another thread that will eventually close it
    t = Thread(target=lambda: os.close(read_fd))
    t.start()
    try:
        # This line will never fail on Linux/macOS, but might on other BSDs:
        transport.write(b"ping" * 65536)
        await asyncio.sleep(0)
        assert proto.is_connection_lost
    finally:
        t.join()
        transport.close()


async def amain():
    for i in count():
        if i % 1000 == 0:
            print(i)
        await broken_pipe_repro()


if __name__ == "__main__":
    asyncio.run(amain())
```
On most platforms this will run indefinitely. On FreeBSD, after a few runs, I get a traceback:
```
Traceback (most recent call last):
  ...
  File "asyncio-broken-pipe.py", line 28, in broken_pipe_repro
    transport.write(b"ping" * 65536)
  File "cpython/Lib/asyncio/unix_events.py", line 713, in write
    self._loop._add_writer(self._fileno, self._write_ready)
  File "cpython/Lib/asyncio/selector_events.py", line 317, in _add_writer
    self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  File "cpython/Lib/selectors.py", line 265, in modify
    key = self.register(fileobj, events, data)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "cpython/Lib/selectors.py", line 508, in register
    self._selector.control([kev], 0, 0)
BrokenPipeError: [Errno 32] Broken pipe
```
The issue here is that transport.write() is not supposed to raise, as far as I understand. The current implementation of _UnixWritePipeTransport.write() catches all exceptions around os.write(), but not around self._loop._add_writer(). The same happens when switching the read and write ends. The issue was probably not detected before because it's a rare condition and it doesn't happen on Linux. gh-109709 showed it occurring in a subprocess test case; the fix applied there works by wrapping the call to write() in an exception handler, but I don't think user code can, in general, be expected to always catch that error.
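For illustration, a sketch of that kind of caller-side wrapping, applied to the reproducer above (replace the transport.write(...) line inside broken_pipe_repro()). This is my own sketch of a workaround, not the exact change from gh-109709 and not the fix I have in mind:

```python
try:
    # On FreeBSD the BrokenPipeError coming from the kqueue registration can
    # escape transport.write() itself, so the caller has to be prepared for it.
    transport.write(b"ping" * 65536)
except BrokenPipeError:
    # Treat it like a lost connection instead of letting it propagate.
    transport.close()
```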
I found prior discussion of the equivalent issue in Tokio. In their case, they decided to solve it at an abstraction level close to KqueueSelector.register(), by ignoring the EPIPE and instead reporting the fd as readable/writable (which is what users expect from other selectors, and what happens on modern macOS). libevent also does something similar. I wonder whether this could be a valid solution for Python as well, since the actual work is done in selectmodule, which is lower level. By contrast, to the best of my understanding, libuv has no special handling for this.
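For reference, a raw-kqueue sketch (my own, independent of the selectors module) of where the EPIPE comes from. It only runs on platforms that expose select.kqueue(), and the EV_EOF remark reflects my understanding of modern macOS rather than something I verified on every BSD:

```python
import os
import select

read_fd, write_fd = os.pipe()
os.close(read_fd)

kq = select.kqueue()
kev = select.kevent(write_fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
try:
    # This is the registration step that KqueueSelector.register() performs;
    # on FreeBSD it fails with EPIPE when the read end is already closed.
    kq.control([kev], 0, 0)
except BrokenPipeError as exc:
    print("kevent registration failed:", exc)
else:
    # Where the registration succeeds (e.g. modern macOS), the fd is reported
    # back as writable, typically with the EV_EOF flag set.
    print(kq.control(None, 1, 0))
```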
It could also be caught in asyncio (at some layer: either in the selector event loop, or in the code that calls add_reader/add_writer). Given that this only seems to happen with pipes, it could make sense to handle it in pipe-specific code, as sketched below.
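To make concrete where such a catch would have to sit, the same error also surfaces through the public loop.add_writer() API. This sketch of mine just restates the selector behaviour above through the event loop (on Linux/macOS the callback should fire, because the closed pipe is reported as writable):

```python
import asyncio
import os


async def main():
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    os.close(read_fd)

    def on_writable():
        # Reached on Linux/macOS: the closed pipe is reported as writable.
        print("reported writable")
        loop.remove_writer(write_fd)

    # On FreeBSD this call itself raises BrokenPipeError, coming from the
    # kqueue registration inside the selector.
    loop.add_writer(write_fd, on_writable)
    await asyncio.sleep(0.1)
    os.close(write_fd)


asyncio.run(main())
```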
I can make a PR if this is accepted as a bug.
cc @vstinner
Edit note: my mistake, the error is not raised when registering the read end of a pipe, only the write end.
CPython versions tested on:
3.12, CPython main branch
Operating systems tested on:
Other
Footnotes
[1] The other end of the pipe needs to be closed after _UnixWritePipeTransport.write() calls os.write(), but before it tries to register a writer.