diff --git a/bgzip_runner.py b/bgzip_runner.py new file mode 100755 index 00000000..17659a5b --- /dev/null +++ b/bgzip_runner.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import io +import sys + +from isal.igzip_threaded import _ThreadedGzipReader, _ThreadedBGzipReader + +def main(): + file = sys.argv[1] + with io.BufferedReader( + _ThreadedBGzipReader(file, threads=8) + ) as f: + while True: + block = f.read(128 * 1024) + if block == b"": + return + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/setup.py b/setup.py index 55c89496..e8bd66ca 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ Extension("isal.isal_zlib", ["src/isal/isal_zlibmodule.c"]), Extension("isal.igzip_lib", ["src/isal/igzip_libmodule.c"]), Extension("isal._isal", ["src/isal/_isalmodule.c"]), + Extension("isal._bgzip", ["src/isal/_bgzipmodule.c"]), ] diff --git a/src/isal/_bgzip.pyi b/src/isal/_bgzip.pyi new file mode 100644 index 00000000..b41a42ad --- /dev/null +++ b/src/isal/_bgzip.pyi @@ -0,0 +1,8 @@ +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 +# Python Software Foundation; All Rights Reserved + +# This file is part of python-isal which is distributed under the +# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2. + +def find_last_bgzip_end(__data: bytes) -> int: ... diff --git a/src/isal/_bgzipmodule.c b/src/isal/_bgzipmodule.c new file mode 100644 index 00000000..a8c6e311 --- /dev/null +++ b/src/isal/_bgzipmodule.c @@ -0,0 +1,106 @@ +/* +Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 +Python Software Foundation; All Rights Reserved + +This file is part of python-isal which is distributed under the +PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2. + +This file is not originally from the CPython distribution. But it does +contain mostly example code from the Python docs. Also dual licensing just +for this one file seemed silly. +*/ + +#define PY_SSIZE_T_CLEAN +#include +#include + +#define FEXTRA 4 + +static inline uint16_t load_u16_le(const void *address) { + #if PY_BIG_ENDIAN + uint8_t *mem = address; + return mem[0] | (mem[1] << 8); + #else + return *(uint16_t *)address; + #endif +} + +static PyObject *find_last_bgzip_end(PyObject *module, PyObject *buffer_obj) { + Py_buffer buf; + int ret = PyObject_GetBuffer(buffer_obj, &buf, PyBUF_SIMPLE); + if (ret == -1) { + return NULL; + } + const uint8_t *data = buf.buf; + Py_ssize_t data_length = buf.len; + const uint8_t *data_end = data + data_length; + const uint8_t *cursor = data; + + while (true) { + if (cursor + 18 > data_end) { + break; + } + uint8_t magic1 = cursor[0]; + uint8_t magic2 = cursor[1]; + uint8_t method = cursor[2]; + uint8_t flags = cursor[3]; + uint16_t xlen = load_u16_le(cursor + 10); + uint8_t si1 = cursor[12]; + uint8_t si2 = cursor[13]; + uint16_t subfield_length = load_u16_le(cursor + 14); + if ( + magic1 != 31 || + magic2 != 139 || + method != 8 || + flags != FEXTRA || + xlen != 6 || + si1 != 66 || + si2 != 67 || + subfield_length != 2 + ) { + PyErr_Format( + PyExc_ValueError, + "Incorrect bgzip header:\n" + "magic: %x, %x\n" + "method: %x\n" + "flags: %x\n" + "xlen: %d\n" + "si1, si2: %d, %d \n" + "subfield_length: %d", + magic1, magic2, method, flags, xlen, si1, si2, subfield_length + ); + return NULL; + } + uint16_t block_size = load_u16_le(cursor + 16); + const uint8_t *new_start = cursor + block_size + 1; + if (new_start > data_end) { + break; + } + cursor = new_start; + } + return PyLong_FromSsize_t(cursor - data); +} + +static PyMethodDef _bgzip_methods[] = { + {"find_last_bgzip_end", find_last_bgzip_end, METH_O, NULL}, + {NULL}, +}; + +static struct PyModuleDef _bgzip_module = { + PyModuleDef_HEAD_INIT, + .m_name = "_bgzip", + .m_doc = NULL, + .m_size = -1, + .m_methods = _bgzip_methods, +}; + +PyMODINIT_FUNC +PyInit__bgzip(void) +{ + PyObject *m = PyModule_Create(&_bgzip_module); + if (m == NULL) { + return NULL; + } + return m; +} diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 7f1c94fc..ecab37cf 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -14,7 +14,7 @@ import threading from typing import List, Optional, Tuple -from . import igzip, isal_zlib +from . import igzip, isal_zlib, _bgzip DEFLATE_WINDOW_SIZE = 2 ** 15 @@ -167,6 +167,138 @@ def closed(self) -> bool: return self._closed +class _ThreadedBGzipReader(io.RawIOBase): + """ + Reads BGZip files multithreaded. For one thread, the normal gzip reader + class is more efficient, as it operates fewer queues and keeps less + allocated data around. + """ + def __init__(self, filename, threads=1, queue_size=2, block_size=1024 * 1024): + if threads < 1: + raise RuntimeError("At least one thread is needed") + self.raw, self.closefd = open_as_binary_stream(filename, "rb") + + self.lock = threading.Lock() + self.pos = 0 + self._read_from_index = 0 + self.read_file = False + self.input_queues = [queue.Queue(queue_size) for _ in range(threads)] + self.output_queues = [queue.Queue(queue_size) for _ in range(threads)] + self.eof = False + self.exception = None + self.buffer = io.BytesIO() + self.block_size = block_size + self.input_worker = threading.Thread(target=self._read_input) + self.output_workers = [threading.Thread(target=self._decompress, args=(i,)) for i in range(threads)] + self._closed = False + self.running = True + self._calling_thread = threading.current_thread() + self.input_worker.start() + for worker in self.output_workers: + worker.start() + + def _check_closed(self, msg=None): + if self._closed: + raise ValueError("I/O operation on closed file") + + def _read_input(self): + block_size = self.block_size + number_of_queues = len(self.output_queues) + input_index = 0 + buffer = bytearray(block_size) + buffer_view = memoryview(buffer) + offset = 0 + while self.running and self._calling_thread.is_alive(): + bytes_read = self.raw.readinto(buffer_view[offset:]) + total_read = offset + bytes_read + if bytes_read == 0: + return + blocks_end = _bgzip.find_last_bgzip_end(buffer_view[:total_read]) + compressed = bytes(buffer_view[:blocks_end]) + leftover = buffer_view[blocks_end:total_read] + offset = leftover.nbytes + buffer[:offset] = leftover + input_queue = self.input_queues[input_index] + while self.running and self._calling_thread.is_alive(): + try: + input_queue.put(compressed, timeout=0.01) + break + except queue.Full: + pass + input_index += 1 + input_index %= number_of_queues + + def _decompress(self, index: int): + input_queue = self.input_queues[index] + output_queue = self.output_queues[index] + while self.running and self._calling_thread.is_alive(): + try: + input_data = input_queue.get(timeout=0.01) + except queue.Empty: + if not self.input_worker.is_alive(): + return + continue + try: + decompressed = isal_zlib._GzipReader(input_data).read() + except Exception as e: + with self.lock: + self.exception = e + return + input_queue.task_done() + while self.running and self._calling_thread.is_alive(): + try: + output_queue.put(decompressed, timeout=0.05) + break + except queue.Full: + pass + + def readinto(self, b): + self._check_closed() + result = self.buffer.readinto(b) + if result == 0: + output_queue = self.output_queues[self._read_from_index] + while True: + try: + data_from_queue = output_queue.get(timeout=0.01) + output_queue.task_done() + self._read_from_index += 1 + self._read_from_index %= len(self.output_queues) + break + except queue.Empty: + if self.exception: + raise self.exception + if not any(worker.is_alive() for worker in self.output_workers): + # EOF reached + return 0 + self.buffer = io.BytesIO(data_from_queue) + result = self.buffer.readinto(b) + self.pos += result + return result + + def readable(self) -> bool: + return True + + def tell(self) -> int: + self._check_closed() + return self.pos + + def close(self) -> None: + if self._closed: + return + self.running = False + self.input_worker.join() + for worker in self.output_workers: + worker.join() + self.buffer.close() + if self.closefd: + self.raw.close() + self._closed = True + + @property + def closed(self) -> bool: + return self._closed + + class FlushableBufferedWriter(io.BufferedWriter): def flush(self): super().flush()