Skip to content

Commit 56df088

Browse files
authored
[DataFrame] - Add repartition function for dataframe (#37)
* feat: Add repartition function for dataframe * fix: python linter
1 parent 2636185 commit 56df088

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

datafusion/tests/test_dataframe.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,11 @@ def test_explain(df):
236236
column("a") - column("b"),
237237
)
238238
df.explain()
239+
240+
241+
def test_repartition(df):
242+
df.repartition(2)
243+
244+
245+
def test_repartition_by_hash(df):
246+
df.repartition_by_hash(column("a"), num=2)

src/dataframe.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert;
2222
use datafusion::arrow::util::pretty;
2323
use datafusion::dataframe::DataFrame;
2424
use datafusion::logical_plan::JoinType;
25+
use datafusion::prelude::*;
2526
use pyo3::exceptions::PyTypeError;
2627
use pyo3::prelude::*;
2728
use pyo3::types::PyTuple;
@@ -170,4 +171,18 @@ impl PyDataFrame {
170171
let batches = wait_for_future(py, df.collect())?;
171172
Ok(pretty::print_batches(&batches)?)
172173
}
174+
175+
/// Repartition a `DataFrame` into `num` partitions using round-robin batch partitioning.
176+
fn repartition(&self, num: usize) -> PyResult<Self> {
177+
let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?;
178+
Ok(Self::new(new_df))
179+
}
180+
181+
/// Repartition a `DataFrame` into `num` partitions by hashing the given expressions.
182+
#[args(args = "*", num)]
183+
fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyResult<Self> {
184+
let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
185+
let new_df = self.df.repartition(Partitioning::Hash(expr, num))?;
186+
Ok(Self::new(new_df))
187+
}
173188
}

0 commit comments

Comments
 (0)