Skip to content

Commit 870b34d

Browse files
committed
Change auth_query config to be per pool instead of global.
Given that we can connect to different username/databases/servers using connection pools, it makes sense that `auth_query` should be configured in a per pool basis. This change implements that and also leaves the global parameters so pool configuration can inherit global ones. Note that now, `auth_query_database` is dropped in favor of the pool configured database.
1 parent 785a409 commit 870b34d

File tree

10 files changed

+112
-52
lines changed

10 files changed

+112
-52
lines changed

.circleci/pgcat.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ admin_password = "admin_pass"
5353
# auth_query = "SELECT * FROM public.user_lookup('$1');"
5454
# auth_query_user = "md5_auth_user"
5555
# auth_query_password = "secret"
56-
# auth_query_database = "postgres"
5756

5857
# pool
5958
# configs are structured as pool.<pool_name>

.circleci/query_auth_test.sh

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@ export LOCAL_IP=$(hostname -i)
1212
# auth_query = "SELECT * FROM public.user_lookup('$1');"
1313
# auth_query_user = "md5_auth_user"
1414
# auth_query_password = "secret"
15-
# auth_query_database = "postgres"
1615
# ...
1716

1817
# Before (sets up auth_query in postgres and pgcat)
1918
PGDATABASE=postgres PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_setup.sql
19+
PGDATABASE=shard0 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_setup_function.sql
20+
PGDATABASE=shard1 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_setup_function.sql
21+
PGDATABASE=shard2 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_setup_function.sql
2022
sed -i 's/^# auth_query/auth_query/' .circleci/pgcat.toml
2123

2224
# TEST_WRONG_AUTH_QUERY BEGIN
2325
# When auth_query fails...
24-
PGDATABASE=postgres \
26+
PGDATABASE=shard0 \
2527
PGPASSWORD=postgres \
2628
psql -e -h 127.0.0.1 -p 5432 -U postgres -c "REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, md5_auth_user;"
2729

@@ -33,7 +35,7 @@ echo "When query_auth_config is wrong, we fall back to passwords set in cleartex
3335
psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT 1'
3436

3537
# After
36-
PGDATABASE=postgres \
38+
PGDATABASE=shard0 \
3739
PGPASSWORD=postgres \
3840
psql -e -h 127.0.0.1 -p 5432 -U postgres -c "GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO md5_auth_user;"
3941
# TEST_WRONG_AUTH_QUERY END
@@ -80,6 +82,9 @@ PGPASSWORD=another_sharding_password psql -U sharding_user -h "${LOCAL_IP}" -p 6
8082
# TEST_PASSWORD_CHANGE END
8183

8284
# After
85+
PGDATABASE=shard0 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_teardown_function.sql
86+
PGDATABASE=shard1 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_teardown_function.sql
87+
PGDATABASE=shard2 PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_teardown_function.sql
8388
PGDATABASE=postgres PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_auth_teardown.sql
8489
sed -i 's/^auth_query/# auth_query/' .circleci/pgcat.toml
8590
sed -i 's/^# password =/password =/' .circleci/pgcat.toml

src/auth_passthrough.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@ use bytes::BytesMut;
77
use fallible_iterator::FallibleIterator;
88
use parking_lot::Mutex;
99

10+
use crate::pool::ClientServerMap;
1011
use log::{debug, trace, warn};
1112
use postgres_protocol::message;
1213
use std::collections::HashMap;
1314
use std::sync::Arc;
1415

15-
use crate::config::get_config;
16-
use crate::pool::ClientServerMap;
17-
1816
pub struct AuthPassthrough {
1917
password: String,
2018
query: String,
@@ -33,23 +31,35 @@ impl AuthPassthrough {
3331
}
3432
}
3533

