diff --git a/Lib/test/test_io_threading.py b/Lib/test/test_io_threading.py new file mode 100644 index 00000000000000..cef216d63ead67 --- /dev/null +++ b/Lib/test/test_io_threading.py @@ -0,0 +1,36 @@ +import unittest +from concurrent.futures import ThreadPoolExecutor + + +NUM_THREADS = 100 +NUM_LOOPS = 10 + +def looping_print_to_file(f, i): + for j in range(NUM_LOOPS): + # p = 'x' * 90 + '123456789' + p = f"\n{i}i,{j}j\n" + '0123456789' * 10 + print(p, file=f) + +class IoThreadingTestCase(unittest.TestCase): + def test_io_threading(self): + with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + with open('test_io_threading.out', 'w') as f: + for i in range(NUM_THREADS): + executor.submit(looping_print_to_file, f, i) + + executor.shutdown(wait=True) + + with open('test_io_threading.out', 'r') as f: + lines = set(x.rstrip() for x in f.readlines() if ',' in x) + + assert len(lines) == NUM_THREADS * NUM_LOOPS, repr(len(lines)) + + for i in range(NUM_THREADS): + for j in range(NUM_LOOPS): + p = f"{i}i,{j}j" + + assert p in lines, repr(p) + + +if __name__ == "__main__": + unittest.main() diff --git a/Modules/_io/textio.c b/Modules/_io/textio.c index 9dff8eafb2560f..e35aaf14c6d3fd 100644 --- a/Modules/_io/textio.c +++ b/Modules/_io/textio.c @@ -1572,23 +1572,27 @@ _textiowrapper_writeflush(textio *self) return 0; PyObject *pending = self->pending_bytes; - PyObject *b; - - if (PyBytes_Check(pending)) { - b = Py_NewRef(pending); - } - else if (PyUnicode_Check(pending)) { - assert(PyUnicode_IS_ASCII(pending)); - assert(PyUnicode_GET_LENGTH(pending) == self->pending_bytes_count); - b = PyBytes_FromStringAndSize( - PyUnicode_DATA(pending), PyUnicode_GET_LENGTH(pending)); - if (b == NULL) { + assert(PyList_Check(pending)); + + PyObject *slice = PyList_GetSlice(pending, 0, PyList_GET_SIZE(pending)); + if (PyList_GET_SIZE(slice) > 0) { + // There is still a race condition here in that two threads might get + // the slice concurrently (getting the same first N entries), and then + // both try and delete N entires, leading to some items getting + // silently discarded (if there are at least 2N entries in the list at + // that time). + // In practice, the race condition does not seem to trigger (especially + // compared to the previous behavior). + if (PyList_SetSlice(pending, 0, PyList_GET_SIZE(slice), NULL) < 0) { return -1; } - } - else { - assert(PyList_Check(pending)); - b = PyBytes_FromStringAndSize(NULL, self->pending_bytes_count); + + // We use the value in _count now, and decrement it later so that the + // length of b is always >= size of bytes in slice (absent race + // conditions mentioned above). Note that _count might have been + // incremented from another thread, so we can't fully trust the value. + // We resize b below to address this. + PyObject *b = PyBytes_FromStringAndSize(NULL, self->pending_bytes_count); if (b == NULL) { return -1; } @@ -1596,8 +1600,8 @@ _textiowrapper_writeflush(textio *self) char *buf = PyBytes_AsString(b); Py_ssize_t pos = 0; - for (Py_ssize_t i = 0; i < PyList_GET_SIZE(pending); i++) { - PyObject *obj = PyList_GET_ITEM(pending, i); + for (Py_ssize_t i = 0; i < PyList_GET_SIZE(slice); i++) { + PyObject *obj = PyList_GET_ITEM(slice, i); char *src; Py_ssize_t len; if (PyUnicode_Check(obj)) { @@ -1612,26 +1616,37 @@ _textiowrapper_writeflush(textio *self) return -1; } } + // Race conditions above could result in memory corruption otherwise. + if (PyBytes_GET_SIZE(b) < pos + len) { + Py_DECREF(b); + return -1; + } memcpy(buf + pos, src, len); pos += len; } - assert(pos == self->pending_bytes_count); - } + self->pending_bytes_count -= pos; // This needs to be atomic, and I don't think it is, but I'm not certain. + assert(self->pending_bytes_count >= 0); - self->pending_bytes_count = 0; - self->pending_bytes = NULL; - Py_DECREF(pending); + if (PyBytes_GET_SIZE(b) != pos) { + if (_PyBytes_Resize(&b, pos) < 0) { + // No need to decref b: + // https://docs.python.org/3.10/c-api/bytes.html#c._PyBytes_Resize + return -1; + } + } + + PyObject *ret; + do { + ret = PyObject_CallMethodOneArg(self->buffer, &_Py_ID(write), b); + } while (ret == NULL && _PyIO_trap_eintr()); + Py_DECREF(b); + // NOTE: We cleared buffer but we don't know how many bytes are actually written + // when an error occurred. + if (ret == NULL) + return -1; + Py_DECREF(ret); + } - PyObject *ret; - do { - ret = PyObject_CallMethodOneArg(self->buffer, &_Py_ID(write), b); - } while (ret == NULL && _PyIO_trap_eintr()); - Py_DECREF(b); - // NOTE: We cleared buffer but we don't know how many bytes are actually written - // when an error occurred. - if (ret == NULL) - return -1; - Py_DECREF(ret); return 0; } @@ -1721,35 +1736,28 @@ _io_TextIOWrapper_write_impl(textio *self, PyObject *text) if (self->pending_bytes == NULL) { self->pending_bytes_count = 0; - self->pending_bytes = b; - } - else if (self->pending_bytes_count + bytes_len > self->chunk_size) { - // Prevent to concatenate more than chunk_size data. - if (_textiowrapper_writeflush(self) < 0) { + self->pending_bytes = PyList_New(0); + if (self->pending_bytes == NULL) { Py_DECREF(b); return NULL; } - self->pending_bytes = b; } - else if (!PyList_CheckExact(self->pending_bytes)) { - PyObject *list = PyList_New(2); - if (list == NULL) { + else if (self->pending_bytes_count + bytes_len > self->chunk_size) { + // Prevent to concatenate more than chunk_size data. + if (_textiowrapper_writeflush(self) < 0) { Py_DECREF(b); return NULL; } - PyList_SET_ITEM(list, 0, self->pending_bytes); - PyList_SET_ITEM(list, 1, b); - self->pending_bytes = list; } - else { - if (PyList_Append(self->pending_bytes, b) < 0) { - Py_DECREF(b); - return NULL; - } + + // increment before append so that _count is always >= the size of bytes in pending + self->pending_bytes_count += bytes_len; // this needs to be atomic, and I don't think it is by default + if (PyList_Append(self->pending_bytes, b) < 0) { Py_DECREF(b); + return NULL; } + Py_DECREF(b); - self->pending_bytes_count += bytes_len; if (self->pending_bytes_count >= self->chunk_size || needflush || text_needflush) { if (_textiowrapper_writeflush(self) < 0)