Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
daff18f
Optimize the code start progress when enable-log-recycle == true.
LykxSassinator Nov 1, 2022
b37787b
Remove redundant implementation in pipe_log.rs
LykxSassinator Nov 1, 2022
bc967d4
Add extra abnormal cases for higher testing coverage.
LykxSassinator Nov 8, 2022
9f335fc
Refactor the implementation of preparing extra files for recycling wh…
LykxSassinator Nov 16, 2022
0fdda61
Merge branch 'master' into opt_recycle_init
LykxSassinator Nov 18, 2022
fe870ff
Refactor the management of files by spliting the original FileCollect…
LykxSassinator Nov 22, 2022
c52ac37
Supply extra annoatations.
LykxSassinator Nov 22, 2022
be393c3
Polish the codes according to comments.
LykxSassinator Nov 25, 2022
23327e3
Add basics for multi-directory.
LykxSassinator Nov 27, 2022
92b77e6
Refactor the whole structure of FileCollection.
LykxSassinator Nov 30, 2022
0390744
Polish the codes.
LykxSassinator Nov 30, 2022
6e2f8a9
Polish codes in `build_stale_file`.
LykxSassinator Nov 30, 2022
29c1f03
Polish codes according to `clippy` hints.
LykxSassinator Nov 30, 2022
179f89d
Polish codes according to comments.
LykxSassinator Dec 1, 2022
73c6de6
Polish codes.
LykxSassinator Dec 1, 2022
f6cf945
Polish code-style problems.
LykxSassinator Dec 1, 2022
286772b
Bugfix the strategy for purging files.
LykxSassinator Dec 1, 2022
e7bb948
Polish code styles.
LykxSassinator Dec 2, 2022
5d4b784
Polish codes according to comments.
LykxSassinator Dec 2, 2022
7e77c8b
Polish codes.
LykxSassinator Dec 2, 2022
2bafdad
Merge branch 'master' into opt_recycle_init
LykxSassinator Dec 5, 2022
2e3e219
Polish the implementation of split_by
LykxSassinator Dec 5, 2022
c9fe818
Polish codes according to comments.
LykxSassinator Dec 5, 2022
94e9157
Refine the corner case `test_start_with_stale_file_allocate_error`.
LykxSassinator Dec 8, 2022
deeaf38
Refine the abnormal case.
LykxSassinator Dec 8, 2022
6b583ff
Polish codes according to comments.
LykxSassinator Dec 8, 2022
3709a7c
Bugfix for generating stale files.
LykxSassinator Dec 8, 2022
536cddc
Update rust version.
LykxSassinator Jan 5, 2023
2dfc259
Polish codes according to latest comments.
LykxSassinator Jan 5, 2023
f83a66c
clean up abstractions
tabokie Jan 19, 2023
802b19d
Bugfix and polish codes according to the refactoring on FileMgr.
LykxSassinator Jan 29, 2023
4a659f3
Polish the test cases.
LykxSassinator Jan 30, 2023
2411d1e
Merge branch 'master' into opt_recycle_init
LykxSassinator Jan 30, 2023
58c2394
Polish codes according to comments.
LykxSassinator Jan 30, 2023
a7c1e46
Remove self-defined FailGuard.
LykxSassinator Jan 30, 2023
ba5cb34
Polish codes.
LykxSassinator Jan 30, 2023
36f438f
Polish codes and remove redundant clearing strategy when purging.
LykxSassinator Feb 2, 2023
156a5d9
Fix format-lint errors.
LykxSassinator Feb 2, 2023
cd11db7
Polish codes and Changelogs.
LykxSassinator Feb 3, 2023
26c5220
Polish doc style in ChangeLog.
LykxSassinator Feb 3, 2023
f04dda8
Supplement more boundary test cases for `purge_to`.
LykxSassinator Feb 8, 2023
05006be
Polish codes for the startup of FileSeq.
LykxSassinator Feb 8, 2023
459c127
Fix format errors.
LykxSassinator Feb 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

