Skip to content

Commit d791f06

Browse files
committed
Minor stats fixed and cleanup
1 parent 21b9cde commit d791f06

File tree

6 files changed

+67
-74
lines changed

6 files changed

+67
-74
lines changed

src/admin.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ where
744744
("age_seconds", DataType::Numeric),
745745
("prepare_cache_hit", DataType::Numeric),
746746
("prepare_cache_miss", DataType::Numeric),
747+
("prepare_cache_eviction", DataType::Numeric),
747748
("prepare_cache_size", DataType::Numeric),
748749
];
749750

@@ -776,6 +777,10 @@ where
776777
.prepared_miss_count
777778
.load(Ordering::Relaxed)
778779
.to_string(),
780+
server
781+
.prepared_eviction_count
782+
.load(Ordering::Relaxed)
783+
.to_string(),
779784
server
780785
.prepared_cache_size
781786
.load(Ordering::Relaxed)

src/client.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ pub struct Client<S, T> {
109109
/// Mapping of client named prepared statement to rewritten parse messages
110110
prepared_statements: HashMap<String, (Arc<Parse>, u64)>,
111111

112-
extended_protocol_data: VecDeque<ExtendedProtocolData>,
112+
/// Buffered extended protocol data
113+
extended_protocol_data_buffer: VecDeque<ExtendedProtocolData>,
113114
}
114115

115116
/// Client entrypoint.
@@ -726,7 +727,7 @@ where
726727
shutdown,
727728
prepared_statements_enabled,
728729
prepared_statements: HashMap::new(),
729-
extended_protocol_data: VecDeque::new(),
730+
extended_protocol_data_buffer: VecDeque::new(),
730731
})
731732
}
732733

@@ -764,7 +765,7 @@ where
764765
shutdown,
765766
prepared_statements_enabled: false,
766767
prepared_statements: HashMap::new(),
767-
extended_protocol_data: VecDeque::new(),
768+
extended_protocol_data_buffer: VecDeque::new(),
768769
})
769770
}
770771

@@ -970,7 +971,7 @@ where
970971
}
971972

972973
'E' => {
973-
self.extended_protocol_data
974+
self.extended_protocol_data_buffer
974975
.push_back(ExtendedProtocolData::create_new_execute(message));
975976
continue;
976977
}
@@ -1259,7 +1260,7 @@ where
12591260
// Execute
12601261
// Execute a prepared statement prepared in `P` and bound in `B`.
12611262
'E' => {
1262-
self.extended_protocol_data
1263+
self.extended_protocol_data_buffer
12631264
.push_back(ExtendedProtocolData::create_new_execute(message));
12641265
}
12651266

@@ -1310,7 +1311,9 @@ where
13101311
// ReadyForQuery
13111312

