44
55use self :: codec:: { TunnelCodec , TunnelMessage } ;
66use crate :: utils:: { hashing:: IntHashMap , types:: GameID } ;
7+ use bytes:: Bytes ;
78use futures_util:: { Sink , Stream } ;
89use hyper:: upgrade:: Upgraded ;
910use hyper_util:: rt:: TokioIo ;
@@ -17,8 +18,12 @@ use std::{
1718 Arc ,
1819 } ,
1920 task:: { ready, Context , Poll } ,
21+ time:: Duration ,
22+ } ;
23+ use tokio:: {
24+ sync:: mpsc,
25+ time:: { interval_at, Instant , Interval , MissedTickBehavior } ,
2026} ;
21- use tokio:: sync:: mpsc;
2227use tokio_util:: codec:: Framed ;
2328
2429use super :: sessions:: AssociationId ;
@@ -240,6 +245,8 @@ pub struct Tunnel {
240245 write_state : TunnelWriteState ,
241246 /// The service access
242247 service : Arc < TunnelService > ,
248+ /// Interval for sending keep alive messages
249+ keep_alive_interval : Interval ,
243250}
244251
245252impl Drop for Tunnel {
@@ -273,6 +280,9 @@ enum TunnelReadState {
273280}
274281
275282impl Tunnel {
283+ // Send keep-alive pings every 10s
284+ const KEEP_ALIVE_DELAY : Duration = Duration :: from_secs ( 10 ) ;
285+
276286 /// Starts a new tunnel on `io` using the tunnel `service`
277287 ///
278288 /// ## Arguments
@@ -294,13 +304,20 @@ impl Tunnel {
294304 . write ( )
295305 . insert_tunnel ( id, TunnelHandle { tx } ) ;
296306
307+ // Create the interval to track keep alive pings
308+ let keep_alive_start = Instant :: now ( ) + Self :: KEEP_ALIVE_DELAY ;
309+ let mut keep_alive_interval = interval_at ( keep_alive_start, Self :: KEEP_ALIVE_DELAY ) ;
310+
311+ keep_alive_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
312+
297313 // Spawn the tunnel task
298314 tokio:: spawn ( Tunnel {
299315 service,
300316 id,
301317 io,
302318 rx,
303319 write_state : Default :: default ( ) ,
320+ keep_alive_interval,
304321 } ) ;
305322
306323 id
@@ -372,6 +389,11 @@ impl Tunnel {
372389 return Poll :: Ready ( TunnelReadState :: Stop ) ;
373390 } ;
374391
392+ // Ping messages can be ignored
393+ if message. index == 255 {
394+ return Poll :: Ready ( TunnelReadState :: Continue ) ;
395+ }
396+
375397 // Get the path through the tunnel
376398 let ( target_handle, index) = match self . service . get_tunnel_route ( self . id , message. index ) {
377399 Some ( value) => value,
@@ -413,6 +435,27 @@ impl Future for Tunnel {
413435 }
414436 }
415437
438+ // Write a ping message at the interval if we aren't already sending a message
439+ if this. keep_alive_interval . poll_tick ( cx) . is_ready ( ) {
440+ if let TunnelWriteState :: Recv = this. write_state {
441+ // Move to a writing state
442+ this. write_state = TunnelWriteState :: Write ( Some ( TunnelMessage {
443+ index : 255 ,
444+ message : Bytes :: new ( ) ,
445+ } ) ) ;
446+
447+ // Poll the writer with the new message
448+ if let Poll :: Ready ( next_state) = this. poll_write_state ( cx) {
449+ this. write_state = next_state;
450+
451+ // Tunnel has stopped
452+ if let TunnelWriteState :: Stop = this. write_state {
453+ return Poll :: Ready ( ( ) ) ;
454+ }
455+ }
456+ }
457+ }
458+
416459 Poll :: Pending
417460 }
418461}
@@ -443,6 +486,13 @@ mod codec {
443486 //! Length: 16-bits. Determines the size in bytes of the payload that follows
444487 //!
445488 //! Payload: Variable length. The message bytes payload of `Length`
489+ //!
490+ //!
491+ //! ## Keep alive
492+ //!
493+ //! The server will send keep-alive messages, these are in the same
494+ //! format as the packet above. However, the index will always be 255
495+ //! and the payload will be empty.
446496
447497 use bytes:: { Buf , BufMut , Bytes } ;
448498 use tokio_util:: codec:: { Decoder , Encoder } ;
0 commit comments