Skip to content

Make isolation level configurable #333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/MainDistributionPipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
name: Build extension binaries
uses: duckdb/extension-ci-tools/.github/workflows/[email protected]
with:
duckdb_version: main
ci_tools_version: main
duckdb_version: v1.3.0
ci_tools_version: v1.3.0
extension_name: postgres_scanner
exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw'

Expand All @@ -27,8 +27,8 @@ jobs:
uses: duckdb/extension-ci-tools/.github/workflows/[email protected]
secrets: inherit
with:
duckdb_version: main
ci_tools_version: main
duckdb_version: v1.3.0
ci_tools_version: v1.3.0
extension_name: postgres_scanner
exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw'
deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}
2 changes: 2 additions & 0 deletions src/include/postgres_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct PostgresCopyState {
void Initialize(ClientContext &context);
};

enum class PostgresIsolationLevel { READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE };

class PostgresUtils {
public:
static PGconn *PGConnect(const string &dsn);
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ class PostgresSchemaEntry;
class PostgresCatalog : public Catalog {
public:
explicit PostgresCatalog(AttachedDatabase &db_p, string connection_string, string attach_path,
AccessMode access_mode, string schema_to_load);
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level);
~PostgresCatalog();

string connection_string;
string attach_path;
AccessMode access_mode;
PostgresIsolationLevel isolation_level;

public:
void Initialize(bool load_builtin) override;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/postgres_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ class PostgresTransaction : public Transaction {
PostgresPoolConnection connection;
PostgresTransactionState transaction_state;
AccessMode access_mode;
PostgresIsolationLevel isolation_level;
string temporary_schema;

private:
//! Retrieves the connection **without** starting a transaction if none is active
PostgresConnection &GetConnectionRaw();

string GetBeginTransactionQuery();
};

} // namespace duckdb
2 changes: 1 addition & 1 deletion src/postgres_binary_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,4 @@ void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &cont
gstate.Flush();
}

} // namespace duckdb
} // namespace duckdb
2 changes: 1 addition & 1 deletion src/postgres_execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p
PostgresExecuteFunction::PostgresExecuteFunction()
: TableFunction("postgres_execute", {LogicalType::VARCHAR, LogicalType::VARCHAR}, PGExecuteFunction,
PGExecuteBind) {
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
}

} // namespace duckdb
5 changes: 2 additions & 3 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ static void LoadInternal(DatabaseInstance &db) {
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
config.AddExtensionOption("pg_experimental_filter_pushdown",
"Whether or not to use filter pushdown", LogicalType::BOOLEAN,
Value::BOOLEAN(true));
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
LogicalType::BOOLEAN, Value::BOOLEAN(true));
config.AddExtensionOption("pg_null_byte_replacement",
"When writing NULL bytes to Postgres, replace them with the given character",
LogicalType::VARCHAR, Value(), SetPostgresNullByteReplacement);
Expand Down
4 changes: 2 additions & 2 deletions src/postgres_filter_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ string TransformBlob(const string &val) {
char const HEX_DIGITS[] = "0123456789ABCDEF";

string result = "'\\x";
for(idx_t i = 0; i < val.size(); i++) {
for (idx_t i = 0; i < val.size(); i++) {
uint8_t byte_val = static_cast<uint8_t>(val[i]);
result += HEX_DIGITS[(byte_val >> 4) & 0xf];
result += HEX_DIGITS[byte_val & 0xf];
Expand Down Expand Up @@ -96,7 +96,7 @@ string PostgresFilterPushdown::TransformFilter(string &column_name, TableFilter
case TableFilterType::IN_FILTER: {
auto &in_filter = filter.Cast<InFilter>();
string in_list;
for(auto &val : in_filter.values) {
for (auto &val : in_filter.values) {
if (!in_list.empty()) {
in_list += ", ";
}
Expand Down
14 changes: 7 additions & 7 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,15 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
}
if (bind_data->table_name.empty()) {
D_ASSERT(!bind_data->sql.empty());
lstate.sql = StringUtil::Format(
R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
col_names, bind_data->sql, filter, bind_data->limit);
lstate.sql =
StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
col_names, bind_data->sql, filter, bind_data->limit);

} else {
lstate.sql = StringUtil::Format(
R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)",
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
lstate.sql =
StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)", col_names,
KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
}
lstate.exec = false;
lstate.done = false;
Expand Down
18 changes: 17 additions & 1 deletion src/postgres_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl

string secret_name;
string schema_to_load;
PostgresIsolationLevel isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
for (auto &entry : info.options) {
auto lower_name = StringUtil::Lower(entry.first);
if (lower_name == "type" || lower_name == "read_only") {
Expand All @@ -26,12 +27,27 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
secret_name = entry.second.ToString();
} else if (lower_name == "schema") {
schema_to_load = entry.second.ToString();
} else if (lower_name == "isolation_level") {
auto param = entry.second.ToString();
auto lparam = StringUtil::Lower(param);
if (lparam == "read committed") {
isolation_level = PostgresIsolationLevel::READ_COMMITTED;
} else if (lparam == "repeatable read") {
isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
} else if (lparam == "serializable") {
isolation_level = PostgresIsolationLevel::SERIALIZABLE;
} else {
throw InvalidInputException("Invalid value \"%s\" for isolation_level, expected READ COMMITTED, "
"REPEATABLE READ or SERIALIZABLE",
param);
}
} else {
throw BinderException("Unrecognized option for Postgres attach: %s", entry.first);
}
}
auto connection_string = PostgresCatalog::GetConnectionString(context, attach_path, secret_name);
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode, std::move(schema_to_load));
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode,
std::move(schema_to_load), isolation_level);
}

static unique_ptr<TransactionManager> PostgresCreateTransactionManager(StorageExtensionInfo *storage_info,
Expand Down
17 changes: 9 additions & 8 deletions src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

namespace duckdb {

PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_string_p, string attach_path_p, AccessMode access_mode,
string schema_to_load)
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)), access_mode(access_mode), schemas(*this, schema_to_load), connection_pool(*this),
default_schema(schema_to_load) {
PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_string_p, string attach_path_p,
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level)
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)),
access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load),
connection_pool(*this), default_schema(schema_to_load) {
if (default_schema.empty()) {
default_schema = "public";
}
Expand Down Expand Up @@ -100,7 +101,6 @@ string PostgresCatalog::GetConnectionString(ClientContext &context, const string
return connection_string;
}


PostgresCatalog::~PostgresCatalog() = default;

void PostgresCatalog::Initialize(bool load_builtin) {
Expand Down Expand Up @@ -138,9 +138,10 @@ void PostgresCatalog::ScanSchemas(ClientContext &context, std::function<void(Sch
schemas.Scan(context, [&](CatalogEntry &schema) { callback(schema.Cast<PostgresSchemaEntry>()); });
}

optional_ptr<SchemaCatalogEntry> PostgresCatalog::LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup,
OnEntryNotFound if_not_found) {
auto schema_name = schema_lookup.GetEntryName();
optional_ptr<SchemaCatalogEntry> PostgresCatalog::LookupSchema(CatalogTransaction transaction,
const EntryLookupInfo &schema_lookup,
OnEntryNotFound if_not_found) {
auto schema_name = schema_lookup.GetEntryName();
auto &postgres_transaction = PostgresTransaction::Get(transaction.GetContext(), *this);
if (schema_name == "pg_temp") {
schema_name = postgres_transaction.GetTemporarySchema();
Expand Down
5 changes: 3 additions & 2 deletions src/storage/postgres_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,16 @@ InsertionOrderPreservingMap<string> PostgresDelete::ParamsToString() const {
//===--------------------------------------------------------------------===//
// Plan
//===--------------------------------------------------------------------===//
PhysicalOperator &PostgresCatalog::PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op, PhysicalOperator &plan) {
PhysicalOperator &PostgresCatalog::PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op,
PhysicalOperator &plan) {
if (op.return_chunk) {
throw BinderException("RETURNING clause not yet supported for deletion of a Postgres table");
}
auto &bound_ref = op.expressions[0]->Cast<BoundReferenceExpression>();
PostgresCatalog::MaterializePostgresScans(plan);

auto &delete_op = planner.Make<PostgresDelete>(op, op.table, bound_ref.index);
delete_op.children.push_back(plan);
delete_op.children.push_back(plan);
return delete_op;
}

Expand Down
60 changes: 32 additions & 28 deletions src/storage/postgres_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ InsertionOrderPreservingMap<string> PostgresInsert::ParamsToString() const {
//===--------------------------------------------------------------------===//
// Plan
//===--------------------------------------------------------------------===//
PhysicalOperator &AddCastToPostgresTypes(ClientContext &context, PhysicalPlanGenerator &planner, PhysicalOperator &plan) {
PhysicalOperator &AddCastToPostgresTypes(ClientContext &context, PhysicalPlanGenerator &planner,
PhysicalOperator &plan) {
// check if we need to cast anything
bool require_cast = false;
auto &child_types = plan.GetTypes();
Expand All @@ -164,30 +165,31 @@ PhysicalOperator &AddCastToPostgresTypes(ClientContext &context, PhysicalPlanGen
break;
}
}
if (!require_cast) {
return plan;
}

vector<LogicalType> postgres_types;
vector<unique_ptr<Expression>> select_list;
for (idx_t i = 0; i < child_types.size(); i++) {
auto &type = child_types[i];
unique_ptr<Expression> expr;
expr = make_uniq<BoundReferenceExpression>(type, i);

auto postgres_type = PostgresUtils::ToPostgresType(type);
if (postgres_type != type) {
// add a cast
expr = BoundCastExpression::AddCastToType(context, std::move(expr), postgres_type);
}
postgres_types.push_back(std::move(postgres_type));
select_list.push_back(std::move(expr));
}

// we need to cast: add casts
auto &proj = planner.Make<PhysicalProjection>(std::move(postgres_types), std::move(select_list), plan.estimated_cardinality);
proj.children.push_back(plan);
return proj;
if (!require_cast) {
return plan;
}

vector<LogicalType> postgres_types;
vector<unique_ptr<Expression>> select_list;
for (idx_t i = 0; i < child_types.size(); i++) {
auto &type = child_types[i];
unique_ptr<Expression> expr;
expr = make_uniq<BoundReferenceExpression>(type, i);

auto postgres_type = PostgresUtils::ToPostgresType(type);
if (postgres_type != type) {
// add a cast
expr = BoundCastExpression::AddCastToType(context, std::move(expr), postgres_type);
}
postgres_types.push_back(std::move(postgres_type));
select_list.push_back(std::move(expr));
}

// we need to cast: add casts
auto &proj =
planner.Make<PhysicalProjection>(std::move(postgres_types), std::move(select_list), plan.estimated_cardinality);
proj.children.push_back(plan);
return proj;
}

bool PostgresCatalog::IsPostgresScan(const string &name) {
Expand All @@ -210,15 +212,16 @@ void PostgresCatalog::MaterializePostgresScans(PhysicalOperator &op) {
}
}

PhysicalOperator &PostgresCatalog::PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, LogicalInsert &op, optional_ptr<PhysicalOperator> plan) {
PhysicalOperator &PostgresCatalog::PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, LogicalInsert &op,
optional_ptr<PhysicalOperator> plan) {
if (op.return_chunk) {
throw BinderException("RETURNING clause not yet supported for insertion into Postgres table");
}
if (op.action_type != OnConflictAction::THROW) {
throw BinderException("ON CONFLICT clause not yet supported for insertion into Postgres table");
}

D_ASSERT(plan);
D_ASSERT(plan);
MaterializePostgresScans(*plan);
auto &inner_plan = AddCastToPostgresTypes(context, planner, *plan);

Expand All @@ -227,7 +230,8 @@ PhysicalOperator &PostgresCatalog::PlanInsert(ClientContext &context, PhysicalPl
return insert;
}

PhysicalOperator &PostgresCatalog::PlanCreateTableAs(ClientContext &context, PhysicalPlanGenerator &planner, LogicalCreateTable &op, PhysicalOperator &plan) {
PhysicalOperator &PostgresCatalog::PlanCreateTableAs(ClientContext &context, PhysicalPlanGenerator &planner,
LogicalCreateTable &op, PhysicalOperator &plan) {
auto &inner_plan = AddCastToPostgresTypes(context, planner, plan);
MaterializePostgresScans(inner_plan);

Expand Down
1 change: 0 additions & 1 deletion src/storage/postgres_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "storage/postgres_catalog.hpp"
#include "postgres_scanner.hpp"


namespace duckdb {

struct PostgresOperators {
Expand Down
5 changes: 3 additions & 2 deletions src/storage/postgres_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ void PostgresSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) {
GetCatalogSet(info.type).DropEntry(context, info);
}

optional_ptr<CatalogEntry> PostgresSchemaEntry::LookupEntry(CatalogTransaction transaction, const EntryLookupInfo &lookup_info) {
optional_ptr<CatalogEntry> PostgresSchemaEntry::LookupEntry(CatalogTransaction transaction,
const EntryLookupInfo &lookup_info) {
auto catalog_type = lookup_info.GetCatalogType();
if (!CatalogTypeIsSupported(catalog_type)) {
return nullptr;
Expand All @@ -205,4 +206,4 @@ PostgresCatalogSet &PostgresSchemaEntry::GetCatalogSet(CatalogType type) {
}
}

} // namespace duckdb
} // namespace duckdb
Loading
Loading