Skip to content

Commit a8a30ad

Browse files
authored
Refactor Pool Stats to be based off of Server/Client stats (#445)
What is wrong: Stats reported by SHOW POOLS seem to be leaking. We see lingering cl_idle and cl_waiting counts, and similarly for sv_idle and sv_active. We confirmed that these are reporting issues, not actual lingering clients. This behavior is readily reproducible by running: while true; do psql "postgres://sharding_user:sharding_user@localhost:6432/sharded_db" -c "SELECT 1" > /dev/null 2>&1 & done

Why it happens: I wasn't able to figure out the exact reason for the bug, but my best guess is that we have race conditions when updating pool-level stats. Even though individual update operations are atomic, we perform a check-then-update sequence which is not protected by a guard. https://github.com/postgresml/pgcat/blob/main/src/stats/pool.rs#L174-L179 I also suspected that using Relaxed ordering might allow this behavior (I changed all operations to use Ordering::SeqCst but still got lingering clients).

How to fix: Since SHOW POOLS, SHOW SERVERS and SHOW CLIENTS all show the current state of the proxy (as opposed to SHOW STATS, which shows aggregate values), this PR refactors SHOW POOLS to construct its results directly from the SHOW SERVERS and SHOW CLIENTS datasets. This reduces the complexity of stat updates and eliminates the need for locks when updating pool stats, as we only care about updating individual client/server states. This will change the semantics of maxwait: instead of holding the maximum wait time ever encountered by any client (connected or disconnected), it will only consider currently connected clients, which should be okay given that PgCat tends to hold on to client connections more than PgBouncer does.
1 parent d63be9b commit a8a30ad

File tree

11 files changed

+548
-713
lines changed

11 files changed

+548
-713
lines changed

src/admin.rs

+7-33
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::pool::BanReason;
2+
use crate::stats::pool::PoolStats;
23
use bytes::{Buf, BufMut, BytesMut};
34
use log::{error, info, trace};
45
use nix::sys::signal::{self, Signal};
@@ -14,7 +15,7 @@ use crate::errors::Error;
1415
use crate::messages::*;
1516
use crate::pool::ClientServerMap;
1617
use crate::pool::{get_all_pools, get_pool};
17-
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
18+
use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState};
1819

