Skip to content

Commit e93aa0d

Browse files
committed
docs: explanation for eventsub websocket connection handling
1 parent 0117394 commit e93aa0d

File tree

1 file changed

+50
-78
lines changed

1 file changed

+50
-78
lines changed

examples/eventsub_websocket/src/websocket.rs

Lines changed: 50 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use reqwest::Client;
99
use tokio::{
1010
sync::{
1111
mpsc::{self, UnboundedSender},
12-
Mutex, MutexGuard,
12+
Mutex,
1313
},
1414
task::{JoinError, JoinHandle},
1515
time::{Duration, Instant},
@@ -25,17 +25,12 @@ use twitch_api::{
2525
event::websocket::{EventsubWebsocketData, ReconnectPayload, WelcomePayload},
2626
Event, EventSubscription, Message, SessionData, Transport,
2727
},
28-
helix::{eventsub::CreateEventSubSubscription, ClientRequestError, HelixRequestPostError},
28+
helix::eventsub::CreateEventSubSubscription,
2929
twitch_oauth2::{TwitchToken, UserToken},
3030
types, HelixClient,
3131
};
3232

3333
/// Connect to the websocket and return the stream
34-
///
35-
/// eventsub websocket doesn't support outgoing messages except pongs (which are implicitly handled by tungstenite)
36-
/// so we return only the receiving end of the socket
37-
///
38-
/// [Getting Events Using WebSockets](https://dev.twitch.tv/docs/eventsub/handling-websocket-events/)
3934
async fn connect(
4035
request: impl IntoClientRequest + Unpin,
4136
) -> Result<SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>, eyre::Error> {
@@ -60,18 +55,17 @@ async fn refresh_if_expired(
6055
token: Arc<Mutex<UserToken>>,
6156
helix_client: &HelixClient<'_, Client>,
6257
_opts: &crate::Opts,
63-
) -> eyre::Result<()> {
64-
let lock: MutexGuard<'_, UserToken> = token.lock().await;
58+
) {
59+
let lock = token.lock().await;
6560

6661
if lock.expires_in() >= Duration::from_secs(60) {
67-
return Ok(());
62+
return;
6863
}
6964
let _client = helix_client.get_client();
7065

71-
/* TODO: token refresh logic is left up to the user */
66+
// TODO: token refresh logic is left up to the user
7267

7368
drop(lock);
74-
Ok(())
7569
}
7670

7771
async fn subscribe(
@@ -81,30 +75,16 @@ async fn subscribe(
8175
subscription: impl EventSubscription + Send,
8276
) -> eyre::Result<()> {
8377
let transport: Transport = Transport::websocket(session_id);
84-
let event_info: Result<CreateEventSubSubscription<_>, ClientRequestError<reqwest::Error>> =
85-
helix_client
86-
.create_eventsub_subscription(subscription, transport, token)
87-
.await;
88-
match event_info {
89-
Err(ClientRequestError::HelixRequestPostError(HelixRequestPostError::Error {
90-
status,
91-
..
92-
})) if status.as_u16() == 409 => {
93-
tracing::warn!("409 subscription already exists");
94-
}
95-
_ => {
96-
event_info?;
97-
}
98-
}
78+
let _event_info: CreateEventSubSubscription<_> = helix_client
79+
.create_eventsub_subscription(subscription, transport, token)
80+
.await?;
9981
Ok(())
10082
}
10183

10284
/// action to perform on received message
103-
enum SideEffect {
85+
enum Action {
10486
/// do nothing with the message
10587
Nothing,
106-
/// do something useful with received message
107-
Something(types::UserId),
10888
/// reset the timeout and keep the connection alive
10989
ResetKeepalive,
11090
/// kill predecessor and swap the handle
@@ -120,11 +100,11 @@ async fn process_welcome(
120100
user_id: &types::UserId,
121101
session: SessionData<'_>,
122102
) -> eyre::Result<()> {
123-
// preventing duplicating subscriptions and hitting 409
124-
if !subscribed.load(Ordering::Relaxed) {
103+
// if we're already subscribed, don't subscribe again
104+
if subscribed.load(Ordering::Relaxed) {
125105
return Ok(());
126106
}
127-
let user_token: MutexGuard<'_, UserToken> = token.lock().await;
107+
let user_token = token.lock().await;
128108
tokio::try_join!(
129109
subscribe(
130110
helix_client,
@@ -139,28 +119,26 @@ async fn process_welcome(
139119
ChannelUnbanV1::broadcaster_user_id(user_id.clone()),
140120
),
141121
)?;
142-
drop(user_token);
143122
subscribed.store(true, Ordering::Relaxed);
144123
Ok(())
145124
}
146125

147126
/// Here is where you would handle the events you want to listen to
148-
fn process_payload(event: Event) -> eyre::Result<SideEffect> {
127+
fn process_payload(event: Event) -> eyre::Result<Action> {
149128
match event {
150129
Event::ChannelBanV1(eventsub::Payload { message, .. }) => {
151130
match message {
152131
// not needed for websocket
153132
Message::VerificationRequest(_) => unreachable!(),
154133
Message::Revocation() => Err(eyre::eyre!("unexpected subscription revocation")),
155134
Message::Notification(payload) => {
156-
/*
157-
do something useful with the payload
158-
*/
159-
tracing::info!("doing something useful with channel.ban {payload:?}");
135+
// do something useful with the payload
136+
tracing::info!(?payload, "got ban event");
160137

161-
Ok(SideEffect::Something(payload.user_id))
138+
// new events reset keepalive timeout too
139+
Ok(Action::ResetKeepalive)
162140
}
163-
_ => Ok(SideEffect::Nothing),
141+
_ => Ok(Action::Nothing),
164142
}
165143
}
166144
Event::ChannelUnbanV1(eventsub::Payload { message, .. }) => {
@@ -169,17 +147,16 @@ fn process_payload(event: Event) -> eyre::Result<SideEffect> {
169147
Message::VerificationRequest(_) => unreachable!(),
170148
Message::Revocation() => Err(eyre::eyre!("unexpected subscription revocation")),
171149
Message::Notification(payload) => {
172-
/*
173-
do something useful with the payload
174-
*/
175-
tracing::info!("doing something useful with channel.unban {payload:?}");
150+
// do something useful with the payload
151+
tracing::info!(?payload, "got unban event");
176152

177-
Ok(SideEffect::Something(payload.user_id))
153+
// new events reset keepalive timeout too
154+
Ok(Action::ResetKeepalive)
178155
}
179-
_ => Ok(SideEffect::Nothing),
156+
_ => Ok(Action::Nothing),
180157
}
181158
}
182-
_ => Ok(SideEffect::Nothing),
159+
_ => Ok(Action::Nothing),
183160
}
184161
}
185162

@@ -190,7 +167,7 @@ struct WebSocketConnection {
190167
opts: Arc<crate::Opts>,
191168
subscribed: Arc<AtomicBool>,
192169
user_id: Arc<types::UserId>,
193-
self_killer: UnboundedSender<()>,
170+
kill_self_tx: UnboundedSender<()>,
194171
}
195172

196173
impl WebSocketConnection {
@@ -209,15 +186,15 @@ impl WebSocketConnection {
209186
WsMessage::Ping(_) | WsMessage::Pong(_) => {
210187
// no need to do anything as tungstenite automatically handles pings for you
211188
// but refresh the token just in case
212-
refresh_if_expired(self.token.clone(), self.helix_client, &self.opts).await?;
189+
refresh_if_expired(self.token.clone(), self.helix_client, &self.opts).await;
213190
Ok(None)
214191
}
215192
WsMessage::Binary(_) => unimplemented!(),
216193
WsMessage::Text(payload) => Ok(Some(payload)),
217194
}
218195
}
219196

220-
async fn process_message(&self, frame: String) -> eyre::Result<SideEffect> {
197+
async fn process_message(&self, frame: String) -> eyre::Result<Action> {
221198
let event_data = Event::parse_websocket(&frame).context("parsing error")?;
222199
match event_data {
223200
EventsubWebsocketData::Welcome {
@@ -232,7 +209,7 @@ impl WebSocketConnection {
232209
session,
233210
)
234211
.await?;
235-
Ok(SideEffect::KillPredecessor)
212+
Ok(Action::KillPredecessor)
236213
}
237214
EventsubWebsocketData::Reconnect {
238215
payload: ReconnectPayload { session },
@@ -242,21 +219,20 @@ impl WebSocketConnection {
242219
let successor = ActorHandle::spawn(
243220
url,
244221
self.helix_client,
245-
self.self_killer.clone(),
222+
self.kill_self_tx.clone(),
246223
self.token.clone(),
247224
self.opts.clone(),
248225
self.subscribed.clone(),
249226
self.user_id.clone(),
250227
);
251-
Ok(SideEffect::AssignSuccessor(successor))
228+
Ok(Action::AssignSuccessor(successor))
252229
}
253-
// TODO: keepalive counting https://dev.twitch.tv/docs/eventsub/handling-websocket-events/#keepalive-message
254-
EventsubWebsocketData::Keepalive { .. } => Ok(SideEffect::ResetKeepalive),
230+
EventsubWebsocketData::Keepalive { .. } => Ok(Action::ResetKeepalive),
255231
EventsubWebsocketData::Revocation { metadata, .. } => {
256232
eyre::bail!("got revocation: {metadata:?}")
257233
}
258234
EventsubWebsocketData::Notification { payload: event, .. } => process_payload(event),
259-
_ => Ok(SideEffect::Nothing),
235+
_ => Ok(Action::Nothing),
260236
}
261237
}
262238
}
@@ -267,15 +243,18 @@ impl ActorHandle {
267243
pub fn spawn(
268244
url: impl IntoClientRequest + Unpin + Send + 'static,
269245
helix_client: &'static HelixClient<'_, Client>,
270-
predecessor_killer: UnboundedSender<()>,
246+
kill_predecessor_tx: UnboundedSender<()>,
271247
token: Arc<Mutex<UserToken>>,
272248
opts: Arc<crate::Opts>,
273249
subscribed: Arc<AtomicBool>,
274250
user_id: Arc<types::UserId>,
275251
) -> Self {
276252
Self(tokio::spawn(async move {
277253
let socket = connect(url).await?;
278-
let (self_killer, mut terminator) = mpsc::unbounded_channel::<()>();
254+
// If we receive a reconnect message we want to spawn a new connection to twitch.
255+
// The already existing session should wait on the new session to receive a welcome message before being closed.
256+
// https://dev.twitch.tv/docs/eventsub/handling-websocket-events/#reconnect-message
257+
let (kill_self_tx, mut kill_self_rx) = mpsc::unbounded_channel::<()>();
279258

280259
let mut connection = WebSocketConnection {
281260
socket,
@@ -284,7 +263,7 @@ impl ActorHandle {
284263
opts,
285264
subscribed,
286265
user_id,
287-
self_killer,
266+
kill_self_tx,
288267
};
289268

290269
/// default keepalive duration is 10 seconds
@@ -295,7 +274,7 @@ impl ActorHandle {
295274
loop {
296275
tokio::select! {
297276
biased;
298-
result = terminator.recv() => {
277+
result = kill_self_rx.recv() => {
299278
result.unwrap();
300279
let Some(successor) = successor else {
301280
// can't receive death signal from successor if it isn't spawned yet
@@ -306,18 +285,10 @@ impl ActorHandle {
306285
result = connection.receive_message() => if let Some(frame) = result? {
307286
let side_effect = connection.process_message(frame).await?;
308287
match side_effect {
309-
SideEffect::Nothing => {}
310-
SideEffect::ResetKeepalive => timeout = Instant::now() + Duration::from_secs(WINDOW),
311-
SideEffect::Something(user_id) => {
312-
// reset keepalive on event
313-
timeout = Instant::now() + Duration::from_secs(WINDOW);
314-
tracing::info!(
315-
"doing something useful with user id {user_id:?}"
316-
);
317-
/* TODO */
318-
},
319-
SideEffect::KillPredecessor => predecessor_killer.send(())?,
320-
SideEffect::AssignSuccessor(actor_handle) => {
288+
Action::Nothing => {}
289+
Action::ResetKeepalive => timeout = Instant::now() + Duration::from_secs(WINDOW),
290+
Action::KillPredecessor => kill_predecessor_tx.send(())?,
291+
Action::AssignSuccessor(actor_handle) => {
321292
successor = Some(actor_handle);
322293
},
323294
}
@@ -342,13 +313,14 @@ pub async fn run(
342313
let user_id = Arc::new(user_id);
343314
let subscribed = Arc::new(AtomicBool::new(false));
344315

345-
// `_` and `_unused` have different semantics where `_` is dropped immediately,
346-
// so sender gets a recv error
347-
let (dummy_killer, _unused) = mpsc::unbounded_channel::<()>();
316+
// since this is a root actor without a predecessor it has no previous connection to kill
317+
// but we still need to give it a sender to satisfy the function signature.
318+
// `_` and `_unused` have different semantics where `_` is dropped immediately and sender gets a recv error
319+
let (dummy_tx, _unused_rx) = mpsc::unbounded_channel::<()>();
348320
let mut handle = ActorHandle::spawn(
349321
url.clone(),
350322
helix_client,
351-
dummy_killer.clone(),
323+
dummy_tx.clone(),
352324
token.clone(),
353325
opts.clone(),
354326
subscribed.clone(),
@@ -364,7 +336,7 @@ pub async fn run(
364336
ActorHandle::spawn(
365337
url.clone(),
366338
helix_client,
367-
dummy_killer.clone(),
339+
dummy_tx.clone(),
368340
token.clone(),
369341
opts.clone(),
370342
subscribed.clone(),

0 commit comments

Comments
 (0)