From dca6eb690f19ce49af1d570623ae26f402d7d285 Mon Sep 17 00:00:00 2001 From: Eli Stevens Date: Thu, 16 May 2024 18:37:49 -0700 Subject: [PATCH 1/4] Adds test for multithreaded writes to files. Multithreaded writes to files (including standard calls to print() from threads) can result in assertion failures and potentially missed output due to race conditions in _io_TextIOWrapper_write_impl and _textiowrapper_writeflush. See: https://github.com/python/cpython/issues/118138 --- Lib/test/test_io_threading.py | 36 +++++++++++++++++++++++++++++++++++ Modules/_io/textio.c | 4 ++++ 2 files changed, 40 insertions(+) create mode 100644 Lib/test/test_io_threading.py diff --git a/Lib/test/test_io_threading.py b/Lib/test/test_io_threading.py new file mode 100644 index 00000000000000..ab962bde891801 --- /dev/null +++ b/Lib/test/test_io_threading.py @@ -0,0 +1,36 @@ +import unittest +from concurrent.futures import ThreadPoolExecutor + + +NUM_THREADS = 100 +NUM_LOOPS = 10 + +def looping_print_to_file(f, i): + for j in range(NUM_LOOPS): + # p = 'x' * 90 + '123456789' + p = f"{i:2}i,{j}j\n" + '0123456789' * 10 + print(p, file=f) + +class IoThreadingTestCase(unittest.TestCase): + def test_io_threading(self): + with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + with open('test_io_threading.out', 'w') as f: + for i in range(NUM_THREADS): + executor.submit(looping_print_to_file, f, i) + + executor.shutdown(wait=True) + + with open('test_io_threading.out', 'r') as f: + lines = set(x.rstrip() for x in f.readlines() if ',' in x) + + assert len(lines) == NUM_THREADS * NUM_LOOPS, repr(len(lines)) + + for i in range(NUM_THREADS): + for j in range(NUM_LOOPS): + p = f"{i:2}i,{j}j" + + assert p in lines, repr(p) + + +if __name__ == "__main__": + unittest.main() diff --git a/Modules/_io/textio.c b/Modules/_io/textio.c index 9dff8eafb2560f..7dbdcdb5b19524 100644 --- a/Modules/_io/textio.c +++ b/Modules/_io/textio.c @@ -1729,6 +1729,10 @@ _io_TextIOWrapper_write_impl(textio *self, PyObject *text) Py_DECREF(b); return NULL; } + // The assumption that self won't have been modified from another + // thread between the flush above and the assignment below is + // incorrect. + assert(self->pending_bytes == NULL); self->pending_bytes = b; } else if (!PyList_CheckExact(self->pending_bytes)) { From 997dbc260a7d41e5dfeb94c2db4502a3be91e3a2 Mon Sep 17 00:00:00 2001 From: Eli Stevens Date: Fri, 17 May 2024 11:41:41 -0700 Subject: [PATCH 2/4] Calls executor.shutdown before closing file. --- Lib/test/test_io_threading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_io_threading.py b/Lib/test/test_io_threading.py index ab962bde891801..c1300f1d995bbd 100644 --- a/Lib/test/test_io_threading.py +++ b/Lib/test/test_io_threading.py @@ -18,7 +18,7 @@ def test_io_threading(self): for i in range(NUM_THREADS): executor.submit(looping_print_to_file, f, i) - executor.shutdown(wait=True) + executor.shutdown(wait=True) with open('test_io_threading.out', 'r') as f: lines = set(x.rstrip() for x in f.readlines() if ',' in x) From 3f05fd9db55ea7d7756e7c6e98593dc200e0cdcc Mon Sep 17 00:00:00 2001 From: Eli Stevens Date: Fri, 17 May 2024 11:42:15 -0700 Subject: [PATCH 3/4] Fixes race conditions from multithreaded writes. --- Modules/_io/textio.c | 112 ++++++++++++++++++++++--------------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/Modules/_io/textio.c b/Modules/_io/textio.c index 7dbdcdb5b19524..e35aaf14c6d3fd 100644 --- a/Modules/_io/textio.c +++ b/Modules/_io/textio.c @@ -1572,23 +1572,27 @@ _textiowrapper_writeflush(textio *self) return 0; PyObject *pending = self->pending_bytes; - PyObject *b; - - if (PyBytes_Check(pending)) { - b = Py_NewRef(pending); - } - else if (PyUnicode_Check(pending)) { - assert(PyUnicode_IS_ASCII(pending)); - assert(PyUnicode_GET_LENGTH(pending) == self->pending_bytes_count); - b = PyBytes_FromStringAndSize( - PyUnicode_DATA(pending), PyUnicode_GET_LENGTH(pending)); - if (b == NULL) { + assert(PyList_Check(pending)); + + PyObject *slice = PyList_GetSlice(pending, 0, PyList_GET_SIZE(pending)); + if (PyList_GET_SIZE(slice) > 0) { + // There is still a race condition here in that two threads might get + // the slice concurrently (getting the same first N entries), and then + // both try and delete N entires, leading to some items getting + // silently discarded (if there are at least 2N entries in the list at + // that time). + // In practice, the race condition does not seem to trigger (especially + // compared to the previous behavior). + if (PyList_SetSlice(pending, 0, PyList_GET_SIZE(slice), NULL) < 0) { return -1; } - } - else { - assert(PyList_Check(pending)); - b = PyBytes_FromStringAndSize(NULL, self->pending_bytes_count); + + // We use the value in _count now, and decrement it later so that the + // length of b is always >= size of bytes in slice (absent race + // conditions mentioned above). Note that _count might have been + // incremented from another thread, so we can't fully trust the value. + // We resize b below to address this. + PyObject *b = PyBytes_FromStringAndSize(NULL, self->pending_bytes_count); if (b == NULL) { return -1; } @@ -1596,8 +1600,8 @@ _textiowrapper_writeflush(textio *self) char *buf = PyBytes_AsString(b); Py_ssize_t pos = 0; - for (Py_ssize_t i = 0; i < PyList_GET_SIZE(pending); i++) { - PyObject *obj = PyList_GET_ITEM(pending, i); + for (Py_ssize_t i = 0; i < PyList_GET_SIZE(slice); i++) { + PyObject *obj = PyList_GET_ITEM(slice, i); char *src; Py_ssize_t len; if (PyUnicode_Check(obj)) { @@ -1612,26 +1616,37 @@ _textiowrapper_writeflush(textio *self) return -1; } } + // Race conditions above could result in memory corruption otherwise. + if (PyBytes_GET_SIZE(b) < pos + len) { + Py_DECREF(b); + return -1; + } memcpy(buf + pos, src, len); pos += len; } - assert(pos == self->pending_bytes_count); - } + self->pending_bytes_count -= pos; // This needs to be atomic, and I don't think it is, but I'm not certain. + assert(self->pending_bytes_count >= 0); - self->pending_bytes_count = 0; - self->pending_bytes = NULL; - Py_DECREF(pending); + if (PyBytes_GET_SIZE(b) != pos) { + if (_PyBytes_Resize(&b, pos) < 0) { + // No need to decref b: + // https://docs.python.org/3.10/c-api/bytes.html#c._PyBytes_Resize + return -1; + } + } + + PyObject *ret; + do { + ret = PyObject_CallMethodOneArg(self->buffer, &_Py_ID(write), b); + } while (ret == NULL && _PyIO_trap_eintr()); + Py_DECREF(b); + // NOTE: We cleared buffer but we don't know how many bytes are actually written + // when an error occurred. + if (ret == NULL) + return -1; + Py_DECREF(ret); + } - PyObject *ret; - do { - ret = PyObject_CallMethodOneArg(self->buffer, &_Py_ID(write), b); - } while (ret == NULL && _PyIO_trap_eintr()); - Py_DECREF(b); - // NOTE: We cleared buffer but we don't know how many bytes are actually written - // when an error occurred. - if (ret == NULL) - return -1; - Py_DECREF(ret); return 0; } @@ -1721,39 +1736,28 @@ _io_TextIOWrapper_write_impl(textio *self, PyObject *text) if (self->pending_bytes == NULL) { self->pending_bytes_count = 0; - self->pending_bytes = b; - } - else if (self->pending_bytes_count + bytes_len > self->chunk_size) { - // Prevent to concatenate more than chunk_size data. - if (_textiowrapper_writeflush(self) < 0) { + self->pending_bytes = PyList_New(0); + if (self->pending_bytes == NULL) { Py_DECREF(b); return NULL; } - // The assumption that self won't have been modified from another - // thread between the flush above and the assignment below is - // incorrect. - assert(self->pending_bytes == NULL); - self->pending_bytes = b; } - else if (!PyList_CheckExact(self->pending_bytes)) { - PyObject *list = PyList_New(2); - if (list == NULL) { + else if (self->pending_bytes_count + bytes_len > self->chunk_size) { + // Prevent to concatenate more than chunk_size data. + if (_textiowrapper_writeflush(self) < 0) { Py_DECREF(b); return NULL; } - PyList_SET_ITEM(list, 0, self->pending_bytes); - PyList_SET_ITEM(list, 1, b); - self->pending_bytes = list; } - else { - if (PyList_Append(self->pending_bytes, b) < 0) { - Py_DECREF(b); - return NULL; - } + + // increment before append so that _count is always >= the size of bytes in pending + self->pending_bytes_count += bytes_len; // this needs to be atomic, and I don't think it is by default + if (PyList_Append(self->pending_bytes, b) < 0) { Py_DECREF(b); + return NULL; } + Py_DECREF(b); - self->pending_bytes_count += bytes_len; if (self->pending_bytes_count >= self->chunk_size || needflush || text_needflush) { if (_textiowrapper_writeflush(self) < 0) From d156d3c73dca08032ffa0e6b8edc1080bebdcb38 Mon Sep 17 00:00:00 2001 From: Eli Stevens Date: Wed, 29 May 2024 22:34:08 -0700 Subject: [PATCH 4/4] Update test_io_threading.py to not suffer from interleaved prints --- Lib/test/test_io_threading.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_io_threading.py b/Lib/test/test_io_threading.py index c1300f1d995bbd..cef216d63ead67 100644 --- a/Lib/test/test_io_threading.py +++ b/Lib/test/test_io_threading.py @@ -8,7 +8,7 @@ def looping_print_to_file(f, i): for j in range(NUM_LOOPS): # p = 'x' * 90 + '123456789' - p = f"{i:2}i,{j}j\n" + '0123456789' * 10 + p = f"\n{i}i,{j}j\n" + '0123456789' * 10 print(p, file=f) class IoThreadingTestCase(unittest.TestCase): @@ -27,7 +27,7 @@ def test_io_threading(self): for i in range(NUM_THREADS): for j in range(NUM_LOOPS): - p = f"{i:2}i,{j}j" + p = f"{i}i,{j}j" assert p in lines, repr(p)