Skip to content

Commit b2387e2

Browse files
committed
Versions + really asynchronous IO
Ignore-this: 32abf6faee2d4be9c3a29058a0ad762a darcs-hash:9e8e17abf3ee2c5e396527ad7d93edfe5c96445d
1 parent de64705 commit b2387e2

File tree

5 files changed

+90
-79
lines changed

5 files changed

+90
-79
lines changed

Cargo.toml

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,25 @@ documentation = "https://pijul.org/thrussh/doc/thrussh"
1010
license = "Apache-2.0"
1111
include = [
1212
"Cargo.toml",
13-
"src/server/mod.rs",
14-
"src/server/encrypted.rs",
15-
"src/negociation.rs",
16-
"src/kex.rs",
1713
"src/auth.rs",
18-
"src/cryptobuf.rs",
19-
"src/cipher",
20-
"src/cipher/mod.rs",
21-
"src/cipher/chacha20poly1305.rs",
22-
"src/msg.rs",
23-
"src/lib.rs",
2414
"src/encoding.rs",
25-
"src/client",
26-
"src/client/mod.rs",
27-
"src/client/encrypted.rs",
15+
"src/kex.rs",
2816
"src/key.rs",
17+
"src/lib.rs",
18+
"src/msg.rs",
19+
"src/negociation.rs",
20+
"src/pty.rs",
2921
"src/session.rs",
3022
"src/sshbuffer.rs",
31-
"src/pty.rs"
23+
"src/bin/client.rs",
24+
"src/bin/server.rs",
25+
"src/cipher/chacha20poly1305.rs",
26+
"src/cipher/clear.rs",
27+
"src/cipher/mod.rs",
28+
"src/client/mod.rs",
29+
"src/client/encrypted.rs",
30+
"src/server/mod.rs",
31+
"src/server/encrypted.rs"
3232
]
3333

3434
[dependencies]
@@ -37,19 +37,19 @@ byteorder = "0.5"
3737
bitflags = "0.7"
3838
libc = "0.2"
3939
log = "0.3"
40-
ring = { path = "../ring" }
40+
ring = "0.6.0-alpha"
4141
rustc-serialize = "0.3"
4242
time = "0.1"
4343
untrusted = "0.3.1"
4444
mio = "0.6"
4545
regex = "0.1"
4646
user = "0.1.1"
47-
cryptovec = "0.2"
47+
cryptovec = "0.3"
4848

49-
tokio-core = "*"
50-
futures = "*"
49+
tokio-core = "0.1"
50+
futures = "0.1"
5151

52-
env_logger = "*"
52+
env_logger = "0.3"
5353

5454
[dev-dependencies]
5555
tempdir="0.3"

src/cipher/mod.rs

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
//
1515
use byteorder::{ByteOrder, BigEndian};
1616
use Error;
17-
use std::io::BufRead;
1817
use std;
19-
use cryptovec::CryptoVec;
2018
use sshbuffer::SSHBuffer;
2119
use std::num::Wrapping;
2220

@@ -106,53 +104,49 @@ pub trait SealingKey {
106104
fn seal(&self, seqn: u32, plaintext_in_ciphertext_out: &mut [u8], tag_out: &mut [u8]);
107105
}
108106

