Skip to content

Refactor Pool Stats to be based off of Server/Client stats #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::pool::BanReason;
use crate::stats::pool::PoolStats;
use bytes::{Buf, BufMut, BytesMut};
use log::{error, info, trace};
use nix::sys::signal::{self, Signal};
Expand All @@ -14,7 +15,7 @@ use crate::errors::Error;
use crate::messages::*;
use crate::pool::ClientServerMap;
use crate::pool::{get_all_pools, get_pool};
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState};

pub fn generate_server_info_for_admin() -> BytesMut {
let mut server_info = BytesMut::new();
Expand Down Expand Up @@ -254,39 +255,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let all_pool_stats = get_pool_stats();

let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];

let pool_lookup = PoolStats::construct_pool_lookup();
let mut res = BytesMut::new();
res.put(row_description(&columns));

for ((_user_pool, _pool), pool_stats) in all_pool_stats {
let mut row = vec![
pool_stats.database(),
pool_stats.user(),
pool_stats.pool_mode().to_string(),
];
pool_stats.populate_row(&mut row);
pool_stats.clear_maxwait();
res.put(data_row(&row));
}

res.put(row_description(&PoolStats::generate_header()));
pool_lookup.iter().for_each(|(_identifier, pool_stats)| {
res.put(data_row(&pool_stats.generate_row()));
});
res.put(command_complete("SHOW"));

// ReadyForQuery
Expand Down
14 changes: 1 addition & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::plugins::PluginOutput;
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::{ClientStats, PoolStats, ServerStats};
use crate::stats::{ClientStats, ServerStats};
use crate::tls::Tls;

use tokio_rustls::server::TlsStream;
Expand Down Expand Up @@ -654,24 +654,12 @@ where
ready_for_query(&mut write).await?;

trace!("Startup OK");
let pool_stats = match get_pool(pool_name, username) {
Some(pool) => {
if !admin {
pool.stats
} else {
Arc::new(PoolStats::default())
}
}
None => Arc::new(PoolStats::default()),
};

let stats = Arc::new(ClientStats::new(
process_id,
application_name,
username,
pool_name,
tokio::time::Instant::now(),
pool_stats,
));

Ok(Client {
Expand Down
8 changes: 2 additions & 6 deletions src/mirrors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;

use crate::config::{get_config, Address, Role, User};
use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool};
use crate::stats::PoolStats;
use crate::pool::{ClientServerMap, ServerPool};
use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};

Expand All @@ -24,7 +23,7 @@ impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
let (connection_timeout, idle_timeout, cfg) =
let (connection_timeout, idle_timeout, _cfg) =
match config.pools.get(&self.address.pool_name) {
Some(cfg) => (
cfg.connect_timeout.unwrap_or(default),
Expand All @@ -34,14 +33,11 @@ impl MirroredClient {
None => (default, default, crate::config::Pool::default()),
};

let identifier = PoolIdentifier::new(&self.database, &self.user.username);

let manager = ServerPool::new(
self.address.clone(),
self.user.clone(),
self.database.as_str(),
ClientServerMap::default(),
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
true,
Expand Down
42 changes: 21 additions & 21 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use regex::Regex;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -26,7 +27,7 @@ use crate::auth_passthrough::AuthPassthrough;
use crate::plugins::prewarmer;
use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
use crate::stats::{AddressStats, ClientStats, ServerStats};

pub type ProcessId = i32;
pub type SecretKey = i32;
Expand Down Expand Up @@ -76,6 +77,12 @@ impl PoolIdentifier {
}
}

impl Display for PoolIdentifier {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.user, self.db)
}
}

