[Parquet] Performance Degradation with RowFilter on Unsorted Columns due to Fragmented ReadPlan #8565

@hhhizzz

Description


Describe the bug

When a RowFilter is applied to a column that is unsorted and whose page index provides little pruning, the resulting ReadPlan becomes highly fragmented.
It consists of a large number of tiny select(N) and skip(M) operations (often with N = 1), leading to severe performance degradation during decoding.

The current “filter-during-decode” design processes data row by row and incurs heavy CPU overhead from branching and state management.
In extreme cases, filtering an unsorted column can be 10× slower than scanning the entire file — even though it touches far fewer rows.
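
For illustration only, the selection driving such a plan looks roughly like the hand-built example below, expressed with the crate's public RowSelector/RowSelection types (the numbers are invented, not captured from the reader):

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Hand-built example of the fragmented selection a RowFilter on an unsorted
/// column tends to produce: thousands of alternating single-row selects and
/// short skips instead of a few large contiguous ranges.
fn fragmented_selection() -> RowSelection {
    let mut selectors = Vec::with_capacity(2_000);
    for _ in 0..1_000 {
        selectors.push(RowSelector::select(1)); // keep one matching row
        selectors.push(RowSelector::skip(3)); // skip a few non-matching rows
    }
    RowSelection::from(selectors)
}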


To Reproduce

The following self-contained Rust program reproduces the issue.
Run it in release mode.

// To run this code, add the following to your Cargo.toml:
// [dependencies]
// parquet = { version = "56.2.0", features = ["async"] }
// tokio = { version = "1.47.1", features = ["full"] }
// arrow-array = "56.2.0"
// arrow-schema = "56.2.0"
// anyhow = "1.0.100"
// arrow = "56.2.0"
// rand = "0.8"

use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Result;
use arrow::array::{BooleanArray, BooleanBufferBuilder, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::Int32Builder;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::arrow::ProjectionMask;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use rand::random;

// Goal: ~1GB file size
const TOTAL_ROWS: usize = 64_000_000; // ~64M rows
const ROWS_PER_ROW_GROUP: usize = 1_000_000; // 1M rows per row group
const OUTPUT_FILE: &str = "inventory.parquet";

// Schema column indices
const IDX_DATE_SK: usize = 0;
const IDX_ITEM_SK: usize = 1;
const IDX_WAREHOUSE_SK: usize = 2;
const IDX_QUANTITY: usize = 3;

// Predicate range for the filter
const LOWER_BOUND: i32 = 100;
const UPPER_BOUND: i32 = 500;

/// Manually builds a boolean mask for a "between" filter.
fn build_between_mask(col: &Int32Array, lower: i32, upper: i32) -> BooleanArray {
    let mut builder = BooleanBufferBuilder::new(col.len());
    for i in 0..col.len() {
        let v = col.value(i);
        builder.append(v >= lower && v <= upper);
    }
    BooleanArray::new(builder.finish(), None)
}
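
// Aside (not used by the repro): the same mask could also be built with
// Arrow's vectorized comparison kernels instead of the per-row loop above.
// This is only a sketch assuming arrow 56's compute::kernels::cmp API;
// `build_between_mask_vectorized` is a name introduced here for illustration.
#[allow(dead_code)]
fn build_between_mask_vectorized(
    col: &Int32Array,
    lower: i32,
    upper: i32,
) -> Result<BooleanArray, arrow::error::ArrowError> {
    use arrow::compute::kernels::cmp::{gt_eq, lt_eq};
    let lo = Int32Array::new_scalar(lower);
    let hi = Int32Array::new_scalar(upper);
    // (v >= lower) AND (v <= upper), computed as whole-array kernels.
    arrow::compute::and(&gt_eq(col, &lo)?, &lt_eq(col, &hi)?)
}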

#[tokio::main]
async fn main() -> Result<()> {
    let path = PathBuf::from(OUTPUT_FILE);

    // 1) Generate the Parquet file. This can be slow, so we use spawn_blocking
    //    to avoid blocking the Tokio runtime.
    tokio::task::spawn_blocking({
        let path = path.clone();
        move || generate_inventory_parquet(&path, TOTAL_ROWS)
    })
        .await??;

    // 2) Read the file with predicate pushdown enabled.
    let (rows_with_pushdown, duration_with_pushdown) = tokio::task::spawn_blocking({
        let path = path.clone();
        move || read_with_pushdown_sync(&path)
    })
        .await??;

    // 3) Read the file without predicate pushdown (full scan).
    let (rows_without_pushdown, duration_without_pushdown) = tokio::task::spawn_blocking({
        let path = path.clone();
        move || read_without_pushdown_sync(&path)
    })
        .await??;

    // 4) Print the comparison results.
    println!(
        "\n--- Results ---\nWith Predicate Pushdown: Matched Rows = {}, Duration = {:.2?}\nWithout Pushdown       : Matched Rows = {}, Duration = {:.2?}",
        rows_with_pushdown, duration_with_pushdown, rows_without_pushdown, duration_without_pushdown
    );

    if rows_with_pushdown != rows_without_pushdown {
        eprintln!(
            "⚠️ Row count mismatch: pushdown={}, no_pushdown={}",
            rows_with_pushdown, rows_without_pushdown
        );
    } else {
        println!("✅ Row counts match: {}", rows_with_pushdown);
    }

    let speedup = duration_without_pushdown.as_secs_f64() / duration_with_pushdown.as_secs_f64();
    println!("⏱️ Speedup Factor (no_pushdown / pushdown) ≈ {:.2}x", speedup);

    Ok(())
}

/// Generates a TPC-DS style 'inventory' table as a Parquet file.
/// The data is uncompressed and page indexes are written to maximize the effect of page skipping.
fn generate_inventory_parquet(path: &Path, total_rows: usize) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("inv_date_sk", DataType::Int32, false),
        Field::new("inv_item_sk", DataType::Int32, false),
        Field::new("inv_warehouse_sk", DataType::Int32, false),
        Field::new("inv_quantity_on_hand", DataType::Int32, false),
    ]));
    let file = File::create(path)?;

    let props = WriterProperties::builder()
        .set_compression(Compression::UNCOMPRESSED)
        .build();

    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
    let num_row_groups = (total_rows + ROWS_PER_ROW_GROUP - 1) / ROWS_PER_ROW_GROUP;
    println!("Generating {} rows into {}...", total_rows, path.display());
    let start_time = Instant::now();

    for i in 0..num_row_groups {
        let base_row = i * ROWS_PER_ROW_GROUP;
        let rows_in_this_rg = std::cmp::min(ROWS_PER_ROW_GROUP, total_rows - base_row);

        let mut date_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut item_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut wh_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut qty = Int32Builder::with_capacity(rows_in_this_rg);

        // Generate quantity values roughly uniform over [-999, 999] so the 100..=500 filter matches ~20% of rows.
        for j in 0..rows_in_this_rg {
            let row_idx = (base_row + j) as i32;
            let val_date = 2_450_000 + (row_idx % 365);
            let val_item = 1 + (row_idx % 200_000);
            let val_wh = 1 + (row_idx % 1_000);
            let val_qty = random::<i32>() % 1_000; // Roughly uniform over [-999, 999]

            date_sk.append_value(val_date);
            item_sk.append_value(val_item);
            wh_sk.append_value(val_wh);
            qty.append_value(val_qty);
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(date_sk.finish()),
                Arc::new(item_sk.finish()),
                Arc::new(wh_sk.finish()),
                Arc::new(qty.finish()),
            ],
        )?;

        writer.write(&batch)?;
        if (i + 1) % 8 == 0 {
            print!(".");
            std::io::stdout().flush().ok();
        }
    }

    writer.close()?;
    let file_size = std::fs::metadata(path)?.len();
    println!(
        "\nFinished writing: File size = {} bytes ≈ {:.2} MB, Time taken = {:.2?}",
        file_size,
        file_size as f64 / (1024.0 * 1024.0),
        start_time.elapsed()
    );
    Ok(())
}

/// Reads the Parquet file using a `RowFilter` to push down the predicate.
/// The reader should only yield rows that match the filter.
fn read_with_pushdown_sync(path: &Path) -> Result<(usize, Duration)> {
    let file = File::open(path)?;
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;

    // The predicate only depends on the 'inv_quantity_on_hand' column.
    let filter_mask = ProjectionMask::roots(builder.parquet_schema(), [IDX_QUANTITY]);
    let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
        let qty_col = batch
            .column(0) // The batch passed to the predicate only contains projected columns.
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Quantity column must be Int32");
        let mask = build_between_mask(qty_col, LOWER_BOUND, UPPER_BOUND);
        Ok(mask)
    });

    let row_filter = RowFilter::new(vec![Box::new(predicate)]);
    let reader = builder
        .with_row_filter(row_filter)
        .with_projection(ProjectionMask::all())
        .with_batch_size(1_000_000)
        .build()?;

    let start_time = Instant::now();
    let mut total_rows = 0usize;
    for batch_result in reader {
        total_rows += batch_result?.num_rows(); // Only matched rows are produced.
    }
    Ok((total_rows, start_time.elapsed()))
}

/// Reads the entire Parquet file into memory and applies the filter manually.
fn read_without_pushdown_sync(path: &Path) -> Result<(usize, Duration)> {
    let file = File::open(path)?;
    // No need for page index here as we are doing a full scan.
    let options = ArrowReaderOptions::new().with_page_index(false);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    let reader = builder
        .with_projection(ProjectionMask::all())
        .with_batch_size(1_000_000)
        .build()?;

    let start_time = Instant::now();
    let mut matched_rows = 0usize;

    for batch_result in reader {
        let batch = batch_result?;
        let qty_col = batch
            .column(IDX_QUANTITY)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Quantity column must be Int32");

        for i in 0..qty_col.len() {
            let value = qty_col.value(i);
            if value >= LOWER_BOUND && value <= UPPER_BOUND {
                matched_rows += 1;
            }
        }
    }
    Ok((matched_rows, start_time.elapsed()))
}

Example output on my MacBook:

--- Results ---
With Predicate Pushdown: Matched Rows = 12834043, Duration = 3.11s
Without Pushdown       : Matched Rows = 12834043, Duration = 390.34ms
✅ Row counts match: 12834043
⏱️ Speedup Factor (no_pushdown / pushdown) ≈ 0.13x

By contrast, Arrow C++ achieves a small speedup from predicate pushdown on the same dataset:

--- Without Predicate Pushdown ---
Found 12834043 rows.
Time taken: 181.777 ms

--- With Predicate Pushdown ---
Found 12834043 rows.
Time taken: 171.618 ms

C++ sample code:

#include <iostream>
#include <chrono>
#include <memory>
#include <utility> // Required for std::move

// Arrow core headers
#include "arrow/api.h"

// Arrow Dataset API, for predicate pushdown
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"

// Parquet file reader
#include "parquet/arrow/reader.h"

// For building filter expressions
#include "arrow/compute/api.h"
#include "arrow/compute/expression.h"

// Helper function to check Arrow operation status
void StatusCheck(const arrow::Status& status) {
    if (!status.ok()) {
        std::cerr << "Arrow Error: " << status.ToString() << std::endl;
        exit(EXIT_FAILURE);
    }
}

// Method 1: Without predicate pushdown (read each row group and filter manually)
long long CountWithoutPredicatePushdown(const std::string& file_path) {
    long long count = 0;

    auto infile_result = arrow::io::ReadableFile::Open(file_path);
    StatusCheck(infile_result.status());
    std::shared_ptr<arrow::io::ReadableFile> infile = *infile_result;

    // OpenFile returns a Result wrapping the FileReader.
    auto reader_result = parquet::arrow::OpenFile(infile, arrow::default_memory_pool());
    StatusCheck(reader_result.status());
    // Move the unique_ptr out of the Result.
    std::unique_ptr<parquet::arrow::FileReader> reader = std::move(*reader_result);

    int col_index = reader->parquet_reader()->metadata()->schema()->ColumnIndex("inv_quantity_on_hand");
    if (col_index < 0) {
        std::cerr << "Column 'inv_quantity_on_hand' not found." << std::endl;
        return -1;
    }

    int num_row_groups = reader->num_row_groups();
    for (int i = 0; i < num_row_groups; ++i) {
        std::shared_ptr<arrow::Table> table;
        StatusCheck(reader->ReadRowGroup(i, {col_index}, &table));

        auto col_chunks = table->column(0);

        for (const auto& chunk : col_chunks->chunks()) {
            auto int_array = std::static_pointer_cast<arrow::Int32Array>(chunk);
            for (int64_t j = 0; j < int_array->length(); ++j) {
                if (!int_array->IsNull(j)) {
                    int32_t value = int_array->Value(j);
                    if (value >= 100 && value <= 500) {
                        count++;
                    }
                }
            }
        }
    }
    return count;
}

// Method 2: With predicate pushdown via the Arrow Dataset API
long long CountWithPredicatePushdown(const std::string& file_path) {
    long long count = 0;

    std::string uri = "file://" + file_path;

    arrow::dataset::FileSystemFactoryOptions options;
    auto factory_result = arrow::dataset::FileSystemDatasetFactory::Make(
        uri, std::make_shared<arrow::dataset::ParquetFileFormat>(), options);
    StatusCheck(factory_result.status());
    std::shared_ptr<arrow::dataset::DatasetFactory> factory = *factory_result;

    auto dataset_result = factory->Finish();
    StatusCheck(dataset_result.status());
    std::shared_ptr<arrow::dataset::Dataset> dataset = *dataset_result;

    arrow::compute::Expression filter_expr = arrow::compute::call("and",
        {
            arrow::compute::greater_equal(arrow::compute::field_ref("inv_quantity_on_hand"), arrow::compute::literal(100)),
            arrow::compute::less_equal(arrow::compute::field_ref("inv_quantity_on_hand"), arrow::compute::literal(500))
        }
    );

    auto scanner_builder_result = dataset->NewScan();
    StatusCheck(scanner_builder_result.status());
    std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder = *scanner_builder_result;

    StatusCheck(scanner_builder->Filter(filter_expr));
    StatusCheck(scanner_builder->Project({}));

    auto scanner_result = scanner_builder->Finish();
    StatusCheck(scanner_result.status());
    std::shared_ptr<arrow::dataset::Scanner> scanner = *scanner_result;

    auto reader_result = scanner->ToRecordBatchReader();
    StatusCheck(reader_result.status());
    std::shared_ptr<arrow::RecordBatchReader> reader = *reader_result;

    std::shared_ptr<arrow::RecordBatch> batch;
    while (true) {
        StatusCheck(reader->ReadNext(&batch));
        if (!batch) {
            break;
        }
        count += batch->num_rows();
    }

    return count;
}

int main() {
    arrow::compute::Initialize();
    std::string file_path = "/path/to/inventory.parquet";

    // --- Test without predicate pushdown ---
    auto start1 = std::chrono::high_resolution_clock::now();
    long long count1 = CountWithoutPredicatePushdown(file_path);
    auto end1 = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> duration1 = end1 - start1;

    std::cout << "--- Without Predicate Pushdown ---" << std::endl;
    std::cout << "Found " << count1 << " rows." << std::endl;
    std::cout << "Time taken: " << duration1.count() << " ms" << std::endl;
    std::cout << std::endl;

    // --- Test with predicate pushdown ---
    auto start2 = std::chrono::high_resolution_clock::now();
    long long count2 = CountWithPredicatePushdown(file_path);
    auto end2 = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> duration2 = end2 - start2;

    std::cout << "--- With Predicate Pushdown ---" << std::endl;
    std::cout << "Found " << count2 << " rows." << std::endl;
    std::cout << "Time taken: " << duration2.count() << " ms" << std::endl;
    std::cout << std::endl;

    if (count1 == count2) {
        std::cout << "✅ Results match!" << std::endl;
    } else {
        std::cout << "❌ Error: Results do not match!" << std::endl;
    }

    return 0;
}

Expected behavior

Predicate pushdown should not degrade performance.
Even when the page index provides little pruning, the runtime should be comparable to or better than a full scan, as it is in the Arrow C++ implementation.


Additional context


Proposed Analysis & Solution

The slowdown is caused by excessive per-row branching and state transitions in ParquetRecordBatchReader's decode loop when the ReadPlan is fragmented.
A hybrid strategy could mitigate this:

  1. Analyze plan fragmentation before decoding each page.
  2. If fragmentation is low, continue with the existing streaming “filter-during-decode” path.
  3. If fragmentation is high, switch to “decode-then-filter”: decode the full page into a temporary buffer and apply a vectorized filter kernel.

This approach would preserve arrow-rs's low-memory design in the common case while avoiding CPU blowups in pathological cases; a rough sketch of the heuristic follows below.
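
A rough sketch of the heuristic, using only the public RowSelector type; DecodeStrategy, choose_strategy, and the threshold are hypothetical names used for illustration, not existing parquet-rs APIs:

use parquet::arrow::arrow_reader::RowSelector;

/// Hypothetical strategy choice; neither variant exists in parquet-rs today.
enum DecodeStrategy {
    /// Current streaming path: apply select/skip runs while decoding.
    FilterDuringDecode,
    /// Decode the whole page, then apply a vectorized filter kernel.
    DecodeThenFilter,
}

/// Pick a strategy from the selectors covering one page: if the average
/// contiguous selected run is short (heavy select(1)/skip(M) interleaving),
/// fall back to decode-then-filter.
fn choose_strategy(selectors: &[RowSelector], min_avg_run: f64) -> DecodeStrategy {
    let selected_rows: usize = selectors
        .iter()
        .filter(|s| !s.skip)
        .map(|s| s.row_count)
        .sum();
    let selected_runs = selectors.iter().filter(|s| !s.skip).count().max(1);
    if (selected_rows as f64) / (selected_runs as f64) < min_avg_run {
        DecodeStrategy::DecodeThenFilter
    } else {
        DecodeStrategy::FilterDuringDecode
    }
}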


Questions for maintainers

  • Are there design constraints (e.g., streaming semantics, async I/O) that make this approach difficult to integrate?

Thank you for maintaining this excellent library.
