feat!: use FilteredReadExec in the planner #3813


Merged — 22 commits, Jul 21, 2025

Conversation

westonpace
Contributor

@westonpace westonpace commented May 13, 2025

Switches the planner to use the new FilteredReadExec node when not using legacy storage.

Starts to remove the storage_class prototype. Keeping it working through this change was too difficult, and the prototype has not seen use or active development toward becoming a proper feature. That can still happen in the future.

Refactors most of the planner to use Projection instead of Schema to represent the user's projection.

BREAKING CHANGE: The names _rowid and _rowaddr are no longer valid column names (it's unclear they ever should have been valid, and using these names would likely have led to errors)

@github-actions github-actions bot added enhancement New feature or request python labels May 13, 2025
@westonpace westonpace changed the title feat: use FilteredReadExec in the planner feat!: use FilteredReadExec in the planner May 13, 2025
@niebayes

Very cool optimization

@westonpace westonpace force-pushed the feat/filtered-read-in-planner branch 2 times, most recently from 8cde7c1 to 598e178 Compare July 2, 2025 00:55
@codecov-commenter

codecov-commenter commented Jul 2, 2025

Codecov Report

Attention: Patch coverage is 65.37370% with 366 lines in your changes missing coverage. Please review.

Project coverage is 80.10%. Comparing base (7069045) to head (157ce91).

Files with missing lines Patch % Lines
rust/lance/src/io/exec/filtered_read.rs 67.07% 118 Missing and 16 partials ⚠️
rust/lance-datafusion/src/exec.rs 19.44% 86 Missing and 1 partial ⚠️
rust/lance/src/index/vector/builder.rs 0.00% 26 Missing and 2 partials ⚠️
rust/lance-io/src/lib.rs 56.36% 24 Missing ⚠️
rust/lance-arrow/src/lib.rs 0.00% 18 Missing ⚠️
rust/lance-core/src/datatypes/schema.rs 81.96% 10 Missing and 1 partial ⚠️
rust/lance/src/dataset/write/insert.rs 27.27% 8 Missing ⚠️
rust/lance/src/io/exec/scalar_index.rs 82.22% 3 Missing and 5 partials ⚠️
rust/lance-datafusion/src/chunker.rs 87.27% 6 Missing and 1 partial ⚠️
rust/lance-datafusion/src/projection.rs 85.10% 0 Missing and 7 partials ⚠️
... and 13 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3813      +/-   ##
==========================================
- Coverage   80.22%   80.10%   -0.12%     
==========================================
  Files         298      298              
  Lines      105624   106214     +590     
  Branches   105624   106214     +590     
==========================================
+ Hits        84732    85082     +350     
- Misses      17805    18014     +209     
- Partials     3087     3118      +31     
Flag Coverage Δ
unittests 80.10% <65.37%> (-0.12%) ⬇️


@westonpace westonpace force-pushed the feat/filtered-read-in-planner branch from 49718cc to c37b1cf Compare July 7, 2025 14:16
@westonpace westonpace marked this pull request as ready for review July 8, 2025 08:41
@wjones127
Contributor

Are there any benchmark results you want to share in this PR?

Contributor

@wjones127 wjones127 left a comment

I'm a little nervous about the multi-vector code path, but otherwise looks good.

Comment on lines 1231 to 1241
/// Return true if a plan has two filter stages
pub fn has_two_steps(&self) -> bool {
// If there is no refine step then either:
// 1. We are doing a full scan so there is no second filter
// 2. We are doing a pure scalar index search, so there is no first filter
//
// Note: In case #2 we are making the assumption that the scalar index covers
// most of our fragments. If 90% of our fragments are not covered by the scalar
// index then we might actually benefit from late materialization.
self.refine_expr.is_some()
}
Contributor

We already have has_refine() above, so do we need this? TBH I find has_two_steps to be a bit confusing of a name, without more context on why there would be two filter steps.

Contributor Author

Yep...this is embarrassing 🤦. I think the logic here was more complicated at one point and then I walked it back and didn't realize the method was no longer useful.

Comment on lines +679 to +681
if let Some((row_id_idx, _)) = batch.schema().column_with_name("row_id") {
batch = batch.rename_column(row_id_idx, ROW_ID)?;
}
Contributor

Why is this row_id in the first place? I wonder if we need to modify the source itself?

Contributor

Oh I get it now; this might have been a user-supplied column and we disallowed that.

Contributor Author

Yeah, there is a "user supplied shuffle buffers" path where we write the shuffle input out to files before we call it, and a "go directly into shuffle" path where we don't. The direct path uses _rowid because the input is just an output from a scan and never materialized into a file.

@@ -5477,6 +5662,7 @@ mod test {

#[rstest]
#[tokio::test]
#[test_log::test]
Contributor

Should we remove these?

Contributor Author

Removed.

Comment on lines 2997 to 3049
HashJoinExec: mode=CollectLeft, join_type=Right, on=[(key@0, key@1)], projection=[_rowaddr@1, value@2, key@3]
LanceRead: uri=..., projection=[key], num_fragments=1, range_before=None, range_after=None, \
row_id=false, row_addr=true, indexed_filter=--, refine_filter=--
RepartitionExec: partitioning=RoundRobinBatch(...), input_partitions=1
StreamingTableExec: partition_sizes=1, projection=[value, key]"
Contributor

I don't know why, but this test keeps flipping between CollectLeft and Partitioned. One time was during a datafusion upgrade. I guess here it's because we switch from LanceScan to LanceRead.

Contributor Author

It's based on the statistics I think. It was Partitioned until I added a statistics impl and then it changed to CollectLeft. Maybe when it switched before it was because the stats weren't making it up?

Comment on lines +267 to +311
/// This mode will use a single thread per partition. This is more traditional for
/// DataFusion and should give better performance for complex queries that have a
/// lot of downstream processing. However, you will want to make sure to create the
/// node with enough partitions or else you will not get any parallelism.
///
/// The number of partitions is specified by the parameter.
MultiplePartitions(usize),
Contributor

This is cool! Do we exercise this in any tests? I couldn't see where we initialize with this enum?

Contributor Author

Not yet. My hope is to switch to this at some point. I don't think it has much benefit for vector search, but it's more natural for SQL / table provider paths and so might be a fun perf experiment.
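The first-come-first-serve scheduling discussed below can be sketched in plain Rust: each partition is a thread pulling read tasks from a shared queue, so a fast partition simply takes more work. This is a hypothetical model (names and types invented, `u64` row counts standing in for real read tasks), not the actual FilteredReadExec implementation.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical model of MultiplePartitions: each partition is a thread that
// pulls tasks from a shared queue first-come-first-serve, so work is
// balanced dynamically rather than pre-assigned per partition.
fn run_partitions(tasks: Vec<u64>, num_partitions: usize) -> Vec<u64> {
    let queue = Arc::new(Mutex::new(tasks.into_iter().collect::<VecDeque<u64>>()));
    let mut handles = Vec::new();
    for _ in 0..num_partitions {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            let mut rows_read = 0u64;
            // Pop until the queue is empty; the split per partition is
            // nondeterministic, but every task is taken exactly once.
            while let Some(task_rows) = queue.lock().unwrap().pop_front() {
                rows_read += task_rows; // stand-in for actually reading a fragment
            }
            rows_read
        }));
    }
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let per_partition = run_partitions(vec![100, 200, 300, 400], 2);
    // Total rows are conserved even though the per-partition split varies.
    assert_eq!(per_partition.iter().sum::<u64>(), 1000);
    println!("rows per partition: {:?}", per_partition);
}
```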

Comment on lines +1291 to +1356
FilteredReadThreadingMode::MultiplePartitions(num_partitions) => {
total_rows / num_partitions as u64
}
Contributor

This seems like it might be inexact. Do we exactly divide by num_partitions? And what about the remainder?

Contributor

It seems like we have each partition just pull from the task stream in a first-come-first-serve way. I don't think that's how datafusion expects it, so might not work well, at least with the stats.

Contributor Author

You're right. We do first-come-first-serve, so "inexact" is more accurate.
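The inexactness has two sources: integer division truncates the remainder, and first-come-first-serve scheduling means the true split can differ from any static estimate. A small sketch with made-up numbers:

```rust
fn main() {
    // Hypothetical stats estimate: total_rows / num_partitions truncates,
    // so the per-partition estimates don't sum back to the total.
    let total_rows: u64 = 1003;
    let num_partitions: u64 = 4;

    let floor_estimate = total_rows / num_partitions; // 250; remainder of 3 is dropped
    assert_eq!(floor_estimate * num_partitions, 1000); // 3 rows unaccounted for

    // A ceiling division at least never under-estimates a partition's share.
    let ceil_estimate = total_rows.div_ceil(num_partitions);
    assert_eq!(ceil_estimate, 251);

    // With first-come-first-serve scheduling, the actual rows a partition
    // reads can be anywhere from 0 to total_rows, so either value is only
    // an estimate, suitable for statistics but not for exactness guarantees.
    println!("floor = {floor_estimate}, ceil = {ceil_estimate}");
}
```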

Comment on lines +262 to +268
// Combine with residual if any
let current_batch = if let Some(residual) = self.residual.take() {
arrow::compute::concat_batches(&residual.schema(), &[residual, batch])
.map_err(|e| DataFusionError::External(Box::new(e)))?
} else {
batch
};
Contributor

Performance-wise, this seems a little unfortunate. It means that if we have batch_size=100 and a stream of 100 single-row batches, we will call concat_batches 99 times to produce one batch. It would be nice to collect pointers to the batches and call concat_batches() once per output batch.

Contributor Author

Yes, but I just copied this over as-is. I made #4192 for follow-up.
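The buffering approach the reviewer suggests can be sketched in plain Rust, with `Vec<i32>` standing in for an Arrow RecordBatch and all names hypothetical: buffer the small input batches and do one "concat" per emitted output batch, instead of concatenating the residual on every incoming batch.

```rust
// Sketch: buffer input batches, concatenate once per output batch.
// `Vec<i32>` stands in for a RecordBatch; this is not the actual chunker code.
fn coalesce(inputs: Vec<Vec<i32>>, batch_size: usize) -> Vec<Vec<i32>> {
    let mut outputs = Vec::new();
    let mut pending: Vec<Vec<i32>> = Vec::new(); // buffered batches, not yet concatenated
    let mut pending_rows = 0;
    for batch in inputs {
        pending_rows += batch.len();
        pending.push(batch);
        while pending_rows >= batch_size {
            // One "concat" per output batch: flatten the buffer, emit
            // batch_size rows, keep the remainder as the new buffer.
            let mut all: Vec<i32> = pending.drain(..).flatten().collect();
            let rest = all.split_off(batch_size);
            outputs.push(all);
            pending_rows = rest.len();
            if !rest.is_empty() {
                pending.push(rest);
            }
        }
    }
    if pending_rows > 0 {
        outputs.push(pending.into_iter().flatten().collect());
    }
    outputs
}

fn main() {
    // 100 single-row batches with batch_size = 100: one concat, one output batch.
    let inputs: Vec<Vec<i32>> = (0..100).map(|i| vec![i]).collect();
    let out = coalesce(inputs, 100);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0].len(), 100);
}
```

In the Arrow version, `pending` would hold the RecordBatches themselves (they are cheaply cloneable) and the flatten step would be the single `concat_batches` call.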

Comment on lines 299 to 301
// We may reduce the number of batches but there is no guarantee. We will never
// increase the number of batches.
self.inner.size_hint()
Contributor

We will never increase the number of batches.

Is that true? I would think if you had input batches of size 1000 but a requested batch size of 100, you would have 10 output batches for every input batch, right?

Contributor Author

@westonpace westonpace Jul 10, 2025

Good point, I removed the size hint. Technically, we shouldn't ever get jumbo batches (since we set the target batch size appropriately in DF). However, I'm not sure this size hint is providing any real value anyways.

Comment on lines +478 to +480
// This is needed as AnalyzeExec launches a thread task per
// partition, and we want these to be connected to the parent span
let plan = Arc::new(TracedExec::new(plan, Span::current()));
Contributor

🫶

@westonpace
Contributor Author

Are there any benchmark results you want to share in this PR?

@wjones127

The primary goal of the PR was to avoid cases where using an index could trigger worse behavior. I've added an IOPS benchmark to track this which measures the number of IOPS a search makes.

Filter      Payload   Old Behavior  New Behavior
Equality    Integers  1 IOP         1 IOP
Equality    Strings   2 IOPS        2 IOPS
None        Integers  14 IOPS       14 IOPS
None        Strings   58 IOPS       58 IOPS
Inequality  Integers  319 IOPS      14 IOPS
Inequality  Strings   668 IOPS      58 IOPS

However, we also want to make sure that the new planning code is not too expensive. I'll be honest: I think it is slightly more expensive, and that might be noticeable on queries that return a very small number of rows. I think this could be fixed if someone were in the mood to play with data structures and algorithms. The main cost is reported at #4189.

I've instead cheated and fixed a low-hanging performance bug where we were calling create_plan twice. Note: all these benchmark numbers are working against data already in the kernel cache. If we were doing real I/O, the differences for the inequality filters would be much more significant.

------------------------------------------------------------------------------------------------------------------- benchmark: 24 tests--------------------------------------------------------------------------------------------------------------------
Name (time in us)                                                              Min                     Max                    Mean            StdDev                 Median               IQR            Outliers         OPS            Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_basic_btree_search[None-equal] (NOW)                                 954.0389 (1.0)          954.0389 (1.0)          954.0389 (1.0)     0.0000 (1.0)          954.0389 (1.0)      0.0000 (1.0)           0;0  1,048.1753 (1.0)           1          10
test_basic_btree_search[None-small_range] (NOW)                           968.5006 (1.02)         968.5006 (1.02)         968.5006 (1.02)    0.0000 (1.0)          968.5006 (1.02)     0.0000 (1.0)           0;0  1,032.5239 (0.99)          1          10
test_basic_btree_search[integers-equal] (NOW)                           1,022.2520 (1.07)       1,022.2520 (1.07)       1,022.2520 (1.07)    0.0000 (1.0)        1,022.2520 (1.07)     0.0000 (1.0)           0;0    978.2324 (0.93)          1          10
test_basic_btree_search[integers-small_range] (NOW)                     1,069.8864 (1.12)       1,069.8864 (1.12)       1,069.8864 (1.12)     0.0000 (1.0)        1,069.8864 (1.12)     0.0000 (1.0)           0;0    934.6787 (0.89)          1          10
test_basic_btree_search[small_strings-equal] (NOW)                      1,082.0105 (1.13)       1,082.0105 (1.13)       1,082.0105 (1.13)     0.0000 (1.0)        1,082.0105 (1.13)     0.0000 (1.0)           0;0    924.2054 (0.88)          1          10
test_basic_btree_search[small_strings-small_range] (NOW)                1,098.1761 (1.15)       1,098.1761 (1.15)       1,098.1761 (1.15)     0.0000 (1.0)        1,098.1761 (1.15)     0.0000 (1.0)           0;0    910.6008 (0.87)          1          10
test_basic_btree_search[None-equal] (0001_9603ad8)                      1,375.6214 (1.44)       1,375.6214 (1.44)       1,375.6214 (1.44)     0.0000 (1.0)        1,375.6214 (1.44)     0.0000 (1.0)           0;0    726.9442 (0.69)          1         100
test_basic_btree_search[None-small_range] (0001_9603ad8)                1,426.6382 (1.50)       1,426.6382 (1.50)       1,426.6382 (1.50)     0.0000 (1.0)        1,426.6382 (1.50)     0.0000 (1.0)           0;0    700.9486 (0.67)          1         100
test_basic_btree_search[integers-equal] (0001_9603ad8)                  1,552.7651 (1.63)       1,552.7651 (1.63)       1,552.7651 (1.63)     0.0000 (1.0)        1,552.7651 (1.63)     0.0000 (1.0)           0;0    644.0124 (0.61)          1         100
test_basic_btree_search[small_strings-equal] (0001_9603ad8)             1,580.4624 (1.66)       1,580.4624 (1.66)       1,580.4624 (1.66)     0.0000 (1.0)        1,580.4624 (1.66)     0.0000 (1.0)           0;0    632.7262 (0.60)          1         100
test_basic_btree_search[small_strings-small_range] (0001_9603ad8)       1,592.6642 (1.67)       1,592.6642 (1.67)       1,592.6642 (1.67)     0.0000 (1.0)        1,592.6642 (1.67)     0.0000 (1.0)           0;0    627.8787 (0.60)          1         100
test_basic_btree_search[integers-small_range] (0001_9603ad8)            1,603.5482 (1.68)       1,603.5482 (1.68)       1,603.5482 (1.68)     0.0000 (1.0)        1,603.5482 (1.68)     0.0000 (1.0)           0;0    623.6171 (0.59)          1         100
test_basic_btree_search[None-none] (NOW)                                8,135.8763 (8.53)       8,135.8763 (8.53)       8,135.8763 (8.53)     0.0000 (1.0)        8,135.8763 (8.53)     0.0000 (1.0)           0;0    122.9124 (0.12)          1          10
test_basic_btree_search[integers-none] (NOW)                           14,770.0967 (15.48)     14,770.0967 (15.48)     14,770.0967 (15.48)    0.0000 (1.0)       14,770.0967 (15.48)    0.0000 (1.0)           0;0     67.7044 (0.06)          1          10
test_basic_btree_search[None-none] (0001_9603ad8)                      42,269.7553 (44.31)     42,269.7553 (44.31)     42,269.7553 (44.31)    0.0000 (1.0)       42,269.7553 (44.31)    0.0000 (1.0)           0;0     23.6576 (0.02)          1         100
test_basic_btree_search[integers-none] (0001_9603ad8)                  44,067.0523 (46.19)     44,067.0523 (46.19)     44,067.0523 (46.19)    0.0000 (1.0)       44,067.0523 (46.19)    0.0000 (1.0)           0;0     22.6927 (0.02)          1         100
test_basic_btree_search[small_strings-none] (NOW)                      46,578.5473 (48.82)     46,578.5473 (48.82)     46,578.5473 (48.82)    0.0000 (1.0)       46,578.5473 (48.82)    0.0000 (1.0)           0;0     21.4691 (0.02)          1          10
test_basic_btree_search[None-not_equal] (NOW)                          53,099.8584 (55.66)     53,099.8584 (55.66)     53,099.8584 (55.66)    0.0000 (1.0)       53,099.8584 (55.66)    0.0000 (1.0)           0;0     18.8324 (0.02)          1          10
test_basic_btree_search[integers-not_equal] (NOW)                      58,868.7611 (61.70)     58,868.7611 (61.70)     58,868.7611 (61.70)    0.0000 (1.0)       58,868.7611 (61.70)    0.0000 (1.0)           0;0     16.9869 (0.02)          1          10
test_basic_btree_search[None-not_equal] (0001_9603ad8)                 62,107.7695 (65.10)     62,107.7695 (65.10)     62,107.7695 (65.10)    0.0000 (1.0)       62,107.7695 (65.10)    0.0000 (1.0)           0;0     16.1010 (0.02)          1         100
test_basic_btree_search[small_strings-none] (0001_9603ad8)             72,005.5600 (75.47)     72,005.5600 (75.47)     72,005.5600 (75.47)    0.0000 (1.0)       72,005.5600 (75.47)    0.0000 (1.0)           0;0     13.8878 (0.01)          1         100
test_basic_btree_search[integers-not_equal] (0001_9603ad8)             80,490.5911 (84.37)     80,490.5911 (84.37)     80,490.5911 (84.37)    0.0000 (1.0)       80,490.5911 (84.37)    0.0000 (1.0)           0;0     12.4238 (0.01)          1         100
test_basic_btree_search[small_strings-not_equal] (NOW)                 93,165.1456 (97.65)     93,165.1456 (97.65)     93,165.1456 (97.65)    0.0000 (1.0)       93,165.1456 (97.65)    0.0000 (1.0)           0;0     10.7336 (0.01)          1          10
test_basic_btree_search[small_strings-not_equal] (0001_9603ad8)       102,490.6225 (107.43)   102,490.6225 (107.43)   102,490.6225 (107.43)   0.0000 (1.0)      102,490.6225 (107.43)   0.0000 (1.0)           0;0      9.7570 (0.01)          1         100
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

@westonpace westonpace force-pushed the feat/filtered-read-in-planner branch from 354cfa8 to 79a57f2 Compare July 10, 2025 13:49
@westonpace
Contributor Author

I'm a little nervous about the multi-vector code path, but otherwise looks good.

@wjones127 Can you elaborate? I didn't think I made any changes there.

@wjones127
Contributor

I'm a little nervous about the multi-vector code path, but otherwise looks good.

@wjones127 Can you elaborate? I didn't think I made any changes there.

Sorry, I meant the multi-threaded one that isn't used in any test. Just missing test coverage is all.

@westonpace westonpace force-pushed the feat/filtered-read-in-planner branch 2 times, most recently from 6e88e72 to 4f44b90 Compare July 18, 2025 13:34
@westonpace westonpace force-pushed the feat/filtered-read-in-planner branch from 3fe23c1 to 1fb9267 Compare July 21, 2025 12:41
@github-actions github-actions bot added the java label Jul 21, 2025
@westonpace westonpace merged commit eb5e1c4 into lancedb:main Jul 21, 2025
27 checks passed
4 participants