diff --git a/src/client.rs b/src/client.rs index c980b59d9..c9ab889b3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,7 @@ use proto::{self, Connection}; use error::Reason::*; use http::{self, Request, Response}; -use futures::{Future, Poll, Sink, AsyncSink}; +use futures::{self, Future, Poll, Sink, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -23,12 +23,21 @@ pub struct Client { connection: Connection, } -/// Client half of an active HTTP/2.0 stream. #[derive(Debug)] pub struct Stream { inner: proto::StreamRef, } +#[derive(Debug)] +pub struct Body { + inner: proto::StreamRef, +} + +#[derive(Debug)] +pub struct Chunk { + inner: proto::Chunk, +} + impl Client where T: AsyncRead + AsyncWrite + 'static, { @@ -140,15 +149,18 @@ impl fmt::Debug for Handshake impl Stream { /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll, ConnectionError> { - self.inner.poll_response() + pub fn poll_response(&mut self) -> Poll>, ConnectionError> { + let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); + let body = Body { inner: self.inner.clone() }; + + Ok(Response::from_parts(parts, body).into()) } /// Send data pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { - unimplemented!(); + self.inner.send_data(data.into_buf(), end_of_stream) } /// Send trailers @@ -160,7 +172,7 @@ impl Stream { } impl Future for Stream { - type Item = Response<()>; + type Item = Response>; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -168,6 +180,28 @@ impl Future for Stream { } } +// ===== impl Body ===== + +impl futures::Stream for Body { + type Item = Chunk; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + let chunk = try_ready!(self.inner.poll_data()) + .map(|inner| Chunk { inner }); + + Ok(chunk.into()) + } +} + +// ===== impl Chunk ===== + +impl Chunk { + pub fn pop_bytes(&mut self) -> Option { + self.inner.pop_bytes() + } +} + // ===== impl Peer ===== impl proto::Peer for Peer { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 2480a71ba..b7a8022e2 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -67,6 +67,15 @@ pub enum Frame { } impl Frame { + /// Returns true if the frame is a DATA frame. + pub fn is_data(&self) -> bool { + use self::Frame::*; + + match *self { + Data(..) => true, + _ => false, + } + } } impl fmt::Debug for Frame { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e8b3cd61f..a28ce80b1 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -127,19 +127,8 @@ impl Connection */ } Some(Data(frame)) => { - unimplemented!(); - /* trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(&frame)); - - let frame = Frame::Data { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - data: frame.into_payload(), - }; - - return Ok(Some(frame).into()); - */ + try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { unimplemented!(); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index aaca6dbc4..7c842d95f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,7 +6,7 @@ mod settings; mod streams; pub use self::connection::Connection; -pub use self::streams::{Streams, StreamRef}; +pub use self::streams::{Streams, StreamRef, Chunk}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 8aa1c991f..077d10d39 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -88,4 +88,53 @@ impl Deque { None => None, } } + + pub fn take_while(&mut self, buf: &mut Buffer, mut f: F) -> Self + where F: FnMut(&Frame) -> bool + { + match self.indices { + Some(mut idxs) => { + if !f(&buf.slab[idxs.head].frame) { + return Deque::new(); + } + + let head = idxs.head; + let mut tail = idxs.head; + + loop { + let next = match buf.slab[tail].next { + Some(next) => next, + None => { + self.indices = None; + return Deque { + indices: Some(idxs), + _p: PhantomData, + }; + } + }; + + if !f(&buf.slab[next].frame) { + // Split the linked list + buf.slab[tail].next = None; + + self.indices = Some(Indices { + head: next, + tail: idxs.tail, + }); + + return Deque { + indices: Some(Indices { + head: head, + tail: tail, + }), + _p: PhantomData, + } + } + + tail = next; + } + } + None => Deque::new(), + } + } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index c03ea99ff..1dfe92b2f 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -8,7 +8,7 @@ mod store; mod stream; mod streams; -pub use self::streams::{Streams, StreamRef}; +pub use self::streams::{Streams, StreamRef, Chunk}; use self::buffer::Buffer; use self::flow_control::FlowControl; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0de3d4e0c..0c1743412 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -32,6 +32,12 @@ pub(super) struct Recv { _p: PhantomData<(P, B)>, } +#[derive(Debug)] +pub(super) struct Chunk { + /// Data frames pending receival + pub pending_recv: buffer::Deque, +} + impl Recv where P: Peer, B: Buf, @@ -67,7 +73,7 @@ impl Recv // Increment the number of remote initiated streams self.num_streams += 1; - Ok(Some(Stream::new())) + Ok(Some(Stream::new(id))) } /// Transition the stream state based on receiving headers @@ -98,7 +104,7 @@ impl Recv } pub fn recv_data(&mut self, - frame: &frame::Data, + frame: frame::Data, stream: &mut Stream) -> Result<(), ConnectionError> { @@ -130,6 +136,10 @@ impl Recv try!(stream.state.recv_close()); } + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + Ok(()) } @@ -218,6 +228,37 @@ impl Recv Ok(().into()) } + + pub fn poll_chunk(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> + { + let frames = stream.pending_recv + .take_while(&mut self.buffer, |frame| frame.is_data()); + + if frames.is_empty() { + if stream.state.is_recv_closed() { + Ok(None.into()) + } else { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } else { + Ok(Some(Chunk { + pending_recv: frames, + }).into()) + } + } + + pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option { + match chunk.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Data(frame)) => { + Some(frame.into_payload()) + } + None => None, + _ => panic!("unexpected frame type"), + } + } + /// Send stream level window update pub fn send_stream_window_update(&mut self, streams: &mut Store, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a6d80316d..8650a31f4 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -67,7 +67,7 @@ impl Send /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self) -> Result<(StreamId, Stream), ConnectionError> { + pub fn open(&mut self) -> Result, ConnectionError> { try!(self.ensure_can_open()); if let Some(max) = self.max_streams { @@ -76,7 +76,7 @@ impl Send } } - let ret = (self.next_stream_id, Stream::new()); + let ret = Stream::new(self.next_stream_id); // Increment the number of locally initiated streams self.num_streams += 1; @@ -106,8 +106,8 @@ impl Send } pub fn send_data(&mut self, - frame: &frame::Data, - stream: &mut Stream) + frame: frame::Data, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -148,6 +148,8 @@ impl Send try!(stream.state.send_close()); } + self.prioritize.queue_frame(frame.into(), stream); + Ok(()) } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 369ac8c96..e6b681cc5 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -203,6 +203,13 @@ impl State { } } + pub fn is_recv_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedRemote(..) => true, + _ => false, + } + } + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { match self.inner { Open { ref mut remote, .. } | diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 588edcb71..d26266c4c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -2,6 +2,9 @@ use super::*; #[derive(Debug)] pub(super) struct Stream { + /// The h2 stream identifier + pub id: StreamId, + /// Current state of the stream pub state: State, @@ -22,8 +25,9 @@ pub(super) struct Stream { } impl Stream { - pub fn new() -> Stream { + pub fn new(id: StreamId) -> Stream { Stream { + id, state: State::default(), pending_recv: buffer::Deque::new(), recv_task: None, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 8134b804d..f79e34197 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -18,6 +18,15 @@ pub struct StreamRef { key: store::Key, } +#[derive(Debug)] +pub struct Chunk + where P: Peer, + B: Buf, +{ + inner: Arc>>, + recv: recv::Chunk, +} + /// Fields needed to manage state related to managing the set of streams. This /// is mostly split out to make ownership happy. /// @@ -103,7 +112,7 @@ impl Streams Ok(ret) } - pub fn recv_data(&mut self, frame: &frame::Data) + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { let id = frame.stream_id(); @@ -191,6 +200,7 @@ impl Streams */ } + /* pub fn send_data(&mut self, frame: &frame::Data) -> Result<(), ConnectionError> { @@ -213,6 +223,7 @@ impl Streams Ok(()) } + */ pub fn poll_window_update(&mut self) -> Poll @@ -281,13 +292,13 @@ impl Streams let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let (id, mut stream) = me.actions.send.open()?; + let mut stream = me.actions.send.open()?; // Convert the message let headers = client::Peer::convert_send_message( - id, request, end_of_stream); + stream.id, request, end_of_stream); - let mut stream = me.store.insert(id, stream); + let mut stream = me.store.insert(stream.id, stream); me.actions.send.send_headers(headers, &mut stream)?; @@ -305,6 +316,55 @@ impl Streams } } +// ===== impl StreamRef ===== + +impl StreamRef + where P: Peer, + B: Buf, +{ + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + // Create the data frame + let frame = frame::Data::from_buf(stream.id, data, end_of_stream); + + // Send the data frame + me.actions.send.send_data(frame, &mut stream)?; + + if stream.state.is_closed() { + me.actions.dec_num_streams(stream.id); + } + + Ok(()) + } + + pub fn poll_data(&mut self) -> Poll>, ConnectionError> { + let recv = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + try_ready!(me.actions.recv.poll_chunk(&mut stream)) + }; + + // Convert to a chunk + let chunk = recv.map(|recv| { + Chunk { + inner: self.inner.clone(), + recv: recv, + } + }); + + Ok(chunk.into()) + } +} + impl StreamRef where B: Buf, { @@ -318,6 +378,47 @@ impl StreamRef } } + + +impl Clone for StreamRef { + fn clone(&self) -> Self { + StreamRef { + inner: self.inner.clone(), + key: self.key.clone(), + } + } +} + +// ===== impl Chunk ===== + +impl Chunk + where P: Peer, + B: Buf, +{ + // TODO: Come up w/ a better API + pub fn pop_bytes(&mut self) -> Option { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.pop_bytes(&mut self.recv) + } +} + +impl Drop for Chunk + where P: Peer, + B: Buf, +{ + fn drop(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) { + } + } +} + +// ===== impl Actions ===== + impl Actions where P: Peer, B: Buf, diff --git a/tests/stream_states.rs b/tests/stream_states.rs index eef431837..224a2a342 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -5,8 +5,6 @@ extern crate log; pub mod support; use support::*; -use h2::Frame; - #[test] fn send_recv_headers_only() { let _ = env_logger::init(); @@ -40,7 +38,6 @@ fn send_recv_headers_only() { h2.wait().unwrap(); } -/* #[test] fn send_recv_data() { let _ = env_logger::init(); @@ -66,14 +63,42 @@ fn send_recv_data() { ]) .build(); - let h2 = client::handshake(mock).wait().expect("handshake"); + let mut h2 = Client::handshake2(mock) + .wait().unwrap(); - // Send the request - let mut request = request::Head::default(); - request.method = method::POST; - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, false).wait().expect("send request"); + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + info!("sending request"); + let mut stream = h2.request(request, false).unwrap(); + + // Send the data + stream.send_data("hello", true).unwrap(); + + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); + + // Take the body + let (_, body) = resp.into_parts(); + + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); + + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); + + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + assert!(chunks[0].pop_bytes().is_none()); + + // The H2 connection is closed + h2.wait().unwrap(); + /* let b = "hello"; // Send the data @@ -102,10 +127,11 @@ fn send_recv_data() { } assert!(Stream::wait(h2).next().is_none());; + */ } #[test] -fn send_headers_recv_data() { +fn send_headers_recv_data_single_frame() { let _ = env_logger::init(); let mock = mock_io::Builder::new() @@ -123,51 +149,42 @@ fn send_headers_recv_data() { ]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - // Get the response headers - let (resp, h2) = h2.into_future().wait().unwrap(); + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); - match resp.unwrap() { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); + // Take the body + let (_, body) = resp.into_parts(); - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"hello"[..]); - assert!(!end_of_stream); - } - _ => panic!("unexpected frame"), - } + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"hello"[..]); - assert!(Stream::wait(h2).next().is_none());; + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + assert!(chunks[0].pop_bytes().is_none()); + + // The H2 connection is closed + h2.wait().unwrap(); } +/* #[test] fn send_headers_twice_with_same_stream_id() { let _ = env_logger::init();