13121313
// Iterate over our extended protocol data that we've buffered
1313-
while let Some(protocol_data) = self.extended_protocol_data.pop_front() {
1314+
while let Some(protocol_data) =
1315+
self.extended_protocol_data_buffer.pop_front()
1316+
{
13141317
match protocol_data {
13151318
ExtendedProtocolData::Parse { data, metadata } => {
13161319
let (parse, hash) = match metadata {
@@ -1712,7 +1715,7 @@ where
17121715

17131716
if client_given_name.is_empty() {
17141717
debug!("Anonymous parse message");
1715-
self.extended_protocol_data
1718+
self.extended_protocol_data_buffer
17161719
.push_back(ExtendedProtocolData::create_new_parse(message, None));
17171720
return Ok(());
17181721
}
@@ -1741,7 +1744,7 @@ where
17411744
self.prepared_statements
17421745
.insert(client_given_name, (new_parse.clone(), hash));
17431746

1744-
self.extended_protocol_data
1747+
self.extended_protocol_data_buffer
17451748
.push_back(ExtendedProtocolData::create_new_parse(
17461749
new_parse.as_ref().try_into()?,
17471750
Some((new_parse.clone(), hash)),
@@ -1762,7 +1765,7 @@ where
17621765

17631766
if client_given_name.is_empty() {
17641767
debug!("Anonymous bind message");
1765-
self.extended_protocol_data
1768+
self.extended_protocol_data_buffer
17661769
.push_back(ExtendedProtocolData::create_new_bind(message, None));
17671770
return Ok(());
17681771
}
@@ -1776,11 +1779,9 @@ where
17761779
client_given_name, rewritten_parse.name
17771780
);
17781781

1779-
self.extended_protocol_data
1780-
.push_back(ExtendedProtocolData::create_new_bind(
1781-
message,
1782-
Some(client_given_name),
1783-
));
1782+
self.extended_protocol_data_buffer.push_back(
1783+
ExtendedProtocolData::create_new_bind(message, Some(client_given_name)),
1784+
);
17841785

17851786
Ok(())
17861787
}
@@ -1818,7 +1819,7 @@ where
18181819

18191820
if describe.anonymous() {
18201821
debug!("Anonymous describe message");
1821-
self.extended_protocol_data
1822+
self.extended_protocol_data_buffer
18221823
.push_back(ExtendedProtocolData::create_new_describe(message, None));
18231824

18241825
return Ok(());
@@ -1835,11 +1836,12 @@ where
18351836
client_given_name, describe.statement_name
18361837
);
18371838

1838-
self.extended_protocol_data
1839-
.push_back(ExtendedProtocolData::create_new_describe(
1839+
self.extended_protocol_data_buffer.push_back(
1840+
ExtendedProtocolData::create_new_describe(
18401841
describe.try_into()?,
18411842
Some(client_given_name),
1842-
));
1843+
),
1844+
);
18431845

18441846
Ok(())
18451847
}
@@ -1866,7 +1868,7 @@ where
18661868

18671869
fn reset_buffered_state(&mut self) {
18681870
self.buffer.clear();
1869-
self.extended_protocol_data.clear();
1871+
self.extended_protocol_data_buffer.clear();
18701872
self.response_message_queue_buffer.clear();
18711873
}
18721874

src/messages.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -925,20 +925,6 @@ impl Parse {
925925
pub fn anonymous(&self) -> bool {
926926
self.name.is_empty()
927927
}
928-
929-
/// Reads the parse message from the buffer and throws them away
930-
pub fn remove_from_buffer_start(buf: &mut BytesMut) -> Result<(), Error> {
931-
buf.get_u8();
932-
buf.get_i32();
933-
buf.read_string()?;
934-
buf.read_string()?;
935-
936-
for _ in 0..buf.get_i16() {
937-
buf.get_i32();
938-
}
939-
940-
Ok(())
941-
}
942928
}
943929

944930
/// Bind (B) message.
@@ -1104,11 +1090,6 @@ impl Bind {
11041090
Ok(response_buf)
11051091
}
11061092

1107-
pub fn reassign(mut self, new_prepared_statement_name: &str) -> Self {
1108-
self.prepared_statement = new_prepared_statement_name.to_string();
1109-
self
1110-
}
1111-
11121093
pub fn anonymous(&self) -> bool {
11131094
self.prepared_statement.is_empty()
11141095
}

src/pool.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,18 @@ pub enum BanReason {
6060
pub type PreparedStatementCacheType = Arc<Mutex<PreparedStatementCache>>;
6161

6262
// TODO: Add stats the this cache
63+
// TODO: Add application name to the cache value to help identify which application is using the cache
64+
// TODO: Create admin command to show which statements are in the cache
6365
#[derive(Debug)]
6466
pub struct PreparedStatementCache {
65-
cache: LruCache<u64, Arc<Parse>>, // TODO: Store a tuple of BytesMut and String instead Parse to remove need to convert to BytesMut again
66-
}
67-
68-
impl Default for PreparedStatementCache {
69-
fn default() -> Self {
70-
Self::new(1)
71-
}
67+
cache: LruCache<u64, Arc<Parse>>,
7268
}
7369

7470
impl PreparedStatementCache {
75-
pub fn new(size: usize) -> Self {
71+
pub fn new(mut size: usize) -> Self {
72+
// Cannot be zeros
7673
if size == 0 {
77-
return Self::default();
74+
size = 1;
7875
}
7976

8077
PreparedStatementCache {

src/server.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,14 +1116,21 @@ impl Server {
11161116
Ok(bytes)
11171117
}
11181118

1119+
// Determines if the server already has a prepared statement with the given name
1120+
// Increments the prepared statement cache hit counter
11191121
pub fn has_prepared_statement(&mut self, name: &str) -> bool {
1120-
self.stats.prepared_cache_hit();
1121-
self.prepared_statement_cache.get(name).is_some()
1122+
let has_it = self.prepared_statement_cache.get(name).is_some();
1123+
if has_it {
1124+
self.stats.prepared_cache_hit();
1125+
} else {
1126+
self.stats.prepared_cache_miss();
1127+
}
1128+
1129+
has_it
11221130
}
11231131

11241132
pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
11251133
self.stats.prepared_cache_add();
1126-
self.stats.prepared_cache_miss();
11271134

11281135
// If we evict something, we need to close it on the server
11291136
if let Some((evicted_name, _)) = self.prepared_statement_cache.push(name.to_string(), ()) {
@@ -1149,35 +1156,33 @@ impl Server {
11491156
parse: &Parse,
11501157
should_send_parse_to_server: bool,
11511158
) -> Result<(), Error> {
1152-
match self.prepared_statement_cache.get(&parse.name) {
1153-
Some(_) => self.stats.prepared_cache_hit(),
1154-
None => {
1155-
let mut bytes = BytesMut::new();
1156-
1157-
if should_send_parse_to_server {
1158-
let parse_bytes: BytesMut = parse.try_into()?;
1159-
bytes.extend_from_slice(&parse_bytes);
1160-
}
1159+
if !self.has_prepared_statement(&parse.name) {
1160+
let mut bytes = BytesMut::new();
11611161

1162-
// If we evict something, we need to close it on the server
1163-
// We do this by adding it to the messages we're sending to the server before the sync
1164-
if let Some(evicted_name) = self.add_prepared_statement_to_cache(&parse.name) {
1165-
self.remove_prepared_statement_from_cache(&evicted_name);
1166-
let close_bytes: BytesMut = Close::new(&evicted_name).try_into()?;
1167-
bytes.extend_from_slice(&close_bytes);
1168-
};
1162+
if should_send_parse_to_server {
1163+
let parse_bytes: BytesMut = parse.try_into()?;
1164+
bytes.extend_from_slice(&parse_bytes);
1165+
}
11691166

1170-
if !bytes.is_empty() {
1171-
bytes.extend_from_slice(&sync());
1167+
// If we evict something, we need to close it on the server
1168+
// We do this by adding it to the messages we're sending to the server before the sync
1169+
if let Some(evicted_name) = self.add_prepared_statement_to_cache(&parse.name) {
1170+
self.remove_prepared_statement_from_cache(&evicted_name);
1171+
let close_bytes: BytesMut = Close::new(&evicted_name).try_into()?;
1172+
bytes.extend_from_slice(&close_bytes);
1173+
};
11721174

1173-
self.send(&bytes).await?;
1175+
// If we have a parse or close we need to send to the server, send them and sync
1176+
if !bytes.is_empty() {
1177+
bytes.extend_from_slice(&sync());
11741178

1175-
loop {
1176-
self.recv(None).await?;
1179+
self.send(&bytes).await?;
11771180

1178-
if !self.is_data_available() {
1179-
break;
1180-
}
1181+
loop {
1182+
self.recv(None).await?;
1183+
1184+
if !self.is_data_available() {
1185+
break;
11811186
}
11821187
}
11831188
}

src/stats/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub struct ServerStats {
4949
pub error_count: Arc<AtomicU64>,
5050
pub prepared_hit_count: Arc<AtomicU64>,
5151
pub prepared_miss_count: Arc<AtomicU64>,
52+
pub prepared_eviction_count: Arc<AtomicU64>,
5253
pub prepared_cache_size: Arc<AtomicU64>,
5354
}
5455

@@ -68,6 +69,7 @@ impl Default for ServerStats {
6869
reporter: get_reporter(),
6970
prepared_hit_count: Arc::new(AtomicU64::new(0)),
7071
prepared_miss_count: Arc::new(AtomicU64::new(0)),
72+
prepared_eviction_count: Arc::new(AtomicU64::new(0)),
7173
prepared_cache_size: Arc::new(AtomicU64::new(0)),
7274
}
7375
}
@@ -221,6 +223,7 @@ impl ServerStats {
221223
}
222224

223225
pub fn prepared_cache_remove(&self) {
226+
self.prepared_eviction_count.fetch_add(1, Ordering::Relaxed);
224227
self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
225228
}
226229
}

0 commit comments

Comments
 (0)