Describe the bug
When a RowFilter is applied to a column that is unsorted and whose page index provides little pruning, the resulting ReadPlan becomes highly fragmented: it consists of a very large number of tiny select(N) and skip(M) operations (often with N = 1), leading to severe performance degradation during decoding. The current "filter-during-decode" design processes data row by row and incurs heavy CPU overhead from branching and state management. In extreme cases, filtering an unsorted column can be 10× slower than scanning the entire file, even though it touches far fewer rows.
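For a sense of what that fragmentation looks like, here is a small illustrative sketch (not taken from the reader internals) that builds the kind of alternating selection the filter produces, using the public RowSelection and RowSelector types; the run lengths are made up:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Illustrative only: an alternating "keep 1 row, skip 3 rows" selection, roughly
// the shape a ~20-25% random hit rate on an unsorted column degenerates into.
// Each selector forces a branch and a reader-state transition during decoding.
fn fragmented_selection(total_rows: usize) -> RowSelection {
    let mut selectors = Vec::new();
    let mut covered = 0;
    while covered < total_rows {
        selectors.push(RowSelector::select(1));
        selectors.push(RowSelector::skip(3));
        covered += 4;
    }
    RowSelection::from(selectors)
}

With 64M rows this yields tens of millions of selectors, and the decoder has to process each one individually.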
To Reproduce
The following self-contained Rust program reproduces the issue.
Run it in release mode (cargo run --release).
// To run this code, add the following to your Cargo.toml:
// [dependencies]
// parquet = { version = "56.2.0", features = ["async"] }
// tokio = { version = "1.47.1", features = ["full"] }
// arrow-array = "56.2.0"
// arrow-schema = "56.2.0"
// anyhow = "1.0.100"
// arrow = "56.2.0"
// rand = "0.8"
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Result;
use arrow::array::{BooleanArray, BooleanBufferBuilder, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::Int32Builder;
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::arrow::ProjectionMask;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use rand::random;
// Goal: ~1GB file size
const TOTAL_ROWS: usize = 64_000_000; // ~64M rows
const ROWS_PER_ROW_GROUP: usize = 1_000_000; // 1M rows per row group
const OUTPUT_FILE: &str = "inventory.parquet";
// Schema column indices
const IDX_DATE_SK: usize = 0;
const IDX_ITEM_SK: usize = 1;
const IDX_WAREHOUSE_SK: usize = 2;
const IDX_QUANTITY: usize = 3;
// Predicate range for the filter
const LOWER_BOUND: i32 = 100;
const UPPER_BOUND: i32 = 500;
/// Manually builds a boolean mask for a "between" filter.
fn build_between_mask(col: &Int32Array, lower: i32, upper: i32) -> BooleanArray {
    let mut builder = BooleanBufferBuilder::new(col.len());
    for i in 0..col.len() {
        let v = col.value(i);
        builder.append(v >= lower && v <= upper);
    }
    BooleanArray::new(builder.finish(), None)
}
#[tokio::main]
async fn main() -> Result<()> {
    let path = PathBuf::from(OUTPUT_FILE);
    // 1) Generate the Parquet file. This can be slow, so we use spawn_blocking
    //    to avoid blocking the Tokio runtime.
    tokio::task::spawn_blocking({
        let path = path.clone();
        move || generate_inventory_parquet(&path, TOTAL_ROWS)
    })
    .await??;
    // 2) Read the file with predicate pushdown enabled.
    let (rows_with_pushdown, duration_with_pushdown) = tokio::task::spawn_blocking({
        let path = path.clone();
        move || read_with_pushdown_sync(&path)
    })
    .await??;
    // 3) Read the file without predicate pushdown (full scan).
    let (rows_without_pushdown, duration_without_pushdown) = tokio::task::spawn_blocking({
        let path = path.clone();
        move || read_without_pushdown_sync(&path)
    })
    .await??;
    // 4) Print the comparison results.
    println!(
        "\n--- Results ---\nWith Predicate Pushdown: Matched Rows = {}, Duration = {:.2?}\nWithout Pushdown : Matched Rows = {}, Duration = {:.2?}",
        rows_with_pushdown, duration_with_pushdown, rows_without_pushdown, duration_without_pushdown
    );
    if rows_with_pushdown != rows_without_pushdown {
        eprintln!(
            "⚠️ Row count mismatch: pushdown={}, no_pushdown={}",
            rows_with_pushdown, rows_without_pushdown
        );
    } else {
        println!("✅ Row counts match: {}", rows_with_pushdown);
    }
    let speedup = duration_without_pushdown.as_secs_f64() / duration_with_pushdown.as_secs_f64();
    println!("⏱️ Speedup Factor (no_pushdown / pushdown) ≈ {:.2}x", speedup);
    Ok(())
}
/// Generates a TPC-DS style 'inventory' table as a Parquet file.
/// The data is uncompressed and page indexes are written to maximize the effect of page skipping.
fn generate_inventory_parquet(path: &Path, total_rows: usize) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("inv_date_sk", DataType::Int32, false),
        Field::new("inv_item_sk", DataType::Int32, false),
        Field::new("inv_warehouse_sk", DataType::Int32, false),
        Field::new("inv_quantity_on_hand", DataType::Int32, false),
    ]));
    let file = File::create(path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::UNCOMPRESSED)
        .build();
    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
    let num_row_groups = (total_rows + ROWS_PER_ROW_GROUP - 1) / ROWS_PER_ROW_GROUP;
    println!("Generating {} rows into {}...", total_rows, path.display());
    let start_time = Instant::now();
    for i in 0..num_row_groups {
        let base_row = i * ROWS_PER_ROW_GROUP;
        let rows_in_this_rg = std::cmp::min(ROWS_PER_ROW_GROUP, total_rows - base_row);
        let mut date_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut item_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut wh_sk = Int32Builder::with_capacity(rows_in_this_rg);
        let mut qty = Int32Builder::with_capacity(rows_in_this_rg);
        // Generate unsorted quantity values, roughly uniform in (-999, 999];
        // about 20% of rows fall inside the [100, 500] filter range.
        for j in 0..rows_in_this_rg {
            let row_idx = (base_row + j) as i32;
            let val_date = 2_450_000 + (row_idx % 365);
            let val_item = 1 + (row_idx % 200_000);
            let val_wh = 1 + (row_idx % 1_000);
            let val_qty = random::<i32>() % 1_000; // Roughly uniform in (-999, 999]
            date_sk.append_value(val_date);
            item_sk.append_value(val_item);
            wh_sk.append_value(val_wh);
            qty.append_value(val_qty);
        }
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(date_sk.finish()),
                Arc::new(item_sk.finish()),
                Arc::new(wh_sk.finish()),
                Arc::new(qty.finish()),
            ],
        )?;
        writer.write(&batch)?;
        if (i + 1) % 8 == 0 {
            print!(".");
            std::io::stdout().flush().ok();
        }
    }
    writer.close()?;
    let file_size = std::fs::metadata(path)?.len();
    println!(
        "\nFinished writing: File size = {} bytes ≈ {:.2} MB, Time taken = {:.2?}",
        file_size,
        file_size as f64 / (1024.0 * 1024.0),
        start_time.elapsed()
    );
    Ok(())
}
/// Reads the Parquet file using a `RowFilter` to push down the predicate.
/// The reader should only yield rows that match the filter.
fn read_with_pushdown_sync(path: &Path) -> Result<(usize, Duration)> {
    let file = File::open(path)?;
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    // The predicate only depends on the 'inv_quantity_on_hand' column.
    let filter_mask = ProjectionMask::roots(builder.parquet_schema(), [IDX_QUANTITY]);
    let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
        let qty_col = batch
            .column(0) // The batch passed to the predicate only contains projected columns.
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Quantity column must be Int32");
        let mask = build_between_mask(qty_col, LOWER_BOUND, UPPER_BOUND);
        Ok(mask)
    });
    let row_filter = RowFilter::new(vec![Box::new(predicate)]);
    let reader = builder
        .with_row_filter(row_filter)
        .with_projection(ProjectionMask::all())
        .with_batch_size(1_000_000)
        .build()?;
    let start_time = Instant::now();
    let mut total_rows = 0usize;
    for batch_result in reader {
        total_rows += batch_result?.num_rows(); // Only matched rows are produced.
    }
    Ok((total_rows, start_time.elapsed()))
}
/// Reads the entire Parquet file into memory and applies the filter manually.
fn read_without_pushdown_sync(path: &Path) -> Result<(usize, Duration)> {
    let file = File::open(path)?;
    // No need for page index here as we are doing a full scan.
    let options = ArrowReaderOptions::new().with_page_index(false);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    let reader = builder
        .with_projection(ProjectionMask::all())
        .with_batch_size(1_000_000)
        .build()?;
    let start_time = Instant::now();
    let mut matched_rows = 0usize;
    for batch_result in reader {
        let batch = batch_result?;
        let qty_col = batch
            .column(IDX_QUANTITY)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Quantity column must be Int32");
        for i in 0..qty_col.len() {
            let value = qty_col.value(i);
            if value >= LOWER_BOUND && value <= UPPER_BOUND {
                matched_rows += 1;
            }
        }
    }
    Ok((matched_rows, start_time.elapsed()))
}
Example output on my MacBook:
--- Results ---
With Predicate Pushdown: Matched Rows = 12834043, Duration = 3.11s
Without Pushdown : Matched Rows = 12834043, Duration = 390.34ms
✅ Row counts match: 12834043
⏱️ Speedup Factor (no_pushdown / pushdown) ≈ 0.13x
By contrast, Arrow C++ achieves a small speedup on the same dataset.
--- Without Predicate Pushdown ---
Found 12834043 rows.
Time taken: 181.777 ms
--- With Predicate Pushdown ---
Found 12834043 rows.
Time taken: 171.618 ms
Sample code:
#include <iostream>
#include <chrono>
#include <memory>
#include <utility> // Required for std::move
// Arrow core headers
#include "arrow/api.h"
// Arrow Dataset API, for predicate pushdown
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"
// Parquet file reader
#include "parquet/arrow/reader.h"
// For building filter expressions
#include "arrow/compute/api.h"
#include "arrow/compute/expression.h"
// Helper function to check Arrow operation status
void StatusCheck(const arrow::Status& status) {
  if (!status.ok()) {
    std::cerr << "Arrow Error: " << status.ToString() << std::endl;
    exit(EXIT_FAILURE);
  }
}
// Method 1: Without predicate pushdown (manual scan over the quantity column)
long long CountWithoutPredicatePushdown(const std::string& file_path) {
  long long count = 0;
  auto infile_result = arrow::io::ReadableFile::Open(file_path);
  StatusCheck(infile_result.status());
  std::shared_ptr<arrow::io::ReadableFile> infile = *infile_result;
  // 1. OpenFile returns a Result rather than taking an output pointer.
  auto reader_result = parquet::arrow::OpenFile(infile, arrow::default_memory_pool());
  StatusCheck(reader_result.status());
  // 2. Move the unique_ptr out of the Result object.
  std::unique_ptr<parquet::arrow::FileReader> reader = std::move(*reader_result);
  int col_index = reader->parquet_reader()->metadata()->schema()->ColumnIndex("inv_quantity_on_hand");
  if (col_index < 0) {
    std::cerr << "Column 'inv_quantity_on_hand' not found." << std::endl;
    return -1;
  }
  int num_row_groups = reader->num_row_groups();
  for (int i = 0; i < num_row_groups; ++i) {
    std::shared_ptr<arrow::Table> table;
    StatusCheck(reader->ReadRowGroup(i, {col_index}, &table));
    auto col_chunks = table->column(0);
    for (const auto& chunk : col_chunks->chunks()) {
      auto int_array = std::static_pointer_cast<arrow::Int32Array>(chunk);
      for (int64_t j = 0; j < int_array->length(); ++j) {
        if (!int_array->IsNull(j)) {
          int32_t value = int_array->Value(j);
          if (value >= 100 && value <= 500) {
            count++;
          }
        }
      }
    }
  }
  return count;
}
// Method 2: With predicate pushdown via the Dataset API
long long CountWithPredicatePushdown(const std::string& file_path) {
  long long count = 0;
  std::string uri = "file://" + file_path;
  arrow::dataset::FileSystemFactoryOptions options;
  auto factory_result = arrow::dataset::FileSystemDatasetFactory::Make(
      uri, std::make_shared<arrow::dataset::ParquetFileFormat>(), options);
  StatusCheck(factory_result.status());
  std::shared_ptr<arrow::dataset::DatasetFactory> factory = *factory_result;
  auto dataset_result = factory->Finish();
  StatusCheck(dataset_result.status());
  std::shared_ptr<arrow::dataset::Dataset> dataset = *dataset_result;
  arrow::compute::Expression filter_expr = arrow::compute::call(
      "and",
      {arrow::compute::greater_equal(arrow::compute::field_ref("inv_quantity_on_hand"),
                                     arrow::compute::literal(100)),
       arrow::compute::less_equal(arrow::compute::field_ref("inv_quantity_on_hand"),
                                  arrow::compute::literal(500))});
  auto scanner_builder_result = dataset->NewScan();
  StatusCheck(scanner_builder_result.status());
  std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder = *scanner_builder_result;
  StatusCheck(scanner_builder->Filter(filter_expr));
  StatusCheck(scanner_builder->Project({}));
  auto scanner_result = scanner_builder->Finish();
  StatusCheck(scanner_result.status());
  std::shared_ptr<arrow::dataset::Scanner> scanner = *scanner_result;
  auto reader_result = scanner->ToRecordBatchReader();
  StatusCheck(reader_result.status());
  std::shared_ptr<arrow::RecordBatchReader> reader = *reader_result;
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    StatusCheck(reader->ReadNext(&batch));
    if (!batch) {
      break;
    }
    count += batch->num_rows();
  }
  return count;
}
int main() {
  arrow::compute::Initialize();
  std::string file_path = "/path/to/inventory.parquet";
  // --- Test without predicate pushdown ---
  auto start1 = std::chrono::high_resolution_clock::now();
  long long count1 = CountWithoutPredicatePushdown(file_path);
  auto end1 = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double, std::milli> duration1 = end1 - start1;
  std::cout << "--- Without Predicate Pushdown ---" << std::endl;
  std::cout << "Found " << count1 << " rows." << std::endl;
  std::cout << "Time taken: " << duration1.count() << " ms" << std::endl;
  std::cout << std::endl;
  // --- Test with predicate pushdown ---
  auto start2 = std::chrono::high_resolution_clock::now();
  long long count2 = CountWithPredicatePushdown(file_path);
  auto end2 = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double, std::milli> duration2 = end2 - start2;
  std::cout << "--- With Predicate Pushdown ---" << std::endl;
  std::cout << "Found " << count2 << " rows." << std::endl;
  std::cout << "Time taken: " << duration2.count() << " ms" << std::endl;
  std::cout << std::endl;
  if (count1 == count2) {
    std::cout << "✅ Results match!" << std::endl;
  } else {
    std::cout << "❌ Error: Results do not match!" << std::endl;
  }
  return 0;
}
Expected behavior
Predicate pushdown should not degrade performance.
Even if minimal pruning occurs, the runtime should be comparable to or better than a full scan, similar to the Arrow C++ implementation.
Additional context
Proposed Analysis & Solution
The relevant code is impl ParquetRecordBatchReader in arrow-rs/parquet/src/arrow/arrow_reader/mod.rs (line 1010 at commit 5993dff).
The slowdown is caused by excessive per-row branching and state transitions when decoding through a fragmented ReadPlan.
A possible hybrid strategy could mitigate this:
- Analyze plan fragmentation before decoding each page.
- If fragmentation is low, continue with the existing streaming “filter-during-decode” path.
- If fragmentation is high, switch to “decode-then-filter”: decode the full page into a temporary buffer and apply a vectorized filter kernel.
This approach would preserve arrow-rs's low-memory streaming design in the common case while avoiding the CPU blow-up in pathological cases.
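For concreteness, a rough sketch of what the decode-then-filter fallback could look like. This is not the existing reader code: the fragmentation threshold and function names are hypothetical, and it assumes the page has already been fully decoded into an Arrow array whose length matches the selectors.

use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::filter;
use arrow::error::ArrowError;
use parquet::arrow::arrow_reader::RowSelector;

/// Hypothetical heuristic: treat the plan as fragmented when the average
/// run length per selector is small (threshold chosen arbitrarily here).
fn is_fragmented(selectors: &[RowSelector]) -> bool {
    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
    !selectors.is_empty() && total_rows / selectors.len() < 8
}

/// Decode-then-filter: expand the selectors into a boolean mask over the
/// fully decoded page and let the vectorized filter kernel do the work,
/// instead of stepping through millions of select/skip transitions.
fn decode_then_filter(
    decoded_page: &ArrayRef,
    selectors: &[RowSelector],
) -> Result<ArrayRef, ArrowError> {
    let mut mask = Vec::with_capacity(decoded_page.len());
    for s in selectors {
        // `skip == true` drops the rows; `skip == false` keeps them.
        mask.extend(std::iter::repeat(!s.skip).take(s.row_count));
    }
    debug_assert_eq!(mask.len(), decoded_page.len());
    filter(decoded_page.as_ref(), &BooleanArray::from(mask))
}

In the fragmented case this trades one extra page-sized buffer and a single filter kernel invocation for millions of per-row branches; when the selection is mostly contiguous, the existing streaming path remains the better choice.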
Questions for maintainers
- Are there design constraints (e.g., streaming semantics, async I/O) that make this approach difficult to integrate?
Thank you for maintaining this excellent library.