@@ -94,23 +94,23 @@ pub async fn run_garbage_collect(
94
94
dry_run : bool ,
95
95
progress_opt : Option < & Progress > ,
96
96
) -> anyhow:: Result < SplitRemovalInfo > {
97
-
98
-
99
-
100
97
let grace_period_timestamp =
101
98
OffsetDateTime :: now_utc ( ) . unix_timestamp ( ) - staged_grace_period. as_secs ( ) as i64 ;
102
99
103
100
let index_uids: Vec < IndexUid > = indexes. keys ( ) . cloned ( ) . collect ( ) ;
104
101
105
- let Some ( list_splits_query_for_index_uids) = ListSplitsQuery :: try_from_index_uids ( index_uids. clone ( ) ) else {
106
- return Ok ( SplitRemovalInfo :: default ( ) )
102
+ let Some ( list_splits_query_for_index_uids) =
103
+ ListSplitsQuery :: try_from_index_uids ( index_uids. clone ( ) )
104
+ else {
105
+ return Ok ( SplitRemovalInfo :: default ( ) ) ;
107
106
} ;
108
107
let list_splits_query = list_splits_query_for_index_uids
109
108
. clone ( )
110
109
. with_split_state ( SplitState :: Staged )
111
110
. with_update_timestamp_lte ( grace_period_timestamp) ;
112
111
113
- let list_deletable_staged_request = ListSplitsRequest :: try_from_list_splits_query ( & list_splits_query) ?;
112
+ let list_deletable_staged_request =
113
+ ListSplitsRequest :: try_from_list_splits_query ( & list_splits_query) ?;
114
114
let deletable_staged_splits: Vec < SplitMetadata > = protect_future (
115
115
progress_opt,
116
116
metastore. list_splits ( list_deletable_staged_request) ,
@@ -120,8 +120,8 @@ pub async fn run_garbage_collect(
120
120
. await ?;
121
121
122
122
if dry_run {
123
- let marked_for_deletion_query = list_splits_query_for_index_uids
124
- . with_split_state ( SplitState :: MarkedForDeletion ) ;
123
+ let marked_for_deletion_query =
124
+ list_splits_query_for_index_uids . with_split_state ( SplitState :: MarkedForDeletion ) ;
125
125
let marked_for_deletion_request =
126
126
ListSplitsRequest :: try_from_list_splits_query ( & marked_for_deletion_query) ?;
127
127
let mut splits_marked_for_deletion: Vec < SplitMetadata > = protect_future (
@@ -244,15 +244,20 @@ async fn delete_splits(
244
244
use anyhow:: Context ;
245
245
246
246
/// Fetch the list metadata from the metastore and returns them as a Vec.
247
- async fn list_splits_metadata ( metastore : & MetastoreServiceClient , query : & ListSplitsQuery ) -> anyhow:: Result < Vec < SplitMetadata > > {
247
+ async fn list_splits_metadata (
248
+ metastore : & MetastoreServiceClient ,
249
+ query : & ListSplitsQuery ,
250
+ ) -> anyhow:: Result < Vec < SplitMetadata > > {
248
251
let list_splits_request = ListSplitsRequest :: try_from_list_splits_query ( & query)
249
252
. context ( "failed to build list splits request" ) ?;
250
253
let metastore = metastore. clone ( ) ;
251
- let splits_to_delete_stream =
252
- metastore
253
- . list_splits ( list_splits_request) . await
254
- . context ( "failed to fetch stream splits" ) ?;
255
- let splits = splits_to_delete_stream. collect_splits_metadata ( ) . await
254
+ let splits_to_delete_stream = metastore
255
+ . list_splits ( list_splits_request)
256
+ . await
257
+ . context ( "failed to fetch stream splits" ) ?;
258
+ let splits = splits_to_delete_stream
259
+ . collect_splits_metadata ( )
260
+ . await
256
261
. context ( "failed to collect splits" ) ?;
257
262
Ok ( splits)
258
263
}
@@ -284,7 +289,12 @@ async fn delete_splits_marked_for_deletion_several_indexes(
284
289
. sort_by_index_uid ( ) ;
285
290
286
291
loop {
287
- let splits_metadata_to_delete: Vec < SplitMetadata > = match protect_future ( progress_opt, list_splits_metadata ( & metastore, & list_splits_query) ) . await {
292
+ let splits_metadata_to_delete: Vec < SplitMetadata > = match protect_future (
293
+ progress_opt,
294
+ list_splits_metadata ( & metastore, & list_splits_query) ,
295
+ )
296
+ . await
297
+ {
288
298
Ok ( splits) => splits,
289
299
Err ( list_splits_err) => {
290
300
error ! ( error=?list_splits_err, "failed to list splits" ) ;
0 commit comments