* Fix data loss caused by aborted rewrite operation. Downgrading to an earlier version without the fix may produce phantom Raft Groups or keys, i.e. never written but appear in queries.

### New Features

* Support preparing prefilled logs to enable log recycling when start-up.

## [0.3.0] - 2022-09-14

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ clean:

## Format code in-place using rustfmt.
format:
cargo fmt --all
cargo ${TOOLCHAIN_ARGS} fmt --all

## Run clippy.
clippy:
Expand Down
24 changes: 24 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ pub struct Config {
///
/// Default: false
pub enable_log_recycle: bool,

/// Whether to prepare log files for recycling when start.
/// If `true`, batch empty log files will be prepared for recycling when
/// starting engine.
/// Only available for `enable-log-reycle` is true.
///
/// Default: false
pub prefill_for_recycle: bool,
}

impl Default for Config {
Expand All @@ -110,6 +118,7 @@ impl Default for Config {
purge_rewrite_garbage_ratio: 0.6,
memory_limit: None,
enable_log_recycle: false,
prefill_for_recycle: false,
};
// Test-specific configurations.
#[cfg(test)]
Expand Down Expand Up @@ -156,6 +165,11 @@ impl Config {
self.format_version
));
}
if !self.enable_log_recycle && self.prefill_for_recycle {
return Err(box_err!(
"prefill is not allowed when log recycle is disabled"
));
}
#[cfg(not(feature = "swap"))]
if self.memory_limit.is_some() {
warn!("memory-limit will be ignored because swap feature is disabled");
Expand Down Expand Up @@ -207,6 +221,7 @@ mod tests {
purge-threshold = "3MB"
format-version = 1
enable-log-recycle = false
prefill-for-recycle = false
"#;
let mut load: Config = toml::from_str(custom).unwrap();
assert_eq!(load.dir, "custom_dir");
Expand All @@ -233,6 +248,7 @@ mod tests {
target-file-size = "5000MB"
format-version = 2
enable-log-recycle = true
prefill-for-recycle = true
"#;
let soft_load: Config = toml::from_str(soft_error).unwrap();
let mut soft_sanitized = soft_load;
Expand All @@ -252,6 +268,14 @@ mod tests {
"#;
let mut cfg_load: Config = toml::from_str(recycle_error).unwrap();
assert!(cfg_load.sanitize().is_err());

let prefill_error = r#"
enable-log-recycle = false
prefill-for-recycle = true
format-version = 2
"#;
let mut cfg_load: Config = toml::from_str(prefill_error).unwrap();
assert!(cfg_load.sanitize().is_err());
}

#[test]
Expand Down
168 changes: 145 additions & 23 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ where
mod tests {
use super::*;
use crate::env::ObfuscatedFileSystem;
use crate::file_pipe_log::FileNameExt;
use crate::file_pipe_log::{parse_recycled_file_name, FileNameExt};
use crate::log_batch::AtomicGroupBuilder;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
Expand Down Expand Up @@ -1551,6 +1551,8 @@ mod tests {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(1),
format_version: Version::V1,
enable_log_recycle: false,
..Default::default()
};
// config with v2
Expand All @@ -1559,6 +1561,7 @@ mod tests {
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(1),
format_version: Version::V2,
enable_log_recycle: false,
..Default::default()
};
test_engine_ops(&cfg_v1, &cfg_v2);
Expand All @@ -1574,6 +1577,8 @@ mod tests {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(1),
format_version: Version::V1,
enable_log_recycle: false,
..Default::default()
};
// config with v2
Expand All @@ -1583,6 +1588,7 @@ mod tests {
purge_threshold: ReadableSize(1),
format_version: Version::V2,
enable_log_recycle: true,
prefill_for_recycle: true,
..Default::default()
};
test_engine_ops(&cfg_v1, &cfg_v2);
Expand Down Expand Up @@ -1948,26 +1954,38 @@ mod tests {
pub struct DeleteMonitoredFileSystem {
inner: ObfuscatedFileSystem,
append_metadata: Mutex<BTreeSet<u64>>,
recycled_metadata: Mutex<BTreeSet<u64>>,
}

impl DeleteMonitoredFileSystem {
fn new() -> Self {
Self {
inner: ObfuscatedFileSystem::default(),
append_metadata: Mutex::new(BTreeSet::new()),
recycled_metadata: Mutex::new(BTreeSet::new()),
}
}

fn update_metadata(&self, path: &Path, delete: bool) -> bool {
let id = FileId::parse_file_name(path.file_name().unwrap().to_str().unwrap()).unwrap();
if id.queue == LogQueue::Append {
if delete {
self.append_metadata.lock().unwrap().remove(&id.seq)
} else {
self.append_metadata.lock().unwrap().insert(id.seq)
let path = path.file_name().unwrap().to_str().unwrap();
let parse_append = FileId::parse_file_name(path);
let parse_recycled = parse_recycled_file_name(path);
match (parse_append, parse_recycled) {
(Some(id), None) if id.queue == LogQueue::Append => {
if delete {
self.append_metadata.lock().unwrap().remove(&id.seq)
} else {
self.append_metadata.lock().unwrap().insert(id.seq)
}
}
} else {
false
(None, Some(seq)) => {
if delete {
self.recycled_metadata.lock().unwrap().remove(&seq)
} else {
self.recycled_metadata.lock().unwrap().insert(seq)
}
}
_ => false,
}
}
}
Expand Down Expand Up @@ -2019,12 +2037,15 @@ mod tests {
if self.inner.exists_metadata(&path) {
return true;
}
let id = FileId::parse_file_name(path.as_ref().file_name().unwrap().to_str().unwrap())
.unwrap();
if id.queue == LogQueue::Append {
self.append_metadata.lock().unwrap().contains(&id.seq)
} else {
false
let path = path.as_ref().file_name().unwrap().to_str().unwrap();
let parse_append = FileId::parse_file_name(path);
let parse_recycled = parse_recycled_file_name(path);
match (parse_append, parse_recycled) {
(Some(id), None) if id.queue == LogQueue::Append => {
self.append_metadata.lock().unwrap().contains(&id.seq)
}
(None, Some(seq)) => self.recycled_metadata.lock().unwrap().contains(&seq),
_ => false,
}
}

Expand Down Expand Up @@ -2080,7 +2101,7 @@ mod tests {
&start
);

// Simulate stale metadata.
// Simulate recycled metadata.
for i in start / 2..start {
fs.append_metadata.lock().unwrap().insert(i);
}
Expand All @@ -2105,10 +2126,12 @@ mod tests {
purge_threshold: ReadableSize(100),
format_version: Version::V2,
enable_log_recycle: true,
prefill_for_recycle: true,
..Default::default()
};
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
let recycled_start = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
for rid in 1..=10 {
engine.append(rid, 1, 11, Some(&entry_data));
}
Expand All @@ -2123,23 +2146,31 @@ mod tests {
.must_rewrite_append_queue(Some(end - 1), None);
assert!(start < engine.file_span(LogQueue::Append).0);
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
// no file have been physically deleted.
// Recycled files have been reused.
assert_eq!(
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
&start
&(start + 20)
);
// reusing these files.
let recycled_start_1 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
assert!(recycled_start < recycled_start_1);
// Reuse these files.
for rid in 1..=5 {
engine.append(rid, 1, 11, Some(&entry_data));
}
let start_1 = *fs.append_metadata.lock().unwrap().iter().next().unwrap();
assert!(start < start_1);
assert!(start <= start_1);
let recycled_start_2 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
assert!(recycled_start_1 < recycled_start_2);

// reopen the engine and validate the stale files are removed
// Reopen the engine and validate the recycled files are reserved
let file_count = fs.inner.file_count();
let engine = engine.reopen();
assert_eq!(fs.inner.file_count(), engine.file_count(None));
assert_eq!(file_count, fs.inner.file_count());
assert!(file_count > engine.file_count(None));
let start_2 = *fs.append_metadata.lock().unwrap().iter().next().unwrap();
assert!(start_1 < start_2);
assert_eq!(start_1, start_2);
let recycled_start_3 = *fs.recycled_metadata.lock().unwrap().iter().next().unwrap();
assert_eq!(recycled_start_2, recycled_start_3);
}

#[test]
Expand Down Expand Up @@ -2250,6 +2281,97 @@ mod tests {
}
}

#[test]
fn test_start_engine_with_resize_recycle_capacity() {
let dir = tempfile::Builder::new()
.prefix("test_start_engine_with_resize_recycle_capacity")
.tempdir()
.unwrap();
let path = dir.path().to_str().unwrap();
let file_system = Arc::new(DeleteMonitoredFileSystem::new());
let entry_data = vec![b'x'; 512];

// Case 1: start an engine with no-recycle.
let cfg = Config {
dir: path.to_owned(),
enable_log_recycle: false,
..Default::default()
};
let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap();
let (start, _) = engine.file_span(LogQueue::Append);
// Only one valid file left, the last one => active_file.
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
// Append data.
for rid in 1..=5 {
engine.append(rid, 1, 10, Some(&entry_data));
}
assert_eq!(engine.file_span(LogQueue::Append).0, start);
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
drop(engine);

// Case 2: restart the engine with a common size of recycling capacity.
let cfg = Config {
dir: path.to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(80), // common size of capacity
enable_log_recycle: true,
prefill_for_recycle: true,
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
let (start, end) = engine.file_span(LogQueue::Append);
// Only one valid file left, the last one => active_file.
assert_eq!(start, end);
let recycled_count = file_system.inner.file_count() - engine.file_count(None);
assert!(recycled_count > 0);
// Append data. Several recycled files have been reused.
for rid in 1..=5 {
engine.append(rid, 10, 20, Some(&entry_data));
}
assert_eq!(engine.file_span(LogQueue::Append).0, start);
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
let (start, end) = engine.file_span(LogQueue::Append);
let recycled_count = file_system.inner.file_count() - engine.file_count(None);
drop(engine);

// Case 3: restart the engine with a smaller capacity. Redundant recycled files
// will be cleared.
let cfg_v2 = Config {
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(50),
..cfg
};
let engine =
RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap();
assert_eq!(engine.file_span(LogQueue::Append), (start, end));
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
// Recycled files have filled the LogQueue::Append, purge_expired_files won't
// truely remove files from it.
engine.purge_expired_files().unwrap();
assert_eq!(engine.file_span(LogQueue::Append), (start, end));
for rid in 1..=10 {
engine.append(rid, 20, 31, Some(&entry_data));
}
assert!(engine.file_span(LogQueue::Append).1 > end);
let engine = engine.reopen();
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
drop(engine);

// Case 4: restart the engine without log recycling. Recycled logs should be
// cleared.
let cfg_v3 = Config {
target_file_size: ReadableSize::kb(2),
purge_threshold: ReadableSize::kb(100),
enable_log_recycle: false,
prefill_for_recycle: false,
..cfg_v2
};
let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap();
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
}

#[test]
fn test_rewrite_atomic_group() {
let dir = tempfile::Builder::new()
Expand Down
1 change: 1 addition & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ impl FileSystem for DefaultFileSystem {
}

fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
fail_point!("default_fs::delete_skipped", |_| { Ok(()) });
std::fs::remove_file(path)
}

Expand Down
6 changes: 6 additions & 0 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub trait FileSystem: Send + Sync {
self.rename(src_path, dst_path)
}

#[inline]
fn reuse_and_open<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> Result<Self::Handle> {
self.reuse(src_path.as_ref(), dst_path.as_ref())?;
self.open(dst_path)
}

/// Deletes user implemented metadata associated with `path`. Returns
/// `true` if any metadata is deleted.
///
Expand Down
Loading