From 853a537b52159380d27a61a7f6eca46831eebaba Mon Sep 17 00:00:00 2001 From: francis-du Date: Tue, 16 Aug 2022 15:50:48 +0800 Subject: [PATCH 1/2] feat: Add repartition funcation for dataframe --- datafusion/tests/test_dataframe.py | 8 ++++++++ src/dataframe.rs | 17 ++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index bed0a91a6..525e93187 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -212,3 +212,11 @@ def test_explain(df): column("a") - column("b"), ) df.explain() + + +def test_repartition(df): + df.repartition(2) + + +def test_repartition_by_hash(df): + df.repartition_by_hash(column("a"), num=2) \ No newline at end of file diff --git a/src/dataframe.rs b/src/dataframe.rs index 80963f7f0..4dc645583 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -22,6 +22,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert; use datafusion::arrow::util::pretty; use datafusion::dataframe::DataFrame; use datafusion::logical_plan::JoinType; +use datafusion::prelude::*; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::types::PyTuple; @@ -147,7 +148,7 @@ impl PyDataFrame { "The join type {} does not exist or is not implemented", how )) - .into()) + .into()); } }; @@ -164,4 +165,18 @@ impl PyDataFrame { let batches = wait_for_future(py, df.collect())?; Ok(pretty::print_batches(&batches)?) } + + /// Repartition a `DataFrame` based on a logical partitioning scheme. + fn repartition(&self, num: usize) -> PyResult { + let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?; + Ok(Self::new(new_df)) + } + + /// Repartition a `DataFrame` based on a logical partitioning scheme. + #[args(args = "*", num)] + fn repartition_by_hash(&self, args: Vec, num: usize) -> PyResult { + let expr = args.into_iter().map(|py_expr| py_expr.into()).collect(); + let new_df = self.df.repartition(Partitioning::Hash(expr, num))?; + Ok(Self::new(new_df)) + } } From 1481cc978a9a49ece3f9bfe8e356a2561c8c6364 Mon Sep 17 00:00:00 2001 From: francis-du Date: Tue, 6 Sep 2022 13:18:49 +0800 Subject: [PATCH 2/2] fix: python linter --- datafusion/tests/test_dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index 525e93187..7d2ee06fd 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -219,4 +219,4 @@ def test_repartition(df): def test_repartition_by_hash(df): - df.repartition_by_hash(column("a"), num=2) \ No newline at end of file + df.repartition_by_hash(column("a"), num=2)