Skip to content

Commit cedb577

Browse files
authored
Merge pull request helium#260 from helium/madninja/track_gateway_sc_follower
Track gateway changes in router client sc_follower
2 parents a9ab107 + 73c8b66 commit cedb577

File tree

2 files changed

+53
-23
lines changed

2 files changed

+53
-23
lines changed

src/router/client.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ impl RouterClient {
148148
Some(Message::GatewayChanged(gateway)) => {
149149
info!(logger, "gateway changed");
150150
self.gateway = gateway;
151+
match self.state_channel_follower.set_gateway(self.gateway.as_mut()).await {
152+
Ok(()) => (),
153+
Err(err) => {
154+
warn!(logger, "ignoring gateway service setup error: {err:?}");
155+
let _ = self.state_channel_follower.set_gateway(None).await;
156+
}
157+
}
151158
},
152159
Some(Message::Region(region)) => {
153160
self.region = region;
@@ -166,15 +173,14 @@ impl RouterClient {
166173
.await
167174
},
168175
Some(Err(err)) => {
169-
warn!(logger, "gateway service error, shutting down: {:?}", err);
170-
return Ok(())
176+
warn!(logger, "ignoring gateway service error: {:?}", err);
171177
}
172178
// The follower service has closd or errored out. Give up
173179
// since the dispatcher will notice the disconnect/error and
174180
// reconnect a potentially different gateway
175181
None => {
176-
warn!(logger, "gateway service disconnected, shutting down");
177-
return Ok(())
182+
warn!(logger, "gateway service disconnected");
183+
let _ = self.state_channel_follower.set_gateway(None).await;
178184
},
179185
},
180186
sc_message = self.state_channel.message() => match sc_message {

src/service/gateway.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -89,38 +89,62 @@ impl Response for GatewayRespV1 {
8989

9090
#[derive(Debug)]
9191
pub struct StateChannelFollowService {
92-
tx: mpsc::Sender<GatewayScFollowReqV1>,
93-
rx: Streaming,
92+
tx: Option<mpsc::Sender<GatewayScFollowReqV1>>,
93+
rx: Option<Streaming>,
9494
}
9595

9696
impl StateChannelFollowService {
97-
pub async fn new(mut client: GatewayClient, verifier: Arc<PublicKey>) -> Result<Self> {
98-
let (tx, client_rx) = mpsc::channel(3);
99-
let streaming = client
100-
.follow_sc(ReceiverStream::new(client_rx))
101-
.await?
102-
.into_inner();
103-
let rx = Streaming {
104-
streaming,
105-
verifier,
106-
};
107-
Ok(Self { tx, rx })
97+
pub async fn new(gateway: &mut GatewayService) -> Result<Self> {
98+
let mut result = Self { tx: None, rx: None };
99+
result.set_gateway(Some(gateway)).await?;
100+
Ok(result)
108101
}
109102

110103
pub async fn send(&mut self, id: &[u8], owner: &[u8]) -> Result {
111-
let msg = GatewayScFollowReqV1 {
112-
sc_id: id.into(),
113-
sc_owner: owner.into(),
104+
match self.tx.as_mut() {
105+
Some(tx) => {
106+
let msg = GatewayScFollowReqV1 {
107+
sc_id: id.into(),
108+
sc_owner: owner.into(),
109+
};
110+
Ok(tx.send(msg).await?)
111+
}
112+
None => Err(Error::no_service()),
113+
}
114+
}
115+
116+
pub async fn set_gateway(&mut self, gateway: Option<&mut GatewayService>) -> Result {
117+
let (tx, rx) = match gateway {
118+
Some(gateway) => {
119+
let (tx, client_rx) = mpsc::channel(3);
120+
let streaming = gateway
121+
.client
122+
.follow_sc(ReceiverStream::new(client_rx))
123+
.await?
124+
.into_inner();
125+
let rx = Streaming {
126+
streaming,
127+
verifier: gateway.uri.pubkey.clone(),
128+
};
129+
(Some(tx), Some(rx))
130+
}
131+
None => (None, None),
114132
};
115-
Ok(self.tx.send(msg).await?)
133+
self.tx = tx;
134+
self.rx = rx;
135+
Ok(())
116136
}
117137
}
118138

119139
impl Stream for StateChannelFollowService {
120140
type Item = Result<GatewayRespV1>;
121141

122142
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123-
Pin::new(&mut self.rx).poll_next(cx)
143+
if let Some(rx) = self.rx.as_mut() {
144+
Pin::new(rx).poll_next(cx)
145+
} else {
146+
Poll::Pending
147+
}
124148
}
125149
}
126150

@@ -222,7 +246,7 @@ impl GatewayService {
222246
}
223247

224248
pub async fn follow_sc(&mut self) -> Result<StateChannelFollowService> {
225-
StateChannelFollowService::new(self.client.clone(), self.uri.pubkey.clone()).await
249+
StateChannelFollowService::new(self).await
226250
}
227251

228252
pub async fn close_sc(&mut self, close_txn: BlockchainTxnStateChannelCloseV1) -> Result {

0 commit comments

Comments
 (0)