Skip to content

Commit 9e9e30c

Browse files
authored
Merge pull request #310 from daniellietz/add-transaction-param-execute
Add `use_transaction` parameter for `postgres_query()` and `postgres_execute()`
2 parents 2f77f6d + e5bcea7 commit 9e9e30c

File tree

8 files changed

+182
-14
lines changed

8 files changed

+182
-14
lines changed

src/include/postgres_scanner.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct PostgresBindData : public FunctionData {
3939
bool can_use_main_thread = true;
4040
bool read_only = true;
4141
bool emit_ctid = false;
42+
bool use_transaction = true;
4243
idx_t max_threads = 1;
4344

4445
public:

src/include/storage/postgres_transaction.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ class PostgresTransaction : public Transaction {
2828
void Commit();
2929
void Rollback();
3030

31+
PostgresConnection &GetConnectionWithoutTransaction();
3132
PostgresConnection &GetConnection();
33+
3234
string GetDSN();
3335
unique_ptr<PostgresResult> Query(const string &query);
36+
unique_ptr<PostgresResult> QueryWithoutTransaction(const string &query);
3437
vector<unique_ptr<PostgresResult>> ExecuteQueries(const string &queries);
3538
static PostgresTransaction &Get(ClientContext &context, Catalog &catalog);
3639

src/postgres_execute.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
namespace duckdb {
1111

1212
struct PGExecuteBindData : public TableFunctionData {
13-
explicit PGExecuteBindData(PostgresCatalog &pg_catalog, string query_p)
14-
: pg_catalog(pg_catalog), query(std::move(query_p)) {
13+
explicit PGExecuteBindData(PostgresCatalog &pg_catalog, string query_p, bool use_transaction)
14+
: pg_catalog(pg_catalog), query(std::move(query_p)), use_transaction(use_transaction) {
1515
}
1616

1717
bool finished = false;
1818
PostgresCatalog &pg_catalog;
1919
string query;
20+
bool use_transaction = true;
2021
};
2122

2223
static duckdb::unique_ptr<FunctionData> PGExecuteBind(ClientContext &context, TableFunctionBindInput &input,
@@ -36,7 +37,15 @@ static duckdb::unique_ptr<FunctionData> PGExecuteBind(ClientContext &context, Ta
3637
throw BinderException("Attached database \"%s\" does not refer to a Postgres database", db_name);
3738
}
3839
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
39-
return make_uniq<PGExecuteBindData>(pg_catalog, input.inputs[1].GetValue<string>());
40+
41+
bool use_transaction = true;
42+
for (auto &kv : input.named_parameters) {
43+
if (kv.first == "use_transaction") {
44+
use_transaction = BooleanValue::Get(kv.second);
45+
}
46+
}
47+
48+
return make_uniq<PGExecuteBindData>(pg_catalog, input.inputs[1].GetValue<string>(), use_transaction);
4049
}
4150

4251
static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
@@ -45,13 +54,19 @@ static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p
4554
return;
4655
}
4756
auto &transaction = Transaction::Get(context, data.pg_catalog).Cast<PostgresTransaction>();
48-
transaction.Query(data.query);
57+
if (data.use_transaction) {
58+
transaction.Query(data.query);
59+
} else {
60+
transaction.QueryWithoutTransaction(data.query);
61+
}
62+
4963
data.finished = true;
5064
}
5165

5266
PostgresExecuteFunction::PostgresExecuteFunction()
5367
: TableFunction("postgres_execute", {LogicalType::VARCHAR, LogicalType::VARCHAR}, PGExecuteFunction,
5468
PGExecuteBind) {
69+
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
5570
}
5671

5772
} // namespace duckdb

src/postgres_query.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctio
3838
StringUtil::RTrim(sql);
3939
}
4040

41-
auto &con = transaction.GetConnection();
41+
bool use_transaction = true;
42+
for (auto &kv : input.named_parameters) {
43+
if (kv.first == "use_transaction") {
44+
use_transaction = BooleanValue::Get(kv.second);
45+
}
46+
}
47+
result->use_transaction = use_transaction;
48+
49+
auto &con = use_transaction ? transaction.GetConnection() : transaction.GetConnectionWithoutTransaction();
50+
4251
auto conn = con.GetConn();
4352
// prepare execution of the query to figure out the result types and names
4453
auto prepared = PQprepare(conn, "", sql.c_str(), 0, nullptr);
@@ -87,6 +96,7 @@ static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctio
8796

8897
PostgresQueryFunction::PostgresQueryFunction()
8998
: TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR}, nullptr, PGQueryBind) {
99+
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
90100
PostgresScanFunction scan_function;
91101
init_global = scan_function.init_global;
92102
init_local = scan_function.init_local;

src/postgres_scanner.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,15 +251,14 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
251251
}
252252
if (bind_data->table_name.empty()) {
253253
D_ASSERT(!bind_data->sql.empty());
254-
lstate.sql = StringUtil::Format(
255-
R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)",
256-
col_names, bind_data->sql, filter);
254+
lstate.sql =
255+
StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)",
256+
col_names, bind_data->sql, filter);
257257

