</para>
</step>
+ <step>
+ <title>Prepare for publisher upgrades</title>
+
+ <para>
+ <application>pg_upgrade</application> attempts to migrate logical
+ slots. This helps avoid the need for manually defining the same
+ logical slots on the new publisher. Migration of logical slots is
+ only supported when the old cluster is version 17.0 or later.
+ Logical slots on clusters before version 17.0 will silently be
+ ignored.
+ </para>
+
+ <para>
+ Before you start upgrading the publisher cluster, ensure that the
+ subscription is temporarily disabled, by executing
+ <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+ Re-enable the subscription after the upgrade.
+ </para>
+
+ <para>
+ There are some prerequisites for <application>pg_upgrade</application> to
+ be able to upgrade the logical slots. If these are not met, an error
+ will be reported.
+ </para>
+
+ <itemizedlist>
+ <listitem>
+ <para>
+ The new cluster must have
+ <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+ <literal>logical</literal>.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ The new cluster must have
+ <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+ configured to a value greater than or equal to the number of slots
+ present in the old cluster.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ The output plugins referenced by the slots on the old cluster must be
+ installed in the new PostgreSQL executable directory.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ The old cluster must have replicated all the transactions and logical
+ decoding messages to subscribers.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ All slots on the old cluster must be usable, i.e., there are no slots
+ whose
+ <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflicting</structfield>
+ is <literal>true</literal>.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ The new cluster must not have permanent logical slots, i.e.,
+ there must be no slots where
+ <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>temporary</structfield>
+ is <literal>false</literal>.
+ </para>
+ </listitem>
+ </itemizedlist>
+
+ </step>
+
<step>
<title>Stop both servers</title>
Configure the servers for log shipping. (You do not need to run
<function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
or take a file system backup as the standbys are still synchronized
- with the primary.) Replication slots are not copied and must
- be recreated.
+ with the primary.) Only logical slots on the primary are copied to the
+ new standby; other slots on the old standby are not copied and must
+ be recreated manually.
</para>
</step>
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ /* If we don't have snapshot, there is no point in decoding messages */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
message = (xl_logical_message *) XLogRecGetData(r);
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;
+ /*
+ * We also skip decoding in fast_forward mode. This check must be last
+ * because we don't want to set the processing_required flag unless we
+ * have a decodable message.
+ */
+ if (ctx->fast_forward)
+ {
+ /*
+ * We need to set processing_required flag to notify the message's
+ * existence to the caller. Usually, the flag is set when either the
+ * COMMIT or ABORT records are decoded, but this must be turned on
+ * here because the non-transactional logical message is decoded
+ * without waiting for these records.
+ */
+ if (!message->transactional)
+ ctx->processing_required = true;
+
+ return;
+ }
+
/*
* If this is a non-transactional change, get the snapshot we're expected
* to use. We only get here when the snapshot is consistent, and the
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
Oid txn_dbid, RepOriginId origin_id)
{
- return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
+ return true;
+
+ /*
+ * We also skip decoding in fast_forward mode. In passing set the
+ * processing_required flag to indicate that if it were not for
+ * fast_forward mode, processing would have been required.
+ */
+ if (ctx->fast_forward)
+ {
+ ctx->processing_required = true;
+ return true;
+ }
+
+ return false;
}
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlogutils.h"
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
/* data for errcontext callback */
rb->totalTxns = 0;
rb->totalBytes = 0;
}
+
+/*
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
+ *
+ * end_of_wal bounds the scan: records are read only while the reader's
+ * EndRecPtr is below it.
+ *
+ * The slot to examine is taken from MyReplicationSlot, which must be set
+ * (asserted). A WAL read error is raised as an ERROR; system caches are
+ * invalidated before re-throwing any error.
+ */
+bool
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+{
+ bool has_pending_wal = false;
+
+ Assert(MyReplicationSlot);
+
+ PG_TRY();
+ {
+ LogicalDecodingContext *ctx;
+
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from the slot's
+ * confirmed_flush.
+ */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true, /* fast_forward */
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
+
+ /*
+ * Start reading at the slot's restart_lsn, which we know points to a
+ * valid record.
+ */
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+ /* Invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Loop until the end of WAL or some changes are processed */
+ while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+ {
+ XLogRecord *record;
+ char *errm = NULL;
+
+ record = XLogReadRecord(ctx->reader, &errm);
+
+ /* Any error message from the reader aborts the check */
+ if (errm)
+ elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+ if (record != NULL)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ /*
+ * The decoding context's processing_required flag is our result;
+ * once it is set the loop condition ends the scan.
+ */
+ has_pending_wal = ctx->processing_required;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Clean up */
+ FreeDecodingContext(ctx);
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return has_pending_wal;
+}
SpinLockRelease(&s->mutex);
+ /*
+ * The logical replication slots shouldn't be invalidated as
+ * max_slot_wal_keep_size GUC is set to -1 during the upgrade.
+ *
+ * The following is just a sanity check.
+ */
+ if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slots must not be invalidated during the upgrade"),
+ errhint("\"max_slot_wal_keep_size\" must be set to -1 during the upgrade"));
+ }
+
if (active_pid != 0)
{
/*
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
+#include "replication/logical.h"
#include "utils/array.h"
#include "utils/builtins.h"
PG_RETURN_VOID();
}
+
+/*
+ * Verify the given slot has already consumed all the WAL changes.
+ *
+ * Returns true if there are no decodable WAL records after the
+ * confirmed_flush_lsn. Otherwise false.
+ *
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
+ *
+ * The only argument is the slot name.  A NULL argument is rejected with an
+ * error before it is dereferenced.  The slot is acquired for the duration of
+ * the check and released before returning; it is expected to be a logical
+ * slot that has not been invalidated (both are asserted).
+ */
+Datum
+binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+{
+	Name		slot_name;
+	XLogRecPtr	end_of_wal;
+	bool		found_pending_wal;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* We must check before dereferencing the argument */
+	if (PG_ARGISNULL(0))
+		elog(ERROR, "null argument to binary_upgrade_logical_slot_has_caught_up is not allowed");
+
+	CheckSlotPermissions();
+
+	slot_name = PG_GETARG_NAME(0);
+
+	/* Acquire the given slot */
+	ReplicationSlotAcquire(NameStr(*slot_name), true);
+
+	Assert(SlotIsLogical(MyReplicationSlot));
+
+	/* Slots must be valid as otherwise we won't be able to scan the WAL */
+	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+
+	/* Scan everything that has been flushed so far */
+	end_of_wal = GetFlushRecPtr(NULL);
+	found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+
+	/* Clean up */
+	ReplicationSlotRelease();
+
+	/* "Caught up" is the negation of "pending WAL was found" */
+	PG_RETURN_BOOL(!found_pending_wal);
+}
PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
PGAPPICON = win32
+# required for 003_upgrade_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
subdir = src/bin/pg_upgrade
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
static void check_for_pg_role_prefix(ClusterInfo *cluster);
static void check_for_new_tablespace_dir(void);
static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
/*
if (!live_check)
start_postmaster(&old_cluster, true);
- /* Extract a list of databases and tables from the old cluster */
- get_db_and_rel_infos(&old_cluster);
+ /*
+ * Extract a list of databases, tables, and logical replication slots from
+ * the old cluster.
+ */
+ get_db_rel_and_slot_infos(&old_cluster, live_check);
init_tablespaces();
check_for_reg_data_type_usage(&old_cluster);
check_for_isn_and_int8_passing_mismatch(&old_cluster);
+ /*
+ * Logical replication slots can be migrated since PG17. See comments atop
+ * get_old_cluster_logical_slot_infos().
+ */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+ check_old_cluster_for_valid_slots(live_check);
+
/*
* PG 16 increased the size of the 'aclitem' type, which breaks the
* on-disk format for existing data.
void
check_new_cluster(void)
{
- get_db_and_rel_infos(&new_cluster);
+ get_db_rel_and_slot_infos(&new_cluster, false);
check_new_cluster_is_empty();
check_for_prepared_transactions(&new_cluster);
check_for_new_tablespace_dir();
+
+ check_new_cluster_logical_replication_slots();
}
else
check_ok();
}
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Make sure the new cluster is able to receive the old cluster's logical
+ * replication slots: it must not already contain permanent logical slots,
+ * wal_level must be "logical", and max_replication_slots must be at least
+ * the number of slots found on the old cluster.  Any violation is fatal.
+ *
+ * Nothing is checked when the old cluster predates PG17 or has no logical
+ * slots at all.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	int			old_slot_count;
+	int			new_slot_count;
+	int			slot_limit;
+	char	   *level;
+
+	/* Slot migration only applies to old clusters of PG17 or later. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+		return;
+
+	/* With nothing to migrate there is nothing to verify. */
+	old_slot_count = count_old_cluster_logical_slots();
+	if (old_slot_count == 0)
+		return;
+
+	conn = connectToServer(&new_cluster, "template1");
+
+	prep_status("Checking for new cluster logical replication slots");
+
+	/* First check: the new cluster has no permanent logical slots. */
+	res = executeQueryOrDie(conn, "SELECT count(*) "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND "
+							"temporary IS FALSE;");
+
+	if (PQntuples(res) != 1)
+		pg_fatal("could not count the number of logical replication slots");
+
+	new_slot_count = atoi(PQgetvalue(res, 0, 0));
+	if (new_slot_count != 0)
+		pg_fatal("Expected 0 logical replication slots but found %d.",
+				 new_slot_count);
+
+	PQclear(res);
+
+	/*
+	 * Second check: the relevant settings.  Because of the descending sort,
+	 * row 0 holds wal_level and row 1 holds max_replication_slots.
+	 */
+	res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings "
+							"WHERE name IN ('wal_level', 'max_replication_slots') "
+							"ORDER BY name DESC;");
+
+	if (PQntuples(res) != 2)
+		pg_fatal("could not determine parameter settings on new cluster");
+
+	level = PQgetvalue(res, 0, 0);
+	if (strcmp(level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 level);
+
+	slot_limit = atoi(PQgetvalue(res, 1, 0));
+	if (slot_limit < old_slot_count)
+		pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+				 "logical replication slots (%d) on the old cluster",
+				 slot_limit, old_slot_count);
+
+	PQclear(res);
+	PQfinish(conn);
+
+	check_ok();
+}
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Walk over every logical slot collected from the old cluster and make sure
+ * each one is usable and, unless this is a live check, has consumed all the
+ * WAL before shutdown.  Offending slots are listed in a report file and the
+ * upgrade is aborted.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+	char		report_path[MAXPGPATH];
+	FILE	   *report = NULL;
+
+	prep_status("Checking for valid logical replication slots");
+
+	snprintf(report_path, sizeof(report_path), "%s/%s",
+			 log_opts.basedir,
+			 "invalid_logical_replication_slots.txt");
+
+	for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+
+		for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+			bool		unusable = slot->invalid;
+
+			/*
+			 * The caught-up status is only examined for usable slots, and
+			 * only when the old cluster has been shut down: it cannot be
+			 * satisfied during a live check.
+			 */
+			bool		lagging = !unusable && !live_check && !slot->caught_up;
+
+			if (!unusable && !lagging)
+				continue;
+
+			/* The report file is created lazily, on the first problem. */
+			if (report == NULL)
+			{
+				report = fopen_priv(report_path, "w");
+				if (report == NULL)
+					pg_fatal("could not open file \"%s\": %s",
+							 report_path, strerror(errno));
+			}
+
+			if (unusable)
+				fprintf(report, "The slot \"%s\" is invalid\n",
+						slot->slotname);
+			else
+				fprintf(report,
+						"The slot \"%s\" has not consumed the WAL yet\n",
+						slot->slotname);
+		}
+	}
+
+	/* An existing report file means at least one slot cannot be upgraded. */
+	if (report != NULL)
+	{
+		fclose(report);
+
+		pg_log(PG_REPORT, "fatal");
+		pg_fatal("Your installation contains logical replication slots that can't be upgraded.\n"
+				 "You can remove invalid slots and/or consume the pending WAL for other slots,\n"
+				 "and then restart the upgrade.\n"
+				 "A list of the problematic slots is in the file:\n"
+				 "    %s", report_path);
+	}
+
+	check_ok();
+}
/*
* get_loadable_libraries()
*
- * Fetch the names of all old libraries containing C-language functions.
+ * Fetch the names of all old libraries containing either C-language functions
+ * or are corresponding to logical replication output plugins.
+ *
* We will later check that they all exist in the new installation.
*/
void
PGresult **ress;
int totaltups;
int dbnum;
+ int n_libinfos;
ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
totaltups = 0;
PQfinish(conn);
}
- os_info.libraries = (LibraryInfo *) pg_malloc(totaltups * sizeof(LibraryInfo));
+ /*
+ * Allocate memory for required libraries and logical replication output
+ * plugins.
+ */
+ n_libinfos = totaltups + count_old_cluster_logical_slots();
+ os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
totaltups = 0;
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
PGresult *res = ress[dbnum];
int ntups;
int rowno;
+ LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
ntups = PQntuples(res);
for (rowno = 0; rowno < ntups; rowno++)
totaltups++;
}
PQclear(res);
+
+ /*
+ * Store the names of output plugins as well. There is a possibility
+ * that duplicated plugins are set, but the consumer function
+ * check_loadable_libraries() will avoid checking the same library, so
+ * we do not have to consider their uniqueness here.
+ */
+ for (int slotno = 0; slotno < slot_arr->nslots; slotno++)
+ {
+ if (slot_arr->slots[slotno].invalid)
+ continue;
+
+ os_info.libraries[totaltups].name = pg_strdup(slot_arr->slots[slotno].plugin);
+ os_info.libraries[totaltups].dbnum = dbnum;
+
+ totaltups++;
+ }
}
pg_free(ress);
static void free_rel_infos(RelInfoArr *rel_arr);
static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
/*
}
/*
- * get_db_and_rel_infos()
+ * get_db_rel_and_slot_infos()
*
* higher level routine to generate dbinfos for the database running
* on the given "port". Assumes that server is already running.
+ *
+ * live_check would be used only when the target is the old cluster.
*/
void
-get_db_and_rel_infos(ClusterInfo *cluster)
+get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
{
int dbnum;
get_db_infos(cluster);
for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- get_rel_infos(cluster, &cluster->dbarr.dbs[dbnum]);
+ {
+ DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+ get_rel_infos(cluster, pDbInfo);
+
+ /*
+ * Retrieve the logical replication slots infos for the old cluster.
+ */
+ if (cluster == &old_cluster)
+ get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+ }
if (cluster == &old_cluster)
pg_log(PG_VERBOSE, "\nsource databases:");
dbinfo->rel_arr.nrels = num_rels;
}
+/*
+ * get_old_cluster_logical_slot_infos()
+ *
+ * Build the LogicalSlotInfo array for the permanent logical replication
+ * slots of the database referred to by "dbinfo" and store it, together with
+ * its length, in dbinfo->slot_arr.  The status of each slot is only gathered
+ * here; it is evaluated at the checking phase, see
+ * check_old_cluster_for_valid_slots().
+ *
+ * Note: nothing is fetched if the old cluster is pre-PG17; the array is then
+ * left empty.  This is because before that the logical slots are not saved
+ * at shutdown, so there is no guarantee that the latest confirmed_flush_lsn
+ * is saved to disk which can lead to data loss.  It is still not guaranteed
+ * for manually created slots in PG17, so subsequent checks done in
+ * check_old_cluster_for_valid_slots() would raise a FATAL error if such
+ * slots are included.
+ */
+static void
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+{
+	LogicalSlotInfo *slots = NULL;
+	int			nslots = 0;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) > 1600)
+	{
+		PGconn	   *conn = connectToServer(&old_cluster, dbinfo->db_name);
+		PGresult   *res;
+		const char *caught_up_expr;
+
+		/*
+		 * Whether a slot is considered caught up is decided by an upgrade
+		 * function, which regards the slot as caught up if it does not find
+		 * any decodable changes.  See
+		 * binary_upgrade_logical_slot_has_caught_up().
+		 *
+		 * During a live check this cannot be determined because new WAL
+		 * records could still be generated, so a constant FALSE is used.
+		 *
+		 * The WAL is intentionally not checked for invalidated slots as the
+		 * corresponding WAL could already have been removed.
+		 */
+		if (live_check)
+			caught_up_expr = "FALSE";
+		else
+			caught_up_expr =
+				"(CASE WHEN conflicting THEN FALSE "
+				"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+				"END)";
+
+		/*
+		 * Temporary slots are explicitly left out because they cannot exist
+		 * after the upgrade: the clusters are started and stopped several
+		 * times, which removes any temporary slot.
+		 */
+		res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
+								"%s as caught_up, conflicting as invalid "
+								"FROM pg_catalog.pg_replication_slots "
+								"WHERE slot_type = 'logical' AND "
+								"database = current_database() AND "
+								"temporary IS FALSE;",
+								caught_up_expr);
+
+		nslots = PQntuples(res);
+
+		if (nslots != 0)
+		{
+			int			col_slotname = PQfnumber(res, "slot_name");
+			int			col_plugin = PQfnumber(res, "plugin");
+			int			col_twophase = PQfnumber(res, "two_phase");
+			int			col_caught_up = PQfnumber(res, "caught_up");
+			int			col_invalid = PQfnumber(res, "invalid");
+
+			slots = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * nslots);
+
+			for (int row = 0; row < nslots; row++)
+			{
+				LogicalSlotInfo *entry = &slots[row];
+
+				entry->slotname = pg_strdup(PQgetvalue(res, row, col_slotname));
+				entry->plugin = pg_strdup(PQgetvalue(res, row, col_plugin));
+
+				/* Boolean columns arrive as the text "t" or "f". */
+				entry->two_phase = (strcmp(PQgetvalue(res, row, col_twophase), "t") == 0);
+				entry->caught_up = (strcmp(PQgetvalue(res, row, col_caught_up), "t") == 0);
+				entry->invalid = (strcmp(PQgetvalue(res, row, col_invalid), "t") == 0);
+			}
+		}
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	dbinfo->slot_arr.slots = slots;
+	dbinfo->slot_arr.nslots = nslots;
+}
+
+
+/*
+ * count_old_cluster_logical_slots()
+ *
+ * Sums up the logical replication slots recorded for every database of the
+ * old cluster and returns the total.
+ *
+ * Note: the result is always 0 if the old_cluster is PG16 or older, because
+ * slot information is gathered only for cluster versions greater than or
+ * equal to PG17. See get_old_cluster_logical_slot_infos().
+ */
+int
+count_old_cluster_logical_slots(void)
+{
+	DbInfo	   *dbs = old_cluster.dbarr.dbs;
+	int			ndbs = old_cluster.dbarr.ndbs;
+	int			total = 0;
+	int			i;
+
+	for (i = 0; i < ndbs; i++)
+		total += dbs[i].slot_arr.nslots;
+
+	return total;
+}
static void
free_db_and_rel_infos(DbInfoArr *db_arr)
for (dbnum = 0; dbnum < db_arr->ndbs; dbnum++)
{
- pg_log(PG_VERBOSE, "Database: \"%s\"", db_arr->dbs[dbnum].db_name);
- print_rel_infos(&db_arr->dbs[dbnum].rel_arr);
+ DbInfo *pDbInfo = &db_arr->dbs[dbnum];
+
+ pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+ print_rel_infos(&pDbInfo->rel_arr);
+ print_slot_infos(&pDbInfo->slot_arr);
}
}
rel_arr->rels[relnum].reloid,
rel_arr->rels[relnum].tablespace);
}
+
+/*
+ * print_slot_infos()
+ *
+ * Emit, at verbose log level, one line per logical replication slot stored
+ * in "slot_arr", preceded by a heading.  Nothing is printed for an empty
+ * array.
+ */
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			i;
+
+	/* No heading and no entries when there are no logical slots. */
+	if (slot_arr->nslots == 0)
+		return;
+
+	pg_log(PG_VERBOSE, "Logical replication slots within the database:");
+
+	for (i = 0; i < slot_arr->nslots; i++)
+	{
+		LogicalSlotInfo *entry = slot_arr->slots + i;
+		const char *two_phase_text = entry->two_phase ? "true" : "false";
+
+		pg_log(PG_VERBOSE, "slot_name: \"%s\", plugin: \"%s\", two_phase: %s",
+			   entry->slotname,
+			   entry->plugin,
+			   two_phase_text);
+	}
+}
'tests': [
't/001_basic.pl',
't/002_pg_upgrade.pl',
+ 't/003_upgrade_logical_replication_slots.pl',
],
'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
},
static void set_frozenxids(bool minmxid_only);
static void make_outputdirs(char *pgdata);
static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
ClusterInfo old_cluster,
new_cluster;
new_cluster.pgdata);
check_ok();
+ /*
+ * Migrate the logical slots to the new cluster. Note that we need to do
+ * this after resetting WAL because otherwise the required WAL would be
+ * removed and slots would become unusable. There is a possibility that
+ * background processes might generate some WAL before we could create the
+ * slots in the new cluster but we can ignore that WAL as that won't be
+ * required downstream.
+ */
+ if (count_old_cluster_logical_slots())
+ {
+ start_postmaster(&new_cluster, true);
+ create_logical_replication_slots();
+ stop_postmaster(false);
+ }
+
if (user_opts.do_sync)
{
prep_status("Sync data directory to disk");
set_frozenxids(true);
/* update new_cluster info now that we have objects in the databases */
- get_db_and_rel_infos(&new_cluster);
+ get_db_rel_and_slot_infos(&new_cluster, false);
}
/*
check_ok();
}
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Counterpart of create_new_objects() that restores nothing but logical
+ * replication slots: for each old-cluster database owning slots, connect to
+ * the database of the same name in the new cluster and re-create every slot
+ * with its original name, output plugin and two_phase setting.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn	   *conn;
+		PQExpBuffer cmd;
+
+		/* Databases without slots need no connection at all. */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		cmd = createPQExpBuffer();
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+			const char *two_phase_arg = slot->two_phase ? "true" : "false";
+
+			/* Start from an empty buffer for every slot. */
+			resetPQExpBuffer(cmd);
+
+			/*
+			 * Build the slot-creation call; the name and the plugin are
+			 * quoted as string literals for this connection.
+			 */
+			appendPQExpBufferStr(cmd,
+								 "SELECT * FROM "
+								 "pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteralConn(cmd, slot->slotname, conn);
+			appendPQExpBufferStr(cmd, ", ");
+			appendStringLiteralConn(cmd, slot->plugin, conn);
+			appendPQExpBuffer(cmd, ", false, %s);", two_phase_arg);
+
+			PQclear(executeQueryOrDie(conn, "%s", cmd->data));
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(cmd);
+	}
+
+	end_progress_output();
+	check_ok();
+}
int nrels;
} RelInfoArr;
+/*
+ * Structure to store logical replication slot information.
+ *
+ * One entry describes one logical slot of an old-cluster database; the
+ * entries are filled in by get_old_cluster_logical_slot_infos().
+ */
+typedef struct
+{
+ char *slotname; /* slot name */
+ char *plugin; /* output plugin name */
+ bool two_phase; /* can the slot decode 2PC? */
+ bool caught_up; /* has the slot caught up to latest changes? */
+ bool invalid; /* if true, the slot is unusable */
+} LogicalSlotInfo;
+
+/*
+ * Counted array of LogicalSlotInfo; each DbInfo carries one for the slots of
+ * its database.
+ */
+typedef struct
+{
+ int nslots; /* number of logical slot infos */
+ LogicalSlotInfo *slots; /* array of logical slot infos */
+} LogicalSlotInfoArr;
+
/*
* The following structure represents a relation mapping.
*/
char db_tablespace[MAXPGPATH]; /* database default tablespace
* path */
RelInfoArr rel_arr; /* array of all user relinfos */
+ LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */
} DbInfo;
/*
FileNameMap *gen_db_file_maps(DbInfo *old_db,
DbInfo *new_db, int *nmaps, const char *old_pgdata,
const char *new_pgdata);
-void get_db_and_rel_infos(ClusterInfo *cluster);
+void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+int count_old_cluster_logical_slots(void);
/* option.c */
PGconn *conn;
bool pg_ctl_return = false;
char socket_string[MAXPGPATH + 200];
+ PQExpBufferData pgoptions;
static bool exit_hook_registered = false;
cluster->sockdir);
#endif
+ initPQExpBuffer(&pgoptions);
+
/*
- * Use -b to disable autovacuum.
+ * Construct a parameter string which is passed to the server process.
*
* Turn off durability requirements to improve object creation speed, and
* we only modify the new cluster, so only use it there. If there is a
* crash, the new cluster has to be recreated anyway. fsync=off is a big
* win on ext4.
*/
+ if (cluster == &new_cluster)
+ appendPQExpBufferStr(&pgoptions, " -c synchronous_commit=off -c fsync=off -c full_page_writes=off");
+
+ /*
+ * Use max_slot_wal_keep_size as -1 to prevent the WAL removal by the
+ * checkpointer process. If WALs required by logical replication slots
+ * are removed, the slots are unusable. This setting prevents the
+ * invalidation of slots during the upgrade. We set this option when
+ * cluster is PG17 or later because logical replication slots can only be
+ * migrated since then. Besides, max_slot_wal_keep_size is added in PG13.
+ */
+ if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+ appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
+
+ /* Use -b to disable autovacuum. */
snprintf(cmd, sizeof(cmd),
"\"%s/pg_ctl\" -w -l \"%s/%s\" -D \"%s\" -o \"-p %d -b%s %s%s\" start",
cluster->bindir,
log_opts.logdir,
SERVER_LOG_FILE, cluster->pgconfig, cluster->port,
- (cluster == &new_cluster) ?
- " -c synchronous_commit=off -c fsync=off -c full_page_writes=off" : "",
+ pgoptions.data,
cluster->pgopts ? cluster->pgopts : "", socket_string);
+ termPQExpBuffer(&pgoptions);
+
/*
* Don't throw an error right away, let connecting throw the error because
* it might supply a reason for the failure.
--- /dev/null
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading logical replication slots
+
+use strict;
+use warnings;
+
+use File::Find qw(find);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+# Set up the pg_upgrade command line. The same command is reused by every
+# pg_upgrade run in the tests below.
+my @pg_upgrade_cmd = (
+ 'pg_upgrade', '--no-sync',
+ '-d', $old_publisher->data_dir,
+ '-D', $new_publisher->data_dir,
+ '-b', $old_publisher->config_data('--bindir'),
+ '-B', $new_publisher->config_data('--bindir'),
+ '-s', $new_publisher->host,
+ '-p', $old_publisher->port,
+ '-P', $new_publisher->port,
+ $mode);
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the new cluster has wrong GUC values
+
+# Preparations for the subsequent test:
+# 1. Create two slots on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding');
+]);
+$old_publisher->stop();
+
+# 2. Set 'max_replication_slots' to be less than the number of slots (2)
+# present on the old cluster.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots
+command_checks_all(
+ [@pg_upgrade_cmd],
+ 1,
+ [
+ qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+ ],
+ [qr//],
+ 'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+ "pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Set 'max_replication_slots' to match the number of slots (2) present on the
+# old cluster. Both slots will be used for subsequent tests.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 2");
+
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+# Preparations for the subsequent test:
+# 1. Generate extra WAL records. At this point neither test_slot1 nor
+# test_slot2 has consumed them.
+#
+# 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1
+# still has unconsumed WAL records.
+#
+# 3. Emit a non-transactional message. This will cause test_slot2 to detect the
+# unconsumed WAL record.
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+ SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn());
+ SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');
+]);
+$old_publisher->stop;
+
+# pg_upgrade will fail because there are slots still having unconsumed WAL
+# records
+command_checks_all(
+ [@pg_upgrade_cmd],
+ 1,
+ [
+ qr/Your installation contains logical replication slots that can't be upgraded./
+ ],
+ [qr//],
+ 'run of pg_upgrade of old cluster with slots having unconsumed WAL records'
+);
+
+# Verify the reason why the logical replication slot cannot be upgraded
+my $slots_filename;
+
+# Find a txt file that contains a list of logical replication slots that cannot
+# be upgraded. We cannot predict the file's path because the output directory
+# contains a milliseconds timestamp. File::Find::find must be used.
+find(
+ sub {
+ if ($File::Find::name =~ m/invalid_logical_replication_slots\.txt/)
+ {
+ $slots_filename = $File::Find::name;
+ }
+ },
+ $new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Check the file content. Both slots should be reporting that they have
+# unconsumed WAL records.
+like(
+ slurp_file($slots_filename),
+ qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+ 'the previous test failed due to unconsumed WALs');
+like(
+ slurp_file($slots_filename),
+ qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
+ 'the previous test failed due to unconsumed WALs');
+
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication (first, cleanup slots from the previous tests)
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+
+$old_publisher->start;
+$old_publisher->safe_psql(
+ 'postgres', qq[
+ SELECT * FROM pg_drop_replication_slot('test_slot1');
+ SELECT * FROM pg_drop_replication_slot('test_slot2');
+ CREATE PUBLICATION regress_pub FOR ALL TABLES;
+]);
+
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init();
+
+$subscriber->start;
+$subscriber->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tbl (a int);
+ CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub DISABLE");
+$old_publisher->stop;
+
+# pg_upgrade should be successful
+command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
+
+# Check that the slot 'regress_sub' has migrated to the new cluster
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+ "SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+ 'postgres', qq[
+ ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
+ ALTER SUBSCRIPTION regress_sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+ "INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('regress_sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
+
+done_testing();
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202310181
+#define CATALOG_VERSION_NO 202310261
#endif
proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+ proname => 'binary_upgrade_logical_slot_has_caught_up', proisstrict => 'f',
+ provolatile => 'v', proparallel => 'u', prorettype => 'bool',
+ proargtypes => 'name',
+ prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
# conversion functions
{ oid => '4302',
TransactionId write_xid;
/* Are we processing the end LSN of a transaction? */
bool end_xact;
+
+ /* Do we need to process any change in fast_forward mode? */
+ bool processing_required;
} LogicalDecodingContext;
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+
#endif
LogicalRepWorker
LogicalRepWorkerType
LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
LogicalTape
LogicalTapeSet
LsnReadQueue