Support invalidating replication slots due to horizon and wal_level
authorAndres Freund <[email protected]>
Sat, 8 Apr 2023 05:40:27 +0000 (22:40 -0700)
committerAndres Freund <[email protected]>
Sat, 8 Apr 2023 05:40:27 +0000 (22:40 -0700)
Needed for logical decoding on a standby. Slots need to be invalidated because
of the horizon if rows required for logical decoding are removed. If the
primary's wal_level is lowered from 'logical', logical slots on the standby
need to be invalidated.

The new invalidation methods will be used in a subsequent commit.

Logical slots that have been invalidated can be identified via the new
pg_replication_slots.conflicting column.

See 6af1793954e for an overall design of logical decoding on a standby.

Bumps catversion for the addition of the new pg_replication_slots column.

Author: "Drouvot, Bertrand" <[email protected]>
Author: Andres Freund <[email protected]>
Author: Amit Khandekar <[email protected]> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Robert Haas <[email protected]>
Reviewed-by: Fabrízio de Royes Mello <[email protected]>
Reviewed-by: Bharath Rupireddy <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Reviewed-by: Alvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/20230407075009[email protected]

doc/src/sgml/system-views.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/slot.h
src/test/regress/expected/rules.out

index bb1a4184508dd8f93ecff4ae1ea7f26609b7773c..57b228076e8ec6539daeadef7a9681253372443e 100644 (file)
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 18e16ae5b3e9b4e949231f779177852405ce6beb..23903445293453d9ca5782c2800ea1f765cbdb63 100644 (file)
@@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags)
     */
    XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
    KeepLogSeg(recptr, &_logSegNo);
