Skip to content

keep knowledge of ongoing merges across merge pipelines #5633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
typo
  • Loading branch information
trinity-1686a committed Feb 5, 2025
commit ae99e044f9fa139e72f05987cc06cd31554b7ef9
37 changes: 19 additions & 18 deletions quickwit/quickwit-common/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@ use std::sync::{Arc, Mutex};

use census::{Inventory, TrackedObject as InventoredObject};

pub type TrackedObject<T> = InventoredObject<RecordUnacknoledgedDrop<T>>;
pub type TrackedObject<T> = InventoredObject<RecordUnacknowledgedDrop<T>>;

#[derive(Clone)]
pub struct Tracker<T> {
inner_inventory: Inventory<RecordUnacknoledgedDrop<T>>,
unacknoledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
inner_inventory: Inventory<RecordUnacknowledgedDrop<T>>,
unacknowledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
return_channel: Sender<T>,
}

#[derive(Debug)]
pub struct RecordUnacknoledgedDrop<T> {
pub struct RecordUnacknowledgedDrop<T> {
// safety: this is always kept initialized except after Self::drop, where we move that
// that value away to either send it through the return channel, or drop it manually
inner: MaybeUninit<T>,
acknoledged: AtomicBool,
acknowledged: AtomicBool,
return_channel: Sender<T>,
}

impl<T> RecordUnacknoledgedDrop<T> {
pub fn acknoledge(&self) {
self.acknoledged.store(true, Ordering::Relaxed);
impl<T> RecordUnacknowledgedDrop<T> {
pub fn acknowledge(&self) {
self.acknowledged.store(true, Ordering::Relaxed);
}

pub fn untracked(value: T) -> Self {
let (sender, _receiver) = channel();
RecordUnacknoledgedDrop {
RecordUnacknowledgedDrop {
inner: MaybeUninit::new(value),
acknoledged: true.into(),
acknowledged: true.into(),
return_channel: sender,
}
}
}

impl<T> Deref for RecordUnacknoledgedDrop<T> {
impl<T> Deref for RecordUnacknowledgedDrop<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe {
Expand All @@ -49,15 +49,15 @@ impl<T> Deref for RecordUnacknoledgedDrop<T> {
}
}

impl<T> Drop for RecordUnacknoledgedDrop<T> {
impl<T> Drop for RecordUnacknowledgedDrop<T> {
fn drop(&mut self) {
let item = unsafe {
// safety: see struct definition. Additionally, we don't touch to self.inner
// after this point so there is no risk of making a 2nd copy and cause a
// double-free
self.inner.assume_init_read()
};
if !*self.acknoledged.get_mut() {
if !*self.acknowledged.get_mut() {
// if send fails, no one cared about getting that notification, it's fine to
// drop item
let _ = self.return_channel.send(item);
Expand All @@ -76,7 +76,7 @@ impl<T> Tracker<T> {
let (sender, receiver) = channel();
Tracker {
inner_inventory: Inventory::new(),
unacknoledged_drop_receiver: Arc::new(Mutex::new(receiver)),
unacknowledged_drop_receiver: Arc::new(Mutex::new(receiver)),
return_channel: sender,
}
}
Expand All @@ -89,7 +89,8 @@ impl<T> Tracker<T> {
/// Once this return true, it will stay that way until [Tracker::track] or [Tracker::clone] are
/// called.
pub fn safe_to_recreate(&self) -> bool {
Arc::strong_count(&self.unacknoledged_drop_receiver) == 1 && self.inner_inventory.len() == 0
Arc::strong_count(&self.unacknowledged_drop_receiver) == 1
&& self.inner_inventory.len() == 0
}

pub fn list_ongoing(&self) -> Vec<TrackedObject<T>> {
Expand All @@ -98,17 +99,17 @@ impl<T> Tracker<T> {

pub fn take_dead(&self) -> Vec<T> {
let mut res = Vec::new();
let receiver = self.unacknoledged_drop_receiver.lock().unwrap();
let receiver = self.unacknowledged_drop_receiver.lock().unwrap();
while let Ok(dead_entry) = receiver.try_recv() {
res.push(dead_entry);
}
res
}

pub fn track(&self, value: T) -> TrackedObject<T> {
self.inner_inventory.track(RecordUnacknoledgedDrop {
self.inner_inventory.track(RecordUnacknowledgedDrop {
inner: MaybeUninit::new(value),
acknoledged: false.into(),
acknowledged: false.into(),
return_channel: self.return_channel.clone(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ pub mod tests {
}
let merged_split = fake_merge(merge_policy, merge_op.splits_as_slice());
split_index.insert(merged_split.split_id().to_string(), merged_split.clone());
merge_op.acknoledge();
merge_op.acknowledge();
merged_split
}

Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::extract_time_range;
use quickwit_common::tracker::RecordUnacknoledgedDrop;
use quickwit_common::tracker::RecordUnacknowledgedDrop;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader};
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct DeleteTaskPlanner {
/// a merge operation is dropped after the publish of the split that underwent
/// the delete operation.
/// The inventory is used to avoid sending twice the same delete operation.
ongoing_delete_operations_inventory: Inventory<RecordUnacknoledgedDrop<MergeOperation>>,
ongoing_delete_operations_inventory: Inventory<RecordUnacknowledgedDrop<MergeOperation>>,
}

#[async_trait]
Expand Down Expand Up @@ -198,7 +198,7 @@ impl DeleteTaskPlanner {
info!(delete_operation=?delete_operation, "planned delete operation");
let tracked_delete_operation = self
.ongoing_delete_operations_inventory
.track(RecordUnacknoledgedDrop::untracked(delete_operation));
.track(RecordUnacknowledgedDrop::untracked(delete_operation));
schedule_merge(
&self.merge_scheduler_service,
tracked_delete_operation,
Expand Down