Skip to content

feat: during LakeFS file operations, skip merge when 0 changes #3346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 99 additions & 2 deletions crates/lakefs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use deltalake_core::DeltaResult;
use reqwest::Client;
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::json;
use serde_json::{json, Value};
use tracing::debug;
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -195,7 +195,7 @@ impl LakeFSClient {
"squash_merge": true,
});

debug!("Merging LakeFS, source `{transaction_branch}` into target `{transaction_branch}` in repo: {repo}");
debug!("Merging LakeFS, source `{transaction_branch}` into target `{target_branch}` in repo: {repo}");
let response = self
.http_client
.post(&request_url)
Expand Down Expand Up @@ -223,6 +223,56 @@ impl LakeFSClient {
}
}

/// Reports whether `compare_branch` differs from `base_branch` in `repo`.
///
/// Sends a basic-auth GET request to the LakeFS
/// `refs/{base_branch}/diff/{compare_branch}` endpoint and inspects the
/// `results` array of the JSON body.
///
/// # Returns
/// `Ok(true)` when `results` holds at least one entry, `Ok(false)` when it is empty.
///
/// # Errors
/// * `LakeFSOperationError::HttpRequestFailed` when the request cannot be sent
///   or the 200 response body cannot be decoded.
/// * `LakeFSOperationError::UnauthorizedAction` on a 401 response.
/// * `LakeFSOperationError::MergeFailed` for every other status, carrying the
///   message from the error body, or `"Unknown error occurred."` when that body
///   cannot be decoded.
///   NOTE(review): the merge error variant is reused for a failed diff check —
///   confirm whether a dedicated variant is wanted.
pub async fn has_changes(
    &self,
    repo: &str,
    base_branch: &str,
    compare_branch: &str,
) -> Result<bool, TransactionError> {
    // Only the presence of entries matters, so they are kept as untyped JSON values.
    #[derive(Deserialize, Debug)]
    struct DiffResponse {
        results: Vec<Value>,
    }

    let request_url = format!(
        "{}/api/v1/repositories/{repo}/refs/{base_branch}/diff/{compare_branch}",
        self.config.host
    );

    debug!("Checking for changes from `{base_branch}` to `{compare_branch}` in repo: {repo}");
    let request = self
        .http_client
        .get(&request_url)
        .basic_auth(&self.config.username, Some(&self.config.password));
    let response = request
        .send()
        .await
        .map_err(|e| LakeFSOperationError::HttpRequestFailed { source: e })?;

    let status = response.status();

    if status == StatusCode::OK {
        let diff = response
            .json::<DiffResponse>()
            .await
            .map_err(|e| LakeFSOperationError::HttpRequestFailed { source: e })?;
        // Any entry in the diff means the two refs differ.
        return Ok(!diff.results.is_empty());
    }

    if status == StatusCode::UNAUTHORIZED {
        return Err(LakeFSOperationError::UnauthorizedAction.into());
    }

    // Any other status: surface the server's message when the body is parseable.
    let message = match response.json::<LakeFSErrorResponse>().await {
        Ok(error) => error.message,
        Err(_) => "Unknown error occurred.".to_string(),
    };
    Err(LakeFSOperationError::MergeFailed(message).into())
}