-   if (InvalidateObsoleteReplicationSlots(_logSegNo))
+   if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+                                          _logSegNo, InvalidOid,
+                                          InvalidTransactionId))
    {
        /*
         * Some slots have been invalidated; recalculate the old-segment
@@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags)
    replayPtr = GetXLogReplayRecPtr(&replayTLI);
    endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
    KeepLogSeg(endptr, &_logSegNo);
-   if (InvalidateObsoleteReplicationSlots(_logSegNo))
+   if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+                                          _logSegNo, InvalidOid,
+                                          InvalidTransactionId))
    {
        /*
         * Some slots have been invalidated; recalculate the old-segment
index c123f1098965502110c9ecf46d5ae40d3835e055..ff69983f2eafe53be627ec12747a3464e1d6dcc1 100644 (file)
@@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index 6082d222d5d4d0ca51d2bceb6f43aabbd63d410e..6ecea3c49c596b85f9172ff53d1c0e42c386e3a3 100644 (file)
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                        NameStr(MyReplicationSlot->data.name)),
                 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
 
+   if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("can no longer get changes from replication slot \"%s\"",
+                       NameStr(MyReplicationSlot->data.name)),
+                errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
    Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
    Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
index f969f7c083f97c2fb8f8af17763332e8d639f4ea..4d0421c5ed181556509cc16abfa1b64e4ad1af76 100644 (file)
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Report that replication slot needs to be invalidated
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+                      bool terminating,
+                      int pid,
+                      NameData slotname,
+                      XLogRecPtr restart_lsn,
+                      XLogRecPtr oldestLSN,
+                      TransactionId snapshotConflictHorizon)
+{
+   StringInfoData err_detail;
+   bool        hint = false;
+
+   initStringInfo(&err_detail);
+
+   switch (cause)
+   {
+       case RS_INVAL_WAL_REMOVED:
+           hint = true;
+           appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+                            LSN_FORMAT_ARGS(restart_lsn),
+                            (unsigned long long) (oldestLSN - restart_lsn));
+           break;
+       case RS_INVAL_HORIZON:
+           appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
+                            snapshotConflictHorizon);
+           break;
+
+       case RS_INVAL_WAL_LEVEL:
+           appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
+           break;
+       case RS_INVAL_NONE:
+           pg_unreachable();
+   }
+
+   ereport(LOG,
+           terminating ?
+           errmsg("terminating process %d to release replication slot \"%s\"",
+                  pid, NameStr(slotname)) :
+           errmsg("invalidating obsolete replication slot \"%s\"",
+                  NameStr(slotname)),
+           errdetail_internal("%s", err_detail.data),
+           hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+   pfree(err_detail.data);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+                              ReplicationSlot *s,
+                              XLogRecPtr oldestLSN,
+                              Oid dboid, TransactionId snapshotConflictHorizon,
                               bool *invalidated)
 {
    int         last_signaled_pid = 0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
        XLogRecPtr  restart_lsn;
        NameData    slotname;
        int         active_pid = 0;
+       ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
 
        Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
        restart_lsn = s->data.restart_lsn;
 
        /*
-        * If the slot is already invalid or is fresh enough, we don't need to
-        * do anything.
+        * If the slot is already invalid or is a non conflicting slot, we
+        * don't need to do anything.
         */
-       if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+       if (s->data.invalidated == RS_INVAL_NONE)
+       {
+           switch (cause)
+           {
+               case RS_INVAL_WAL_REMOVED:
+                   if (s->data.restart_lsn != InvalidXLogRecPtr &&
+                       s->data.restart_lsn < oldestLSN)
+                       conflict = cause;
+                   break;
+               case RS_INVAL_HORIZON:
+                   if (!SlotIsLogical(s))
+                       break;
+                   /* invalid DB oid signals a shared relation */
+                   if (dboid != InvalidOid && dboid != s->data.database)
+                       break;
+                   if (TransactionIdIsValid(s->effective_xmin) &&
+                       TransactionIdPrecedesOrEquals(s->effective_xmin,
+                                                     snapshotConflictHorizon))
+                       conflict = cause;
+                   else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
+                            TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+                                                          snapshotConflictHorizon))
+                       conflict = cause;
+                   break;
+               case RS_INVAL_WAL_LEVEL:
+                   if (SlotIsLogical(s))
+                       conflict = cause;
+                   break;
+               case RS_INVAL_NONE:
+                   pg_unreachable();
+           }
+       }
+
+       /* if there's no conflict, we're done */
+       if (conflict == RS_INVAL_NONE)
        {
            SpinLockRelease(&s->mutex);
            if (released_lock)
@@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
        {
            MyReplicationSlot = s;
            s->active_pid = MyProcPid;
-           s->data.invalidated = RS_INVAL_WAL_REMOVED;
+           s->data.invalidated = conflict;
 
            /*
             * XXX: We should consider not overwriting restart_lsn and instead
             * just rely on .invalidated.
             */
-           s->data.restart_lsn = InvalidXLogRecPtr;
+           if (conflict == RS_INVAL_WAL_REMOVED)
+               s->data.restart_lsn = InvalidXLogRecPtr;
 
            /* Let caller know */
            *invalidated = true;
@@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
             */
            if (last_signaled_pid != active_pid)
            {
-               ereport(LOG,
-                       errmsg("terminating process %d to release replication slot \"%s\"",
-                              active_pid, NameStr(slotname)),
-                       errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                                 LSN_FORMAT_ARGS(restart_lsn),
-                                 (unsigned long long) (oldestLSN - restart_lsn)),
-                       errhint("You might need to increase max_slot_wal_keep_size."));
+               ReportSlotInvalidation(conflict, true, active_pid,
+                                      slotname, restart_lsn,
+                                      oldestLSN, snapshotConflictHorizon);
 
                (void) kill(active_pid, SIGTERM);
                last_signaled_pid = active_pid;
@@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
            ReplicationSlotMarkDirty();
            ReplicationSlotSave();
            ReplicationSlotRelease();
+           pgstat_drop_replslot(s);
 
-           ereport(LOG,
-                   errmsg("invalidating obsolete replication slot \"%s\"",
-                          NameStr(slotname)),
-                   errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                             LSN_FORMAT_ARGS(restart_lsn),
-                             (unsigned long long) (oldestLSN - restart_lsn)),
-                   errhint("You might need to increase max_slot_wal_keep_size."));
+           ReportSlotInvalidation(conflict, false, active_pid,
+                                  slotname, restart_lsn,
+                                  oldestLSN, snapshotConflictHorizon);
 
            /* done with this slot for now */
            break;
@@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
  *
  * Returns true when any slot have got invalidated.
  *
+ * Whether a slot needs to be invalidated depends on the cause. A slot is
+ * removed if it:
+ * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
+ * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
+ *   db; dboid may be InvalidOid for shared relations
+ * - RS_INVAL_WAL_LEVEL: is logical
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+                                  XLogSegNo oldestSegno, Oid dboid,
+                                  TransactionId snapshotConflictHorizon)
 {
    XLogRecPtr  oldestLSN;
    bool        invalidated = false;
 
+   Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
+   Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
+   Assert(cause != RS_INVAL_NONE);
+
+   if (max_replication_slots == 0)
+       return invalidated;
+
    XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
@@ -1434,7 +1531,9 @@ restart:
        if (!s->in_use)
            continue;
 
-       if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+       if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
+                                          snapshotConflictHorizon,
+                                          &invalidated))
        {
            /* if the lock was released, start from scratch */
            goto restart;
index ad3e72be5ee2634983ab39804843aa48a9e2194e..6035cf481600a230a3d66e1aacabaea17ce8afe8 100644 (file)
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    XLogRecPtr  currlsn;
    int         slotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
        values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+       if (slot_contents.data.database == InvalidOid)
+           nulls[i++] = true;
+       else
+       {
+           if (slot_contents.data.invalidated != RS_INVAL_NONE)
+               values[i++] = BoolGetDatum(true);
+           else
+               values[i++] = BoolGetDatum(false);
+       }
+
        Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
index e32d50f3865b26f24e83653999ec376adb6a8ab6..dabe23bbeb0277f04fe7a8c5e3f854ad7627bc09 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202304072
+#define CATALOG_VERSION_NO 202304073
 
 #endif
index 6291d76a4c18c98ff4013e9fb92c018fe1a693a0..0e9ce5215bbbc4df347408f727d32e9d79cd613f 100644 (file)
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
index 34ce055dd503d4a59c52065cd52d55806f48fec3..a8a89dc7844eeff674e7c84e3687141b59129f45 100644 (file)
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
    RS_INVAL_NONE,
    /* required WAL has been removed */
    RS_INVAL_WAL_REMOVED,
+   /* required rows have been removed */
+   RS_INVAL_HORIZON,
+   /* wal_level insufficient for slot */
+   RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+                                              XLogSegNo oldestSegno,
+                                              Oid dboid,
+                                              TransactionId snapshotConflictHorizon);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
index 8337bac5dbe0433ce09de17dd2e48f6e8c560576..3d2405272a9436e513deb60b49274a008ef316a5 100644 (file)
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,