Skip to content

Commit d8a89fc

Browse files
committed
Add debounce to shard prune request
1 parent 7bd61e3 commit d8a89fc

File tree

14 files changed

+383
-130
lines changed

14 files changed

+383
-130
lines changed

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
4242
use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadataResponseExt};
4343
use quickwit_proto::control_plane::{
4444
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult,
45-
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest,
45+
DebouncedPruneShardsRequest, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse,
46+
GetOrCreateOpenShardsSubrequest,
4647
};
4748
use quickwit_proto::indexing::ShardPositionsUpdate;
4849
use quickwit_proto::metastore::{
@@ -51,7 +52,7 @@ use quickwit_proto::metastore::{
5152
IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService,
5253
MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest,
5354
};
54-
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
55+
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid};
5556
use serde::Serialize;
5657
use serde_json::{json, Value as JsonValue};
5758
use tokio::sync::watch;
@@ -71,13 +72,16 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
7172
Duration::from_secs(5)
7273
};
7374

75+
/// Minimum period between two identical shard pruning operations.
76+
const PRUNE_SHARDS_COOLDOWN_PERIOD: Duration = Duration::from_secs(120);
77+
7478
/// Minimum period between two rebuild plan operations.
7579
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);
7680

7781
#[derive(Debug)]
7882
struct ControlPlanLoop;
7983

80-
#[derive(Debug, Default)]
84+
#[derive(Debug, Default, Clone)]
8185
struct RebuildPlan;
8286

8387
pub struct ControlPlane {
@@ -94,6 +98,7 @@ pub struct ControlPlane {
9498
ingest_controller: IngestController,
9599
metastore: MetastoreServiceClient,
96100
model: ControlPlaneModel,
101+
prune_shards_debouncers: HashMap<(IndexId, SourceId), Debouncer>,
97102
rebuild_plan_debouncer: Debouncer,
98103
readiness_tx: watch::Sender<bool>,
99104
// Disables the control loop. This is useful for unit testing.
@@ -177,6 +182,7 @@ impl ControlPlane {
177182
ingest_controller,
178183
metastore: metastore.clone(),
179184
model: Default::default(),
185+
prune_shards_debouncers: HashMap::new(),
180186
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
181187
readiness_tx,
182188
disable_control_loop,
@@ -387,7 +393,7 @@ impl ControlPlane {
387393
.next_rebuild_tracker
388394
.next_rebuild_waiter();
389395
self.rebuild_plan_debouncer
390-
.self_send_with_cooldown::<RebuildPlan>(ctx);
396+
.self_send_with_cooldown(ctx, RebuildPlan);
391397
next_rebuild_waiter
392398
}
393399
}
@@ -769,6 +775,41 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
769775
}
770776
}
771777