pub fn set_transaction(&self, id: Uuid, branch: String) {
self.transactions.insert(id, branch);
debug!("{}", format!("LakeFS Transaction `{id}` has been set."));
Expand Down Expand Up @@ -435,4 +485,51 @@ mod tests {
let result = client.get_transaction(transaction_id);
assert!(result.is_err());
}

/// Verifies that `has_changes` maps a non-empty diff `results` array to `true`
/// and an empty one to `false`, using a mocked diff endpoint.
#[test]
fn test_has_changes() {
    // Each scenario: (label, mocked diff response body, expected return value).
    let scenarios = [
        ("with_changes", r#"{"results": [{"some": "change"}]}"#, true),
        ("without_changes", r#"{"results": []}"#, false),
    ];

    for (test_name, response_body, expected_has_changes) in scenarios {
        // A fresh mock server per scenario so `assert()` counts only this scenario's request.
        let mut server = mockito::Server::new();
        let diff_endpoint = server
            .mock(
                "GET",
                "/api/v1/repositories/test_repo/refs/base_branch/diff/compare_branch",
            )
            .with_status(StatusCode::OK.as_u16().into())
            .with_body(response_body)
            .create();

        let client = LakeFSClient::with_config(LakeFSConfig::new(
            server.url(),
            "test_user".to_string(),
            "test_pass".to_string(),
        ));

        let outcome =
            rt().block_on(client.has_changes("test_repo", "base_branch", "compare_branch"));

        assert!(
            outcome.is_ok(),
            "Test case '{}' failed: API call returned error",
            test_name
        );
        assert_eq!(
            outcome.unwrap(),
            expected_has_changes,
            "Test case '{}' failed: expected has_changes to be {}",
            test_name,
            expected_has_changes
        );
        diff_endpoint.assert();
    }
}
}
11 changes: 11 additions & 0 deletions crates/lakefs/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ mod tests {
.with_status(StatusCode::CREATED.as_u16().into())
.create();

let diff_mock = server
.mock(
"GET",
format!("/api/v1/repositories/repo/refs/branch/diff/delta-tx-{operation_id}")
.as_str(),
)
.with_status(StatusCode::OK.as_u16().into())
.with_body(r#"{"results": [{"some": "change"}]}"#)
.create();

let merge_branch_mock = server
.mock(
"POST",
Expand Down Expand Up @@ -293,6 +303,7 @@ mod tests {
});

create_commit_mock.assert();
diff_mock.assert();
merge_branch_mock.assert();
delete_branch_mock.assert();
assert!(result.is_ok());
Expand Down
76 changes: 47 additions & 29 deletions crates/lakefs/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,44 +141,62 @@ impl LakeFSLogStore {
let (repo, transaction_branch, table) = self.client.decompose_url(transaction_url);
self.client
.commit(
repo,
transaction_branch,
repo.clone(),
transaction_branch.clone(),
format!("Delta file operations {{ table: {table}}}"),
true, // Needs to be true, it could be a file operation but no logs were deleted.
)
.await?;

// Try LakeFS Branch merge of transaction branch in source branch
// Get target branch information
let (repo, target_branch, table) =
self.client.decompose_url(self.config.location.to_string());
match self

// Check if there are any changes before attempting to merge
let has_changes = self
.client
.merge(
repo,
target_branch,
self.client.get_transaction(operation_id)?,
0,
format!("Finished delta file operations {{ table: {table}}}"),
true, // Needs to be true, it could be a file operation but no logs were deleted.
)
.has_changes(&repo, &target_branch, &transaction_branch)
.await
{
Ok(_) => {
let (repo, _, _) = self.client.decompose_url(self.config.location.to_string());
self.client
.delete_branch(repo, self.client.get_transaction(operation_id)?)
.await?;
Ok(())
}
// TODO: propagate better LakeFS errors.
Err(TransactionError::VersionAlreadyExists(_)) => {
Err(TransactionError::LogStoreError {
msg: "Merge Failed".to_string(),
source: Box::new(DeltaTableError::generic("Merge Failed")),
})
}
Err(err) => Err(err),
}?;
.map_err(|e| DeltaTableError::generic(format!("Failed to check for changes: {}", e)))?;

// Only perform merge if there are changes
if has_changes {
debug!("Changes detected, proceeding with merge");
match self
.client
.merge(
repo,
target_branch,
self.client.get_transaction(operation_id)?,
0,
format!("Finished delta file operations {{ table: {table}}}"),
true, // Needs to be true, it could be a file operation but no logs were deleted.
)
.await
{
Ok(_) => {
// Merge successful
}
// TODO: propagate better LakeFS errors.
Err(TransactionError::VersionAlreadyExists(_)) => {
return Err(DeltaTableError::Transaction {
source: TransactionError::LogStoreError {
msg: "Merge Failed".to_string(),
source: Box::new(DeltaTableError::generic("Merge Failed")),
},
});
}
Err(err) => return Err(DeltaTableError::Transaction { source: err }),
};
} else {
debug!("No changes detected, skipping merge");
}

// Always delete the transaction branch when done
let (repo, _, _) = self.client.decompose_url(self.config.location.to_string());
self.client
.delete_branch(repo, self.client.get_transaction(operation_id)?)
.await?;

self.client.clear_transaction(operation_id);
Ok(())
Expand Down
28 changes: 28 additions & 0 deletions python/tests/test_lakefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,34 @@ def test_checkpoint(sample_data: pa.Table, lakefs_storage_options, lakefs_client
assert branch.object(checkpoint_path).exists()


@pytest.mark.lakefs
@pytest.mark.integration
def test_no_empty_commits(
    lakefs_path, sample_data: pa.Table, lakefs_storage_options, lakefs_client
):
    """A vacuum with nothing to remove must not move the branch head commit."""
    import lakefs

    def first_log_commit_id(ref):
        # Id of the first commit yielded by the ref's log, or None when the log is empty.
        history = list(ref.log())
        return history[0].id if history else None

    write_deltalake(
        lakefs_path, sample_data, mode="append", storage_options=lakefs_storage_options
    )
    dt = DeltaTable(lakefs_path, storage_options=lakefs_storage_options)

    # Record the branch head commit before running the operation.
    branch = lakefs.Branch(
        repository_id="bronze", branch_id="main", client=lakefs_client
    )
    before_commit_id = first_log_commit_id(branch)

    # A freshly written table has no files to vacuum, so this should be a no-op.
    dt.vacuum(dry_run=False)

    after_commit_id = first_log_commit_id(branch)

    assert before_commit_id == after_commit_id, "Empty commit should be skipped"


@pytest.mark.lakefs
@pytest.mark.integration
def test_storage_options(sample_data: pa.Table):
Expand Down
Loading