Track conflict_reason in pg_replication_slots.
authorAmit Kapila <[email protected]>
Thu, 4 Jan 2024 02:51:51 +0000 (08:21 +0530)
committerAmit Kapila <[email protected]>
Thu, 4 Jan 2024 02:56:25 +0000 (08:26 +0530)
This patch changes the existing 'conflicting' field to 'conflict_reason'
in pg_replication_slots. This new field indicates the reason for the
logical slot's conflict with recovery. It is always NULL for physical
slots, as well as for logical slots which are not invalidated. The
non-NULL values indicate that the slot is marked as invalidated. Possible
values are:

wal_removed = required WAL has been removed.
rows_removed = required rows have been removed.
wal_level_insufficient = the primary doesn't have a wal_level sufficient
to perform logical decoding.

The existing users of 'conflicting' column can get the same answer by
using 'conflict_reason' IS NOT NULL.

Author: Shveta Malik
Reviewed-by: Amit Kapila, Bertrand Drouvot, Michael Paquier
Discussion: https://postgr.es/m/[email protected]

doc/src/sgml/system-views.sgml
src/backend/catalog/system_views.sql
src/backend/replication/slotfuncs.c
src/bin/pg_upgrade/info.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/test/recovery/t/035_standby_logical_decoding.pl
src/test/regress/expected/rules.out

index 0ef1745631833dacbb267f87e2c77cfacddc00d4..72d01fc624caa5f30acae6400918ab091ce6f785 100644 (file)
@@ -2525,11 +2525,34 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>conflicting</structfield> <type>bool</type>
+       <structfield>conflict_reason</structfield> <type>text</type>
       </para>
       <para>
-       True if this logical slot conflicted with recovery (and so is now
-       invalidated). Always NULL for physical slots.
+       The reason for the logical slot's conflict with recovery. It is always
+       NULL for physical slots, as well as for logical slots which are not
+       invalidated. The non-NULL values indicate that the slot is marked
+       as invalidated. Possible values are:
+       <itemizedlist spacing="compact">
+        <listitem>
+         <para>
+          <literal>wal_removed</literal> means that the required WAL has been
+          removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>rows_removed</literal> means that the required rows have
+          been removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>wal_level_insufficient</literal> means that the
+          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
+          perform logical decoding.
+         </para>
+        </listitem>
+       </itemizedlist>
       </para></entry>
      </row>
     </tbody>
index 4899b5e0f0077dcf26018f649f2179a1977a2ddd..e43e36f5ac68dab06be0f016c3f59d51a797c95c 100644 (file)
@@ -1023,7 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.conflicting
+            L.conflict_reason
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index 47e104782c90a92483abb5c44cdf0e8e05e41ee1..cad35dce7fc68e84ce0d46293784cac79cb303ca 100644 (file)
@@ -406,10 +406,24 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                        nulls[i++] = true;
                else
                {
-                       if (slot_contents.data.invalidated != RS_INVAL_NONE)
-                               values[i++] = BoolGetDatum(true);
-                       else
-                               values[i++] = BoolGetDatum(false);
+                       switch (slot_contents.data.invalidated)
+                       {
+                               case RS_INVAL_NONE:
+                                       nulls[i++] = true;
+                                       break;
+
+                               case RS_INVAL_WAL_REMOVED:
+                                       values[i++] = CStringGetTextDatum("wal_removed");
+                                       break;
+
+                               case RS_INVAL_HORIZON:
+                                       values[i++] = CStringGetTextDatum("rows_removed");
+                                       break;
+
+                               case RS_INVAL_WAL_LEVEL:
+                                       values[i++] = CStringGetTextDatum("wal_level_insufficient");
+                                       break;
+                       }
                }
 
                Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
index c52501c8fb63919d993866428039c6ca3d1577f6..190dd53a427a7bafbd24cd2fff4697c13548f8e8 100644 (file)
@@ -667,13 +667,13 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
         * removed.
         */
        res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
-                                                       "%s as caught_up, conflicting as invalid "
+                                                       "%s as caught_up, conflict_reason IS NOT NULL as invalid "
                                                        "FROM pg_catalog.pg_replication_slots "
                                                        "WHERE slot_type = 'logical' AND "
                                                        "database = current_database() AND "
                                                        "temporary IS FALSE;",
                                                        live_check ? "FALSE" :
-                                                       "(CASE WHEN conflicting THEN FALSE "
+                                                       "(CASE WHEN conflict_reason IS NOT NULL THEN FALSE "
                                                        "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
                                                        "END)");
 
