From 5a5eb398fa287a6a75c50d587a4e9210e3648393 Mon Sep 17 00:00:00 2001
From: kosiew
Date: Mon, 23 Jun 2025 20:14:41 +0800
Subject: [PATCH 1/4] feat: Add Parquet writer option autodetection

---
 python/datafusion/__init__.py  |  6 +++++-
 python/datafusion/dataframe.py | 11 +++++++++--
 python/tests/test_dataframe.py | 16 ++++++++++++++++
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 16d65f68..0d245aad 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -46,7 +46,11 @@
     SessionContext,
     SQLOptions,
 )
-from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
+from .dataframe import (
+    DataFrame,
+    ParquetColumnOptions,
+    ParquetWriterOptions,
+)
 from .expr import (
     Expr,
     WindowFrame,
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 769271c7..6bf821f6 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -55,6 +55,7 @@
 from datafusion._internal import DataFrame as DataFrameInternal
 from datafusion._internal import expr as expr_internal
+from dataclasses import dataclass
 from enum import Enum
@@ -873,7 +874,7 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: Union[str, Compression] = Compression.ZSTD,
+        compression: Union[str, Compression, ParquetWriterOptions] = Compression.ZSTD,
         compression_level: int | None = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
@@ -894,7 +895,13 @@
             recommended range is 1 to 22, with the default being 4. Higher
             levels provide better compression but slower speed.
         """
-        # Convert string to Compression enum if necessary
+        if isinstance(compression, ParquetWriterOptions):
+            if compression_level is not None:
+                msg = "compression_level should be None when using ParquetWriterOptions"
+                raise ValueError(msg)
+            self.write_parquet_with_options(path, compression)
+            return
+
         if isinstance(compression, str):
             compression = Compression.from_str(compression)
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 3c9b97f2..deaa30b3 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -2038,6 +2038,22 @@
         assert col["encodings"] == result["encodings"]


+def test_write_parquet_options(df, tmp_path):
+    options = ParquetWriterOptions(compression="gzip", compression_level=6)
+    df.write_parquet(str(tmp_path), options)
+
+    result = pq.read_table(str(tmp_path)).to_pydict()
+    expected = df.to_pydict()
+
+    assert result == expected
+
+
+def test_write_parquet_options_error(df, tmp_path):
+    options = ParquetWriterOptions(compression="gzip", compression_level=6)
+    with pytest.raises(ValueError):
+        df.write_parquet(str(tmp_path), options, compression_level=1)
+
+
 def test_dataframe_export(df) -> None:
     # Guarantees that we have the canonical implementation
     # reading our dataframe export
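
[Usage sketch] A minimal example of the behaviour this patch introduces, assuming a
SessionContext named `ctx` and a DataFrame `df` (both names are illustrative):

    from datafusion import SessionContext, ParquetWriterOptions

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})

    # Passing ParquetWriterOptions as the `compression` argument routes the call
    # to write_parquet_with_options instead of the Compression-enum path.
    df.write_parquet("out/", ParquetWriterOptions(compression="gzip"))

    # Supplying compression_level alongside ParquetWriterOptions is rejected,
    # since the options object already carries the compression settings.
    df.write_parquet("out/", ParquetWriterOptions(compression="gzip"),
                     compression_level=6)  # raises ValueError
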
From 8e39b17ba1ef4632c98302a307a47c343ae01149 Mon Sep 17 00:00:00 2001
From: kosiew
Date: Mon, 23 Jun 2025 23:08:48 +0800
Subject: [PATCH 2/4] Add compression_level to ParquetWriterOptions

---
 python/datafusion/__init__.py  | 7 ++-----
 python/datafusion/dataframe.py | 7 +++++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 0d245aad..71f74492 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -46,11 +46,8 @@
     SessionContext,
     SQLOptions,
 )
-from .dataframe import (
-    DataFrame,
-    ParquetColumnOptions,
-    ParquetWriterOptions,
-)
+from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
+
 from .expr import (
     Expr,
     WindowFrame,
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 6bf821f6..d1818592 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -55,7 +55,6 @@
 from datafusion._internal import DataFrame as DataFrameInternal
 from datafusion._internal import expr as expr_internal
-from dataclasses import dataclass
 from enum import Enum
@@ -192,6 +191,7 @@ def __init__(
         writer_version: str = "1.0",
         skip_arrow_metadata: bool = False,
         compression: Optional[str] = "zstd(3)",
+        compression_level: Optional[int] = None,
         dictionary_enabled: Optional[bool] = True,
         dictionary_page_size_limit: int = 1024 * 1024,
         statistics_enabled: Optional[str] = "page",
@@ -214,7 +214,10 @@
         self.write_batch_size = write_batch_size
         self.writer_version = writer_version
         self.skip_arrow_metadata = skip_arrow_metadata
-        self.compression = compression
+        if compression_level is not None:
+            self.compression = f"{compression}({compression_level})"
+        else:
+            self.compression = compression
         self.dictionary_enabled = dictionary_enabled
         self.dictionary_page_size_limit = dictionary_page_size_limit
         self.statistics_enabled = statistics_enabled

From 3598637648271be5a536769f7ed5079581ded7b3 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 24 Jun 2025 10:34:35 +0800
Subject: [PATCH 3/4] fix ruff errors

---
 python/datafusion/__init__.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 71f74492..69062fd3 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -30,13 +30,12 @@
 except ImportError:
     import importlib_metadata

-from datafusion.col import col, column
-
 from . import functions, object_store, substrait, unparser

 # The following imports are okay to remain as opaque to the user.
 from ._internal import Config
 from .catalog import Catalog, Database, Table
+from .col import col, column
 from .common import (
     DFSchema,
 )
@@ -47,7 +46,6 @@
     SQLOptions,
 )
 from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
-
 from .expr import (
     Expr,
     WindowFrame,
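
[Usage sketch] How the compression string is assembled after patch 2; the codec
names and level are illustrative:

    from datafusion import ParquetWriterOptions

    # A compression_level is folded into the compression string, producing the
    # "codec(level)" form the writer already accepts (the default is "zstd(3)").
    opts = ParquetWriterOptions(compression="gzip", compression_level=6)
    assert opts.compression == "gzip(6)"

    # Without a level, the string is passed through unchanged.
    assert ParquetWriterOptions(compression="snappy").compression == "snappy"
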
From f0f8be7b48677183dfcc2ecf0a9d8b2f64b753fd Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 25 Jun 2025 12:10:55 +0800
Subject: [PATCH 4/4] feat: Add overloads for write_parquet method to support
 various compression options

---
 python/datafusion/dataframe.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index d1818592..efdf6a22 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -874,6 +874,30 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
         """
         self.df.write_csv(str(path), with_header)

+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: str,
+        compression_level: int | None = None,
+    ) -> None: ...
+
+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: Compression = Compression.ZSTD,
+        compression_level: int | None = None,
+    ) -> None: ...
+
+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: ParquetWriterOptions,
+        compression_level: None = None,
+    ) -> None: ...
+
     def write_parquet(
         self,
         path: str | pathlib.Path,
         compression: