Migrate logical slots to the new node during an upgrade.
author Amit Kapila <[email protected]>
Thu, 26 Oct 2023 01:24:16 +0000 (06:54 +0530)
committer Amit Kapila <[email protected]>
Thu, 26 Oct 2023 01:36:55 +0000 (07:06 +0530)
While reading information from the old cluster, pg_upgrade now fetches a
list of logical slots. Later in the upgrade, it revisits the list and
restores the slots by executing pg_create_logical_replication_slot() on
the new cluster. Migration of logical replication slots is supported only
when the old cluster is version 17.0 or later.
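
For illustration, the restore step issues a call of roughly this shape on
the new cluster (a sketch; the slot name, plugin, and two_phase flag below
are placeholders, with the actual values taken from each old-cluster slot):

    SELECT * FROM pg_catalog.pg_create_logical_replication_slot(
        'myslot',    -- slot name, copied from the old cluster
        'pgoutput',  -- output plugin, copied from the old cluster
        false,       -- temporary = false, so the slot survives restarts
        true);       -- two_phase, preserved from the old slot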

If the old node has invalid slots or slots with unconsumed WAL records,
pg_upgrade fails. These checks are needed to prevent data loss.
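
As a quick pre-upgrade sanity check, the slots on the old cluster can be
inspected with a query along these lines (a sketch; "conflicting" reports
invalidated slots on PG16 and later):

    SELECT slot_name, plugin, conflicting
    FROM pg_catalog.pg_replication_slots
    WHERE slot_type = 'logical' AND temporary IS FALSE;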

The significant advantage of this commit is that it makes it easy to
continue logical replication even after upgrading the publisher node.
Previously, pg_upgrade allowed copying publications to a new node. With
this patch, adjusting the connection string to point at the new publisher
causes the apply worker on the subscriber to connect to it automatically.
This enables seamless continuation of logical replication, even after an
upgrade.
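
A minimal subscriber-side sequence, using the subscription name from the
test below and a placeholder connection string, might look like:

    ALTER SUBSCRIPTION regress_sub DISABLE;
    -- run pg_upgrade on the publisher, then:
    ALTER SUBSCRIPTION regress_sub CONNECTION 'host=new_host port=5432 dbname=postgres';
    ALTER SUBSCRIPTION regress_sub ENABLE;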

Author: Hayato Kuroda, Hou Zhijie
Reviewed-by: Peter Smith, Bharath Rupireddy, Dilip Kumar, Vignesh C, Shlok Kyal
Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
Discussion: http://postgr.es/m/CAA4eK1+t7xYcfa0rEQw839=b2MzsfvYDPz3xbD+ZqOdP3zpKYg@mail.gmail.com

18 files changed:
doc/src/sgml/ref/pgupgrade.sgml
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/utils/adt/pg_upgrade_support.c
src/bin/pg_upgrade/Makefile
src/bin/pg_upgrade/check.c
src/bin/pg_upgrade/function.c
src/bin/pg_upgrade/info.c
src/bin/pg_upgrade/meson.build
src/bin/pg_upgrade/pg_upgrade.c
src/bin/pg_upgrade/pg_upgrade.h
src/bin/pg_upgrade/server.c
src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl [new file with mode: 0644]
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/logical.h
src/tools/pgindent/typedefs.list

index 608193b307063e28912d65ffff46e6aa08ed5e43..46e8a0b7467e17c7bcd964e64681d6df2b1c9eee 100644 (file)
@@ -383,6 +383,79 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     slots. This avoids the need to manually define the same logical
+     slots on the new publisher. Migration of logical slots is
+     only supported when the old cluster is version 17.0 or later.
+     Logical slots on clusters before version 17.0 will be silently
+     ignored.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     Re-enable the subscription after the upgrade is complete.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the logical slots. If these are not met, an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> set to
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to a value greater than or equal to the number of slots
+       present in the old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed in the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The old cluster must have replicated all the transactions and logical
+       decoding messages to subscribers.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there must be no
+       slots whose
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflicting</structfield>
+       is <literal>true</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must not have permanent logical slots, i.e.,
+       there must be no slots where
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>temporary</structfield>
+       is <literal>false</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
@@ -650,8 +723,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
        Configure the servers for log shipping.  (You do not need to run
        <function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
        or take a file system backup as the standbys are still synchronized
-       with the primary.)  Replication slots are not copied and must
-       be recreated.
+       with the primary.)  Only logical slots on the primary are copied to the
+       new standby, but other slots on the old standby are not copied, so they
+       must be recreated manually.
       </para>
      </step>
 
index 24b712aa667873dd9ad5b264ebe3cf138206dcfb..1237118e84fa9c28a73bfaa5313ad953a95e24cb 100644 (file)
@@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
 
-       /*
-        * If we don't have snapshot or we are just fast-forwarding, there is no
-        * point in decoding messages.
-        */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-               ctx->fast_forward)
+       /* If we don't have a snapshot, there is no point in decoding messages */
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
                return;
 
        message = (xl_logical_message *) XLogRecGetData(r);
@@ -622,6 +618,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                          SnapBuildXactNeedsSkip(builder, buf->origptr)))
                return;
 
+       /*
+        * We also skip decoding in fast_forward mode. This check must be last
+        * because we don't want to set the processing_required flag unless we
+        * have a decodable message.
+        */
+       if (ctx->fast_forward)
+       {
+               /*
+                * We need to set processing_required flag to notify the message's
+                * existence to the caller. Usually, the flag is set when either the
+                * COMMIT or ABORT records are decoded, but this must be turned on
+                * here because the non-transactional logical message is decoded
+                * without waiting for these records.
+                */
+               if (!message->transactional)
+                       ctx->processing_required = true;
+
+               return;
+       }
+
        /*
         * If this is a non-transactional change, get the snapshot we're expected
         * to use. We only get here when the snapshot is consistent, and the
@@ -1286,7 +1302,21 @@ static bool
 DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                                  Oid txn_dbid, RepOriginId origin_id)
 {
-       return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-                       (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-                       ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+       if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+               (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+               FilterByOrigin(ctx, origin_id))
+               return true;
+
+       /*
+        * We also skip decoding in fast_forward mode. In passing, set the
+        * processing_required flag to indicate that if it were not for
+        * fast_forward mode, processing would have been required.
+        */
+       if (ctx->fast_forward)
+       {
+               ctx->processing_required = true;
+               return true;
+       }
+
+       return false;
 }
index 41243d0187aac8553f393cf5224cd2d72fd88ec0..8288da5277fb16b96bbb329f3fab825c973f0fb9 100644 (file)
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/xlogutils.h"
 #include "access/xlog_internal.h"
 #include "fmgr.h"
 #include "miscadmin.h"
@@ -41,6 +42,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 
 /* data for errcontext callback */
@@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
        rb->totalTxns = 0;
        rb->totalBytes = 0;
 }
+
+/*
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
+ */
+bool
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+{
+       bool            has_pending_wal = false;
+
+       Assert(MyReplicationSlot);
+
+       PG_TRY();
+       {
+               LogicalDecodingContext *ctx;
+
+               /*
+                * Create our decoding context in fast_forward mode, passing start_lsn
+                * as InvalidXLogRecPtr, so that we start processing from the slot's
+                * confirmed_flush.
+                */
+               ctx = CreateDecodingContext(InvalidXLogRecPtr,
+                                                                       NIL,
+                                                                       true,   /* fast_forward */
+                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
+                                                                                          .segment_open = wal_segment_open,
+                                                                                          .segment_close = wal_segment_close),
+                                                                       NULL, NULL, NULL);
+
+               /*
+                * Start reading at the slot's restart_lsn, which we know points to a
+                * valid record.
+                */
+               XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+               /* Invalidate non-timetravel entries */
+               InvalidateSystemCaches();
+
+               /* Loop until the end of WAL or some changes are processed */
+               while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+               {
+                       XLogRecord *record;
+                       char       *errm = NULL;
+
+                       record = XLogReadRecord(ctx->reader, &errm);
+
+                       if (errm)
+                               elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+                       if (record != NULL)
+                               LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+                       has_pending_wal = ctx->processing_required;
+
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               /* Clean up */
+               FreeDecodingContext(ctx);
+               InvalidateSystemCaches();
+       }
+       PG_CATCH();
+       {
+               /* clear all timetravel entries */
+               InvalidateSystemCaches();
+
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       return has_pending_wal;
+}
index 7e5ec500d8960b9a7d1f14016d2aef41baac975f..99823df3c7d8f4cffeb3fdbef209206b2495d059 100644 (file)
@@ -1423,6 +1423,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
                SpinLockRelease(&s->mutex);
 
+               /*
+                * Logical replication slots shouldn't be invalidated because the
+                * max_slot_wal_keep_size GUC is set to -1 during the upgrade.
+                *
+                * The following is just a sanity check.
+                */
+               if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
+               {
+                       ereport(ERROR,
+                                       errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                       errmsg("replication slots must not be invalidated during the upgrade"),
+                                       errhint("\"max_slot_wal_keep_size\" must be set to -1 during the upgrade"));
+               }
+
                if (active_pid != 0)
                {
                        /*
index 0186636d9f89c3b95ed1146690474a3b663bd2cc..2f6fc86c3df3c14bec7fdd7728aba9ca45036d10 100644 (file)
@@ -17,6 +17,7 @@
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
+#include "replication/logical.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 
@@ -261,3 +262,46 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
        PG_RETURN_VOID();
 }
+
+/*
+ * Verify that the given slot has already consumed all the WAL changes.
+ *
+ * Returns true if there are no decodable WAL records after the
+ * confirmed_flush_lsn; otherwise returns false.
+ *
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
+ */
+Datum
+binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+{
+       Name            slot_name;
+       XLogRecPtr      end_of_wal;
+       bool            found_pending_wal;
+
+       CHECK_IS_BINARY_UPGRADE;
+
+       /* We must check before dereferencing the argument */
+       if (PG_ARGISNULL(0))
+               elog(ERROR, "null argument to binary_upgrade_logical_slot_has_caught_up is not allowed");
+
+       CheckSlotPermissions();
+
+       slot_name = PG_GETARG_NAME(0);
+
+       /* Acquire the given slot */
+       ReplicationSlotAcquire(NameStr(*slot_name), true);
+
+       Assert(SlotIsLogical(MyReplicationSlot));
+
+       /* Slots must be valid as otherwise we won't be able to scan the WAL */
+       Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+
+       end_of_wal = GetFlushRecPtr(NULL);
+       found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+
+       /* Clean up */
+       ReplicationSlotRelease();
+
+       PG_RETURN_BOOL(!found_pending_wal);
+}
index 5834513add4070f76fda50aa5f9408d67e8f99e6..05e929965445a2ffb8009333d971d517bf49fe56 100644 (file)
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_upgrade_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
index 21a0ff9e42d60fa20e831b953355f7cad1cfb680..179f85ae8a8e9d4388c65aae22d7c8c16399859e 100644 (file)
@@ -33,6 +33,8 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
 
 
 /*
@@ -89,8 +91,11 @@ check_and_dump_old_cluster(bool live_check)
        if (!live_check)
                start_postmaster(&old_cluster, true);
 
-       /* Extract a list of databases and tables from the old cluster */
-       get_db_and_rel_infos(&old_cluster);
+       /*
+        * Extract a list of databases, tables, and logical replication slots from
+        * the old cluster.
+        */
+       get_db_rel_and_slot_infos(&old_cluster, live_check);
 
        init_tablespaces();
 
@@ -107,6 +112,13 @@ check_and_dump_old_cluster(bool live_check)
        check_for_reg_data_type_usage(&old_cluster);
        check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+       /*
+        * Logical replication slots can be migrated since PG17. See comments atop
+        * get_old_cluster_logical_slot_infos().
+        */
+       if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+               check_old_cluster_for_valid_slots(live_check);
+
        /*
         * PG 16 increased the size of the 'aclitem' type, which breaks the
         * on-disk format for existing data.
@@ -200,7 +212,7 @@ check_and_dump_old_cluster(bool live_check)
 void
 check_new_cluster(void)
 {
-       get_db_and_rel_infos(&new_cluster);
+       get_db_rel_and_slot_infos(&new_cluster, false);
 
        check_new_cluster_is_empty();
 
@@ -223,6 +235,8 @@ check_new_cluster(void)
        check_for_prepared_transactions(&new_cluster);
 
        check_for_new_tablespace_dir();
+
+       check_new_cluster_logical_replication_slots();
 }
 
 
@@ -1451,3 +1465,151 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
        else
                check_ok();
 }
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Verify that there are no logical replication slots on the new cluster and
+ * that the parameter settings necessary for creating slots are sufficient.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+       PGresult   *res;
+       PGconn     *conn;
+       int                     nslots_on_old;
+       int                     nslots_on_new;
+       int                     max_replication_slots;
+       char       *wal_level;
+
+       /* Logical slots can be migrated since PG17. */
+       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+               return;
+
+       nslots_on_old = count_old_cluster_logical_slots();
+
+       /* Quick return if there are no logical slots to be migrated. */
+       if (nslots_on_old == 0)
+               return;
+
+       conn = connectToServer(&new_cluster, "template1");
+
+       prep_status("Checking for new cluster logical replication slots");
+
+       res = executeQueryOrDie(conn, "SELECT count(*) "
+                                                       "FROM pg_catalog.pg_replication_slots "
+                                                       "WHERE slot_type = 'logical' AND "
+                                                       "temporary IS FALSE;");
+
+       if (PQntuples(res) != 1)
+               pg_fatal("could not count the number of logical replication slots");
+
+       nslots_on_new = atoi(PQgetvalue(res, 0, 0));
+
+       if (nslots_on_new)
+               pg_fatal("Expected 0 logical replication slots but found %d.",
+                                nslots_on_new);
+
+       PQclear(res);
+
+       res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings "
+                                                       "WHERE name IN ('wal_level', 'max_replication_slots') "
+                                                       "ORDER BY name DESC;");
+
+       if (PQntuples(res) != 2)
+               pg_fatal("could not determine parameter settings on new cluster");
+
+       wal_level = PQgetvalue(res, 0, 0);
+
+       if (strcmp(wal_level, "logical") != 0)
+               pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+                                wal_level);
+
+       max_replication_slots = atoi(PQgetvalue(res, 1, 0));
+
+       if (nslots_on_old > max_replication_slots)
+               pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+                                "logical replication slots (%d) on the old cluster",
+                                max_replication_slots, nslots_on_old);
+
+       PQclear(res);
+       PQfinish(conn);
+
+       check_ok();
+}
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Verify that all the logical slots are valid and have consumed all the WAL
+ * before shutdown.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+       char            output_path[MAXPGPATH];
+       FILE       *script = NULL;
+
+       prep_status("Checking for valid logical replication slots");
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "invalid_logical_replication_slots.txt");
+
+       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+       {
+               LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+
+               for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+               {
+                       LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+
+                       /* Is the slot usable? */
+                       if (slot->invalid)
+                       {
+                               if (script == NULL &&
+                                       (script = fopen_priv(output_path, "w")) == NULL)
+                                       pg_fatal("could not open file \"%s\": %s",
+                                                        output_path, strerror(errno));
+
+                               fprintf(script, "The slot \"%s\" is invalid\n",
+                                               slot->slotname);
+
+                               continue;
+                       }
+
+                       /*
+                        * Do additional check to ensure that all logical replication
+                        * slots have consumed all the WAL before shutdown.
+                        *
+                        * Note: This can be satisfied only when the old cluster has been
+                        * shut down, so we skip this for live checks.
+                        */
+                       if (!live_check && !slot->caught_up)
+                       {
+                               if (script == NULL &&
+                                       (script = fopen_priv(output_path, "w")) == NULL)
+                                       pg_fatal("could not open file \"%s\": %s",
+                                                        output_path, strerror(errno));
+
+                               fprintf(script,
+                                               "The slot \"%s\" has not consumed the WAL yet\n",
+                                               slot->slotname);
+                       }
+               }
+       }
+
+       if (script)
+       {
+               fclose(script);
+
+               pg_log(PG_REPORT, "fatal");
+               pg_fatal("Your installation contains logical replication slots that can't be upgraded.\n"
+                                "You can remove invalid slots and/or consume the pending WAL for other slots,\n"
+                                "and then restart the upgrade.\n"
+                                "A list of the problematic slots is in the file:\n"
+                                "    %s", output_path);
+       }
+
+       check_ok();
+}
index dc8800c7cdeeb1c7549dc95a88b3469f9a8b1d16..5af936bd4588affddcb492abafbaf82b572db59d 100644 (file)
@@ -46,7 +46,9 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *     Fetch the names of all old libraries containing C-language functions.
+ *     Fetch the names of all old libraries that contain C-language functions,
+ *     as well as the libraries corresponding to logical replication output
+ *     plugins.
+ *
  *     We will later check that they all exist in the new installation.
  */
 void
@@ -55,6 +57,7 @@ get_loadable_libraries(void)
        PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
+       int                     n_libinfos;
 
        ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
        totaltups = 0;
@@ -81,7 +84,12 @@ get_loadable_libraries(void)
                PQfinish(conn);
        }
 
-       os_info.libraries = (LibraryInfo *) pg_malloc(totaltups * sizeof(LibraryInfo));
+       /*
+        * Allocate memory for required libraries and logical replication output
+        * plugins.
+        */
+       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
@@ -89,6 +97,7 @@ get_loadable_libraries(void)
                PGresult   *res = ress[dbnum];
                int                     ntups;
                int                     rowno;
+               LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
 
                ntups = PQntuples(res);
                for (rowno = 0; rowno < ntups; rowno++)
@@ -101,6 +110,23 @@ get_loadable_libraries(void)
                        totaltups++;
                }
                PQclear(res);
+
+               /*
+                * Store the names of output plugins as well. The same plugin may be
+                * listed more than once, but the consumer function
+                * check_loadable_libraries() skips duplicate libraries, so we do not
+                * have to de-duplicate them here.
+                */
+               for (int slotno = 0; slotno < slot_arr->nslots; slotno++)
+               {
+                       if (slot_arr->slots[slotno].invalid)
+                               continue;
+
+                       os_info.libraries[totaltups].name = pg_strdup(slot_arr->slots[slotno].plugin);
+                       os_info.libraries[totaltups].dbnum = dbnum;
+
+                       totaltups++;
+               }
        }
 
        pg_free(ress);
index aa5faca4d615e79e0843e93ba813390a52bad6b7..7f21d26fd2334763afb5bb9e2182d9aef2d5b1be 100644 (file)
@@ -26,6 +26,8 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
 
 
 /*
@@ -266,13 +268,15 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 }
 
 /*
- * get_db_and_rel_infos()
+ * get_db_rel_and_slot_infos()
  *
  * higher level routine to generate dbinfos for the database running
  * on the given "port". Assumes that server is already running.
+ *
+ * live_check is used only when the target is the old cluster.
  */
 void
-get_db_and_rel_infos(ClusterInfo *cluster)
+get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 {
        int                     dbnum;
 
@@ -283,7 +287,17 @@ get_db_and_rel_infos(ClusterInfo *cluster)
        get_db_infos(cluster);
 
        for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-               get_rel_infos(cluster, &cluster->dbarr.dbs[dbnum]);
+       {
+               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+               get_rel_infos(cluster, pDbInfo);
+
+               /*
+                * Retrieve the logical replication slot infos for the old cluster.
+                */
+               if (cluster == &old_cluster)
+                       get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+       }
 
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
@@ -600,6 +614,125 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
        dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_old_cluster_logical_slot_infos()
+ *
+ * Gets the LogicalSlotInfos for all the logical replication slots of the
+ * database referred to by "dbinfo". The status of each logical slot is
+ * fetched here, but it is only used later, during the checking phase. See
+ * check_old_cluster_for_valid_slots().
+ *
+ * Note: This function will not do anything if the old cluster is pre-PG17.
+ * This is because, before that version, logical slots were not saved at
+ * shutdown, so there is no guarantee that the latest confirmed_flush_lsn is
+ * on disk, which can lead to data loss. Even in PG17, that is not guaranteed
+ * for manually created slots, so the subsequent checks done in
+ * check_old_cluster_for_valid_slots() would raise a FATAL error if such
+ * slots are included.
+ */
+static void
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+{
+       PGconn     *conn;
+       PGresult   *res;
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = 0;
+
+       /* Logical slots can be migrated since PG17. */
+       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+       {
+               dbinfo->slot_arr.slots = slotinfos;
+               dbinfo->slot_arr.nslots = num_slots;
+               return;
+       }
+
+       conn = connectToServer(&old_cluster, dbinfo->db_name);
+
+       /*
+        * Fetch the logical replication slot information. The check whether the
+        * slot is considered caught up is done by an upgrade function. This
+        * regards the slot as caught up if we don't find any decodable changes.
+        * See binary_upgrade_logical_slot_has_caught_up().
+        *
+        * Note that we can't ensure whether the slot is caught up during
+        * live_check as the new WAL records could be generated.
+        *
+        * We intentionally skip checking the WALs for invalidated slots as the
+        * corresponding WALs could have been removed for such slots.
+        *
+        * The temporary slots are explicitly ignored while checking because such
+        * slots cannot exist after the upgrade. During the upgrade, clusters are
+        * started and stopped several times, causing any temporary slots to be
+        * removed.
+        */
+       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
+                                                       "%s as caught_up, conflicting as invalid "
+                                                       "FROM pg_catalog.pg_replication_slots "
+                                                       "WHERE slot_type = 'logical' AND "
+                                                       "database = current_database() AND "
+                                                       "temporary IS FALSE;",
+                                                       live_check ? "FALSE" :
+                                                       "(CASE WHEN conflicting THEN FALSE "
+                                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                                       "END)");
+
+       num_slots = PQntuples(res);
+
+       if (num_slots)
+       {
+               int                     i_slotname;
+               int                     i_plugin;
+               int                     i_twophase;
+               int                     i_caught_up;
+               int                     i_invalid;
+
+               slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+               i_slotname = PQfnumber(res, "slot_name");
+               i_plugin = PQfnumber(res, "plugin");
+               i_twophase = PQfnumber(res, "two_phase");
+               i_caught_up = PQfnumber(res, "caught_up");
+               i_invalid = PQfnumber(res, "invalid");
+
+               for (int slotnum = 0; slotnum < num_slots; slotnum++)
+               {
+                       LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+                       curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+                       curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+                       curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+                       curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
+                       curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+               }
+       }
+
+       PQclear(res);
+       PQfinish(conn);
+
+       dbinfo->slot_arr.slots = slotinfos;
+       dbinfo->slot_arr.nslots = num_slots;
+}
+
+
+/*
+ * count_old_cluster_logical_slots()
+ *
+ * Returns the number of logical replication slots for all databases.
+ *
+ * Note: this function always returns 0 if the old_cluster is PG16 or earlier,
+ * because we gather slot information only for cluster versions greater than
+ * or equal to PG17. See get_old_cluster_logical_slot_infos().
+ */
+int
+count_old_cluster_logical_slots(void)
+{
+       int                     slot_count = 0;
+
+       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+               slot_count += old_cluster.dbarr.dbs[dbnum].slot_arr.nslots;
+
+       return slot_count;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -642,8 +775,11 @@ print_db_infos(DbInfoArr *db_arr)
 
        for (dbnum = 0; dbnum < db_arr->ndbs; dbnum++)
        {
-               pg_log(PG_VERBOSE, "Database: \"%s\"", db_arr->dbs[dbnum].db_name);
-               print_rel_infos(&db_arr->dbs[dbnum].rel_arr);
+               DbInfo     *pDbInfo = &db_arr->dbs[dbnum];
+
+               pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+               print_rel_infos(&pDbInfo->rel_arr);
+               print_slot_infos(&pDbInfo->slot_arr);
        }
 }
 
@@ -660,3 +796,23 @@ print_rel_infos(RelInfoArr *rel_arr)
                           rel_arr->rels[relnum].reloid,
                           rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+       /* Quick return if there are no logical slots. */
+       if (slot_arr->nslots == 0)
+               return;
+
+       pg_log(PG_VERBOSE, "Logical replication slots within the database:");
+
+       for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+       {
+               LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+               pg_log(PG_VERBOSE, "slot_name: \"%s\", plugin: \"%s\", two_phase: %s",
+                          slot_info->slotname,
+                          slot_info->plugin,
+                          slot_info->two_phase ? "true" : "false");
+       }
+}
index 12a97f84e299ec22048e4d02d1c40e2d9055564a..2c4f38d865bc521d62b1ce88fe20d65b7b4f153e 100644 (file)
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_upgrade_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
index 96bfb67167f736b3b2ceef0f31e9da50a9b83328..3960af40368a2681a3a945bf4bb4db6dfe0edcee 100644 (file)
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
                        new_cluster;
@@ -188,6 +189,21 @@ main(int argc, char **argv)
                          new_cluster.pgdata);
        check_ok();
 
+       /*
+        * Migrate the logical slots to the new cluster.  Note that we need to do
+        * this after resetting WAL because otherwise the required WAL would be
+        * removed and the slots would become unusable.  Background processes
+        * might generate some WAL before we can create the slots in the new
+        * cluster, but we can ignore that WAL as it won't be required
+        * downstream.
+        */
+       if (count_old_cluster_logical_slots())
+       {
+               start_postmaster(&new_cluster, true);
+               create_logical_replication_slots();
+               stop_postmaster(false);
+       }
+
        if (user_opts.do_sync)
        {
                prep_status("Sync data directory to disk");
@@ -593,7 +609,7 @@ create_new_objects(void)
                set_frozenxids(true);
 
        /* update new_cluster info now that we have objects in the databases */
-       get_db_and_rel_infos(&new_cluster);
+       get_db_rel_and_slot_infos(&new_cluster, false);
 }
 
 /*
@@ -862,3 +878,59 @@ set_frozenxids(bool minmxid_only)
 
        check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+       prep_status_progress("Restoring logical replication slots in the new cluster");
+
+       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+       {
+               DbInfo     *old_db = &old_cluster.dbarr.dbs[dbnum];
+               LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+               PGconn     *conn;
+               PQExpBuffer query;
+
+               /* Skip this database if there are no slots */
+               if (slot_arr->nslots == 0)
+                       continue;
+
+               conn = connectToServer(&new_cluster, old_db->db_name);
+               query = createPQExpBuffer();
+
+               pg_log(PG_STATUS, "%s", old_db->db_name);
+
+               for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+               {
+                       LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+                       /* Construct a query to create a logical replication slot */
+                       appendPQExpBuffer(query,
+                                                         "SELECT * FROM "
+                                                         "pg_catalog.pg_create_logical_replication_slot(");
+                       appendStringLiteralConn(query, slot_info->slotname, conn);
+                       appendPQExpBuffer(query, ", ");
+                       appendStringLiteralConn(query, slot_info->plugin, conn);
+                       appendPQExpBuffer(query, ", false, %s);",
+                                                         slot_info->two_phase ? "true" : "false");
+
+                       PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+                       resetPQExpBuffer(query);
+               }
+
+               PQfinish(conn);
+
+               destroyPQExpBuffer(query);
+       }
+
+       end_progress_output();
+       check_ok();
+
+       return;
+}
index 842f3b6cd377ec163850ded0b1da1334ae553b22..ba8129d1354a6f6359fb4d70388db62c7af95727 100644 (file)
@@ -150,6 +150,24 @@ typedef struct
        int                     nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information.
+ */
+typedef struct
+{
+       char       *slotname;           /* slot name */
+       char       *plugin;                     /* plugin */
+       bool            two_phase;              /* can the slot decode 2PC? */
+       bool            caught_up;              /* has the slot caught up to latest changes? */
+       bool            invalid;                /* if true, the slot is unusable */
+} LogicalSlotInfo;
+
+typedef struct
+{
+       int                     nslots;                 /* number of logical slot infos */
+       LogicalSlotInfo *slots;         /* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +194,7 @@ typedef struct
        char            db_tablespace[MAXPGPATH];       /* database default tablespace
                                                                                         * path */
        RelInfoArr      rel_arr;                /* array of all user relinfos */
+       LogicalSlotInfoArr slot_arr;    /* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,7 +419,8 @@ void                check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
                                                          DbInfo *new_db, int *nmaps, const char *old_pgdata,
                                                          const char *new_pgdata);
-void           get_db_and_rel_infos(ClusterInfo *cluster);
+void           get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+int                    count_old_cluster_logical_slots(void);
 
 /* option.c */
 
index 0bc3d2806b87095578b825ea9dff2b459a0e5704..d7f6c268ef4c65ad46893e4a08154a423d86cda1 100644 (file)
@@ -201,6 +201,7 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
        PGconn     *conn;
        bool            pg_ctl_return = false;
        char            socket_string[MAXPGPATH + 200];
+       PQExpBufferData pgoptions;
 
        static bool exit_hook_registered = false;
 
@@ -227,23 +228,41 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
                                 cluster->sockdir);
 #endif
 
+       initPQExpBuffer(&pgoptions);
+
        /*
-        * Use -b to disable autovacuum.
+        * Construct a parameter string which is passed to the server process.
         *
         * Turn off durability requirements to improve object creation speed, and
         * we only modify the new cluster, so only use it there.  If there is a
         * crash, the new cluster has to be recreated anyway.  fsync=off is a big
         * win on ext4.
         */
+       if (cluster == &new_cluster)
+               appendPQExpBufferStr(&pgoptions, " -c synchronous_commit=off -c fsync=off -c full_page_writes=off");
+
+       /*
+        * Set max_slot_wal_keep_size to -1 to prevent WAL removal by the
+        * checkpointer process.  If the WAL required by logical replication
+        * slots were removed, the slots would become unusable, so this setting
+        * prevents slot invalidation during the upgrade.  We set this option
+        * only when the cluster is PG17 or later, because logical replication
+        * slots can only be migrated since then (and max_slot_wal_keep_size
+        * itself was added in PG13).
+        */
+       if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+               appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
+
+       /* Use -b to disable autovacuum. */
        snprintf(cmd, sizeof(cmd),
                         "\"%s/pg_ctl\" -w -l \"%s/%s\" -D \"%s\" -o \"-p %d -b%s %s%s\" start",
                         cluster->bindir,
                         log_opts.logdir,
                         SERVER_LOG_FILE, cluster->pgconfig, cluster->port,
-                        (cluster == &new_cluster) ?
-                        " -c synchronous_commit=off -c fsync=off -c full_page_writes=off" : "",
+                        pgoptions.data,
                         cluster->pgopts ? cluster->pgopts : "", socket_string);
 
+       termPQExpBuffer(&pgoptions);
+
        /*
         * Don't throw an error right away, let connecting throw the error because
         * it might supply a reason for the failure.
diff --git a/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
new file mode 100644 (file)
index 0000000..5e416f5
--- /dev/null
@@ -0,0 +1,192 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading logical replication slots
+
+use strict;
+use warnings;
+
+use File::Find qw(find);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+# Set up a pg_upgrade command. This will be reused by all the tests below.
+my @pg_upgrade_cmd = (
+       'pg_upgrade', '--no-sync',
+       '-d', $old_publisher->data_dir,
+       '-D', $new_publisher->data_dir,
+       '-b', $old_publisher->config_data('--bindir'),
+       '-B', $new_publisher->config_data('--bindir'),
+       '-s', $new_publisher->host,
+       '-p', $old_publisher->port,
+       '-P', $new_publisher->port,
+       $mode);
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the new cluster has wrong GUC values
+
+# Preparations for the subsequent test:
+# 1. Create two slots on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql(
+       'postgres', qq[
+       SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding');
+       SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding');
+]);
+$old_publisher->stop();
+
+# 2. Set 'max_replication_slots' to be less than the number of slots (2)
+#       present on the old cluster.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots
+command_checks_all(
+       [@pg_upgrade_cmd],
+       1,
+       [
+               qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+       ],
+       [qr//],
+       'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Set 'max_replication_slots' to match the number of slots (2) present on the
+# old cluster. Both slots will be used for subsequent tests.
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 2");
+
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+# Preparations for the subsequent test:
+# 1. Generate extra WAL records. At this point neither test_slot1 nor
+#       test_slot2 has consumed them.
+#
+# 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1
+#       still has unconsumed WAL records.
+#
+# 3. Emit a non-transactional message. This will cause test_slot2 to detect the
+#       unconsumed WAL record.
+$old_publisher->start;
+$old_publisher->safe_psql(
+       'postgres', qq[
+               CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+               SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn());
+               SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');
+]);
+$old_publisher->stop;
+
+# pg_upgrade will fail because there are slots still having unconsumed WAL
+# records
+command_checks_all(
+       [@pg_upgrade_cmd],
+       1,
+       [
+               qr/Your installation contains logical replication slots that can't be upgraded./
+       ],
+       [qr//],
+       'run of pg_upgrade of old cluster with slots having unconsumed WAL records'
+);
+
+# Verify the reason why the logical replication slots cannot be upgraded
+my $slots_filename;
+
+# Find a txt file that contains a list of logical replication slots that cannot
+# be upgraded. We cannot predict the file's path because the output directory
+# contains a millisecond timestamp. File::Find::find must be used.
+find(
+       sub {
+               if ($File::Find::name =~ m/invalid_logical_replication_slots\.txt/)
+               {
+                       $slots_filename = $File::Find::name;
+               }
+       },
+       $new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Check the file content. Both slots should report that they have
+# unconsumed WAL records.
+like(
+       slurp_file($slots_filename),
+       qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+       'the previous test failed due to unconsumed WALs');
+like(
+       slurp_file($slots_filename),
+       qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
+       'the previous test failed due to unconsumed WALs');
+
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication (first, cleanup slots from the previous tests)
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+
+$old_publisher->start;
+$old_publisher->safe_psql(
+       'postgres', qq[
+       SELECT * FROM pg_drop_replication_slot('test_slot1');
+       SELECT * FROM pg_drop_replication_slot('test_slot2');
+       CREATE PUBLICATION regress_pub FOR ALL TABLES;
+]);
+
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init();
+
+$subscriber->start;
+$subscriber->safe_psql(
+       'postgres', qq[
+       CREATE TABLE tbl (a int);
+       CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub DISABLE");
+$old_publisher->stop;
+
+# pg_upgrade should be successful
+command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
+
+# Check that the slot 'regress_sub' has migrated to the new cluster
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+       "SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+       'postgres', qq[
+       ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
+       ALTER SUBSCRIPTION regress_sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+       "INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('regress_sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
+
+done_testing();
index 2f46fdc7391d1bdbcf99758ee4549e1acb593b5d..f9e1c0e5351fd43181002e67139768fe1797460b 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202310181
+#define CATALOG_VERSION_NO     202310261
 
 #endif
index c92d0631a01110673bc3e129e1fac1124e0d4976..06435e8b925655bf9e786b1481915cb8f3f2aed5 100644 (file)
   proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_logical_slot_has_caught_up', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'bool',
+  proargtypes => 'name',
+  prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
 
 # conversion functions
 { oid => '4302',
index 5f49554ea05336dc6c17d4b4b0175ba2c73d2003..dffc0d15648bbe4c4e9f33725a5a0273de0be481 100644 (file)
@@ -109,6 +109,9 @@ typedef struct LogicalDecodingContext
        TransactionId write_xid;
        /* Are we processing the end LSN of a transaction? */
        bool            end_xact;
+
+       /* Do we need to process any change in fast_forward mode? */
+       bool            processing_required;
 } LogicalDecodingContext;
 
 
@@ -145,4 +148,6 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
+extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+
 #endif
index 085d0d7e548855999703e41291b5385ab32b68a5..87c1aee379ff16187b71598f3d615e7e5839f6c1 100644 (file)
@@ -1504,6 +1504,8 @@ LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue