Skip to content

Commit 7168d71

Browse files
committed
Implement PortReader and ChanWriter
1 parent 4e0cb31 commit 7168d71

File tree

1 file changed

+172
-11
lines changed

1 file changed

+172
-11
lines changed

src/libstd/io/comm_adapters.rs

+172-11
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,107 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11-
use option::Option;
12-
use comm::{GenericPort, GenericChan};
11+
use prelude::*;
12+
13+
use comm::{GenericPort, GenericChan, GenericSmartChan};
14+
use cmp;
15+
use io;
16+
use option::{None, Option, Some};
1317
use super::{Reader, Writer};
18+
use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};
1419

15-
pub struct PortReader<P>;
20+
/// Allows reading from a port.
21+
///
22+
/// # Example
23+
///
24+
/// ```
25+
/// let reader = PortReader::new(port);
26+
///
27+
/// let mut buf = ~[0u8, ..100];
28+
/// match reader.read(buf) {
29+
/// Some(nread) => println!("Read {} bytes", nread),
30+
/// None => println!("At the end of the stream!")
31+
/// }
32+
/// ```
33+
pub struct PortReader<P> {
34+
priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed.
35+
priv pos: uint, // How many of the buffered bytes have already be consumed.
36+
priv port: P, // The port to pull data from.
37+
priv closed: bool, // Whether the pipe this port connects to has been closed.
38+
}
1639

1740
impl<P: GenericPort<~[u8]>> PortReader<P> {
18-
pub fn new(_port: P) -> PortReader<P> { fail!() }
41+
pub fn new(port: P) -> PortReader<P> {
42+
PortReader {
43+
buf: None,
44+
pos: 0,
45+
port: port,
46+
closed: false,
47+
}
48+
}
1949
}
2050

2151
impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
22-
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
52+
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
53+
let mut num_read = 0;
54+
loop {
55+
match self.buf {
56+
Some(ref prev) => {
57+
let dst = buf.mut_slice_from(num_read);
58+
let src = prev.slice_from(self.pos);
59+
let count = cmp::min(dst.len(), src.len());
60+
bytes::copy_memory(dst, src, count);
61+
num_read += count;
62+
self.pos += count;
63+
},
64+
None => (),
65+
};
66+
if num_read == buf.len() || self.closed {
67+
break;
68+
}
69+
self.pos = 0;
70+
self.buf = self.port.try_recv();
71+
self.closed = self.buf.is_none();
72+
}
73+
if self.closed && num_read == 0 {
74+
io::io_error::cond.raise(io::standard_error(io::EndOfFile));
75+
None
76+
} else {
77+
Some(num_read)
78+
}
79+
}
2380

24-
fn eof(&mut self) -> bool { fail!() }
81+
fn eof(&mut self) -> bool { self.closed }
2582
}
2683

27-
pub struct ChanWriter<C>;
84+
/// Allows writing to a chan.
85+
///
86+
/// # Example
87+
///
88+
/// ```
89+
/// let writer = ChanWriter::new(chan);
90+
/// writer.write("hello, world".as_bytes());
91+
/// ```
92+
pub struct ChanWriter<C> {
93+
chan: C,
94+
}
2895

29-
impl<C: GenericChan<~[u8]>> ChanWriter<C> {
30-
pub fn new(_chan: C) -> ChanWriter<C> { fail!() }
96+
impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> {
97+
pub fn new(chan: C) -> ChanWriter<C> {
98+
ChanWriter { chan: chan }
99+
}
31100
}
32101

33-
impl<C: GenericChan<~[u8]>> Writer for ChanWriter<C> {
34-
fn write(&mut self, _buf: &[u8]) { fail!() }
102+
impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
103+
fn write(&mut self, buf: &[u8]) {
104+
if !self.chan.try_send(buf.to_owned()) {
105+
io::io_error::cond.raise(io::IoError {
106+
kind: io::BrokenPipe,
107+
desc: "Pipe closed",
108+
detail: None
109+
});
110+
}
111+
}
35112
}
36113

37114
pub struct ReaderPort<R>;
@@ -55,3 +132,87 @@ impl<W: Writer> WriterChan<W> {
55132
impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> {
56133
fn send(&self, _x: ~[u8]) { fail!() }
57134
}
135+
136+
137+
#[cfg(test)]
138+
mod test {
139+
use prelude::*;
140+
use super::*;
141+
use io;
142+
use comm;
143+
use task;
144+
145+
#[test]
146+
fn test_port_reader() {
147+
let (port, chan) = comm::stream();
148+
do task::spawn {
149+
chan.send(~[1u8, 2u8]);
150+
chan.send(~[]);
151+
chan.send(~[3u8, 4u8]);
152+
chan.send(~[5u8, 6u8]);
153+
chan.send(~[7u8, 8u8]);
154+
}
155+
156+
let mut reader = PortReader::new(port);
157+
let mut buf = ~[0u8, ..3];
158+
159+
assert_eq!(false, reader.eof());
160+
161+
assert_eq!(Some(0), reader.read(~[]));
162+
assert_eq!(false, reader.eof());
163+
164+
assert_eq!(Some(3), reader.read(buf));
165+
assert_eq!(false, reader.eof());
166+
assert_eq!(~[1,2,3], buf);
167+
168+
assert_eq!(Some(3), reader.read(buf));
169+
assert_eq!(false, reader.eof());
170+
assert_eq!(~[4,5,6], buf);
171+
172+
assert_eq!(Some(2), reader.read(buf));
173+
assert_eq!(~[7,8,6], buf);
174+
assert_eq!(true, reader.eof());
175+
176+
let mut err = None;
177+
let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
178+
err = Some(k)
179+
}).inside(|| {
180+
reader.read(buf)
181+
});
182+
assert_eq!(Some(io::EndOfFile), err);
183+
assert_eq!(None, result);
184+
assert_eq!(true, reader.eof());
185+
assert_eq!(~[7,8,6], buf);
186+
187+
// Ensure it continues to fail in the same way.
188+
err = None;
189+
let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
190+
err = Some(k)
191+
}).inside(|| {
192+
reader.read(buf)
193+
});
194+
assert_eq!(Some(io::EndOfFile), err);
195+
assert_eq!(None, result);
196+
assert_eq!(true, reader.eof());
197+
assert_eq!(~[7,8,6], buf);
198+
}
199+
200+
#[test]
201+
fn test_chan_writer() {
202+
let (port, chan) = comm::stream();
203+
let mut writer = ChanWriter::new(chan);
204+
writer.write_be_u32(42);
205+
206+
let wanted = ~[0u8, 0u8, 0u8, 42u8];
207+
let got = do task::try { port.recv() }.unwrap();
208+
assert_eq!(wanted, got);
209+
210+
let mut err = None;
211+
io::io_error::cond.trap(|io::IoError { kind, .. } | {
212+
err = Some(kind)
213+
}).inside(|| {
214+
writer.write_u8(1)
215+
});
216+
assert_eq!(Some(io::BrokenPipe), err);
217+
}
218+
}

0 commit comments

Comments
 (0)