Skip to content

[WIP] Add ManifestEvaluator to allow filtering of files in a table scan (Issue #152) #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: update accessors to use Arc and store their map as Arcs.
  • Loading branch information
sdd committed Apr 3, 2024
commit fbe09e9feffb4941730a64c2ddc984688d097a21
13 changes: 9 additions & 4 deletions crates/iceberg/src/expr/accessor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::Arc;
use crate::spec::{Literal, Struct, Type};
use serde_derive::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
enum InnerOrType {
Inner(Box<StructAccessor>),
Inner(Arc<StructAccessor>),
Type(Type),
}

Expand All @@ -13,6 +14,8 @@ pub struct StructAccessor {
inner_or_type: InnerOrType,
}

pub(crate) type StructAccessorRef = Arc<StructAccessor>;

impl StructAccessor {
pub(crate) fn new(position: i32, r#type: Type) -> Self {
StructAccessor {
Expand All @@ -21,10 +24,10 @@ impl StructAccessor {
}
}

pub(crate) fn wrap(position: i32, inner: StructAccessor) -> Self {
pub(crate) fn wrap(position: i32, inner: StructAccessorRef) -> Self {
StructAccessor {
position,
inner_or_type: InnerOrType::Inner(Box::from(inner)),
inner_or_type: InnerOrType::Inner(inner),
}
}

Expand All @@ -43,7 +46,9 @@ impl StructAccessor {
match &self.inner_or_type {
InnerOrType::Inner(inner) => match container.get(self.position) {
Literal::Struct(wrapped) => inner.get(wrapped),
_ => { panic!("Should only be wrapping a Struct") }
_ => {
panic!("Nested accessor should only be wrapping a Struct")
}
},
InnerOrType::Type(_) => container.get(self.position),
}
Expand Down
21 changes: 14 additions & 7 deletions crates/iceberg/src/expr/term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::{Display, Formatter};

use fnv::FnvHashSet;

use crate::expr::accessor::StructAccessor;
use crate::expr::accessor::{StructAccessor, StructAccessorRef};
use crate::expr::Bind;
use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression};
use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef};
Expand Down Expand Up @@ -190,7 +190,12 @@ impl Bind for Reference {
)
})?;

let accessor = schema.accessor_for_field_id(field.id);
let accessor = schema.accessor_for_field_id(field.id).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Accessor for Field {} not found", self.name),
)
})?;

Ok(BoundReference::new(
self.name.clone(),
Expand All @@ -207,12 +212,12 @@ pub struct BoundReference {
// For example, if the field is `a.b.c`, then `field.name` is `c`, but `original_name` is `a.b.c`.
column_name: String,
field: NestedFieldRef,
accessor: StructAccessor,
accessor: StructAccessorRef,
}

impl BoundReference {
/// Creates a new bound reference.
pub fn new(name: impl Into<String>, field: NestedFieldRef, accessor: StructAccessor) -> Self {
pub fn new(name: impl Into<String>, field: NestedFieldRef, accessor: StructAccessorRef) -> Self {
Self {
column_name: name.into(),
field,
Expand Down Expand Up @@ -244,7 +249,7 @@ pub type BoundTerm = BoundReference;
mod tests {
use std::sync::Arc;

use crate::expr::accessor::{StructAccessor, StructAccessor};
use crate::expr::accessor::StructAccessor;
use crate::expr::{Bind, BoundReference, Reference};
use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};

Expand All @@ -268,10 +273,11 @@ mod tests {
let schema = table_schema_simple();
let reference = Reference::new("bar").bind(schema, true).unwrap();

let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
let expected_ref = BoundReference::new(
"bar",
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)),
accessor_ref.clone(),
);

assert_eq!(expected_ref, reference);
Expand All @@ -282,10 +288,11 @@ mod tests {
let schema = table_schema_simple();
let reference = Reference::new("BAR").bind(schema, false).unwrap();

let accessor_ref = Arc::new(StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)));
let expected_ref = BoundReference::new(
"BAR",
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
StructAccessor::new(1, Type::Primitive(PrimitiveType::Int)),
accessor_ref.clone(),
);

assert_eq!(expected_ref, reference);
Expand Down
26 changes: 17 additions & 9 deletions crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct Schema {
lowercase_name_to_id: HashMap<String, i32>,
id_to_name: HashMap<i32, String>,

field_id_to_accessor: HashMap<i32, StructAccessor>,
field_id_to_accessor: HashMap<i32, Arc<StructAccessor>>,
}

impl PartialEq for Schema {
Expand Down Expand Up @@ -145,43 +145,51 @@ impl SchemaBuilder {
})
}

fn build_accessors(&self) -> HashMap<i32, StructAccessor> {
fn build_accessors(&self) -> HashMap<i32, Arc<StructAccessor>> {
let mut map = HashMap::new();

for (pos, field) in self.fields.iter().enumerate() {
// add an accessor for this field

let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone()));
map.insert(
field.id,
StructAccessor::new(pos as i32, *field.field_type.clone()),
accessor.clone(),
);

if let Type::Struct(nested) = field.field_type.as_ref() {
// add accessors for nested fields
for (field_id, accessor) in Self::build_accessors_nested(nested.fields()) {
map.insert(field_id, StructAccessor::wrap(pos as i32, accessor));
let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor));
map.insert(field_id, new_accessor.clone());
}
}
}

map
}

fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, StructAccessor)> {
fn build_accessors_nested(fields: &[NestedFieldRef]) -> Vec<(i32, Arc<StructAccessor>)> {
let mut results = vec![];
for (pos, field) in fields.iter().enumerate() {
if let Type::Struct(nested) = field.field_type.as_ref() {
let nested_accessors = Self::build_accessors_nested(nested.fields());

let wrapped_nested_accessors = nested_accessors
.into_iter()
.map(|(id, accessor)| (id, StructAccessor::wrap(pos as i32, accessor)));
.map(|(id, accessor)| {
let new_accessor = Arc::new(StructAccessor::wrap(pos as i32, accessor));
(id, new_accessor.clone())
});

results.extend(wrapped_nested_accessors);
}

let accessor = Arc::new(StructAccessor::new(pos as i32, *field.field_type.clone()));

results.push((
field.id,
StructAccessor::new(pos as i32, *field.field_type.clone()),
accessor.clone(),
));
}

Expand Down Expand Up @@ -314,8 +322,8 @@ impl Schema {
}

/// Get an accessor for retrieving data in a struct
pub fn accessor_for_field_id(&self, field_id: i32) -> Option<&StructAccessor> {
self.field_id_to_accessor.get(&field_id)
pub fn accessor_for_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).map(|acc|acc.clone())
}
}

Expand Down