feat!: use FilteredReadExec in the planner #3813
Conversation
Very cool optimization
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff            @@
##             main    #3813      +/-  ##
=========================================
- Coverage   80.22%   80.10%    -0.12%
=========================================
  Files         298      298
  Lines      105624   106214      +590
  Branches   105624   106214      +590
=========================================
+ Hits        84732    85082      +350
- Misses      17805    18014      +209
- Partials     3087     3118       +31
```
Are there any benchmark results you want to share in this PR?
I'm a little nervous about the multi-vector code path, but otherwise looks good.
```rust
/// Return true if a plan has two filter stages
pub fn has_two_steps(&self) -> bool {
    // If there is no refine step then either:
    // 1. We are doing a full scan so there is no second filter
    // 2. We are doing a pure scalar index search, so there is no first filter
    //
    // Note: In case #2 we are making the assumption that the scalar index covers
    // most of our fragments. If 90% of our fragments are not covered by the scalar
    // index then we might actually benefit from late materialization.
    self.refine_expr.is_some()
}
```
We already have `has_refine()` above, so do we need this? TBH I find `has_two_steps` to be a bit confusing of a name, without more context on why there would be two filter steps.
Yep...this is embarrassing 🤦. I think the logic here was more complicated at one point and then I walked it back and didn't realize the method was no longer useful.
```rust
if let Some((row_id_idx, _)) = batch.schema().column_with_name("row_id") {
    batch = batch.rename_column(row_id_idx, ROW_ID)?;
}
```
Why is this `row_id` in the first place? I wonder if we need to modify the source itself?
Oh I get it now; this might have been a user-supplied column and we disallowed that.
Yeah, there is a "user supplied shuffle buffers" path where we write the shuffle input out to files before we call it, and a "go directly into shuffle" path where we don't. The direct path uses `_rowid` because the input is just an output from a scan and never materialized into a file.
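For reference, the reserved name in play here (an assumption that this matches lance's `ROW_ID` constant; the value is taken from the `_rowid` name mentioned above and in the breaking-change note below):

```rust
// Assumption: mirrors lance's ROW_ID constant. Data arriving from the
// file-based shuffle path carries a plain "row_id" column, so it gets
// renamed to the reserved "_rowid" name before the rest of the pipeline.
pub const ROW_ID: &str = "_rowid";
```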
rust/lance/src/dataset/scanner.rs (outdated)

```
@@ -5477,6 +5662,7 @@ mod test {
    #[rstest]
    #[tokio::test]
    #[test_log::test]
```
Should we remove these?
Removed.
```
HashJoinExec: mode=CollectLeft, join_type=Right, on=[(key@0, key@1)], projection=[_rowaddr@1, value@2, key@3]
  LanceRead: uri=..., projection=[key], num_fragments=1, range_before=None, range_after=None, \
    row_id=false, row_addr=true, indexed_filter=--, refine_filter=--
  RepartitionExec: partitioning=RoundRobinBatch(...), input_partitions=1
    StreamingTableExec: partition_sizes=1, projection=[value, key]
```
I don't know why, but this test keeps flipping between `CollectLeft` and `Partitioned`. One time was during a DataFusion upgrade. I guess here it's because we switch from `LanceScan` to `LanceRead`.
It's based on the statistics, I think. It was `Partitioned` until I added a statistics impl and then it changed to `CollectLeft`. Maybe when it switched before it was because the stats weren't making it up?
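For context, this matches my understanding of DataFusion's `JoinSelection` physical-optimizer rule (an assumption about DataFusion internals, not something this PR defines): when the build side's reported statistics look small enough, a `Partitioned` hash join gets downgraded to `CollectLeft`. A minimal sketch of a node reporting such statistics:

```rust
use datafusion::common::stats::Precision;
use datafusion::common::Statistics;

// Hypothetical: once a scan reports a (small) row-count estimate, the
// optimizer can pick CollectLeft; with no statistics the estimate is
// Absent and the chosen join mode may differ.
fn small_side_stats(num_rows: usize) -> Statistics {
    Statistics {
        num_rows: Precision::Inexact(num_rows),
        total_byte_size: Precision::Absent,
        column_statistics: vec![],
    }
}
```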
```rust
/// This mode will use a single thread per partition. This is more traditional for
/// DataFusion and should give better performance for complex queries that have a
/// lot of downstream processing. However, you will want to make sure to create the
/// node with enough partitions or else you will not get any parallelism.
///
/// The number of partitions is specified by the parameter.
MultiplePartitions(usize),
```
This is cool! Do we exercise this in any tests? I couldn't see where we initialize with this enum?
Not yet. My hope is to switch to this at some point. I don't think it has much benefit for vector search, but it's more natural for SQL / table provider paths and so might be a fun perf experiment.
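A hypothetical usage sketch (only the `MultiplePartitions(usize)` variant comes from the snippet above; everything else is illustrative): size the partition count from the available cores, mirroring DataFusion's `target_partitions` convention.

```rust
// Hypothetical: one DataFusion partition per core, so the node gets its
// parallelism across partitions rather than inside a single partition.
let partitions = std::thread::available_parallelism()
    .map(std::num::NonZeroUsize::get)
    .unwrap_or(1);
let mode = FilteredReadThreadingMode::MultiplePartitions(partitions);
```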
```rust
FilteredReadThreadingMode::MultiplePartitions(num_partitions) => {
    total_rows / num_partitions as u64
}
```
This seems like it might be inexact. Do we exactly divide by `num_partitions`? And what about the remainder?
It seems like we have each partition just pull from the task stream in a first-come-first-serve way. I don't think that's how datafusion expects it, so might not work well, at least with the stats.
You're right. We do first-come-first-serve, so "inexact" is the more accurate description.
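A minimal sketch of the first-come-first-serve pattern being described (illustrative names, not the actual lance types): every partition drains one shared task queue, so a partition's share of `total_rows` is an average rather than a guarantee.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Illustrative only: N partitions pull read tasks from one shared queue.
// A faster partition simply takes more tasks, which is why dividing
// total_rows by the partition count is only an estimate for statistics.
struct SharedTasks<T> {
    queue: Mutex<VecDeque<T>>,
}

impl<T> SharedTasks<T> {
    fn next(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }
}
```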
```rust
// Combine with residual if any
let current_batch = if let Some(residual) = self.residual.take() {
    arrow::compute::concat_batches(&residual.schema(), &[residual, batch])
        .map_err(|e| DataFusionError::External(Box::new(e)))?
} else {
    batch
};
```
Performance-wise, this seems a little unfortunate. It means that if we have `batch_size=100` and a stream of 100 single-row batches, we will call `concat_batches` 99 times to produce a batch. Would be nice to collect pointers to the batches and call `concat_batches()` once per output batch.
Yes, but I just copied this over as-is. I made #4192 for follow-up.
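A sketch of the suggested follow-up (assuming arrow-rs's `concat_batches`; the buffering around it is illustrative, not the actual #4192 implementation): accumulate the small batches and concatenate once per output batch.

```rust
use arrow::compute::concat_batches;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Illustrative: buffer incoming batches and call concat_batches only when
// enough rows have accumulated, instead of once per input batch. (A real
// implementation would also split outputs larger than batch_size.)
fn push_and_maybe_drain(
    pending: &mut Vec<RecordBatch>,
    pending_rows: &mut usize,
    batch: RecordBatch,
    batch_size: usize,
) -> Result<Option<RecordBatch>> {
    *pending_rows += batch.num_rows();
    pending.push(batch);
    if *pending_rows < batch_size {
        return Ok(None);
    }
    let schema = pending[0].schema();
    let out = concat_batches(&schema, pending.iter())?;
    pending.clear();
    *pending_rows = 0;
    Ok(Some(out))
}
```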
rust/lance-datafusion/src/chunker.rs (outdated)

```rust
// We may reduce the number of batches but there is no guarantee. We will never
// increase the number of batches.
self.inner.size_hint()
```
> We will never increase the number of batches.

Is that true? I would think if you had input batches of size 1000, but requested a batch size of 100, you would have 10 output batches for every input batch, right?
Good point, I removed the size hint. Technically, we shouldn't ever get jumbo batches (since we set the target batch size appropriately in DF). However, I'm not sure this size hint is providing any real value anyways.
```rust
// This is needed as AnalyzeExec launches a thread task per
// partition, and we want these to be connected to the parent span
let plan = Arc::new(TracedExec::new(plan, Span::current()));
```
🫶
The primary goal of the PR was to avoid cases where using an index could trigger worse behavior. I've added an IOPS benchmark, which measures the number of I/O operations a search makes, to track this.

However, we also want to make sure that the new planning code is not too expensive. I'll be honest: I think it is slightly more expensive, and it might be noticeable on queries that return a very small number of rows. I think this could be fixed if someone were in the mood to play with data structures and algorithms. I've reported the main cost at #4189. I've instead cheated and fixed a low-hanging performance bug where we were calling `create_plan` twice.

Note: all these benchmark numbers are working against data already in the kernel cache. If we were doing real I/O, the differences for the inequality filters would be much more significant.
@wjones127 Can you elaborate? I didn't think I made any changes there.
Sorry, I meant the multi-threaded one that isn't used in any test. Just missing test coverage is all.
- Switches the planner to use the new `FilteredReadExec` node when not using legacy storage.
- Starts to remove the `storage_class` prototype. Keeping it up was too difficult in this change and the prototype has not seen use or active development into a proper feature. That can still happen in the future.
- Refactors most of the planner to use `Projection` instead of `Schema` to represent the user's projection.

BREAKING CHANGE: The names `_rowid` and `_rowaddr` are no longer valid column names (it's unclear they ever should have been, and using these names would likely have led to errors).