Skip to content

[WIP] Add ManifestEvaluator to allow filtering of files in a table scan (Issue #152) #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(wip): add ManifestEvalVisitor and InclusiveProjection
  • Loading branch information
sdd committed Mar 22, 2024
commit 9a0f80fe91d2041770f0f40ac267f81df9b5b4cf
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async-trait = "0.1"
bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.5"
chrono = "0.4"
chrono = "~0.4.34"
derive_builder = "0.20.0"
either = "1"
env_logger = "0.11.0"
Expand Down
54 changes: 49 additions & 5 deletions crates/iceberg/src/expr/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T: Debug, const N: usize> Debug for LogicalExpression<T, N> {
}

impl<T, const N: usize> LogicalExpression<T, N> {
fn new(inputs: [Box<T>; N]) -> Self {
pub(crate) fn new(inputs: [Box<T>; N]) -> Self {
Self { inputs }
}

Expand Down Expand Up @@ -82,9 +82,9 @@ where
#[derive(PartialEq)]
pub struct UnaryExpression<T> {
/// Operator of this predicate, must be single operand operator.
op: PredicateOperator,
pub(crate) op: PredicateOperator,
/// Term of this predicate, for example, `a` in `a IS NULL`.
term: T,
pub(crate) term: T,
}

impl<T: Debug> Debug for UnaryExpression<T> {
Expand Down Expand Up @@ -116,13 +116,20 @@ impl<T> UnaryExpression<T> {
debug_assert!(op.is_unary());
Self { op, term }
}

pub(crate) fn field_id(&self) -> i32 {
todo!();

// The below is not yet working since T may not implement `.field()`
// self.term.field().id
}
}

/// Binary predicate, for example, `a > 10`.
#[derive(PartialEq)]
pub struct BinaryExpression<T> {
/// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc.
op: PredicateOperator,
pub(crate) op: PredicateOperator,
/// Term of this predicate, for example, `a` in `a > 10`.
term: T,
/// Literal of this predicate, for example, `10` in `a > 10`.
Expand All @@ -144,6 +151,13 @@ impl<T> BinaryExpression<T> {
debug_assert!(op.is_binary());
Self { op, term, literal }
}

pub(crate) fn field_id(&self) -> i32 {
todo!();

// The below is not yet working since T may not implement `.field()`
// self.term.field().id
}
}

impl<T: Display> Display for BinaryExpression<T> {
Expand All @@ -169,7 +183,7 @@ impl<T: Bind> Bind for BinaryExpression<T> {
#[derive(PartialEq)]
pub struct SetExpression<T> {
/// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc.
op: PredicateOperator,
pub(crate) op: PredicateOperator,
/// Term of this predicate, for example, `a` in `a in (1, 2, 3)`.
term: T,
/// Literals of this predicate, for example, `(1, 2, 3)` in `a in (1, 2, 3)`.
Expand All @@ -191,6 +205,13 @@ impl<T> SetExpression<T> {
debug_assert!(op.is_set());
Self { op, term, literals }
}

pub(crate) fn field_id(&self) -> i32 {
todo!();

// The below is not yet working since T may not implement `.field()`
// self.term.field().id
}
}

impl<T: Bind> Bind for SetExpression<T> {
Expand All @@ -217,6 +238,9 @@ impl<T: Display + Debug> Display for SetExpression<T> {
/// Unbound predicate expression before binding to a schema.
#[derive(Debug, PartialEq)]
pub enum Predicate {
AlwaysTrue,
AlwaysFalse,

/// And predicate, for example, `a > 10 AND b < 20`.
And(LogicalExpression<Predicate, 2>),
/// Or predicate, for example, `a > 10 OR b < 20`.
Expand Down Expand Up @@ -367,13 +391,21 @@ impl Bind for Predicate {
bound_literals,
)))
}
Predicate::AlwaysTrue => Ok(BoundPredicate::AlwaysTrue),
Predicate::AlwaysFalse => Ok(BoundPredicate::AlwaysFalse),
}
}
}

impl Display for Predicate {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Predicate::AlwaysTrue => {
write!(f, "TRUE")
}
Predicate::AlwaysFalse => {
write!(f, "FALSE")
}
Predicate::And(expr) => {
write!(f, "({}) AND ({})", expr.inputs()[0], expr.inputs()[1])
}
Expand Down Expand Up @@ -461,6 +493,8 @@ impl Predicate {
/// ```
pub fn negate(self) -> Predicate {
match self {
Predicate::AlwaysTrue => Predicate::AlwaysFalse,
Predicate::AlwaysFalse => Predicate::AlwaysTrue,
Predicate::And(expr) => Predicate::Or(LogicalExpression::new(
expr.inputs.map(|expr| Box::new(expr.negate())),
)),
Expand Down Expand Up @@ -525,6 +559,8 @@ impl Predicate {
Predicate::Unary(expr) => Predicate::Unary(expr),
Predicate::Binary(expr) => Predicate::Binary(expr),
Predicate::Set(expr) => Predicate::Set(expr),
Predicate::AlwaysTrue => Predicate::AlwaysTrue,
Predicate::AlwaysFalse => Predicate::AlwaysFalse,
}
}
}
Expand Down Expand Up @@ -607,6 +643,14 @@ impl Display for BoundPredicate {
}
}

pub(crate) trait PredicateVisitor<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these are needed anymore.

fn visit(predicate: Predicate) -> T;
}

pub(crate) trait BoundPredicateVisitor<T> {
fn visit(predicate: BoundPredicate) -> T;
}

#[cfg(test)]
mod tests {
use std::ops::Not;
Expand Down
74 changes: 41 additions & 33 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@

use crate::arrow::ArrowReaderBuilder;
use crate::expr::BoundPredicate::AlwaysTrue;
use crate::expr::{Bind, BoundPredicate, BoundReference, LogicalExpression, Predicate, PredicateOperator};
use crate::expr::{Bind, BoundPredicate, LogicalExpression, Predicate, PredicateOperator};
use crate::io::FileIO;
use crate::spec::{
DataContentType, FieldSummary, Manifest, ManifestEntry, ManifestEntryRef, ManifestFile,
PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, SnapshotRef,
TableMetadataRef, Transform,
DataContentType, FieldSummary, ManifestEntryRef, ManifestFile, PartitionField,
PartitionSpecRef, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use async_stream::try_stream;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
Expand Down Expand Up @@ -173,8 +171,7 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
impl TableScan {
/// Returns a stream of file scan tasks.


pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
pub async fn plan_files(&'static self) -> crate::Result<FileScanTaskStream> {
// Cache `PartitionEvaluator`s created as part of this scan
let mut partition_evaluator_cache: HashMap<i32, PartitionEvaluator> = HashMap::new();

Expand All @@ -197,20 +194,12 @@ impl TableScan {
// PartitionEvaluator that matches this manifest's partition spec ID.
// Use one from the cache if there is one. If not, create one, put it in
// the cache, and take a reference to it.
let partition_evaluator = if let Some(filter) = self.filter.as_ref() {
Some(
partition_evaluator_cache
if let Some(filter) = self.filter.as_ref() {
let partition_evaluator = partition_evaluator_cache
.entry(manifest.partition_spec_id())
.or_insert_with_key(self.create_partition_evaluator(filter))
.deref(),
)
} else {
None
};
.or_insert_with_key(|key| self.create_partition_evaluator(key, filter));

// If this scan has a filter, reject any manifest files whose partition values
// don't match the filter.
if let Some(partition_evaluator) = partition_evaluator {
// reject any manifest files whose partition values don't match the filter.
if !partition_evaluator.filter_manifest_file(&entry) {
continue;
}
Expand All @@ -236,20 +225,24 @@ impl TableScan {
}
}
}
}.boxed())
}
.boxed())
}

fn create_partition_evaluator(&self, filter: &Predicate) -> fn(&i32) -> crate::Result<PartitionEvaluator> {
|&id| {
// TODO: predicate binding not yet merged to main
let bound_predicate = filter.bind(self.schema.clone(), self.case_sensitive)?;
fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator {

let partition_spec = self.table_metadata.partition_spec_by_id(id).unwrap();
PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone())
}
// TODO: this does not work yet. `bind` consumes self, but `Predicate`
// does not implement `Clone` or `Copy`.
let bound_predicate = filter.clone()
.bind(self.schema.clone(), self.case_sensitive)
.unwrap();

let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap();
PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone())
.unwrap()
}

pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
pub async fn to_arrow(&'static self) -> crate::Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

Expand Down Expand Up @@ -315,7 +308,11 @@ struct ManifestEvalVisitor {
}

impl ManifestEvalVisitor {
fn new(partition_schema: SchemaRef, partition_filter: Predicate, case_sensitive: bool) -> crate::Result<Self> {
fn new(
partition_schema: SchemaRef,
partition_filter: Predicate,
case_sensitive: bool,
) -> crate::Result<Self> {
let partition_filter = partition_filter.bind(partition_schema.clone(), case_sensitive)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have rewrite_not already in place to be applied to partition_filter?


Ok(Self {
Expand All @@ -332,8 +329,13 @@ impl ManifestEvalVisitor {
case_sensitive: bool,
) -> crate::Result<Self> {
let partition_type = partition_spec.partition_type(&table_schema)?;

// this is needed as SchemaBuilder.with_fields expects an iterator over
// Arc<NestedField> rather than &Arc<NestedField>
let cloned_partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect();

let partition_schema = Schema::builder()
.with_fields(partition_type.fields())
.with_fields(cloned_partition_fields)
.build()?;

let partition_schema_ref = Arc::new(partition_schema);
Expand Down Expand Up @@ -478,14 +480,20 @@ impl InclusiveProjection {

// TODO: cache this?
let mut parts: Vec<&PartitionField> = vec![];
for partition_spec_field in self.partition_spec.fields {
for partition_spec_field in &self.partition_spec.fields {
if partition_spec_field.source_id == field_id {
parts.push(&partition_spec_field)
}
}

parts.iter().fold(Predicate::AlwaysTrue, |res, &part| {
res.and(part.transform.project(&part.name, &predicate))
// should this use ? instead of destructuring Ok() so that the whole call fails
// if the transform project() call errors? This would require changing the signature of `visit`.
if let Ok(Some(pred_for_part)) = part.transform.project(&part.name, &predicate) {
res.and(pred_for_part)
} else {
res
}
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Transforms in iceberg.

use crate::error::{Error, Result};
use crate::expr::{BoundPredicate, Predicate};
use crate::spec::datatypes::{PrimitiveType, Type};
use crate::ErrorKind;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand Down Expand Up @@ -261,6 +262,12 @@ impl Transform {
_ => self == other,
}
}

pub fn project(&self, _name: &str, _predicate: &BoundPredicate) -> Result<Option<Predicate>> {
// Waiting on https://github.com/apache/iceberg-rust/pull/269
// to deliver https://github.com/apache/iceberg-rust/issues/264
todo!()
}
}

impl Display for Transform {
Expand Down