diff --git a/Cargo.toml b/Cargo.toml index 2d1199c5d..f21d93904 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.1" tokio-io = "0.1" tokio-timer = "0.1" bytes = "0.4" -http = { git = "https://github.com/carllerche/http" } +http = { git = "https://github.com/carllerche/http", branch = "uri-try-from-parts" } byteorder = "1.0" log = "0.3.8" fnv = "1.0.5" diff --git a/examples/akamai.rs b/examples/akamai.rs index 863f24966..efe609798 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -8,23 +8,29 @@ extern crate openssl; extern crate io_dump; extern crate env_logger; -use h2::client; - -use http::request; +use h2::client::Client; +use http::{method, Request}; use futures::*; use tokio_core::reactor; use tokio_core::net::TcpStream; +use std::net::ToSocketAddrs; + pub fn main() { let _ = env_logger::init(); + // Sync DNS resolution. + let addr = "http2.akamai.com:443".to_socket_addrs() + .unwrap().next().unwrap(); + + println!("ADDR: {:?}", addr); + let mut core = reactor::Core::new().unwrap();; + let handle = core.handle(); - let tcp = TcpStream::connect( - &"23.39.23.98:443".parse().unwrap(), - &core.handle()); + let tcp = TcpStream::connect(&addr, &handle); let tcp = tcp.then(|res| { use openssl::ssl::{SslMethod, SslConnectorBuilder}; @@ -46,24 +52,29 @@ pub fn main() { // Dump output to stdout let tls = io_dump::Dump::to_stdout(tls); - client::handshake(tls) + println!("Starting client handshake"); + Client::handshake(tls) }) .then(|res| { - let conn = res.unwrap(); + let mut h2 = res.unwrap(); - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - // request.version = version::H2; + let request = Request::builder() + .method(method::GET) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - conn.send_request(1.into(), request, true) - }) - .then(|res| { - let conn = res.unwrap(); - // Get the next message - conn.for_each(|frame| { - println!("RX: {:?}", frame); - Ok(()) - }) + let stream = h2.request(request, true).unwrap(); + + let stream = stream.and_then(|response| { + let (_, body) = response.into_parts(); + + body.for_each(|chunk| { + println!("RX: {:?}", chunk); + Ok(()) + }) + }); + + h2.join(stream) }) }); diff --git a/examples/client.rs b/examples/client.rs index 7e79c5178..b371ab803 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,3 +1,4 @@ +/* extern crate h2; extern crate http; extern crate futures; @@ -59,3 +60,6 @@ pub fn main() { core.run(tcp).unwrap(); } +*/ + +pub fn main() {} diff --git a/examples/server.rs b/examples/server.rs index d82659580..7599aba08 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,3 +1,4 @@ +/* extern crate h2; extern crate http; extern crate futures; @@ -72,3 +73,6 @@ pub fn main() { core.run(server).unwrap(); } +*/ + +pub fn main() {} diff --git a/src/client.rs b/src/client.rs index a50408781..c9ab889b3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,9 @@ -use {frame, proto, Peer, ConnectionError, StreamId}; +use {frame, ConnectionError, StreamId}; +use proto::{self, Connection}; +use error::Reason::*; -use http; -use futures::{Future, Poll, Sink, AsyncSink}; +use http::{self, Request, Response}; +use futures::{self, Future, Poll, Sink, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -10,57 +12,201 @@ use std::fmt; /// In progress H2 connection binding pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ConnectionError>>, } +#[derive(Debug)] +pub(crate) struct Peer; + /// Marker type indicating a client peer +pub struct Client { + connection: Connection, +} + +#[derive(Debug)] +pub struct Stream { + inner: proto::StreamRef, +} + #[derive(Debug)] -pub struct Client; +pub struct Body { + inner: proto::StreamRef, +} -pub type Connection = super::Connection; +#[derive(Debug)] +pub struct Chunk { + inner: proto::Chunk, +} -pub fn handshake(io: T) -> Handshake +impl Client where T: AsyncRead + AsyncWrite + 'static, { - handshake2(io) + pub fn handshake(io: T) -> Handshake { + Client::handshake2(io) + } } -/// Bind an H2 client connection. -/// -/// Returns a future which resolves to the connection value once the H2 -/// handshake has been completed. -pub fn handshake2(io: T) -> Handshake +impl Client + // TODO: Get rid of 'static where T: AsyncRead + AsyncWrite + 'static, B: IntoBuf + 'static, { - use tokio_io::io; + /// Bind an H2 client connection. + /// + /// Returns a future which resolves to the connection value once the H2 + /// handshake has been completed. + pub fn handshake2(io: T) -> Handshake { + use tokio_io::io; + + debug!("binding client connection"); + + let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + .map_err(ConnectionError::from) + .and_then(|(io, _)| { + debug!("client connection bound"); + + let mut framed_write = proto::framed_write(io); + let settings = frame::Settings::default(); + + // Send initial settings frame + match framed_write.start_send(settings.into()) { + Ok(AsyncSink::Ready) => { + let conn = proto::from_framed_write(framed_write); + Ok(Client { connection: conn }) + } + Ok(_) => unreachable!(), + Err(e) => Err(ConnectionError::from(e)), + } + }); - debug!("binding client connection"); + Handshake { inner: Box::new(handshake) } + } - let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - .map_err(ConnectionError::from) - .and_then(|(io, _)| { - debug!("client connection bound"); + /// Returns `Ready` when the connection can initialize a new HTTP 2.0 + /// stream. + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + unimplemented!(); + } - let mut framed_write = proto::framed_write(io); - let settings = frame::Settings::default(); + /// Send a request on a new HTTP 2.0 stream + pub fn request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + self.connection.send_request(request, end_of_stream) + .map(|stream| Stream { + inner: stream, + }) + } +} - // Send initial settings frame - match framed_write.start_send(settings.into()) { - Ok(AsyncSink::Ready) => { - Ok(proto::from_framed_write(framed_write)) - } - Ok(_) => unreachable!(), - Err(e) => Err(ConnectionError::from(e)), - } - }); +impl Future for Client + // TODO: Get rid of 'static + where T: AsyncRead + AsyncWrite + 'static, + B: IntoBuf + 'static, +{ + type Item = (); + type Error = ConnectionError; + + fn poll(&mut self) -> Poll<(), ConnectionError> { + self.connection.poll() + } +} + +impl fmt::Debug for Client + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Client") + .field("connection", &self.connection) + .finish() + } +} + +// ===== impl Handshake ===== + +impl Future for Handshake { + type Item = Client; + type Error = ConnectionError; - Handshake { inner: Box::new(handshake) } + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +impl fmt::Debug for Handshake + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "client::Handshake") + } } -impl Peer for Client { - type Send = http::request::Head; - type Poll = http::response::Head; +// ===== impl Stream ===== + +impl Stream { + /// Receive the HTTP/2.0 response, if it is ready. + pub fn poll_response(&mut self) -> Poll>, ConnectionError> { + let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); + let body = Body { inner: self.inner.clone() }; + + Ok(Response::from_parts(parts, body).into()) + } + + /// Send data + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + self.inner.send_data(data.into_buf(), end_of_stream) + } + + /// Send trailers + pub fn send_trailers(&mut self, trailers: ()) + -> Result<(), ConnectionError> + { + unimplemented!(); + } +} + +impl Future for Stream { + type Item = Response>; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + self.poll_response() + } +} + +// ===== impl Body ===== + +impl futures::Stream for Body { + type Item = Chunk; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + let chunk = try_ready!(self.inner.poll_data()) + .map(|inner| Chunk { inner }); + + Ok(chunk.into()) + } +} + +// ===== impl Chunk ===== + +impl Chunk { + pub fn pop_bytes(&mut self) -> Option { + self.inner.pop_bytes() + } +} + +// ===== impl Peer ===== + +impl proto::Peer for Peer { + type Send = Request<()>; + type Poll = Response<()>; fn is_server() -> bool { false @@ -68,15 +214,12 @@ impl Peer for Client { fn convert_send_message( id: StreamId, - headers: Self::Send, + request: Self::Send, end_of_stream: bool) -> frame::Headers { - use http::request::Head; - - // Extract the components of the HTTP request - let Head { method, uri, headers, .. } = headers; + use http::request::Parts; - // TODO: Ensure that the version is set to H2 + let (Parts { method, uri, headers, .. }, _) = request.into_parts(); // Build the set pseudo header set. All requests will include `method` // and `path`. @@ -92,25 +235,9 @@ impl Peer for Client { frame } - fn convert_poll_message(headers: frame::Headers) -> Self::Poll { + fn convert_poll_message(headers: frame::Headers) -> Result { headers.into_response() - } -} - -impl Future for Handshake { - type Item = Connection; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll { - self.inner.poll() - } -} - -impl fmt::Debug for Handshake - where T: fmt::Debug, - B: fmt::Debug + IntoBuf, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "client::Handshake") + // TODO: Is this always a protocol error? + .map_err(|_| ProtocolError.into()) } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index dbbf7996a..a181e18ba 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -3,7 +3,8 @@ use hpack; use frame::{self, Frame, Head, Kind, Error}; use HeaderMap; -use http::{request, response, version, uri, Method, StatusCode, Uri}; +use http::{self, request, response, version, uri, Method, StatusCode, Uri}; +use http::{Request, Response}; use http::header::{self, HeaderName, HeaderValue}; use bytes::{BytesMut, Bytes}; @@ -46,11 +47,11 @@ pub struct PushPromise { promised_id: StreamId, /// The associated flags - flags: HeadersFlag, + flags: PushPromiseFlag, } -impl PushPromise { -} +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct PushPromiseFlag(u8); #[derive(Debug)] pub struct Continuation { @@ -199,31 +200,28 @@ impl Headers { self.flags.set_end_stream() } - pub fn into_response(self) -> response::Head { - let mut response = response::Head::default(); + pub fn into_response(self) -> http::Result> { + let mut b = Response::builder(); if let Some(status) = self.pseudo.status { - response.status = status; - } else { - unimplemented!(); + b.status(status); } - response.headers = self.fields; - response + let mut response = try!(b.body(())); + *response.headers_mut() = self.fields; + + Ok(response) } - pub fn into_request(self) -> request::Head { - let mut request = request::Head::default(); + pub fn into_request(self) -> http::Result> { + let mut b = Request::builder(); // TODO: should we distinguish between HTTP_2 and HTTP_2C? // carllerche/http#42 - request.version = version::HTTP_2; + b.version(version::HTTP_2); if let Some(method) = self.pseudo.method { - request.method = method; - } else { - // TODO: invalid request - unimplemented!(); + b.method(method); } // Convert the URI @@ -244,12 +242,12 @@ impl Headers { parts.origin_form = Some(uri::OriginForm::try_from_shared(path.into_inner()).unwrap()); } - request.uri = parts.into(); + b.uri(parts); - // Set the header fields - request.headers = self.fields; + let mut request = try!(b.body(())); + *request.headers_mut() = self.fields; - request + Ok(request) } pub fn into_fields(self) -> HeaderMap { @@ -298,12 +296,46 @@ impl Headers { } } -impl From for Frame { - fn from(src: Headers) -> Frame { +impl From for Frame { + fn from(src: Headers) -> Self { Frame::Headers(src) } } +// ===== impl PushPromise ===== + +impl PushPromise { + pub fn load(head: Head, payload: &[u8]) + -> Result + { + let flags = PushPromiseFlag(head.flag()); + + // TODO: Handle padding + + let promised_id = StreamId::parse(&payload[..4]); + + Ok(PushPromise { + stream_id: head.stream_id(), + promised_id: promised_id, + flags: flags, + }) + } + + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + pub fn promised_id(&self) -> StreamId { + self.promised_id + } +} + +impl From for Frame { + fn from(src: PushPromise) -> Self { + Frame::PushPromise(src) + } +} + // ===== impl Pseudo ===== impl Pseudo { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 2480a71ba..b7a8022e2 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -67,6 +67,15 @@ pub enum Frame { } impl Frame { + /// Returns true if the frame is a DATA frame. + pub fn is_data(&self) -> bool { + use self::Frame::*; + + match *self { + Data(..) => true, + _ => false, + } + } } impl fmt::Debug for Frame { diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs index 6150bef76..417a1e065 100644 --- a/src/frame/stream_id.rs +++ b/src/frame/stream_id.rs @@ -39,6 +39,10 @@ impl StreamId { pub fn is_zero(&self) -> bool { self.0 == 0 } + + pub fn increment(&mut self) { + self.0 += 2; + } } impl From for StreamId { diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index d6cde3b13..70e3212b0 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -495,29 +495,29 @@ impl From for DecoderError { } } -impl From for DecoderError { - fn from(_: header::InvalidValueError) -> DecoderError { +impl From for DecoderError { + fn from(_: header::InvalidHeaderValue) -> DecoderError { // TODO: Better error? DecoderError::InvalidUtf8 } } -impl From for DecoderError { - fn from(_: method::FromBytesError) -> DecoderError { +impl From for DecoderError { + fn from(_: header::InvalidHeaderName) -> DecoderError { // TODO: Better error DecoderError::InvalidUtf8 } } -impl From for DecoderError { - fn from(_: header::FromBytesError) -> DecoderError { +impl From for DecoderError { + fn from(_: method::InvalidMethod) -> DecoderError { // TODO: Better error DecoderError::InvalidUtf8 } } -impl From for DecoderError { - fn from(_: status::FromStrError) -> DecoderError { +impl From for DecoderError { + fn from(_: status::InvalidStatusCode) -> DecoderError { // TODO: Better error DecoderError::InvalidUtf8 } diff --git a/src/hpack/test/fuzz.rs b/src/hpack/test/fuzz.rs index 80adbec7f..87515305f 100644 --- a/src/hpack/test/fuzz.rs +++ b/src/hpack/test/fuzz.rs @@ -251,7 +251,6 @@ fn gen_header_name(g: &mut StdRng) -> HeaderName { header::ACCEPT_CHARSET, header::ACCEPT_ENCODING, header::ACCEPT_LANGUAGE, - header::ACCEPT_PATCH, header::ACCEPT_RANGES, header::ACCESS_CONTROL_ALLOW_CREDENTIALS, header::ACCESS_CONTROL_ALLOW_HEADERS, @@ -272,7 +271,6 @@ fn gen_header_name(g: &mut StdRng) -> HeaderName { header::CONTENT_LANGUAGE, header::CONTENT_LENGTH, header::CONTENT_LOCATION, - header::CONTENT_MD5, header::CONTENT_RANGE, header::CONTENT_SECURITY_POLICY, header::CONTENT_SECURITY_POLICY_REPORT_ONLY, @@ -292,7 +290,6 @@ fn gen_header_name(g: &mut StdRng) -> HeaderName { header::IF_RANGE, header::IF_UNMODIFIED_SINCE, header::LAST_MODIFIED, - header::KEEP_ALIVE, header::LINK, header::LOCATION, header::MAX_FORWARDS, @@ -311,10 +308,8 @@ fn gen_header_name(g: &mut StdRng) -> HeaderName { header::SET_COOKIE, header::STRICT_TRANSPORT_SECURITY, header::TE, - header::TK, header::TRAILER, header::TRANSFER_ENCODING, - header::TSV, header::USER_AGENT, header::UPGRADE, header::UPGRADE_INSECURE_REQUESTS, diff --git a/src/lib.rs b/src/lib.rs index b4fdf4deb..08ac6264d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ -// #![allow(warnings)] -#![deny(missing_debug_implementations)] +#![allow(warnings)] +// #![deny(missing_debug_implementations)] #[macro_use] extern crate futures; @@ -20,6 +20,8 @@ extern crate fnv; extern crate byteorder; +extern crate slab; + #[macro_use] extern crate log; @@ -30,11 +32,10 @@ pub mod error; mod hpack; mod proto; mod frame; -pub mod server; +// pub mod server; pub use error::{ConnectionError, Reason}; pub use frame::StreamId; -pub use proto::Connection; use bytes::Bytes; @@ -68,23 +69,3 @@ pub enum Frame { error: Reason, }, } - -/// Either a Client or a Server -pub trait Peer { - /// Message type sent into the transport - type Send; - - /// Message type polled from the transport - type Poll; - - fn is_server() -> bool; - - #[doc(hidden)] - fn convert_send_message( - id: StreamId, - headers: Self::Send, - end_of_stream: bool) -> frame::Headers; - - #[doc(hidden)] - fn convert_poll_message(headers: frame::Headers) -> Self::Poll; -} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b9f70b2fd..7ae9139d5 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,12 +1,10 @@ -use {ConnectionError, Frame, Peer}; +use {client, ConnectionError, Frame}; use HeaderMap; use frame::{self, StreamId}; -use client::Client; -use server::Server; use proto::*; -use http::{request, response}; +use http::{Request, Response}; use bytes::{Bytes, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -21,39 +19,34 @@ pub struct Connection { // TODO: Remove ping_pong: PingPong, settings: Settings, - streams: Streams

