bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
+ bool parallel_abort; /* do we abort (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+/*
+ * Milliseconds to wait to cancel an in-progress query or execute a cleanup
+ * query; if it takes longer than 30 seconds to do these, we assume the
+ * connection is dead.
+ */
+#define CONNECTION_CLEANUP_TIMEOUT 30000
+
+/* Macro for constructing abort command to be sent */
+#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
+ do { \
+ if (toplevel) \
+ snprintf((sql), sizeof(sql), \
+ "ABORT TRANSACTION"); \
+ else \
+ snprintf((sql), sizeof(sql), \
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
+ (entry)->xact_depth, (entry)->xact_depth); \
+ } while(0)
+
/*
* SQL functions
*/
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
+static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
+static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime,
+ bool consume_input,
+ bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries,
+ List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
+static void pgfdw_finish_abort_cleanup(List *pending_entries,
+ List *cancel_requested,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
*
* By default, all the connections to any foreign servers are kept open.
*
- * Also determine whether to commit (sub)transactions opened on the remote
- * server in parallel at (sub)transaction end, which is disabled by
+ * Also determine whether to commit/abort (sub)transactions opened on the
+ * remote server in parallel at (sub)transaction end, which is disabled by
* default.
*
* Note: it's enough to determine these only when making a new connection
*/
entry->keep_connections = true;
entry->parallel_commit = false;
+ entry->parallel_abort = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
entry->keep_connections = defGetBoolean(def);
else if (strcmp(def->defname, "parallel_commit") == 0)
entry->parallel_commit = defGetBoolean(def);
+ else if (strcmp(def->defname, "parallel_abort") == 0)
+ entry->parallel_abort = defGetBoolean(def);
}
/* Now try to make the connection */
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Rollback all remote transactions during abort */
- pgfdw_abort_cleanup(entry, true);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, true,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, true);
break;
}
}
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
- event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+ event == XACT_EVENT_PRE_COMMIT)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_commit_cleanup(pending_entries);
+ }
+ else
+ {
+ Assert(event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ true);
+ }
}
/*
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
else
{
/* Rollback all remote subtransactions during abort */
- pgfdw_abort_cleanup(entry, false);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, false,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, false);
}
/* OK, we're outta that level of subtransaction */
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ }
+ else
+ {
+ Assert(event == SUBXACT_EVENT_ABORT_SUB);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ false);
+ }
}
}
static bool
pgfdw_cancel_query(PGconn *conn)
{
- PGcancel *cancel;
- char errbuf[256];
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_begin(conn))
+ return false;
+ return pgfdw_cancel_query_end(conn, endtime, false);
+}
+
+static bool
+pgfdw_cancel_query_begin(PGconn *conn)
+{
+ PGcancel *cancel;
+ char errbuf[256];
/*
* Issue cancel request. Unfortunately, there's no good way to limit the
PQfreeCancel(cancel);
}
+ return true;
+}
+
+static bool
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not get result of cancel request: %s",
+ pchomp(PQerrorMessage(conn)))));
+ return false;
+ }
+
/* Get and discard the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
static bool
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
{
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to execute a cleanup query, assume the connection
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
* be too long.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_begin(conn, query))
+ return false;
+ return pgfdw_exec_cleanup_query_end(conn, query, endtime,
+ false, ignore_errors);
+}
+static bool
+pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
+{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
return false;
}
+ return true;
+}
+
+static bool
+pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime, bool consume_input,
+ bool ignore_errors)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ pgfdw_report_error(WARNING, NULL, conn, false, query);
+ return false;
+ }
+
/* Get the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
!pgfdw_cancel_query(entry->conn))
return; /* Unable to cancel running query */
- if (toplevel)
- snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
- else
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- entry->xact_depth, entry->xact_depth);
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
return; /* Unable to abort remote (sub)transaction */
entry->changing_xact_state = false;
}
+/*
+ * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
+ * don't wait for the result.
+ *
+ * Returns true if the abort command or cancel request is successfully issued,
+ * false otherwise. If the abort command is successfully issued, the given
+ * connection cache entry is appended to *pending_entries. Othewise, if the
+ * cancel request is successfully issued, it is appended to *cancel_requested.
+ */
+static bool
+pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries, List **cancel_requested)
+{
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
+
+ /*
+ * If connection is already unsalvageable, don't touch it further.
+ */
+ if (entry->changing_xact_state)
+ return false;
+
+ /*
+ * Mark this connection as in the process of changing transaction state.
+ */
+ entry->changing_xact_state = true;
+
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server by using an
+ * asynchronous execution function, the command might not have yet
+ * completed. Check to see if a command is still being processed by the
+ * remote server, and if so, request cancellation of the command.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ if (!pgfdw_cancel_query_begin(entry->conn))
+ return false; /* Unable to cancel running query */
+ *cancel_requested = lappend(*cancel_requested, entry);
+ }
+ else
+ {
+ char sql[100];
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ return false; /* Unable to abort remote transaction */
+ *pending_entries = lappend(*pending_entries, entry);
+ }
+
+ return true;
+}
+
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
}
}
+/*
+ * Finish abort cleanup of connections on each of which we've sent an abort
+ * command or cancel request to the remote server.
+ */
+static void
+pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
+ bool toplevel)
+{
+ List *pending_deallocs = NIL;
+ ListCell *lc;
+
+ /*
+ * For each of the pending cancel requests (if any), get and discard the
+ * result of the query, and submit an abort command to the remote server.
+ */
+ if (cancel_requested)
+ {
+ foreach(lc, cancel_requested)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. You might think we should do this before issuing
+ * cancel request like in normal mode, but that is problematic,
+ * because if, for example, it took longer than 30 seconds to
+ * process the first few entries in the cancel_requested list, it
+ * would cause a timeout error when processing each of the
+ * remaining entries in the list, leading to slamming that entry's
+ * connection shut.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+ {
+ /* Unable to cancel running query */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ /* Send an abort command in parallel if needed */
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_entries = lappend(pending_entries, entry);
+ }
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_entries)
+ return;
+
+ /*
+ * Get the result of the abort command for each of the pending entries
+ */
+ foreach(lc, pending_entries)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
+ true, false))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ if (toplevel)
+ {
+ /* Do a DEALLOCATE ALL in parallel if needed */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn,
+ "DEALLOCATE ALL"))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_deallocs)
+ return;
+ Assert(toplevel);
+
+ /*
+ * Get the result of the DEALLOCATE command for each of the pending
+ * entries
+ */
+ foreach(lc, pending_deallocs)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+
+ Assert(entry->changing_xact_state);
+ Assert(entry->have_prep_stmt);
+ Assert(entry->have_error);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
+ endtime, true, true))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+}
+
/*
* List active foreign server connections.
*