Skip to content

deps: upgrade to tokio v1.0 ecosystem #717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions postgres-native-tls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
native-tls = "0.2"
tokio = "0.3"
tokio-native-tls = "0.2"
tokio = "1.0"
tokio-native-tls = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }
6 changes: 3 additions & 3 deletions postgres-openssl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
openssl = "0.10"
tokio = "0.3"
tokio-openssl = "0.5"
tokio = "1.0"
tokio-openssl = "0.6"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }
50 changes: 42 additions & 8 deletions postgres-openssl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ use openssl::hash::MessageDigest;
use openssl::nid::Nid;
#[cfg(feature = "runtime")]
use openssl::ssl::SslConnector;
use openssl::ssl::{ConnectConfiguration, SslRef};
use std::fmt::Debug;
use openssl::ssl::{self, ConnectConfiguration, SslRef};
use openssl::x509::X509VerifyResult;
use std::error::Error;
use std::fmt::{self, Debug};
use std::future::Future;
use std::io;
use std::pin::Pin;
#[cfg(feature = "runtime")]
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_openssl::{HandshakeError, SslStream};
use tokio_openssl::SslStream;
use tokio_postgres::tls;
#[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect;
Expand Down Expand Up @@ -131,23 +133,55 @@ impl TlsConnector {

impl<S> TlsConnect<S> for TlsConnector
where
S: AsyncRead + AsyncWrite + Unpin + Debug + 'static + Sync + Send,
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = TlsStream<S>;
type Error = HandshakeError<S>;
type Error = Box<dyn Error + Send + Sync>;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, HandshakeError<S>>> + Send>>;
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, Self::Error>> + Send>>;

fn connect(self, stream: S) -> Self::Future {
let future = async move {
let stream = tokio_openssl::connect(self.ssl, &self.domain, stream).await?;
Ok(TlsStream(stream))
let ssl = self.ssl.into_ssl(&self.domain)?;
let mut stream = SslStream::new(ssl, stream)?;
match Pin::new(&mut stream).connect().await {
Ok(()) => Ok(TlsStream(stream)),
Err(error) => Err(Box::new(ConnectError {
error,
verify_result: stream.ssl().verify_result(),
}) as _),
}
};

Box::pin(future)
}
}

#[derive(Debug)]
struct ConnectError {
error: ssl::Error,
verify_result: X509VerifyResult,
}

impl fmt::Display for ConnectError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.error, fmt)?;

if self.verify_result != X509VerifyResult::OK {
fmt.write_str(": ")?;
fmt::Display::fmt(&self.verify_result, fmt)?;
}

Ok(())
}
}

impl Error for ConnectError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.error)
}
}

/// The stream returned by `TlsConnector`.
pub struct TlsStream<S>(SslStream<S>);

Expand Down
2 changes: 1 addition & 1 deletion postgres-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ readme = "../README.md"
[dependencies]
base64 = "0.13"
byteorder = "1.0"
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
hmac = "0.10"
md5 = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion postgres-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ with-uuid-0_8 = ["uuid-08"]
with-time-0_2 = ["time-02"]

[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-derive = { version = "0.4.0", optional = true, path = "../postgres-derive" }
Expand Down
1 change: 0 additions & 1 deletion postgres-types/src/serde_json_1.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{FromSql, IsNull, ToSql, Type};
use bytes::buf::BufMutExt;
use bytes::{BufMut, BytesMut};
use serde_1::{Deserialize, Serialize};
use serde_json_1::Value;
Expand Down
4 changes: 2 additions & 2 deletions postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
with-time-0_2 = ["tokio-postgres/with-time-0_2"]

[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres" }

tokio = { version = "0.3", features = ["rt", "time"] }
tokio = { version = "1.0", features = ["rt", "time"] }
log = "0.4"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/copy_out_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl BufRead for CopyOutReader<'_> {
};
}

Ok(self.cur.bytes())
Ok(&self.cur)
}

fn consume(&mut self, amt: usize) {
Expand Down
9 changes: 5 additions & 4 deletions postgres/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};
Expand Down Expand Up @@ -64,7 +65,7 @@ impl<'a> Notifications<'a> {
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
TimeoutIter {
delay: self.connection.enter(|| time::sleep(timeout)),
delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
timeout,
connection: self.connection.as_ref(),
}
Expand Down Expand Up @@ -124,7 +125,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
/// A time-limited blocking iterator over pending notifications.
pub struct TimeoutIter<'a> {
connection: ConnectionRef<'a>,
delay: Sleep,
delay: Pin<Box<Sleep>>,
timeout: Duration,
}

Expand All @@ -134,7 +135,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {

fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
self.delay.reset(Instant::now() + self.timeout);
self.delay.as_mut().reset(Instant::now() + self.timeout);
return Ok(Some(notification));
}

Expand All @@ -143,7 +144,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
self.connection.poll_block_on(|cx, notifications, done| {
match notifications.pop_front() {
Some(notification) => {
delay.reset(Instant::now() + timeout);
delay.as_mut().reset(Instant::now() + timeout);
return Poll::Ready(Ok(Some(notification)));
}
None if done => return Poll::Ready(Ok(None)),
Expand Down
8 changes: 4 additions & 4 deletions tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ with-time-0_2 = ["postgres-types/with-time-0_2"]

[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
byteorder = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
Expand All @@ -50,11 +50,11 @@ phf = "0.8"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "0.1.2", path = "../postgres-types" }
socket2 = "0.3"
tokio = { version = "0.3", features = ["io-util"] }
tokio-util = { version = "0.4", features = ["codec"] }
tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
env_logger = "0.8"
criterion = "0.3"

Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Stream for BinaryCopyOutStream {
Some(header) => header.has_oids,
None => {
check_remaining(&chunk, HEADER_LEN)?;
if &chunk.bytes()[..MAGIC.len()] != MAGIC {
if !chunk.chunk().starts_with(MAGIC) {
return Poll::Ready(Some(Err(Error::parse(io::Error::new(
io::ErrorKind::InvalidData,
"invalid magic value",
Expand Down
1 change: 0 additions & 1 deletion tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::buf::BufExt;
use bytes::{Buf, BufMut, BytesMut};
use futures::channel::mpsc;
use futures::future;
Expand Down