Skip to content

Commit 9dc2d9c

Browse files
authored
feat(migration): configurable concurrency (#267)
1 parent abe1856 commit 9dc2d9c

File tree

11 files changed

+172
-22
lines changed

11 files changed

+172
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
251007.0
1+
251008.0

crates/cluster/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ sharding = { path = "../sharding" }
1919
derive_more = { workspace = true, features = ["as_ref", "display", "from", "into", "try_from"] }
2020
derive-where = { workspace = true }
2121
thiserror = { workspace = true }
22+
strum = { workspace = true , features = ["derive"] }
2223

2324
# General purpose
2425
base64.workspace = true

crates/cluster/src/settings.rs

Lines changed: 159 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use {
44
crate::smart_contract,
55
serde::{Deserialize, Serialize},
66
std::{ops::RangeInclusive, time::Duration},
7+
strum::{EnumDiscriminants, FromRepr, IntoDiscriminant, IntoStaticStr},
78
time::OffsetDateTime,
89
};
910

@@ -41,6 +42,21 @@ pub struct Settings {
4142
/// success rate of storage operations during a data migration process, due
4243
/// to `KeyspaceVersionMismatch` errors.
4344
pub clock_skew: Duration,
45+
46+
/// Specifies how many shards are allowed to be pulled by a
47+
/// [`NodeOperator`] at the same time during data migration.
48+
pub migration_concurrency: u16,
49+
}
50+
51+
impl Default for Settings {
52+
fn default() -> Self {
53+
Self {
54+
max_node_operator_data_bytes: 4096,
55+
event_propagation_latency: Duration::from_secs(5),
56+
clock_skew: Duration::from_millis(500),
57+
migration_concurrency: 10,
58+
}
59+
}
4460
}
4561

4662
impl Settings {
@@ -58,6 +74,22 @@ impl Settings {
5874
) -> RangeInclusive<OffsetDateTime> {
5975
time - self.clock_skew..=time + self.clock_skew
6076
}
77+
78+
fn extra(&self) -> [Setting; 3] {
79+
[
80+
Setting::EventPropagationLatency(self.event_propagation_latency),
81+
Setting::ClockSkew(self.clock_skew),
82+
Setting::MigrationConcurrency(self.migration_concurrency),
83+
]
84+
}
85+
86+
fn apply_extra_setting(&mut self, setting: Setting) {
87+
match setting {
88+
Setting::EventPropagationLatency(setting) => self.event_propagation_latency = setting,
89+
Setting::ClockSkew(setting) => self.clock_skew = setting,
90+
Setting::MigrationConcurrency(setting) => self.migration_concurrency = setting,
91+
};
92+
}
6193
}
6294

6395
impl TryFrom<Settings> for smart_contract::Settings {
@@ -95,13 +127,13 @@ impl TryFrom<smart_contract::Settings> for Settings {
95127
let schema_version = bytes[0];
96128
let bytes = &bytes[1..];
97129

98-
let extra = match schema_version {
99-
0 => postcard::from_bytes::<ExtraV0>(bytes),
130+
let mut settings = match schema_version {
131+
0 => postcard::from_bytes::<ExtraV0>(bytes).map(Settings::from),
132+
1 => postcard::from_bytes::<ExtraV1>(bytes).map(Settings::from),
100133
ver => return Err(TryFromSmartContractError::UnknownSchemaVersion(ver)),
101134
}
102135
.map_err(TryFromSmartContractError::from_postcard)?;
103136

104-
let mut settings = Settings::from(extra);
105137
settings.max_node_operator_data_bytes = sc_settings.max_node_operator_data_bytes;
106138

107139
Ok(settings)
@@ -142,7 +174,108 @@ impl From<ExtraV0> for Settings {
142174
extra.event_propagation_latency_ms.into(),
143175
),
144176
clock_skew: Duration::from_millis(extra.clock_skew_ms.into()),
177+
migration_concurrency: 10,
178+
}
179+
}
180+
}
181+
182+
// NOTE: The on-chain serialization is non self-describing!
183+
// This `struct` can not be changed, a `struct` with a new schema version should
184+
// be created instead.
185+
#[derive(Clone, Debug, Serialize, Deserialize)]
186+
pub(crate) struct ExtraV1 {
187+
// Encoded setting values, indexed by the numeric `SettingId` of each setting.
188+
encoded: Vec<Vec<u8>>,
189+
}
190+
191+
#[derive(EnumDiscriminants)]
192+
#[strum_discriminants(name(SettingId))]
193+
#[strum_discriminants(derive(IntoStaticStr, FromRepr))]
194+
#[repr(u8)]
195+
enum Setting {
196+
EventPropagationLatency(Duration) = 0,
197+
ClockSkew(Duration) = 1,
198+
MigrationConcurrency(u16) = 2,
199+
}
200+
201+
impl SettingId {
202+
fn from_idx(idx: usize) -> Option<Self> {
203+
Self::from_repr(u8::try_from(idx).ok()?)
204+
}
205+
}
206+
207+
impl Setting {
208+
fn encode(&self) -> Vec<u8> {
209+
let encode_duration = |value: &Duration| {
210+
u32::try_from(value.as_millis())
211+
.unwrap_or(u32::MAX)
212+
.to_be_bytes()
213+
.into()
214+
};
215+
216+
match self {
217+
Self::EventPropagationLatency(latency) => encode_duration(latency),
218+
Self::ClockSkew(skew) => encode_duration(skew),
219+
Self::MigrationConcurrency(concurrency) => concurrency.to_be_bytes().into(),
220+
}
221+
}
222+
223+
fn decode(id: SettingId, value: &[u8]) -> Option<Self> {
224+
let decode_duration = || {
225+
let millis = u32::from_be_bytes(value.try_into().ok()?);
226+
Some(Duration::from_millis(millis.into()))
227+
};
228+
229+
let decode_u16 = || Some(u16::from_be_bytes(value.try_into().ok()?));
230+
231+
Some(match id {
232+
SettingId::EventPropagationLatency => Self::EventPropagationLatency(decode_duration()?),
233+
SettingId::ClockSkew => Self::ClockSkew(decode_duration()?),
234+
SettingId::MigrationConcurrency => Self::MigrationConcurrency(decode_u16()?),
235+
})
236+
}
237+
}
238+
239+
impl From<Settings> for ExtraV1 {
    /// Encodes every extra setting into the slot given by its [`SettingId`].
    ///
    /// The resulting vector is long enough to hold the highest setting id;
    /// slots that no setting maps to are left as empty byte vectors.
    fn from(settings: Settings) -> Self {
        // Built once and reused for both sizing and filling the slots.
        let extra_settings = settings.extra();

        // `SettingId` is a field-less enum, so the cast yields its numeric id.
        let highest_idx = extra_settings
            .iter()
            .map(|setting| setting.discriminant() as usize)
            .max()
            .unwrap_or_default();

        let mut encoded = vec![Vec::new(); highest_idx + 1];

        for setting in &extra_settings {
            encoded[setting.discriminant() as usize] = setting.encode();
        }

        Self { encoded }
    }
}
258+
259+
impl From<ExtraV1> for Settings {
260+
fn from(extra: ExtraV1) -> Self {
261+
let mut settings = Settings::default();
262+
263+
for (idx, value) in extra.encoded.iter().enumerate() {
264+
let Some(id) = SettingId::from_idx(idx) else {
265+
tracing::warn!("Unknown SettingId({idx}), ignoring");
266+
continue;
267+
};
268+
269+
let Some(setting) = Setting::decode(id, value) else {
270+
let name: &'static str = id.into();
271+
tracing::warn!("Invalid value `{value:?}` for `{name}` setting, ignoring");
272+
continue;
273+
};
274+
275+
settings.apply_extra_setting(setting);
276+
}
277+
278+
settings
146279
}
147280
}
148281

@@ -177,3 +310,26 @@ impl TryFromSmartContractError {
177310
Self::Codec(format!("{err:?}"))
178311
}
179312
}
313+
314+
#[cfg(test)]
mod test {
    use super::*;

