From bc153c5383fec5e6edc340638dfc1cc17a900d45 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sat, 24 May 2025 13:05:15 +0200 Subject: [PATCH 1/4] Add support for new pandas UDF engine --- src/blosc2/proxy.py | 47 ++++++++++++ tests/test_pandas_udf_engine.py | 125 ++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 tests/test_pandas_udf_engine.py diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 213d8808..cd533c11 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -676,3 +676,50 @@ def wrapper(*args, **func_kwargs): return decorator else: return decorator(func) + + +class PandasUdfEngine: + @staticmethod + def _ensure_numpy_data(data): + if not isinstance(data, np.ndarray): + try: + data = data.values + except AttributeError as err: + raise ValueError( + f"blosc2.jit received an object of type {type(data).__name__}, which is not supported. " + "Try casting your Series or DataFrame to a NumPy dtype." + ) from err + return data + + @classmethod + def map(cls, data, func, args, kwargs, decorator, skip_na): + """ + JIT a NumPy array element-wise. In the case of Blosc2, functions are + expected to be vectorized NumPy operations, so the function is called + with the NumPy array as the function parameter, instead of calling the + function once for each element. + """ + data = cls._ensure_numpy_data(data) + func = decorator(func) + return func(data, *args, **kwargs) + + @classmethod + def apply(cls, data, func, args, kwargs, decorator, axis): + """ + JIT a NumPy array by column or row. In the case of Blosc2, functions are + expected to be vectorized NumPy operations, so the function is called + with the NumPy array as the function parameter, instead of calling the + function once for each column or row.
+ """ + data = cls._ensure_numpy_data(data) + func = decorator(func) + if data.ndim in (1, 2): + return func(data, *args, **kwargs) + else: + raise NotImplementedError( + "The blosc2 engine only supports data with with 1 or 2 dimensions. " + f"A NumPy array with {data.ndim} dimensions has been received." + ) + + +jit.__pandas_udf__ = PandasUdfEngine diff --git a/tests/test_pandas_udf_engine.py b/tests/test_pandas_udf_engine.py new file mode 100644 index 00000000..82107c83 --- /dev/null +++ b/tests/test_pandas_udf_engine.py @@ -0,0 +1,125 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# This source code is licensed under a BSD-style license (found in the +# LICENSE file in the root directory of this source tree) +####################################################################### + +import numpy as np + +import blosc2 + + +class TestPandasUDF: + def test_map_1d(self): + def add_one(x): + return x + 1 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.map( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + skip_na=False, + ) + assert result.shape == (2,) + assert result[0] == 2 + assert result[1] == 3 + + def test_map_1d_with_args(self): + def add_numbers(x, num1, num2): + return x + num1 + num2 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.map( + data, + add_numbers, + args=(10,), + kwargs={"num2": 100}, + decorator=blosc2.jit, + skip_na=False, + ) + assert result.shape == (2,) + assert result[0] == 111 + assert result[1] == 112 + + def test_map_2d(self): + def add_one(x): + return x + 1 + + data = np.array([[1, 2], [3, 4]]) + + result = blosc2.jit.__pandas_udf__.map( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + skip_na=False, + ) + assert result.shape == (2, 2) + assert result[0, 0] == 2 + assert result[0, 1] == 3 + assert result[1, 0] == 4 + assert result[1, 1] == 5 + + def 
test_apply_1d(self): + def add_one(x): + return x + 1 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=0, + ) + assert result.shape == (2,) + assert result[0] == 2 + assert result[1] == 3 + + def test_apply_1d_with_args(self): + def add_numbers(x, num1, num2): + return x + num1 + num2 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_numbers, + args=(10,), + kwargs={"num2": 100}, + decorator=blosc2.jit, + axis=0, + ) + assert result.shape == (2,) + assert result[0] == 111 + assert result[1] == 112 + + def test_apply_2d(self): + def add_one(x): + return x + 1 + + data = np.array([[1, 2], [3, 4]]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=0, + ) + assert result.shape == (2, 2) + assert result[0, 0] == 2 + assert result[0, 1] == 3 + assert result[1, 0] == 4 + assert result[1, 1] == 5 From b3349813c2dc7fa6c556191df77fbad457839cf9 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 25 May 2025 11:05:43 +0200 Subject: [PATCH 2/4] Removing unneeded check --- src/blosc2/proxy.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index cd533c11..5d5976e5 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -713,13 +713,7 @@ def apply(cls, data, func, args, kwargs, decorator, axis): """ data = cls._ensure_numpy_data(data) func = decorator(func) - if data.ndim in (1, 2): - return func(data, *args, **kwargs) - else: - raise NotImplementedError( - "The blosc2 engine only supports data with with 1 or 2 dimensions. " - f"A NumPy array with {data.ndim} dimensions has been received." 
- ) + return func(data, *args, **kwargs) jit.__pandas_udf__ = PandasUdfEngine From 5913fba80387e9bcf10167e5f088a4d6a49e596b Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 25 May 2025 21:46:16 +0200 Subject: [PATCH 3/4] Making the blosc2 engine respect the default pandas udf signatures --- src/blosc2/proxy.py | 22 +++++++-- tests/test_pandas_udf_engine.py | 87 ++++++++++++++++----------------- 2 files changed, 59 insertions(+), 50 deletions(-) diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 5d5976e5..0c4c67a4 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -699,9 +699,7 @@ def map(cls, data, func, args, kwargs, decorator, skip_na): with the NumPy array as the function parameter, instead of calling the function once for each element. """ - data = cls._ensure_numpy_data(data) - func = decorator(func) - return func(data, *args, **kwargs) + raise NotImplementedError("The Blosc2 engine does not support map. Use apply instead.") @classmethod def apply(cls, data, func, args, kwargs, decorator, axis): @@ -713,7 +711,23 @@ def apply(cls, data, func, args, kwargs, decorator, axis): """ data = cls._ensure_numpy_data(data) func = decorator(func) - return func(data, *args, **kwargs) + if data.ndim == 1 or axis is None: + # pandas Series.apply or pipe + return func(data, *args, **kwargs) + elif axis in (0, "index"): + # pandas apply(axis=0) column-wise + result = [] + for row_idx in range(data.shape[1]): + result.append(func(data[:, row_idx], *args, **kwargs)) + return np.vstack(result).transpose() + elif axis == (1, "columns"): + # pandas apply(axis=1) row-wise + result = [] + for col_idx in range(data.shape[0]): + result.append(func(data[col_idx, :], *args, **kwargs)) + return np.vstack(result) + else: + raise NotImplementedError(f"Unknown axis '{axis}'. 
Use one of 0, 1 or None.") jit.__pandas_udf__ = PandasUdfEngine diff --git a/tests/test_pandas_udf_engine.py b/tests/test_pandas_udf_engine.py index 82107c83..326a7123 100644 --- a/tests/test_pandas_udf_engine.py +++ b/tests/test_pandas_udf_engine.py @@ -7,72 +7,88 @@ ####################################################################### import numpy as np +import pytest import blosc2 class TestPandasUDF: - def test_map_1d(self): + def test_map(self): def add_one(x): return x + 1 data = np.array([1, 2]) - result = blosc2.jit.__pandas_udf__.map( + with pytest.raises(NotImplementedError): + blosc2.jit.__pandas_udf__.map( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + skip_na=False, + ) + + def test_apply_1d(self): + def add_one(x): + return x + 1 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.apply( data, add_one, args=(), kwargs={}, decorator=blosc2.jit, - skip_na=False, + axis=0, ) assert result.shape == (2,) assert result[0] == 2 assert result[1] == 3 - def test_map_1d_with_args(self): + def test_apply_1d_with_args(self): def add_numbers(x, num1, num2): return x + num1 + num2 data = np.array([1, 2]) - result = blosc2.jit.__pandas_udf__.map( + result = blosc2.jit.__pandas_udf__.apply( data, add_numbers, args=(10,), kwargs={"num2": 100}, decorator=blosc2.jit, - skip_na=False, + axis=0, ) assert result.shape == (2,) assert result[0] == 111 assert result[1] == 112 - def test_map_2d(self): + def test_apply_2d(self): def add_one(x): + assert x.shape == (2, 3) return x + 1 - data = np.array([[1, 2], [3, 4]]) + data = np.array([[1, 2, 3], [4, 5, 6]]) - result = blosc2.jit.__pandas_udf__.map( + result = blosc2.jit.__pandas_udf__.apply( data, add_one, args=(), kwargs={}, decorator=blosc2.jit, - skip_na=False, + axis=None, ) - assert result.shape == (2, 2) - assert result[0, 0] == 2 - assert result[0, 1] == 3 - assert result[1, 0] == 4 - assert result[1, 1] == 5 + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert 
np.array_equal(result, expected) - def test_apply_1d(self): + def test_apply_2d_by_column(self): def add_one(x): + assert x.shape == (2,) return x + 1 - data = np.array([1, 2]) + data = np.array([[1, 2, 3], [4, 5, 6]]) result = blosc2.jit.__pandas_udf__.apply( data, @@ -82,33 +98,15 @@ def add_one(x): decorator=blosc2.jit, axis=0, ) - assert result.shape == (2,) - assert result[0] == 2 - assert result[1] == 3 + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert np.array_equal(result, expected) - def test_apply_1d_with_args(self): - def add_numbers(x, num1, num2): - return x + num1 + num2 - - data = np.array([1, 2]) - - result = blosc2.jit.__pandas_udf__.apply( - data, - add_numbers, - args=(10,), - kwargs={"num2": 100}, - decorator=blosc2.jit, - axis=0, - ) - assert result.shape == (2,) - assert result[0] == 111 - assert result[1] == 112 - - def test_apply_2d(self): + def test_apply_2d_by_row(self): def add_one(x): + assert x.shape == (3,) return x + 1 - data = np.array([[1, 2], [3, 4]]) + data = np.array([[1, 2, 3], [4, 5, 6]]) result = blosc2.jit.__pandas_udf__.apply( data, @@ -116,10 +114,7 @@ def add_one(x): args=(), kwargs={}, decorator=blosc2.jit, - axis=0, + axis=1, ) - assert result.shape == (2, 2) - assert result[0, 0] == 2 - assert result[0, 1] == 3 - assert result[1, 0] == 4 - assert result[1, 1] == 5 + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert np.array_equal(result, expected) From c0e9602bcbbd571fe6946c89fbc86207b568e023 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 25 May 2025 21:58:06 +0200 Subject: [PATCH 4/4] Fix typo breaking tests --- src/blosc2/proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 0c4c67a4..1c5890c2 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -720,7 +720,7 @@ def apply(cls, data, func, args, kwargs, decorator, axis): for row_idx in range(data.shape[1]): result.append(func(data[:, row_idx], *args, **kwargs)) return 
np.vstack(result).transpose() - elif axis == (1, "columns"): + elif axis in (1, "columns"): # pandas apply(axis=1) row-wise result = [] for col_idx in range(data.shape[0]):