Skip to content

Commit 276a241

Browse files
fix rewrite bug
Signed-off-by: Little-Wallace <[email protected]>
1 parent 78337dd commit 276a241

File tree

5 files changed

+66
-47
lines changed

5 files changed

+66
-47
lines changed

src/engine.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::log_batch::{
2222
};
2323
use crate::memtable::{EntryIndex, MemTable};
2424
use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
25-
use crate::purge::PurgeManager;
25+
use crate::purge::{PurgeManager, RemovedMemtables};
2626
use crate::util::{HandyRwLock, HashMap, Worker};
2727
use crate::wal::{LogMsg, WalRunner, WriteTask};
2828
use crate::{codec, CacheStats, GlobalStats, Result};
@@ -136,6 +136,7 @@ where
136136

137137
fn apply_item<E, W>(
138138
memtables: &MemTableAccessor<E, W>,
139+
removed_memtables: &RemovedMemtables,
139140
item: LogItem<E>,
140141
queue: LogQueue,
141142
file_num: u64,
@@ -155,9 +156,8 @@ fn apply_item<E, W>(
155156
}
156157
}
157158
LogItemContent::Command(Command::Clean) => {
158-
if self.memtables.remove(item.raft_group_id).is_some() {
159-
self.purge_manager
160-
.remove_memtable(file_num, item.raft_group_id);
159+
if memtables.remove(item.raft_group_id).is_some() {
160+
removed_memtables.remove_memtable(file_num, item.raft_group_id);
161161
}
162162
}
163163
LogItemContent::Command(Command::Compact { index }) => {
@@ -183,8 +183,9 @@ where
183183
P: GenericPipeLog,
184184
{
185185
fn apply_to_memtable(&self, log_batch: &mut LogBatch<E, W>, queue: LogQueue, file_num: u64) {
186+
let removed = self.purge_manager.get_removed_memtables();
186187
for item in log_batch.items.drain(..) {
187-
apply_item(&self.memtables, item, queue, file_num);
188+
apply_item(&self.memtables, &removed, item, queue, file_num);
188189
}
189190
}
190191

@@ -208,14 +209,21 @@ where
208209
}
209210
let memtables = self.memtables.clone();
210211
let items = std::mem::replace(&mut log_batch.items, vec![]);
212+
let removed_memtables = self.purge_manager.get_removed_memtables();
211213
return Box::pin(async move {
212214
let (file_num, offset, tracker) = r.await?;
213215
if file_num > 0 {
214216
for mut item in items {
215217
if let LogItemContent::Entries(entries) = &mut item.content {
216218
entries.update_position(LogQueue::Append, file_num, offset, &tracker);
217219
}
218-
apply_item(&memtables, item, LogQueue::Append, file_num);
220+
apply_item(
221+
&memtables,
222+
&removed_memtables,
223+
item,
224+
LogQueue::Append,
225+
file_num,
226+
);
219227
}
220228
}
221229
return Ok(bytes);
@@ -282,7 +290,12 @@ where
282290
);
283291

284292
let cfg = Arc::new(cfg);
285-
let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone());
293+
let purge_manager = PurgeManager::new(
294+
cfg.clone(),
295+
memtables.clone(),
296+
pipe_log.clone(),
297+
global_stats.clone(),
298+
);
286299
let (wal_sender, wal_receiver) = channel();
287300
let engine = Engine {
288301
cfg,

src/log_batch.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -591,12 +591,7 @@ where
591591
item.encode_to::<W>(&mut vec).unwrap();
592592
}
593593

594-
let compression_type = if vec.len() > COMPRESSION_SIZE {
595-
vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4);
596-
CompressionType::Lz4
597-
} else {
598-
CompressionType::None
599-
};
594+
let compression_type = CompressionType::None;
600595

601596
let checksum = crc32(&vec[8..]);
602597
vec.encode_u32_le(checksum).unwrap();
@@ -697,7 +692,7 @@ mod tests {
697692
),
698693
];
699694