, + streams: Streams, _phantom: PhantomData

, } -pub fn new(codec: Codec) - -> Connection +impl Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { - // TODO: Actually configure - let streams = Streams::new(streams::Config { - max_remote_initiated: None, - init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - max_local_initiated: None, - init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - }); - - Connection { - codec: codec, - ping_pong: PingPong::new(), - settings: Settings::new(), - streams: streams, - _phantom: PhantomData, + pub fn new(codec: Codec) -> Connection { + // TODO: Actually configure + let streams = Streams::new(streams::Config { + max_remote_initiated: None, + init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + max_local_initiated: None, + init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + }); + + Connection { + codec: codec, + ping_pong: PingPong::new(), + settings: Settings::new(), + streams: streams, + _phantom: PhantomData, + } } -} -impl Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ /// Polls for the next update to a remote flow control window. pub fn poll_window_update(&mut self) -> Poll { self.streams.poll_window_update() @@ -87,6 +80,7 @@ impl Connection unimplemented!(); } + /// Returns `Ready` when the connection is ready to receive a frame. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.poll_send_ready()); @@ -96,66 +90,18 @@ impl Connection Ok(().into()) } - pub fn send_data(self, - id: StreamId, - data: B, - end_of_stream: bool) - -> sink::Send - { - self.send(Frame::Data { - id, - data, - end_of_stream, - }) - } - - pub fn send_trailers(self, - id: StreamId, - headers: HeaderMap) - -> sink::Send - { - self.send(Frame::Trailers { - id, - headers, - }) - } - - pub fn start_ping(&mut self, _body: PingPayload) -> StartSend { - unimplemented!(); - } - - // ===== Private ===== - - /// Returns `Ready` when the `Connection` is ready to receive a frame from - /// the socket. - fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { - // Pong, settings ack, and stream refusals are high priority frames to - // send. If the write buffer is full, we stop reading any further frames - // until these high priority writes can be committed to the buffer. - - try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); - try_ready!(self.settings.send_pending_ack(&mut self.codec)); - try_ready!(self.streams.send_pending_refusal(&mut self.codec)); - - Ok(().into()) - } - - /// Returns `Ready` when the `Connection` is ready to accept a frame from - /// the user - /// - /// This function is currently used by poll_complete, but at some point it - /// will probably not be required. - fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_recv_ready()); - - // Ensure all window updates have been sent. - try_ready!(self.streams.send_pending_window_updates(&mut self.codec)); - - Ok(().into()) + /// Advances the internal state of the connection. + pub fn poll(&mut self) -> Poll<(), ConnectionError> { + match self.poll2() { + Err(e) => { + self.streams.recv_err(&e); + Err(e) + } + ret => ret, + } } - /// Try to receive the next frame - fn recv_frame(&mut self) -> Poll>, ConnectionError> { + fn poll2(&mut self) -> Poll<(), ConnectionError> { use frame::Frame::*; loop { @@ -167,9 +113,7 @@ impl Connection let frame = match try!(self.codec.poll()) { Async::Ready(frame) => frame, Async::NotReady => { - // Receiving new frames may depend on ensuring that the write buffer - // is clear (e.g. if window updates need to be sent), so `poll_complete` - // is called here. + // Flush any pending writes let _ = try!(self.poll_complete()); return Ok(Async::NotReady); } @@ -178,39 +122,31 @@ impl Connection match frame { Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); + + if let Some(frame) = try!(self.streams.recv_headers(frame)) { + unimplemented!(); + } + + /* // Update stream state while ensuring that the headers frame // can be received. if let Some(frame) = try!(self.streams.recv_headers(frame)) { - let frame = Self::convert_poll_message(frame); + let frame = Self::convert_poll_message(frame)?; return Ok(Some(frame).into()); } + */ } Some(Data(frame)) => { trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(&frame)); - - let frame = Frame::Data { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - data: frame.into_payload(), - }; - - return Ok(Some(frame).into()); + try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { trace!("recv RST_STREAM; frame={:?}", frame); - try!(self.streams.recv_reset(&frame)); - - let frame = Frame::Reset { - id: frame.stream_id(), - error: frame.reason(), - }; - - return Ok(Some(frame).into()); + try!(self.streams.recv_reset(frame)); } Some(PushPromise(frame)) => { trace!("recv PUSH_PROMISE; frame={:?}", frame); - try!(self.streams.recv_push_promise(frame)); + self.streams.recv_push_promise(frame)?; } Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); @@ -219,37 +155,121 @@ impl Connection // TODO: ACK must be sent THEN settings applied. } Some(Ping(frame)) => { + unimplemented!(); + /* trace!("recv PING; frame={:?}", frame); self.ping_pong.recv_ping(frame); + */ } Some(WindowUpdate(frame)) => { + unimplemented!(); + /* trace!("recv WINDOW_UPDATE; frame={:?}", frame); try!(self.streams.recv_window_update(frame)); + */ } None => { + // TODO: Is this correct? trace!("codec closed"); - return Ok(Async::Ready(None)); + return Ok(Async::Ready(())); } } } } - fn convert_poll_message(frame: frame::Headers) -> Frame { + /* + pub fn send_data(self, + id: StreamId, + data: B, + end_of_stream: bool) + -> sink::Send + { + self.send(Frame::Data { + id, + data, + end_of_stream, + }) + } + + pub fn send_trailers(self, + id: StreamId, + headers: HeaderMap) + -> sink::Send + { + self.send(Frame::Trailers { + id, + headers, + }) + } + */ + + // ===== Private ===== + + /// Returns `Ready` when the `Connection` is ready to receive a frame from + /// the socket. + fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { + // Pong, settings ack, and stream refusals are high priority frames to + // send. If the write buffer is full, we stop reading any further frames + // until these high priority writes can be committed to the buffer. + + try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); + try_ready!(self.settings.send_pending_ack(&mut self.codec)); + try_ready!(self.streams.send_pending_refusal(&mut self.codec)); + + Ok(().into()) + } + + /// Returns `Ready` when the `Connection` is ready to accept a frame from + /// the user + /// + /// This function is currently used by poll_complete, but at some point it + /// will probably not be required. + fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> { + // TODO: Is this function needed? + try_ready!(self.poll_recv_ready()); + + Ok(().into()) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_send_ready()); + + // Ensure all window updates have been sent. + try_ready!(self.streams.poll_complete(&mut self.codec)); + try_ready!(self.codec.poll_complete()); + + Ok(().into()) + } + + fn convert_poll_message(frame: frame::Headers) -> Result, ConnectionError> { if frame.is_trailers() { - Frame::Trailers { + Ok(Frame::Trailers { id: frame.stream_id(), headers: frame.into_fields() - } + }) } else { - Frame::Headers { + Ok(Frame::Headers { id: frame.stream_id(), end_of_stream: frame.is_end_stream(), - headers: P::convert_poll_message(frame), - } + headers: P::convert_poll_message(frame)?, + }) } } } +impl Connection + where T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + /// Initialize a new HTTP/2.0 stream and send the message. + pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + self.streams.send_request(request, end_of_stream) + } +} + +/* impl Connection where T: AsyncRead + AsyncWrite, B: IntoBuf, @@ -296,21 +316,9 @@ impl Connection }) } } +*/ -impl Stream for Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - // TODO: intercept errors and flag the connection - self.recv_frame() - } -} - +/* impl Sink for Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -384,11 +392,5 @@ impl Sink for Connection // Return success Ok(AsyncSink::Ready) } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_ready()); - try_ready!(self.codec.poll_complete()); - - Ok(().into()) - } } +*/ diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index ae2d059f4..412b1541f 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -92,7 +92,9 @@ impl FramedRead { let _todo = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); unimplemented!(); } - Kind::PushPromise | + Kind::PushPromise => { + frame::PushPromise::load(head, &bytes[frame::HEADER_LEN..])?.into() + } Kind::Priority | Kind::Continuation | Kind::Unknown => { @@ -117,7 +119,7 @@ impl ApplySettings for FramedRead { } */ -impl Stream for FramedRead +impl futures::Stream for FramedRead where T: AsyncRead, { type Item = Frame; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 30f2c6cfc..7c842d95f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,22 +6,42 @@ mod settings; mod streams; pub use self::connection::Connection; +pub use self::streams::{Streams, StreamRef, Chunk}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::PingPong; use self::settings::Settings; -use self::streams::Streams; -use {StreamId, Peer}; +use {StreamId, ConnectionError}; use error::Reason; -use frame::Frame; +use frame::{self, Frame}; -use futures::*; +use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; use bytes::{Buf, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +/// Either a Client or a Server +pub trait Peer { + /// Message type sent into the transport + type Send; + + /// Message type polled from the transport + type Poll; + + fn is_server() -> bool; + + #[doc(hidden)] + fn convert_send_message( + id: StreamId, + headers: Self::Send, + end_of_stream: bool) -> frame::Headers; + + #[doc(hidden)] + fn convert_poll_message(headers: frame::Headers) -> Result; +} + pub type PingPayload = [u8; 8]; pub type WindowSize = u32; @@ -69,7 +89,7 @@ pub fn from_framed_write(framed_write: FramedWrite) let codec = FramedRead::new(framed); - connection::new(codec) + Connection::new(codec) } impl WindowUpdate { diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs new file mode 100644 index 000000000..077d10d39 --- /dev/null +++ b/src/proto/streams/buffer.rs @@ -0,0 +1,140 @@ +use frame::{self, Frame}; + +use slab::Slab; + +use std::marker::PhantomData; + +/// Buffers frames for multiple streams. +#[derive(Debug)] +pub struct Buffer { + slab: Slab>, +} + +/// A sequence of frames in a `Buffer` +#[derive(Debug)] +pub struct Deque { + indices: Option, + _p: PhantomData, +} + +/// Tracks the head & tail for a sequence of frames in a `Buffer`. +#[derive(Debug, Default, Copy, Clone)] +struct Indices { + head: usize, + tail: usize, +} + +#[derive(Debug)] +struct Slot { + frame: Frame, + next: Option, +} + +impl Buffer { + pub fn new() -> Self { + Buffer { + slab: Slab::new(), + } + } +} + +impl Deque { + pub fn new() -> Self { + Deque { + indices: None, + _p: PhantomData, + } + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn push_back(&mut self, buf: &mut Buffer, frame: Frame) { + let key = buf.slab.insert(Slot { + frame, + next: None, + }); + + match self.indices { + Some(ref mut idxs) => { + buf.slab[idxs.tail].next = Some(key); + idxs.tail = key; + } + None => { + self.indices = Some(Indices { + head: key, + tail: key, + }); + } + } + } + + pub fn pop_front(&mut self, buf: &mut Buffer) -> Option> { + match self.indices { + Some(mut idxs) => { + let mut slot = buf.slab.remove(idxs.head); + + if idxs.head == idxs.tail { + assert!(slot.next.is_none()); + self.indices = None; + } else { + idxs.head = slot.next.take().unwrap(); + self.indices = Some(idxs); + } + + return Some(slot.frame); + } + None => None, + } + } + + pub fn take_while(&mut self, buf: &mut Buffer, mut f: F) -> Self + where F: FnMut(&Frame) -> bool + { + match self.indices { + Some(mut idxs) => { + if !f(&buf.slab[idxs.head].frame) { + return Deque::new(); + } + + let head = idxs.head; + let mut tail = idxs.head; + + loop { + let next = match buf.slab[tail].next { + Some(next) => next, + None => { + self.indices = None; + return Deque { + indices: Some(idxs), + _p: PhantomData, + }; + } + }; + + if !f(&buf.slab[next].frame) { + // Split the linked list + buf.slab[tail].next = None; + + self.indices = Some(Indices { + head: next, + tail: idxs.tail, + }); + + return Deque { + indices: Some(Indices { + head: head, + tail: tail, + }), + _p: PhantomData, + } + } + + tail = next; + } + } + None => Deque::new(), + } + } +} diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 931adbccf..1dfe92b2f 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -1,43 +1,31 @@ +mod buffer; mod flow_control; +mod prioritize; mod recv; mod send; mod state; mod store; +mod stream; +mod streams; +pub use self::streams::{Streams, StreamRef, Chunk}; + +use self::buffer::Buffer; use self::flow_control::FlowControl; +use self::prioritize::Prioritize; use self::recv::Recv; use self::send::Send; use self::state::State; use self::store::{Store, Entry}; +use self::stream::Stream; -use {frame, Peer, StreamId, ConnectionError}; +use {frame, StreamId, ConnectionError}; use proto::*; use error::Reason::*; use error::User::*; -// TODO: All the VecDeques should become linked lists using the State -// values. -#[derive(Debug)] -pub struct Streams

{ - /// State related to managing the set of streams. - inner: Inner

, - - /// Streams - streams: Store, -} - -/// Fields needed to manage state related to managing the set of streams. This -/// is mostly split out to make ownership happy. -/// -/// TODO: better name -#[derive(Debug)] -struct Inner

{ - /// Manages state transitions initiated by receiving frames - recv: Recv

, - - /// Manages state transitions initiated by sending frames - send: Send

, -} +use http::{Request, Response}; +use bytes::Bytes; #[derive(Debug)] pub struct Config { @@ -53,218 +41,3 @@ pub struct Config { /// Initial window size of locally initiated streams pub init_local_window_sz: WindowSize, } - -impl Streams

{ - pub fn new(config: Config) -> Self { - Streams { - inner: Inner { - recv: Recv::new(&config), - send: Send::new(&config), - }, - streams: Store::new(), - } - } - - pub fn recv_headers(&mut self, frame: frame::Headers) - -> Result, ConnectionError> - { - let id = frame.stream_id(); - - let state = match self.streams.entry(id) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // Trailers cannot open a stream. Trailers are header frames - // that do not contain pseudo headers. Requests MUST contain a - // method and responses MUST contain a status. If they do not,t - // hey are considered to be malformed. - if frame.is_trailers() { - return Err(ProtocolError.into()); - } - - match try!(self.inner.recv.open(id)) { - Some(state) => e.insert(state), - None => return Ok(None), - } - } - }; - - if frame.is_trailers() { - if !frame.is_end_stream() { - // TODO: What error should this return? - unimplemented!(); - } - - try!(self.inner.recv.recv_eos(state)); - } else { - try!(self.inner.recv.recv_headers(state, frame.is_end_stream())); - } - - if state.is_closed() { - self.inner.dec_num_streams(id); - } - - Ok(Some(frame)) - } - - pub fn recv_data(&mut self, frame: &frame::Data) - -> Result<(), ConnectionError> - { - let id = frame.stream_id(); - - let state = match self.streams.get_mut(&id) { - Some(state) => state, - None => return Err(ProtocolError.into()), - }; - - // Ensure there's enough capacity on the connection before acting on the - // stream. - try!(self.inner.recv.recv_data(frame, state)); - - if state.is_closed() { - self.inner.dec_num_streams(id); - } - - Ok(()) - } - - pub fn recv_reset(&mut self, _frame: &frame::Reset) - -> Result<(), ConnectionError> - { - unimplemented!(); - } - - pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) - -> Result<(), ConnectionError> { - let id = frame.stream_id(); - - if id.is_zero() { - try!(self.inner.send.recv_connection_window_update(frame)); - } else { - // The remote may send window updates for streams that the local now - // considers closed. It's ok... - if let Some(state) = self.streams.get_mut(&id) { - try!(self.inner.send.recv_stream_window_update(frame, state)); - } - } - - Ok(()) - } - - pub fn recv_push_promise(&mut self, _frame: frame::PushPromise) - -> Result<(), ConnectionError> - { - unimplemented!(); - } - - pub fn send_headers(&mut self, frame: &frame::Headers) - -> Result<(), ConnectionError> - { - let id = frame.stream_id(); - - trace!("send_headers; id={:?}", id); - - let state = match self.streams.entry(id) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // Trailers cannot open a stream. Trailers are header frames - // that do not contain pseudo headers. Requests MUST contain a - // method and responses MUST contain a status. If they do not,t - // hey are considered to be malformed. - if frame.is_trailers() { - // TODO: Should this be a different error? - return Err(UnexpectedFrameType.into()); - } - - let state = try!(self.inner.send.open(id)); - e.insert(state) - } - }; - - if frame.is_trailers() { - try!(self.inner.send.send_eos(state)); - } else { - try!(self.inner.send.send_headers(state, frame.is_end_stream())); - } - - if state.is_closed() { - self.inner.dec_num_streams(id); - } - - Ok(()) - } - - pub fn send_data(&mut self, frame: &frame::Data) - -> Result<(), ConnectionError> - { - let id = frame.stream_id(); - - let state = match self.streams.get_mut(&id) { - Some(state) => state, - None => return Err(UnexpectedFrameType.into()), - }; - - // Ensure there's enough capacity on the connection before acting on the - // stream. - try!(self.inner.send.send_data(frame, state)); - - if state.is_closed() { - self.inner.dec_num_streams(id); - } - - Ok(()) - } - - pub fn poll_window_update(&mut self) - -> Poll - { - self.inner.send.poll_window_update(&mut self.streams) - } - - pub fn expand_window(&mut self, id: StreamId, sz: WindowSize) - -> Result<(), ConnectionError> - { - if id.is_zero() { - try!(self.inner.recv.expand_connection_window(sz)); - } else { - if let Some(state) = self.streams.get_mut(&id) { - try!(self.inner.recv.expand_stream_window(id, sz, state)); - } - } - - Ok(()) - } - - pub fn send_pending_refusal(&mut self, dst: &mut Codec) - -> Poll<(), ConnectionError> - where T: AsyncWrite, - B: Buf, - { - self.inner.recv.send_pending_refusal(dst) - } - - pub fn send_pending_window_updates(&mut self, dst: &mut Codec) - -> Poll<(), ConnectionError> - where T: AsyncWrite, - B: Buf, - { - try_ready!(self.inner.recv.send_connection_window_update(dst)); - try_ready!(self.inner.recv.send_stream_window_update(&mut self.streams, dst)); - - Ok(().into()) - } -} - -impl Inner

{ - fn dec_num_streams(&mut self, id: StreamId) { - if self.is_local_init(id) { - self.send.dec_num_streams(); - } else { - self.recv.dec_num_streams(); - } - } - - fn is_local_init(&self, id: StreamId) -> bool { - assert!(!id.is_zero()); - P::is_server() == id.is_server_initiated() - } -} diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs new file mode 100644 index 000000000..667bbf339 --- /dev/null +++ b/src/proto/streams/prioritize.rs @@ -0,0 +1,97 @@ +use super::*; + +#[derive(Debug)] +pub(super) struct Prioritize { + pending_send: store::List, + + /// Holds frames that are waiting to be written to the socket + buffer: Buffer, +} + +impl Prioritize + where B: Buf, +{ + pub fn new() -> Prioritize { + Prioritize { + pending_send: store::List::new(), + buffer: Buffer::new(), + } + } + + pub fn queue_frame(&mut self, + frame: Frame, + stream: &mut store::Ptr) + { + // queue the frame in the buffer + stream.pending_send.push_back(&mut self.buffer, frame); + + if stream.is_pending_send { + debug_assert!(!self.pending_send.is_empty()); + + // Already queued to have frame processed. + return; + } + + // Queue the stream + self.push_sender(stream); + } + + pub fn poll_complete(&mut self, + store: &mut Store, + dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + loop { + // Ensure codec is ready + try_ready!(dst.poll_ready()); + + match self.pop_frame(store) { + Some(frame) => { + // TODO: data frames should be handled specially... + let res = dst.start_send(frame)?; + + // We already verified that `dst` is ready to accept the + // write + assert!(res.is_ready()); + } + None => break, + } + } + + Ok(().into()) + } + + fn pop_frame(&mut self, store: &mut Store) -> Option> { + match self.pop_sender(store) { + Some(mut stream) => { + let frame = stream.pending_send.pop_front(&mut self.buffer).unwrap(); + + if !stream.pending_send.is_empty() { + self.push_sender(&mut stream); + } + + Some(frame) + } + None => None, + } + } + + fn push_sender(&mut self, stream: &mut store::Ptr) { + debug_assert!(!stream.is_pending_send); + + self.pending_send.push(stream); + + stream.is_pending_send = true; + } + + fn pop_sender<'a>(&mut self, store: &'a mut Store) -> Option> { + match self.pending_send.pop(store) { + Some(mut stream) => { + stream.is_pending_send = false; + Some(stream) + } + None => None, + } + } +} diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 8dd60745a..cfb813f70 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {frame, Peer, ConnectionError}; +use {client, frame, ConnectionError}; use proto::*; use super::*; @@ -8,7 +8,7 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub struct Recv

{ +pub(super) struct Recv { /// Maximum number of remote initiated streams max_streams: Option, @@ -21,15 +21,35 @@ pub struct Recv

{ /// Connection level flow control governing received data flow_control: FlowControl, + /// Streams that have pending window updates + /// TODO: don't use a VecDeque pending_window_updates: VecDeque, + /// Holds frames that are waiting to be read + buffer: Buffer, + /// Refused StreamId, this represents a frame that must be sent out. refused: Option, - _p: PhantomData

, + _p: PhantomData<(P, B)>, +} + +#[derive(Debug)] +pub(super) struct Chunk { + /// Data frames pending receival + pub pending_recv: buffer::Deque, +} + +#[derive(Debug, Clone, Copy)] +struct Indices { + head: store::Key, + tail: store::Key, } -impl Recv

{ +impl Recv + where P: Peer, + B: Buf, +{ pub fn new(config: &Config) -> Self { Recv { max_streams: config.max_remote_initiated, @@ -37,6 +57,7 @@ impl Recv

{ init_window_sz: config.init_remote_window_sz, flow_control: FlowControl::new(config.init_remote_window_sz), pending_window_updates: VecDeque::new(), + buffer: Buffer::new(), refused: None, _p: PhantomData, } @@ -45,40 +66,58 @@ impl Recv

{ /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { + pub fn open(&mut self, id: StreamId) -> Result>, ConnectionError> { assert!(self.refused.is_none()); try!(self.ensure_can_open(id)); - if let Some(max) = self.max_streams { - if max <= self.num_streams { - self.refused = Some(id); - return Ok(None); - } + if !self.can_inc_num_streams() { + self.refused = Some(id); + return Ok(None); } - // Increment the number of remote initiated streams - self.num_streams += 1; - - Ok(Some(State::default())) + Ok(Some(Stream::new(id))) } /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, state: &mut State, eos: bool) - -> Result<(), ConnectionError> + pub fn recv_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) + -> Result, ConnectionError> { - state.recv_open(self.init_window_sz, eos) + let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; + + if is_initial { + if !self.can_inc_num_streams() { + unimplemented!(); + } + + // Increment the number of concurrent streams + self.inc_num_streams(); + } + + // Only servers can receive a headers frame that initiates the stream. + // This is verified in `Streams` before calling this function. + if P::is_server() { + Ok(Some(frame)) + } else { + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + + Ok(None) + } } - pub fn recv_eos(&mut self, state: &mut State) + pub fn recv_eos(&mut self, stream: &mut Stream) -> Result<(), ConnectionError> { - state.recv_close() + stream.state.recv_close() } pub fn recv_data(&mut self, - frame: &frame::Data, - state: &mut State) + frame: frame::Data, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().len(); @@ -89,7 +128,7 @@ impl Recv

{ let sz = sz as WindowSize; - match state.recv_flow_control() { + match stream.recv_flow_control() { Some(flow) => { // Ensure there's enough capacity on the connection before // acting on the stream. @@ -106,12 +145,97 @@ impl Recv

{ } if frame.is_end_stream() { - try!(state.recv_close()); + try!(stream.state.recv_close()); + } + + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + + Ok(()) + } + + pub fn recv_push_promise(&mut self, frame: frame::PushPromise, stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + // First, make sure that the values are legit + self.ensure_can_reserve(frame.promised_id())?; + + // Make sure that the stream state is valid + stream.state.ensure_recv_open()?; + + // TODO: Streams in the reserved states do not count towards the concurrency + // limit. However, it seems like there should be a cap otherwise this + // could grow in memory indefinitely. + + /* + if !self.inc_num_streams() { + self.refused = Some(frame.promised_id()); + return Ok(()); } + */ + + // TODO: All earlier stream IDs should be implicitly closed. + + // Now, create a new entry for the stream + let mut new_stream = Stream::new(frame.promised_id()); + new_stream.state.reserve_remote(); + + let mut ppp = stream.pending_push_promises.take(); + + { + // Store the stream + let mut new_stream = stream.store() + .insert(frame.promised_id(), new_stream); + + ppp.push(&mut new_stream); + } + + stream.pending_push_promises = ppp; + stream.notify_recv(); + + Ok(()) + } + + pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) + -> Result<(), ConnectionError> + { + let err = ConnectionError::Proto(frame.reason()); + // Notify the stream + stream.state.recv_err(&err); + stream.notify_recv(); Ok(()) } + pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { + // Receive an error + stream.state.recv_err(err); + + // If a receiver is waiting, notify it + stream.notify_recv(); + } + + /// Returns true if the current stream concurrency can be incremetned + fn can_inc_num_streams(&self) -> bool { + if let Some(max) = self.max_streams { + max > self.num_streams + } else { + true + } + } + + /// Increments the number of concurrenty streams. Panics on failure as this + /// should have been validated before hand. + fn inc_num_streams(&mut self) { + if !self.can_inc_num_streams() { + panic!(); + } + + // Increment the number of remote initiated streams + self.num_streams += 1; + } + pub fn dec_num_streams(&mut self) { self.num_streams -= 1; } @@ -132,11 +256,25 @@ impl Recv

{ Ok(()) } + /// Returns true if the remote peer can reserve a stream with the given ID. + fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), ConnectionError> { + // TODO: Are there other rules? + if P::is_server() { + // The remote is a client and cannot reserve + return Err(ProtocolError.into()); + } + + if !promised_id.is_server_initiated() { + return Err(ProtocolError.into()); + } + + Ok(()) + } + /// Send any pending refusals. - pub fn send_pending_refusal(&mut self, dst: &mut Codec) + pub fn send_pending_refusal(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { if let Some(stream_id) = self.refused.take() { let frame = frame::Reset::new(stream_id, RefusedStream); @@ -168,11 +306,11 @@ impl Recv

{ pub fn expand_stream_window(&mut self, id: StreamId, sz: WindowSize, - state: &mut State) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { // TODO: handle overflow - if let Some(flow) = state.recv_flow_control() { + if let Some(flow) = stream.recv_flow_control() { flow.expand_window(sz); self.pending_window_updates.push_back(id); } @@ -181,10 +319,9 @@ impl Recv

{ } /// Send connection level window update - pub fn send_connection_window_update(&mut self, dst: &mut Codec) + pub fn send_connection_window_update(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { if let Some(incr) = self.flow_control.peek_window_update() { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); @@ -199,17 +336,47 @@ impl Recv

{ Ok(().into()) } + + pub fn poll_chunk(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> + { + let frames = stream.pending_recv + .take_while(&mut self.buffer, |frame| frame.is_data()); + + if frames.is_empty() { + if stream.state.is_recv_closed() { + Ok(None.into()) + } else { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } else { + Ok(Some(Chunk { + pending_recv: frames, + }).into()) + } + } + + pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option { + match chunk.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Data(frame)) => { + Some(frame.into_payload()) + } + None => None, + _ => panic!("unexpected frame type"), + } + } + /// Send stream level window update - pub fn send_stream_window_update(&mut self, - streams: &mut Store, - dst: &mut Codec) + pub fn send_stream_window_update(&mut self, + streams: &mut Store, + dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { while let Some(id) = self.pending_window_updates.pop_front() { - let flow = streams.get_mut(&id) - .and_then(|state| state.recv_flow_control()); + let flow = streams.find_mut(&id) + .and_then(|stream| stream.into_mut().recv_flow_control()); if let Some(flow) = flow { @@ -233,3 +400,27 @@ impl Recv

{ unimplemented!(); } } + +impl Recv + where B: Buf, +{ + pub fn poll_response(&mut self, stream: &mut store::Ptr) + -> Poll, ConnectionError> { + // If the buffer is not empty, then the first frame must be a HEADERS + // frame or the user violated the contract. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Headers(v)) => { + // TODO: This error should probably be caught on receipt of the + // frame vs. now. + Ok(client::Peer::convert_poll_message(v)?.into()) + } + Some(frame) => unimplemented!(), + None => { + stream.state.ensure_recv_open()?; + + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } + } +} diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 875fa0652..de7bc6c7b 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,4 +1,4 @@ -use {frame, Peer, ConnectionError}; +use {frame, ConnectionError}; use proto::*; use super::*; @@ -10,13 +10,16 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub struct Send

{ +pub(super) struct Send { /// Maximum number of locally initiated streams max_streams: Option, /// Current number of locally initiated streams num_streams: usize, + /// Stream identifier to use for next initialized stream. + next_stream_id: StreamId, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -27,6 +30,8 @@ pub struct Send

{ // XXX It would be cool if this didn't exist. pending_window_updates: VecDeque, + prioritize: Prioritize, + /// When `poll_window_update` is not ready, then the calling task is saved to /// be notified later. Access to poll_window_update must not be shared across tasks, /// as we only track a single task (and *not* i.e. a task per stream id). @@ -35,13 +40,24 @@ pub struct Send

{ _p: PhantomData

, } -impl Send

{ +impl Send + where P: Peer, + B: Buf, +{ pub fn new(config: &Config) -> Self { + let next_stream_id = if P::is_server() { + 2 + } else { + 1 + }; + Send { max_streams: config.max_local_initiated, num_streams: 0, + next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, flow_control: FlowControl::new(config.init_local_window_sz), + prioritize: Prioritize::new(), pending_window_updates: VecDeque::new(), blocked: None, _p: PhantomData, @@ -51,8 +67,8 @@ impl Send

{ /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result { - try!(self.ensure_can_open(id)); + pub fn open(&mut self) -> Result, ConnectionError> { + try!(self.ensure_can_open()); if let Some(max) = self.max_streams { if max <= self.num_streams { @@ -60,27 +76,38 @@ impl Send

{ } } + let ret = Stream::new(self.next_stream_id); + // Increment the number of locally initiated streams self.num_streams += 1; + self.next_stream_id.increment(); - Ok(State::default()) + Ok(ret) } - pub fn send_headers(&mut self, state: &mut State, eos: bool) + pub fn send_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { - state.send_open(self.init_window_sz, eos) + // Update the state + stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; + + // Queue the frame for sending + self.prioritize.queue_frame(frame.into(), stream); + + Ok(()) } - pub fn send_eos(&mut self, state: &mut State) + pub fn send_eos(&mut self, stream: &mut Stream) -> Result<(), ConnectionError> { - state.send_close() + stream.state.send_close() } - pub fn send_data(&mut self, - frame: &frame::Data, - state: &mut State) + pub fn send_data(&mut self, + frame: frame::Data, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -94,7 +121,7 @@ impl Send

{ // Make borrow checker happy loop { - match state.send_flow_control() { + match stream.send_flow_control() { Some(flow) => { try!(self.flow_control.ensure_window(sz, FlowControlViolation)); @@ -110,7 +137,7 @@ impl Send

{ None => {} } - if state.is_closed() { + if stream.state.is_closed() { return Err(InactiveStreamId.into()) } else { return Err(UnexpectedFrameType.into()) @@ -118,14 +145,25 @@ impl Send

{ } if frame.is_end_stream() { - try!(state.send_close()); + try!(stream.state.send_close()); } + self.prioritize.queue_frame(frame.into(), stream); + Ok(()) } + pub fn poll_complete(&mut self, + store: &mut Store, + dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + self.prioritize.poll_complete(store, dst) + } + /// Get pending window updates - pub fn poll_window_update(&mut self, streams: &mut Store) + pub fn poll_window_update(&mut self, streams: &mut Store) -> Poll { // This biases connection window updates, which probably makes sense. @@ -138,8 +176,8 @@ impl Send

{ // TODO this should probably account for stream priority? let update = self.pending_window_updates.pop_front() .and_then(|id| { - streams.get_mut(&id) - .and_then(|state| state.send_flow_control()) + streams.find_mut(&id) + .and_then(|stream| stream.into_mut().send_flow_control()) .and_then(|flow| flow.apply_window_update()) .map(|incr| WindowUpdate::new(id, incr)) }); @@ -171,10 +209,10 @@ impl Send

{ pub fn recv_stream_window_update(&mut self, frame: frame::WindowUpdate, - state: &mut State) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { - if let Some(flow) = state.send_flow_control() { + if let Some(flow) = stream.send_flow_control() { // TODO: Handle invalid increment flow.expand_window(frame.size_increment()); } @@ -191,15 +229,13 @@ impl Send

{ } /// Returns true if the local actor can initiate a stream with the given ID. - fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { + fn ensure_can_open(&self) -> Result<(), ConnectionError> { if P::is_server() { // Servers cannot open streams. PushPromise must first be reserved. return Err(UnexpectedFrameType.into()); } - if !id.is_client_initiated() { - return Err(InvalidStreamId.into()); - } + // TODO: Handle StreamId overflow Ok(()) } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 369ac8c96..7b3bdc37d 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -58,7 +58,7 @@ enum Inner { Idle, // TODO: these states shouldn't count against concurrency limits: //ReservedLocal, - //ReservedRemote, + ReservedRemote, Open { local: Peer, remote: Peer, @@ -66,7 +66,7 @@ enum Inner { HalfClosedLocal(Peer), // TODO: explicitly name this value HalfClosedRemote(Peer), // When reset, a reason is provided - Closed(Option), + Closed(Option), } #[derive(Debug, Copy, Clone)] @@ -76,6 +76,12 @@ enum Peer { Streaming(FlowControl), } +#[derive(Debug, Copy, Clone)] +enum Cause { + Proto(Reason), + Io, +} + impl State { /// Opens the send-half of a stream if it is not already open. pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { @@ -120,11 +126,16 @@ impl State { /// Open the receive have of the stream, this action is taken when a HEADERS /// frame is received. - pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { + /// + /// Returns true if this transitions the state to Open + pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result { let remote = Peer::streaming(sz); + let mut initial = false; self.inner = match self.inner { Idle => { + initial = true; + if eos { HalfClosedRemote(AwaitingHeaders) } else { @@ -134,6 +145,18 @@ impl State { } } } + ReservedRemote => { + initial = true; + + if eos { + Closed(None) + } else { + Open { + local: AwaitingHeaders, + remote, + } + } + } Open { local, remote: AwaitingHeaders } => { if eos { HalfClosedRemote(local) @@ -157,7 +180,18 @@ impl State { } }; - return Ok(()); + return Ok(initial); + } + + /// Transition from Idle -> ReservedRemote + pub fn reserve_remote(&mut self) -> Result<(), ConnectionError> { + match self.inner { + Idle => { + self.inner = ReservedRemote; + Ok(()) + } + _ => Err(ProtocolError.into()), + } } /// Indicates that the remote side will not send more data to the local. @@ -178,6 +212,19 @@ impl State { } } + pub fn recv_err(&mut self, err: &ConnectionError) { + match self.inner { + Closed(..) => {} + _ => { + self.inner = Closed(match *err { + ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), + ConnectionError::Io(..) => Some(Cause::Io), + _ => panic!("cannot terminate stream with user error"), + }); + } + } + } + /// Indicates that the local side will not send more data to the local. pub fn send_close(&mut self) -> Result<(), ConnectionError> { match self.inner { @@ -196,6 +243,17 @@ impl State { } } + /// Returns true if a stream with the current state counts against the + /// concurrency limit. + pub fn is_counted(&self) -> bool { + match self.inner { + Open { .. } => true, + HalfClosedLocal(..) => true, + HalfClosedRemote(..) => true, + _ => false, + } + } + pub fn is_closed(&self) -> bool { match self.inner { Closed(_) => true, @@ -203,6 +261,13 @@ impl State { } } + pub fn is_recv_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedRemote(..) => true, + _ => false, + } + } + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { match self.inner { Open { ref mut remote, .. } | @@ -218,6 +283,21 @@ impl State { _ => None, } } + + pub fn ensure_recv_open(&self) -> Result<(), ConnectionError> { + use std::io; + + // TODO: Is this correct? + match self.inner { + Closed(Some(Cause::Proto(reason))) => { + Err(ConnectionError::Proto(reason)) + } + Closed(Some(Cause::Io)) => { + Err(ConnectionError::Io(io::ErrorKind::BrokenPipe.into())) + } + _ => Ok(()), + } + } } impl Default for State { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 39625299c..89900b83f 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -1,32 +1,59 @@ -extern crate slab; - use super::*; +use slab; + +use std::ops; use std::collections::{HashMap, hash_map}; +use std::marker::PhantomData; /// Storage for streams #[derive(Debug)] -pub struct Store { - slab: slab::Slab, +pub(super) struct Store { + slab: slab::Slab>, ids: HashMap, } -pub enum Entry<'a> { - Occupied(OccupiedEntry<'a>), - Vacant(VacantEntry<'a>), +/// "Pointer" to an entry in the store +pub(super) struct Ptr<'a, B: 'a> { + key: Key, + store: &'a mut Store, +} + +/// References an entry in the store. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) struct Key(usize); + +#[derive(Debug)] +pub(super) struct List { + indices: Option, + _p: PhantomData, +} + +/// A linked list +#[derive(Debug, Clone, Copy)] +struct Indices { + pub head: Key, + pub tail: Key, +} + +pub(super) enum Entry<'a, B: 'a> { + Occupied(OccupiedEntry<'a, B>), + Vacant(VacantEntry<'a, B>), } -pub struct OccupiedEntry<'a> { +pub(super) struct OccupiedEntry<'a, B: 'a> { ids: hash_map::OccupiedEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab>, } -pub struct VacantEntry<'a> { +pub(super) struct VacantEntry<'a, B: 'a> { ids: hash_map::VacantEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab>, } -impl Store { +// ===== impl Store ===== + +impl Store { pub fn new() -> Self { Store { slab: slab::Slab::new(), @@ -34,15 +61,35 @@ impl Store { } } - pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut State> { - if let Some(handle) = self.ids.get(id) { - Some(&mut self.slab[*handle]) + pub fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key: key, + store: self, + } + } + + pub fn find_mut(&mut self, id: &StreamId) -> Option> { + if let Some(&key) = self.ids.get(id) { + Some(Ptr { + key: Key(key), + store: self, + }) } else { None } } - pub fn entry(&mut self, id: StreamId) -> Entry { + pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { + let key = self.slab.insert(val); + assert!(self.ids.insert(id, key).is_none()); + + Ptr { + key: Key(key), + store: self, + } + } + + pub fn find_entry(&mut self, id: StreamId) -> Entry { use self::hash_map::Entry::*; match self.ids.entry(id) { @@ -60,22 +107,145 @@ impl Store { } } } + + pub fn for_each(&mut self, mut f: F) + where F: FnMut(&mut Stream) + { + for &id in self.ids.values() { + f(&mut self.slab[id]) + } + } } -impl<'a> OccupiedEntry<'a> { - pub fn into_mut(self) -> &'a mut State { +// ===== impl List ===== + +impl List { + pub fn new() -> Self { + List { + indices: None, + _p: PhantomData, + } + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn take(&mut self) -> Self { + List { + indices: self.indices.take(), + _p: PhantomData, + } + } + + pub fn push(&mut self, stream: &mut store::Ptr) { + // The next pointer shouldn't be set + debug_assert!(stream.next.is_none()); + + // Queue the stream + match self.indices { + Some(ref mut idxs) => { + // Update the current tail node to point to `stream` + stream.resolve(idxs.tail).next = Some(stream.key()); + + // Update the tail pointer + idxs.tail = stream.key(); + } + None => { + self.indices = Some(store::Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + } + + pub fn pop<'a>(&mut self, store: &'a mut Store) -> Option> { + if let Some(mut idxs) = self.indices { + let mut stream = store.resolve(idxs.head); + + if idxs.head == idxs.tail { + assert!(stream.next.is_none()); + self.indices = None; + } else { + idxs.head = stream.next.take().unwrap(); + self.indices = Some(idxs); + } + + return Some(stream); + } + + None + } +} + +// ===== impl Ptr ===== + +impl<'a, B: 'a> Ptr<'a, B> { + pub fn key(&self) -> Key { + self.key + } + + pub fn store(&mut self) -> &mut Store { + &mut self.store + } + + pub fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key: key, + store: self.store, + } + } + + pub fn into_mut(self) -> &'a mut Stream { + &mut self.store.slab[self.key.0] + } +} + +impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { + type Target = Stream; + + fn deref(&self) -> &Stream { + &self.store.slab[self.key.0] + } +} + +impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { + fn deref_mut(&mut self) -> &mut Stream { + &mut self.store.slab[self.key.0] + } +} + +// ===== impl OccupiedEntry ===== + +impl<'a, B> OccupiedEntry<'a, B> { + pub fn key(&self) -> Key { + Key(*self.ids.get()) + } + + pub fn get(&self) -> &Stream { + &self.slab[*self.ids.get()] + } + + pub fn get_mut(&mut self) -> &mut Stream { + &mut self.slab[*self.ids.get()] + } + + pub fn into_mut(self) -> &'a mut Stream { &mut self.slab[*self.ids.get()] } } -impl<'a> VacantEntry<'a> { - pub fn insert(self, value: State) -> &'a mut State { +// ===== impl VacantEntry ===== + +impl<'a, B> VacantEntry<'a, B> { + pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab - let handle = self.slab.insert(value); + let key = self.slab.insert(value); // Insert the handle in the ID map - self.ids.insert(handle); + self.ids.insert(key); - &mut self.slab[handle] + Key(key) } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs new file mode 100644 index 000000000..7518a483b --- /dev/null +++ b/src/proto/streams/stream.rs @@ -0,0 +1,60 @@ +use super::*; + +#[derive(Debug)] +pub(super) struct Stream { + /// The h2 stream identifier + pub id: StreamId, + + /// Current state of the stream + pub state: State, + + /// Frames pending for this stream to read + pub pending_recv: buffer::Deque, + + /// Task tracking receiving frames + pub recv_task: Option, + + /// Frames pending for this stream being sent to the socket + pub pending_send: buffer::Deque, + + /// Next node in the `Stream` linked list. + /// + /// This field is used in different linked lists depending on the stream + /// state. + pub next: Option, + + /// The stream's pending push promises + pub pending_push_promises: store::List, + + /// True if the stream is currently pending send + pub is_pending_send: bool, +} + +impl Stream { + pub fn new(id: StreamId) -> Stream { + Stream { + id, + state: State::default(), + pending_recv: buffer::Deque::new(), + recv_task: None, + pending_send: buffer::Deque::new(), + next: None, + pending_push_promises: store::List::new(), + is_pending_send: false, + } + } + + pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { + self.state.send_flow_control() + } + + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { + self.state.recv_flow_control() + } + + pub fn notify_recv(&mut self) { + if let Some(ref mut task) = self.recv_task { + task.notify(); + } + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs new file mode 100644 index 000000000..fd3a47c9f --- /dev/null +++ b/src/proto/streams/streams.rs @@ -0,0 +1,446 @@ +use client; +use proto::*; +use super::*; + +use std::sync::{Arc, Mutex}; + +// TODO: All the VecDeques should become linked lists using the State +// values. +#[derive(Debug)] +pub struct Streams { + inner: Arc>>, +} + +/// Reference to the stream state +#[derive(Debug)] +pub struct StreamRef { + inner: Arc>>, + key: store::Key, +} + +#[derive(Debug)] +pub struct Chunk + where P: Peer, + B: Buf, +{ + inner: Arc>>, + recv: recv::Chunk, +} + +/// Fields needed to manage state related to managing the set of streams. This +/// is mostly split out to make ownership happy. +/// +/// TODO: better name +#[derive(Debug)] +struct Inner { + actions: Actions, + store: Store, +} + +#[derive(Debug)] +struct Actions { + /// Manages state transitions initiated by receiving frames + recv: Recv, + + /// Manages state transitions initiated by sending frames + send: Send, +} + +impl Streams + where P: Peer, + B: Buf, +{ + pub fn new(config: Config) -> Self { + Streams { + inner: Arc::new(Mutex::new(Inner { + actions: Actions { + recv: Recv::new(&config), + send: Send::new(&config), + }, + store: Store::new(), + })), + } + } + + /// Process inbound headers + pub fn recv_headers(&mut self, frame: frame::Headers) + -> Result, ConnectionError> + { + let id = frame.stream_id(); + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let key = match me.store.find_entry(id) { + Entry::Occupied(e) => e.key(), + Entry::Vacant(e) => { + // Trailers cannot open a stream. Trailers are header frames + // that do not contain pseudo headers. Requests MUST contain a + // method and responses MUST contain a status. If they do not,t + // hey are considered to be malformed. + if frame.is_trailers() { + return Err(ProtocolError.into()); + } + + match try!(me.actions.recv.open(id)) { + Some(stream) => e.insert(stream), + None => return Ok(None), + } + } + }; + + let stream = me.store.resolve(key); + + me.actions.transition(stream, |actions, stream| { + if frame.is_trailers() { + unimplemented!(); + /* + if !frame.is_end_stream() { + // TODO: What error should this return? + unimplemented!(); + } + + try!(me.actions.recv.recv_eos(stream)); + */ + } else { + actions.recv.recv_headers(frame, stream) + } + }) + } + + pub fn recv_data(&mut self, frame: frame::Data) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let id = frame.stream_id(); + + let stream = match me.store.find_mut(&id) { + Some(stream) => stream, + None => return Err(ProtocolError.into()), + }; + + me.actions.transition(stream, |actions, stream| { + actions.recv.recv_data(frame, stream) + }) + } + + pub fn recv_reset(&mut self, frame: frame::Reset) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let id = frame.stream_id(); + + let mut stream = match me.store.find_mut(&id) { + Some(stream) => stream, + // TODO: should this be an error? + None => return Ok(()), + }; + + me.actions.transition(stream, |actions, stream| { + actions.recv.recv_reset(frame, stream)?; + assert!(stream.state.is_closed()); + Ok(()) + }) + } + + pub fn recv_err(&mut self, err: &ConnectionError) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let actions = &mut me.actions; + me.store.for_each(|stream| actions.recv.recv_err(err, stream)); + } + + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) + -> Result<(), ConnectionError> { + let id = frame.stream_id(); + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + if id.is_zero() { + try!(me.actions.send.recv_connection_window_update(frame)); + } else { + // The remote may send window updates for streams that the local now + // considers closed. It's ok... + if let Some(mut stream) = me.store.find_mut(&id) { + try!(me.actions.send.recv_stream_window_update(frame, &mut stream)); + } + } + + Ok(()) + } + + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let id = frame.stream_id(); + + let mut stream = match me.store.find_mut(&id) { + Some(stream) => stream, + None => return Err(ProtocolError.into()), + }; + + me.actions.recv.recv_push_promise(frame, &mut stream) + } + + pub fn send_headers(&mut self, headers: frame::Headers) + -> Result<(), ConnectionError> + { + unimplemented!(); + /* + let id = frame.stream_id(); + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + // let (id, state) = me.actions.send.open()); + + + let state = match me.store.entry(id) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + let (id, state) = try!(me.actions.send.open()); + e.insert(state) + } + }; + + if frame.is_trailers() { + try!(me.actions.send.send_eos(state)); + } else { + try!(me.actions.send.send_headers(state, frame.is_end_stream())); + } + + if state.is_closed() { + me.actions.dec_num_streams(id); + } + + Ok(()) + */ + } + + pub fn poll_window_update(&mut self) + -> Poll + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions.send.poll_window_update(&mut me.store) + } + + pub fn expand_window(&mut self, id: StreamId, sz: WindowSize) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + if id.is_zero() { + try!(me.actions.recv.expand_connection_window(sz)); + } else { + if let Some(mut stream) = me.store.find_mut(&id) { + try!(me.actions.recv.expand_stream_window(id, sz, &mut stream)); + } + } + + Ok(()) + } + + pub fn send_pending_refusal(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions.recv.send_pending_refusal(dst) + } + + pub fn poll_complete(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + // TODO: sending window updates should be part of Prioritize + /* + try_ready!(me.actions.recv.send_connection_window_update(dst)); + try_ready!(me.actions.recv.send_stream_window_update(&mut me.store, dst)); + */ + + me.actions.send.poll_complete(&mut me.store, dst) + } +} + +impl Streams + where B: Buf, +{ + pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + // TODO: There is a hazard with assigning a stream ID before the + // prioritize layer. If prioritization reorders new streams, this + // implicitly closes the earlier stream IDs. + // + // See: carllerche/h2#11 + let key = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + // Initialize a new stream. This fails if the connection is at capacity. + let mut stream = me.actions.send.open()?; + + // Convert the message + let headers = client::Peer::convert_send_message( + stream.id, request, end_of_stream); + + let mut stream = me.store.insert(stream.id, stream); + + me.actions.send.send_headers(headers, &mut stream)?; + + // Given that the stream has been initialized, it should not be in the + // closed state. + debug_assert!(!stream.state.is_closed()); + + stream.key() + }; + + Ok(StreamRef { + inner: self.inner.clone(), + key: key, + }) + } +} + +// ===== impl StreamRef ===== + +impl StreamRef + where P: Peer, + B: Buf, +{ + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + + // Create the data frame + let frame = frame::Data::from_buf(stream.id, data, end_of_stream); + + me.actions.transition(stream, |actions, stream| { + // Send the data frame + actions.send.send_data(frame, stream) + }) + } + + pub fn poll_data(&mut self) -> Poll>, ConnectionError> { + let recv = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + try_ready!(me.actions.recv.poll_chunk(&mut stream)) + }; + + // Convert to a chunk + let chunk = recv.map(|recv| { + Chunk { + inner: self.inner.clone(), + recv: recv, + } + }); + + Ok(chunk.into()) + } +} + +impl StreamRef + where B: Buf, +{ + pub fn poll_response(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_response(&mut stream) + } +} + + + +impl Clone for StreamRef { + fn clone(&self) -> Self { + StreamRef { + inner: self.inner.clone(), + key: self.key.clone(), + } + } +} + +// ===== impl Chunk ===== + +impl Chunk + where P: Peer, + B: Buf, +{ + // TODO: Come up w/ a better API + pub fn pop_bytes(&mut self) -> Option { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.pop_bytes(&mut self.recv) + } +} + +impl Drop for Chunk + where P: Peer, + B: Buf, +{ + fn drop(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) { + } + } +} + +// ===== impl Actions ===== + +impl Actions + where P: Peer, + B: Buf, +{ + fn dec_num_streams(&mut self, id: StreamId) { + if self.is_local_init(id) { + self.send.dec_num_streams(); + } else { + self.recv.dec_num_streams(); + } + } + + fn is_local_init(&self, id: StreamId) -> bool { + assert!(!id.is_zero()); + P::is_server() == id.is_server_initiated() + } + + fn transition(&mut self, mut stream: store::Ptr, f: F) -> U + where F: FnOnce(&mut Self, &mut store::Ptr) -> U, + { + let is_counted = stream.state.is_counted(); + + let ret = f(self, &mut stream); + + if is_counted && stream.state.is_closed() { + self.dec_num_streams(stream.id); + } + + ret + } +} diff --git a/tests/client_request.rs b/tests/client_request.rs index 49250b2cb..4db144e02 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -14,47 +14,47 @@ fn handshake() { .write(SETTINGS_ACK) .build(); - let h2 = client::handshake(mock) + let h2 = Client::handshake(mock) .wait().unwrap(); + trace!("hands have been shook"); // At this point, the connection should be closed - assert!(Stream::wait(h2).next().is_none()); + h2.wait().unwrap(); } - #[test] -fn send_request_with_zero_stream_id() { +fn recv_invalid_server_stream_id() { + let _ = ::env_logger::init(); + let mock = mock_io::Builder::new() .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); -} - -#[test] -fn send_request_with_server_stream_id() { - let mock = mock_io::Builder::new() - .handshake() - .build(); + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + // The connection errors + assert_proto_err!(h2.wait().unwrap_err(), ProtocolError); - let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); + // The stream errors + assert_proto_err!(stream.wait().unwrap_err(), ProtocolError); } #[test] @@ -67,54 +67,10 @@ fn request_without_scheme() { fn request_with_h1_version() { } -#[test] -fn send_invalid_client_stream_id() { - let _ = ::env_logger::init(); - - for &id in &[0, 2] { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let err = h2.send_request(id.into(), request, true).wait().unwrap_err(); - - assert_user_err!(err, InvalidStreamId); - } -} #[test] -fn recv_invalid_server_stream_id() { - let _ = ::env_logger::init(); - - let mock = mock_io::Builder::new() - .handshake() - // Write GET / - .write(&[ - 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, - 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, - ]) - .write(SETTINGS_ACK) - // Read response - .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); - - // Get the response - let (err, _) = h2.into_future().wait().unwrap_err(); - assert_proto_err!(err, ProtocolError); +#[ignore] +fn sending_request_on_closed_soket() { } const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; diff --git a/tests/ping_pong.rs b/tests/ping_pong.rs index 4ef927624..e548797e9 100644 --- a/tests/ping_pong.rs +++ b/tests/ping_pong.rs @@ -31,10 +31,10 @@ fn recv_single_ping() { */ .build(); + /* let h2 = client::handshake(mock) .wait().unwrap(); - /* // Send the request let mut request = request::Head::default(); request.method = method::POST; diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 6bb30d709..a1312181d 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -29,6 +29,7 @@ fn single_stream_send_large_body() { ]) .build(); + /* let h2 = client::handshake(mock) .wait().unwrap(); @@ -65,4 +66,5 @@ fn single_stream_send_large_body() { } assert!(Stream::wait(h2).next().is_none());; + */ } diff --git a/tests/server_preface.rs b/tests/server_preface.rs index 71f3ef9f7..a1a15d35e 100644 --- a/tests/server_preface.rs +++ b/tests/server_preface.rs @@ -4,7 +4,7 @@ extern crate futures; extern crate mock_io; extern crate env_logger; -use h2::server; +// use h2::server; use futures::*; @@ -13,6 +13,7 @@ const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; #[test] fn read_preface_in_multiple_frames() { + /* let _ = ::env_logger::init().unwrap(); let mock = mock_io::Builder::new() @@ -28,4 +29,5 @@ fn read_preface_in_multiple_frames() { .wait().unwrap(); assert!(Stream::wait(h2).next().is_none()); + */ } diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 2b1594f45..6ef557abc 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -5,8 +5,6 @@ extern crate log; pub mod support; use support::*; -use h2::Frame; - #[test] fn send_recv_headers_only() { let _ = env_logger::init(); @@ -23,31 +21,21 @@ fn send_recv_headers_only() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); info!("sending request"); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + let mut stream = h2.request(request, true).unwrap(); - // Get the response - - info!("getting response"); - let (resp, h2) = h2.into_future().wait().unwrap(); + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::NO_CONTENT); - match resp.unwrap() { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::NO_CONTENT); - } - _ => panic!("unexpected frame"), - } - - // No more frames - info!("ensure no more responses"); - assert!(Stream::wait(h2).next().is_none());; + h2.wait().unwrap(); } #[test] @@ -75,46 +63,44 @@ fn send_recv_data() { ]) .build(); - let h2 = client::handshake(mock).wait().expect("handshake"); + let mut h2 = Client::handshake2(mock) + .wait().unwrap(); - // Send the request - let mut request = request::Head::default(); - request.method = method::POST; - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, false).wait().expect("send request"); + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - let b = "hello"; + info!("sending request"); + let mut stream = h2.request(request, false).unwrap(); // Send the data - let h2 = h2.send_data(1.into(), b.into(), true).wait().expect("send data"); - - // Get the response headers - let (resp, h2) = h2.into_future().wait().expect("into future"); - - match resp.expect("response headers") { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } - - // Get the response body - let (data, h2) = h2.into_future().wait().expect("into future"); - - match data.expect("response data") { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } - - assert!(Stream::wait(h2).next().is_none());; + stream.send_data("hello", true).unwrap(); + + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); + + // Take the body + let (_, body) = resp.into_parts(); + + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); + + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); + + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + assert!(chunks[0].pop_bytes().is_none()); + + // The H2 connection is closed + h2.wait().unwrap(); } #[test] -fn send_headers_recv_data() { +fn send_headers_recv_data_single_frame() { let _ = env_logger::init(); let mock = mock_io::Builder::new() @@ -132,80 +118,42 @@ fn send_headers_recv_data() { ]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); - - // Get the response headers - let (resp, h2) = h2.into_future().wait().unwrap(); - - match resp.unwrap() { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } - - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); - - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"hello"[..]); - assert!(!end_of_stream); - } - _ => panic!("unexpected frame"), - } - - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); - - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } - - assert!(Stream::wait(h2).next().is_none());; -} + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); -#[test] -fn send_headers_twice_with_same_stream_id() { - let _ = env_logger::init(); + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); - let mock = mock_io::Builder::new() - .handshake() - // Write GET / - .write(&[ - 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, - 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, - ]) - .build(); + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); - let h2 = client::handshake(mock) - .wait().unwrap(); + // Take the body + let (_, body) = resp.into_parts(); - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); - // Send another request with the same stream ID - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); - assert_user_err!(err, UnexpectedFrameType); + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"hello"[..]); + + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + assert!(chunks[0].pop_bytes().is_none()); + + // The H2 connection is closed + h2.wait().unwrap(); } +/* #[test] fn send_data_after_headers_eos() { let _ = env_logger::init(); @@ -238,22 +186,8 @@ fn send_data_after_headers_eos() { assert_user_err!(err, UnexpectedFrameType); } -#[test] -fn send_data_without_headers() { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - let b = Bytes::from_static(b"hello world"); - let err = h2.send_data(1.into(), b, true).wait().unwrap_err(); - - assert_user_err!(err, UnexpectedFrameType); -} - #[test] #[ignore] fn exceed_max_streams() { } +*/ diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 3074d5272..47869d8ff 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -3,6 +3,7 @@ pub extern crate bytes; pub extern crate h2; pub extern crate http; +pub extern crate tokio_io; pub extern crate futures; pub extern crate mock_io; pub extern crate env_logger; @@ -12,26 +13,30 @@ pub use self::futures::{ Sink, Stream, }; +pub use self::futures::future::poll_fn; pub use self::http::{ request, response, method, status, + Request, + Response, }; -pub use self::h2::{ - client, - server, -}; +pub use self::h2::client::{self, Client}; +// pub use self::h2::server; pub use self::bytes::{ Buf, BufMut, Bytes, BytesMut, + IntoBuf, }; +use tokio_io::{AsyncRead, AsyncWrite}; + pub trait MockH2 { fn handshake(&mut self) -> &mut Self; } @@ -46,6 +51,33 @@ impl MockH2 for mock_io::Builder { } } +pub trait ClientExt { + fn run(&mut self, f: F) -> Result; +} + +impl ClientExt for Client + where T: AsyncRead + AsyncWrite + 'static, + B: IntoBuf + 'static, +{ + fn run(&mut self, f: F) -> Result { + use futures::future::{self, Future}; + use futures::future::Either::*; + + let res = future::poll_fn(|| self.poll()) + .select2(f).wait(); + + match res { + Ok(A((_, b))) => { + // Connection is done... + b.wait() + } + Ok(B((v, _))) => return Ok(v), + Err(A((e, _))) => panic!("err: {:?}", e), + Err(B((e, _))) => return Err(e), + } + } +} + pub mod frames { //! Some useful frames