diff --git a/src/client.rs b/src/client.rs index c980b59d9..29f302918 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use {frame, ConnectionError, StreamId}; +use {frame, BodyType, ConnectionError, StreamId, WindowSize}; use proto::{self, Connection}; use error::Reason::*; @@ -76,7 +76,7 @@ impl Client /// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// stream. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - unimplemented!(); + self.connection.poll_ready() } /// Send a request on a new HTTP 2.0 stream @@ -140,10 +140,14 @@ impl fmt::Debug for Handshake impl Stream { /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll, ConnectionError> { + pub fn poll_response(&mut self) -> Poll, ConnectionError> { self.inner.poll_response() } + pub fn poll_data(&mut self, sz: WindowSize) -> Poll, ConnectionError> { + self.inner.poll_data(sz) + } + /// Send data pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> @@ -160,7 +164,7 @@ impl Stream { } impl Future for Stream { - type Item = Response<()>; + type Item = Response; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -172,7 +176,7 @@ impl Future for Stream { impl proto::Peer for Peer { type Send = Request<()>; - type Poll = Response<()>; + type Poll = Response; fn is_server() -> bool { false @@ -202,8 +206,17 @@ impl proto::Peer for Peer { } fn convert_poll_message(headers: frame::Headers) -> Result { + let body = if headers.is_end_stream() { + BodyType::Empty + } else { + BodyType::Stream + }; headers.into_response() // TODO: Is this always a protocol error? .map_err(|_| ProtocolError.into()) + .map(move |r| { + let (p, _) = r.into_parts(); + Response::from_parts(p, body) + }) } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 81c0dee92..e0bc2e0f3 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,7 +1,6 @@ +use {hpack, BodyType, HeaderMap}; use super::StreamId; -use hpack; use frame::{self, Frame, Head, Kind, Error}; -use HeaderMap; use http::{self, request, response, version, uri, Method, StatusCode, Uri}; use http::{Request, Response}; @@ -200,14 +199,20 @@ impl Headers { self.flags.set_end_stream() } - pub fn into_response(self) -> http::Result> { + pub fn into_response(self) -> http::Result> { let mut b = Response::builder(); if let Some(status) = self.pseudo.status { b.status(status); } - let mut response = try!(b.body(())); + let body = if self.is_end_stream() { + BodyType::Empty + } else { + BodyType::Stream + }; + + let mut response = try!(b.body(body)); *response.headers_mut() = self.fields; Ok(response) diff --git a/src/lib.rs b/src/lib.rs index 08ac6264d..a5dbf5298 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,8 +34,10 @@ mod proto; mod frame; // pub mod server; +pub use client::Client; pub use error::{ConnectionError, Reason}; pub use frame::StreamId; +pub use proto::WindowSize; use bytes::Bytes; @@ -43,6 +45,11 @@ pub type FrameSize = u32; // TODO: remove if carllerche/http#90 lands pub type HeaderMap = http::HeaderMap; +pub enum BodyType { + Empty, + Stream, +} + /// An H2 connection frame #[derive(Debug)] pub enum Frame { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e8b3cd61f..0dad046ec 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -127,19 +127,7 @@ impl Connection */ } Some(Data(frame)) => { - unimplemented!(); - /* - trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(&frame)); - - let frame = Frame::Data { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - data: frame.into_payload(), - }; - - return Ok(Some(frame).into()); - */ + try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { unimplemented!(); diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0de3d4e0c..983bea7cd 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {client, frame, ConnectionError}; +use {client, frame, BodyType, ConnectionError}; use proto::*; use super::*; @@ -73,7 +73,7 @@ impl Recv /// Transition the stream state based on receiving headers pub fn recv_headers(&mut self, frame: frame::Headers, - stream: &mut store::Ptr) + stream: &mut Stream) -> Result, ConnectionError> { stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; @@ -98,35 +98,43 @@ impl Recv } pub fn recv_data(&mut self, - frame: &frame::Data, + frame: frame::Data, stream: &mut Stream) -> Result<(), ConnectionError> { let sz = frame.payload().len(); + let eos = frame.is_end_stream(); if sz > MAX_WINDOW_SIZE as usize { unimplemented!(); } - let sz = sz as WindowSize; + if sz > 0 { + match stream.recv_flow_control() { + None => return Err(ProtocolError.into()), + Some(flow) => { + let sz = sz as WindowSize; - match stream.recv_flow_control() { - Some(flow) => { - // Ensure there's enough capacity on the connection before - // acting on the stream. - try!(self.flow_control.ensure_window(sz, FlowControlError)); + // Ensure there's enough capacity on the connection before + // acting on the stream. + try!(self.flow_control.ensure_window(sz, FlowControlError)); - // Claim the window on the stream - try!(flow.claim_window(sz, FlowControlError)); + // Claim the window on the stream + try!(flow.claim_window(sz, FlowControlError)); - // Claim the window on the connection. - self.flow_control.claim_window(sz, FlowControlError) - .expect("local connection flow control error"); + // Claim the window on the connection. + self.flow_control.claim_window(sz, FlowControlError) + .expect("local connection flow control error"); + } } - None => return Err(ProtocolError.into()), } - if frame.is_end_stream() { + if sz > 0 || eos { + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + } + + if eos { try!(stream.state.recv_close()); } @@ -255,8 +263,9 @@ impl Recv impl Recv where B: Buf, { - pub fn poll_response(&mut self, stream: &mut store::Ptr) - -> Poll, ConnectionError> { + pub fn poll_response(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> + { // If the buffer is not empty, then the first frame must be a HEADERS // frame or the user violated the contract. match stream.pending_recv.pop_front(&mut self.buffer) { @@ -272,4 +281,24 @@ impl Recv } } } + + pub fn poll_data(&mut self, stream: &mut Stream, sz: WindowSize) + -> Poll, ConnectionError> + { + // TODO(ver): split frames into the proper number of bytes, returning unconsumed + // bytes onto pending_recv. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Data(v)) => { + unimplemented!() + } + Some(f) => { + stream.pending_recv.push_back(&mut self.buffer, f); + Ok(Async::Ready(None)) + }, + None => { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } + } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 8134b804d..33c379f19 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,4 +1,4 @@ -use client; +use {client, BodyType}; use proto::*; use super::*; @@ -103,7 +103,7 @@ impl Streams Ok(ret) } - pub fn recv_data(&mut self, frame: &frame::Data) + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { let id = frame.stream_id(); @@ -308,7 +308,7 @@ impl Streams impl StreamRef where B: Buf, { - pub fn poll_response(&mut self) -> Poll, ConnectionError> { + pub fn poll_response(&mut self) -> Poll, ConnectionError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -316,6 +316,15 @@ impl StreamRef me.actions.recv.poll_response(&mut stream) } + + pub fn poll_data(&mut self, sz: WindowSize) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_data(&mut stream, sz) + } } impl Actions @@ -323,14 +332,14 @@ impl Actions B: Buf, { fn dec_num_streams(&mut self, id: StreamId) { - if self.is_local_init(id) { + if Self::is_local_init(id) { self.send.dec_num_streams(); } else { self.recv.dec_num_streams(); } } - fn is_local_init(&self, id: StreamId) -> bool { + fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); P::is_server() == id.is_server_initiated() }