36-
/// Returns an AuthPassthrough given the configuration.
34+
/// Returns an AuthPassthrough given the pool configuration.
3735
/// If any of required values is not set, None is returned.
38-
pub fn from_config() -> Option<Self> {
39-
let config = get_config();
40-
41-
if config.is_auth_query_configured() {
36+
pub fn from_pool_config(pool_config: &crate::config::Pool, database: &String) -> Option<Self> {
37+
if pool_config.is_auth_query_configured() {
4238
return Some(AuthPassthrough::new(
43-
config.general.auth_query.as_ref().unwrap(),
44-
config.general.auth_query_user.as_ref().unwrap(),
45-
config.general.auth_query_password.as_ref().unwrap(),
46-
config.general.auth_query_database.as_ref().unwrap(),
39+
pool_config.auth_query.as_ref().unwrap(),
40+
pool_config.auth_query_user.as_ref().unwrap(),
41+
pool_config.auth_query_password.as_ref().unwrap(),
42+
database,
4743
));
4844
}
4945

5046
None
5147
}
5248

49+
/// Returns an AuthPassthrough given the pool settings.
50+
/// If any of required values is not set, None is returned.
51+
pub fn from_pool_settings(
52+
pool_settings: &crate::pool::PoolSettings,
53+
database: &String,
54+
) -> Option<Self> {
55+
let mut pool_config = crate::config::Pool::default();
56+
pool_config.auth_query = pool_settings.auth_query.clone();
57+
pool_config.auth_query_password = pool_settings.auth_query_password.clone();
58+
pool_config.auth_query_user = pool_settings.auth_query_user.clone();
59+
60+
AuthPassthrough::from_pool_config(&pool_config, database)
61+
}
62+
5363
/// Connects to server and executes auth_query for the specified address.
5464
/// If the response is a row with two columns containing the username set in the address.
5565
/// and its MD5 hash, the MD5 hash returned.
@@ -220,7 +230,7 @@ async fn recv_data(server: &mut Server) -> Result<BytesMut, Error> {
220230
}
221231

222232
async fn send_auth_query(server: &mut Server, query: &str) -> Result<(), Error> {
223-
return match server.send(simple_query(query)).await {
233+
return match server.send(&simple_query(query)).await {
224234
Ok(()) => Ok(()),
225235
Err(err) => {
226236
let message = format!(

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ where
507507
}
508508
}
509509
if password_hash.is_none() {
510-
warn!("Clien auth is not possible, you either have not set a valid auth_query or a password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name);
510+
warn!("Client auth is not possible, you either have not set a valid auth_query or a password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name);
511511
wrong_password(&mut write, username).await?;
512512

513513
return Err(Error::ClientError(format!("Invalid client auth {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));

src/config.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/// Parse the configuration file.
22
use arc_swap::ArcSwap;
3+
use hmac::digest::typenum::private::IsNotEqualPrivate;
34
use log::{error, info};
45
use once_cell::sync::Lazy;
56
use serde_derive::{Deserialize, Serialize};
@@ -193,7 +194,6 @@ pub struct General {
193194
pub auth_query: Option<String>,
194195
pub auth_query_user: Option<String>,
195196
pub auth_query_password: Option<String>,
196-
pub auth_query_database: Option<String>,
197197
}
198198

199199
impl General {
@@ -258,7 +258,6 @@ impl Default for General {
258258
auth_query: None,
259259
auth_query_user: None,
260260
auth_query_password: None,
261-
auth_query_database: None,
262261
}
263262
}
264263
}
@@ -329,9 +328,19 @@ pub struct Pool {
329328

330329
pub shards: BTreeMap<String, Shard>,
331330
pub users: BTreeMap<String, User>,
331+
332+
pub auth_query: Option<String>,
333+
pub auth_query_user: Option<String>,
334+
pub auth_query_password: Option<String>,
332335
}
333336

334337
impl Pool {
338+
pub fn is_auth_query_configured(&self) -> bool {
339+
self.auth_query_password.is_some()
340+
&& self.auth_query_user.is_some()
341+
&& self.auth_query_password.is_some()
342+
}
343+
335344
pub fn default_pool_mode() -> PoolMode {
336345
PoolMode::Transaction
337346
}
@@ -390,6 +399,9 @@ impl Default for Pool {
390399
automatic_sharding_key: None,
391400
connect_timeout: None,
392401
idle_timeout: None,
402+
auth_query: None,
403+
auth_query_user: None,
404+
auth_query_password: None,
393405
}
394406
}
395407
}
@@ -481,6 +493,12 @@ pub struct Config {
481493
}
482494

483495
impl Config {
496+
pub fn is_auth_query_configured(&self) -> bool {
497+
self.pools
498+
.iter()
499+
.any(|(_name, pool)| pool.is_auth_query_configured())
500+
}
501+
484502
pub fn default_path() -> String {
485503
String::from("pgcat.toml")
486504
}
@@ -494,6 +512,22 @@ impl Config {
494512
self.general.auth_query_password = Some(val);
495513
}
496514
}
515+
516+
pub fn fill_up_auth_query_config(&mut self) {
517+
for (_name, pool) in self.pools.iter_mut() {
518+
if pool.auth_query.is_none() {
519+
pool.auth_query = self.general.auth_query.clone();
520+
}
521+
522+
if pool.auth_query_user.is_none() {
523+
pool.auth_query_user = self.general.auth_query_user.clone();
524+
}
525+
526+
if pool.auth_query_password.is_none() {
527+
pool.auth_query_password = self.general.auth_query_password.clone();
528+
}
529+
}
530+
}
497531
}
498532

499533
impl Default for Config {
@@ -698,21 +732,13 @@ impl Config {
698732
}
699733
}
700734

701-
pub fn is_auth_query_configured(&self) -> bool {
702-
self.general.auth_query_password.is_some()
703-
&& self.general.auth_query_user.is_some()
704-
&& self.general.auth_query_password.is_some()
705-
&& self.general.auth_query_database.is_some()
706-
}
707-
708735
pub fn validate(&mut self) -> Result<(), Error> {
709736
// Validation for auth_query feature
710737
if self.general.auth_query.is_some()
711738
&& (self.general.auth_query_user.is_none()
712-
|| self.general.auth_query_password.is_none()
713-
|| self.general.auth_query_database.is_none())
739+
|| self.general.auth_query_password.is_none())
714740
{
715-
error!("If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password` and `auth_query_database`");
741+
error!("If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`");
716742
return Err(Error::BadConfig);
717743
}
718744

@@ -801,6 +827,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
801827
};
802828

803829
config.overwrite_passwords();
830+
config.fill_up_auth_query_config();
804831
config.validate()?;
805832

806833
config.path = path.to_string();
@@ -916,7 +943,6 @@ mod test {
916943
);
917944
assert_eq!(get_config().pools["simple_db"].users["0"].pool_size, 5);
918945
assert_eq!(get_config().general.auth_query, None);
919-
assert_eq!(get_config().general.auth_query_database, None);
920946
assert_eq!(get_config().general.auth_query_user, None);
921947
assert_eq!(get_config().general.auth_query_password, None);
922948
}

src/pool.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ pub struct PoolSettings {
9292

9393
// Health check delay
9494
pub healthcheck_delay: u64,
95+
96+
// Auth query parameters
97+
pub auth_query: Option<String>,
98+
pub auth_query_user: Option<String>,
99+
pub auth_query_password: Option<String>,
95100
}
96101

97102
impl Default for PoolSettings {
@@ -108,6 +113,9 @@ impl Default for PoolSettings {
108113
automatic_sharding_key: None,
109114
healthcheck_delay: General::default_healthcheck_delay(),
110115
healthcheck_timeout: General::default_healthcheck_timeout(),
116+
auth_query: None,
117+
auth_query_user: None,
118+
auth_query_password: None,
111119
}
112120
}
113121
}
@@ -144,15 +152,19 @@ pub struct ConnectionPool {
144152

145153
impl ConnectionPool {
146154
async fn auth_hash_has_changed(&mut self) -> bool {
147-
if let Some(apt) = AuthPassthrough::from_config() {
148-
if let Some(shard) = self.addresses.first() {
149-
if let Some(address) = shard.first() {
155+
if let Some(shard) = self.addresses.first() {
156+
if let Some(address) = shard.first() {
157+
if let Some(apt) =
158+
AuthPassthrough::from_pool_settings(&self.settings, &address.database)
159+
{
150160
match apt.fetch_hash(address).await {
151161
Ok(md5) => {
152162
if let Some(auth_hash) = self.auth_hash.as_ref() {
153163
if auth_hash != &md5 {
154164
info!("Password hash changed for user: {:?}. Pool will be reloaded.", address.username);
155165
return true
166+
} else {
167+
debug!("Password hash has not changed for user: {:?}.", address.username);
156168
}
157169
}
158170
},
@@ -167,7 +179,6 @@ impl ConnectionPool {
167179
/// Construct the connection pool from the configuration.
168180
pub async fn from_config(client_server_map: ClientServerMap) -> Result<(), Error> {
169181
let config = get_config();
170-
let auth_passthrough = AuthPassthrough::from_config();
171182

172183
let mut new_pools = HashMap::new();
173184
let mut address_id = 0;
@@ -247,14 +258,16 @@ impl ConnectionPool {
247258
}
248259

249260
// We assume every server in the pool share user/passwords
250-
// TODO: Make it server dependant.
251261
let mut auth_hash = None;
262+
let auth_passthrough =
263+
AuthPassthrough::from_pool_config(pool_config, &shard.database);
252264
if let Some(apt) = auth_passthrough.as_ref() {
253265
match apt.fetch_hash(&address).await {
254266
Ok(ok) => {
255267
if let Some(pool_auth_hash_value) = pool_auth_hash {
256268
if ok != pool_auth_hash_value {
257-
warn!("Hash is not the same across servers of the same pool, client auth will be done using last obtained hash.");
269+
warn!("Hash is not the same across shards of the same pool, client auth will \
270+
be done using last obtained hash. Server: {}:{}, Database: {}", server.host, server.port, shard.database);
258271
}
259272
}
260273
pool_auth_hash = Some(ok.clone());
@@ -302,6 +315,12 @@ impl ConnectionPool {
302315
}
303316

304317
assert_eq!(shards.len(), addresses.len());
318+
if pool_auth_hash.is_some() {
319+
info!(
320+
"Auth hash obtained from query_auth for pool {{ name: {}, user: {} }}",
321+
pool_name, user.username
322+
);
323+
}
305324

306325
let mut pool = ConnectionPool {
307326
databases: shards,
@@ -328,6 +347,9 @@ impl ConnectionPool {
328347
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
329348
healthcheck_delay: config.general.healthcheck_delay,
330349
healthcheck_timeout: config.general.healthcheck_timeout,
350+
auth_query: pool_config.auth_query.clone(),
351+
auth_query_user: pool_config.auth_query_user.clone(),
352+
auth_query_password: pool_config.auth_query_password.clone(),
331353
},
332354
};
333355

tests/sharding/query_auth_setup.sql

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,3 @@
44

55
ALTER ROLE sharding_user ENCRYPTED PASSWORD 'md5fa9d23e5a874c61a91bf37e1e4a9c86e'; --- sharding_user
66
CREATE ROLE md5_auth_user ENCRYPTED PASSWORD 'md54ab2c5d00339c4b2a4e921d2dc4edec7' LOGIN; --- secret
7-
8-
CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
9-
RETURNS record AS $$
10-
BEGIN
11-
SELECT usename, passwd FROM pg_catalog.pg_shadow
12-
WHERE usename = i_username INTO uname, phash;
13-
RETURN;
14-
END;
15-
$$ LANGUAGE plpgsql SECURITY DEFINER;
16-
17-
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, md5_auth_user;
18-
GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO md5_auth_user;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
2+
RETURNS record AS $$
3+
BEGIN
4+
SELECT usename, passwd FROM pg_catalog.pg_shadow
5+
WHERE usename = i_username INTO uname, phash;
6+
RETURN;
7+
END;
8+
$$ LANGUAGE plpgsql SECURITY DEFINER;
9+
10+
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, md5_auth_user;
11+
GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO md5_auth_user;

tests/sharding/query_auth_teardown.sql

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,4 @@
33
--- - Drops auth query function and user.
44

55
ALTER ROLE sharding_user ENCRYPTED PASSWORD 'sharding_user' LOGIN;
6-
7-
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, md5_auth_user;
8-
DROP FUNCTION public.user_lookup(in i_username text, out uname text, out phash text);
96
DROP ROLE md5_auth_user;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, md5_auth_user;
2+
DROP FUNCTION public.user_lookup(in i_username text, out uname text, out phash text);

0 commit comments

Comments
 (0)