pg_recvlogical: Add --failover option.
authorMasahiko Sawada <[email protected]>
Fri, 4 Apr 2025 17:39:57 +0000 (10:39 -0700)
committerMasahiko Sawada <[email protected]>
Fri, 4 Apr 2025 17:39:57 +0000 (10:39 -0700)
This new option instructs pg_recvlogical to create the logical
replication slot with the failover option enabled. It can be used in
conjunction with the --create-slot option.

Author: Hayato Kuroda <[email protected]>
Reviewed-by: Michael Banck <[email protected]>
Reviewed-by: Masahiko Sawada <[email protected]>
Discussion: https://postgr.es/m/OSCPR01MB14966C54097FC83AF19F3516BF5AC2@OSCPR01MB14966.jpnprd01.prod.outlook.com

doc/src/sgml/ref/pg_recvlogical.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/pg_recvlogical.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h
src/bin/pg_basebackup/t/030_pg_recvlogical.pl

index 2946bdae1e5e5dacb39984c82bd5a38e3f9aabb6..5166393abeb2d95d9c980277f9560b833e64d62c 100644 (file)
@@ -79,8 +79,8 @@ PostgreSQL documentation
        </para>
 
        <para>
-        The <option>--two-phase</option> can be specified with
-        <option>--create-slot</option> to enable decoding of prepared transactions.
+        The <option>--two-phase</option> and <option>--falover</option> options
+        can be specified with <option>--create-slot</option>.
        </para>
       </listitem>
      </varlistentry>
@@ -165,6 +165,16 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--failover</option></term>
+      <listitem>
+       <para>
+        Enables the slot to be synchronized to the standbys. This option may
+        only be specified with <option>--create-slot</option>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-f <replaceable>filename</replaceable></option></term>
       <term><option>--file=<replaceable>filename</replaceable></option></term>
index 1da4bfc2351e99ad1e94af1c618625909f34dfc7..eb7354200bcee772bf40a98a10515db4ba29ad6a 100644 (file)
@@ -667,7 +667,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier,
    if (temp_replication_slot || create_slot)
    {
        if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
-                                  temp_replication_slot, true, true, false, false))
+                                  temp_replication_slot, true, true, false,
+                                  false, false))
            exit(1);
 
        if (verbose)
index de3584018b097152d11173daebd11b7b46a44dc4..e816cf58101fbf36e27f9282840543c3c793535a 100644 (file)
@@ -889,7 +889,7 @@ main(int argc, char **argv)
            pg_log_info("creating replication slot \"%s\"", replication_slot);
 
        if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
-                                  slot_exists_ok, false))
+                                  slot_exists_ok, false, false))
            exit(1);
        exit(0);
    }
index e6158251ec181810746edbaa460e0da72428fe7f..e6810efe5f0d7acea71b0e5fbd1764a4ebefb5cc 100644 (file)
@@ -42,6 +42,7 @@ typedef enum
 static char *outfile = NULL;
 static int verbose = 0;
 static bool two_phase = false;
+static bool failover = false;
 static int noloop = 0;
 static int standby_message_timeout = 10 * 1000;    /* 10 sec = default */
 static int fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -89,6 +90,8 @@ usage(void)
    printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
    printf(_("\nOptions:\n"));
    printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
+   printf(_("      --failover         enable replication slot synchronization to standby servers when\n"
+            "                         creating a slot\n"));
    printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
    printf(_("  -F  --fsync-interval=SECS\n"
             "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
@@ -695,6 +698,7 @@ main(int argc, char **argv)
        {"file", required_argument, NULL, 'f'},
        {"fsync-interval", required_argument, NULL, 'F'},
        {"no-loop", no_argument, NULL, 'n'},
+       {"failover", no_argument, NULL, 5},
        {"verbose", no_argument, NULL, 'v'},
        {"two-phase", no_argument, NULL, 't'},
        {"version", no_argument, NULL, 'V'},
@@ -770,6 +774,9 @@ main(int argc, char **argv)
            case 'v':
                verbose++;
                break;
+           case 5:
+               failover = true;
+               break;
 /* connection options */
            case 'd':
                dbname = pg_strdup(optarg);
@@ -917,11 +924,21 @@ main(int argc, char **argv)
        exit(1);
    }
 
-   if (two_phase && !do_create_slot)
+   if (!do_create_slot)
    {
-       pg_log_error("--two-phase may only be specified with --create-slot");
-       pg_log_error_hint("Try \"%s --help\" for more information.", progname);
-       exit(1);
+       if (two_phase)
+       {
+           pg_log_error("--two-phase may only be specified with --create-slot");
+           pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+           exit(1);
+       }
+
+       if (failover)
+       {
+           pg_log_error("--failover may only be specified with --create-slot");
+           pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+           exit(1);
+       }
    }
 
    /*
@@ -984,7 +1001,8 @@ main(int argc, char **argv)
            pg_log_info("creating replication slot \"%s\"", replication_slot);
 
        if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
-                                  false, false, slot_exists_ok, two_phase))
+                                  false, false, slot_exists_ok, two_phase,
+                                  failover))
            exit(1);
        startpos = InvalidXLogRecPtr;
    }
index 8e605f43ffef0e23ec665d4753a2f9a37961642b..c7b8a4c3a4b6a9f6a46ca5d63e6d7fba21ab2da3 100644 (file)
@@ -583,7 +583,7 @@ GetSlotInformation(PGconn *conn, const char *slot_name,
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
                      bool is_temporary, bool is_physical, bool reserve_wal,
-                     bool slot_exists_ok, bool two_phase)
+                     bool slot_exists_ok, bool two_phase, bool failover)
 {
    PQExpBuffer query;
    PGresult   *res;
@@ -594,6 +594,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
    Assert((is_physical && plugin == NULL) ||
           (!is_physical && plugin != NULL));
    Assert(!(two_phase && is_physical));
+   Assert(!(failover && is_physical));
    Assert(slot_name != NULL);
 
    /* Build base portion of query */
@@ -616,6 +617,10 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
    }
    else
    {
+       if (failover && PQserverVersion(conn) >= 170000)
+           AppendPlainCommandOption(query, use_new_option_syntax,
+                                    "FAILOVER");
+
        if (two_phase && PQserverVersion(conn) >= 150000)
            AppendPlainCommandOption(query, use_new_option_syntax,
                                     "TWO_PHASE");
index f917c43517fefa20f226aa5a291209e2b07ce8c7..017b227303c837cd35503b1fbcc4f50cb71aa787 100644 (file)
@@ -35,7 +35,8 @@ extern PGconn *GetConnection(void);
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
                                  const char *plugin, bool is_temporary,
                                  bool is_physical, bool reserve_wal,
-                                 bool slot_exists_ok, bool two_phase);
+                                 bool slot_exists_ok, bool two_phase,
+                                 bool failover);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
                              TimeLineID *starttli,
index 62bbc5a3f980d3beb86e4cac0064d4bfe5ce9780..c82e78847b3821d9fe13bd15a7094805d6a1348e 100644 (file)
@@ -135,4 +135,19 @@ $node->command_ok(
    ],
    'drop could work without dbname');
 
+# test with failover option enabled
+$node->command_ok(
+   [
+       'pg_recvlogical',
+       '--slot' => 'test',
+       '--dbname' => $node->connstr('postgres'),
+       '--create-slot',
+       '--failover',
+   ],
+   'slot with failover created');
+
+my $result = $node->safe_psql('postgres',
+   "SELECT failover FROM pg_catalog.pg_replication_slots WHERE slot_name = 'test'");
+is($result, 't', "failover is enabled for the new slot");
+
 done_testing();