From 8e458cdcd5af52294c267df6db6e8f27eefedd27 Mon Sep 17 00:00:00 2001 From: francis-du Date: Fri, 12 Aug 2022 15:25:20 +0800 Subject: [PATCH 1/3] feat: add DataFrame::distinct binding --- .gitignore | 1 + datafusion/tests/test_dataframe.py | 20 ++++++++++++++++++++ src/dataframe.rs | 8 +++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b57efb74e..dba642930 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ target Cargo.lock +/venv .idea # Byte-compiled / optimized / DLL files diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index bed0a91a6..7333d6950 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -156,6 +156,26 @@ def test_join(): assert table.to_pydict() == expected +def test_distinct(): + ctx = SessionContext() + + batch = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3, 1, 2, 3]), pa.array([4, 5, 6, 4, 5, 6])], + names=["a", "b"], + ) + df_a = ctx.create_dataframe([[batch]]) \ + .distinct().sort(column("a").sort(ascending=True)) + + batch = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3]), pa.array([4, 5, 6])], + names=["a", "b"], + ) + df_b = ctx.create_dataframe([[batch]]) \ + .sort(column("a").sort(ascending=True)) + + assert df_a.collect() == df_b.collect() + + def test_window_lead(df): df = df.select( column("a"), diff --git a/src/dataframe.rs b/src/dataframe.rs index 80963f7f0..13e21a1aa 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -129,6 +129,12 @@ impl PyDataFrame { Ok(pretty::print_batches(&batches)?) } + /// Filter out duplicate rows + fn distinct(&self) -> PyResult { + let df = self.df.distinct()?; + Ok(Self::new(df)) + } + fn join( &self, right: PyDataFrame, @@ -147,7 +153,7 @@ impl PyDataFrame { "The join type {} does not exist or is not implemented", how )) - .into()) + .into()); } }; From d94ba7a220f988e10c76709bf1045615e9194597 Mon Sep 17 00:00:00 2001 From: francis-du Date: Thu, 18 Aug 2022 22:03:23 +0800 Subject: [PATCH 2/3] fix: fmt --- src/dataframe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 13e21a1aa..87f38c170 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -153,7 +153,7 @@ impl PyDataFrame { "The join type {} does not exist or is not implemented", how )) - .into()); + .into()); } }; From 14e0384b6c3e0ebd8c9146807613004390ace829 Mon Sep 17 00:00:00 2001 From: francis-du Date: Tue, 6 Sep 2022 13:16:33 +0800 Subject: [PATCH 3/3] fix: python linter --- datafusion/tests/test_dataframe.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index 7333d6950..30506a7c6 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -163,15 +163,19 @@ def test_distinct(): [pa.array([1, 2, 3, 1, 2, 3]), pa.array([4, 5, 6, 4, 5, 6])], names=["a", "b"], ) - df_a = ctx.create_dataframe([[batch]]) \ - .distinct().sort(column("a").sort(ascending=True)) + df_a = ( + ctx.create_dataframe([[batch]]) + .distinct() + .sort(column("a").sort(ascending=True)) + ) batch = pa.RecordBatch.from_arrays( [pa.array([1, 2, 3]), pa.array([4, 5, 6])], names=["a", "b"], ) - df_b = ctx.create_dataframe([[batch]]) \ - .sort(column("a").sort(ascending=True)) + df_b = ctx.create_dataframe([[batch]]).sort( + column("a").sort(ascending=True) + ) assert df_a.collect() == df_b.collect()