778+
#[async_trait]
779+
impl Handler<DebouncedPruneShardsRequest> for ControlPlane {
780+
type Reply = ControlPlaneResult<EmptyResponse>;
781+
782+
async fn handle(
783+
&mut self,
784+
request: DebouncedPruneShardsRequest,
785+
ctx: &ActorContext<Self>,
786+
) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
787+
let metastore_request = request.request.unwrap();
788+
if request.execute_now {
789+
self.metastore
790+
.prune_shards(metastore_request)
791+
.await
792+
.context("failed to prune shards")?;
793+
return Ok(Ok(EmptyResponse {}));
794+
}
795+
796+
self.prune_shards_debouncers
797+
.entry((
798+
metastore_request.index_uid().index_id.clone(),
799+
metastore_request.source_id.clone(),
800+
))
801+
.or_insert_with(|| Debouncer::new(PRUNE_SHARDS_COOLDOWN_PERIOD))
802+
.self_send_with_cooldown(
803+
ctx,
804+
DebouncedPruneShardsRequest {
805+
request: Some(metastore_request),
806+
execute_now: true,
807+
},
808+
);
809+
Ok(Ok(EmptyResponse {}))
810+
}
811+
}
812+
772813
// This is neither a proxied call nor a metastore callback.
773814
#[async_trait]
774815
impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {

quickwit/quickwit-control-plane/src/debouncer.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,25 +91,25 @@ impl Debouncer {
9191
previous_state
9292
}
9393

94-
fn emit_message<A, M>(&self, ctx: &ActorContext<A>)
94+
fn emit_message<A, M>(&self, ctx: &ActorContext<A>, message: M)
9595
where
9696
A: Actor + Handler<M> + DeferableReplyHandler<M>,
97-
M: Default + std::fmt::Debug + Send + Sync + 'static,
97+
M: std::fmt::Debug + Send + Sync + 'static,
9898
{
99-
let _ = ctx.mailbox().send_message_with_high_priority(M::default());
99+
let _ = ctx.mailbox().send_message_with_high_priority(message);
100100
}
101101

102-
fn schedule_post_cooldown_callback<A, M>(&self, ctx: &ActorContext<A>)
102+
fn schedule_post_cooldown_callback<A, M>(&self, ctx: &ActorContext<A>, message: M)
103103
where
104104
A: Actor + Handler<M> + DeferableReplyHandler<M>,
105-
M: Default + std::fmt::Debug + Send + Sync + 'static,
105+
M: Clone + std::fmt::Debug + Send + Sync + 'static,
106106
{
107107
let ctx_clone = ctx.clone();
108108
let self_clone = self.clone();
109109
let callback = move || {
110110
let previous_state = self_clone.accept_transition(Transition::CooldownExpired);
111111
if previous_state == DebouncerState::CooldownScheduled {
112-
self_clone.self_send_with_cooldown(&ctx_clone);
112+
self_clone.self_send_with_cooldown(&ctx_clone, message);
113113
}
114114
};
115115
ctx.spawn_ctx()
@@ -119,14 +119,15 @@ impl Debouncer {
119119
pub fn self_send_with_cooldown<M>(
120120
&self,
121121
ctx: &ActorContext<impl Handler<M> + DeferableReplyHandler<M>>,
122+
message: M,
122123
) where
123-
M: Default + std::fmt::Debug + Send + Sync + 'static,
124+
M: Clone + std::fmt::Debug + Send + Sync + 'static,
124125
{
125126
let cooldown_state = self.accept_transition(Transition::Emit);
126127
match cooldown_state {
127128
DebouncerState::NoCooldown => {
128-
self.emit_message(ctx);
129-
self.schedule_post_cooldown_callback(ctx);
129+
self.emit_message(ctx, message.clone());
130+
self.schedule_post_cooldown_callback(ctx, message);
130131
}
131132
DebouncerState::CooldownNotScheduled | DebouncerState::CooldownScheduled => {}
132133
}
@@ -156,7 +157,7 @@ mod tests {
156157
}
157158
}
158159

159-
#[derive(Debug, Default)]
160+
#[derive(Debug, Default, Clone)]
160161
struct Increment;
161162

162163
#[derive(Debug)]
@@ -194,7 +195,7 @@ mod tests {
194195
_message: DebouncedIncrement,
195196
ctx: &ActorContext<Self>,
196197
) -> Result<Self::Reply, ActorExitStatus> {
197-
self.debouncer.self_send_with_cooldown::<Increment>(ctx);
198+
self.debouncer.self_send_with_cooldown(ctx, Increment);
198199
Ok(())
199200
}
200201
}

quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use std::fmt;
2121

2222
use async_trait::async_trait;
2323
use quickwit_common::uri::Uri;
24-
use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient};
24+
use quickwit_proto::control_plane::{
25+
ControlPlaneService, ControlPlaneServiceClient, DebouncedPruneShardsRequest,
26+
};
2527
use quickwit_proto::metastore::{
2628
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
2729
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
@@ -35,8 +37,8 @@ use quickwit_proto::metastore::{
3537
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
3638
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
3739
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
38-
PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest,
39-
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
40+
PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
41+
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
4042
UpdateSplitsDeleteOpstampResponse,
4143
};
4244

@@ -120,6 +122,15 @@ impl MetastoreService for ControlPlaneMetastore {
120122
Ok(response)
121123
}
122124

125+
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
126+
let debounced_request = DebouncedPruneShardsRequest {
127+
request: Some(request),
128+
execute_now: false,
129+
};
130+
let response = self.control_plane.prune_shards(debounced_request).await?;
131+
Ok(response)
132+
}
133+
123134
// Other metastore API calls.
124135

125136
async fn index_metadata(
@@ -237,14 +248,6 @@ impl MetastoreService for ControlPlaneMetastore {
237248
self.metastore.delete_shards(request).await
238249
}
239250

240-
async fn prune_shards(
241-
&self,
242-
request: PruneShardsRequest,
243-
) -> MetastoreResult<PruneShardsResponse> {
244-
// TODO this call should go through the control plane which should apply debounce
245-
self.metastore.prune_shards(request).await
246-
}
247-
248251
// Index Template API
249252

250253
async fn create_index_template(

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use quickwit_proto::metastore::{
3737
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
3838
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
3939
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
40-
PruneShardsResponse,
4140
};
4241
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
4342
use serde::{Deserialize, Serialize};
@@ -657,7 +656,7 @@ impl FileBackedIndex {
657656
pub(crate) fn prune_shards(
658657
&mut self,
659658
request: PruneShardsRequest,
660-
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
659+
) -> MetastoreResult<MutationOccurred<()>> {
661660
self.get_shards_for_source_mut(&request.source_id)?
662661
.prune_shards(request)
663662
}

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use quickwit_proto::ingest::{Shard, ShardState};
2626
use quickwit_proto::metastore::{
2727
AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse,
2828
EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult,
29-
OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, PruneShardsResponse,
29+
OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
3030
};
3131
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
3232
use time::OffsetDateTime;
@@ -242,7 +242,7 @@ impl Shards {
242242
pub(super) fn prune_shards(
243243
&mut self,
244244
request: PruneShardsRequest,
245-
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
245+
) -> MetastoreResult<MutationOccurred<()>> {
246246
let initial_shard_count = self.shards.len();
247247

248248
if let Some(max_age) = request.max_age {
@@ -267,14 +267,10 @@ impl Shards {
267267
}
268268
}
269269
}
270-
let response = PruneShardsResponse {
271-
index_uid: request.index_uid,
272-
source_id: request.source_id,
273-
};
274270
if initial_shard_count > self.shards.len() {
275-
Ok(MutationOccurred::Yes(response))
271+
Ok(MutationOccurred::Yes(()))
276272
} else {
277-
Ok(MutationOccurred::No(response))
273+
Ok(MutationOccurred::No(()))
278274
}
279275
}
280276

@@ -646,23 +642,19 @@ mod tests {
646642
max_age: None,
647643
max_count: None,
648644
};
649-
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
645+
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
650646
panic!("expected `MutationOccurred::No`");
651647
};
652-
assert_eq!(response.index_uid(), &index_uid);
653-
assert_eq!(response.source_id, source_id);
654648

655649
let request = PruneShardsRequest {
656650
index_uid: Some(index_uid.clone()),
657651
source_id: source_id.clone(),
658652
max_age: Some(50),
659653
max_count: None,
660654
};
661-
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
655+
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
662656
panic!("expected `MutationOccurred::No`");
663657
};
664-
assert_eq!(response.index_uid(), &index_uid);
665-
assert_eq!(response.source_id, source_id);
666658

667659
let current_timestamp = OffsetDateTime::now_utc().unix_timestamp();
668660
shards.shards.insert(
@@ -696,22 +688,18 @@ mod tests {
696688
max_age: Some(150),
697689
max_count: None,
698690
};
699-
let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else {
691+
let MutationOccurred::Yes(()) = shards.prune_shards(request).unwrap() else {
700692
panic!("expected `MutationOccurred::Yes`");
701693
};
702-
assert_eq!(response.index_uid(), &index_uid);
703-
assert_eq!(response.source_id, source_id);
704694

705695
let request = PruneShardsRequest {
706696
index_uid: Some(index_uid.clone()),
707697
source_id: source_id.clone(),
708698
max_age: Some(150),
709699
max_count: None,
710700
};
711-
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
701+
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
712702
panic!("expected `MutationOccurred::No`");
713703
};
714-
assert_eq!(response.index_uid(), &index_uid);
715-
assert_eq!(response.source_id, source_id);
716704
}
717705
}

quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ use quickwit_proto::metastore::{
5858
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
5959
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
6060
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
61-
OpenShardsResponse, PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest,
62-
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
63-
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
61+
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
62+
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
63+
UpdateSplitsDeleteOpstampResponse,
6464
};
6565
use quickwit_proto::types::{IndexId, IndexUid};
6666
use quickwit_storage::Storage;
@@ -892,15 +892,11 @@ impl MetastoreService for FileBackedMetastore {
892892
Ok(response)
893893
}
894894

895-
async fn prune_shards(
896-
&self,
897-
request: PruneShardsRequest,
898-
) -> MetastoreResult<PruneShardsResponse> {
895+
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
899896
let index_uid = request.index_uid().clone();
900-
let response = self
901-
.mutate(&index_uid, |index| index.prune_shards(request))
897+
self.mutate(&index_uid, |index| index.prune_shards(request))
902898
.await?;
903-
Ok(response)
899+
Ok(EmptyResponse {})
904900
}
905901

906902
async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult<ListShardsResponse> {

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ use quickwit_proto::metastore::{
4646
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
4747
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
4848
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
49-
PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
50-
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
51-
UpdateSplitsDeleteOpstampResponse,
49+
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
50+
UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
5251
};
5352
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
5453
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
@@ -1488,10 +1487,7 @@ impl MetastoreService for PostgresqlMetastore {
14881487
Ok(response)
14891488
}
14901489

1491-
async fn prune_shards(
1492-
&self,
1493-
request: PruneShardsRequest,
1494-
) -> MetastoreResult<PruneShardsResponse> {
1490+
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
14951491
const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql");
14961492
const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql");
14971493

@@ -1513,12 +1509,7 @@ impl MetastoreService for PostgresqlMetastore {
15131509
.execute(&self.connection_pool)
15141510
.await?;
15151511
}
1516-
1517-
let response = PruneShardsResponse {
1518-
index_uid: request.index_uid,
1519-
source_id: request.source_id,
1520-
};
1521-
Ok(response)
1512+
Ok(EmptyResponse {})
15221513
}
15231514

15241515
// Index Template API

0 commit comments

Comments
 (0)