@@ -2,6 +2,7 @@ use std::fmt;
2
2
use std:: io;
3
3
use std:: net:: { SocketAddr , TcpListener as StdTcpListener } ;
4
4
use std:: time:: Duration ;
5
+ use socket2:: TcpKeepalive ;
5
6
6
7
use tokio:: net:: TcpListener ;
7
8
use tokio:: time:: Sleep ;
@@ -13,13 +14,65 @@ use crate::common::{task, Future, Pin, Poll};
13
14
pub use self :: addr_stream:: AddrStream ;
14
15
use super :: accept:: Accept ;
15
16
17
+ #[ derive( Default , Debug , Clone , Copy ) ]
18
+ struct TcpKeepaliveConfig {
19
+ time : Option < Duration > ,
20
+ interval : Option < Duration > ,
21
+ retries : Option < u32 > ,
22
+ }
23
+
24
+ impl TcpKeepaliveConfig {
25
+ /// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration.
26
+ fn into_socket2 ( self ) -> Option < TcpKeepalive > {
27
+ let mut dirty = false ;
28
+ let mut ka = TcpKeepalive :: new ( ) ;
29
+ if let Some ( time) = self . time {
30
+ ka = ka. with_time ( time) ;
31
+ dirty = true
32
+ }
33
+ if let Some ( interval) = self . interval {
34
+ ka = Self :: ka_with_interval ( ka, interval, & mut dirty)
35
+ } ;
36
+ if let Some ( retries) = self . retries {
37
+ ka = Self :: ka_with_retries ( ka, retries, & mut dirty)
38
+ } ;
39
+ if dirty {
40
+ Some ( ka)
41
+ } else {
42
+ None
43
+ }
44
+ }
45
+
46
+ #[ cfg( not( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" ) ) ) ]
47
+ fn ka_with_interval ( ka : TcpKeepalive , interval : Duration , dirty : & mut bool ) -> TcpKeepalive {
48
+ * dirty = true ;
49
+ ka. with_interval ( interval)
50
+ }
51
+
52
+ #[ cfg( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" ) ) ]
53
+ fn ka_with_interval ( ka : TcpKeepalive , _: Duration , _: & mut bool ) -> TcpKeepalive {
54
+ ka // no-op as keepalive interval is not supported on this platform
55
+ }
56
+
57
+ #[ cfg( not( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" , target_os = "windows" ) ) ) ]
58
+ fn ka_with_retries ( ka : TcpKeepalive , retries : u32 , dirty : & mut bool ) -> TcpKeepalive {
59
+ * dirty = true ;
60
+ ka. with_retries ( retries)
61
+ }
62
+
63
+ #[ cfg( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" , target_os = "windows" ) ) ]
64
+ fn ka_with_retries ( ka : TcpKeepalive , _: u32 , _: & mut bool ) -> TcpKeepalive {
65
+ ka // no-op as keepalive retries is not supported on this platform
66
+ }
67
+ }
68
+
16
69
/// A stream of connections from binding to an address.
17
70
#[ must_use = "streams do nothing unless polled" ]
18
71
pub struct AddrIncoming {
19
72
addr : SocketAddr ,
20
73
listener : TcpListener ,
21
74
sleep_on_errors : bool ,
22
- tcp_keepalive_timeout : Option < Duration > ,
75
+ tcp_keepalive_config : TcpKeepaliveConfig ,
23
76
tcp_nodelay : bool ,
24
77
timeout : Option < Pin < Box < Sleep > > > ,
25
78
}
@@ -52,7 +105,7 @@ impl AddrIncoming {
52
105
listener,
53
106
addr,
54
107
sleep_on_errors : true ,
55
- tcp_keepalive_timeout : None ,
108
+ tcp_keepalive_config : TcpKeepaliveConfig :: default ( ) ,
56
109
tcp_nodelay : false ,
57
110
timeout : None ,
58
111
} )
@@ -63,13 +116,24 @@ impl AddrIncoming {
63
116
self . addr
64
117
}
65
118
66
- /// Set whether TCP keepalive messages are enabled on accepted connections .
119
+ /// Set the duration to remain idle before sending TCP keepalive probes .
67
120
///
68
- /// If `None` is specified, keepalive is disabled, otherwise the duration
69
- /// specified will be the time to remain idle before sending TCP keepalive
70
- /// probes.
71
- pub fn set_keepalive ( & mut self , keepalive : Option < Duration > ) -> & mut Self {
72
- self . tcp_keepalive_timeout = keepalive;
121
+ /// If `None` is specified, keepalive is disabled.
122
+ pub fn set_keepalive ( & mut self , time : Option < Duration > ) -> & mut Self {
123
+ self . tcp_keepalive_config . time = time;
124
+ self
125
+ }
126
+
127
+ /// Set the duration between two successive TCP keepalive retransmissions,
128
+ /// if acknowledgement to the previous keepalive transmission is not received.
129
+ pub fn set_keepalive_interval ( & mut self , interval : Option < Duration > ) -> & mut Self {
130
+ self . tcp_keepalive_config . interval = interval;
131
+ self
132
+ }
133
+
134
+ /// Set the number of retransmissions to be carried out before declaring that remote end is not available.
135
+ pub fn set_keepalive_retries ( & mut self , retries : Option < u32 > ) -> & mut Self {
136
+ self . tcp_keepalive_config . retries = retries;
73
137
self
74
138
}
75
139
@@ -108,10 +172,9 @@ impl AddrIncoming {
108
172
loop {
109
173
match ready ! ( self . listener. poll_accept( cx) ) {
110
174
Ok ( ( socket, remote_addr) ) => {
111
- if let Some ( dur) = self . tcp_keepalive_timeout {
112
- let socket = socket2:: SockRef :: from ( & socket) ;
113
- let conf = socket2:: TcpKeepalive :: new ( ) . with_time ( dur) ;
114
- if let Err ( e) = socket. set_tcp_keepalive ( & conf) {
175
+ if let Some ( tcp_keepalive) = & self . tcp_keepalive_config . into_socket2 ( ) {
176
+ let sock_ref = socket2:: SockRef :: from ( & socket) ;
177
+ if let Err ( e) = sock_ref. set_tcp_keepalive ( tcp_keepalive) {
115
178
trace ! ( "error trying to set TCP keepalive: {}" , e) ;
116
179
}
117
180
}
@@ -188,7 +251,7 @@ impl fmt::Debug for AddrIncoming {
188
251
f. debug_struct ( "AddrIncoming" )
189
252
. field ( "addr" , & self . addr )
190
253
. field ( "sleep_on_errors" , & self . sleep_on_errors )
191
- . field ( "tcp_keepalive_timeout " , & self . tcp_keepalive_timeout )
254
+ . field ( "tcp_keepalive_config " , & self . tcp_keepalive_config )
192
255
. field ( "tcp_nodelay" , & self . tcp_nodelay )
193
256
. finish ( )
194
257
}
@@ -316,3 +379,49 @@ mod addr_stream {
316
379
}
317
380
}
318
381
}
382
+
383
+ #[ cfg( test) ]
384
+ mod tests {
385
+ use std:: time:: Duration ;
386
+ use crate :: server:: tcp:: TcpKeepaliveConfig ;
387
+
388
+ #[ test]
389
+ fn no_tcp_keepalive_config ( ) {
390
+ assert ! ( TcpKeepaliveConfig :: default ( ) . into_socket2( ) . is_none( ) ) ;
391
+ }
392
+
393
+ #[ test]
394
+ fn tcp_keepalive_time_config ( ) {
395
+ let mut kac = TcpKeepaliveConfig :: default ( ) ;
396
+ kac. time = Some ( Duration :: from_secs ( 60 ) ) ;
397
+ if let Some ( tcp_keepalive) = kac. into_socket2 ( ) {
398
+ assert ! ( format!( "{tcp_keepalive:?}" ) . contains( "time: Some(60s)" ) ) ;
399
+ } else {
400
+ panic ! ( "test failed" ) ;
401
+ }
402
+ }
403
+
404
+ #[ cfg( not( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" ) ) ) ]
405
+ #[ test]
406
+ fn tcp_keepalive_interval_config ( ) {
407
+ let mut kac = TcpKeepaliveConfig :: default ( ) ;
408
+ kac. interval = Some ( Duration :: from_secs ( 1 ) ) ;
409
+ if let Some ( tcp_keepalive) = kac. into_socket2 ( ) {
410
+ assert ! ( format!( "{tcp_keepalive:?}" ) . contains( "interval: Some(1s)" ) ) ;
411
+ } else {
412
+ panic ! ( "test failed" ) ;
413
+ }
414
+ }
415
+
416
+ #[ cfg( not( any( target_os = "openbsd" , target_os = "redox" , target_os = "solaris" , target_os = "windows" ) ) ) ]
417
+ #[ test]
418
+ fn tcp_keepalive_retries_config ( ) {
419
+ let mut kac = TcpKeepaliveConfig :: default ( ) ;
420
+ kac. retries = Some ( 3 ) ;
421
+ if let Some ( tcp_keepalive) = kac. into_socket2 ( ) {
422
+ assert ! ( format!( "{tcp_keepalive:?}" ) . contains( "retries: Some(3)" ) ) ;
423
+ } else {
424
+ panic ! ( "test failed" ) ;
425
+ }
426
+ }
427
+ }
0 commit comments