Skip to content

sync commit #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
sync commit
  • Loading branch information
levkk committed Apr 3, 2025
commit 293a916121c1be5bc2fd42a1845e8106231ccae9
62 changes: 34 additions & 28 deletions pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 +26,47 @@ password = "pgdog"
name = "pgdog"
host = "127.0.0.1"

[[databases]]
name = "pgdog"
host = "127.0.0.1"
port = 5433
role = "replica"

#
# Sharded cluster with two primaries.
#
[[databases]]
name = "pgdog_sharded"
host = "127.0.0.1"
database_name = "shard_0"
shard = 0
# [[databases]]
# name = "pgdog_sharded"
# host = "127.0.0.1"
# database_name = "shard_0"
# shard = 0

[[databases]]
name = "pgdog_sharded"
host = "127.0.0.1"
database_name = "shard_1"
shard = 1
# [[databases]]
# name = "pgdog_sharded"
# host = "127.0.0.1"
# database_name = "shard_1"
# shard = 1

[[databases]]
name = "failover"
host = "127.0.0.1"
port = 5433
role = "primary"
database_name = "pgdog"
# [[databases]]
# name = "failover"
# host = "127.0.0.1"
# port = 5433
# role = "primary"
# database_name = "pgdog"

[[databases]]
name = "failover"
host = "127.0.0.1"
port = 5434
role = "replica"
database_name = "pgdog"
# [[databases]]
# name = "failover"
# host = "127.0.0.1"
# port = 5434
# role = "replica"
# database_name = "pgdog"

[[databases]]
name = "failover"
host = "127.0.0.1"
port = 5435
role = "replica"
database_name = "pgdog"
# [[databases]]
# name = "failover"
# host = "127.0.0.1"
# port = 5435
# role = "replica"
# database_name = "pgdog"

