diff --git a/.gitignore b/.gitignore index 0b436164..40d5de1b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ lcov.info dev/.bash_history dev/cache !dev/cache/.keepme +.venv \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 98a0669c..c3fd747c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -542,7 +542,7 @@ where } // Authenticate normal user. else { - let mut pool = match get_pool(pool_name, username) { + let pool = match get_pool(pool_name, username) { Some(pool) => pool, None => { error_response( @@ -800,6 +800,18 @@ where &self.pool_name, ); + // Get a pool instance referenced by the most up-to-date + // pointer. This ensures we always read the latest config + // when starting a query. + let mut pool = if self.admin { + // Admin clients do not use pools. + ConnectionPool::default() + } else { + self.get_pool().await? + }; + + query_router.update_pool_settings(&pool.settings); + // Our custom protocol loop. // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocol. @@ -853,12 +865,6 @@ where continue; } - // Get a pool instance referenced by the most up-to-date - // pointer. This ensures we always read the latest config - // when starting a query. - let mut pool = self.get_pool().await?; - query_router.update_pool_settings(pool.settings.clone()); - let mut initial_parsed_ast = None; match message[0] as char { @@ -990,12 +996,11 @@ where }; // Check if the pool is paused and wait until it's resumed. - if pool.wait_paused().await { - // Refresh pool information, something might have changed. - pool = self.get_pool().await?; - } + pool.wait_paused().await; - query_router.update_pool_settings(pool.settings.clone()); + // Refresh pool information, something might have changed. + pool = self.get_pool().await?; + query_router.update_pool_settings(&pool.settings); let current_shard = query_router.shard(); diff --git a/src/pool.rs b/src/pool.rs index 736dc1ad..77394070 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -190,11 +190,11 @@ impl Default for PoolSettings { #[derive(Clone, Debug, Default)] pub struct ConnectionPool { /// The pools handled internally by bb8. - databases: Vec>>, + databases: Arc>>>, /// The addresses (host, port, role) to handle /// failover and load balancing deterministically. - addresses: Vec>, + addresses: Arc>>, /// List of banned addresses (see above) /// that should not be queried. @@ -206,7 +206,7 @@ pub struct ConnectionPool { original_server_parameters: Arc>, /// Pool configuration. - pub settings: PoolSettings, + pub settings: Arc, /// If not validated, we need to double check the pool is available before allowing a client /// to use it. @@ -445,13 +445,13 @@ impl ConnectionPool { } let pool = ConnectionPool { - databases: shards, - addresses, + databases: Arc::new(shards), + addresses: Arc::new(addresses), banlist: Arc::new(RwLock::new(banlist)), config_hash: new_pool_hash_value, original_server_parameters: Arc::new(RwLock::new(ServerParameters::new())), auth_hash: pool_auth_hash, - settings: PoolSettings { + settings: Arc::new(PoolSettings { pool_mode: match user.pool_mode { Some(pool_mode) => pool_mode, None => pool_config.pool_mode, @@ -494,7 +494,7 @@ impl ConnectionPool { Some(ref plugins) => Some(plugins.clone()), None => config.plugins.clone(), }, - }, + }), validated: Arc::new(AtomicBool::new(false)), paused: Arc::new(AtomicBool::new(false)), paused_waiter: Arc::new(Notify::new()), @@ -504,7 +504,7 @@ impl ConnectionPool { // before setting it globally. // Do this async and somewhere else, we don't have to wait here. if config.general.validate_config { - let mut validate_pool = pool.clone(); + let validate_pool = pool.clone(); tokio::task::spawn(async move { let _ = validate_pool.validate().await; }); @@ -525,7 +525,7 @@ impl ConnectionPool { /// when they connect. /// This also warms up the pool for clients that connect when /// the pooler starts up. - pub async fn validate(&mut self) -> Result<(), Error> { + pub async fn validate(&self) -> Result<(), Error> { let mut futures = Vec::new(); let validated = Arc::clone(&self.validated); diff --git a/src/query_router.rs b/src/query_router.rs index 8b451dd3..541883f2 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -128,8 +128,8 @@ impl QueryRouter { } /// Pool settings can change because of a config reload. - pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) { - self.pool_settings = pool_settings; + pub fn update_pool_settings(&mut self, pool_settings: &PoolSettings) { + self.pool_settings = pool_settings.clone(); } pub fn pool_settings(&self) -> &PoolSettings { @@ -1403,7 +1403,7 @@ mod test { assert_eq!(qr.primary_reads_enabled, None); // Internal state must not be changed due to this, only defaults - qr.update_pool_settings(pool_settings.clone()); + qr.update_pool_settings(&pool_settings); assert_eq!(qr.active_role, None); assert_eq!(qr.active_shard, None); @@ -1476,7 +1476,7 @@ mod test { }; let mut qr = QueryRouter::new(); - qr.update_pool_settings(pool_settings); + qr.update_pool_settings(&pool_settings); // Shard should start out unset assert_eq!(qr.active_shard, None); @@ -1860,7 +1860,7 @@ mod test { ..Default::default() }; let mut qr = QueryRouter::new(); - qr.update_pool_settings(pool_settings); + qr.update_pool_settings(&pool_settings); let query = simple_query("SELECT * FROM pg_database"); let ast = qr.parse(&query).unwrap();