258258
} else {
259-
lstate.sql = StringUtil::Format(
260-
R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)",
261-
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262-
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter);
259+
lstate.sql = StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)", col_names,
260+
KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
261+
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter);
263262
}
264263
lstate.exec = false;
265264
lstate.done = false;
@@ -291,11 +290,14 @@ static unique_ptr<GlobalTableFunctionState> PostgresInitGlobalState(ClientContex
291290
auto pg_catalog = bind_data.GetCatalog();
292291
if (pg_catalog) {
293292
auto &transaction = Transaction::Get(context, *pg_catalog).Cast<PostgresTransaction>();
294-
auto &con = transaction.GetConnection();
293+
auto &con =
294+
bind_data.use_transaction ? transaction.GetConnection() : transaction.GetConnectionWithoutTransaction();
295295
result->SetConnection(con.GetConnection());
296296
} else {
297297
auto con = PostgresConnection::Open(bind_data.dsn);
298-
PostgresScanConnect(con, string());
298+
if (bind_data.use_transaction) {
299+
PostgresScanConnect(con, string());
300+
}
299301
result->SetConnection(std::move(con));
300302
}
301303
if (bind_data.requires_materialization) {

src/storage/postgres_transaction.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ static string GetBeginTransactionQuery(AccessMode access_mode) {
3939
return result;
4040
}
4141

42+
PostgresConnection &PostgresTransaction::GetConnectionWithoutTransaction() {
43+
if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) {
44+
throw std::runtime_error("Execution without a Transaction is not possible if a Transaction already started");
45+
}
46+
if (access_mode == AccessMode::READ_ONLY) {
47+
throw std::runtime_error("Execution without a Transaction is not possible in Read Only Mode");
48+
}
49+
return connection.GetConnection();
50+
}
51+
4252
PostgresConnection &PostgresTransaction::GetConnection() {
4353
auto &con = GetConnectionRaw();
4454
if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) {
@@ -68,6 +78,17 @@ unique_ptr<PostgresResult> PostgresTransaction::Query(const string &query) {
6878
return con.Query(query);
6979
}
7080

81+
unique_ptr<PostgresResult> PostgresTransaction::QueryWithoutTransaction(const string &query) {
82+
auto &con = GetConnectionRaw();
83+
if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) {
84+
throw std::runtime_error("Execution without a Transaction is not possible if a Transaction already started");
85+
}
86+
if (access_mode == AccessMode::READ_ONLY) {
87+
throw std::runtime_error("Execution without a Transaction is not possible in Read Only Mode");
88+
}
89+
return con.Query(query);
90+
}
91+
7192
vector<unique_ptr<PostgresResult>> PostgresTransaction::ExecuteQueries(const string &queries) {
7293
auto &con = GetConnectionRaw();
7394
if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# name: test/sql/storage/postgres_execute_use_transaction.test
2+
# description: Test use_transaction flag in postgres_execute
3+
# group: [storage]
4+
5+
require postgres_scanner
6+
7+
require-env POSTGRES_TEST_DATABASE_AVAILABLE
8+
9+
statement ok
10+
PRAGMA enable_verification
11+
12+
statement ok
13+
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)
14+
15+
statement error
16+
CALL postgres_execute('s', 'VACUUM')
17+
----
18+
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
19+
VACUUM": ERROR: VACUUM cannot run inside a transaction block
20+
21+
statement error
22+
CALL postgres_execute('s', 'VACUUM', use_transaction=true)
23+
----
24+
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
25+
VACUUM": ERROR: VACUUM cannot run inside a transaction block
26+
27+
statement error
28+
CALL postgres_execute('s', 'VACUUM', use_transaction=true)
29+
----
30+
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
31+
VACUUM": ERROR: VACUUM cannot run inside a transaction block
32+
33+
statement ok
34+
CALL postgres_execute('s', 'VACUUM', use_transaction=false)
35+
36+
statement ok
37+
BEGIN;
38+
39+
statement ok
40+
CALL postgres_execute('s', 'SELECT 1')
41+
42+
statement error
43+
CALL postgres_execute('s', 'VACUUM', use_transaction=false)
44+
----
45+
Invalid Error: Execution without a Transaction is not possible if a Transaction already started
46+
47+
statement ok
48+
ROLLBACK
49+
50+
statement ok
51+
CALL postgres_execute('s', 'VACUUM', use_transaction=false)
52+
53+
statement ok
54+
ATTACH 'dbname=postgresscanner' AS s2 (TYPE POSTGRES, READ_ONLY)
55+
56+
statement error
57+
CALL postgres_execute('s2', 'VACUUM', use_transaction=false)
58+
----
59+
Invalid Error: Execution without a Transaction is not possible in Read Only Mode
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# name: test/sql/storage/postgres_query_use_transaction.test
2+
# description: Test use_transaction flag in postgres_query
3+
# group: [storage]
4+
5+
require postgres_scanner
6+
7+
require-env POSTGRES_TEST_DATABASE_AVAILABLE
8+
9+
statement ok
10+
PRAGMA enable_verification
11+
12+
statement ok
13+
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)
14+
15+
query I
16+
CALL postgres_query('s', 'SELECT 1')
17+
----
18+
1
19+
20+
query I
21+
CALL postgres_query('s', 'SELECT 1', use_transaction=true)
22+
----
23+
1
24+
25+
query I
26+
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
27+
----
28+
1
29+
30+
statement ok
31+
BEGIN;
32+
33+
query I
34+
CALL postgres_query('s', 'SELECT 1')
35+
----
36+
1
37+
38+
statement error
39+
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
40+
----
41+
Invalid Error: Execution without a Transaction is not possible if a Transaction already started
42+
43+
statement ok
44+
ROLLBACK
45+
46+
query I
47+
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
48+
----
49+
1
50+
51+
statement ok
52+
ATTACH 'dbname=postgresscanner' AS s2 (TYPE POSTGRES, READ_ONLY)
53+
54+
statement error
55+
CALL postgres_query('s2', 'SELECT 1', use_transaction=false)
56+
----
57+
Invalid Error: Execution without a Transaction is not possible in Read Only Mode

0 commit comments

Comments
 (0)