index 6bbe0f1af1e1499e6edc7f5b6a3ff535f3f8ad53..686667a0f88b808066cb57f538ac72d34f55bfcd 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202401021
+#define CATALOG_VERSION_NO     202401041
 
 #endif
index 14b1dfacab33dc6e7ea4c7aac80d7c311db82779..7979392776d467dd8ca6514e215dd29a3c851aec 100644 (file)
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}',
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
index 3231538418b5d149fbb8a8e9dd5797a9c76d3220..8bc39a5f0337d4f01b90834090e235d76ad1d4f4 100644 (file)
@@ -168,27 +168,25 @@ sub change_hot_standby_feedback_and_wait_for_xmins
        }
 }
 
-# Check conflicting status in pg_replication_slots.
-sub check_slots_conflicting_status
+# Check conflict_reason in pg_replication_slots.
+sub check_slots_conflict_reason
 {
-       my ($conflicting) = @_;
+       my ($slot_prefix, $reason) = @_;
 
-       if ($conflicting)
-       {
-               $res = $node_standby->safe_psql(
-                       'postgres', qq(
-                                select bool_and(conflicting) from pg_replication_slots;));
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
 
-               is($res, 't', "Logical slots are reported as conflicting");
-       }
-       else
-       {
-               $res = $node_standby->safe_psql(
-                       'postgres', qq(
-                               select bool_or(conflicting) from pg_replication_slots;));
+       $res = $node_standby->safe_psql(
+               'postgres', qq(
+                        select conflict_reason from pg_replication_slots where slot_name = '$active_slot';));
 
-               is($res, 'f', "Logical slots are reported as non conflicting");
-       }
+       is($res, "$reason", "$active_slot conflict_reason is $reason");
+
+       $res = $node_standby->safe_psql(
+               'postgres', qq(
+                        select conflict_reason from pg_replication_slots where slot_name = '$inactive_slot';));
+
+       is($res, "$reason", "$inactive_slot conflict_reason is $reason");
 }
 
 # Drop the slots, re-create them, change hot_standby_feedback,
@@ -260,13 +258,13 @@ $node_primary->safe_psql('testdb',
        qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
 );
 
-# Check conflicting is NULL for physical slot
+# Check conflict_reason is NULL for physical slot
 $res = $node_primary->safe_psql(
        'postgres', qq[
-                SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
+                SELECT conflict_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
 );
 
-is($res, 't', "Physical slot reports conflicting as NULL");
+is($res, 't', "Physical slot reports conflict_reason as NULL");
 
 my $backup_name = 'b1';
 $node_primary->backup($backup_name);
@@ -483,8 +481,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 $handle =
   make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);
@@ -502,8 +500,8 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 ##################################################
 $node_standby->restart;
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is retained across a restart.
+check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 ##################################################
 # Verify that invalidated logical slots do not lead to retaining WAL.
@@ -511,7 +509,7 @@ check_slots_conflicting_status(1);
 
 # Get the restart_lsn from an invalidated slot
 my $restart_lsn = $node_standby->safe_psql('postgres',
-       "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflicting is true;"
+       "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflict_reason is not null;"
 );
 
 chomp($restart_lsn);
@@ -565,8 +563,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+check_slots_conflict_reason('row_removal_', 'rows_removed');
 
 $handle =
   make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);
@@ -604,8 +602,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_for_invalidation('shared_row_removal_', $logstart,
        'with vacuum on pg_authid');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+check_slots_conflict_reason('shared_row_removal_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
        \$stderr);
@@ -657,7 +655,13 @@ ok( $node_standby->poll_query_until(
 ) or die "Timed out waiting confl_active_logicalslot to be updated";
 
 # Verify slots are reported as non conflicting in pg_replication_slots
-check_slots_conflicting_status(0);
+is( $node_standby->safe_psql(
+               'postgres',
+               q[select bool_or(conflicting) from
+                 (select conflict_reason is not NULL as conflicting
+                  from pg_replication_slots WHERE slot_type = 'logical')]),
+       'f',
+       'Logical slots are reported as non conflicting');
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 0);
@@ -693,8 +697,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+check_slots_conflict_reason('pruning_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
@@ -737,8 +741,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_reason is 'wal_level_insufficient' in pg_replication_slots
+check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');
 
 $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
index f645e8486bf38dae7a07af0a5bcd636d8b7797d3..d878a971df92bd5c2b3bf688e3cdfc66cdabfe34 100644 (file)
@@ -1473,8 +1473,8 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.conflicting
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
+    l.conflict_reason
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,