Skip to content

feat: introduce VacuumMode::Full for cleaning up orphaned files #3368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 86 additions & 11 deletions crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::{StreamExt, TryStreamExt};
use object_store::Error;
use object_store::{path::Path, ObjectStore};
use serde::Serialize;
use tracing::log::*;

use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -76,6 +77,20 @@ pub trait Clock: Debug + Send + Sync {
fn current_timestamp_millis(&self) -> i64;
}

/// Type of Vacuum operation to perform
#[derive(Debug, Default, Clone, PartialEq)]
pub enum VacuumMode {
/// The `lite` mode will only remove files which are referenced by `remove` actions in the
/// `_delta_log`
#[default]
Lite,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also expose the mode to python 😊

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be up for adding this exposure after we merge? 😈

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

/// A `full` mode vacuum will remove _all_ data files no longer actively referenced in the
/// `_delta_log` table. For example, if parquet files exist in the table directory but are no
/// longer mentioned as `add` actions in the transaction log, then this mode will scan storage
/// and remove those files.
Full,
}

/// Vacuum a Delta table with the given options
/// See this module's documentation for more information
pub struct VacuumBuilder {
Expand All @@ -89,6 +104,8 @@ pub struct VacuumBuilder {
enforce_retention_duration: bool,
/// Don't delete the files. Just determine which files can be deleted
dry_run: bool,
/// Mode of vacuum that should be run
mode: VacuumMode,
/// Override the source of time
clock: Option<Arc<dyn Clock>>,
/// Additional information to add to the commit
Expand Down Expand Up @@ -144,6 +161,7 @@ impl VacuumBuilder {
retention_period: None,
enforce_retention_duration: true,
dry_run: false,
mode: VacuumMode::Lite,
clock: None,
commit_properties: CommitProperties::default(),
custom_execute_handler: None,
Expand All @@ -156,6 +174,12 @@ impl VacuumBuilder {
self
}

/// Override the default vacuum mode (lite)
pub fn with_mode(mut self, mode: VacuumMode) -> Self {
self.mode = mode;
self
}

/// Only determine which files should be deleted
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
Expand Down Expand Up @@ -189,6 +213,10 @@ impl VacuumBuilder {

/// Determine which files can be deleted. Does not actually perform the deletion
async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
if self.mode == VacuumMode::Full {
info!("Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!");
}

let min_retention = Duration::milliseconds(
self.snapshot
.table_config()
Expand Down Expand Up @@ -228,12 +256,24 @@ impl VacuumBuilder {
while let Some(obj_meta) = all_files.next().await {
// TODO should we allow NotFound here in case we have a temporary commit file in the list
let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
if valid_files.contains(&obj_meta.location) // file is still being tracked in table
|| !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone
|| is_hidden_directory(partition_columns, &obj_meta.location)?
{
// file is still being tracked in table
if valid_files.contains(&obj_meta.location) {
continue;
}
if is_hidden_directory(partition_columns, &obj_meta.location)? {
continue;
}
// A file that is not an expired tombstone is skipped when this is a "Lite" vacuum.
// If the file is not an expired tombstone and we have gotten to here with a
// VacuumMode::Full then it should be added to the deletion plan
if !expired_tombstones.contains(obj_meta.location.as_ref()) {
if self.mode == VacuumMode::Lite {
debug!("The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed", &obj_meta.location);
continue;
} else {
debug!("The file {:?} was not referenced in a log file, but VacuumMode::Full means it *will be vacuumed*", &obj_meta.location);
}
}

files_to_delete.push(obj_meta.location);
file_sizes.push(obj_meta.size as i64);
Expand Down Expand Up @@ -436,7 +476,44 @@ mod tests {
use std::time::SystemTime;

#[tokio::test]
async fn vacuum_delta_8_0_table() {
async fn test_vacuum_full() -> DeltaResult<()> {
    // Dry-run both vacuum modes against the same fixture table. Both runs use a zero-hour
    // retention period with the retention-duration check disabled, so the only difference
    // between the two plans is the mode.
    let table = open_table("../test/tests/data/simple_commit").await?;

    let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
        .with_retention_period(Duration::hours(0))
        .with_dry_run(true)
        .with_mode(VacuumMode::Lite)
        .with_enforce_retention_duration(false)
        .await?;
    // When running lite, this table with superfluous parquet files should not have anything to
    // delete
    assert!(result.files_deleted.is_empty());

    let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
        .with_retention_period(Duration::hours(0))
        .with_dry_run(true)
        .with_mode(VacuumMode::Full)
        .with_enforce_retention_duration(false)
        .await?;
    // `result` is not used again, so take ownership of the list instead of cloning it.
    // Sorting makes the comparison independent of the order files were listed in.
    let mut files_deleted = result.files_deleted;
    files_deleted.sort();
    // When running with full, these superfluous parquet files which are not actually
    // referenced in the _delta_log commits should be considered for the
    // low-orbit ion-cannon
    assert_eq!(
        files_deleted,
        vec![
            "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
            "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
            "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
            "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet",
        ]
    );
    Ok(())
}

#[tokio::test]
async fn vacuum_delta_8_0_table() -> DeltaResult<()> {
let table = open_table("../test/tests/data/delta-0.8.0").await.unwrap();

let result = VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
Expand All @@ -453,8 +530,7 @@ mod tests {
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_enforce_retention_duration(false)
.await
.unwrap();
.await?;
// do not enforce retention duration check with 0 hour will purge all files
assert_eq!(
result.files_deleted,
Expand All @@ -465,8 +541,7 @@ mod tests {
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(169))
.with_dry_run(true)
.await
.unwrap();
.await?;

assert_eq!(
result.files_deleted,
Expand All @@ -483,9 +558,9 @@ mod tests {
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(retention_hours as i64))
.with_dry_run(true)
.await
.unwrap();
.await?;

assert_eq!(result.files_deleted, empty);
Ok(())
}
}
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class RawDeltaTable:
enforce_retention_duration: bool,
commit_properties: CommitProperties | None,
post_commithook_properties: PostCommitHookProperties | None,
full: bool,
) -> list[str]: ...
def compact_optimize(
self,
Expand Down
3 changes: 3 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ def vacuum(
enforce_retention_duration: bool = True,
post_commithook_properties: PostCommitHookProperties | None = None,
commit_properties: CommitProperties | None = None,
full: bool = False,
) -> list[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
Expand All @@ -526,6 +527,7 @@ def vacuum(
enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `delta.deletedFileRetentionDuration`.
post_commithook_properties: properties for the post commit hook. If None, default values are used.
commit_properties: properties of the transaction commit. If None, default values are used.
full: when set to True, will perform a "full" vacuum and remove all files not referenced in the transaction log
Returns:
the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
"""
Expand All @@ -539,6 +541,7 @@ def vacuum(
enforce_retention_duration,
commit_properties,
post_commithook_properties,
full,
)

def update(
Expand Down
11 changes: 9 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
use deltalake::operations::update::UpdateBuilder;
use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::operations::vacuum::{VacuumBuilder, VacuumMode};
use deltalake::operations::write::WriteBuilder;
use deltalake::operations::CustomExecuteHandler;
use deltalake::parquet::basic::Compression;
Expand Down Expand Up @@ -477,7 +477,8 @@ impl RawDeltaTable {

/// Run the Vacuum command on the Delta Table: list and delete files that are no longer
/// referenced by the Delta table and are older than the retention threshold.
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, commit_properties=None, post_commithook_properties=None, full = false))]
#[allow(clippy::too_many_arguments)]
pub fn vacuum(
&self,
py: Python,
Expand All @@ -486,6 +487,7 @@ impl RawDeltaTable {
enforce_retention_duration: bool,
commit_properties: Option<PyCommitProperties>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
full: bool,
) -> PyResult<Vec<String>> {
let (table, metrics) = py.allow_threads(|| {
let snapshot = match self._table.lock() {
Expand All @@ -499,6 +501,11 @@ impl RawDeltaTable {
let mut cmd = VacuumBuilder::new(self.log_store()?, snapshot)
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);

if full {
cmd = cmd.with_mode(VacuumMode::Full);
}

if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
Expand Down
Loading