From db472bd60317dcb032cc3e5c9ef4a03bee6b373c Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sat, 12 Sep 2020 19:14:07 -0700 Subject: [PATCH 1/9] Implement static node list connection pool * This commit adds a connection pool that takes a static list of nodes and distributes the load. * trait for setting connection distribution. Defaults to RoundRobin. --- elasticsearch/src/http/transport.rs | 122 +++++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 1 deletion(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 6afb0244..6fbbe1ad 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -42,6 +42,10 @@ use std::{ error, fmt, fmt::Debug, io::{self, Write}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::Duration, }; use url::Url; @@ -337,6 +341,15 @@ impl Transport { Ok(transport) } + /// Creates a new instance of a [Transport] configured with a + /// [StaticNodeListConnectionPool] + pub fn static_node_list(urls: Vec<&str>) -> Result { + let urls = urls.iter().map(|url| Url::parse(url).unwrap()).collect(); + let conn_pool = StaticNodeListConnectionPool::round_robin(urls); + let transport = TransportBuilder::new(conn_pool).build()?; + Ok(transport) + } + /// Creates a new instance of a [Transport] configured for use with /// [Elasticsearch service in Elastic Cloud](https://www.elastic.co/cloud/). /// @@ -600,11 +613,75 @@ impl ConnectionPool for CloudConnectionPool { } } +/// A Connection Pool that manages a static connection of nodes +#[derive(Debug, Clone)] +pub struct StaticNodeListConnectionPool { + connections: Vec, + strategy: TStrategy, +} + +impl ConnectionPool for StaticNodeListConnectionPool +where + TStrategy: Strategy + Clone, +{ + fn next(&self) -> &Connection { + self.strategy.try_next(&self.connections).unwrap() + } +} + +impl StaticNodeListConnectionPool { + /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ + pub fn round_robin(urls: Vec) -> Self { + let connections: Vec<_> = urls.into_iter().map(Connection::new).collect(); + + let strategy = RoundRobin::default(); + + Self { + connections, + strategy, + } + } +} + +/** The strategy selects an address from a given collection. */ +pub trait Strategy: Send + Sync + Debug { + /** Try get the next connection. */ + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error>; +} + +/** A round-robin strategy cycles through nodes sequentially. */ +#[derive(Clone, Debug)] +pub struct RoundRobin { + index: Arc, +} + +impl Default for RoundRobin { + fn default() -> Self { + RoundRobin { + index: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Strategy for RoundRobin { + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error> { + if connections.is_empty() { + Err(crate::error::lib("Connection list empty")) + } else { + let i = self.index.fetch_add(1, Ordering::Relaxed) % connections.len(); + Ok(&connections[i]) + } + } +} + #[cfg(test)] pub mod tests { #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; - use crate::http::transport::{CloudId, Connection, SingleNodeConnectionPool, TransportBuilder}; + use crate::http::transport::{ + CloudId, Connection, ConnectionPool, RoundRobin, SingleNodeConnectionPool, + StaticNodeListConnectionPool, TransportBuilder, + }; use url::Url; #[test] @@ -714,4 +791,47 @@ pub mod tests { let conn = Connection::new(url); assert_eq!(conn.url.as_str(), "http://10.1.2.3/"); } + + fn round_robin(addresses: Vec) -> StaticNodeListConnectionPool { + StaticNodeListConnectionPool::round_robin(addresses) + } + + fn expected_addresses() -> Vec { + vec!["http://a:9200/", "http://b:9200/", "http://c:9200/"] + .iter() + .map(|addr| Url::parse(addr).unwrap()) + .collect() + } + + #[test] + fn round_robin_next_multi() { + let connections = round_robin(expected_addresses()); + + for _ in 0..10 { + for expected in expected_addresses() { + let actual = connections.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + } + + #[test] + fn round_robin_next_single() { + let expected = Url::parse("http://a:9200/").unwrap(); + let connections = round_robin(vec![expected.clone()]); + + for _ in 0..10 { + let actual = connections.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + + #[test] + #[should_panic] + fn round_robin_next_empty_fails() { + let connections = round_robin(vec![]); + connections.next(); + } } From 21b563dceb8ef287ebbd4edb15239ecd2c917475 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Tue, 15 Sep 2020 18:05:49 -0700 Subject: [PATCH 2/9] Fail fast if iterations fail for url parsing on static nodes --- elasticsearch/src/http/transport.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 6fbbe1ad..85c14386 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -344,7 +344,10 @@ impl Transport { /// Creates a new instance of a [Transport] configured with a /// [StaticNodeListConnectionPool] pub fn static_node_list(urls: Vec<&str>) -> Result { - let urls = urls.iter().map(|url| Url::parse(url).unwrap()).collect(); + let urls: Vec = urls + .iter() + .map(|url| Url::parse(url)) + .collect::, _>>()?; let conn_pool = StaticNodeListConnectionPool::round_robin(urls); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) From 2590242908e37dfc048f38886a7ce217bc5ac918 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 22:06:56 -0700 Subject: [PATCH 3/9] Rename connection and return cloned object * The connection should be owned by the current user of said connection --- elasticsearch/src/http/transport.rs | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 85c14386..8fe79bcb 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -44,7 +44,7 @@ use std::{ io::{self, Write}, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, RwLock, }, time::Duration, }; @@ -348,7 +348,7 @@ impl Transport { .iter() .map(|url| Url::parse(url)) .collect::, _>>()?; - let conn_pool = StaticNodeListConnectionPool::round_robin(urls); + let conn_pool = MultiNodeConnectionPool::round_robin(urls); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) } @@ -461,7 +461,7 @@ impl Default for Transport { /// dynamically at runtime, based upon the response to API calls. pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection; + fn next(&self) -> Connection; } clone_trait_object!(ConnectionPool); @@ -490,8 +490,8 @@ impl Default for SingleNodeConnectionPool { impl ConnectionPool for SingleNodeConnectionPool { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection { - &self.connection + fn next(&self) -> Connection { + self.connection.clone() } } @@ -611,31 +611,33 @@ impl CloudConnectionPool { impl ConnectionPool for CloudConnectionPool { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection { - &self.connection + fn next(&self) -> Connection { + self.connection.clone() } } /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct StaticNodeListConnectionPool { - connections: Vec, +pub struct MultiNodeConnectionPool { + connections: Arc>>, strategy: TStrategy, } -impl ConnectionPool for StaticNodeListConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where TStrategy: Strategy + Clone, { - fn next(&self) -> &Connection { - self.strategy.try_next(&self.connections).unwrap() + fn next(&self) -> Connection { + let inner = self.connections.read().expect("lock poisoned"); + self.strategy.try_next(&inner).unwrap() } } -impl StaticNodeListConnectionPool { +impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ pub fn round_robin(urls: Vec) -> Self { - let connections: Vec<_> = urls.into_iter().map(Connection::new).collect(); + let connections: Arc>> = + Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect())); let strategy = RoundRobin::default(); @@ -649,7 +651,7 @@ impl StaticNodeListConnectionPool { /** The strategy selects an address from a given collection. */ pub trait Strategy: Send + Sync + Debug { /** Try get the next connection. */ - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error>; + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; } /** A round-robin strategy cycles through nodes sequentially. */ @@ -667,12 +669,12 @@ impl Default for RoundRobin { } impl Strategy for RoundRobin { - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error> { + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) } else { let i = self.index.fetch_add(1, Ordering::Relaxed) % connections.len(); - Ok(&connections[i]) + Ok(connections[i].clone()) } } } @@ -682,8 +684,8 @@ pub mod tests { #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; use crate::http::transport::{ - CloudId, Connection, ConnectionPool, RoundRobin, SingleNodeConnectionPool, - StaticNodeListConnectionPool, TransportBuilder, + CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, RoundRobin, + SingleNodeConnectionPool, TransportBuilder, }; use url::Url; @@ -795,8 +797,8 @@ pub mod tests { assert_eq!(conn.url.as_str(), "http://10.1.2.3/"); } - fn round_robin(addresses: Vec) -> StaticNodeListConnectionPool { - StaticNodeListConnectionPool::round_robin(addresses) + fn round_robin(addresses: Vec) -> MultiNodeConnectionPool { + MultiNodeConnectionPool::round_robin(addresses) } fn expected_addresses() -> Vec { From 27056632781a9a991ebd71da084fc7d6d16fe3af Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 22:52:15 -0700 Subject: [PATCH 4/9] Allow reseeding of nodes on MultiNodeConnection --- elasticsearch/src/http/transport.rs | 125 +++++++++++++++++++++++----- 1 file changed, 106 insertions(+), 19 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 8fe79bcb..9e1b1e6a 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -46,7 +46,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, RwLock, }, - time::Duration, + time::{Duration, Instant}, }; use url::Url; @@ -348,7 +348,7 @@ impl Transport { .iter() .map(|url| Url::parse(url)) .collect::, _>>()?; - let conn_pool = MultiNodeConnectionPool::round_robin(urls); + let conn_pool = MultiNodeConnectionPool::round_robin(urls, None); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) } @@ -379,6 +379,10 @@ impl Transport { B: Body, Q: Serialize + ?Sized, { + if self.conn_pool.reseedable() { + // Reseed nodes + println!("Reseeding!"); + } let connection = self.conn_pool.next(); let url = connection.url.join(path.trim_start_matches('/'))?; let reqwest_method = self.method(method); @@ -462,6 +466,13 @@ impl Default for Transport { pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { /// Gets a reference to the next [Connection] fn next(&self) -> Connection; + + fn reseedable(&self) -> bool { + false + } + + // NOOP by default + fn reseed(&self, _connection: Vec) {} } clone_trait_object!(ConnectionPool); @@ -619,31 +630,63 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] pub struct MultiNodeConnectionPool { - connections: Arc>>, + inner: Arc>, + wait: Option, strategy: TStrategy, } +#[derive(Debug, Clone)] +pub struct MultiNodeConnectionPoolInner { + last_update: Option, + connections: Vec, +} + impl ConnectionPool for MultiNodeConnectionPool where TStrategy: Strategy + Clone, { fn next(&self) -> Connection { - let inner = self.connections.read().expect("lock poisoned"); - self.strategy.try_next(&inner).unwrap() + let inner = self.inner.read().expect("lock poisoned"); + self.strategy.try_next(&inner.connections).unwrap() + } + + fn reseedable(&self) -> bool { + let inner = self.inner.read().expect("lock poisoned"); + let wait = match self.wait { + Some(wait) => wait, + None => return false, + }; + let last_update_is_stale = inner + .last_update + .as_ref() + .map(|last_update| last_update.elapsed() > wait); + last_update_is_stale.unwrap_or(true) + } + + fn reseed(&self, mut connection: Vec) { + let mut inner = self.inner.write().expect("lock poisoned"); + inner.last_update = Some(Instant::now()); + inner.connections.clear(); + inner.connections.append(&mut connection); } } impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ - pub fn round_robin(urls: Vec) -> Self { - let connections: Arc>> = - Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect())); + pub fn round_robin(urls: Vec, wait: Option) -> Self { + let connections = urls.into_iter().map(Connection::new).collect(); - let strategy = RoundRobin::default(); + let inner: Arc> = + Arc::new(RwLock::new(MultiNodeConnectionPoolInner { + last_update: None, + connections, + })); + let strategy = RoundRobin::default(); Self { - connections, + inner, strategy, + wait, } } } @@ -684,9 +727,10 @@ pub mod tests { #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; use crate::http::transport::{ - CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, RoundRobin, - SingleNodeConnectionPool, TransportBuilder, + CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool, + TransportBuilder, }; + use std::time::{Duration, Instant}; use url::Url; #[test] @@ -797,10 +841,6 @@ pub mod tests { assert_eq!(conn.url.as_str(), "http://10.1.2.3/"); } - fn round_robin(addresses: Vec) -> MultiNodeConnectionPool { - MultiNodeConnectionPool::round_robin(addresses) - } - fn expected_addresses() -> Vec { vec!["http://a:9200/", "http://b:9200/", "http://c:9200/"] .iter() @@ -808,9 +848,56 @@ pub mod tests { .collect() } + #[test] + fn test_reseedable_false_on_no_duration() { + let connections = MultiNodeConnectionPool::round_robin(expected_addresses(), None); + assert!(!connections.reseedable()); + } + + #[test] + fn test_reseed() { + let connection_pool = + MultiNodeConnectionPool::round_robin(vec![], Some(Duration::from_secs(28800))); + + let connections = expected_addresses() + .into_iter() + .map(Connection::new) + .collect(); + + connection_pool.reseeding(); + connection_pool.reseed(connections); + for _ in 0..10 { + for expected in expected_addresses() { + let actual = connection_pool.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + // Check connection pool not reseedable after reseed + assert!(!connection_pool.reseedable()); + + let inner = connection_pool.inner.read().expect("lock poisoned"); + assert!(!inner.reseeding); + } + + #[test] + fn test_reseedable_after_duration() { + let connection_pool = MultiNodeConnectionPool::round_robin( + expected_addresses(), + Some(Duration::from_secs(30)), + ); + + // Set internal last_update to a minute ago + let mut inner = connection_pool.inner.write().expect("lock poisoned"); + inner.last_update = Some(Instant::now() - Duration::from_secs(60)); + drop(inner); + + assert!(connection_pool.reseedable()); + } + #[test] fn round_robin_next_multi() { - let connections = round_robin(expected_addresses()); + let connections = MultiNodeConnectionPool::round_robin(expected_addresses(), None); for _ in 0..10 { for expected in expected_addresses() { @@ -824,7 +911,7 @@ pub mod tests { #[test] fn round_robin_next_single() { let expected = Url::parse("http://a:9200/").unwrap(); - let connections = round_robin(vec![expected.clone()]); + let connections = MultiNodeConnectionPool::round_robin(vec![expected.clone()], None); for _ in 0..10 { let actual = connections.next(); @@ -836,7 +923,7 @@ pub mod tests { #[test] #[should_panic] fn round_robin_next_empty_fails() { - let connections = round_robin(vec![]); + let connections = MultiNodeConnectionPool::round_robin(vec![], None); connections.next(); } } From 61c7b0b0e6a1577c922bc88c82887e734d2302d7 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 23:22:30 -0700 Subject: [PATCH 5/9] Implement Sniff Nodes request * make review edits --- elasticsearch/src/http/transport.rs | 129 ++++++++++++++++++++++------ 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 9e1b1e6a..9e30fe02 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -38,6 +38,7 @@ use crate::{ use base64::write::EncoderWriter as Base64Encoder; use bytes::BytesMut; use serde::Serialize; +use serde_json::Value; use std::{ error, fmt, fmt::Debug, @@ -288,7 +289,7 @@ impl Default for TransportBuilder { /// A connection to an Elasticsearch node, used to send an API request #[derive(Debug, Clone)] pub struct Connection { - url: Url, + url: Arc, } impl Connection { @@ -303,8 +304,14 @@ impl Connection { url }; + let url = Arc::new(url); + Self { url } } + + pub fn url(&self) -> Arc { + self.url.clone() + } } /// A HTTP transport responsible for making the API requests to Elasticsearch, @@ -365,27 +372,22 @@ impl Transport { Ok(transport) } - /// Creates an asynchronous request that can be awaited - pub async fn send( + pub fn request_builder( &self, + connection: &Connection, method: Method, path: &str, headers: HeaderMap, query_string: Option<&Q>, body: Option, timeout: Option, - ) -> Result + ) -> Result where B: Body, Q: Serialize + ?Sized, { - if self.conn_pool.reseedable() { - // Reseed nodes - println!("Reseeding!"); - } - let connection = self.conn_pool.next(); - let url = connection.url.join(path.trim_start_matches('/'))?; let reqwest_method = self.method(method); + let url = connection.url.join(path.trim_start_matches('/'))?; let mut request_builder = self.client.request(reqwest_method, url); if let Some(t) = timeout { @@ -442,6 +444,70 @@ impl Transport { if let Some(q) = query_string { request_builder = request_builder.query(q); } + Ok(request_builder) + } + + /// Creates an asynchronous request that can be awaited + pub async fn send( + &self, + method: Method, + path: &str, + headers: HeaderMap, + query_string: Option<&Q>, + body: Option, + timeout: Option, + ) -> Result + where + B: Body, + Q: Serialize + ?Sized, + { + // Threads will execute against old connection pool during reseed + if self.conn_pool.reseedable() { + // Set as reseeding prevents another thread from attempting + // to reseed during es request and reseed + self.conn_pool.reseeding(); + + let connection = self.conn_pool.next(); + let scheme = &connection.url.scheme(); + // Build node info request + let node_request = self.request_builder( + &connection, + Method::Get, + "_nodes/_all/http", + headers.clone(), + None::<&Q>, + None::, + timeout, + )?; + let resp = node_request.send().await?; + let json: Value = resp.json().await?; + let connections: Vec = json["nodes"] + .as_object() + .unwrap() + .iter() + .map(|h| { + let url = format!( + "{}://{}", + scheme, + h.1["http"]["publish_address"].as_str().unwrap() + ); + let url = Url::parse(&url).unwrap(); + Connection::new(url) + }) + .collect(); + self.conn_pool.reseed(connections); + } + + let connection = self.conn_pool.next(); + let request_builder = self.request_builder( + &connection, + method, + path, + headers, + query_string, + body, + timeout, + )?; let response = request_builder.send().await; match response { @@ -471,6 +537,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { false } + // NOOP + fn reseeding(&self) {} + // NOOP by default fn reseed(&self, _connection: Vec) {} } @@ -629,38 +698,46 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct MultiNodeConnectionPool { +pub struct MultiNodeConnectionPool { inner: Arc>, - wait: Option, - strategy: TStrategy, + reseed_frequency: Option, + load_balancing_strategy: LoadBalancing, } #[derive(Debug, Clone)] pub struct MultiNodeConnectionPoolInner { + reseeding: bool, last_update: Option, connections: Vec, } -impl ConnectionPool for MultiNodeConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where - TStrategy: Strategy + Clone, + LoadBalancing: LoadBalancingStrategy + Clone, { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); - self.strategy.try_next(&inner.connections).unwrap() + self.load_balancing_strategy + .try_next(&inner.connections) + .unwrap() } fn reseedable(&self) -> bool { let inner = self.inner.read().expect("lock poisoned"); - let wait = match self.wait { + let reseed_frequency = match self.reseed_frequency { Some(wait) => wait, None => return false, }; let last_update_is_stale = inner .last_update .as_ref() - .map(|last_update| last_update.elapsed() > wait); - last_update_is_stale.unwrap_or(true) + .map(|last_update| last_update.elapsed() > reseed_frequency); + last_update_is_stale.unwrap_or(true) && !inner.reseeding + } + + fn reseeding(&self) { + let mut inner = self.inner.write().expect("Lock Poisoned"); + inner.reseeding = true } fn reseed(&self, mut connection: Vec) { @@ -668,31 +745,33 @@ where inner.last_update = Some(Instant::now()); inner.connections.clear(); inner.connections.append(&mut connection); + inner.reseeding = false; } } impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ - pub fn round_robin(urls: Vec, wait: Option) -> Self { + pub fn round_robin(urls: Vec, reseed_frequency: Option) -> Self { let connections = urls.into_iter().map(Connection::new).collect(); let inner: Arc> = Arc::new(RwLock::new(MultiNodeConnectionPoolInner { + reseeding: false, last_update: None, connections, })); - let strategy = RoundRobin::default(); + let load_balancing_strategy = RoundRobin::default(); Self { inner, - strategy, - wait, + load_balancing_strategy, + reseed_frequency, } } } /** The strategy selects an address from a given collection. */ -pub trait Strategy: Send + Sync + Debug { +pub trait LoadBalancingStrategy: Send + Sync + Debug { /** Try get the next connection. */ fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; } @@ -711,7 +790,7 @@ impl Default for RoundRobin { } } -impl Strategy for RoundRobin { +impl LoadBalancingStrategy for RoundRobin { fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) From dbfb7c55961d2b929919bbee91b74162ff0e5b2f Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Tue, 22 Sep 2020 20:53:18 -0700 Subject: [PATCH 6/9] Create constructors for static and sniffing node pool transports --- elasticsearch/src/http/transport.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 9e30fe02..f9493f12 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -349,7 +349,7 @@ impl Transport { } /// Creates a new instance of a [Transport] configured with a - /// [StaticNodeListConnectionPool] + /// [MultiNodeConnectionPool] that does not refresh pub fn static_node_list(urls: Vec<&str>) -> Result { let urls: Vec = urls .iter() @@ -360,6 +360,23 @@ impl Transport { Ok(transport) } + /// Creates a new instance of a [Transport] configured with a + /// [MultiNodeConnectionPool] + /// + /// * `reseed_frequency` - frequency at which connections should be refreshed in seconds + pub fn sniffing_node_list( + urls: Vec<&str>, + reseed_frequency: Duration, + ) -> Result { + let urls: Vec = urls + .iter() + .map(|url| Url::parse(url)) + .collect::, _>>()?; + let conn_pool = MultiNodeConnectionPool::round_robin(urls, Some(reseed_frequency)); + let transport = TransportBuilder::new(conn_pool).build()?; + Ok(transport) + } + /// Creates a new instance of a [Transport] configured for use with /// [Elasticsearch service in Elastic Cloud](https://www.elastic.co/cloud/). /// From 214ef0a412d48be6968a43fabc32c43ff58721bf Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Wed, 23 Sep 2020 15:13:00 -0700 Subject: [PATCH 7/9] Introduce AtomicBool to prevent multiple reseeds across threads --- elasticsearch/src/http/transport.rs | 43 ++++++++++++++--------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index f9493f12..ce00b50f 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -44,7 +44,7 @@ use std::{ fmt::Debug, io::{self, Write}, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, time::{Duration, Instant}, @@ -480,10 +480,6 @@ impl Transport { { // Threads will execute against old connection pool during reseed if self.conn_pool.reseedable() { - // Set as reseeding prevents another thread from attempting - // to reseed during es request and reseed - self.conn_pool.reseeding(); - let connection = self.conn_pool.next(); let scheme = &connection.url.scheme(); // Build node info request @@ -554,9 +550,6 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { false } - // NOOP - fn reseeding(&self) {} - // NOOP by default fn reseed(&self, _connection: Vec) {} } @@ -719,11 +712,11 @@ pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, load_balancing_strategy: LoadBalancing, + reseeding: Arc, } #[derive(Debug, Clone)] pub struct MultiNodeConnectionPoolInner { - reseeding: bool, last_update: Option, connections: Vec, } @@ -749,12 +742,17 @@ where .last_update .as_ref() .map(|last_update| last_update.elapsed() > reseed_frequency); - last_update_is_stale.unwrap_or(true) && !inner.reseeding - } + let reseedable = last_update_is_stale.unwrap_or(true); - fn reseeding(&self) { - let mut inner = self.inner.write().expect("Lock Poisoned"); - inner.reseeding = true + return if !reseedable { + false + } else { + // Check if refreshing is false if so, sets to true atomically and returns old value (false) meaning refreshable is true + // If refreshing is set to true, do nothing and return true, meaning refreshable is false + !self + .reseeding + .compare_and_swap(false, true, Ordering::Relaxed) + }; } fn reseed(&self, mut connection: Vec) { @@ -762,7 +760,7 @@ where inner.last_update = Some(Instant::now()); inner.connections.clear(); inner.connections.append(&mut connection); - inner.reseeding = false; + self.reseeding.store(false, Ordering::Relaxed); } } @@ -773,16 +771,17 @@ impl MultiNodeConnectionPool { let inner: Arc> = Arc::new(RwLock::new(MultiNodeConnectionPoolInner { - reseeding: false, last_update: None, connections, })); + let reseeding = Arc::new(AtomicBool::new(false)); let load_balancing_strategy = RoundRobin::default(); Self { inner, load_balancing_strategy, reseed_frequency, + reseeding, } } } @@ -826,7 +825,10 @@ pub mod tests { CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool, TransportBuilder, }; - use std::time::{Duration, Instant}; + use std::{ + sync::atomic::Ordering, + time::{Duration, Instant}, + }; use url::Url; #[test] @@ -959,8 +961,6 @@ pub mod tests { .into_iter() .map(Connection::new) .collect(); - - connection_pool.reseeding(); connection_pool.reseed(connections); for _ in 0..10 { for expected in expected_addresses() { @@ -971,9 +971,7 @@ pub mod tests { } // Check connection pool not reseedable after reseed assert!(!connection_pool.reseedable()); - - let inner = connection_pool.inner.read().expect("lock poisoned"); - assert!(!inner.reseeding); + assert!(!connection_pool.reseeding.load(Ordering::Relaxed)); } #[test] @@ -989,6 +987,7 @@ pub mod tests { drop(inner); assert!(connection_pool.reseedable()); + assert!(connection_pool.reseeding.load(Ordering::Relaxed)); } #[test] From 50f8084f4913db14bf1fcb4471243143611bd1ee Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Wed, 23 Sep 2020 23:32:39 -0700 Subject: [PATCH 8/9] Add regex parsing bound_address to URL * Style changes per code review --- elasticsearch/Cargo.toml | 2 + elasticsearch/src/http/transport.rs | 84 +++++++++++++++++++++++------ elasticsearch/src/lib.rs | 3 ++ 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index 445d8af9..a03b5ed3 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -27,7 +27,9 @@ bytes = "^0.5" dyn-clone = "~1" percent-encoding = "2.1.0" reqwest = { version = "~0.10", default-features = false, features = ["gzip", "json"] } +lazy_static = "^1.4" url = "^2.1" +regex = "1.3" serde = { version = "~1", features = ["derive"] } serde_json = "~1" serde_with = "~1" diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index ce00b50f..3a8fcc2d 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -37,6 +37,7 @@ use crate::{ }; use base64::write::EncoderWriter as Base64Encoder; use bytes::BytesMut; +use regex::Regex; use serde::Serialize; use serde_json::Value; use std::{ @@ -101,6 +102,10 @@ impl fmt::Display for BuildError { /// Default address to Elasticsearch running on `http://localhost:9200` pub static DEFAULT_ADDRESS: &str = "http://localhost:9200"; +lazy_static! { + static ref ADDRESS_REGEX: Regex = + Regex::new(r"((?P[^/]+)/)?(?P[^:]+|\[[\da-fA-F:\.]+\]):(?P\d+)$").unwrap(); +} /// Builds a HTTP transport to make API calls to Elasticsearch pub struct TransportBuilder { @@ -389,7 +394,7 @@ impl Transport { Ok(transport) } - pub fn request_builder( + fn request_builder( &self, connection: &Connection, method: Method, @@ -464,6 +469,28 @@ impl Transport { Ok(request_builder) } + fn parse_to_url(address: &str, scheme: &str) -> Result { + if address.is_empty() { + return Err(crate::error::lib("Bound Address is empty")); + } + + let matches = ADDRESS_REGEX + .captures(address) + .ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?; + + let host = matches + .name("fqdn") + .or_else(|| Some(matches.name("ip").unwrap())) + .unwrap() + .as_str() + .trim(); + let port = matches.name("port").unwrap().as_str().trim(); + + Ok(Url::parse( + format!("{}://{}:{}", scheme, host, port).as_str(), + )?) + } + /// Creates an asynchronous request that can be awaited pub async fn send( &self, @@ -486,7 +513,7 @@ impl Transport { let node_request = self.request_builder( &connection, Method::Get, - "_nodes/_all/http", + "_nodes/http?filter_path=nodes.*.http", headers.clone(), None::<&Q>, None::, @@ -499,12 +526,17 @@ impl Transport { .unwrap() .iter() .map(|h| { - let url = format!( - "{}://{}", - scheme, - h.1["http"]["publish_address"].as_str().unwrap() - ); - let url = Url::parse(&url).unwrap(); + let address = h.1["http"]["publish_address"] + .as_str() + .or_else(|| { + Some( + h.1["http"]["bound_address"].as_array().unwrap()[0] + .as_str() + .unwrap(), + ) + }) + .unwrap(); + let url = Self::parse_to_url(address, scheme).unwrap(); Connection::new(url) }) .collect(); @@ -708,10 +740,10 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct MultiNodeConnectionPool { +pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, - load_balancing_strategy: LoadBalancing, + load_balancing_strategy: ConnSelector, reseeding: Arc, } @@ -721,9 +753,9 @@ pub struct MultiNodeConnectionPoolInner { connections: Vec, } -impl ConnectionPool for MultiNodeConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where - LoadBalancing: LoadBalancingStrategy + Clone, + ConnSelector: ConnectionSelector + Clone, { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); @@ -787,9 +819,9 @@ impl MultiNodeConnectionPool { } /** The strategy selects an address from a given collection. */ -pub trait LoadBalancingStrategy: Send + Sync + Debug { +pub trait ConnectionSelector: Send + Sync + Debug { /** Try get the next connection. */ - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; + fn try_next(&self, connections: &[Connection]) -> Result; } /** A round-robin strategy cycles through nodes sequentially. */ @@ -806,8 +838,8 @@ impl Default for RoundRobin { } } -impl LoadBalancingStrategy for RoundRobin { - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { +impl ConnectionSelector for RoundRobin { + fn try_next(&self, connections: &[Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) } else { @@ -823,7 +855,7 @@ pub mod tests { use crate::auth::ClientCertificate; use crate::http::transport::{ CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool, - TransportBuilder, + Transport, TransportBuilder, }; use std::{ sync::atomic::Ordering, @@ -853,6 +885,24 @@ pub mod tests { assert!(res.is_err()); } + #[test] + fn test_url_parsing_where_hostname_and_ip_present() { + let url = Transport::parse_to_url("localhost/127.0.0.1:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://localhost:9200/"); + } + + #[test] + fn test_url_parsing_where_only_ip_present() { + let url = Transport::parse_to_url("127.0.0.1:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://127.0.0.1:9200/"); + } + + #[test] + fn test_url_parsing_where_only_hostname_present() { + let url = Transport::parse_to_url("localhost:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://localhost:9200/"); + } + #[test] fn can_parse_cloud_id() { let base64 = base64::encode("cloud-endpoint.example$3dadf823f05388497ea684236d918a1a$3f26e1609cf54a0f80137a80de560da4"); diff --git a/elasticsearch/src/lib.rs b/elasticsearch/src/lib.rs index cd114877..b8b5d4f5 100644 --- a/elasticsearch/src/lib.rs +++ b/elasticsearch/src/lib.rs @@ -353,6 +353,9 @@ type _DoctestReadme = (); #[macro_use] extern crate dyn_clone; +#[macro_use] +extern crate lazy_static; + pub mod auth; pub mod cert; pub mod http; From 7df36efae9433d005e090ac834179529fb16d5c2 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sat, 26 Sep 2020 14:49:12 -0700 Subject: [PATCH 9/9] Reseed connections on seperate thread --- elasticsearch/Cargo.toml | 1 + elasticsearch/src/http/transport.rs | 66 +++++++++++++++++------------ 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index a03b5ed3..4182c6cc 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -33,6 +33,7 @@ regex = "1.3" serde = { version = "~1", features = ["derive"] } serde_json = "~1" serde_with = "~1" +tokio = { version = "0.2.0", default-features = false, features = ["macros", "tcp", "time"] } void = "1.0.2" [dev-dependencies] diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 3a8fcc2d..66dd5311 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -48,6 +48,7 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, + thread::spawn, time::{Duration, Instant}, }; use url::Url; @@ -278,7 +279,7 @@ impl TransportBuilder { let client = client_builder.build()?; Ok(Transport { client, - conn_pool: self.conn_pool, + conn_pool: Arc::new(self.conn_pool), credentials: self.credentials, }) } @@ -325,7 +326,7 @@ impl Connection { pub struct Transport { client: reqwest::Client, credentials: Option, - conn_pool: Box, + conn_pool: Arc>, } impl Transport { @@ -507,8 +508,9 @@ impl Transport { { // Threads will execute against old connection pool during reseed if self.conn_pool.reseedable() { - let connection = self.conn_pool.next(); - let scheme = &connection.url.scheme(); + let local_conn_pool = self.conn_pool.clone(); + let connection = local_conn_pool.next(); + // Build node info request let node_request = self.request_builder( &connection, @@ -519,28 +521,36 @@ impl Transport { None::, timeout, )?; - let resp = node_request.send().await?; - let json: Value = resp.json().await?; - let connections: Vec = json["nodes"] - .as_object() - .unwrap() - .iter() - .map(|h| { - let address = h.1["http"]["publish_address"] - .as_str() - .or_else(|| { - Some( - h.1["http"]["bound_address"].as_array().unwrap()[0] - .as_str() - .unwrap(), - ) + + spawn(move || { + // TODO: Log reseed failures + let mut rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime"); + rt.block_on(async { + let scheme = connection.url.scheme(); + let resp = node_request.send().await.unwrap(); + let json: Value = resp.json().await.unwrap(); + let connections: Vec = json["nodes"] + .as_object() + .unwrap() + .iter() + .map(|h| { + let address = h.1["http"]["publish_address"] + .as_str() + .or_else(|| { + Some( + h.1["http"]["bound_address"].as_array().unwrap()[0] + .as_str() + .unwrap(), + ) + }) + .unwrap(); + let url = Self::parse_to_url(address, scheme).unwrap(); + Connection::new(url) }) - .unwrap(); - let url = Self::parse_to_url(address, scheme).unwrap(); - Connection::new(url) + .collect(); + local_conn_pool.reseed(connections); }) - .collect(); - self.conn_pool.reseed(connections); + }); } let connection = self.conn_pool.next(); @@ -743,7 +753,7 @@ impl ConnectionPool for CloudConnectionPool { pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, - load_balancing_strategy: ConnSelector, + connection_selector: ConnSelector, reseeding: Arc, } @@ -759,7 +769,7 @@ where { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); - self.load_balancing_strategy + self.connection_selector .try_next(&inner.connections) .unwrap() } @@ -808,10 +818,10 @@ impl MultiNodeConnectionPool { })); let reseeding = Arc::new(AtomicBool::new(false)); - let load_balancing_strategy = RoundRobin::default(); + let connection_selector = RoundRobin::default(); Self { inner, - load_balancing_strategy, + connection_selector, reseed_frequency, reseeding, }