Skip to content

Commit 2f64bea

Browse files
committed
Simplify address resolution, spawn reseed as a Tokio task
1 parent 8354172 commit 2f64bea

File tree

2 files changed

+42
-50
lines changed

2 files changed

+42
-50
lines changed

elasticsearch/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ dyn-clone = "1"
3232
lazy_static = "1"
3333
percent-encoding = "2"
3434
reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] }
35-
regex="1"
3635
url = "2"
3736
serde = { version = "1", features = ["derive"] }
3837
serde_json = "1"
@@ -49,6 +48,7 @@ http = "1"
4948
axum = "0.7"
5049
hyper = { version = "1", features = ["server", "http1"] }
5150
os_type = "2"
51+
regex="1"
5252
#sysinfo = "0.31"
5353
textwrap = "0.16"
5454
xml-rs = "0.8"

elasticsearch/src/http/transport.rs

Lines changed: 41 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use crate::{
3838
use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine};
3939
use bytes::BytesMut;
4040
use lazy_static::lazy_static;
41-
use regex::Regex;
4241
use serde::Serialize;
4342
use serde_json::Value;
4443
use std::{
@@ -49,7 +48,6 @@ use std::{
4948
atomic::{AtomicBool, AtomicUsize, Ordering},
5049
Arc, RwLock,
5150
},
52-
thread::spawn,
5351
time::{Duration, Instant},
5452
};
5553
use url::Url;
@@ -104,10 +102,6 @@ impl fmt::Display for BuildError {
104102

105103
/// Default address to Elasticsearch running on `https://localhost:9200`
106104
pub static DEFAULT_ADDRESS: &str = "https://localhost:9200";
107-
lazy_static! {
108-
static ref ADDRESS_REGEX: Regex =
109-
Regex::new(r"((?P<fqdn>[^/]+)/)?(?P<ip>[^:]+|\[[\da-fA-F:\.]+\]):(?P<port>\d+)$").unwrap();
110-
}
111105

112106
lazy_static! {
113107
/// Client metadata header: service, language, transport, followed by additional information
@@ -459,6 +453,7 @@ impl Transport {
459453
Ok(transport)
460454
}
461455

456+
#[allow(clippy::too_many_arguments)]
462457
fn request_builder<B, Q>(
463458
&self,
464459
connection: &Connection,
@@ -546,17 +541,18 @@ impl Transport {
546541
return Err(crate::error::lib("Bound Address is empty"));
547542
}
548543

549-
let matches = ADDRESS_REGEX
550-
.captures(address)
551-
.ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?;
544+
let mut host_port = None;
545+
if let Some((host, tail)) = address.split_once('/') {
546+
if let Some((_, port)) = tail.rsplit_once(':') {
547+
host_port = Some((host, port));
548+
}
549+
} else {
550+
host_port = address.rsplit_once(':');
551+
}
552552

553-
let host = matches
554-
.name("fqdn")
555-
.or_else(|| Some(matches.name("ip").unwrap()))
556-
.unwrap()
557-
.as_str()
558-
.trim();
559-
let port = matches.name("port").unwrap().as_str().trim();
553+
let (host, port) = host_port.ok_or_else(|| {
554+
crate::error::lib(format!("error parsing address into url: {}", address))
555+
})?;
560556

561557
Ok(Url::parse(
562558
format!("{}://{}:{}", scheme, host, port).as_str(),
@@ -577,10 +573,10 @@ impl Transport {
577573
B: Body,
578574
Q: Serialize + ?Sized,
579575
{
580-
// Threads will execute against old connection pool during reseed
576+
// Requests will execute against old connection pool during reseed
581577
if self.conn_pool.reseedable() {
582-
let local_conn_pool = self.conn_pool.clone();
583-
let connection = local_conn_pool.next();
578+
let conn_pool = self.conn_pool.clone();
579+
let connection = conn_pool.next();
584580

585581
// Build node info request
586582
let node_request = self.request_builder(
@@ -593,34 +589,30 @@ impl Transport {
593589
timeout,
594590
)?;
595591

596-
spawn(move || {
597-
// TODO: Log reseed failures
598-
let rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime");
599-
rt.block_on(async {
600-
let scheme = connection.url.scheme();
601-
let resp = node_request.send().await.unwrap();
602-
let json: Value = resp.json().await.unwrap();
603-
let connections: Vec<Connection> = json["nodes"]
604-
.as_object()
605-
.unwrap()
606-
.iter()
607-
.map(|h| {
608-
let address = h.1["http"]["publish_address"]
609-
.as_str()
610-
.or_else(|| {
611-
Some(
612-
h.1["http"]["bound_address"].as_array().unwrap()[0]
613-
.as_str()
614-
.unwrap(),
615-
)
616-
})
617-
.unwrap();
618-
let url = Self::parse_to_url(address, scheme).unwrap();
619-
Connection::new(url)
620-
})
621-
.collect();
622-
local_conn_pool.reseed(connections);
623-
})
592+
tokio::spawn(async move {
593+
let scheme = connection.url.scheme();
594+
let resp = node_request.send().await.unwrap();
595+
let json: Value = resp.json().await.unwrap();
596+
let connections: Vec<Connection> = json["nodes"]
597+
.as_object()
598+
.unwrap()
599+
.iter()
600+
.map(|(_, node)| {
601+
let address = node["http"]["publish_address"]
602+
.as_str()
603+
.or_else(|| {
604+
Some(
605+
node["http"]["bound_address"].as_array().unwrap()[0]
606+
.as_str()
607+
.unwrap(),
608+
)
609+
})
610+
.unwrap();
611+
let url = Self::parse_to_url(address, scheme).unwrap();
612+
Connection::new(url)
613+
})
614+
.collect();
615+
conn_pool.reseed(connections);
624616
});
625617
}
626618

@@ -858,7 +850,7 @@ where
858850
.map(|last_update| last_update.elapsed() > reseed_frequency);
859851
let reseedable = last_update_is_stale.unwrap_or(true);
860852

861-
return if !reseedable {
853+
if !reseedable {
862854
false
863855
} else {
864856
// Check if refreshing is false if so, sets to true atomically and returns old value (false) meaning refreshable is true
@@ -869,7 +861,7 @@ where
869861
// This can be replaced with `.into_ok_or_err` once stable.
870862
// https://doc.rust-lang.org/std/result/enum.Result.html#method.into_ok_or_err
871863
.unwrap_or(true)
872-
};
864+
}
873865
}
874866

875867
fn reseed(&self, mut connection: Vec<Connection>) {

0 commit comments

Comments
 (0)