Skip to content

Commit 5acbb4a

Browse files
committed
Experimental fix for #170
1 parent a332533 commit 5acbb4a

File tree

5 files changed

+170
-26
lines changed

5 files changed

+170
-26
lines changed

tests/test_sockets.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,91 @@ def test_socket_sync_remove_and_immediately_close(self):
189189
self.assertEqual(sock.fileno(), -1)
190190
self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
191191

192+
def test_sock_cancel_add_reader_race(self):
193+
srv_sock_conn = None
194+
195+
async def server():
196+
nonlocal srv_sock_conn
197+
sock_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
198+
sock_server.setblocking(False)
199+
with sock_server:
200+
sock_server.bind(('127.0.0.1', 0))
201+
sock_server.listen()
202+
fut = asyncio.ensure_future(
203+
client(sock_server.getsockname()), loop=self.loop)
204+
srv_sock_conn, _ = await self.loop.sock_accept(sock_server)
205+
srv_sock_conn.setsockopt(
206+
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
207+
with srv_sock_conn:
208+
await fut
209+
210+
async def client(addr):
211+
sock_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
212+
sock_client.setblocking(False)
213+
with sock_client:
214+
await self.loop.sock_connect(sock_client, addr)
215+
_, pending_read_futs = await asyncio.wait(
216+
[self.loop.sock_recv(sock_client, 1)],
217+
timeout=1, loop=self.loop)
218+
219+
async def send_server_data():
220+
# Wait a little bit to let reader future cancel and
221+
# schedule the removal of the reader callback. Right after
222+
# "rfut.cancel()" we will call "loop.sock_recv()", which
223+
# will add a reader. This will make a race between
224+
# remove- and add-reader.
225+
await asyncio.sleep(0.1, loop=self.loop)
226+
await self.loop.sock_sendall(srv_sock_conn, b'1')
227+
self.loop.create_task(send_server_data())
228+
229+
for rfut in pending_read_futs:
230+
rfut.cancel()
231+
232+
data = await self.loop.sock_recv(sock_client, 1)
233+
234+
self.assertEqual(data, b'1')
235+
236+
self.loop.run_until_complete(server())
237+
238+
def test_sock_send_before_cancel(self):
239+
srv_sock_conn = None
240+
241+
async def server():
242+
nonlocal srv_sock_conn
243+
sock_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
244+
sock_server.setblocking(False)
245+
with sock_server:
246+
sock_server.bind(('127.0.0.1', 0))
247+
sock_server.listen()
248+
fut = asyncio.ensure_future(
249+
client(sock_server.getsockname()), loop=self.loop)
250+
srv_sock_conn, _ = await self.loop.sock_accept(sock_server)
251+
with srv_sock_conn:
252+
await fut
253+
254+
async def client(addr):
255+
await asyncio.sleep(0.01, loop=self.loop)
256+
sock_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
257+
sock_client.setblocking(False)
258+
with sock_client:
259+
await self.loop.sock_connect(sock_client, addr)
260+
_, pending_read_futs = await asyncio.wait(
261+
[self.loop.sock_recv(sock_client, 1)],
262+
timeout=1, loop=self.loop)
263+
264+
# server can send the data in a random time, even before
265+
# the previous result future has cancelled.
266+
await self.loop.sock_sendall(srv_sock_conn, b'1')
267+
268+
for rfut in pending_read_futs:
269+
rfut.cancel()
270+
271+
data = await self.loop.sock_recv(sock_client, 1)
272+
273+
self.assertEqual(data, b'1')
274+
275+
self.loop.run_until_complete(server())
276+
192277

193278
class TestUVSockets(_TestSockets, tb.UVTestCase):
194279

uvloop/handles/poll.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ cdef class UVPoll(UVHandle):
1313
cdef int is_active(self)
1414

1515
cdef is_reading(self)
16+
cdef is_writing(self)
17+
1618
cdef start_reading(self, Handle callback)
1719
cdef start_writing(self, Handle callback)
1820
cdef stop_reading(self)

uvloop/handles/poll.pyx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ cdef class UVPoll(UVHandle):
8787
cdef is_reading(self):
8888
return self._is_alive() and self.reading_handle is not None
8989

90+
cdef is_writing(self):
91+
return self._is_alive() and self.writing_handle is not None
92+
9093
cdef start_reading(self, Handle callback):
9194
cdef:
9295
int mask = 0

uvloop/loop.pxd

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,12 @@ cdef class Loop:
180180
cdef _new_reader_future(self, sock)
181181
cdef _new_writer_future(self, sock)
182182
cdef _add_reader(self, fd, Handle handle)
183-
cdef _remove_reader(self, fd)
183+
cdef _has_reader(self, fd)
184+
cdef _remove_reader(self, fd, handle)
184185

185186
cdef _add_writer(self, fd, Handle handle)
186-
cdef _remove_writer(self, fd)
187+
cdef _has_writer(self, fd)
188+
cdef _remove_writer(self, fd, handle)
187189

188190
cdef _sock_recv(self, fut, sock, n)
189191
cdef _sock_recv_into(self, fut, sock, buf)

0 commit comments

Comments
 (0)