1920
pub fn generate_server_info_for_admin() -> BytesMut {
2021
let mut server_info = BytesMut::new();
@@ -254,39 +255,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
254255
where
255256
T: tokio::io::AsyncWrite + std::marker::Unpin,
256257
{
257-
let all_pool_stats = get_pool_stats();
258-
259-
let columns = vec![
260-
("database", DataType::Text),
261-
("user", DataType::Text),
262-
("pool_mode", DataType::Text),
263-
("cl_idle", DataType::Numeric),
264-
("cl_active", DataType::Numeric),
265-
("cl_waiting", DataType::Numeric),
266-
("cl_cancel_req", DataType::Numeric),
267-
("sv_active", DataType::Numeric),
268-
("sv_idle", DataType::Numeric),
269-
("sv_used", DataType::Numeric),
270-
("sv_tested", DataType::Numeric),
271-
("sv_login", DataType::Numeric),
272-
("maxwait", DataType::Numeric),
273-
("maxwait_us", DataType::Numeric),
274-
];
275-
258+
let pool_lookup = PoolStats::construct_pool_lookup();
276259
let mut res = BytesMut::new();
277-
res.put(row_description(&columns));
278-
279-
for ((_user_pool, _pool), pool_stats) in all_pool_stats {
280-
let mut row = vec![
281-
pool_stats.database(),
282-
pool_stats.user(),
283-
pool_stats.pool_mode().to_string(),
284-
];
285-
pool_stats.populate_row(&mut row);
286-
pool_stats.clear_maxwait();
287-
res.put(data_row(&row));
288-
}
289-
260+
res.put(row_description(&PoolStats::generate_header()));
261+
pool_lookup.iter().for_each(|(_identifier, pool_stats)| {
262+
res.put(data_row(&pool_stats.generate_row()));
263+
});
290264
res.put(command_complete("SHOW"));
291265

292266
// ReadyForQuery

src/client.rs

+1-13
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::plugins::PluginOutput;
2020
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
2121
use crate::query_router::{Command, QueryRouter};
2222
use crate::server::Server;
23-
use crate::stats::{ClientStats, PoolStats, ServerStats};
23+
use crate::stats::{ClientStats, ServerStats};
2424
use crate::tls::Tls;
2525

2626
use tokio_rustls::server::TlsStream;
@@ -654,24 +654,12 @@ where
654654
ready_for_query(&mut write).await?;
655655

656656
trace!("Startup OK");
657-
let pool_stats = match get_pool(pool_name, username) {
658-
Some(pool) => {
659-
if !admin {
660-
pool.stats
661-
} else {
662-
Arc::new(PoolStats::default())
663-
}
664-
}
665-
None => Arc::new(PoolStats::default()),
666-
};
667-
668657
let stats = Arc::new(ClientStats::new(
669658
process_id,
670659
application_name,
671660
username,
672661
pool_name,
673662
tokio::time::Instant::now(),
674-
pool_stats,
675663
));
676664

677665
Ok(Client {

src/mirrors.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use bytes::{Bytes, BytesMut};
77
use parking_lot::RwLock;
88

99
use crate::config::{get_config, Address, Role, User};
10-
use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool};
11-
use crate::stats::PoolStats;
10+
use crate::pool::{ClientServerMap, ServerPool};
1211
use log::{error, info, trace, warn};
1312
use tokio::sync::mpsc::{channel, Receiver, Sender};
1413

@@ -24,7 +23,7 @@ impl MirroredClient {
2423
async fn create_pool(&self) -> Pool<ServerPool> {
2524
let config = get_config();
2625
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
27-
let (connection_timeout, idle_timeout, cfg) =
26+
let (connection_timeout, idle_timeout, _cfg) =
2827
match config.pools.get(&self.address.pool_name) {
2928
Some(cfg) => (
3029
cfg.connect_timeout.unwrap_or(default),
@@ -34,14 +33,11 @@ impl MirroredClient {
3433
None => (default, default, crate::config::Pool::default()),
3534
};
3635

37-
let identifier = PoolIdentifier::new(&self.database, &self.user.username);
38-
3936
let manager = ServerPool::new(
4037
self.address.clone(),
4138
self.user.clone(),
4239
self.database.as_str(),
4340
ClientServerMap::default(),
44-
Arc::new(PoolStats::new(identifier, cfg.clone())),
4541
Arc::new(RwLock::new(None)),
4642
None,
4743
true,

src/pool.rs

+21-21
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use rand::seq::SliceRandom;
1010
use rand::thread_rng;
1111
use regex::Regex;
1212
use std::collections::HashMap;
13+
use std::fmt::{Display, Formatter};
1314
use std::sync::{
1415
atomic::{AtomicBool, Ordering},
1516
Arc,
@@ -26,7 +27,7 @@ use crate::auth_passthrough::AuthPassthrough;
2627
use crate::plugins::prewarmer;
2728
use crate::server::Server;
2829
use crate::sharding::ShardingFunction;
29-
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
30+
use crate::stats::{AddressStats, ClientStats, ServerStats};
3031

3132
pub type ProcessId = i32;
3233
pub type SecretKey = i32;
@@ -76,6 +77,12 @@ impl PoolIdentifier {
7677
}
7778
}
7879

80+
impl Display for PoolIdentifier {
81+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82+
write!(f, "{}@{}", self.user, self.db)
83+
}
84+
}
85+
7986
impl From<&Address> for PoolIdentifier {
8087
fn from(address: &Address) -> PoolIdentifier {
8188
PoolIdentifier::new(&address.database, &address.username)
@@ -202,9 +209,6 @@ pub struct ConnectionPool {
202209
paused: Arc<AtomicBool>,
203210
paused_waiter: Arc<Notify>,
204211

205-
/// Statistics.
206-
pub stats: Arc<PoolStats>,
207-
208212
/// AuthInfo
209213
pub auth_hash: Arc<RwLock<Option<String>>>,
210214
}
@@ -254,10 +258,6 @@ impl ConnectionPool {
254258
.clone()
255259
.into_keys()
256260
.collect::<Vec<String>>();
257-
let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone()));
258-
259-
// Allow the pool to be seen in statistics
260-
pool_stats.register(pool_stats.clone());
261261

262262
// Sort by shard number to ensure consistency.
263263
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
@@ -358,7 +358,6 @@ impl ConnectionPool {
358358
user.clone(),
359359
&shard.database,
360360
client_server_map.clone(),
361-
pool_stats.clone(),
362361
pool_auth_hash.clone(),
363362
match pool_config.plugins {
364363
Some(ref plugins) => Some(plugins.clone()),
@@ -429,7 +428,6 @@ impl ConnectionPool {
429428

430429
let pool = ConnectionPool {
431430
databases: shards,
432-
stats: pool_stats,
433431
addresses,
434432
banlist: Arc::new(RwLock::new(banlist)),
435433
config_hash: new_pool_hash_value,
@@ -610,6 +608,10 @@ impl ConnectionPool {
610608
});
611609
}
612610

611+
// Indicate we're waiting on a server connection from a pool.
612+
let now = Instant::now();
613+
client_stats.waiting();
614+
613615
while !candidates.is_empty() {
614616
// Get the next candidate
615617
let address = match candidates.pop() {
@@ -628,10 +630,6 @@ impl ConnectionPool {
628630
}
629631
}
630632

631-
// Indicate we're waiting on a server connection from a pool.
632-
let now = Instant::now();
633-
client_stats.waiting();
634-
635633
// Check if we can connect
636634
let mut conn = match self.databases[address.shard][address.address_index]
637635
.get()
@@ -669,19 +667,27 @@ impl ConnectionPool {
669667
.stats()
670668
.checkout_time(checkout_time, client_stats.application_name());
671669
server.stats().active(client_stats.application_name());
672-
670+
client_stats.active();
673671
return Ok((conn, address.clone()));
674672
}
675673

676674
if self
677675
.run_health_check(address, server, now, client_stats)
678676
.await
679677
{
678+
let checkout_time: u64 = now.elapsed().as_micros() as u64;
679+
client_stats.checkout_time(checkout_time);
680+
server
681+
.stats()
682+
.checkout_time(checkout_time, client_stats.application_name());
683+
server.stats().active(client_stats.application_name());
684+
client_stats.active();
680685
return Ok((conn, address.clone()));
681686
} else {
682687
continue;
683688
}
684689
}
690+
client_stats.idle();
685691
Err(Error::AllServersDown)
686692
}
687693

@@ -927,9 +933,6 @@ pub struct ServerPool {
927933
/// Client/server mapping.
928934
client_server_map: ClientServerMap,
929935

930-
/// Server statistics.
931-
stats: Arc<PoolStats>,
932-
933936
/// Server auth hash (for auth passthrough).
934937
auth_hash: Arc<RwLock<Option<String>>>,
935938

@@ -946,7 +949,6 @@ impl ServerPool {
946949
user: User,
947950
database: &str,
948951
client_server_map: ClientServerMap,
949-
stats: Arc<PoolStats>,
950952
auth_hash: Arc<RwLock<Option<String>>>,
951953
plugins: Option<Plugins>,
952954
cleanup_connections: bool,
@@ -956,7 +958,6 @@ impl ServerPool {
956958
user: user.clone(),
957959
database: database.to_string(),
958960
client_server_map,
959-
stats,
960961
auth_hash,
961962
plugins,
962963
cleanup_connections,
@@ -975,7 +976,6 @@ impl ManageConnection for ServerPool {
975976

976977
let stats = Arc::new(ServerStats::new(
977978
self.address.clone(),
978-
self.stats.clone(),
979979
tokio::time::Instant::now(),
980980
));
981981

src/prometheus.rs

+11-13
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use std::sync::atomic::Ordering;
99
use std::sync::Arc;
1010

1111
use crate::config::Address;
12-
use crate::pool::get_all_pools;
13-
use crate::stats::{get_pool_stats, get_server_stats, ServerStats};
12+
use crate::pool::{get_all_pools, PoolIdentifier};
13+
use crate::stats::pool::PoolStats;
14+
use crate::stats::{get_server_stats, ServerStats};
1415

1516
struct MetricHelpType {
1617
help: &'static str,
@@ -233,10 +234,10 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
233234
Self::from_name(&format!("stats_{}", name), value, labels)
234235
}
235236

236-
fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
237+
fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
237238
let mut labels = HashMap::new();
238-
labels.insert("pool", pool.0.clone());
239-
labels.insert("user", pool.1.clone());
239+
labels.insert("pool", pool_id.db);
240+
labels.insert("user", pool_id.user);
240241

241242
Self::from_name(&format!("pools_{}", name), value, labels)
242243
}
@@ -284,18 +285,15 @@ fn push_address_stats(lines: &mut Vec<String>) {
284285

285286
// Adds relevant metrics shown in a SHOW POOLS admin command.
286287
fn push_pool_stats(lines: &mut Vec<String>) {
287-
let pool_stats = get_pool_stats();
288-
for (pool, stats) in pool_stats.iter() {
289-
let stats = &**stats;
288+
let pool_stats = PoolStats::construct_pool_lookup();
289+
for (pool_id, stats) in pool_stats.iter() {
290290
for (name, value) in stats.clone() {
291-
if let Some(prometheus_metric) = PrometheusMetric::<u64>::from_pool(pool, &name, value)
291+
if let Some(prometheus_metric) =
292+
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
292293
{
293294
lines.push(prometheus_metric.to_string());
294295
} else {
295-
warn!(
296-
"Metric {} not implemented for ({},{})",
297-
name, pool.0, pool.1
298-
);
296+
warn!("Metric {} not implemented for ({})", name, *pool_id);
299297
}
300298
}
301299
}

src/stats.rs

-21
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::pool::PoolIdentifier;
21
/// Statistics and reporting.
32
use arc_swap::ArcSwap;
43

@@ -16,13 +15,11 @@ pub mod pool;
1615
pub mod server;
1716
pub use address::AddressStats;
1817
pub use client::{ClientState, ClientStats};
19-
pub use pool::PoolStats;
2018
pub use server::{ServerState, ServerStats};
2119

2220
/// Convenience types for various stats
2321
type ClientStatesLookup = HashMap<i32, Arc<ClientStats>>;
2422
type ServerStatesLookup = HashMap<i32, Arc<ServerStats>>;
25-
type PoolStatsLookup = HashMap<(String, String), Arc<PoolStats>>;
2623

2724
/// Stats for individual client connections
2825
/// Used in SHOW CLIENTS.
@@ -34,11 +31,6 @@ static CLIENT_STATS: Lazy<Arc<RwLock<ClientStatesLookup>>> =
3431
static SERVER_STATS: Lazy<Arc<RwLock<ServerStatesLookup>>> =
3532
Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default())));
3633

37-
/// Aggregate stats for each pool (a pool is identified by database name and username)
38-
/// Used in SHOW POOLS.
39-
static POOL_STATS: Lazy<Arc<RwLock<PoolStatsLookup>>> =
40-
Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default())));
41-
4234
/// The statistics reporter. An instance is given to each possible source of statistics,
4335
/// e.g. client stats, server stats, connection pool stats.
4436
pub static REPORTER: Lazy<ArcSwap<Reporter>> =
@@ -80,13 +72,6 @@ impl Reporter {
8072
fn server_disconnecting(&self, server_id: i32) {
8173
SERVER_STATS.write().remove(&server_id);
8274
}
83-
84-
/// Register a pool with the stats system.
85-
fn pool_register(&self, identifier: PoolIdentifier, stats: Arc<PoolStats>) {
86-
POOL_STATS
87-
.write()
88-
.insert((identifier.db, identifier.user), stats);
89-
}
9075
}
9176

9277
/// The statistics collector which is used for calculating averages
@@ -139,12 +124,6 @@ pub fn get_server_stats() -> ServerStatesLookup {
139124
SERVER_STATS.read().clone()
140125
}
141126

142-
/// Get a snapshot of pool statistics.
143-
/// by the `Collector`.
144-
pub fn get_pool_stats() -> PoolStatsLookup {
145-
POOL_STATS.read().clone()
146-
}
147-
148127
/// Get the statistics reporter used to update stats across the pools/clients.
149128
pub fn get_reporter() -> Reporter {
150129
(*(*REPORTER.load())).clone()

0 commit comments

Comments
 (0)