Skip to content

Commit 3229965

Browse files
committed
Use pruning interval as control plane cooldown
1 parent 4a3b07b commit 3229965

File tree

7 files changed

+109
-15
lines changed

7 files changed

+109
-15
lines changed

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
7373
};
7474

7575
/// Minimum period between two identical shard pruning operations.
76-
const PRUNE_SHARDS_COOLDOWN_PERIOD: Duration = Duration::from_secs(120);
76+
const PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD: Duration = Duration::from_secs(120);
7777

7878
/// Minimum period between two rebuild plan operations.
7979
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);
@@ -796,19 +796,27 @@ impl Handler<DebouncedPruneShardsRequest> for ControlPlane {
796796
return Ok(Ok(EmptyResponse {}));
797797
}
798798

799-
self.prune_shards_debouncers
799+
let debouncer = self
800+
.prune_shards_debouncers
800801
.entry((
801802
metastore_request.index_uid().index_id.clone(),
802803
metastore_request.source_id.clone(),
803804
))
804-
.or_insert_with(|| Debouncer::new(PRUNE_SHARDS_COOLDOWN_PERIOD))
805-
.self_send_with_cooldown(
806-
ctx,
807-
DebouncedPruneShardsRequest {
808-
request: Some(metastore_request),
809-
execute_now: true,
810-
},
811-
);
805+
.or_insert_with(|| Debouncer::new(PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD));
806+
807+
// Update the cooldown period in case the config changes. If multiple
808+
// indexers run with different configs, the control plane ends up
809+
// running one of them in a seemingly arbitrary way.
810+
if let Some(interval) = metastore_request.interval {
811+
debouncer.set_next_cooldown_period(Duration::from_secs(interval as u64));
812+
}
813+
debouncer.self_send_with_cooldown(
814+
ctx,
815+
DebouncedPruneShardsRequest {
816+
request: Some(metastore_request),
817+
execute_now: true,
818+
},
819+
);
812820
Ok(Ok(EmptyResponse {}))
813821
}
814822
}

quickwit/quickwit-control-plane/src/debouncer.rs

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ impl Debouncer {
132132
DebouncerState::CooldownNotScheduled | DebouncerState::CooldownScheduled => {}
133133
}
134134
}
135+
136+
pub fn set_next_cooldown_period(&mut self, period: Duration) {
137+
self.cooldown_period = period;
138+
}
135139
}
136140