    /// Settings survive an encode/decode round trip through [`ExtraV1`].
    #[test]
    fn extra_v1() {
        let expected_settings = Settings {
            max_node_operator_data_bytes: 10000,
            event_propagation_latency: Duration::from_secs(42),
            clock_skew: Duration::from_secs(10),
            migration_concurrency: 1000,
        };

        let decoded: Settings = ExtraV1::from(expected_settings).into();

        // `max_node_operator_data_bytes` is not part of the extra payload; it
        // is being set outside of the `From` impl.
        let settings = Settings {
            max_node_operator_data_bytes: expected_settings.max_node_operator_data_bytes,
            ..decoded
        };

        assert_eq!(expected_settings, settings);
    }
}

crates/migration/src/manager/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ pub trait Config:
5050
&self,
5151
node: &'a Self::Node,
5252
) -> &'a Self::OutboundReplicaConnection;
53-
54-
/// Specifies how many shards migration [`Manager`] is allowed to transfer
55-
/// at the same time.
56-
fn concurrency(&self) -> usize;
5753
}
5854

5955
/// WCN Migration Manager.
@@ -249,7 +245,7 @@ where
249245
})
250246
.pipe(stream::iter)
251247
.for_each_concurrent(
252-
Some(self.manager.config.concurrency()),
248+
Some(cluster_view.settings().migration_concurrency.into()),
253249
|(shard_id, source)| {
254250
retry(move || {
255251
self.transfer_shard(shard_id, source, keyspace_version)

crates/migration/src/manager/test.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ impl super::Config for Config {
5757
) -> &'a Self::OutboundReplicaConnection {
5858
&node.storage
5959
}
60-
61-
fn concurrency(&self) -> usize {
62-
100
63-
}
6460
}
6561

6662
#[derive(AsRef, Clone)]
@@ -98,9 +94,10 @@ async fn transfers_data_and_writes_to_smart_contract() {
9894
max_node_operator_data_bytes: 1024,
9995
event_propagation_latency: Duration::from_secs(1),
10096
clock_skew: Duration::from_millis(100),
97+
migration_concurrency: 100,
10198
},
10299
(0..8)
103-
.map(|idx| cluster::testing::node_operator(idx as u8))
100+
.map(|idx: u8| cluster::testing::node_operator(idx))
104101
.collect(),
105102
)
106103
.await

crates/node/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,6 @@ impl wcn_migration::manager::Config for AppConfig {
284284
) -> &'a Self::OutboundReplicaConnection {
285285
&node.replica_low_prio_connection
286286
}
287-
288-
fn concurrency(&self) -> usize {
289-
100
290-
}
291287
}
292288

