Skip to content

Commit b4d4ece

Browse files
committed
Add integration test
1 parent 75ef89c commit b4d4ece

File tree

9 files changed

+261
-97
lines changed

9 files changed

+261
-97
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-config/src/source_config/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,20 +264,26 @@ pub enum FileSourceMessageType {
264264
pub struct FileSourceSqs {
265265
pub queue_url: String,
266266
pub message_type: FileSourceMessageType,
267-
#[serde(default = "default_deduplication_window_duration_sec")]
268-
pub deduplication_window_duration_sec: u32,
267+
#[serde(default = "default_deduplication_window_duration_secs")]
268+
pub deduplication_window_duration_secs: u32,
269269
#[serde(default = "default_deduplication_window_max_messages")]
270270
pub deduplication_window_max_messages: u32,
271+
#[serde(default = "default_checkpoint_cleanup_interval_secs")]
272+
pub checkpoint_cleanup_interval_secs: u32,
271273
}
272274

273-
fn default_deduplication_window_duration_sec() -> u32 {
275+
fn default_deduplication_window_duration_secs() -> u32 {
274276
3600
275277
}
276278

277279
fn default_deduplication_window_max_messages() -> u32 {
278280
100_000
279281
}
280282

283+
fn default_checkpoint_cleanup_interval_secs() -> u32 {
284+
60
285+
}
286+
281287
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
282288
#[serde(tag = "type", rename_all = "snake_case")]
283289
pub enum FileSourceNotification {
@@ -903,8 +909,10 @@ mod tests {
903909
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
904910
.to_string(),
905911
message_type: FileSourceMessageType::S3Notification,
906-
deduplication_window_duration_sec: default_deduplication_window_duration_sec(),
912+
deduplication_window_duration_secs: default_deduplication_window_duration_secs(
913+
),
907914
deduplication_window_max_messages: default_deduplication_window_max_messages(),
915+
checkpoint_cleanup_interval_secs: default_checkpoint_cleanup_interval_secs()
908916
})),
909917
);
910918
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
@@ -914,8 +922,9 @@ mod tests {
914922
"type": "sqs",
915923
"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name",
916924
"message_type": "s3_notification",
917-
"deduplication_window_duration_sec": default_deduplication_window_duration_sec(),
918-
"deduplication_window_max_messages": default_deduplication_window_max_messages()
925+
"deduplication_window_duration_secs": default_deduplication_window_duration_secs(),
926+
"deduplication_window_max_messages": default_deduplication_window_max_messages(),
927+
"checkpoint_cleanup_interval_secs": default_checkpoint_cleanup_interval_secs(),
919928
}]})
920929
);
921930
}