137141
#[cfg(test)]
@@ -163,6 +167,11 @@ mod tests {
163167
#[derive(Debug)]
164168
struct DebouncedIncrement;
165169

170+
#[derive(Debug, Default, Clone)]
171+
struct UpdateCooldownPeriod {
172+
cooldown_period: Duration,
173+
}
174+
166175
#[async_trait]
167176
impl Actor for DebouncingActor {
168177
type ObservableState = usize;
@@ -200,10 +209,25 @@ mod tests {
200209
}
201210
}
202211

212+
#[async_trait]
213+
impl Handler<UpdateCooldownPeriod> for DebouncingActor {
214+
type Reply = ();
215+
216+
async fn handle(
217+
&mut self,
218+
message: UpdateCooldownPeriod,
219+
_ctx: &ActorContext<Self>,
220+
) -> Result<Self::Reply, ActorExitStatus> {
221+
self.debouncer
222+
.set_next_cooldown_period(message.cooldown_period);
223+
Ok(())
224+
}
225+
}
226+
203227
#[tokio::test]
204228
async fn test_debouncer() {
205229
let universe = Universe::default();
206-
let cooldown_period = Duration::from_millis(1_000);
230+
let cooldown_period = Duration::from_millis(100);
207231
let debouncer = DebouncingActor::new(cooldown_period);
208232
let (debouncer_mailbox, debouncer_handle) = universe.spawn_builder().spawn(debouncer);
209233
{
@@ -237,4 +261,51 @@ mod tests {
237261
}
238262
universe.assert_quit().await;
239263
}
264+
265+
#[tokio::test]
266+
async fn test_debouncer_cooldown_update() {
267+
let universe = Universe::default();
268+
let cooldown_period = Duration::from_millis(100);
269+
let debouncer = DebouncingActor::new(cooldown_period);
270+
let (debouncer_mailbox, debouncer_handle) = universe.spawn_builder().spawn(debouncer);
271+
for _ in 0..10 {
272+
debouncer_mailbox.ask(DebouncedIncrement).await.unwrap();
273+
let count = *debouncer_handle.process_pending_and_observe().await;
274+
assert_eq!(count, 1);
275+
}
276+
{
277+
universe.sleep(cooldown_period.mul_f32(1.2)).await;
278+
let count = *debouncer_handle.process_pending_and_observe().await;
279+
assert_eq!(count, 2);
280+
}
281+
// at this point, we are still cooling down
282+
debouncer_mailbox
283+
.ask(UpdateCooldownPeriod {
284+
cooldown_period: cooldown_period.mul_f32(2.0),
285+
})
286+
.await
287+
.unwrap();
288+
{
289+
universe.sleep(cooldown_period.mul_f32(1.2)).await;
290+
let count = *debouncer_handle.process_pending_and_observe().await;
291+
assert_eq!(count, 2);
292+
}
293+
// the previous cooldown now expired, the new one will apply
294+
for _ in 0..10 {
295+
debouncer_mailbox.ask(DebouncedIncrement).await.unwrap();
296+
let count = *debouncer_handle.process_pending_and_observe().await;
297+
assert_eq!(count, 3);
298+
}
299+
{
300+
universe.sleep(cooldown_period.mul_f32(1.2)).await;
301+
let count = *debouncer_handle.process_pending_and_observe().await;
302+
assert_eq!(count, 3);
303+
}
304+
{
305+
universe.sleep(cooldown_period.mul_f32(1.2)).await;
306+
let count = *debouncer_handle.process_pending_and_observe().await;
307+
assert_eq!(count, 4);
308+
}
309+
universe.assert_quit().await;
310+
}
240311
}

quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl QueueSharedState {
100100
source_id: source_id.clone(),
101101
max_age,
102102
max_count,
103+
interval: Some(pruning_interval.as_secs() as u32),
103104
})
104105
.await;
105106
if let Err(err) = result {

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ mod tests {
641641
source_id: source_id.clone(),
642642
max_age: None,
643643
max_count: None,
644+
interval: None,
644645
};
645646
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
646647
panic!("expected `MutationOccurred::No`");
@@ -651,6 +652,7 @@ mod tests {
651652
source_id: source_id.clone(),
652653
max_age: Some(50),
653654
max_count: None,
655+
interval: None,
654656
};
655657
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
656658
panic!("expected `MutationOccurred::No`");
@@ -687,6 +689,7 @@ mod tests {
687689
source_id: source_id.clone(),
688690
max_age: Some(150),
689691
max_count: None,
692+
interval: None,
690693
};
691694
let MutationOccurred::Yes(()) = shards.prune_shards(request).unwrap() else {
692695
panic!("expected `MutationOccurred::Yes`");
@@ -697,6 +700,7 @@ mod tests {
697700
source_id: source_id.clone(),
698701
max_age: Some(150),
699702
max_count: None,
703+
interval: None,
700704
};
701705
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
702706
panic!("expected `MutationOccurred::No`");

quickwit/quickwit-metastore/src/tests/shard.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,7 @@ pub async fn test_metastore_prune_shards<
654654
source_id: test_index.source_id.clone(),
655655
max_age: None,
656656
max_count: None,
657+
interval: None,
657658
};
658659
metastore.prune_shards(prune_index_request).await.unwrap();
659660
let all_shards = metastore
@@ -669,6 +670,7 @@ pub async fn test_metastore_prune_shards<
669670
source_id: test_index.source_id.clone(),
670671
max_age: Some(oldest_shard_age - 350),
671672
max_count: None,
673+
interval: None,
672674
};
673675
metastore.prune_shards(prune_index_request).await.unwrap();
674676

@@ -688,6 +690,7 @@ pub async fn test_metastore_prune_shards<
688690
source_id: test_index.source_id.clone(),
689691
max_age: None,
690692
max_count: Some(90),
693+
interval: None,
691694
};
692695
metastore.prune_shards(prune_index_request).await.unwrap();
693696
let mut all_shards = metastore
@@ -705,6 +708,7 @@ pub async fn test_metastore_prune_shards<
705708
source_id: test_index.source_id.clone(),
706709
max_age: Some(oldest_shard_age - 2950),
707710
max_count: Some(80),
711+
interval: None,
708712
};
709713
metastore.prune_shards(prune_index_request).await.unwrap();
710714
let all_shards = metastore
@@ -718,6 +722,7 @@ pub async fn test_metastore_prune_shards<
718722
source_id: test_index.source_id.clone(),
719723
max_age: Some(oldest_shard_age - 4000),
720724
max_count: Some(50),
725+
interval: None,
721726
};
722727
metastore.prune_shards(prune_index_request).await.unwrap();
723728
let all_shards = metastore

quickwit/quickwit-proto/protos/quickwit/metastore.proto

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,11 @@ message PruneShardsRequest {
458458
quickwit.common.IndexUid index_uid = 1;
459459
string source_id = 2;
460460
// The maximum age of the shards to keep, in seconds.
461-
optional uint32 max_age = 5;
461+
optional uint32 max_age = 3;
462462
// The maximum number of the shards to keep. Delete older shards first.
463-
optional uint32 max_count = 6;
463+
optional uint32 max_count = 4;
464+
// The interval between two pruning operations, in seconds.
465+
optional uint32 interval = 5;
464466
}
465467

466468
message ListShardsRequest {

quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)