diff --git a/src/client.rs b/src/client.rs index dbd128936..a5fba808e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -52,16 +52,8 @@ impl Peer for Client { type Send = http::request::Head; type Poll = http::response::Head; - fn is_valid_local_stream_id(id: StreamId) -> bool { - id.is_client_initiated() - } - - fn is_valid_remote_stream_id(id: StreamId) -> bool { - id.is_server_initiated() - } - - fn local_can_open() -> bool { - true + fn is_server() -> bool { + false } fn convert_send_message( diff --git a/src/lib.rs b/src/lib.rs index e625f7827..48b4a7ba3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,23 +77,7 @@ pub trait Peer { /// Message type polled from the transport type Poll; - /// Returns `true` if `id` is a valid StreamId for a stream initiated by the - /// local node. - fn is_valid_local_stream_id(id: StreamId) -> bool; - - /// Returns `true` if `id` is a valid StreamId for a stream initiated by the - /// remote node. - fn is_valid_remote_stream_id(id: StreamId) -> bool; - - fn local_can_open() -> bool; - fn remote_can_open() -> bool { - !Self::local_can_open() - } - - //fn can_reserve_local_stream() -> bool; - // fn can_reserve_remote_stream() -> bool { - // !self.can_reserve_local_stream - // } + fn is_server() -> bool; #[doc(hidden)] fn convert_send_message( diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b0afc8b6b..982349455 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -13,13 +13,13 @@ use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { - inner: Transport, + inner: Transport, // Set to `true` as long as the connection is in a valid state. active: bool, _phantom: PhantomData<(P, B)>, } -pub fn new(transport: Transport) +pub fn new(transport: Transport) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, diff --git a/src/proto/control_streams.rs b/src/proto/control_streams.rs index a1686b805..5e899f091 100644 --- a/src/proto/control_streams.rs +++ b/src/proto/control_streams.rs @@ -1,204 +1,22 @@ -use ConnectionError; use proto::*; /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up /// to Connection). pub trait ControlStreams { - /// Determines whether the given stream could theoretically be opened by the local - /// side of this connection. - fn local_valid_id(id: StreamId) -> bool; + fn streams(&self) -> &Streams; - /// Determines whether the given stream could theoretically be opened by the remote - /// side of this connection. - fn remote_valid_id(id: StreamId) -> bool; - - /// Indicates whether this local endpoint may open streams (with HEADERS). - /// - /// Implies that this endpoint is a client. - fn local_can_open() -> bool; - - /// Indicates whether this remote endpoint may open streams (with HEADERS). - /// - /// Implies that this endpoint is a server. - fn remote_can_open() -> bool { - !Self::local_can_open() - } - - // TODO push promise - // fn local_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - // fn remote_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). - /// - /// Must only be called when local_can_open returns true. - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Create a new stream in the OPEN state from the remote side (i.e. as a Server). - /// - /// Must only be called when remote_can_open returns true. - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Prepare the receive side of a local stream to receive data from the remote. - /// - /// Typically called when a client receives a response header. - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Prepare the send side of a remote stream to receive data from the local endpoint. - /// - /// Typically called when a server sends a response header. - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - // TODO push promise - // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Closes the send half of a stream. - /// - /// Fails with a ProtocolError if send half of the stream was not open. - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Closes the recv half of a stream. - /// - /// Fails with a ProtocolError if recv half of the stream was not open. - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Resets the given stream. - /// - /// If the stream was already reset, the stored cause is updated. - fn reset_stream(&mut self, id: StreamId, cause: Reason); - - /// Get the reason the stream was reset, if it was reset. - fn get_reset(&self, id: StreamId) -> Option; - - /// Returns true if the given stream was opened by the local peer and is not yet - /// closed. - fn is_local_active(&self, id: StreamId) -> bool; - - /// Returns true if the given stream was opened by the remote peer and is not yet - /// closed. - fn is_remote_active(&self, id: StreamId) -> bool; - - /// Returns true if the given stream was opened and is not yet closed. - fn is_active(&self, id: StreamId) -> bool { - if Self::local_valid_id(id) { - self.is_local_active(id) - } else { - self.is_remote_active(id) - } - } - - /// Returns the number of open streams initiated by the local peer. - fn local_active_len(&self) -> usize; - - /// Returns the number of open streams initiated by the remote peer. - fn remote_active_len(&self) -> usize; - - /// Returns true iff the recv half of the given stream is open. - fn is_recv_open(&mut self, id: StreamId) -> bool; - - /// Returns true iff the send half of the given stream is open. - fn is_send_open(&mut self, id: StreamId) -> bool; - - /// If the given stream ID is active and able to recv data, get its mutable recv flow - /// control state. - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - - /// If the given stream ID is active and able to send data, get its mutable send flow - /// control state. - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - - /// Updates the initial window size for the local peer. - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); - - /// Updates the initial window size for the remote peer. - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); + fn streams_mut(&mut self) -> &mut Streams; } macro_rules! proxy_control_streams { ($outer:ident) => ( impl ControlStreams for $outer { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) + fn streams(&self) -> &Streams { + self.inner.streams() } - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) + fn streams_mut(&mut self) -> &mut Streams { + self.inner.streams_mut() } } ) diff --git a/src/proto/flow_control_recv.rs b/src/proto/flow_control_recv.rs index 221534cdf..582699676 100644 --- a/src/proto/flow_control_recv.rs +++ b/src/proto/flow_control_recv.rs @@ -44,7 +44,7 @@ impl FlowControlRecv /// Exposes a public upward API for flow control. impl ControlFlowRecv for FlowControlRecv { fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - let added = match self.recv_flow_controller(id) { + let added = match self.streams_mut().recv_flow_controller(id) { None => false, Some(mut fc) => { fc.expand_window(incr); @@ -57,7 +57,7 @@ impl ControlFlowRecv for FlowControlRecv { self.pending_streams.push_back(id); } Ok(()) - } else if let Some(rst) = self.inner.get_reset(id) { + } else if let Some(rst) = self.streams().get_reset(id) { Err(error::User::StreamReset(rst).into()) } else { Err(error::User::InvalidStreamId.into()) @@ -80,8 +80,8 @@ impl FlowControlRecv } while let Some(id) = self.pending_streams.pop_front() { - if self.inner.get_reset(id).is_none() { - let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); + if self.streams().get_reset(id).is_none() { + let update = self.streams_mut().recv_flow_controller(id).and_then(|s| s.apply_window_update()); if let Some(incr) = update { try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); } @@ -124,8 +124,9 @@ impl Stream for FlowControlRecv return Err(error::Reason::FlowControlError.into()); } - let fc = self.inner.recv_flow_controller(id) + let fc = self.inner.streams_mut().recv_flow_controller(id) .expect("receiving data with no flow controller"); + if fc.claim_window(sz).is_err() { // TODO this should cause a GO_AWAY return Err(error::Reason::FlowControlError.into()); @@ -206,7 +207,7 @@ impl ApplySettings for FlowControlRecv return Ok(()); } - self.inner.update_inital_recv_window_size(old_window_size, new_window_size); + self.streams_mut().update_inital_recv_window_size(old_window_size, new_window_size); self.initial_window_size = new_window_size; } Ok(()) diff --git a/src/proto/flow_control_send.rs b/src/proto/flow_control_send.rs index a3135c1a1..4ff108baf 100644 --- a/src/proto/flow_control_send.rs +++ b/src/proto/flow_control_send.rs @@ -50,7 +50,7 @@ impl ControlFlowSend for FlowControlSend { // TODO this should probably account for stream priority? while let Some(id) = self.pending_streams.pop_front() { - if let Some(mut flow) = self.send_flow_controller(id) { + if let Some(mut flow) = self.streams_mut().send_flow_controller(id) { if let Some(incr) = flow.apply_window_update() { return Ok(Async::Ready(WindowUpdate::new(id, incr))); } @@ -84,7 +84,7 @@ impl Stream for FlowControlSend } else { // The remote may send window updates for streams that the local // now considers closed. It's okay. - if let Some(fc) = self.inner.send_flow_controller(id) { + if let Some(fc) = self.streams_mut().send_flow_controller(id) { fc.expand_window(sz); } } @@ -110,7 +110,7 @@ impl Sink for FlowControlSend type SinkError = T::SinkError; fn start_send(&mut self, frame: Frame) -> StartSend { - debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); + debug_assert!(self.streams().get_reset(frame.stream_id()).is_none()); // Ensures that the underlying transport is will accept the frame. It's important // that this be checked before claiming capacity from the flow controllers. @@ -130,8 +130,9 @@ impl Sink for FlowControlSend } // Ensure there's enough capacity on stream. - let mut fc = self.inner.send_flow_controller(v.stream_id()) + let mut fc = self.inner.streams_mut().send_flow_controller(v.stream_id()) .expect("no remote stream for data frame"); + if fc.claim_window(sz).is_err() { return Err(error::User::FlowControlViolation.into()) } @@ -195,7 +196,7 @@ impl ApplySettings for FlowControlSend return Ok(()); } - self.inner.update_inital_send_window_size(old_window_size, new_window_size); + self.streams_mut().update_inital_send_window_size(old_window_size, new_window_size); self.initial_window_size = new_window_size; } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 06dbd957f..2f6dbcc9e 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -97,7 +97,7 @@ use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; use self::stream_send_open::StreamSendOpen; -use self::stream_states::StreamStates; +use self::stream_states::{StreamStates, Streams}; /// Represents the internals of an HTTP/2 connection. /// @@ -187,22 +187,22 @@ use self::stream_states::StreamStates; /// /// - Encodes frames to bytes. /// -type Transport= +type Transport= Settings< - Streams< + Streams2< PingPong< Codec, - B>, - P>>; + B>>>; -type Streams = +// TODO: rename +type Streams2 = StreamSendOpen< FlowControlSend< StreamSendClose< StreamRecvClose< FlowControlRecv< StreamRecvOpen< - StreamStates>>>>>>; + StreamStates>>>>>>; type Codec = FramedRead< @@ -303,7 +303,7 @@ pub fn from_server_handshaker(settings: Settings StreamRecvOpen::new( initial_recv_window_size, local_max_concurrency, - StreamStates::new( + StreamStates::new::

( PingPong::new( FramedRead::new(framed))))))))) }); diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 1868ea6a3..ad5345908 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -1,5 +1,4 @@ use ConnectionError; -use error::Reason; use frame::{self, Frame}; use proto::*; use proto::ready::ReadySink; @@ -39,10 +38,10 @@ impl Stream for StreamRecvClose if frame.is_end_stream() { trace!("poll: id={:?} eos", id); if let &Frame::Reset(ref rst) = &frame { - self.inner.reset_stream(id, rst.reason()); + self.streams_mut().reset_stream(id, rst.reason()); } else { - debug_assert!(self.inner.is_active(id)); - self.inner.close_recv_half(id)?; + debug_assert!(self.streams().is_active(id)); + self.streams_mut().close_recv_half(id)?; } } } diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index af9144c97..772a26d26 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -42,7 +42,7 @@ impl StreamRecvOpen let f = frame::Reset::new(id, RefusedStream); match self.inner.start_send(f.into())? { AsyncSink::Ready => { - self.inner.reset_stream(id, RefusedStream); + self.streams_mut().reset_stream(id, RefusedStream); Ok(Async::Ready(())) } AsyncSink::NotReady(_) => { @@ -81,7 +81,7 @@ impl ApplySettings for StreamRecvOpen impl StreamRecvOpen { fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { // Ensure that the stream hasn't been closed otherwise. - match self.inner.get_reset(id) { + match self.streams().get_reset(id) { Some(reason) => Err(reason.into()), None => Ok(()), } @@ -127,20 +127,20 @@ impl Stream for StreamRecvOpen &Frame::Headers(..) => { self.check_not_reset(id)?; - if T::remote_valid_id(id) { - if self.inner.is_remote_active(id) { + if self.streams().is_valid_remote_stream_id(id) { + if self.streams().is_remote_active(id) { // Can't send a a HEADERS frame on a remote stream that's // active, because we've already received headers. This will // have to change to support PUSH_PROMISE. return Err(ProtocolError.into()); } - if !T::remote_can_open() { + if !self.streams().can_remote_open() { return Err(ProtocolError.into()); } if let Some(max) = self.max_concurrency { - if (max as usize) < self.inner.remote_active_len() { + if (max as usize) < self.streams().remote_active_len() { debug!("refusing stream that would exceed max_concurrency={}", max); self.send_refuse(id)?; @@ -149,17 +149,17 @@ impl Stream for StreamRecvOpen } } - self.inner.remote_open(id, self.initial_window_size)?; + self.inner.streams_mut().remote_open(id, self.initial_window_size)?; } else { // On remote streams, - self.inner.local_open_recv_half(id, self.initial_window_size)?; + self.inner.streams_mut().local_open_recv_half(id, self.initial_window_size)?; } } // All other stream frames are sent only when _ => { self.check_not_reset(id)?; - if !self.inner.is_recv_open(id) { + if !self.streams().is_recv_open(id) { return Err(ProtocolError.into()); } } diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index c2094b1f5..544ca4126 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -1,5 +1,4 @@ use ConnectionError; -use error::Reason; use frame::{self, Frame}; use proto::*; @@ -34,10 +33,10 @@ impl Sink for StreamSendClose if !id.is_zero() { if eos { if let &Frame::Reset(ref rst) = &frame { - self.inner.reset_stream(id, rst.reason()); + self.streams_mut().reset_stream(id, rst.reason()); } else { - debug_assert!(self.inner.is_active(id)); - self.inner.close_send_half(id)?; + debug_assert!(self.streams().is_active(id)); + self.streams_mut().close_send_half(id)?; } } } diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 20e4395c3..a1ea59407 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -49,7 +49,7 @@ impl ApplySettings for StreamSendOpen { impl StreamSendOpen { fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { // Ensure that the stream hasn't been closed otherwise. - match self.inner.get_reset(id) { + match self.streams().get_reset(id) { Some(reason) => Err(StreamReset(reason).into()), None => Ok(()), } @@ -82,15 +82,15 @@ impl Sink for StreamSendOpen &Frame::Headers(..) => { self.check_not_reset(id)?; - if T::local_valid_id(id) { - if self.inner.is_local_active(id) { + if self.streams().is_valid_local_stream_id(id) { + if self.streams().is_local_active(id) { // Can't send a a HEADERS frame on a local stream that's active, // because we've already sent headers. This will have to change // to support PUSH_PROMISE. return Err(UnexpectedFrameType.into()); } - if !T::local_can_open() { + if !self.streams().can_local_open() { // A server tried to start a stream with a HEADERS frame. return Err(UnexpectedFrameType.into()); } @@ -98,15 +98,15 @@ impl Sink for StreamSendOpen if let Some(max) = self.max_concurrency { // Don't allow this stream to overflow the remote's max stream // concurrency. - if (max as usize) < self.inner.local_active_len() { + if (max as usize) < self.streams().local_active_len() { return Err(Rejected.into()); } } - self.inner.local_open(id, self.initial_window_size)?; + self.inner.streams_mut().local_open(id, self.initial_window_size)?; } else { // On remote streams, - if self.inner.remote_open_send_half(id, self.initial_window_size).is_err() { + if self.inner.streams_mut().remote_open_send_half(id, self.initial_window_size).is_err() { return Err(InvalidStreamId.into()); } } @@ -116,7 +116,7 @@ impl Sink for StreamSendOpen // the stream is open (i.e. has already sent headers). _ => { self.check_not_reset(id)?; - if !self.inner.is_send_open(id) { + if !self.streams().is_send_open(id) { return Err(InactiveStreamId.into()); } } diff --git a/src/proto/stream_states.rs b/src/proto/stream_states.rs index b2e7e428f..d805755be 100644 --- a/src/proto/stream_states.rs +++ b/src/proto/stream_states.rs @@ -6,14 +6,20 @@ use proto::stream_state::StreamState; use fnv::FnvHasher; use ordermap::OrderMap; use std::hash::BuildHasherDefault; -use std::marker::PhantomData; /// Holds the underlying stream state to be accessed by upper layers. // TODO track reserved streams // TODO constrain the size of `reset` -#[derive(Debug, Default)] -pub struct StreamStates { +#[derive(Debug)] +pub struct StreamStates { inner: T, + streams: Streams, +} + +#[derive(Debug)] +pub struct Streams { + /// True when in the context of an H2 server. + is_server: bool, /// Holds active streams initiated by the local endpoint. local_active: OrderMap>, @@ -23,30 +29,56 @@ pub struct StreamStates { /// Holds active streams initiated by the remote. reset: OrderMap>, - - _phantom: PhantomData

, } -impl StreamStates +impl StreamStates where T: Stream, T: Sink, SinkError = ConnectionError>, - P: Peer, { - pub fn new(inner: T) -> StreamStates { + pub fn new(inner: T) -> StreamStates { StreamStates { inner, - local_active: OrderMap::default(), - remote_active: OrderMap::default(), - reset: OrderMap::default(), - _phantom: PhantomData, + streams: Streams { + is_server: P::is_server(), + local_active: OrderMap::default(), + remote_active: OrderMap::default(), + reset: OrderMap::default(), + }, } } } -impl StreamStates { - pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { +impl ControlStreams for StreamStates { + fn streams(&self) -> &Streams { + &self.streams + } + + fn streams_mut(&mut self) -> &mut Streams { + &mut self.streams + } +} + +impl Streams { + pub fn is_valid_local_stream_id(&self, id: StreamId) -> bool { + if self.is_server { + id.is_server_initiated() + } else { + id.is_client_initiated() + } + } + + pub fn is_valid_remote_stream_id(&self, id: StreamId) -> bool { + if self.is_server { + id.is_client_initiated() + } else { + id.is_server_initiated() + } + } + + pub fn get_active(&self, id: StreamId) -> Option<&StreamState> { assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { + + if self.is_valid_local_stream_id(id) { self.local_active.get(&id) } else { self.remote_active.get(&id) @@ -55,7 +87,8 @@ impl StreamStates { pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { + + if self.is_valid_local_stream_id(id) { self.local_active.get_mut(&id) } else { self.remote_active.get_mut(&id) @@ -64,31 +97,27 @@ impl StreamStates { pub fn remove_active(&mut self, id: StreamId) { assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { + + if self.is_valid_local_stream_id(id) { self.local_active.remove(&id); } else { self.remote_active.remove(&id); } } -} -impl ControlStreams for StreamStates { - fn local_valid_id(id: StreamId) -> bool { - P::is_valid_local_stream_id(id) + pub fn can_local_open(&self) -> bool { + !self.is_server } - fn remote_valid_id(id: StreamId) -> bool { - P::is_valid_remote_stream_id(id) + pub fn can_remote_open(&self) -> bool { + !self.can_local_open() } - fn local_can_open() -> bool { - P::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::local_valid_id(id) || !Self::local_can_open() { + pub fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !self.is_valid_local_stream_id(id) || !self.can_local_open() { return Err(ProtocolError.into()); } + if self.local_active.contains_key(&id) { return Err(ProtocolError.into()); } @@ -97,8 +126,8 @@ impl ControlStreams for StreamStates { Ok(()) } - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::remote_valid_id(id) || !Self::remote_can_open() { + pub fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !self.is_valid_remote_stream_id(id) || !self.can_remote_open() { return Err(ProtocolError.into()); } if self.remote_active.contains_key(&id) { @@ -109,8 +138,8 @@ impl ControlStreams for StreamStates { Ok(()) } - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::local_valid_id(id) { + pub fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !self.is_valid_local_stream_id(id) { return Err(ProtocolError.into()); } @@ -120,8 +149,8 @@ impl ControlStreams for StreamStates { } } - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::remote_valid_id(id) { + pub fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !self.is_valid_remote_stream_id(id) { return Err(ProtocolError.into()); } @@ -131,7 +160,7 @@ impl ControlStreams for StreamStates { } } - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + pub fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) .map(|s| s.close_send_half()) .unwrap_or_else(|| Err(ProtocolError.into()))?; @@ -143,7 +172,7 @@ impl ControlStreams for StreamStates { Ok(()) } - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + pub fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) .map(|s| s.close_recv_half()) .unwrap_or_else(|| Err(ProtocolError.into()))?; @@ -155,46 +184,55 @@ impl ControlStreams for StreamStates { Ok(()) } - fn reset_stream(&mut self, id: StreamId, cause: Reason) { + pub fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.remove_active(id); self.reset.insert(id, cause); } - fn get_reset(&self, id: StreamId) -> Option { + pub fn get_reset(&self, id: StreamId) -> Option { self.reset.get(&id).map(|r| *r) } - fn is_local_active(&self, id: StreamId) -> bool { + pub fn is_local_active(&self, id: StreamId) -> bool { self.local_active.contains_key(&id) } - fn is_remote_active(&self, id: StreamId) -> bool { + pub fn is_remote_active(&self, id: StreamId) -> bool { self.remote_active.contains_key(&id) } - fn is_send_open(&mut self, id: StreamId) -> bool { + /// Returns true if the given stream was opened and is not yet closed. + pub fn is_active(&self, id: StreamId) -> bool { + if self.is_valid_local_stream_id(id) { + self.is_local_active(id) + } else { + self.is_remote_active(id) + } + } + + pub fn is_send_open(&self, id: StreamId) -> bool { match self.get_active(id) { Some(s) => s.is_send_open(), None => false, } } - fn is_recv_open(&mut self, id: StreamId) -> bool { + pub fn is_recv_open(&self, id: StreamId) -> bool { match self.get_active(id) { Some(s) => s.is_recv_open(), None => false, } } - fn local_active_len(&self) -> usize { + pub fn local_active_len(&self) -> usize { self.local_active.len() } - fn remote_active_len(&self) -> usize { + pub fn remote_active_len(&self) -> usize { self.remote_active.len() } - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + pub fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { if new_sz < old_sz { let decr = old_sz - new_sz; @@ -226,7 +264,7 @@ impl ControlStreams for StreamStates { } } - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + pub fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { if new_sz < old_sz { let decr = old_sz - new_sz; @@ -258,20 +296,21 @@ impl ControlStreams for StreamStates { } } - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + pub fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + // TODO: Abstract getting the state for a stream ID if id.is_zero() { None - } else if P::is_valid_local_stream_id(id) { + } else if self.is_valid_local_stream_id(id) { self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) } else { self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) } } - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + pub fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { None - } else if P::is_valid_local_stream_id(id) { + } else if self.is_valid_local_stream_id(id) { self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) } else { self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) @@ -279,8 +318,8 @@ impl ControlStreams for StreamStates { } } -proxy_apply_settings!(StreamStates, P); -proxy_control_ping!(StreamStates, P); -proxy_stream!(StreamStates, P); -proxy_sink!(StreamStates, P); -proxy_ready_sink!(StreamStates, P); +proxy_apply_settings!(StreamStates); +proxy_control_ping!(StreamStates); +proxy_stream!(StreamStates); +proxy_sink!(StreamStates); +proxy_ready_sink!(StreamStates); diff --git a/src/server.rs b/src/server.rs index b36c419ef..ec36b7824 100644 --- a/src/server.rs +++ b/src/server.rs @@ -111,16 +111,8 @@ impl Peer for Server { type Send = http::response::Head; type Poll = http::request::Head; - fn is_valid_local_stream_id(id: StreamId) -> bool { - id.is_server_initiated() - } - - fn is_valid_remote_stream_id(id: StreamId) -> bool { - id.is_client_initiated() - } - - fn local_can_open() -> bool { - false + fn is_server() -> bool { + true } fn convert_send_message(