
Optionally clean up server connections #444


Merged · 6 commits · May 18, 2023
17 changes: 17 additions & 0 deletions src/config.rs
@@ -487,10 +487,15 @@ pub struct Pool {
#[serde(default)] // False
pub primary_reads_enabled: bool,

/// Maximum time to allow for establishing a new server connection.
pub connect_timeout: Option<u64>,

/// Close idle connections that have been opened for longer than this.
pub idle_timeout: Option<u64>,

/// Close server connections that have been opened for longer than this.
/// Only applied to idle connections. If the connection is actively used for
/// longer than this period, the pool will not interrupt it.
pub server_lifetime: Option<u64>,

#[serde(default = "Pool::default_sharding_function")]
@@ -507,6 +512,9 @@ pub struct Pool {
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,

#[serde(default = "Pool::default_cleanup_server_connections")]
pub cleanup_server_connections: bool,

pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
@@ -548,6 +556,10 @@ impl Pool {
ShardingFunction::PgBigintHash
}

pub fn default_cleanup_server_connections() -> bool {
true
}

pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
@@ -637,6 +649,7 @@ impl Default for Pool {
auth_query_password: None,
server_lifetime: None,
plugins: None,
cleanup_server_connections: true,
}
}
}
@@ -1066,6 +1079,10 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}] Cleanup server connections: {}",
pool_name, pool_config.cleanup_server_connections
);
info!(
"[pool: {}] Plugins: {}",
pool_name,
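Since the new key is serde-defaulted, configs that omit it keep today's behavior. A minimal sketch of that wiring, using a standalone struct for illustration (assumes the `serde` and `toml` crates; not the full `Pool` from the diff):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct Pool {
    // When the key is absent from the config file, serde calls the
    // default function below, so existing configs keep cleanup enabled.
    #[serde(default = "Pool::default_cleanup_server_connections")]
    cleanup_server_connections: bool,
}

impl Pool {
    fn default_cleanup_server_connections() -> bool {
        true
    }
}

fn main() {
    // Key omitted: the default kicks in.
    let pool: Pool = toml::from_str("").unwrap();
    assert!(pool.cleanup_server_connections);

    // Operators opt out explicitly.
    let pool: Pool = toml::from_str("cleanup_server_connections = false").unwrap();
    assert!(!pool.cleanup_server_connections);
}
```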
1 change: 1 addition & 0 deletions src/mirrors.rs
@@ -44,6 +44,7 @@ impl MirroredClient {
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
true,
);

Pool::builder()
20 changes: 20 additions & 0 deletions src/pool.rs
@@ -364,6 +364,7 @@ impl ConnectionPool {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
pool_config.cleanup_server_connections,
);

let connect_timeout = match pool_config.connect_timeout {
@@ -914,13 +915,29 @@ impl ConnectionPool {

/// Wrapper for the bb8 connection pool.
pub struct ServerPool {
/// Server address.
address: Address,

/// Server Postgres user.
user: User,

/// Server database.
database: String,

/// Client/server mapping.
client_server_map: ClientServerMap,

/// Server statistics.
stats: Arc<PoolStats>,

/// Server auth hash (for auth passthrough).
auth_hash: Arc<RwLock<Option<String>>>,

/// Server plugins.
plugins: Option<Plugins>,

/// Should we clean up dirty connections before putting them into the pool?
cleanup_connections: bool,
}

impl ServerPool {
@@ -932,6 +949,7 @@ impl ServerPool {
stats: Arc<PoolStats>,
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
cleanup_connections: bool,
) -> ServerPool {
ServerPool {
address,
@@ -941,6 +959,7 @@
stats,
auth_hash,
plugins,
cleanup_connections,
}
}
}
@@ -970,6 +989,7 @@ impl ManageConnection for ServerPool {
self.client_server_map.clone(),
stats.clone(),
self.auth_hash.clone(),
self.cleanup_connections,
)
.await
{
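For orientation: `ServerPool` implements bb8's `ManageConnection`, so the flag captured at pool construction is consulted every time bb8 opens a fresh server connection. A reduced sketch of that plumbing under the bb8 0.8 trait shape, with stub `Server` and `Error` types rather than pgcat's real ones:

```rust
use async_trait::async_trait;
use bb8::ManageConnection;

#[derive(Debug)]
pub struct Error;

// Stub connection type: each server remembers whether it should run
// cleanup queries when a dirty session is checked back in.
pub struct Server {
    pub cleanup_connections: bool,
}

pub struct ServerPool {
    pub cleanup_connections: bool,
}

#[async_trait]
impl ManageConnection for ServerPool {
    type Connection = Server;
    type Error = Error;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        // The pool-level setting rides along into every new connection,
        // mirroring the extra argument added in the diff above.
        Ok(Server {
            cleanup_connections: self.cleanup_connections,
        })
    }

    async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
        Ok(())
    }

    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
        false
    }
}
```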
12 changes: 9 additions & 3 deletions src/server.rs
@@ -188,13 +188,16 @@ pub struct Server {
/// Application name using the server at the moment.
application_name: String,

// Last time that a successful server send or response happened
/// Last time that a successful server send or response happened
last_activity: SystemTime,

mirror_manager: Option<MirroringManager>,

// Associated addresses used
/// Associated addresses used
addr_set: Option<AddrSet>,

/// Should we clean up dirty connections?
cleanup_connections: bool,
}

impl Server {
@@ -207,6 +210,7 @@
client_server_map: ClientServerMap,
stats: Arc<ServerStats>,
auth_hash: Arc<RwLock<Option<String>>>,
cleanup_connections: bool,
) -> Result<Server, Error> {
let cached_resolver = CACHED_RESOLVER.load();
let mut addr_set: Option<AddrSet> = None;
@@ -687,6 +691,7 @@ impl Server {
address.mirrors.clone(),
)),
},
cleanup_connections,
};

server.set_name("pgcat").await?;
@@ -1004,7 +1009,7 @@ impl Server {
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() {
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
@@ -1084,6 +1089,7 @@ impl Server {
client_server_map,
Arc::new(ServerStats::default()),
Arc::new(RwLock::new(None)),
true,
)
.await?;
debug!("Connected!, sending query.");
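The behavioral change itself is concentrated in the checkin path: cleanup now requires both an altered session and the new flag. A runnable sketch of that gate, with a stubbed `query` and a plain `bool` standing in for pgcat's `CleanupState` (assumes the `futures` crate for the executor):

```rust
#[derive(Debug)]
struct Error;

struct Server {
    cleanup_connections: bool,
    needs_cleanup: bool, // stands in for cleanup_state.needs_cleanup()
}

impl Server {
    async fn query(&mut self, sql: &str) -> Result<(), Error> {
        // Stub: the real method sends the query to the Postgres server.
        println!("server <- {sql}");
        Ok(())
    }

    async fn checkin_cleanup(&mut self) -> Result<(), Error> {
        // The PR adds `&& self.cleanup_connections`: with the knob off,
        // a dirty session goes back to the pool untouched.
        if self.needs_cleanup && self.cleanup_connections {
            self.query("DISCARD ALL").await?;
            self.query("RESET ROLE").await?;
            self.needs_cleanup = false;
        }
        Ok(())
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut dirty = Server { cleanup_connections: true, needs_cleanup: true };
        dirty.checkin_cleanup().await.unwrap(); // prints both cleanup queries

        let mut opted_out = Server { cleanup_connections: false, needs_cleanup: true };
        opted_out.checkin_cleanup().await.unwrap(); // prints nothing
    });
}
```

The Ruby specs below assert exactly this from the outside: with `cleanup_server_connections` set to `false`, `DISCARD ALL` never reaches the primary.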
46 changes: 25 additions & 21 deletions tests/ruby/helpers/pgcat_helper.rb
@@ -118,7 +118,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb
end
end

def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", pool_settings={})
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -134,28 +134,32 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mo
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")

pool_config = {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_s, "primary"],
["localhost", replica0.port.to_s, "replica"],
["localhost", replica1.port.to_s, "replica"],
["localhost", replica2.port.to_s, "replica"]
]
},
},
"users" => { "0" => user }
}

pool_config = pool_config.merge(pool_settings)

# Main proxy configs
pgcat_cfg["pools"] = {
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_s, "primary"],
["localhost", replica0.port.to_s, "replica"],
["localhost", replica1.port.to_s, "replica"],
["localhost", replica2.port.to_s, "replica"]
]
},
},
"users" => { "0" => user }
}
"#{pool_name}" => pool_config,
}
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg)
25 changes: 25 additions & 0 deletions tests/ruby/misc_spec.rb
@@ -320,6 +320,31 @@
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
end

context "server cleanup disabled" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "random", "info", { "cleanup_server_connections" => false }) }

it "will not clean up connection state" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
processes.primary.reset_stats
conn.async_exec("SET statement_timeout TO 1000")
conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end

it "will not clean up prepared statements" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
processes.primary.reset_stats
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")

conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
end
end

describe "Idle client timeout" do