293289
impl wcn_metrics::aggregator::Config for AppConfig {

crates/replication/src/coordinator/test.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,11 @@ impl Context {
9090
max_node_operator_data_bytes: 1024,
9191
event_propagation_latency: Duration::from_secs(3),
9292
clock_skew: Duration::from_secs(1),
93+
migration_concurrency: 100,
9394
},
9495
(0..8)
95-
.map(|idx| {
96-
wcn_cluster::testing::node_operator(idx as u8).tap_mut(|operator| {
96+
.map(|idx: u8| {
97+
wcn_cluster::testing::node_operator(idx).tap_mut(|operator| {
9798
if idx == 0 {
9899
operator.clients = vec![Client {
99100
peer_id: client_peer_id,

crates/replication/src/replica/test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ impl Context {
7171
max_node_operator_data_bytes: 1024,
7272
event_propagation_latency: Duration::from_secs(4),
7373
clock_skew: Duration::from_secs(2),
74+
migration_concurrency: 100,
7475
},
7576
(0..8)
76-
.map(|idx| wcn_cluster::testing::node_operator(idx as u8))
77+
.map(|idx: u8| wcn_cluster::testing::node_operator(idx))
7778
.collect(),
7879
)
7980
.await

crates/testing/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl TestCluster {
9292
max_node_operator_data_bytes: 4096,
9393
event_propagation_latency: Duration::from_secs(1),
9494
clock_skew: Duration::from_millis(100),
95+
migration_concurrency: 100,
9596
};
9697

9798
tracing::info!(port = %anvil.port(), "Anvil launched");

0 commit comments

Comments
 (0)