700-
for item in items {
695+
for mut item in items {
701696
let mut encoded = vec![];
702697
item.encode_to::<Entry>(&mut encoded).unwrap();
703698
let mut s = encoded.as_slice();
@@ -728,9 +723,7 @@ mod tests {
728723
assert_eq!(batch, decoded_batch);
729724

730725
match &decoded_batch.items[0].content {
731-
LogItemContent::Entries(entries) => {
732-
assert_eq!(entries.encoded_size.get(), encoded_size)
733-
}
726+
LogItemContent::Entries(entries) => assert_eq!(entries.encoded_size, encoded_size),
734727
_ => unreachable!(),
735728
}
736729
}

src/memtable.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
303303
self.entries_index[i + distance].base_offset = ei.base_offset;
304304
self.entries_index[i + distance].compression_type = ei.compression_type;
305305
self.entries_index[i + distance].batch_len = ei.batch_len;
306+
self.entries_index[i + distance].offset = ei.offset;
307+
self.entries_index[i + distance].len = ei.len;
306308
}
307309
compacted_rewrite_operations
308310
}

src/pipe_log.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ mod tests {
760760
fn write_to_log(
761761
log: &mut PipeLog,
762762
submitor: &mut CacheSubmitor,
763-
batch: &LogBatch<Entry, Entry>,
763+
batch: &mut LogBatch<Entry, Entry>,
764764
file_num: &mut u64,
765765
) {
766766
let mut entries_size = 0;
@@ -770,8 +770,8 @@ mod tests {
770770
let offset = log.append(LogQueue::Append, &content).unwrap();
771771
let tracker = submitor.get_cache_tracker(cur_file_num, offset);
772772
submitor.fill_chunk(entries_size);
773-
for item in &batch.items {
774-
if let LogItemContent::Entries(ref entries) = item.content {
773+
for item in &mut batch.items {
774+
if let LogItemContent::Entries(entries) = &mut item.content {
775775
entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker);
776776
}
777777
}
@@ -809,9 +809,9 @@ mod tests {
809809
// After 4 batches are written into pipe log, no `CacheTask::NewChunk`
810810
// task should be triggered. However the last batch will trigger it.
811811
for i in 0..5 {
812-
let log_batch = get_1m_batch();
812+
let mut log_batch = get_1m_batch();
813813
let mut file_num = 0;
814-
write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num);
814+
write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
815815
log_batches.push(log_batch);
816816
let x = receiver.recv_timeout(Duration::from_millis(100));
817817
if i < 4 {
@@ -824,9 +824,9 @@ mod tests {
824824
// Write more 2 batches into pipe log. A `CacheTask::NewChunk` will be
825825
// emit on the second batch because log file is switched.
826826
for i in 5..7 {
827-
let log_batch = get_1m_batch();
827+
let mut log_batch = get_1m_batch();
828828
let mut file_num = 0;
829-
write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num);
829+
write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
830830
log_batches.push(log_batch);
831831
let x = receiver.recv_timeout(Duration::from_millis(100));
832832
if i < 6 {
@@ -840,9 +840,9 @@ mod tests {
840840
// `CacheTracker`s accociated in `EntryIndex`s are droped.
841841
drop(log_batches);
842842
for _ in 7..20 {
843-
let log_batch = get_1m_batch();
843+
let mut log_batch = get_1m_batch();
844844
let mut file_num = 0;
845-
write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num);
845+
write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
846846
drop(log_batch);
847847
assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err());
848848
}

src/purge.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ use crate::pipe_log::{GenericPipeLog, LogQueue};
1212
use crate::util::HandyRwLock;
1313
use crate::{GlobalStats, Result};
1414

15+
#[derive(Clone, Default)]
16+
pub struct RemovedMemtables(Arc<Mutex<BinaryHeap<(Reverse<u64>, u64)>>>);
17+
18+
impl RemovedMemtables {
19+
pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) {
20+
let mut tables = self.0.lock().unwrap();
21+
tables.push((Reverse(file_num), raft_group_id));
22+
}
23+
}
24+
1525
// If a region has some very old raft logs less than this threshold,
1626
// rewrite them to clean stale log files ASAP.
1727
const REWRITE_ENTRY_COUNT_THRESHOLD: usize = 32;
@@ -31,8 +41,7 @@ where
3141
global_stats: Arc<GlobalStats>,
3242

3343
// Vector of (file_num, raft_group_id).
34-
#[allow(clippy::type_complexity)]
35-
removed_memtables: Arc<Mutex<BinaryHeap<(Reverse<u64>, u64)>>>,
44+
removed_memtables: RemovedMemtables,
3645

3746
// Only one thread can run `purge_expired_files` at a time.
3847
purge_mutex: Arc<Mutex<()>>,
@@ -115,9 +124,8 @@ where
115124
}
116125
}
117126

118-
pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) {
119-
let mut tables = self.removed_memtables.lock().unwrap();
120-
tables.push((Reverse(file_num), raft_group_id));
127+
pub fn get_removed_memtables(&self) -> RemovedMemtables {
128+
self.removed_memtables.clone()
121129
}
122130

123131
// Returns (`latest_needs_rewrite`, `latest_needs_force_compact`).
@@ -159,13 +167,16 @@ where
159167
assert!(latest_compact <= latest_rewrite);
160168
let mut log_batch = LogBatch::<E, W>::new();
161169

162-
while let Some(item) = self.removed_memtables.lock().unwrap().pop() {
163-
let (file_num, raft_id) = ((item.0).0, item.1);
164-
if file_num > latest_rewrite {
165-
self.removed_memtables.lock().unwrap().push(item);
166-
break;
170+
{
171+
let mut guard = self.removed_memtables.0.lock().unwrap();
172+
while let Some(item) = guard.pop() {
173+
let (file_num, raft_id) = ((item.0).0, item.1);
174+
if file_num > latest_rewrite {
175+
guard.push(item);
176+
break;
177+
}
178+
log_batch.clean_region(raft_id);
167179
}
168-
log_batch.clean_region(raft_id);
169180
}
170181

171182
let memtables = self.memtables.collect(|t| {
@@ -216,7 +227,6 @@ where
216227
Ok(())
217228
}
218229

219-
220230
fn rewrite_to_memtable(
221231
&self,
222232
log_batch: &mut LogBatch<E, W>,
@@ -227,7 +237,7 @@ where
227237
let memtable = self.memtables.get_or_insert(item.raft_group_id);
228238
match item.content {
229239
LogItemContent::Entries(entries_to_add) => {
230-
let entries_index = entries_to_add.entries_index.into_inner();
240+
let entries_index = entries_to_add.entries_index;
231241
memtable.wl().rewrite(entries_index, latest_rewrite);
232242
}
233243
LogItemContent::Kv(kv) => match kv.op_type {
@@ -294,16 +304,17 @@ mod tests {
294304
cfg.dir = dir.path().to_str().unwrap().to_owned();
295305
let engine = RaftLogEngine::new(cfg.clone());
296306

297-
engine.purge_manager().remove_memtable(3, 10);
298-
engine.purge_manager().remove_memtable(3, 9);
299-
engine.purge_manager().remove_memtable(3, 11);
300-
engine.purge_manager().remove_memtable(2, 9);
301-
engine.purge_manager().remove_memtable(4, 4);
302-
engine.purge_manager().remove_memtable(4, 3);
307+
let tables = engine.purge_manager().get_removed_memtables();
308+
tables.remove_memtable(3, 10);
309+
tables.remove_memtable(3, 9);
310+
tables.remove_memtable(3, 11);
311+
tables.remove_memtable(2, 9);
312+
tables.remove_memtable(4, 4);
313+
tables.remove_memtable(4, 3);
303314

304-
let mut tables = engine.purge_manager().removed_memtables.lock().unwrap();
315+
let mut guard = tables.0.lock().unwrap();
305316
for (file_num, raft_id) in vec![(2, 9), (3, 11), (3, 10), (3, 9), (4, 4), (4, 3)] {
306-
let item = tables.pop().unwrap();
317+
let item = guard.pop().unwrap();
307318
assert_eq!((item.0).0, file_num);
308319
assert_eq!(item.1, raft_id);
309320
}

0 commit comments

Comments
 (0)