Skip to content

feat: side-effect SELECT #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions pgdog/src/frontend/router/parser/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ use pg_query::{
use regex::Regex;
use tracing::{debug, trace};

static SIDE_EFFECT_FUNCTIONS: Lazy<HashSet<std::string::String>> = Lazy::new(|| {
let mut set: HashSet<std::string::String> = HashSet::new();
// Advisory lock functions
set.insert("pg_advisory_lock".to_lowercase());
set.insert("pg_advisory_xact_lock".to_lowercase());
set.insert("pg_advisory_lock_shared".to_lowercase());
set.insert("pg_advisory_xact_lock_shared".to_lowercase());
set.insert("pg_try_advisory_lock".to_lowercase());
set.insert("pg_try_advisory_xact_lock".to_lowercase());
set.insert("pg_try_advisory_lock_shared".to_lowercase());
set.insert("pg_try_advisory_xact_lock_shared".to_lowercase());
// Sequence functions
set.insert("nextval".to_lowercase());
set.insert("setval".to_lowercase());
set
});

static REPLICATION_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new(
"(CREATE_REPLICATION_SLOT|IDENTIFY_SYSTEM|DROP_REPLICATION_SLOT|READ_REPLICATION_SLOT|ALTER_REPLICATION_SLOT|TIMELINE_HISTORY).*",
Expand Down Expand Up @@ -229,6 +246,13 @@ impl QueryParser {
if matches!(shard, Shard::Direct(_)) {
return Ok(Command::Query(Route::read(shard)));
}
// Side-effects such as advisory locks
else if Self::has_side_effect_function(stmt) {
debug!(
"Query contains side-effect function, routing to a single primary shard."
);
return Ok(Command::Query(Route::write(None)));
}
// `SELECT NOW()`, `SELECT 1`, etc.
else if ast.tables().is_empty() {
return Ok(Command::Query(Route::read(Some(
Expand Down Expand Up @@ -701,6 +725,165 @@ impl QueryParser {
fn delete(_stmt: &DeleteStmt) -> Result<Command, Error> {
Ok(Command::Query(Route::write(None)))
}

/// Check if a SELECT statement contains any functions with side effects.
fn has_side_effect_function(stmt: &SelectStmt) -> bool {
// Check target list items for side effect functions
for target_item_wrapper_node in &stmt.target_list {
if Self::has_side_effect_node(&target_item_wrapper_node.node) {
return true;
}
}

// Check WHERE clause
if let Some(ref where_clause) = stmt.where_clause {
if Self::has_side_effect_node(&where_clause.node) {
return true;
}
}

// Check HAVING clause
if let Some(ref having_clause) = stmt.having_clause {
if Self::has_side_effect_node(&having_clause.node) {
return true;
}
}

false
}

/// Recursively check if a node contains a function with side effects.
fn has_side_effect_node(node_option: &Option<NodeEnum>) -> bool {
let Some(node) = node_option else {
return false;
};

match node {
// Direct function call
NodeEnum::FuncCall(func) => {
if let Some(name_node) = func.funcname.first() {
if let Some(NodeEnum::String(string_val)) = &name_node.node {
return SIDE_EFFECT_FUNCTIONS.contains(&string_val.sval.to_lowercase());
}
}
false
}

// ResTarget wraps an expression
NodeEnum::ResTarget(res_target) => {
if let Some(ref val) = res_target.val {
Self::has_side_effect_node(&val.node)
} else {
false
}
}

// TypeCast wraps an expression
NodeEnum::TypeCast(type_cast) => {
if let Some(ref arg) = type_cast.arg {
Self::has_side_effect_node(&arg.node)
} else {
false
}
}

// A-expressions (binary operations)
NodeEnum::AExpr(a_expr) => {
// Check both sides of the expression
if let Some(arg) = &a_expr.lexpr {
if Self::has_side_effect_node(&arg.node) {
return true;
}
}
if let Some(arg) = &a_expr.rexpr {
if Self::has_side_effect_node(&arg.node) {
return true;
}
}
false
}

// BoolExpr (AND, OR, NOT)
NodeEnum::BoolExpr(bool_expr) => {
for arg in &bool_expr.args {
if Self::has_side_effect_node(&arg.node) {
return true;
}
}
false
}

// SubLink (subquery)
NodeEnum::SubLink(sub_link) => {
if let Some(ref subselect) = sub_link.subselect {
if let Some(NodeEnum::SelectStmt(select_stmt)) = &subselect.node {
return Self::has_side_effect_function(select_stmt);
}
}
false
}

// CaseExpr (CASE WHEN ... THEN ... END)
NodeEnum::CaseExpr(case_expr) => {
// Check the argument expression
if let Some(ref arg) = case_expr.arg {
if Self::has_side_effect_node(&arg.node) {
return true;
}
}

// Check each WHEN ... THEN ... clause
for when_clause in &case_expr.args {
if let Some(NodeEnum::CaseWhen(case_when)) = &when_clause.node {
// Check the WHEN expression
if let Some(ref expr) = case_when.expr {
if Self::has_side_effect_node(&expr.node) {
return true;
}
}

// Check the THEN result
if let Some(ref result) = case_when.result {
if Self::has_side_effect_node(&result.node) {
return true;
}
}
}
}

// Check the ELSE clause
if let Some(ref defresult) = case_expr.defresult {
if Self::has_side_effect_node(&defresult.node) {
return true;
}
}

false
}

// CoalesceExpr (COALESCE function)
NodeEnum::CoalesceExpr(coalesce) => {
for arg in &coalesce.args {
if Self::has_side_effect_node(&arg.node) {
return true;
}
}
false
}

// NullTest (IS NULL, IS NOT NULL)
NodeEnum::NullTest(null_test) => {
if let Some(ref arg) = null_test.arg {
Self::has_side_effect_node(&arg.node)
} else {
false
}
}

// Add other node types as needed
_ => false,
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -955,4 +1138,31 @@ mod test {
assert!(!qp.routed);
assert!(qp.in_transaction);
}
#[test]
fn test_lock() {
let route = query!("SELECT pg_advisory_lock('test')::bool");
assert!(matches!(route.shard(), Shard::All));
assert!(route.is_write());
}

#[test]
fn test_lock_param() {
let route = query!("SELECT pg_advisory_lock($1)");
assert!(matches!(route.shard(), Shard::All));
assert!(route.is_write());
}

#[test]
fn test_lock_2() {
let route = query!("SELECT pg_advisory_lock('test')");
assert!(matches!(route.shard(), Shard::All));
assert!(route.is_write());
}

#[test]
fn test_non_write() {
let route = query!("SELECT 1");
assert!(matches!(route.shard(), Shard::Direct(_)));
assert!(!route.is_write());
}
}