Skip to content

Commit 743207f

Browse files
authored
feat(cluster): add private IP address to Node (#253)
1 parent f4ae9c5 commit 743207f

File tree

17 files changed

+439
-262
lines changed

17 files changed

+439
-262
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
250917.0
1+
250923.0

crates/cluster/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ tokio = { workspace = true, features = ["time", "rt"] }
4747
tokio-stream = { workspace = true }
4848

4949
# Domain specific
50-
libp2p-identity = { workspace = true, features = ["serde"] }
50+
libp2p-identity = { workspace = true, features = ["serde", "rand"] }
5151
xxhash-rust = { workspace = true }
5252
multihash = { version = "0.19", default-features = false }
5353
alloy = { workspace = true, features = [

crates/cluster/src/node.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ pub(crate) struct V0 {
6868
pub secondary_port: u16,
6969
}
7070

71+
// NOTE: The on-chain serialization is not self-describing!
72+
// This `struct` cannot be changed; a `struct` with a new schema version should
73+
// be created instead.
74+
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
75+
pub(crate) struct V1 {
76+
pub peer_id: PeerId,
77+
pub ipv4_addr: Ipv4Addr,
78+
pub private_ipv4_addr: Option<Ipv4Addr>,
79+
pub primary_port: u16,
80+
pub secondary_port: u16,
81+
}
82+
7183
impl From<Node> for V0 {
7284
fn from(node: Node) -> Self {
7385
Self {
@@ -90,6 +102,29 @@ impl From<V0> for Node {
90102
}
91103
}
92104

105+
impl From<Node> for V1 {
106+
fn from(node: Node) -> Self {
107+
Self {
108+
peer_id: node.peer_id,
109+
ipv4_addr: node.ipv4_addr,
110+
private_ipv4_addr: None,
111+
primary_port: node.primary_port,
112+
secondary_port: node.secondary_port,
113+
}
114+
}
115+
}
116+
117+
impl From<V1> for Node {
118+
fn from(node: V1) -> Self {
119+
Self {
120+
peer_id: node.peer_id,
121+
ipv4_addr: node.ipv4_addr,
122+
primary_port: node.primary_port,
123+
secondary_port: node.secondary_port,
124+
}
125+
}
126+
}
127+
93128
struct Ipv4AddrNumeral(FlexibleNumeralString);
94129

95130
impl From<Ipv4Addr> for Ipv4AddrNumeral {

crates/cluster/src/node_operator.rs

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
use {
44
crate::{self as cluster, client, node, smart_contract, Client, Config, EncryptionKey, Node},
55
derive_more::derive::AsRef,
6+
libp2p_identity::PeerId,
67
serde::{Deserialize, Serialize},
7-
std::sync::{
8-
atomic::{self, AtomicUsize},
9-
Arc,
8+
std::{
9+
net::Ipv4Addr,
10+
sync::{
11+
atomic::{self, AtomicUsize},
12+
Arc,
13+
},
1014
},
1115
tap::Tap,
1216
};
@@ -189,11 +193,22 @@ impl NodeOperator {
189193
}
190194
}
191195

192-
impl<C: Config> NodeOperator<C> {
193-
pub(super) fn try_from_sc(
196+
impl<N> NodeOperator<N> {
197+
pub(super) fn from_sc(
194198
operator: smart_contract::NodeOperator,
195-
cfg: &C,
196-
) -> Result<NodeOperator<C::Node>, TryFromSmartContractError> {
199+
cfg: &impl Config<Node = N>,
200+
) -> Self {
201+
let id = operator.id;
202+
203+
NodeOperator::try_from_sc(operator, cfg)
204+
.map_err(|err| tracing::warn!(%id, ?err, "Invalid NodeOperator"))
205+
.unwrap_or_else(|_| NodeOperator::new_corrupted(id, cfg))
206+
}
207+
208+
fn try_from_sc(
209+
operator: smart_contract::NodeOperator,
210+
cfg: &impl Config<Node = N>,
211+
) -> Result<Self, TryFromSmartContractError> {
197212
use TryFromSmartContractError as Error;
198213

199214
let data_bytes = operator.data;
@@ -205,22 +220,57 @@ impl<C: Config> NodeOperator<C> {
205220
let schema_version = data_bytes[0];
206221
let bytes = &data_bytes[1..];
207222

208-
let data = match schema_version {
209-
0 => postcard::from_bytes::<DataV0>(bytes),
210-
ver => return Err(Error::UnknownSchemaVersion(ver)),
223+
match schema_version {
224+
0 => {
225+
let data = postcard::from_bytes::<DataV0>(bytes).map_err(Error::from_postcard)?;
226+
227+
let nodes = data
228+
.nodes
229+
.into_iter()
230+
.map(|v0| Node::from(v0).tap_mut(|node| node.decrypt(cfg.as_ref())))
231+
.map(|node| cfg.new_node(operator.id, node))
232+
.collect();
233+
234+
let clients = data.clients.into_iter().map(Into::into).collect();
235+
236+
Ok(NodeOperator::new(operator.id, data.name, nodes, clients)?)
237+
}
238+
1 => {
239+
let data = postcard::from_bytes::<DataV1>(bytes).map_err(Error::from_postcard)?;
240+
241+
let nodes = data
242+
.nodes
243+
.into_iter()
244+
.map(|v1| Node::from(v1).tap_mut(|node| node.decrypt(cfg.as_ref())))
245+
.map(|node| cfg.new_node(operator.id, node))
246+
.collect();
247+
248+
let clients = data.clients.into_iter().map(Into::into).collect();
249+
250+
Ok(NodeOperator::new(operator.id, data.name, nodes, clients)?)
251+
}
252+
ver => Err(Error::UnknownSchemaVersion(ver)),
211253
}
212-
.map_err(Error::from_postcard)?;
254+
}
213255

214-
let nodes = data
215-
.nodes
216-
.into_iter()
217-
.map(|v0| Node::from(v0).tap_mut(|node| node.decrypt(cfg.as_ref())))
218-
.map(|node| cfg.new_node(operator.id, node))
256+
fn new_corrupted(id: Id, cfg: &impl Config<Node = N>) -> Self {
257+
let nodes = (0..MIN_NODES)
258+
.map(|_| Node {
259+
peer_id: PeerId::random(),
260+
ipv4_addr: Ipv4Addr::UNSPECIFIED,
261+
primary_port: 0,
262+
secondary_port: 0,
263+
})
264+
.map(|node| cfg.new_node(id, node))
219265
.collect();
220266

221-
let clients = data.clients.into_iter().map(Into::into).collect();
222-
223-
Ok(NodeOperator::new(operator.id, data.name, nodes, clients)?)
267+
Self {
268+
id,
269+
name: Name("CORRUPTED_DATA".to_string()),
270+
clients: vec![],
271+
nodes,
272+
counter: Default::default(),
273+
}
224274
}
225275
}
226276

@@ -234,6 +284,16 @@ struct DataV0 {
234284
clients: Vec<client::V0>,
235285
}
236286

287+
// NOTE: The on-chain serialization is not self-describing!
288+
// This `struct` cannot be changed; a `struct` with a new schema version should
289+
// be created instead.
290+
#[derive(Debug, Clone, Deserialize, Serialize)]
291+
struct DataV1 {
292+
name: Name,
293+
nodes: Vec<node::V1>,
294+
clients: Vec<client::V0>,
295+
}
296+
237297
#[derive(Debug, thiserror::Error)]
238298
pub enum TryIntoSmartContractError {
239299
#[error("Codec: {0}")]

crates/cluster/src/view.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use {
2020
Settings,
2121
},
2222
derive_where::derive_where,
23-
itertools::Itertools,
2423
std::sync::Arc,
2524
tap::Pipe as _,
2625
time::OffsetDateTime,
@@ -174,6 +173,7 @@ impl<C: Config> View<C> {
174173

175174
let time = migration.started_at
176175
+ self.settings.event_propagation_latency
176+
+ self.settings.clock_skew
177177
+ migration::PULL_DATA_LEEWAY_TIME;
178178

179179
Some(time)
@@ -232,16 +232,11 @@ impl<C: Config> View<C> {
232232
/// Checks whether a data pull with the specified [`keyspace::Version`] can
233233
/// be executed at this moment.
234234
pub fn validate_data_pull(&self, keyspace_version: keyspace::Version) -> bool {
235-
if !self.migration.is_in_progress() {
235+
let Some(scheduled_after) = self.data_pull_scheduled_after() else {
236236
return false;
237-
}
238-
239-
let allowed_after = self.migration.started_at
240-
+ self.settings.event_propagation_latency
241-
+ self.settings.clock_skew
242-
+ migration::PULL_DATA_LEEWAY_TIME;
237+
};
243238

244-
OffsetDateTime::now_utc() >= allowed_after && self.keyspace_version == keyspace_version
239+
OffsetDateTime::now_utc() >= scheduled_after && self.keyspace_version == keyspace_version
245240
}
246241

247242
pub(super) fn require_no_migration(&self) -> Result<(), migration::InProgressError> {
@@ -320,11 +315,7 @@ impl<C: Config> View<C> {
320315
let node_operators = view
321316
.node_operators
322317
.into_iter()
323-
.map(|slot| {
324-
slot.map(|operator| NodeOperator::try_from_sc(operator, cfg))
325-
.transpose()
326-
})
327-
.try_collect::<_, Vec<_>, _>()?
318+
.map(|slot| slot.map(|operator| NodeOperator::from_sc(operator, cfg)))
328319
.pipe(NodeOperators::from_slots)?;
329320

330321
let ownership = Ownership::new(view.owner);
@@ -470,10 +461,8 @@ impl smart_contract::event::NodeOperatorAdded {
470461
.require_not_exists(&self.operator.id)?
471462
.require_free_slot(self.idx)?;
472463

473-
view.node_operators.set(
474-
self.idx,
475-
Some(NodeOperator::try_from_sc(self.operator, cfg)?),
476-
);
464+
view.node_operators
465+
.set(self.idx, Some(NodeOperator::from_sc(self.operator, cfg)));
477466

478467
Ok(())
479468
}
@@ -483,7 +472,7 @@ impl smart_contract::event::NodeOperatorUpdated {
483472
pub(super) fn apply<C: Config>(self, cfg: &C, view: &mut View<C>) -> Result<()> {
484473
let idx = view.node_operators.require_idx(&self.operator.id)?;
485474
view.node_operators
486-
.set(idx, Some(NodeOperator::try_from_sc(self.operator, cfg)?));
475+
.set(idx, Some(NodeOperator::from_sc(self.operator, cfg)));
487476

488477
Ok(())
489478
}

crates/migration/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ backoff = { workspace = true }
1919
tap = { workspace = true }
2020
time = { workspace = true }
2121

22+
# Collections
23+
smallvec = { workspace = true }
24+
2225
# Async
2326
futures = { workspace = true }
2427
futures-concurrency = { workspace = true }

crates/migration/src/manager/mod.rs

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use {
33
backoff::ExponentialBackoffBuilder,
44
futures::{stream, FutureExt as _, StreamExt, TryFutureExt},
55
futures_concurrency::future::Race,
6+
smallvec::SmallVec,
67
std::{
78
fmt,
89
future::{self, Future},
@@ -14,7 +15,7 @@ use {
1415
time::OffsetDateTime,
1516
wcn_cluster::{
1617
self as cluster,
17-
keyspace,
18+
keyspace::{self, ReplicaSet},
1819
migration,
1920
node_operator,
2021
smart_contract::{self, Write},
@@ -219,41 +220,43 @@ where
219220
.secondary_keyspace_shards()
220221
.ok_or_else(|| anyhow!("Missing secondary keyspace shards"))?;
221222

222-
let replica_idx = |shard: &keyspace::Shard<&NodeOperator<_>>| {
223-
shard
224-
.replica_set()
225-
.iter()
226-
.position(|op| op.id == self.manager.node_operator_id)
227-
};
223+
// This is a separate closure because otherwise rustfmt breaks and refuses to format
224+
// this function.
225+
let log_err = |err, shard_id, source_id| tracing::warn!(?err, %shard_id, %source_id, "Failed to transfer shard");
228226

229227
primary_shards
230228
.zip(secondary_shards)
231229
.filter_map(|((shard_id, primary), (_, secondary))| {
232-
// If operator is not in the new replica set skip this shard.
233-
let secondary_idx = replica_idx(&secondary)?;
230+
let old = primary.replica_set();
231+
let new = secondary.replica_set();
232+
233+
let added_operators = replica_set_difference(new, old);
234+
235+
let idx = added_operators
236+
.iter()
237+
.position(|op| op.id == self.manager.node_operator_id)?;
234238

235-
// Also skip if operator is in the old replica set. Meaning that it's in both,
236-
// so we don't need to transfer any data.
237-
let None = replica_idx(&primary) else {
238-
return None;
239-
};
239+
// Shards this operator is not newly added to were skipped by the `?` above; collect the removed operators.
240+
let removed_operators = replica_set_difference(old, new);
240241

241-
// Pull data only from the operator previously occupying the same replica index
242-
// to guarantee consistency.
242+
// Pull data only from the operator being replaced to ensure consistency.
243243
// TODO: We should have a way to override this behaviour for special
244244
// circumstances, for example if a node operator is completely dead and we are
245245
// forcefully removing it.
246-
let source = primary.replica_set()[secondary_idx];
246+
let source = removed_operators[idx];
247247

248248
Some((shard_id, source))
249249
})
250250
.pipe(stream::iter)
251-
.for_each_concurrent(Some(self.manager.config.concurrency()), |(shard_id, source)| {
252-
retry(move || {
253-
self.transfer_shard(shard_id, source, keyspace_version)
254-
.map_err(move |err| tracing::warn!(?err, %shard_id, source = %source.id, "Failed to transfer shard"))
255-
})
256-
})
251+
.for_each_concurrent(
252+
Some(self.manager.config.concurrency()),
253+
|(shard_id, source)| {
254+
retry(move || {
255+
self.transfer_shard(shard_id, source, keyspace_version)
256+
.map_err(move |err| log_err(err, shard_id, source.id))
257+
})
258+
},
259+
)
257260
.await;
258261

259262
Ok(State::CompletingMigration(migration_id))
@@ -406,3 +409,19 @@ where
406409
// emitted.
407410
backoff::future::retry(backoff, f).await.unwrap()
408411
}
412+
413+
// Returns `NodeOperator`s which are in `a`, but not in `b`.
414+
fn replica_set_difference<'a, T>(
415+
a: &ReplicaSet<&'a NodeOperator<T>>,
416+
b: &ReplicaSet<&NodeOperator<T>>,
417+
) -> SmallVec<[&'a NodeOperator<T>; 5]> {
418+
let mut diff = SmallVec::new();
419+
420+
for operator in a {
421+
if !b.iter().any(|op| op.id == operator.id) {
422+
diff.push(*operator);
423+
}
424+
}
425+
426+
diff
427+
}

0 commit comments

Comments
 (0)