impl From<&Address> for PoolIdentifier {
fn from(address: &Address) -> PoolIdentifier {
PoolIdentifier::new(&address.database, &address.username)
Expand Down Expand Up @@ -202,9 +209,6 @@ pub struct ConnectionPool {
paused: Arc<AtomicBool>,
paused_waiter: Arc<Notify>,

/// Statistics.
pub stats: Arc<PoolStats>,

/// AuthInfo
pub auth_hash: Arc<RwLock<Option<String>>>,
}
Expand Down Expand Up @@ -254,10 +258,6 @@ impl ConnectionPool {
.clone()
.into_keys()
.collect::<Vec<String>>();
let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone()));

// Allow the pool to be seen in statistics
pool_stats.register(pool_stats.clone());

// Sort by shard number to ensure consistency.
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
Expand Down Expand Up @@ -358,7 +358,6 @@ impl ConnectionPool {
user.clone(),
&shard.database,
client_server_map.clone(),
pool_stats.clone(),
pool_auth_hash.clone(),
match pool_config.plugins {
Some(ref plugins) => Some(plugins.clone()),
Expand Down Expand Up @@ -429,7 +428,6 @@ impl ConnectionPool {

let pool = ConnectionPool {
databases: shards,
stats: pool_stats,
addresses,
banlist: Arc::new(RwLock::new(banlist)),
config_hash: new_pool_hash_value,
Expand Down Expand Up @@ -610,6 +608,10 @@ impl ConnectionPool {
});
}

// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();

while !candidates.is_empty() {
// Get the next candidate
let address = match candidates.pop() {
Expand All @@ -628,10 +630,6 @@ impl ConnectionPool {
}
}

// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();

// Check if we can connect
let mut conn = match self.databases[address.shard][address.address_index]
.get()
Expand Down Expand Up @@ -669,19 +667,27 @@ impl ConnectionPool {
.stats()
.checkout_time(checkout_time, client_stats.application_name());
server.stats().active(client_stats.application_name());

client_stats.active();
return Ok((conn, address.clone()));
}

if self
.run_health_check(address, server, now, client_stats)
.await
{
let checkout_time: u64 = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
server
.stats()
.checkout_time(checkout_time, client_stats.application_name());
server.stats().active(client_stats.application_name());
client_stats.active();
return Ok((conn, address.clone()));
} else {
continue;
}
}
client_stats.idle();
Err(Error::AllServersDown)
}

Expand Down Expand Up @@ -927,9 +933,6 @@ pub struct ServerPool {
/// Client/server mapping.
client_server_map: ClientServerMap,

/// Server statistics.
stats: Arc<PoolStats>,

/// Server auth hash (for auth passthrough).
auth_hash: Arc<RwLock<Option<String>>>,

Expand All @@ -946,7 +949,6 @@ impl ServerPool {
user: User,
database: &str,
client_server_map: ClientServerMap,
stats: Arc<PoolStats>,
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
cleanup_connections: bool,
Expand All @@ -956,7 +958,6 @@ impl ServerPool {
user: user.clone(),
database: database.to_string(),
client_server_map,
stats,
auth_hash,
plugins,
cleanup_connections,
Expand All @@ -975,7 +976,6 @@ impl ManageConnection for ServerPool {

let stats = Arc::new(ServerStats::new(
self.address.clone(),
self.stats.clone(),
tokio::time::Instant::now(),
));

Expand Down
24 changes: 11 additions & 13 deletions src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::config::Address;
use crate::pool::get_all_pools;
use crate::stats::{get_pool_stats, get_server_stats, ServerStats};
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};

struct MetricHelpType {
help: &'static str,
Expand Down Expand Up @@ -233,10 +234,10 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("stats_{}", name), value, labels)
}

fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new();
labels.insert("pool", pool.0.clone());
labels.insert("user", pool.1.clone());
labels.insert("pool", pool_id.db);
labels.insert("user", pool_id.user);

Self::from_name(&format!("pools_{}", name), value, labels)
}
Expand Down Expand Up @@ -284,18 +285,15 @@ fn push_address_stats(lines: &mut Vec<String>) {

// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let pool_stats = get_pool_stats();
for (pool, stats) in pool_stats.iter() {
let stats = &**stats;
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) = PrometheusMetric::<u64>::from_pool(pool, &name, value)
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!(
"Metric {} not implemented for ({},{})",
name, pool.0, pool.1
);
warn!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
Expand Down
21 changes: 0 additions & 21 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::pool::PoolIdentifier;
/// Statistics and reporting.
use arc_swap::ArcSwap;

Expand All @@ -16,13 +15,11 @@ pub mod pool;
pub mod server;
pub use address::AddressStats;
pub use client::{ClientState, ClientStats};
pub use pool::PoolStats;
pub use server::{ServerState, ServerStats};

/// Convenience types for various stats
type ClientStatesLookup = HashMap<i32, Arc<ClientStats>>;
type ServerStatesLookup = HashMap<i32, Arc<ServerStats>>;
type PoolStatsLookup = HashMap<(String, String), Arc<PoolStats>>;

/// Stats for individual client connections
/// Used in SHOW CLIENTS.
Expand All @@ -34,11 +31,6 @@ static CLIENT_STATS: Lazy<Arc<RwLock<ClientStatesLookup>>> =
static SERVER_STATS: Lazy<Arc<RwLock<ServerStatesLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default())));

/// Aggregate stats for each pool (a pool is identified by database name and username)
/// Used in SHOW POOLS.
static POOL_STATS: Lazy<Arc<RwLock<PoolStatsLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default())));

/// The statistics reporter. An instance is given to each possible source of statistics,
/// e.g. client stats, server stats, connection pool stats.
pub static REPORTER: Lazy<ArcSwap<Reporter>> =
Expand Down Expand Up @@ -80,13 +72,6 @@ impl Reporter {
fn server_disconnecting(&self, server_id: i32) {
SERVER_STATS.write().remove(&server_id);
}

/// Register a pool with the stats system.
fn pool_register(&self, identifier: PoolIdentifier, stats: Arc<PoolStats>) {
POOL_STATS
.write()
.insert((identifier.db, identifier.user), stats);
}
}

/// The statistics collector which used for calculating averages
Expand Down Expand Up @@ -139,12 +124,6 @@ pub fn get_server_stats() -> ServerStatesLookup {
SERVER_STATS.read().clone()
}

/// Get a snapshot of pool statistics.
/// by the `Collector`.
pub fn get_pool_stats() -> PoolStatsLookup {
POOL_STATS.read().clone()
}

/// Get the statistics reporter used to update stats across the pools/clients.
pub fn get_reporter() -> Reporter {
(*(*REPORTER.load())).clone()
Expand Down
Loading