#
# Read/write access to theses tables will be automatically
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/backend/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ pub fn from_config(config: &ConfigAndUsers) -> Databases {
user.pooler_mode.unwrap_or(general.pooler_mode),
sharded_tables,
user.replication_sharding.clone(),
user.synchronous_commit,
),
);
}
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub enum Error {

#[error("{0}")]
Auth(#[from] crate::auth::Error),

#[error("no lsn")]
NoLsn,
}

impl Error {
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/pool/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct Address {
pub user: String,
/// Password.
pub password: String,
/// Shard number.
pub shard: usize,
}

impl Address {
Expand Down Expand Up @@ -44,6 +46,7 @@ impl Address {
} else {
user.password.clone()
},
shard: database.shard,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/pool/ban.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
use super::Error;

/// Pool ban.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
pub struct Ban {
/// When the banw as created.
pub(super) created_at: Instant,
Expand Down
12 changes: 12 additions & 0 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct Cluster {
pooler_mode: PoolerMode,
sharded_tables: ShardedTables,
replication_sharding: Option<String>,
synchronous_commit: bool,
}

/// Sharding configuration from the cluster.
Expand All @@ -60,6 +61,7 @@ impl Cluster {
pooler_mode: PoolerMode,
sharded_tables: ShardedTables,
replication_sharding: Option<String>,
synchronous_commit: bool,
) -> Self {
Self {
shards: shards
Expand All @@ -71,6 +73,7 @@ impl Cluster {
pooler_mode,
sharded_tables,
replication_sharding,
synchronous_commit,
}
}

Expand Down Expand Up @@ -98,6 +101,7 @@ impl Cluster {
pooler_mode: self.pooler_mode,
sharded_tables: self.sharded_tables.clone(),
replication_sharding: self.replication_sharding.clone(),
synchronous_commit: self.synchronous_commit,
}
}

Expand All @@ -115,6 +119,10 @@ impl Cluster {
&self.shards
}

pub fn shard(&self, shard: usize) -> Option<&Shard> {
self.shards.get(shard)
}

/// Plugin input.
///
/// # Safety
Expand Down Expand Up @@ -217,6 +225,10 @@ impl Cluster {
tables: self.sharded_tables.clone(),
}
}

pub fn synchronous_commit(&self) -> bool {
self.synchronous_commit
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions pgdog/src/backend/pool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct Config {
pub replication_mode: bool,
/// Pooler mode.
pub pooler_mode: PoolerMode,
/// Sync commit.
pub synchronous_commit: bool,
}

impl Config {
Expand Down Expand Up @@ -127,6 +129,7 @@ impl Config {
statement_timeout: user.statement_timeout,
replication_mode: user.replication_mode,
pooler_mode: user.pooler_mode.unwrap_or(general.pooler_mode),
synchronous_commit: user.synchronous_commit,
..Default::default()
}
}
Expand Down Expand Up @@ -154,6 +157,7 @@ impl Default for Config {
statement_timeout: None,
replication_mode: false,
pooler_mode: PoolerMode::default(),
synchronous_commit: false,
}
}
}
23 changes: 23 additions & 0 deletions pgdog/src/backend/pool/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct Connection {
database: String,
binding: Binding,
cluster: Option<Cluster>,
replica: bool,
}

impl Connection {
Expand All @@ -50,6 +51,7 @@ impl Connection {
cluster: None,
user: user.to_owned(),
database: database.to_owned(),
replica: false,
};

if !admin {
Expand Down Expand Up @@ -108,6 +110,7 @@ impl Connection {
} else {
self.cluster()?.primary(*shard, request).await?
};
self.replica = route.is_read();

// Cleanup session mode connections when
// they are done.
Expand Down Expand Up @@ -268,6 +271,26 @@ impl Connection {
}
}

pub async fn sync_commit(&mut self) -> Result<(), Error> {
if self.replica {
return Ok(());
}

let cluster = self.cluster()?.clone();
if let Binding::Server(Some(ref mut server)) = self.binding {
let replicas = cluster
.shard(server.addr().shard)
.map(|shard| shard.replica_pools())
.ok_or(Error::NotConnected)?;
let lsn = server.lsn(false).await?;
for mut replica in replicas {
replica.wait_for_lsn(lsn).await?;
}
};

Ok(())
}

/// We are done and can disconnect from this server.
pub fn done(&self) -> bool {
self.binding.done()
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/pool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ pub enum Error {

#[error("all replicas down")]
AllReplicasDown,

#[error("{0}")]
Listener(#[from] crate::backend::pool::events::Error),
}
83 changes: 83 additions & 0 deletions pgdog/src/backend/pool/events/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::{
backend::{pool::Address, Error, Pool, Server},
net::{Bind, Execute, Flush, Message, Parse, Protocol, Sync},
};

#[derive(Debug)]
pub struct Connection {
server: Option<Server>,
pool: Pool,
}

impl Connection {
pub fn new(pool: &Pool) -> Self {
Self {
server: None,
pool: pool.clone(),
}
}

pub async fn read(&mut self) -> Result<Message, Error> {
loop {
if let Some(ref mut server) = self.server {
return server.read().await;
} else {
self.reconnect().await?;
}
}
}

pub async fn execute(&mut self, name: &str) -> Result<(), Error> {
loop {
if let Some(ref mut server) = self.server {
return server
.send(vec![
Bind::new(name).message()?,
Execute::new().message()?,
Sync.message()?,
])
.await;
} else {
self.reconnect().await?;
}
}
}

async fn connect(&mut self) -> Result<(), Error> {
self.server =
Some(Server::connect(self.pool.addr(), self.pool.startup_parameters()).await?);
Ok(())
}

async fn prepare(&mut self) -> Result<(), Error> {
self.server
.as_mut()
.unwrap()
.send(vec![
Parse::named("lsn_primary", "SELECT pg_current_wal_lsn()").message()?,
Parse::named("lsn_replica", "SELECT pg_last_wal_replay_lsn()").message()?,
Parse::named("is_replica", "SELECT pg_is_in_recovery()").message()?,
Flush.message()?,
])
.await?;
self.server.as_mut().unwrap().flush().await?;

for _ in 0..3 {
let reply = self.server.as_mut().unwrap().read().await?;
if reply.code() != '1' {
return Err(Error::NotInSync);
}
}

Ok(())
}

pub async fn reconnect(&mut self) -> Result<(), Error> {
self.connect().await?;
self.prepare().await
}

pub fn addr(&self) -> Result<&Address, Error> {
Ok(self.server.as_ref().ok_or(Error::NotInSync)?.addr())
}
}
13 changes: 13 additions & 0 deletions pgdog/src/backend/pool/events/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use thiserror::Error;

#[derive(Debug, Error, PartialEq, Clone, Copy)]
pub enum Error {
#[error("watcher closed")]
Watcher,
}

impl From<tokio::sync::watch::error::RecvError> for Error {
fn from(_: tokio::sync::watch::error::RecvError) -> Self {
Error::Watcher
}
}
7 changes: 7 additions & 0 deletions pgdog/src/backend/pool/events/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::net::{Message, PgLsn};

#[derive(Debug, Clone)]
pub enum Event {
Notify(Message),
Lsn(PgLsn),
}
Loading
Loading