Skip to content

Commit 929df26

Browse files
authored
Multi-tenant (#161)
* Multi-tenant * save * clippy * save * savee * routing tests * only load schema if multi-tenant * shortcut if not multi-tenant
1 parent 1b77414 commit 929df26

File tree

21 files changed

+605
-149
lines changed

21 files changed

+605
-149
lines changed

pgdog.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,6 @@ fingerprint = "f4814b6fadabc4c1" #[17618446160277259457]
106106

107107
[[manual_query]]
108108
fingerprint = "04dc05f480b702d3"
109+
110+
[multi_tenant]
111+
column = "tenant_id"

pgdog/src/backend/databases.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,14 @@ pub(crate) fn new_pool(
363363
}
364364
};
365365

366-
let cluster_config =
367-
ClusterConfig::new(general, user, &shard_configs, sharded_tables, mirror_of);
366+
let cluster_config = ClusterConfig::new(
367+
general,
368+
user,
369+
&shard_configs,
370+
sharded_tables,
371+
mirror_of,
372+
config.multi_tenant(),
373+
);
368374

369375
Some((
370376
User {

pgdog/src/backend/pool/cluster.rs

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
//! A collection of replicas and a primary.
22
3+
use parking_lot::RwLock;
4+
use std::sync::Arc;
5+
use tokio::spawn;
6+
use tracing::{error, info};
7+
38
use crate::{
49
backend::{
510
databases::databases,
611
replication::{ReplicationConfig, ShardedColumn},
7-
ShardedTables,
12+
Schema, ShardedTables,
813
},
9-
config::{General, PoolerMode, ShardedTable, User},
14+
config::{General, MultiTenant, PoolerMode, ShardedTable, User},
1015
net::messages::BackendKeyData,
1116
};
1217

1318
use super::{Address, Config, Error, Guard, Request, Shard};
1419
use crate::config::LoadBalancingStrategy;
1520

16-
use std::ffi::CString;
17-
1821
#[derive(Clone, Debug)]
1922
/// Database configuration.
2023
pub struct PoolConfig {
@@ -36,6 +39,8 @@ pub struct Cluster {
3639
sharded_tables: ShardedTables,
3740
replication_sharding: Option<String>,
3841
mirror_of: Option<String>,
42+
schema: Arc<RwLock<Schema>>,
43+
multi_tenant: Option<MultiTenant>,
3944
}
4045

4146
/// Sharding configuration from the cluster.
@@ -63,6 +68,7 @@ pub struct ClusterConfig<'a> {
6368
pub sharded_tables: ShardedTables,
6469
pub replication_sharding: Option<String>,
6570
pub mirror_of: Option<&'a str>,
71+
pub multi_tenant: &'a Option<MultiTenant>,
6672
}
6773

6874
impl<'a> ClusterConfig<'a> {
@@ -72,6 +78,7 @@ impl<'a> ClusterConfig<'a> {
7278
shards: &'a [ClusterShardConfig],
7379
sharded_tables: ShardedTables,
7480
mirror_of: Option<&'a str>,
81+
multi_tenant: &'a Option<MultiTenant>,
7582
) -> Self {
7683
Self {
7784
name: &user.database,
@@ -83,6 +90,7 @@ impl<'a> ClusterConfig<'a> {
8390
shards,
8491
sharded_tables,
8592
mirror_of,
93+
multi_tenant,
8694
}
8795
}
8896
}
@@ -100,6 +108,7 @@ impl Cluster {
100108
sharded_tables,
101109
replication_sharding,
102110
mirror_of,
111+
multi_tenant,
103112
} = config;
104113

105114
Self {
@@ -114,6 +123,8 @@ impl Cluster {
114123
sharded_tables,
115124
replication_sharding,
116125
mirror_of: mirror_of.map(|s| s.to_owned()),
126+
schema: Arc::new(RwLock::new(Schema::default())),
127+
multi_tenant: multi_tenant.clone(),
117128
}
118129
}
119130

@@ -160,6 +171,8 @@ impl Cluster {
160171
sharded_tables: self.sharded_tables.clone(),
161172
replication_sharding: self.replication_sharding.clone(),
162173
mirror_of: self.mirror_of.clone(),
174+
schema: self.schema.clone(),
175+
multi_tenant: self.multi_tenant.clone(),
163176
}
164177
}
165178

@@ -182,52 +195,6 @@ impl Cluster {
182195
self.mirror_of.as_deref()
183196
}
184197

185-
/// Plugin input.
186-
///
187-
/// # Safety
188-
///
189-
/// This allocates, so make sure to call `Config::drop` when you're done.
190-
///
191-
pub unsafe fn plugin_config(&self) -> Result<pgdog_plugin::bindings::Config, Error> {
192-
use pgdog_plugin::bindings::{Config, DatabaseConfig, Role_PRIMARY, Role_REPLICA};
193-
let mut databases: Vec<DatabaseConfig> = vec![];
194-
let name = CString::new(self.name.as_str()).map_err(|_| Error::NullBytes)?;
195-
196-
for (index, shard) in self.shards.iter().enumerate() {
197-
if let Some(ref primary) = shard.primary {
198-
// Ignore hosts with null bytes.
199-
let host = if let Ok(host) = CString::new(primary.addr().host.as_str()) {
200-
host
201-
} else {
202-
continue;
203-
};
204-
databases.push(DatabaseConfig::new(
205-
host,
206-
primary.addr().port,
207-
Role_PRIMARY,
208-
index,
209-
));
210-
}
211-
212-
for replica in shard.replicas.pools() {
213-
// Ignore hosts with null bytes.
214-
let host = if let Ok(host) = CString::new(replica.addr().host.as_str()) {
215-
host
216-
} else {
217-
continue;
218-
};
219-
databases.push(DatabaseConfig::new(
220-
host,
221-
replica.addr().port,
222-
Role_REPLICA,
223-
index,
224-
));
225-
}
226-
}
227-
228-
Ok(Config::new(name, &databases, self.shards.len()))
229-
}
230-
231198
/// Get the password the user should use to connect to the database.
232199
pub fn password(&self) -> &str {
233200
&self.password
@@ -280,6 +247,11 @@ impl Cluster {
280247
true
281248
}
282249

250+
/// Multi-tenant config.
251+
pub fn multi_tenant(&self) -> &Option<MultiTenant> {
252+
&self.multi_tenant
253+
}
254+
283255
/// Get replication configuration for this cluster.
284256
pub fn replication_sharding_config(&self) -> Option<ReplicationConfig> {
285257
self.replication_sharding
@@ -295,11 +267,42 @@ impl Cluster {
295267
}
296268
}
297269

270+
/// Update schema from primary.
271+
async fn update_schema(&self) -> Result<(), crate::backend::Error> {
272+
let mut server = self.primary(0, &Request::default()).await?;
273+
let schema = Schema::load(&mut server).await?;
274+
info!(
275+
"loaded {} tables from schema [{}]",
276+
schema.tables().len(),
277+
server.addr()
278+
);
279+
*self.schema.write() = schema;
280+
Ok(())
281+
}
282+
283+
fn load_schema(&self) -> bool {
284+
self.multi_tenant.is_some()
285+
}
286+
287+
/// Get currently loaded schema.
288+
pub fn schema(&self) -> Schema {
289+
self.schema.read().clone()
290+
}
291+
298292
/// Launch the connection pools.
299293
pub(crate) fn launch(&self) {
300294
for shard in self.shards() {
301295
shard.launch();
302296
}
297+
298+
if self.load_schema() {
299+
let me = self.clone();
300+
spawn(async move {
301+
if let Err(err) = me.update_schema().await {
302+
error!("error loading schema: {}", err);
303+
}
304+
});
305+
}
303306
}
304307

305308
/// Shutdown the connection pools.

pgdog/src/backend/pool/connection/mirror.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use tracing::{debug, error};
66
use crate::backend::Cluster;
77
use crate::config::config;
88
use crate::frontend::client::timeouts::Timeouts;
9-
use crate::frontend::{PreparedStatements, Router};
9+
use crate::frontend::{PreparedStatements, Router, RouterContext};
10+
use crate::net::Parameters;
1011
use crate::state::State;
1112
use crate::{
1213
backend::pool::{Error as PoolError, Request},
@@ -37,6 +38,7 @@ pub(crate) struct Mirror {
3738
router: Router,
3839
cluster: Cluster,
3940
prepared_statements: PreparedStatements,
41+
params: Parameters,
4042
state: State,
4143
}
4244

@@ -50,6 +52,7 @@ impl Mirror {
5052
prepared_statements: PreparedStatements::new(),
5153
cluster: cluster.clone(),
5254
state: State::Idle,
55+
params: Parameters::default(),
5356
};
5457

5558
let config = config();
@@ -110,18 +113,21 @@ impl Mirror {
110113
pub(crate) async fn handle(&mut self, request: &MirrorRequest) -> Result<(), Error> {
111114
if !self.connection.connected() {
112115
// TODO: handle parsing errors.
113-
if let Err(err) = self.router.query(
116+
if let Ok(context) = RouterContext::new(
114117
&request.buffer,
115118
&self.cluster,
116119
&mut self.prepared_statements,
120+
&self.params,
117121
) {
118-
error!("mirror query parse error: {}", err);
119-
return Ok(()); // Drop request.
120-
}
122+
if let Err(err) = self.router.query(context) {
123+
error!("mirror query parse error: {}", err);
124+
return Ok(()); // Drop request.
125+
}
121126

122-
self.connection
123-
.connect(&request.request, &self.router.route())
124-
.await?;
127+
self.connection
128+
.connect(&request.request, &self.router.route())
129+
.await?;
130+
}
125131
}
126132

127133
// TODO: handle streaming.

pgdog/src/backend/schema/mod.rs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
pub mod columns;
33
pub mod relation;
44

5-
use std::collections::HashMap;
5+
use std::sync::Arc;
6+
use std::{collections::HashMap, ops::Deref};
67
use tracing::debug;
78

89
pub use relation::Relation;
@@ -11,10 +12,16 @@ use super::{pool::Request, Cluster, Error, Server};
1112

1213
static SETUP: &str = include_str!("setup.sql");
1314

15+
#[derive(Debug, Default)]
16+
struct Inner {
17+
search_path: Vec<String>,
18+
relations: HashMap<(String, String), Relation>,
19+
}
20+
1421
/// Load schema from database.
1522
#[derive(Debug, Clone, Default)]
1623
pub struct Schema {
17-
relations: HashMap<(String, String), Relation>,
24+
inner: Arc<Inner>,
1825
}
1926

2027
impl Schema {
@@ -31,7 +38,23 @@ impl Schema {
3138
})
3239
.collect();
3340

34-
Ok(Self { relations })
41+
let search_path = server
42+
.fetch_all::<String>("SHOW search_path")
43+
.await?
44+
.pop()
45+
.unwrap_or(String::from("$user, public"))
46+
.split(",")
47+
.map(|p| p.trim().replace("\"", ""))
48+
.collect();
49+
50+
let inner = Inner {
51+
search_path,
52+
relations,
53+
};
54+
55+
Ok(Self {
56+
inner: Arc::new(inner),
57+
})
3558
}
3659

3760
/// Load schema from primary database.
@@ -68,7 +91,7 @@ impl Schema {
6891
.iter()
6992
.filter(|table| table.schema() != "pgdog")
7093
{
71-
let column_match = schema_table.columns.iter().find(|column| {
94+
let column_match = schema_table.columns.values().find(|column| {
7295
column.column_name == table.column && column.data_type == "bigint"
7396
});
7497
if let Some(column_match) = column_match {
@@ -110,24 +133,41 @@ impl Schema {
110133
/// Get table by name.
111134
pub fn table(&self, name: &str, schema: Option<&str>) -> Option<&Relation> {
112135
let schema = schema.unwrap_or("public");
113-
self.relations.get(&(name.to_string(), schema.to_string()))
136+
self.inner
137+
.relations
138+
.get(&(name.to_string(), schema.to_string()))
114139
}
115140

116141
/// Get all indices.
117142
pub fn tables(&self) -> Vec<&Relation> {
118-
self.relations
143+
self.inner
144+
.relations
119145
.values()
120146
.filter(|value| value.is_table())
121147
.collect()
122148
}
123149

124150
/// Get all sequences.
125151
pub fn sequences(&self) -> Vec<&Relation> {
126-
self.relations
152+
self.inner
153+
.relations
127154
.values()
128155
.filter(|value| value.is_sequence())
129156
.collect()
130157
}
158+
159+
/// Get search path components.
160+
pub fn search_path(&self) -> &[String] {
161+
&self.inner.search_path
162+
}
163+
}
164+
165+
impl Deref for Schema {
166+
type Target = HashMap<(String, String), Relation>;
167+
168+
fn deref(&self) -> &Self::Target {
169+
&self.inner.relations
170+
}
131171
}
132172

133173
#[cfg(test)]

0 commit comments

Comments
 (0)