Use `interleave` in hash repartitioning #15768
base: main
Conversation
)
.unwrap();

let batch = interleave_record_batch(&b, &indices)?;
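For context, `interleave_record_batch` takes a slice of batch references plus `(batch_index, row_index)` pairs and gathers the selected rows into a single new batch in one pass. A minimal standalone sketch (schema and values invented for illustration; the exact import path may vary across arrow versions):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::interleave_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let a = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    let b = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
    )?;

    // Each index is (batch_index, row_index): row 1 of `a`, row 0 of `b`, row 2 of `a`.
    let indices = [(0, 1), (1, 0), (0, 2)];
    let out = interleave_record_batch(&[&a, &b], &indices)?;
    assert_eq!(out.num_rows(), 3); // column "v" now holds [2, 10, 3]
    Ok(())
}
```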
Probably an API like apache/arrow-rs#7325 would be even faster (avoiding one level of "trivial" indexing).
FYI @ctsk
Nice change, and a much clearer performance win than #15479. I expect (without testing) that these two PRs interact negatively with one another: removing coalesce will mean the data is "more scattered" in memory, which probably makes the interleave, as well as the computation of the left join keys, slower.
I think removing coalesce after this change (for all hash repartitions) might be possible, as the output batch size will be roughly equal to the input batch size (instead of roughly 1/partitions * batch_size; e.g. with 16 output partitions, an 8192-row input batch would yield only ~512 rows per partition). Unless hash values are somehow skewed (but that is currently not good anyway). A future API could maybe use your `take_in` API to only output rows once the batch size has been reached.
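A rough sketch of what that could look like (entirely hypothetical, since the `take_in` API is still being discussed upstream): keep a per-partition index buffer and only flush a partition once it has accumulated a full batch.

```rust
/// Hypothetical per-partition buffer: rows are recorded as
/// (batch_index, row_index) pairs into the currently buffered input batches,
/// and the partition is flushed only once `batch_size` rows have accumulated.
struct PartitionBuffer {
    batch_size: usize,
    indices: Vec<(usize, usize)>,
}

impl PartitionBuffer {
    /// Record one row; returns true when the buffer is ready to be flushed
    /// (e.g. gathered with interleave, or a future `take_in`-style kernel).
    fn push(&mut self, batch_idx: usize, row: usize) -> bool {
        self.indices.push((batch_idx, row));
        self.indices.len() >= self.batch_size
    }
}
```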
I see I had misunderstood this PR. It makes a lot of sense to do this. As part of prototyping the integration of a `take_in` API in DataFusion, I made a similar change: moving the buffering to before sending the small batches to their destination thread. I don't remember seeing as much speedup when I benchmarked that change independently; I guess using interleave instead of a take/concat combo (like I did back then) makes a significant difference. Awesome!
Yeah, the speedup comes from avoiding copying the data a second time in `concat` / `CoalesceBatches`. So when using `take_in` we should be careful to use it once (for a single destination batch) to avoid doing the concat on the small batches.
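To make the "second copy" concrete, here is a rough sketch of the two strategies on plain arrays (function names and shapes invented for illustration, not the PR's code): `take` + `concat` materializes small per-batch results and then copies them again, while `interleave` gathers the same rows in one pass.

```rust
use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::compute::{concat, interleave, take};
use arrow::error::ArrowError;

/// take/concat style: one copy in `take`, a second copy in `concat`.
fn gather_twice(
    arrays: &[ArrayRef],
    rows_per_array: &[Vec<u32>],
) -> Result<ArrayRef, ArrowError> {
    let taken: Vec<ArrayRef> = arrays
        .iter()
        .zip(rows_per_array)
        .map(|(a, rows)| take(a.as_ref(), &UInt32Array::from(rows.clone()), None))
        .collect::<Result<_, _>>()?;
    // This second pass over the data is the cost CoalesceBatches pays.
    let refs: Vec<&dyn Array> = taken.iter().map(|a| a.as_ref()).collect();
    concat(&refs)
}

/// interleave style: a single copy straight into the output.
fn gather_once(
    arrays: &[ArrayRef],
    indices: &[(usize, usize)], // (array_index, row_index)
) -> Result<ArrayRef, ArrowError> {
    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();
    interleave(&refs, indices)
}
```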
Pull Request Overview
This PR improves DataFusion’s hash repartitioning by using arrow's interleave_record_batch, refactors the partitioning interface to work with multiple batches, and adds a buffering mechanism in the repartition executor.
- Changes the `partition` and `partition_iter` functions to accept a `Vec<RecordBatch>` instead of a single batch.
- Replaces the use of `take_arrays` with `interleave_record_batch` to build repartitioned batches efficiently.
- Updates the `RepartitionExec` logic to buffer input batches based on the partitioning mode (see the sketch after this list).
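Putting those pieces together, a simplified sketch of the new flow (a hypothetical helper, not the PR's exact code; `hash_row` stands in for hashing the configured key columns, and metrics/error handling are elided): hash every row of every buffered batch, bucket the `(batch_index, row_index)` pairs per destination partition, then build each output batch with a single interleave.

```rust
use arrow::compute::interleave_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn hash_repartition(
    batches: &[RecordBatch],
    num_partitions: usize,
    hash_row: impl Fn(&RecordBatch, usize) -> u64, // stand-in for hashing the key columns
) -> Result<Vec<RecordBatch>, ArrowError> {
    // Bucket (batch_index, row_index) pairs by destination partition.
    let mut indices: Vec<Vec<(usize, usize)>> = vec![Vec::new(); num_partitions];
    for (batch_idx, batch) in batches.iter().enumerate() {
        for row in 0..batch.num_rows() {
            let p = (hash_row(batch, row) % num_partitions as u64) as usize;
            indices[p].push((batch_idx, row));
        }
    }
    // One interleave per destination gathers rows from all buffered batches
    // in a single copy, so outputs stay close to the full batch size.
    let refs: Vec<&RecordBatch> = batches.iter().collect();
    indices
        .into_iter()
        .map(|idx| interleave_record_batch(&refs, &idx))
        .collect()
}
```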
Comments suppressed due to low confidence (1)
datafusion/physical-plan/src/repartition/mod.rs:316
- [nitpick] The variable name 'b' is not descriptive. Consider renaming it to something like 'batch_refs' or 'batches_slice' to improve code readability.
let b: Vec<&RecordBatch> = batches.iter().collect();
@@ -233,11 +233,11 @@ impl BatchPartitioner {
    ///
    /// The time spent repartitioning, not including time spent in `f` will be recorded
    /// to the [`metrics::Time`] provided on construction
-   pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
+   pub fn partition<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
Since this is a pub API, it would be best to keep the old API, mark it as deprecated, and add a new API with the changed arguments (see the sketch below).
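One conventional shape for that (a sketch under assumed names, not the PR's actual code: `partition_batches` is a hypothetical name, and the `(partition_index, batch)` callback shape follows the existing `partition` docs):

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

pub struct BatchPartitioner {/* fields elided */}

impl BatchPartitioner {
    /// Old single-batch entry point, kept for backwards compatibility.
    #[deprecated(note = "use `partition_batches` instead")]
    pub fn partition<F>(&mut self, batch: RecordBatch, f: F) -> Result<()>
    where
        F: FnMut(usize, RecordBatch) -> Result<()>,
    {
        // Delegate so the two code paths cannot drift apart.
        self.partition_batches(vec![batch], f)
    }

    /// New multi-batch entry point (hypothetical name).
    pub fn partition_batches<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
    where
        F: FnMut(usize, RecordBatch) -> Result<()>,
    {
        // The real implementation would hash rows across all `batches` and
        // call `f` once per output partition with an interleaved batch.
        for batch in batches {
            f(0, batch)?;
        }
        Ok(())
    }
}
```

Callers that already hold a single batch keep compiling (with a deprecation warning), while the repartition executor can switch to the buffered, multi-batch path.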
Good one

Does the code at datafusion/physical-plan/src/repartition/mod.rs (lines 921 to 926 in 91870b6) […]
Which issue does this PR close?

Addresses #7957, #6822, #7001

Rationale for this change

Saves work in `CoalesceBatches` (less concat).

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?