chore: bringing dat integration testing in ahead of kernel replay #3411

Merged: 3 commits, May 5, 2025
4 changes: 2 additions & 2 deletions .github/actions/setup-env/action.yml
@@ -4,12 +4,12 @@ description: "Set up Python, virtual environment, and Rust toolchain"
 inputs:
   python-version:
     description: "The Python version to set up"
-    required: true
+    required: false
     default: "3.10"

   rust-toolchain:
     description: "The Rust toolchain to set up"
-    required: true
+    required: false
     default: "stable"

 runs:
7 changes: 6 additions & 1 deletion .github/workflows/build.yml
@@ -96,7 +96,9 @@ jobs:
           override: true

       - name: Run tests
-        run: cargo test --verbose --features ${{ env.DEFAULT_FEATURES }}
+        run: |
+          gmake setup-dat
+          cargo test --verbose --features ${{ env.DEFAULT_FEATURES }}

   integration_test:
     name: Integration Tests
@@ -141,11 +143,13 @@

       - name: Run tests with rustls (default)
         run: |
+          gmake setup-dat
           cargo test --features integration_test,${{ env.DEFAULT_FEATURES }}

       - name: Run tests with native-tls
         run: |
           cargo clean
+          gmake setup-dat
           cargo test --no-default-features --features integration_test,s3-native-tls,datafusion

   integration_test_lakefs:
@@ -176,5 +180,6 @@

       - name: Run tests with rustls (default)
         run: |
+          gmake setup-dat
           cargo test --features integration_test_lakefs,lakefs,datafusion

12 changes: 0 additions & 12 deletions .github/workflows/python_build.yml
@@ -73,9 +73,6 @@ jobs:
       - name: Build and install deltalake
         run: make develop

-      - name: Download Data Acceptance Tests (DAT) files
-        run: make setup-dat
-
       - name: Run tests
         run: uv run --no-sync pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules

@@ -104,9 +101,6 @@
       - name: Build and install deltalake
         run: make develop

-      - name: Download Data Acceptance Tests (DAT) files
-        run: make setup-dat
-
       - name: Run tests
         run: uv run --no-sync pytest -m '(lakefs and integration)' --doctest-modules

@@ -133,9 +127,6 @@
       - name: Build and install deltalake
         run: make develop

-      - name: Download Data Acceptance Tests (DAT) files
-        run: make setup-dat
-
       - name: Run tests
         run: ../../delta-rs/.github/scripts/retry_integration_test.sh unitycatalog_databricks 5 10

@@ -162,9 +153,6 @@
       - name: Build and install deltalake
         run: make develop

-      - name: Download Data Acceptance Tests (DAT) files
-        run: make setup-dat
-
       - name: Run tests
         run: ../../delta-rs/.github/scripts/retry_integration_test.sh unitycatalog_oss 5 10
2 changes: 2 additions & 0 deletions .gitignore
@@ -23,6 +23,7 @@ __blobstorage__
.githubchangeloggenerator.cache/
.githubchangeloggenerator*
data
.zed/

# Add all Cargo.lock files except for those in binary crates
Cargo.lock
@@ -35,3 +36,4 @@ site
__pycache__
.zed
.zed/
dat/
37 changes: 37 additions & 0 deletions Makefile
@@ -0,0 +1,37 @@
#
# This Makefile exists largely to help ensure that some of the common behaviors
# between CI and local development can be consistently replicated
#
# For the most part you should be able to rely on cargo for development.

.DEFAULT_GOAL := help
DAT_VERSION := 0.0.3

## begin dat related
####################
.PHONY: setup-dat
setup-dat: dat/deltalake-dat-v$(DAT_VERSION) ## Download and setup the Delta Acceptance Tests (dat)

dat:
	mkdir -p dat

dat/deltalake-dat-v$(DAT_VERSION): dat ## Download DAT test files into ./dat
	rm -rf dat/v$(DAT_VERSION)
	curl -L --silent --output dat/deltalake-dat-v$(DAT_VERSION).tar.gz \
		https://github.com/delta-incubator/dat/releases/download/v$(DAT_VERSION)/deltalake-dat-v$(DAT_VERSION).tar.gz
	tar --no-same-permissions -xzf dat/deltalake-dat-v$(DAT_VERSION).tar.gz
	mv out dat/v$(DAT_VERSION)
	rm dat/deltalake-dat-v$(DAT_VERSION).tar.gz


####################
## end dat related


.PHONY: clean
clean: ## Remove temporary and downloaded artifacts
	rm -rf dat

.PHONY: help
help: ## Produce the helpful command listing
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
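
Note: the new Rust DAT harness (crates/core/tests/dat.rs below) reads its fixtures from ./dat/v0.0.3 relative to the workspace root, so cargo test only passes after make setup-dat has run; the CI jobs invoke it as gmake setup-dat. As an illustration only, and not part of this PR, a guard along the following lines could fail fast with a clearer hint when the download step was skipped. The path constant mirrors the harness configuration; the function name is hypothetical.

// Hypothetical guard (not in this PR): fail early with a clear hint when the DAT
// fixtures have not been downloaded yet. The dat/v0.0.3 location mirrors the
// datatest_stable harness configuration in crates/core/tests/dat.rs.
use std::path::Path;

fn ensure_dat_downloaded() {
    let dat_root = concat!(env!("CARGO_MANIFEST_DIR"), "/../../dat/v0.0.3");
    assert!(
        Path::new(dat_root).is_dir(),
        "DAT fixtures not found at {dat_root}; run `make setup-dat` at the repository root first"
    );
}
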
5 changes: 5 additions & 0 deletions crates/core/Cargo.toml
@@ -100,6 +100,7 @@ humantime = { version = "2.1.0" }
[dev-dependencies]
criterion = "0.5"
ctor = "0"
datatest-stable = "0.2"
deltalake-test = { path = "../test", features = ["datafusion"] }
dotenvy = "0"
fs_extra = "1.2.0"
@@ -134,3 +135,7 @@ cloud = ["object_store/cloud"]

# enable caching some file I/O operations when scanning delta logs
delta-cache = ["foyer", "tempfile", "url/serde"]

[[test]]
name = "dat"
harness = false
52 changes: 52 additions & 0 deletions crates/core/tests/dat.rs
@@ -0,0 +1,52 @@
use std::path::Path;

use deltalake_core::DeltaTableBuilder;
use deltalake_test::acceptance::read_dat_case;

static SKIPPED_TESTS: &[&str; 4] = &[
    "iceberg_compat_v1",
    "column_mapping",
    "check_constraints",
    "deletion_vectors",
];

fn reader_test_eager(path: &Path) -> datatest_stable::Result<()> {
    let root_dir = format!(
        "{}/{}",
        env!["CARGO_MANIFEST_DIR"],
        path.parent().unwrap().to_str().unwrap()
    );
    for skipped in SKIPPED_TESTS {
        if root_dir.ends_with(skipped) {
            println!("Skipping test: {}", skipped);
            return Ok(());
        }
    }

    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?
        .block_on(async {
            let case = read_dat_case(root_dir).unwrap();

            let table = DeltaTableBuilder::from_uri(case.table_root().unwrap())
                .load()
                .await
                .expect("table");
            let table_info = case.table_summary().expect("load summary");
            let snapshot = table.snapshot().expect("Failed to load snapshot");
            let protocol = table.protocol().expect("Failed to load protocol");
            assert_eq!(snapshot.version() as u64, table_info.version);
            assert_eq!(
                (protocol.min_reader_version, protocol.min_writer_version),
                (table_info.min_reader_version, table_info.min_writer_version)
            );
        });
    Ok(())
}

datatest_stable::harness!(
    reader_test_eager,
    "../../dat/v0.0.3/reader_tests/generated/",
    r"test_case_info\.json"
);
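
With harness = false in crates/core/Cargo.toml, the datatest_stable::harness! macro generates the test binary's main and turns every test_case_info.json found under the DAT directory into its own named test. For poking at a single case without the harness, a standalone test along these lines exercises the same read_dat_case / DeltaTableBuilder flow; this is a sketch only, and the basic_append case name is an illustrative assumption.

// Sketch only (not in this PR): load one DAT case directly in a plain tokio test.
// The basic_append case name is an assumption; any generated reader case would do.
use deltalake_core::DeltaTableBuilder;
use deltalake_test::acceptance::read_dat_case;

#[tokio::test]
async fn basic_append_smoke() {
    let root_dir = format!(
        "{}/../../dat/v0.0.3/reader_tests/generated/basic_append",
        env!("CARGO_MANIFEST_DIR")
    );
    let case = read_dat_case(root_dir).expect("read DAT case");
    let table = DeltaTableBuilder::from_uri(case.table_root().expect("table root"))
        .load()
        .await
        .expect("load table");
    let summary = case.table_summary().expect("load summary");
    // The same version check the harness performs for every generated case.
    assert_eq!(table.snapshot().expect("snapshot").version() as u64, summary.version);
}
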
10 changes: 10 additions & 0 deletions crates/test/Cargo.toml
@@ -5,8 +5,16 @@ edition = "2021"
publish = false

[dependencies]
arrow-array = { workspace = true, features = ["chrono-tz"] }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
parquet = { workspace = true, features = ["async", "object_store"] }

bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
delta_kernel = { workspace = true }
deltalake-core = { version = "0.26.0", path = "../core" }
dotenvy = "0"
fs_extra = "1.3.0"
@@ -16,7 +24,9 @@ rand = "0.8"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = "3"
thiserror = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
url = { workspace = true }

[features]
default = []
130 changes: 130 additions & 0 deletions crates/test/src/acceptance/data.rs
@@ -0,0 +1,130 @@
use std::{path::Path, sync::Arc};

use arrow_array::{Array, RecordBatch};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::{DataType, Schema};
use arrow_select::{concat::concat_batches, take::take};
use delta_kernel::DeltaResult;
use futures::{stream::TryStreamExt, StreamExt};
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use super::TestCaseInfo;
use crate::TestResult;

pub async fn read_golden(path: &Path, _version: Option<&str>) -> DeltaResult<RecordBatch> {
    let expected_root = path.join("expected").join("latest").join("table_content");
    let store = Arc::new(LocalFileSystem::new_with_prefix(&expected_root)?);
    let files: Vec<_> = store.list(None).try_collect().await?;
    let mut batches = vec![];
    let mut schema = None;
    for meta in files.into_iter() {
        if let Some(ext) = meta.location.extension() {
            if ext == "parquet" {
                let reader = ParquetObjectReader::new(store.clone(), meta.location);
                let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
                if schema.is_none() {
                    schema = Some(builder.schema().clone());
                }
                let mut stream = builder.build()?;
                while let Some(batch) = stream.next().await {
                    batches.push(batch?);
                }
            }
        }
    }
    let all_data = concat_batches(&schema.unwrap(), &batches)?;
    Ok(all_data)
}

pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult<RecordBatch> {
    // Sort by all columns
    let mut sort_columns = vec![];
    for col in batch.columns() {
        match col.data_type() {
            DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _) => {
                // can't sort structs, lists, or maps
            }
            _ => sort_columns.push(SortColumn {
                values: col.clone(),
                options: None,
            }),
        }
    }
    let indices = lexsort_to_indices(&sort_columns, None)?;
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c, &indices, None).unwrap())
        .collect();
    Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

// Ensure that two schemas have the same field names, and dict_id/ordering.
// We ignore:
// - data type: This is checked already in `assert_columns_match`
// - nullability: parquet marks many things as nullable that we don't in our schema
// - metadata: because it diverges between the real data and the golden table data
fn assert_schema_fields_match(schema: &Schema, golden: &Schema) {
    for (schema_field, golden_field) in schema.fields.iter().zip(golden.fields.iter()) {
        assert!(
            schema_field.name() == golden_field.name(),
            "Field names don't match"
        );
        assert!(
            schema_field.dict_id() == golden_field.dict_id(),
            "Field dict_id doesn't match"
        );
        assert!(
            schema_field.dict_is_ordered() == golden_field.dict_is_ordered(),
            "Field dict_is_ordered doesn't match"
        );
    }
}

// some things are equivalent, but don't show up as equivalent for `==`, so we normalize here
fn normalize_col(col: Arc<dyn Array>) -> Arc<dyn Array> {
    if let DataType::Timestamp(unit, Some(zone)) = col.data_type() {
        if **zone == *"+00:00" {
            arrow_cast::cast::cast(&col, &DataType::Timestamp(*unit, Some("UTC".into())))
                .expect("Could not cast to UTC")
        } else {
            col
        }
    } else {
        col
    }
}

fn assert_columns_match(actual: &[Arc<dyn Array>], expected: &[Arc<dyn Array>]) {
    for (actual, expected) in actual.iter().zip(expected) {
        let actual = normalize_col(actual.clone());
        let expected = normalize_col(expected.clone());
        // note that array equality includes data_type equality
        // See: https://arrow.apache.org/rust/arrow_data/equal/fn.equal.html
        assert_eq!(
            &actual, &expected,
            "Column data didn't match. Got {actual:?}, expected {expected:?}"
        );
    }
}

pub async fn assert_scan_data(
    all_data: Vec<RecordBatch>,
    test_case: &TestCaseInfo,
) -> TestResult<()> {
    let all_data = concat_batches(&all_data[0].schema(), all_data.iter()).unwrap();
    let all_data = sort_record_batch(all_data)?;

    let golden = read_golden(test_case.root_dir(), None).await?;
    let golden = sort_record_batch(golden)?;

    assert_columns_match(all_data.columns(), golden.columns());
    assert_schema_fields_match(all_data.schema().as_ref(), golden.schema().as_ref());
    assert!(
        all_data.num_rows() == golden.num_rows(),
        "Didn't have same number of rows"
    );

    Ok(())
}
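
Nothing in this PR wires these helpers into the eager reader test yet; they land here so a later data-verification pass (the kernel replay work the title refers to) can reuse them. A rough sketch of how that could look follows, assuming deltalake-core's datafusion-backed DeltaOps::load and collect_sendable_stream helpers, that read_dat_case yields the TestCaseInfo consumed by assert_scan_data, and that these items are re-exported from deltalake_test::acceptance; none of those paths are confirmed by this diff.

// Sketch only (not in this PR): verify a DAT case's table content against its
// golden parquet files using the helpers above. The imports and the TestCaseInfo
// assumption are illustrative, not confirmed by this diff.
use deltalake_core::operations::collect_sendable_stream;
use deltalake_core::{DeltaOps, DeltaTableBuilder};
use deltalake_test::acceptance::{assert_scan_data, read_dat_case};
use deltalake_test::TestResult;

async fn verify_case_data(root_dir: &str) -> TestResult<()> {
    let case = read_dat_case(root_dir).expect("read DAT case");
    let table = DeltaTableBuilder::from_uri(case.table_root().expect("table root"))
        .load()
        .await
        .expect("load table");
    // Materialize the table into record batches through the datafusion-backed scan.
    let (_table, stream) = DeltaOps(table).load().await.expect("start scan");
    let batches = collect_sendable_stream(stream).await.expect("collect batches");
    // Sort, normalize, and compare against the golden data shipped with the case.
    assert_scan_data(batches, &case).await?;
    Ok(())
}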