feat: Support PiecewiseMergeJoin to speed up single range predicate joins #16660

Open: wants to merge 20 commits into base: main
Conversation

@jonathanc-n jonathanc-n commented Jul 2, 2025

Rationale for this change

PiecewiseMergeJoin is a natural precursor to implementing ASOF, inequality, and similar joins (multiple range predicates). It is specialized for the case where there is exactly one range filter, and it can perform much faster in that case, especially for semi, anti, and mark joins.
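To illustrate why a single range predicate is special (a minimal sketch under assumed semantics, not the PR's actual code): once the buffered side is sorted on the join key, the buffered rows matching any given streamed value under a predicate like `stream < buffered` form a contiguous suffix, which one binary search can locate.

```rust
// Illustrative sketch only, not the PR's implementation: with a single range
// predicate `stream_val < buffered_val` and a buffered side sorted ascending,
// the matching buffered rows are a contiguous suffix found by binary search,
// rather than by a nested-loop scan over every pair.
fn match_range(stream_val: i32, buffered_sorted: &[i32]) -> std::ops::Range<usize> {
    // First index whose value is strictly greater than stream_val.
    let start = buffered_sorted.partition_point(|&v| v <= stream_val);
    start..buffered_sorted.len()
}

fn main() {
    let buffered = vec![1, 3, 3, 7, 9];
    // 5 matches the suffix starting at index 3 (values 7 and 9)
    println!("{:?}", match_range(5, &buffered)); // 3..5
}
```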

What changes are included in this PR?

This PR contains the PiecewiseMergeJoin implementation only; there is no physical planner rule yet mapping queries to PiecewiseMergeJoinExec.

ExecutionPlan has been implemented for PiecewiseMergeJoinExec

  • compute_properties and swap_inputs are not yet implemented
  • Builds the execution plan for the piecewise merge join
  • The buffered side's plan is built at this step

PiecewiseMergeJoinStream has been implemented for the actual batch emission logic

  • Behaviour differs between regular joins (inner, left, right, full) and existence joins (semi, anti, mark).

Examples have been provided for the PiecewiseMergeJoinExec and PiecewiseMergeJoinStream implementations.

Benchmark Results

The benchmarks were tested on a random batch of values (streamed side) against a sorted batch (buffered side).

When compared to NestedLoopJoin, queries for classic joins (left, right, inner, full) were about 10x faster 🚀

  • However, with larger, equal batch sizes on both sides, it performed slower than NestedLoopJoin.

For existence joins (semi, anti), the join performed about 1000x faster 🚀

  • A quick note to explain the dramatic speedup: instead of computing a cartesian product, all we need to do is find the max/min value of the unsorted streamed side, then do an O(n) scan of the sorted buffered side to find the first match and emit all rows after it.
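That existence-join fast path can be sketched as follows (a simplified illustration with hypothetical names, not the PR's internals): for a `stream < buffered` semi join, only the minimum streamed value matters, because every sorted buffered row strictly greater than it matches some streamed row.

```rust
// Illustrative sketch of the existence-join fast path described above
// (function and variable names are hypothetical): find the minimum value on
// the unsorted streamed side, locate the first sorted buffered row greater
// than it, and emit everything from there on.
fn semi_join_matches(streamed: &[i32], buffered_sorted: &[i32]) -> Vec<i32> {
    let Some(&min_stream) = streamed.iter().min() else {
        return Vec::new(); // empty stream side: nothing can match
    };
    // First buffered index whose value is strictly greater than min_stream;
    // every row from that point on satisfies the predicate for some streamed row.
    let start = buffered_sorted.partition_point(|&v| v <= min_stream);
    buffered_sorted[start..].to_vec()
}

fn main() {
    let streamed = vec![42, 7, 19]; // unsorted streamed side, min = 7
    let buffered = vec![1, 5, 8, 20, 50]; // sorted buffered side
    println!("{:?}", semi_join_matches(&streamed, &buffered)); // [8, 20, 50]
}
```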
Benchmark Results for normal joins

joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [345.69 µs 351.99 µs 361.11 µs]
                        change: [-4.0041% -2.4315% -0.4405%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  7 (7.00%) high mild
  3 (3.00%) high severe
joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.6237 ms 2.6518 ms 2.6870 ms]
                        change: [-4.0439% +0.5217% +4.4183%] (p = 0.84 > 0.05)
                        No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
  4 (4.00%) high mild
  11 (11.00%) high severe
Benchmarking joins/PiecewiseMergeJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 52.4s, or reduce sample count to 10.
joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [490.26 ms 501.24 ms 513.75 ms]
                        change: [-14.807% -9.4227% -4.1141%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  8 (8.00%) high mild
  4 (4.00%) high severe
Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.9s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [325.74 ms 330.41 ms 335.76 ms]
                        change: [-30.701% -25.545% -20.089%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) high mild
  8 (8.00%) high severe
joins/PiecewiseMergeJoin/l=100000_r=1000
                        time:   [46.738 ms 47.037 ms 47.348 ms]
                        change: [+6.8565% +7.8729% +8.8987%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
Benchmarking joins/NestedLoopJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 34.7s, or reduce sample count to 10.
joins/NestedLoopJoin/l=100000_r=1000
                        time:   [337.92 ms 355.00 ms 375.33 ms]
                        change: [+3.4274% +8.8931% +15.219%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 17 outliers among 100 measurements (17.00%)
  4 (4.00%) high mild
  13 (13.00%) high severe
joins/PiecewiseMergeJoin/l=10000_r=100
                        time:   [353.07 µs 356.19 µs 359.16 µs]
                        change: [-20.427% -19.045% -17.788%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) low mild
joins/NestedLoopJoin/l=10000_r=100
                        time:   [2.4624 ms 2.4690 ms 2.4759 ms]
                        change: [-35.277% -26.644% -17.558%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
joins/PiecewiseMergeJoin/l=1000000_r=100
                        time:   [49.569 ms 49.788 ms 50.071 ms]
                        change: [-11.268% -8.9464% -7.0861%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  4 (4.00%) high mild
  2 (2.00%) high severe
Benchmarking joins/NestedLoopJoin/l=1000000_r=100: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.6s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000000_r=100
                        time:   [318.73 ms 321.16 ms 324.12 ms]
                        change: [-2.3069% -0.7454% +0.6191%] (p = 0.35 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

Benchmark Results for existence joins

joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [17.562 µs 17.856 µs 18.368 µs]
                        change: [-95.034% -94.834% -94.578%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high severe
joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.5747 ms 2.6143 ms 2.6718 ms]
                        change: [-3.5382% -1.4140% +1.1788%] (p = 0.24 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [126.97 µs 130.34 µs 133.60 µs]
                        change: [-99.975% -99.974% -99.973%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 21 outliers among 100 measurements (21.00%)
  16 (16.00%) low mild
  5 (5.00%) high mild
Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.1s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [324.45 ms 329.32 ms 335.64 ms]
                        change: [-2.5195% -0.3276% +2.0344%] (p = 0.79 > 0.05)
                        No change in performance detected.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe
Benchmarking joins/PiecewiseMergeJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
  
joins/PiecewiseMergeJoin/l=1000_r=10000
                        time:   [21.704 µs 21.851 µs 22.039 µs]
                        change: [-99.951% -99.951% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high severe
joins/NestedLoopJoin/l=1000_r=10000
                        time:   [26.852 ms 27.166 ms 27.482 ms]
                        change: [-34.057% -28.293% -22.241%] (p = 0.00 < 0.05)
                        Performance has improved.
joins/PiecewiseMergeJoin/l=100_r=100000
                        time:   [74.249 µs 74.381 µs 74.516 µs]
                        change: [-99.952% -99.952% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe
joins/NestedLoopJoin/l=100_r=100000
                        time:   [25.960 ms 26.343 ms 26.807 ms]
                        change: [-1.0541% +0.8379% +2.8866%] (p = 0.41 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
joins/PiecewiseMergeJoin/l=1000_r=100000
                        time:   [82.470 µs 83.025 µs 83.761 µs]
                        change: [-99.996% -99.996% -99.996%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  6 (6.00%) high mild
  6 (6.00%) high severe
Benchmarking joins/NestedLoopJoin/l=1000_r=100000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 33.0s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000_r=100000
                        time:   [322.35 ms 323.86 ms 325.53 ms]
                        change: [-33.068% -26.620% -19.778%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
  

If you want to replicate the benchmarks:

Code


use std::sync::Arc;

use arrow::array::{
  ArrayRef, Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use datafusion_common::{JoinSide, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column};

use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{NestedLoopJoinExec, PiecewiseMergeJoinExec};
use datafusion_physical_plan::test::TestMemoryExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use rand::{rng, Rng};
use tokio::runtime::Runtime;

/// Creates a RecordBatch of `num_rows` with completely random values in [0, 100_000].
pub fn create_random_batch(num_rows: usize) -> RecordBatch {
  let schema = Arc::new(Schema::new(vec![
      Field::new("c0", DataType::Int32, true),
      Field::new("c1", DataType::Utf8, true),
      Field::new("c2", DataType::Date32, true),
      Field::new("c3", DataType::Decimal128(11, 2), true),
  ]));

  let mut rng = rng();
  let mut a = Int32Builder::new();
  let mut b = StringBuilder::new();
  let mut c = Date32Builder::new();
  let mut d = Decimal128Builder::new()
      .with_precision_and_scale(11, 2)
      .unwrap();

  for _ in 0..num_rows {
      let int_val = rng.random_range(0..=100_000);
      a.append_value(int_val);
      b.append_value(format!("string_{int_val}"));
      c.append_value(int_val);
      let dec_val = (rng.random_range(0..=100_000) as i128) * 100;
      d.append_value(dec_val);
  }

  let a = Arc::new(a.finish()) as ArrayRef;
  let b = Arc::new(b.finish()) as ArrayRef;
  let c = Arc::new(c.finish()) as ArrayRef;
  let d = Arc::new(d.finish()) as ArrayRef;

  RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

pub fn create_sorted_batch(num_rows: usize, max_increment: i32) -> RecordBatch {
  let schema = Arc::new(Schema::new(vec![
      Field::new("c0", DataType::Int32, true),
      Field::new("c1", DataType::Utf8, true),
      Field::new("c2", DataType::Date32, true),
      Field::new("c3", DataType::Decimal128(11, 2), true),
  ]));

  let mut rng = rng();
  let mut a = Int32Builder::new();
  let mut b = StringBuilder::new();
  let mut c = Date32Builder::new();
  let mut d = Decimal128Builder::new()
      .with_precision_and_scale(11, 2)
      .unwrap();

  let mut current = rng.random_range(0..=max_increment);
  for _ in 0..num_rows {
      let inc = rng.random_range(0..=max_increment);
      current = current.saturating_add(inc);
      a.append_value(current);
      b.append_value(format!("string_{current}"));
      c.append_value(current);
      d.append_value((current as i128) * 100);
  }

  let a = Arc::new(a.finish()) as ArrayRef;
  let b = Arc::new(b.finish()) as ArrayRef;
  let c = Arc::new(c.finish()) as ArrayRef;
  let d = Arc::new(d.finish()) as ArrayRef;

  RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

fn make_memory_execs(
  left_rows: usize,
  right_rows: usize,
) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, SchemaRef) {
  let left_batch = create_random_batch(left_rows);
  let schema = left_batch.schema();
  let left_partitions = vec![vec![left_batch]];

  let right_batch = create_sorted_batch(right_rows, 10);
  let right_partitions = vec![vec![right_batch]];

  let left_mem =
      TestMemoryExec::try_new_exec(&left_partitions, schema.clone(), None).unwrap();
  let right_mem =
      TestMemoryExec::try_new_exec(&right_partitions, schema.clone(), None).unwrap();

  (left_mem, right_mem, schema)
}

fn build_two_joins(
  left: Arc<dyn ExecutionPlan>,
  right: Arc<dyn ExecutionPlan>,
) -> Result<(
  Arc<dyn ExecutionPlan>, // pwmj
  Arc<dyn ExecutionPlan>, // nlj
)> {
  let left_on: Arc<dyn PhysicalExpr> = Arc::new(
      Column::new_with_schema("c0", &left.schema())
          .expect("left schema must contain 'c0'"),
  );
  let right_on: Arc<dyn PhysicalExpr> = Arc::new(
      Column::new_with_schema("c0", &right.schema())
          .expect("right schema must contain 'c0'"),
  );

  let hj = PiecewiseMergeJoinExec::try_new(
      left.clone(),
      right.clone(),
      (left_on.clone(), right_on.clone()),
      Operator::Lt,
      JoinType::Left,
  )?;

  let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
      left_on.clone(),
      Operator::Lt,
      right_on.clone(),
  ));

  let column_indices = vec![
      ColumnIndex {
          index: 0,
          side: JoinSide::Left,
      },
      ColumnIndex {
          index: 0,
          side: JoinSide::Right,
      },
  ];

  let intermediate_schema = Arc::new(Schema::new(vec![
      Field::new("c0_left", DataType::Int32, false),
      Field::new("c0_right", DataType::Int32, false),
  ]));

  let join_filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
  let nlj = NestedLoopJoinExec::try_new(
      left,
      right,
      Some(join_filter),
      &JoinType::Left,
      None,
  )?;

  Ok((Arc::new(hj), Arc::new(nlj)))
}

fn bench_joins(c: &mut Criterion) {
  let rt = Runtime::new().unwrap();
  let mut group = c.benchmark_group("joins");

  // row pairs for each side in benchmarks
  let size_pairs = &[
      (1000, 1000),
      (10000, 10000),
      (100000, 1000),
      (10000, 100),
      (1000000, 100),
      (1000, 10000),
      (100, 100000),
      (1000, 100000),
  ];

  for &(left_rows, right_rows) in size_pairs.iter() {
      let (left_mem, right_mem, _schema) = make_memory_execs(left_rows, right_rows);

      let (pwmj_join, nested_loop_join) =
          build_two_joins(left_mem.clone(), right_mem.clone()).unwrap();

      group.bench_with_input(
          BenchmarkId::new(
              "PiecewiseMergeJoin",
              format!("l={}_r={}", left_rows, right_rows),
          ),
          &pwmj_join,
          |b, plan| {
              b.iter_batched(
                  || (),
                  |_setup| {
                      let ctx = TaskContext::default();
                      let fut = collect(plan.clone(), Arc::new(ctx));
                      rt.block_on(async {
                          let _ = fut.await.unwrap();
                      });
                  },
                  BatchSize::SmallInput,
              )
          },
      );

      group.bench_with_input(
          BenchmarkId::new(
              "NestedLoopJoin",
              format!("l={}_r={}", left_rows, right_rows),
          ),
          &nested_loop_join,
          |b, plan| {
              b.iter_batched(
                  || (),
                  |_setup| {
                      let ctx = TaskContext::default();
                      let fut = collect(plan.clone(), Arc::new(ctx));
                      rt.block_on(async {
                          let _ = fut.await.unwrap();
                      });
                  },
                  BatchSize::SmallInput,
              )
          },
      );
  }

  group.finish();
}

criterion_group!(benches, bench_joins);
criterion_main!(benches);

Next Steps

This pull request was getting large, so the following items are left as follow-up steps:

  • Serialization
  • Mark join support
  • Physical planner integration
  • Fuzz tests
  • Refactor to compare on the same key, similar to sort merge join

Are these changes tested?

Yes, via unit tests.

github-actions bot added the physical-plan label (Changes to the physical-plan crate) on Jul 2, 2025

jonathanc-n commented Jul 5, 2025

cc @alamb @ozankabak. It seems you were part of the discussion for range joins; this is a nice start to it. @Dandandan @comphead @my-vegetable-has-exploded might be interested.


alamb commented Jul 7, 2025

I will try to review this PR later this week.


comphead commented Jul 7, 2025

Thanks @jonathanc-n let me first get familiar with this kind of join

    let left_indices = (0..left_size as u64).collect::<UInt64Array>();
    let right_indices = (0..left_size)
        .map(|idx| left_bit_map.get_bit(idx).then_some(0))
        .collect::<UInt32Array>();
    return (left_indices, right_indices);
}

-let left_indices = if join_type == JoinType::LeftSemi {
+let left_indices = if join_type == JoinType::LeftSemi
+    || (join_type == JoinType::RightSemi && piecewise)
so piecewise works only together with RightSemi?

jonathanc-n (Contributor Author) replied:

No, it falls through with left semi as well. In left and right semi/anti/mark joins we use the bitmap to mark all matched rows on the buffered side (this is done in process_unmatched_buffered_batch); the flag only allows right semi/anti/mark to follow through when called from the piecewise join. Usually the bitmap is only used to mark the unmatched rows on the left side, which is why it originally only supported left semi/anti/mark. I'll add a comment at the beginning of process_unmatched_buffered_batch to explain this.
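The matched-row bitmap described above can be sketched like this (an illustrative stand-in using a plain `Vec<bool>` and hypothetical names, not the actual arrow bitmap code): during probing, each buffered row that matches some streamed row sets a bit; afterwards, semi output emits the set positions and anti output emits the unset ones.

```rust
// Illustrative sketch only: a bitmap over the buffered side marks which rows
// matched some streamed row. Semi joins emit the marked indices; anti joins
// emit the unmarked ones. A real implementation would use an arrow boolean
// buffer rather than Vec<bool>.
fn existence_indices(matched: &[bool], semi: bool) -> Vec<usize> {
    matched
        .iter()
        .enumerate()
        // keep indices whose mark equals the polarity we want (semi vs. anti)
        .filter_map(|(idx, &m)| (m == semi).then_some(idx))
        .collect()
}

fn main() {
    // buffered rows 1 and 3 matched some streamed row during probing
    let bitmap = vec![false, true, false, true];
    println!("semi: {:?}", existence_indices(&bitmap, true));  // [1, 3]
    println!("anti: {:?}", existence_indices(&bitmap, false)); // [0, 2]
}
```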