quickwit/quickwit-indexing/src/source/file_source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,9 @@ mod localstack_tests {
435435
FileSourceParams::Notifications(FileSourceNotification::Sqs(FileSourceSqs {
436436
queue_url,
437437
message_type: FileSourceMessageType::RawUri,
438-
deduplication_window_duration_sec: 100,
438+
deduplication_window_duration_secs: 100,
439439
deduplication_window_max_messages: 100,
440+
checkpoint_cleanup_interval_secs: 60,
440441
}));
441442
let source_config = SourceConfig::for_test(
442443
"test-file-source-sqs-notifications",

quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
use std::fmt;
2121
use std::sync::Arc;
22-
use std::time::{Duration, Instant};
22+
use std::time::Duration;
2323

2424
use itertools::Itertools;
2525
use quickwit_actors::{ActorExitStatus, Mailbox};
@@ -98,20 +98,18 @@ impl QueueCoordinator {
9898
message_type: MessageType,
9999
shard_max_age: Option<u32>,
100100
shard_max_count: Option<u32>,
101+
shard_pruning_interval: Duration,
101102
) -> Self {
102103
Self {
103-
shared_state: QueueSharedState {
104-
metastore: source_runtime.metastore,
105-
source_id: source_runtime.pipeline_id.source_id.clone(),
106-
index_uid: source_runtime.pipeline_id.index_uid.clone(),
107-
reacquire_grace_period: Duration::from_secs(
108-
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
109-
),
110-
last_initiated_pruning: Instant::now(),
111-
max_age: shard_max_age,
112-
max_count: shard_max_count,
113-
pruning_interval: Duration::from_secs(60),
114-
},
104+
shared_state: QueueSharedState::new(
105+
source_runtime.metastore,
106+
source_runtime.pipeline_id.index_uid.clone(),
107+
source_runtime.pipeline_id.source_id.clone(),
108+
Duration::from_secs(2 * source_runtime.indexing_setting.commit_timeout_secs as u64),
109+
shard_max_age,
110+
shard_max_count,
111+
shard_pruning_interval,
112+
),
115113
local_state: QueueLocalState::default(),
116114
pipeline_id: source_runtime.pipeline_id,
117115
source_type: source_runtime.source_config.source_type(),
@@ -143,8 +141,9 @@ impl QueueCoordinator {
143141
source_runtime,
144142
Arc::new(queue),
145143
message_type,
146-
Some(config.deduplication_window_duration_sec),
144+
Some(config.deduplication_window_duration_secs),
147145
Some(config.deduplication_window_max_messages),
146+
Duration::from_secs(config.checkpoint_cleanup_interval_secs as u64),
148147
))
149148
}
150149

quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::collections::BTreeMap;
21-
use std::time::{Duration, Instant};
21+
use std::sync::Arc;
22+
use std::time::Duration;
2223

2324
use anyhow::{bail, Context};
2425
use quickwit_metastore::checkpoint::PartitionId;
@@ -34,35 +35,78 @@ use super::message::PreProcessedMessage;
3435

3536
#[derive(Clone)]
3637
pub struct QueueSharedState {
37-
pub metastore: MetastoreServiceClient,
38+
metastore: MetastoreServiceClient,
3839
pub index_uid: IndexUid,
3940
pub source_id: String,
4041
/// Duration after which the processing of a shard is considered stale and
4142
/// should be reacquired
42-
pub reacquire_grace_period: Duration,
43-
pub max_age: Option<u32>,
44-
pub max_count: Option<u32>,
45-
pub last_initiated_pruning: Instant,
46-
pub pruning_interval: Duration,
43+
reacquire_grace_period: Duration,
44+
_cleanup_handle: Arc<()>,
4745
}
4846

4947
impl QueueSharedState {
50-
async fn clean_partitions(&self) {
51-
if self.max_count.is_none() && self.max_age.is_none() {
52-
return;
48+
/// Creates a shared state service and runs a cleanup task that prunes shards
49+
/// in the background
50+
pub fn new(
51+
metastore: MetastoreServiceClient,
52+
index_uid: IndexUid,
53+
source_id: String,
54+
reacquire_grace_period: Duration,
55+
max_age: Option<u32>,
56+
max_count: Option<u32>,
57+
pruning_interval: Duration,
58+
) -> Self {
59+
let cleanup_handle = Arc::new(());
60+
tokio::spawn(Self::run_cleanup_task(
61+
metastore.clone(),
62+
index_uid.clone(),
63+
source_id.clone(),
64+
max_age,
65+
max_count,
66+
pruning_interval,
67+
cleanup_handle.clone(),
68+
));
69+
Self {
70+
metastore,
71+
index_uid,
72+
source_id,
73+
reacquire_grace_period,
74+
_cleanup_handle: cleanup_handle,
5375
}
54-
let result = self
55-
.metastore
56-
.prune_shards(PruneShardsRequest {
57-
index_uid: Some(self.index_uid.clone()),
58-
source_id: self.source_id.clone(),
59-
max_age: self.max_age,
60-
max_count: self.max_count,
61-
})
62-
.await;
63-
if let Err(err) = result {
64-
error!(error = ?err, "failed to prune shards");
76+
}
77+
78+
async fn run_cleanup_task(
79+
metastore: MetastoreServiceClient,
80+
index_uid: IndexUid,
81+
source_id: String,
82+
max_age: Option<u32>,
83+
max_count: Option<u32>,
84+
pruning_interval: Duration,
85+
owner_handle: Arc<()>,
86+
) {
87+
if max_count.is_none() && max_age.is_none() {
88+
return;
6589
}
90+
tokio::spawn(async move {
91+
let mut interval = tokio::time::interval(pruning_interval);
92+
loop {
93+
interval.tick().await;
94+
if Arc::strong_count(&owner_handle) == 1 {
95+
break;
96+
}
97+
let result: Result<_, _> = metastore
98+
.prune_shards(PruneShardsRequest {
99+
index_uid: Some(index_uid.clone()),
100+
source_id: source_id.clone(),
101+
max_age,
102+
max_count,
103+
})
104+
.await;
105+
if let Err(err) = result {
106+
error!(error = ?err, "failed to prune shards");
107+
}
108+
}
109+
});
66110
}
67111

68112
/// Tries to acquire the ownership for the provided messages from the global
@@ -75,13 +119,6 @@ impl QueueSharedState {
75119
publish_token: &str,
76120
partitions: Vec<PartitionId>,
77121
) -> anyhow::Result<Vec<(PartitionId, Position)>> {
78-
if self.last_initiated_pruning.elapsed() > self.pruning_interval {
79-
let self_cloned = self.clone();
80-
tokio::spawn(async move {
81-
self_cloned.clean_partitions().await;
82-
});
83-
self.last_initiated_pruning = Instant::now();
84-
}
85122
let open_shard_subrequests = partitions
86123
.iter()
87124
.enumerate()
@@ -323,10 +360,7 @@ pub mod shared_state_for_tests {
323360
index_uid,
324361
source_id: "test-queue-src".to_string(),
325362
reacquire_grace_period: Duration::from_secs(10),
326-
last_initiated_pruning: Instant::now(),
327-
max_age: None,
328-
max_count: None,
329-
pruning_interval: Duration::from_secs(10),
363+
_cleanup_handle: Arc::new(()),
330364
}
331365
}
332366
}
@@ -378,10 +412,7 @@ mod tests {
378412
index_uid,
379413
source_id: "test-sqs-source".to_string(),
380414
reacquire_grace_period: Duration::from_secs(10),
381-
last_initiated_pruning: Instant::now(),
382-
max_age: None,
383-
max_count: None,
384-
pruning_interval: Duration::from_secs(10),
415+
_cleanup_handle: Arc::new(()),
385416
};
386417

387418
let aquired = shared_state
@@ -411,10 +442,7 @@ mod tests {
411442
index_uid,
412443
source_id: "test-sqs-source".to_string(),
413444
reacquire_grace_period: Duration::from_secs(10),
414-
last_initiated_pruning: Instant::now(),
415-
max_age: None,
416-
max_count: None,
417-
pruning_interval: Duration::from_secs(10),
445+
_cleanup_handle: Arc::new(()),
418446
};
419447

420448
let acquired = shared_state
@@ -444,10 +472,7 @@ mod tests {
444472
index_uid,
445473
source_id: "test-sqs-source".to_string(),
446474
reacquire_grace_period: Duration::from_secs(10),
447-
last_initiated_pruning: Instant::now(),
448-
max_age: None,
449-
max_count: None,
450-
pruning_interval: Duration::from_secs(10),
475+
_cleanup_handle: Arc::new(()),
451476
};
452477

453478
let aquired = shared_state
@@ -481,10 +506,7 @@ mod tests {
481506
index_uid,
482507
source_id: "test-sqs-source".to_string(),
483508
reacquire_grace_period: Duration::from_secs(10),
484-
last_initiated_pruning: Instant::now(),
485-
max_age: None,
486-
max_count: None,
487-
pruning_interval: Duration::from_secs(10),
509+
_cleanup_handle: Arc::new(()),
488510
};
489511

490512
let checkpointed_msg = checkpoint_messages(&mut shared_state, "token1", source_messages)

0 commit comments

Comments
 (0)