109-
/// Fills the read buffer, and returns whether a complete message has been read.
110-
fn read(stream: &mut BufRead,
111-
read_buffer: &mut CryptoVec,
112-
read_len: usize,
113-
bytes_read: &mut usize)
114-
-> Result<(), Error> {
115-
// This loop consumes something or returns, it cannot loop forever.
116-
loop {
117-
let consumed_len = {
118-
let buf = try!(stream.fill_buf());
119-
if read_buffer.len() + buf.len() < read_len + 4 {
120-
read_buffer.extend(buf);
121-
buf.len()
122-
} else {
123-
let consumed_len = read_len + 4 - read_buffer.len();
124-
read_buffer.extend(&buf[0..consumed_len]);
125-
consumed_len
126-
}
127-
};
128-
stream.consume(consumed_len);
129-
*bytes_read += consumed_len;
130-
if read_buffer.len() >= 4 + read_len {
131-
return Ok(());
132-
}
133-
}
134-
}
135-
136-
137107
impl CipherPair {
138-
pub fn read<'a>(&self,
139-
stream: &mut BufRead,
140-
buffer: &'a mut SSHBuffer)
141-
-> Result<&'a [u8], Error> {
108+
pub fn read<'a, R:std::io::Read>(&self,
109+
mut stream: R,
110+
buffer: &'a mut SSHBuffer)
111+
-> Result<&'a [u8], Error> {
142112

143113
let key = self.remote_to_local.as_opening_key();
144114

145115
let seqn = buffer.seqn.0;
146116

147117
if buffer.len == 0 {
118+
148119
buffer.buffer.clear();
149-
let mut len = [0; 4];
150-
try!(stream.read_exact(&mut len));
151-
buffer.buffer.extend(&len);
152-
let len = key.decrypt_packet_length(seqn, len);
120+
121+
while buffer.read_len_bytes < 4 {
122+
debug!("cipherpair: reading");
123+
let r = buffer.read_len_bytes;
124+
let extra = try!(stream.read(&mut buffer.len_bytes[r..]));
125+
if extra == 0 {
126+
return Ok(&[])
127+
} else {
128+
buffer.read_len_bytes += extra
129+
}
130+
}
131+
132+
buffer.buffer.extend(&buffer.len_bytes);
133+
let len = key.decrypt_packet_length(seqn, buffer.len_bytes);
153134
buffer.len = BigEndian::read_u32(&len) as usize + key.tag_len();
135+
buffer.read_len_bytes = 0;
136+
137+
}
138+
139+
while buffer.buffer.len() < 4 + buffer.len {
140+
// try!(read(stream, &mut buffer.buffer, buffer.len, &mut buffer.bytes));
141+
debug!("cipherpair: reading ciphertext");
142+
let current_len = buffer.buffer.len();
143+
let n = try!(buffer.buffer.read(4 + buffer.len - current_len, &mut stream));
144+
if n == 0 {
145+
return Ok(&[])
146+
} else {
147+
buffer.bytes += n;
148+
}
154149
}
155-
try!(read(stream, &mut buffer.buffer, buffer.len, &mut buffer.bytes));
156150

157151
let ciphertext_len = buffer.buffer.len() - key.tag_len();
158152
let (ciphertext, tag) = buffer.buffer.split_at_mut(ciphertext_len);
@@ -190,8 +184,8 @@ impl CipherPair {
190184
assert!(padding_length <= std::u8::MAX as usize);
191185
buffer.buffer.push(padding_length as u8);
192186
buffer.buffer.extend(payload);
193-
key.fill_padding(buffer.buffer.reserve(padding_length));
194-
buffer.buffer.reserve(key.tag_len());
187+
key.fill_padding(buffer.buffer.resize_mut(padding_length));
188+
buffer.buffer.resize_mut(key.tag_len());
195189

196190
let (plaintext, tag) = buffer.buffer[offset..]
197191
.split_at_mut(PACKET_LENGTH_LEN + packet_length);

src/client/mod.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl KexInit {
274274
&mut self.exchange.client_kex_init));
275275

276276
cipher.write(&self.exchange.client_kex_init[i0..], write_buffer);
277-
self.exchange.client_kex_init.truncate(i0);
277+
self.exchange.client_kex_init.resize(i0);
278278

279279

280280
Ok(KexDhDone {
@@ -339,7 +339,10 @@ impl<H: Handler> Future for Connection<TcpStream, H> {
339339
// If timeout, shutdown the socket.
340340
try_ready!(self.poll_timeout());
341341
loop {
342-
try_ready!(self.atomic_poll())
342+
debug!("client polling");
343+
if ! try_ready!(self.atomic_poll()) {
344+
return Ok(Async::Ready(()))
345+
}
343346
}
344347
}
345348
}
@@ -362,6 +365,7 @@ impl<H:Handler> Connection<TcpStream, H> {
362365
if let Async::Ready(()) = try!(timeout.poll()) {
363366
debug!("Timeout, shutdown");
364367
try_nb!(self.stream.get_mut().shutdown(std::net::Shutdown::Both));
368+
self.session.0.disconnected = true;
365369
return Err(Error::ConnectionTimeout)
366370
}
367371
}
@@ -437,18 +441,21 @@ impl<H:Handler> Connection<TcpStream, H> {
437441
/// whether at least one complete packet was read. `buffer` and
438442
/// `buffer2` are work spaces mostly used to compute keys. They
439443
/// are cleared before using, hence nothing is expected from them.
440-
fn atomic_poll(&mut self) -> Poll<(), Error> {
444+
fn atomic_poll(&mut self) -> Poll<bool, Error> {
441445

442446
try_ready!(self.pending_poll());
443447

444448
// Special case for the beginning.
445449
match std::mem::replace(&mut self.state, None) {
450+
None if self.session.0.disconnected => Ok(Async::Ready(false)),
446451
None => {
447-
Ok(Async::Ready(()))
452+
try_nb!(self.stream.get_mut().shutdown(std::net::Shutdown::Both));
453+
self.session.0.disconnected = true;
454+
Ok(Async::Ready(false))
448455
}
449456
Some(ConnectionState::WriteSshId) => {
450457
self.state = Some(ConnectionState::ReadSshId { sshid: read_ssh_id() });
451-
Ok(Async::Ready(()))
458+
Ok(Async::Ready(true))
452459
}
453460
Some(ConnectionState::ReadSshId { mut sshid }) => {
454461

@@ -479,7 +486,7 @@ impl<H:Handler> Connection<TcpStream, H> {
479486
&mut self.session.0.write_buffer);
480487
self.session.0.kex = Some(Kex::KexInit(kexinit));
481488
self.state = Some(ConnectionState::Write);
482-
Ok(Async::Ready(()))
489+
Ok(Async::Ready(true))
483490
}
484491
}
485492
},
@@ -488,24 +495,24 @@ impl<H:Handler> Connection<TcpStream, H> {
488495
self.session.flush();
489496
try_nb!(self.session.0.write_buffer.write_all(self.stream.get_mut()));
490497
self.state = Some(ConnectionState::Read);
491-
Ok(Async::Ready(()))
498+
Ok(Async::Ready(true))
492499
},
493500
Some(ConnectionState::Read) => {
494501

495502
self.state = Some(ConnectionState::Read);
496503
// In all other cases:
497504
let buf = try_nb!(self.session.0.cipher.read(&mut self.stream, &mut self.read_buffer));
498505
// Handle the transport layer.
499-
if buf[0] == msg::DISCONNECT {
506+
if buf.len() == 0 || buf[0] == msg::DISCONNECT {
500507
// transport
501-
return Ok(Async::Ready(()));
508+
return Ok(Async::Ready(false));
502509
}
503510
// If we don't disconnect, keep the state.
504511
self.state = Some(ConnectionState::Write);
505512

506513
// Handle transport layer packets.
507514
if buf[0] <= 4 {
508-
return Ok(Async::Ready(()))
515+
return Ok(Async::Ready(true))
509516
}
510517

511518
// Handle key exchange/re-exchange.
@@ -521,7 +528,7 @@ impl<H:Handler> Connection<TcpStream, H> {
521528
match kexdhdone {
522529
Ok(kexdhdone) => {
523530
self.session.0.kex = Some(Kex::KexDhDone(kexdhdone));
524-
return Ok(Async::Ready(()));
531+
return Ok(Async::Ready(true));
525532
}
526533
Err(e) => return Err(e),
527534
}
@@ -563,7 +570,7 @@ impl<H:Handler> Connection<TcpStream, H> {
563570
try_nb!(self.session.client_read_encrypted(&mut self.handler, buf, &mut self.buffer));
564571
}
565572
}
566-
Ok(Async::Ready(()))
573+
Ok(Async::Ready(true))
567574
}
568575
}
569576
}
@@ -634,6 +641,7 @@ impl<H: Handler> Future for Authenticate<TcpStream, H> {
634641
try_ready!(c.poll_timeout());
635642
}
636643
loop {
644+
debug!("authenticated loop");
637645
let is_authenticated = if let Some(ref c) = self.0 { c.is_authenticated() } else { false };
638646
if is_authenticated {
639647
return Ok(Async::Ready(std::mem::replace(&mut self.0, None).unwrap()))
@@ -675,6 +683,7 @@ impl<H: Handler, ChannelType> Future for ChannelOpen<TcpStream, H, ChannelType>
675683
}
676684

677685
loop {
686+
debug!("channelopen loop");
678687
let is_open = if let Some(ref c) = self.connection {
679688
c.channel_is_open(self.channel)
680689
} else { false };
@@ -699,6 +708,7 @@ impl<H: Handler> Future for ChannelClose<TcpStream, H> {
699708
}
700709

701710
loop {
711+
debug!("channelclose loop");
702712
let is_open = if let Some(ref c) = self.connection {
703713
c.channel_is_open(self.channel)
704714
} else { false };
@@ -722,6 +732,7 @@ impl<H: Handler> Future for Flush<TcpStream, H> {
722732
try_ready!(c.poll_timeout());
723733
}
724734
loop {
735+
debug!("flush loop");
725736
let completely_written = if let Some(ref mut c) = self.connection {
726737
try_nb!(c.session.0.write_buffer.write_all(c.stream.get_mut()))
727738
} else {

src/server/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ pub trait Handler {
169169
user: &str,
170170
submethods: &str,
171171
response: Option<Response>)
172-
-> Self::FutureAuth;
172+
-> Self::FutureAuth {
173+
Self::FutureAuth::finished(Auth::Reject)
174+
}
173175

174176
/// Called when the client closes a channel.
175177
#[allow(unused_variables)]
@@ -502,8 +504,7 @@ impl<H: Handler> Future for Connection<TcpStream, H> {
502504
}
503505
}
504506
debug!("polling, state = {:?}", self.state);
505-
try_ready!(self.atomic_poll());
506-
if self.state.is_none() {
507+
if let Status::Disconnect = try_ready!(self.atomic_poll()) {
507508
return Ok(Async::Ready(()))
508509
}
509510
}
@@ -601,7 +602,7 @@ impl<H:Handler> Connection<TcpStream, H> {
601602
debug!("connection state: {:?}", self.state);
602603

603604
match std::mem::replace(&mut self.state, None) {
604-
None if self.session.0.disconnected => Ok(Async::Ready(Status::Ok)),
605+
None if self.session.0.disconnected => Ok(Async::Ready(Status::Disconnect)),
605606
None => {
606607
try_nb!(self.stream.get_mut().shutdown(std::net::Shutdown::Both));
607608
self.session.0.disconnected = true;
@@ -661,7 +662,7 @@ impl<H:Handler> Connection<TcpStream, H> {
661662
let buf = try_nb!(self.session.0.cipher.read(&mut self.stream, &mut self.read_buffer));
662663
debug!("read buf = {:?}", buf);
663664
// Handle the transport layer.
664-
if buf[0] == msg::DISCONNECT {
665+
if buf.len() == 0 || buf[0] == msg::DISCONNECT {
665666
// transport
666667
self.state = None;
667668
debug!("Received disconnect, shutdown");

src/sshbuffer.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub struct SSHBuffer {
2323
pub len: usize, // next packet length.
2424
pub bytes: usize,
2525

26+
pub len_bytes: [u8;4],
27+
pub read_len_bytes: usize,
28+
2629
// Sequence numbers are on 32 bits and wrap.
2730
// https://tools.ietf.org/html/rfc4253#section-6.4
2831
pub seqn: Wrapping<u32>,
@@ -34,6 +37,8 @@ impl SSHBuffer {
3437
buffer: CryptoVec::new(),
3538
len: 0,
3639
bytes: 0,
40+
len_bytes: [0;4],
41+
read_len_bytes: 0,
3742
seqn: Wrapping(0),
3843
}
3944
}
@@ -46,9 +51,9 @@ impl SSHBuffer {
4651
}
4752

4853
/// Returns true iff the write buffer has been completely written.
49-
pub fn write_all<W: Write>(&mut self, stream: &mut W) -> Result<bool, Error> {
54+
pub fn write_all<W:Write>(&mut self, mut stream: W) -> Result<bool, Error> {
5055
while self.len < self.buffer.len() {
51-
let s = try!(self.buffer.write_all_from(self.len, stream));
56+
let s = try!(self.buffer.write_all_from(self.len, &mut stream));
5257
self.len += s;
5358
self.bytes += s;
5459
try!(stream.flush());

0 commit comments

Comments
 (0)