Use interleave in hash repartitioning #15768

Open
wants to merge 5 commits into
base: main

Contributor

@Dandandan Dandandan commented Apr 18, 2025

Which issue does this PR close?

Addresses #7957, #6822, #7001

Rationale for this change

Saves work in CoalesceBatches: hash repartitioning now builds its output batches with interleave, so less concat is needed afterwards.

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ partition_interleave ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     1.06ms │               1.17ms │  1.10x slower │
│ QQuery 1     │    24.27ms │              23.55ms │     no change │
│ QQuery 2     │    67.61ms │              67.17ms │     no change │
│ QQuery 3     │    54.63ms │              54.41ms │     no change │
│ QQuery 4     │   551.61ms │             530.68ms │     no change │
│ QQuery 5     │   637.90ms │             672.23ms │  1.05x slower │
│ QQuery 6     │     1.08ms │               1.07ms │     no change │
│ QQuery 7     │    29.59ms │              27.36ms │ +1.08x faster │
│ QQuery 8     │   607.22ms │             588.82ms │     no change │
│ QQuery 9     │   853.83ms │             843.20ms │     no change │
│ QQuery 10    │   192.85ms │             187.92ms │     no change │
│ QQuery 11    │   211.83ms │             211.88ms │     no change │
│ QQuery 12    │   765.88ms │             774.98ms │     no change │
│ QQuery 13    │   989.17ms │            1005.62ms │     no change │
│ QQuery 14    │   679.62ms │             623.39ms │ +1.09x faster │
│ QQuery 15    │   704.14ms │             687.47ms │     no change │
│ QQuery 16    │  1385.58ms │            1351.62ms │     no change │
│ QQuery 17    │  1266.21ms │            1276.99ms │     no change │
│ QQuery 18    │  2466.26ms │            2381.56ms │     no change │
│ QQuery 19    │    46.72ms │              48.64ms │     no change │
│ QQuery 20    │   952.90ms │             980.69ms │     no change │
│ QQuery 21    │  1114.67ms │            1141.75ms │     no change │
│ QQuery 22    │  1838.60ms │            1830.09ms │     no change │
│ QQuery 23    │  6292.87ms │            6258.52ms │     no change │
│ QQuery 24    │   379.92ms │             371.83ms │     no change │
│ QQuery 25    │   359.10ms │             361.16ms │     no change │
│ QQuery 26    │   436.73ms │             437.79ms │     no change │
│ QQuery 27    │  1381.06ms │            1413.99ms │     no change │
│ QQuery 28    │ 11498.71ms │           11437.92ms │     no change │
│ QQuery 29    │   440.50ms │             441.44ms │     no change │
│ QQuery 30    │   597.33ms │             612.49ms │     no change │
│ QQuery 31    │   618.56ms │             554.75ms │ +1.12x faster │
│ QQuery 32    │  2671.70ms │            2559.75ms │     no change │
│ QQuery 33    │  2852.42ms │            2625.58ms │ +1.09x faster │
│ QQuery 34    │  3266.58ms │            3021.01ms │ +1.08x faster │
│ QQuery 35    │   938.89ms │             944.89ms │     no change │
│ QQuery 36    │    82.39ms │              84.81ms │     no change │
│ QQuery 37    │    37.82ms │              37.98ms │     no change │
│ QQuery 38    │    83.94ms │              83.57ms │     no change │
│ QQuery 39    │   139.66ms │             134.62ms │     no change │
│ QQuery 40    │    30.17ms │              28.21ms │ +1.07x faster │
│ QQuery 41    │    27.92ms │              27.69ms │     no change │
│ QQuery 42    │    23.04ms │              22.76ms │     no change │
└──────────────┴────────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                   │ 47602.53ms │
│ Total Time (partition_interleave)   │ 46773.01ms │
│ Average Time (main)                 │  1107.04ms │
│ Average Time (partition_interleave) │  1087.74ms │
│ Queries Faster                      │          6 │
│ Queries Slower                      │          2 │
│ Queries with No Change              │         35 │
└─────────────────────────────────────┴────────────┘



--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ partition_interleave ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  82.91ms │              71.95ms │ +1.15x faster │
│ QQuery 2     │  13.44ms │              14.48ms │  1.08x slower │
│ QQuery 3     │  21.56ms │              25.41ms │  1.18x slower │
│ QQuery 4     │  13.14ms │              13.22ms │     no change │
│ QQuery 5     │  35.31ms │              40.70ms │  1.15x slower │
│ QQuery 6     │   5.42ms │               5.08ms │ +1.07x faster │
│ QQuery 7     │  74.82ms │              70.62ms │ +1.06x faster │
│ QQuery 8     │  18.12ms │              17.11ms │ +1.06x faster │
│ QQuery 9     │  41.97ms │              38.48ms │ +1.09x faster │
│ QQuery 10    │  37.72ms │              39.52ms │     no change │
│ QQuery 11    │   6.78ms │               6.31ms │ +1.08x faster │
│ QQuery 12    │  26.04ms │              26.83ms │     no change │
│ QQuery 13    │  20.12ms │              19.11ms │ +1.05x faster │
│ QQuery 14    │   6.12ms │               6.31ms │     no change │
│ QQuery 15    │  13.22ms │              12.88ms │     no change │
│ QQuery 16    │  15.42ms │              14.91ms │     no change │
│ QQuery 17    │  60.49ms │              53.01ms │ +1.14x faster │
│ QQuery 18    │ 149.16ms │             131.06ms │ +1.14x faster │
│ QQuery 19    │  23.93ms │              22.15ms │ +1.08x faster │
│ QQuery 20    │  23.31ms │              21.74ms │ +1.07x faster │
│ QQuery 21    │  97.12ms │             100.41ms │     no change │
│ QQuery 22    │  14.00ms │              12.53ms │ +1.12x faster │
└──────────────┴──────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                   ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (main)                   │ 800.12ms │
│ Total Time (partition_interleave)   │ 763.82ms │
│ Average Time (main)                 │  36.37ms │
│ Average Time (partition_interleave) │  34.72ms │
│ Queries Faster                      │       12 │
│ Queries Slower                      │        3 │
│ Queries with No Change              │        7 │
└─────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ partition_interleave ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  708.80ms │             732.16ms │     no change │
│ QQuery 2     │  118.13ms │             111.67ms │ +1.06x faster │
│ QQuery 3     │  231.20ms │             243.78ms │  1.05x slower │
│ QQuery 4     │  128.75ms │             117.27ms │ +1.10x faster │
│ QQuery 5     │  485.65ms │             508.54ms │     no change │
│ QQuery 6     │   41.76ms │              39.76ms │     no change │
│ QQuery 7     │ 1045.93ms │             950.99ms │ +1.10x faster │
│ QQuery 8     │  308.03ms │             402.42ms │  1.31x slower │
│ QQuery 9     │  802.59ms │             924.43ms │  1.15x slower │
│ QQuery 10    │  366.24ms │             384.73ms │  1.05x slower │
│ QQuery 11    │   78.49ms │              79.59ms │     no change │
│ QQuery 12    │  272.61ms │             248.18ms │ +1.10x faster │
│ QQuery 13    │  258.18ms │             229.36ms │ +1.13x faster │
│ QQuery 14    │   44.89ms │              49.77ms │  1.11x slower │
│ QQuery 15    │  133.83ms │             119.50ms │ +1.12x faster │
│ QQuery 16    │   87.74ms │              86.65ms │     no change │
│ QQuery 17    │  866.81ms │             741.38ms │ +1.17x faster │
│ QQuery 18    │ 2827.04ms │            2133.64ms │ +1.32x faster │
│ QQuery 19    │  174.92ms │             164.81ms │ +1.06x faster │
│ QQuery 20    │  237.00ms │             196.51ms │ +1.21x faster │
│ QQuery 21    │ 1476.31ms │            1326.69ms │ +1.11x faster │
│ QQuery 22    │  103.81ms │              95.21ms │ +1.09x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                   │ 10798.71ms │
│ Total Time (partition_interleave)   │  9887.04ms │
│ Average Time (main)                 │   490.85ms │
│ Average Time (partition_interleave) │   449.41ms │
│ Queries Faster                      │         12 │
│ Queries Slower                      │          5 │
│ Queries with No Change              │          5 │
└─────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ partition_interleave ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 119.51ms │             118.50ms │     no change │
│ QQuery 2     │  47.77ms │              46.39ms │     no change │
│ QQuery 3     │  54.73ms │              54.28ms │     no change │
│ QQuery 4     │  41.15ms │              41.17ms │     no change │
│ QQuery 5     │  76.56ms │              78.65ms │     no change │
│ QQuery 6     │  21.34ms │              19.83ms │ +1.08x faster │
│ QQuery 7     │  93.86ms │              89.96ms │     no change │
│ QQuery 8     │  75.74ms │              73.42ms │     no change │
│ QQuery 9     │ 112.47ms │             107.36ms │     no change │
│ QQuery 10    │  99.40ms │              99.34ms │     no change │
│ QQuery 11    │  37.57ms │              35.18ms │ +1.07x faster │
│ QQuery 12    │  50.79ms │              53.85ms │  1.06x slower │
│ QQuery 13    │ 134.96ms │             127.03ms │ +1.06x faster │
│ QQuery 14    │  36.78ms │              36.84ms │     no change │
│ QQuery 15    │  45.07ms │              43.73ms │     no change │
│ QQuery 16    │  28.66ms │              27.33ms │     no change │
│ QQuery 17    │ 104.85ms │              97.23ms │ +1.08x faster │
│ QQuery 18    │ 157.98ms │             141.05ms │ +1.12x faster │
│ QQuery 19    │  64.44ms │              63.13ms │     no change │
│ QQuery 20    │  61.32ms │              54.65ms │ +1.12x faster │
│ QQuery 21    │ 118.36ms │             118.64ms │     no change │
│ QQuery 22    │  31.31ms │              28.68ms │ +1.09x faster │
└──────────────┴──────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                   │ 1614.65ms │
│ Total Time (partition_interleave)   │ 1556.23ms │
│ Average Time (main)                 │   73.39ms │
│ Average Time (partition_interleave) │   70.74ms │
│ Queries Faster                      │         7 │
│ Queries Slower                      │         1 │
│ Queries with No Change              │        14 │
└─────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ partition_interleave ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  958.12ms │             893.06ms │ +1.07x faster │
│ QQuery 2     │  155.97ms │             151.12ms │     no change │
│ QQuery 3     │  444.95ms │             404.93ms │ +1.10x faster │
│ QQuery 4     │  493.09ms │             424.63ms │ +1.16x faster │
│ QQuery 5     │  699.85ms │             705.07ms │     no change │
│ QQuery 6     │  151.53ms │             146.06ms │     no change │
│ QQuery 7     │  859.27ms │             925.51ms │  1.08x slower │
│ QQuery 8     │  672.54ms │             694.92ms │     no change │
│ QQuery 9     │ 1126.95ms │            1202.57ms │  1.07x slower │
│ QQuery 10    │  655.96ms │             630.90ms │     no change │
│ QQuery 11    │  138.04ms │             113.87ms │ +1.21x faster │
│ QQuery 12    │  317.60ms │             308.93ms │     no change │
│ QQuery 13    │  692.45ms │             638.41ms │ +1.08x faster │
│ QQuery 14    │  255.59ms │             254.78ms │     no change │
│ QQuery 15    │  390.63ms │             400.39ms │     no change │
│ QQuery 16    │  108.01ms │             109.57ms │     no change │
│ QQuery 17    │ 1170.52ms │            1035.03ms │ +1.13x faster │
│ QQuery 18    │ 1977.86ms │            1541.96ms │ +1.28x faster │
│ QQuery 19    │  439.84ms │             457.03ms │     no change │
│ QQuery 20    │  416.80ms │             360.84ms │ +1.16x faster │
│ QQuery 21    │ 1412.98ms │            1295.92ms │ +1.09x faster │
│ QQuery 22    │  155.17ms │             140.06ms │ +1.11x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                   │ 13693.73ms │
│ Total Time (partition_interleave)   │ 12835.54ms │
│ Average Time (main)                 │   622.44ms │
│ Average Time (partition_interleave) │   583.43ms │
│ Queries Faster                      │         10 │
│ Queries Slower                      │          2 │
│ Queries with No Change              │         10 │
└─────────────────────────────────────┴────────────┘

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

)
.unwrap();

let batch = interleave_record_batch(&b, &indices)?;
Contributor Author

@Dandandan Dandandan Apr 18, 2025

An API like apache/arrow-rs#7325 would probably be even faster (it avoids one level of "trivial" indexing).

Contributor Author

FYI @ctsk

Contributor

Nice change, and a much clearer performance win than #15479. I expect (without testing) that these two PRs interact negatively with one another: removing coalesce will mean that the data is "more scattered" in memory, which will probably make interleave perform worse, as well as the computation of the left join keys.

Contributor Author

I think removing coalesce after this change (for all hash repartitions) might be possible, as the output batch size will be roughly equal to the input batch size (instead of roughly batch_size / partitions), unless hash values are somehow skewed (but that case is already handled poorly today).

A future API could maybe use your take_in API to output rows only once the batch size has been reached.
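
The batch-size argument above can be checked with a little arithmetic. This is a minimal sketch, not code from the PR: it assumes rows hash uniformly across partitions, and it assumes the repartition buffer holds as many input batches as there are partitions (the thread does not state the exact buffering policy). The function name and the numbers in the usage note are illustrative.

```rust
/// Expected rows per output batch when `buffered_batches` input batches of
/// `batch_size` rows each are spread evenly over `partitions` outputs.
/// Assumes uniform hashing (no skew); this is a model, not DataFusion code.
fn expected_output_rows(batch_size: usize, partitions: usize, buffered_batches: usize) -> usize {
    assert!(partitions > 0, "need at least one partition");
    batch_size * buffered_batches / partitions
}
```

With a batch size of 8192 rows and 16 partitions, partitioning one input batch at a time gives `expected_output_rows(8192, 16, 1)`, i.e. 512 rows per output batch, while buffering 16 input batches first gives `expected_output_rows(8192, 16, 16)`, i.e. 8192 rows, which is the input batch size again.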

Contributor

@ctsk ctsk Apr 20, 2025

I see I had misunderstood this PR. It makes a lot of sense to do this. As part of prototyping the integration of a take_in API in DataFusion, I made a similar change: moving the buffering before sending the small batches to their destination thread. I don't remember seeing as much speedup when I benchmarked that change independently. I guess using interleave instead of a take/concat combo (like I did back then) makes a significant difference. Awesome!

Contributor Author

Yeah, the speedup comes from avoiding copying the data a second time in concat / CoalesceBatches. So when using take_in we should be careful to use it once (for a single destination batch) to avoid doing the concat on the small batches.
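
To make the "second copy" concrete, here is a small std-only model that counts element copies for both shapes. It is a sketch under stated assumptions, not Arrow or DataFusion code: batches are plain `Vec<i64>`, both function names are hypothetical, and the two functions only return the same rows when `indices` is ordered by batch.

```rust
/// Old shape: `take` a small slice out of each input batch, then `concat`
/// the small slices. Returns the output and the number of element copies.
fn take_then_concat(batches: &[Vec<i64>], indices: &[(usize, usize)]) -> (Vec<i64>, usize) {
    let mut copies = 0;
    // Step 1: one small intermediate batch per input batch (first copy).
    let mut small: Vec<Vec<i64>> = vec![Vec::new(); batches.len()];
    for &(batch_idx, row_idx) in indices {
        small[batch_idx].push(batches[batch_idx][row_idx]);
        copies += 1;
    }
    // Step 2: concatenate the small batches into one output (second copy).
    let mut out = Vec::with_capacity(indices.len());
    for s in &small {
        for &value in s {
            out.push(value);
            copies += 1;
        }
    }
    (out, copies)
}

/// New shape: gather every selected row straight into the output batch,
/// so each element is copied exactly once.
fn interleave_once(batches: &[Vec<i64>], indices: &[(usize, usize)]) -> (Vec<i64>, usize) {
    let out: Vec<i64> = indices
        .iter()
        .map(|&(batch_idx, row_idx)| batches[batch_idx][row_idx])
        .collect();
    let copies = out.len();
    (out, copies)
}
```

For the same selection of rows, the first function performs twice as many element copies as the second, which is the saving the comment above describes.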

@Dandandan Dandandan changed the title from "User interleave in hash repartitioning" to "Use interleave in hash repartitioning" Apr 18, 2025
@Dandandan Dandandan requested a review from Copilot April 18, 2025 21:18
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR improves DataFusion’s hash repartitioning by using Arrow's interleave_record_batch, refactors the partitioning interface to work with multiple batches, and adds a buffering mechanism in the repartition executor.

  • Changes the partition and partition_iter functions to accept a Vec<RecordBatch> instead of a single batch.
  • Replaces the use of take_arrays with interleave_record_batch to build repartitioned batches efficiently.
  • Updates the RepartitionExec logic to buffer input batches based on the partitioning mode.
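
The flow described in these bullets can be sketched without Arrow. The following is a simplified std-only model, not the PR's implementation: a "batch" is a single `Vec<i64>` key column, `DefaultHasher` stands in for whatever hash DataFusion uses, and all function names are hypothetical. The `(batch, row)` index pairs mirror the shape of the `indices` argument seen in the `interleave_record_batch(&b, &indices)` call from the diff.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A toy "batch": one column of i64 keys. Real code would use `RecordBatch`.
type Batch = Vec<i64>;

/// For every output partition, collect `(batch_index, row_index)` pairs
/// that point into the buffered input batches.
fn partition_indices(batches: &[Batch], num_partitions: usize) -> Vec<Vec<(usize, usize)>> {
    let mut indices = vec![Vec::new(); num_partitions];
    for (batch_idx, batch) in batches.iter().enumerate() {
        for (row_idx, key) in batch.iter().enumerate() {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            let partition = (hasher.finish() % num_partitions as u64) as usize;
            indices[partition].push((batch_idx, row_idx));
        }
    }
    indices
}

/// Build one output batch by copying each referenced row exactly once.
fn interleave(batches: &[Batch], indices: &[(usize, usize)]) -> Batch {
    indices.iter().map(|&(b, r)| batches[b][r]).collect()
}

/// Repartition all buffered batches in one pass: one output batch per partition.
fn repartition(batches: &[Batch], num_partitions: usize) -> Vec<Batch> {
    partition_indices(batches, num_partitions)
        .iter()
        .map(|idx| interleave(batches, idx))
        .collect()
}
```

Calling `repartition(&buffered, n)` on several buffered batches yields `n` output batches whose rows together are exactly the input rows, with each row copied once.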
Comments suppressed due to low confidence (1)

datafusion/physical-plan/src/repartition/mod.rs:316

  • [nitpick] The variable name 'b' is not descriptive. Consider renaming it to something like 'batch_refs' or 'batches_slice' to improve code readability.
let b: Vec<&RecordBatch> = batches.iter().collect();

@@ -233,11 +233,11 @@ impl BatchPartitioner {
     ///
     /// The time spent repartitioning, not including time spent in `f` will be recorded
     /// to the [`metrics::Time`] provided on construction
-    pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
+    pub fn partition<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
Member

@xudong963 xudong963 Apr 19, 2025

Since this is a pub API, it would be best if we could keep the old API, mark it as deprecated, and add a new API with the changed arguments
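
One way the suggestion could look, as a minimal sketch: the old single-batch method stays, is marked `#[deprecated]`, and forwards to a new multi-batch method. Everything here is an assumption for illustration: `partition_batches` is a hypothetical name, `RecordBatch` is a stand-in struct, the error type is `String`, and the modulo routing replaces the real hash partitioning.

```rust
/// Stand-in for Arrow's `RecordBatch`, only to keep the sketch self-contained.
#[derive(Debug, Clone, PartialEq)]
pub struct RecordBatch(pub Vec<i64>);

pub struct BatchPartitioner {
    num_partitions: usize,
}

impl BatchPartitioner {
    /// `num_partitions` must be greater than zero.
    pub fn new(num_partitions: usize) -> Self {
        Self { num_partitions }
    }

    /// Old single-batch entry point, kept so downstream callers still compile.
    #[deprecated(note = "use `partition_batches` instead")]
    pub fn partition<F>(&mut self, batch: RecordBatch, f: F) -> Result<(), String>
    where
        F: FnMut(usize, RecordBatch) -> Result<(), String>,
    {
        self.partition_batches(vec![batch], f)
    }

    /// New multi-batch entry point (hypothetical name).
    pub fn partition_batches<F>(
        &mut self,
        batches: Vec<RecordBatch>,
        mut f: F,
    ) -> Result<(), String>
    where
        F: FnMut(usize, RecordBatch) -> Result<(), String>,
    {
        // Placeholder routing: value modulo partition count, not a real hash.
        let mut outputs = vec![Vec::new(); self.num_partitions];
        for batch in &batches {
            for &value in &batch.0 {
                let p = value.rem_euclid(self.num_partitions as i64) as usize;
                outputs[p].push(value);
            }
        }
        // Hand each non-empty partition to the callback, as `f` does in the diff.
        for (p, rows) in outputs.into_iter().enumerate() {
            if !rows.is_empty() {
                f(p, RecordBatch(rows))?;
            }
        }
        Ok(())
    }
}
```

Existing callers of `partition` keep working and get a compiler warning pointing at the replacement, while new code passes a `Vec<RecordBatch>` directly.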

Contributor Author

Good one

Contributor

ctsk commented Apr 20, 2025

Does the batches_until_yield threshold need to be adjusted?

if batches_until_yield == 0 {
    tokio::task::yield_now().await;
    batches_until_yield = partitioner.num_partitions();
} else {
    batches_until_yield -= 1;
}

3 participants