refactor: use LogStore in Snapshot / LogSegment APIs #3452

Merged (1 commit) on May 20, 2025
126 changes: 57 additions & 69 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -109,21 +109,19 @@ impl LogSegment {
/// Try to create a new [`LogSegment`]
///
/// This will list the entire log directory and find all relevant files for the given table version.
-pub async fn try_new(
-table_root: &Path,
-version: Option<i64>,
-store: &dyn ObjectStore,
-) -> DeltaResult<Self> {
-let log_url = table_root.child("_delta_log");
-let maybe_cp = read_last_checkpoint(store, &log_url).await?;
+pub async fn try_new(log_store: &dyn LogStore, version: Option<i64>) -> DeltaResult<Self> {
+let store = log_store.object_store(None);
+
+let log_url = Path::from("_delta_log");
+let maybe_cp = read_last_checkpoint(&store, &log_url).await?;

// List relevant files from log
let (mut commit_files, checkpoint_files) = match (maybe_cp, version) {
-(Some(cp), None) => list_log_files_with_checkpoint(&cp, store, &log_url).await?,
+(Some(cp), None) => list_log_files_with_checkpoint(&cp, &store, &log_url).await?,
(Some(cp), Some(v)) if cp.version <= v => {
-list_log_files_with_checkpoint(&cp, store, &log_url).await?
+list_log_files_with_checkpoint(&cp, &store, &log_url).await?
}
-_ => list_log_files(store, &log_url, version, None).await?,
+_ => list_log_files(&store, &log_url, version, None).await?,
};

// remove all files above requested version
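For callers, the visible effect of this hunk is the new `try_new` signature: the `LogStore` now supplies both the table root and the object store. A minimal caller-side sketch (illustrative only, not part of this PR; `LogSegment` is crate-internal, so imagine this living next to the code above):

```rust
use deltalake_core::{logstore::LogStore, DeltaResult};

// Hypothetical helper: build the newest LogSegment for a table. Callers no longer
// thread a `table_root: &Path` or a `store: &dyn ObjectStore` through `try_new`;
// the LogStore hands out its ObjectStore internally via `object_store(None)`.
async fn latest_segment(log_store: &dyn LogStore) -> DeltaResult<LogSegment> {
    LogSegment::try_new(log_store, None).await
}

// A specific table version can still be pinned via the second argument:
// let segment = LogSegment::try_new(log_store, Some(8)).await?;
```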
@@ -273,10 +271,12 @@ impl LogSegment {

pub(super) fn commit_stream(
&self,
-store: Arc<dyn ObjectStore>,
+log_store: &dyn LogStore,
read_schema: &Schema,
config: &DeltaTableConfig,
) -> DeltaResult<BoxStream<'_, DeltaResult<RecordBatch>>> {
+let store = log_store.object_store(None);
+
let decoder = json::get_decoder(Arc::new(read_schema.try_into()?), config)?;
let stream = futures::stream::iter(self.commit_files.iter())
.map(move |meta| {
@@ -289,10 +289,12 @@

pub(super) fn checkpoint_stream(
&self,
-store: Arc<dyn ObjectStore>,
+log_store: &dyn LogStore,
read_schema: &Schema,
config: &DeltaTableConfig,
) -> BoxStream<'_, DeltaResult<RecordBatch>> {
+let store = log_store.object_store(None);
+
let batch_size = config.log_batch_size;
let read_schema = Arc::new(read_schema.clone());
futures::stream::iter(self.checkpoint_files.clone())
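Both replay streams now borrow the `LogStore` and resolve their `ObjectStore` internally, mirroring `try_new`. A hedged usage sketch inside an async context (variable names are illustrative; `segment`, `log_store`, `read_schema`, and `config` are assumed to already exist in the caller):

```rust
use futures::StreamExt;

// Sketch only: consuming both streams after the signature change. The caller no
// longer passes an Arc<dyn ObjectStore>; each method fetches it from the LogStore.
let mut commits = segment.commit_stream(log_store, &read_schema, &config)?;
while let Some(batch) = commits.next().await {
    let batch = batch?; // RecordBatch decoded from a commit JSON file
    // ... inspect actions ...
}

let mut checkpoints = segment.checkpoint_stream(log_store, &read_schema, &config);
while let Some(batch) = checkpoints.next().await {
    let batch = batch?; // RecordBatch read from checkpoint parquet files
}
```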
@@ -341,7 +343,7 @@ impl LogSegment {
/// Read [`Protocol`] and [`Metadata`] actions
pub(super) async fn read_metadata(
&self,
-store: Arc<dyn ObjectStore>,
+log_store: &dyn LogStore,
config: &DeltaTableConfig,
) -> DeltaResult<(Option<Protocol>, Option<Metadata>)> {
static READ_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
@@ -354,7 +356,7 @@
let mut maybe_protocol = None;
let mut maybe_metadata = None;

-let mut commit_stream = self.commit_stream(store.clone(), &READ_SCHEMA, config)?;
+let mut commit_stream = self.commit_stream(log_store, &READ_SCHEMA, config)?;
while let Some(batch) = commit_stream.next().await {
let batch = batch?;
if maybe_protocol.is_none() {
@@ -372,7 +374,7 @@
}
}

-let mut checkpoint_stream = self.checkpoint_stream(store.clone(), &READ_SCHEMA, config);
+let mut checkpoint_stream = self.checkpoint_stream(log_store, &READ_SCHEMA, config);
while let Some(batch) = checkpoint_stream.next().await {
let batch = batch?;
if maybe_protocol.is_none() {
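`read_metadata` follows the same pattern: it borrows the `LogStore` once and drives both streams shown above. A caller-side sketch (hedged; table setup and error handling are assumed):

```rust
// Sketch: resolving Protocol and Metadata through the new API. Both values are
// Options because either action could be absent from a malformed log.
let (protocol, _metadata) = segment
    .read_metadata(log_store, &Default::default())
    .await?;
if let Some(protocol) = protocol {
    println!("min reader version: {}", protocol.min_reader_version);
}
```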
@@ -602,12 +604,9 @@ pub(super) mod tests {
}

async fn log_segment_serde() -> TestResult {
-let store = TestTables::Simple
-.table_builder()
-.build_storage()?
-.object_store(None);
+let log_store = TestTables::Simple.table_builder().build_storage()?;

-let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
+let segment = LogSegment::try_new(&log_store, None).await?;
let bytes = serde_json::to_vec(&segment).unwrap();
let actual: LogSegment = serde_json::from_slice(&bytes).unwrap();
assert_eq!(actual.version(), segment.version());
@@ -621,63 +620,56 @@
}

async fn read_log_files() -> TestResult {
-let store = TestTables::SimpleWithCheckpoint
+let log_store = TestTables::SimpleWithCheckpoint
.table_builder()
-.build_storage()?
-.object_store(None);
+.build_storage()?;
+let store = log_store.object_store(None);

let log_path = Path::from("_delta_log");
-let cp = read_last_checkpoint(store.as_ref(), &log_path)
-.await?
-.unwrap();
+let cp = read_last_checkpoint(&store, &log_path).await?.unwrap();
assert_eq!(cp.version, 10);

-let (log, check) = list_log_files_with_checkpoint(&cp, store.as_ref(), &log_path).await?;
+let (log, check) = list_log_files_with_checkpoint(&cp, &store, &log_path).await?;
assert_eq!(log.len(), 0);
assert_eq!(check.len(), 1);

-let (log, check) = list_log_files(store.as_ref(), &log_path, None, None).await?;
+let (log, check) = list_log_files(&store, &log_path, None, None).await?;
assert_eq!(log.len(), 0);
assert_eq!(check.len(), 1);

-let (log, check) = list_log_files(store.as_ref(), &log_path, Some(8), None).await?;
+let (log, check) = list_log_files(&store, &log_path, Some(8), None).await?;
assert_eq!(log.len(), 9);
assert_eq!(check.len(), 0);

-let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
+let segment = LogSegment::try_new(&log_store, None).await?;
assert_eq!(segment.version, 10);
assert_eq!(segment.commit_files.len(), 0);
assert_eq!(segment.checkpoint_files.len(), 1);

-let segment = LogSegment::try_new(&Path::default(), Some(8), store.as_ref()).await?;
+let segment = LogSegment::try_new(&log_store, Some(8)).await?;
assert_eq!(segment.version, 8);
assert_eq!(segment.commit_files.len(), 9);
assert_eq!(segment.checkpoint_files.len(), 0);

-let store = TestTables::Simple
-.table_builder()
-.build_storage()?
-.object_store(None);
+let log_store = TestTables::Simple.table_builder().build_storage()?;
+let store = log_store.object_store(None);

-let (log, check) = list_log_files(store.as_ref(), &log_path, None, None).await?;
+let (log, check) = list_log_files(&store, &log_path, None, None).await?;
assert_eq!(log.len(), 5);
assert_eq!(check.len(), 0);

-let (log, check) = list_log_files(store.as_ref(), &log_path, Some(2), None).await?;
+let (log, check) = list_log_files(&store, &log_path, Some(2), None).await?;
assert_eq!(log.len(), 3);
assert_eq!(check.len(), 0);

Ok(())
}

async fn read_metadata() -> TestResult {
-let store = TestTables::WithDvSmall
-.table_builder()
-.build_storage()?
-.object_store(None);
-let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
+let log_store = TestTables::WithDvSmall.table_builder().build_storage()?;
+let segment = LogSegment::try_new(&log_store, None).await?;
let (protocol, _metadata) = segment
-.read_metadata(store.clone(), &Default::default())
+.read_metadata(&log_store, &Default::default())
.await?;
let protocol = protocol.unwrap();

@@ -697,17 +689,17 @@
.table_builder()
.load()
.await?;
-let store = TestTables::LatestNotCheckpointed
+
+let base_store = table_to_checkpoint.log_store().root_object_store(None);
+let slow_list_store = Arc::new(slow_store::SlowListStore { store: base_store });
+let slow_log_store = TestTables::LatestNotCheckpointed
.table_builder()
-.build_storage()?
-.object_store(None);
-let slow_list_store = Arc::new(slow_store::SlowListStore { store });
+.with_storage_backend(slow_list_store, url::Url::parse("dummy:///").unwrap())
+.build_storage()?;

let version = table_to_checkpoint.version();
let load_task: JoinHandle<Result<LogSegment, DeltaTableError>> = tokio::spawn(async move {
-let segment =
-LogSegment::try_new(&Path::default(), Some(version), slow_list_store.as_ref())
-.await?;
+let segment = LogSegment::try_new(&slow_log_store, Some(version)).await?;
Ok(segment)
});
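The slow-list test above shows the injection pattern this refactor enables: wrap the table's root `ObjectStore`, hand the wrapper to the builder, and every listing done by `LogSegment::try_new` then goes through it. A condensed sketch of just that wiring (test-only types from the hunk above; `dummy:///` is merely a placeholder root URL):

```rust
// Wrap the underlying store (SlowListStore is the test's decorator type)...
let base_store = table_to_checkpoint.log_store().root_object_store(None);
let slow_list_store = Arc::new(slow_store::SlowListStore { store: base_store });

// ...inject it before building the LogStore...
let slow_log_store = TestTables::LatestNotCheckpointed
    .table_builder()
    .with_storage_backend(slow_list_store, url::Url::parse("dummy:///").unwrap())
    .build_storage()?;

// ...and all log listing for this segment now flows through the wrapper.
let segment = LogSegment::try_new(&slow_log_store, Some(version)).await?;
```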

@@ -870,25 +862,21 @@
assert_eq!(commit.metrics.num_log_files_cleaned_up, 0);
assert!(!commit.metrics.new_checkpoint_created);

-let batches = LogSegment::try_new(
-&Path::default(),
-Some(commit.version),
-log_store.object_store(None).as_ref(),
-)
-.await
-.unwrap()
-.checkpoint_stream(
-log_store.object_store(None),
-&StructType::new(vec![
-ActionType::Metadata.schema_field().clone(),
-ActionType::Protocol.schema_field().clone(),
-ActionType::Add.schema_field().clone(),
-]),
-&Default::default(),
-)
-.try_collect::<Vec<_>>()
-.await
-.unwrap();
+let batches = LogSegment::try_new(&log_store, Some(commit.version))
+.await
+.unwrap()
+.checkpoint_stream(
+&log_store,
+&StructType::new(vec![
+ActionType::Metadata.schema_field().clone(),
+ActionType::Protocol.schema_field().clone(),
+ActionType::Add.schema_field().clone(),
+]),
+&Default::default(),
+)
+.try_collect::<Vec<_>>()
+.await
+.unwrap();

let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter()).unwrap();
