Allow users to limit storage reserved by replication slots
author     Alvaro Herrera <[email protected]>   Tue, 7 Apr 2020 22:35:00 +0000 (18:35 -0400)
committer  Alvaro Herrera <[email protected]>   Tue, 7 Apr 2020 22:35:00 +0000 (18:35 -0400)
Replication slots are useful to retain data that may be needed by a
replication system.  But experience has shown that allowing them to
retain excessive data can cause the primary to fail by running out of
disk space.  This new feature allows the user to configure a maximum
amount of space to be reserved using the new option
max_slot_wal_keep_size.  Slots that overrun that space are invalidated
at checkpoint time, enabling the storage to be released.

Author: Kyotaro HORIGUCHI <[email protected]>
Reviewed-by: Masahiko Sawada <[email protected]>
Reviewed-by: Jehan-Guillaume de Rorthais <[email protected]>
Reviewed-by: Álvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/20170228.122736.123383594[email protected]
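
As a hedged usage sketch (not part of the patch itself): the feature is driven
by the single new GUC plus the two pg_replication_slots columns added below.
The '10GB' value is illustrative only.

    -- Cap the WAL that replication slots may retain (size is illustrative).
    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();   -- the parameter is sighup-level, so a reload suffices

    -- Watch each slot's standing against the limit via the new columns.
    SELECT slot_name, active, restart_lsn, wal_status, min_safe_lsn
      FROM pg_replication_slots;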

17 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/config.sgml
doc/src/sgml/high-availability.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/xlog.h
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/slot.h
src/test/recovery/t/019_replslot_limit.pl [new file with mode: 0644]
src/test/regress/expected/rules.out

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c80a5643963a89c8c3f6a3431000d8d..0d61d98b1153ea9772634e8b459706f072996896 100644
@@ -9907,6 +9907,44 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL files claimed by this slot.
+      Possible values are:
+       <simplelist>
+        <member>
+         <literal>normal</literal> means that the claimed files
+         are within <varname>max_wal_size</varname>
+        </member>
+        <member>
+         <literal>reserved</literal> means that <varname>max_wal_size</varname>
+         is exceeded but the files are still held, either by some replication
+         slot or by <varname>wal_keep_segments</varname>
+        </member>
+        <member>
+         <literal>lost</literal> means that some WAL files are definitely lost
+         and this slot can no longer be used to resume replication.
+        </member>
+       </simplelist>
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>min_safe_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>
+       The minimum LSN currently available for walsenders.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
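
A hedged reading aid for the columns documented above (plain SQL against the
updated view; nothing here is added by the patch beyond the two new columns):

    -- Translate wal_status for each slot.
    SELECT slot_name,
           wal_status,
           CASE wal_status
                WHEN 'normal'   THEN 'claimed WAL is within max_wal_size'
                WHEN 'reserved' THEN 'beyond max_wal_size but still retained'
                WHEN 'lost'     THEN 'required WAL removed; replication cannot resume'
           END AS meaning
      FROM pg_replication_slots
     WHERE restart_lsn IS NOT NULL;  -- wal_status is NULL when restart_lsn is NULL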
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f68c9922136722d9de9e96bd056baad6cb110b98..095b3668b85ce9316dca1370766366accfd5d472 100644
@@ -3777,6 +3777,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots may retain an unlimited amount of WAL files.  If the
+        restart_lsn of a replication slot falls behind the current LSN by more
+        than this size, the standby using the slot may no longer be able to
+        continue replication because the required WAL files have been removed.
+        You can see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
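
A minimal sketch of working with the parameter from SQL (assuming only what the
documentation above states; the reload is needed because the setting is
sighup-level):

    -- Show the current value; -1 (the default) means unlimited retention.
    SHOW max_slot_wal_keep_size;

    -- Remove any override and go back to the default.
    ALTER SYSTEM RESET max_slot_wal_keep_size;
    SELECT pg_reload_conf();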
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index bb5d9962ed337d274b78a1a2d09ec37751654b2c..4659b9ef5dbc1f7b244fc512070bc60566703b67 100644
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
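
To check whether slot retention is in fact what is consuming pg_wal, a hedged
monitoring query (pg_ls_waldir() is restricted to superusers and pg_monitor
members by default):

    -- Total on-disk WAL next to the configured slot retention limit.
    SELECT pg_size_pretty(sum(size)) AS pg_wal_size,
           current_setting('max_slot_wal_keep_size') AS slot_limit
      FROM pg_ls_waldir();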
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1651e15e8988cd2cbc2fc793cd5dc13b706fe1e2..8a4c1743e55ad0f4030f3355c44ca7557931f812 100644
@@ -108,6 +108,7 @@ int                 wal_level = WAL_LEVEL_MINIMAL;
 int                    CommitDelay = 0;        /* precommit delay in microseconds */
 int                    CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int                    wal_retrieve_retry_interval = 5000;
+int                    max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -759,7 +760,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)     \
        (x / ((segsize) / (1024 * 1024)))
 
@@ -3963,9 +3964,10 @@ XLogGetLastRemovedSegno(void)
        return lastRemovedSegNo;
 }
 
+
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9043,6 +9045,7 @@ CreateCheckPoint(int flags)
         */
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
        KeepLogSeg(recptr, &_logSegNo);
+       InvalidateObsoleteReplicationSlots(_logSegNo);
        _logSegNo--;
        RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
@@ -9377,6 +9380,7 @@ CreateRestartPoint(int flags)
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
        endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
        KeepLogSeg(endptr, &_logSegNo);
+       InvalidateObsoleteReplicationSlots(_logSegNo);
        _logSegNo--;
 
        /*
@@ -9445,48 +9449,143 @@ CreateRestartPoint(int flags)
        return true;
 }
 
+/*
+ * Report availability of WAL for the given target LSN
+ *             (typically a slot's restart_lsn)
+ *
+ * Returns one of the following enum values:
+ * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
+ *   of max_wal_size.
+ *
+ * * WALAVAIL_RESERVED means it is still available by preserving extra
+ *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
+ *   than max_wal_size, this state is not returned.
+ *
+ * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
+ *   a slot with this LSN cannot continue.
+ *
+ * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WALAvailability
+GetWALAvailability(XLogRecPtr targetLSN)
+{
+       XLogRecPtr      currpos;                /* current write LSN */
+       XLogSegNo       currSeg;                /* segid of currpos */
+       XLogSegNo       targetSeg;              /* segid of targetLSN */
+       XLogSegNo       oldestSeg;              /* actual oldest segid */
+       XLogSegNo       oldestSegMaxWalSize;    /* oldest segid kept by max_wal_size */
+       XLogSegNo       oldestSlotSeg = InvalidXLogRecPtr;      /* oldest segid kept by
+                                                                                                        * slot */
+       uint64          keepSegs;
+
+       /* slot does not reserve WAL. Either deactivated, or has never been active */
+       if (XLogRecPtrIsInvalid(targetLSN))
+               return WALAVAIL_INVALID_LSN;
+
+       currpos = GetXLogWriteRecPtr();
+
+       /* calculate oldest segment currently needed by slots */
+       XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+       KeepLogSeg(currpos, &oldestSlotSeg);
+
+       /*
+        * Find the oldest extant segment file. We get 1 until a checkpoint
+        * removes the first WAL segment file since startup, which can make the
+        * reported status wrong under certain abnormal conditions, but that
+        * does no actual harm.
+        */
+       oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+       /* calculate oldest segment by max_wal_size and wal_keep_segments */
+       XLByteToSeg(currpos, currSeg, wal_segment_size);
+       keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
+                                                         wal_segment_size) + 1;
+
+       if (currSeg > keepSegs)
+               oldestSegMaxWalSize = currSeg - keepSegs;
+       else
+               oldestSegMaxWalSize = 1;
+
+       /*
+        * If max_slot_wal_keep_size has changed since the last call, a segment
+        * that would have been kept under the current setting might already
+        * have been removed under the previous one.  There is no point in
+        * reporting the normal or reserved status if targetSeg is known to be
+        * lost.
+        */
+       if (targetSeg >= oldestSeg)
+       {
+               /*
+                * show "normal" when targetSeg is within max_wal_size, even if
+                * max_slot_wal_keep_size is smaller than max_wal_size.
+                */
+               if ((max_slot_wal_keep_size_mb <= 0 ||
+                        max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+                       oldestSegMaxWalSize <= targetSeg)
+                       return WALAVAIL_NORMAL;
+
+               /* being retained by slots */
+               if (oldestSlotSeg <= targetSeg)
+                       return WALAVAIL_RESERVED;
+       }
+
+       /* Definitely lost */
+       return WALAVAIL_REMOVED;
+}
+
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
  * This is calculated by subtracting wal_keep_segments from the given xlog
  * location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots.  For the latter criterion we do consider
+ * the effects of max_slot_wal_keep_size: reserve at most that much space back
+ * from recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
+       XLogSegNo       currSegNo;
        XLogSegNo       segno;
        XLogRecPtr      keep;
 
-       XLByteToSeg(recptr, segno, wal_segment_size);
-       keep = XLogGetReplicationSlotMinimumLSN();
+       XLByteToSeg(recptr, currSegNo, wal_segment_size);
+       segno = currSegNo;
 
-       /* compute limit for wal_keep_segments first */
-       if (wal_keep_segments > 0)
+       /*
+        * Calculate how many segments are kept by slots first, adjusting for
+        * max_slot_wal_keep_size.
+        */
+       keep = XLogGetReplicationSlotMinimumLSN();
+       if (keep != InvalidXLogRecPtr)
        {
-               /* avoid underflow, don't go below 1 */
-               if (segno <= wal_keep_segments)
-                       segno = 1;
-               else
-                       segno = segno - wal_keep_segments;
-       }
+               XLByteToSeg(keep, segno, wal_segment_size);
 
-       /* then check whether slots limit removal further */
-       if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-       {
-               XLogSegNo       slotSegNo;
+               /* Cap by max_slot_wal_keep_size ... */
+               if (max_slot_wal_keep_size_mb >= 0)
+               {
+                       XLogSegNo       slot_keep_segs;
 
-               XLByteToSeg(keep, slotSegNo, wal_segment_size);
+                       slot_keep_segs =
+                               ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
-               if (slotSegNo <= 0)
+                       if (currSegNo - segno > slot_keep_segs)
+                               segno = currSegNo - slot_keep_segs;
+               }
+       }
+
+       /* but, keep at least wal_keep_segments if that's set */
+       if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
+       {
+               /* avoid underflow, don't go below 1 */
+               if (currSegNo <= wal_keep_segments)
                        segno = 1;
-               else if (slotSegNo < segno)
-                       segno = slotSegNo;
+               else
+                       segno = currSegNo - wal_keep_segments;
        }
 
        /* don't delete WAL segments newer than the calculated segment */
-       if (segno < *logSegNo)
+       if (XLogRecPtrIsInvalid(*logSegNo) || segno < *logSegNo)
                *logSegNo = segno;
 }
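
KeepLogSeg above now caps slot-driven retention at max_slot_wal_keep_size
behind the latest write position (while still honoring wal_keep_segments).  A
rough monitoring query in the same spirit (an approximation only, since it
ignores segment-boundary rounding) might be:

    -- How far each slot lags the current write position, next to the cap.
    SELECT slot_name,
           wal_status,
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS behind,
           current_setting('max_slot_wal_keep_size') AS keep_limit
      FROM pg_replication_slots
     WHERE restart_lsn IS NOT NULL;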
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3b1467d030058a9ffd6e3d9603fa1da..d406ea8118c0f0a37408ea32de18d12fc76471fc 100644
@@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.min_safe_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 04510094a80c80f1273ce9f5d74c220ba380dae9..f5384f1df8c9f639a6be51c8bfb3434b6e95a9d4 100644
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        else
                end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-       ReplicationSlotAcquire(NameStr(*name), true);
+       (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
 
        PG_TRY();
        {
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 47851ec4c1a9945db2f8366d634ccc0fbd16d81e..abae74c9a59bf2041bf8c722f52524a77fbd29ae 100644
@@ -325,9 +325,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * The return value is only useful if behavior is SAB_Inquire, in which case
+ * it's zero if we successfully acquired the slot, or the PID of the
+ * owning process otherwise.  If behavior is SAB_Error, then trying to
+ * acquire an owned slot is an error.  If SAB_Block, we sleep until the
+ * slot is released by the owning process.
  */
-void
-ReplicationSlotAcquire(const char *name, bool nowait)
+int
+ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
        ReplicationSlot *slot;
        int                     active_pid;
@@ -392,11 +398,13 @@ retry:
         */
        if (active_pid != MyProcPid)
        {
-               if (nowait)
+               if (behavior == SAB_Error)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_IN_USE),
                                         errmsg("replication slot \"%s\" is active for PID %d",
                                                        name, active_pid)));
+               else if (behavior == SAB_Inquire)
+                       return active_pid;
 
                /* Wait here until we get signaled, and then restart */
                ConditionVariableSleep(&slot->active_cv,
@@ -412,6 +420,9 @@ retry:
 
        /* We made this slot active, so it's ours now. */
        MyReplicationSlot = slot;
+
+       /* success */
+       return 0;
 }
 
 /*
@@ -518,7 +529,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
        Assert(MyReplicationSlot == NULL);
 
-       ReplicationSlotAcquire(name, nowait);
+       (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
 
        ReplicationSlotDropAcquired();
 }
@@ -743,6 +754,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
 /*
  * Compute the oldest restart LSN across all slots and inform xlog module.
+ *
+ * Note: while max_slot_wal_keep_size is theoretically relevant for this
+ * purpose, we don't try to account for that, because this module doesn't
+ * know what to compare against.
  */
 void
 ReplicationSlotsComputeRequiredLSN(void)
@@ -818,6 +833,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
                restart_lsn = s->data.restart_lsn;
                SpinLockRelease(&s->mutex);
 
+               if (restart_lsn == InvalidXLogRecPtr)
+                       continue;
+
                if (result == InvalidXLogRecPtr ||
                        restart_lsn < result)
                        result = restart_lsn;
@@ -1064,6 +1082,80 @@ ReplicationSlotReserveWal(void)
        }
 }
 
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+       XLogRecPtr      oldestLSN;
+
+       XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+restart:
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               XLogRecPtr      restart_lsn = InvalidXLogRecPtr;
+               char       *slotname;
+
+               if (!s->in_use)
+                       continue;
+
+               SpinLockAcquire(&s->mutex);
+               if (s->data.restart_lsn == InvalidXLogRecPtr ||
+                       s->data.restart_lsn >= oldestLSN)
+               {
+                       SpinLockRelease(&s->mutex);
+                       continue;
+               }
+
+               slotname = pstrdup(NameStr(s->data.name));
+               restart_lsn = s->data.restart_lsn;
+
+               SpinLockRelease(&s->mutex);
+               LWLockRelease(ReplicationSlotControlLock);
+
+               for (;;)
+               {
+                       int                     wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
+
+                       /* no walsender? success! */
+                       if (wspid == 0)
+                               break;
+
+                       ereport(LOG,
+                                       (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
+                                                       wspid, slotname)));
+                       (void) kill(wspid, SIGTERM);
+
+                       ConditionVariableTimedSleep(&s->active_cv, 10,
+                                                                               WAIT_EVENT_REPLICATION_SLOT_DROP);
+               }
+               ConditionVariableCancelSleep();
+
+               ereport(LOG,
+                               (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+                                               slotname,
+                                               (uint32) (restart_lsn >> 32),
+                                               (uint32) restart_lsn)));
+
+               SpinLockAcquire(&s->mutex);
+               s->data.restart_lsn = InvalidXLogRecPtr;
+               SpinLockRelease(&s->mutex);
+               ReplicationSlotRelease();
+
+               /* if we did anything, start from scratch */
+               CHECK_FOR_INTERRUPTS();
+               goto restart;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Flush all replication slots to disk.
  *
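
When a checkpoint invalidates a slot via InvalidateObsoleteReplicationSlots
above, the slot's restart_lsn is cleared and the "invalidating slot ... exceeds
max_slot_wal_keep_size" message appears in the server log.  A hedged clean-up
sketch ('rep1' is only an illustrative slot name):

    -- Find slots whose restart_lsn has been cleared (note that a physical slot
    -- that never reserved WAL also shows a NULL restart_lsn).
    SELECT slot_name, active, wal_status
      FROM pg_replication_slots
     WHERE restart_lsn IS NULL;

    -- An invalidated slot cannot resume replication; dropping it is the usual remedy.
    SELECT pg_drop_replication_slot('rep1');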
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bca340313829b307918020ee0ad45cdd..f776de3df7ac87867b3a7ddc8445d60f21e05e8b 100644
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        TupleDesc       tupdesc;
        Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                Oid                     database;
                NameData        slot_name;
                NameData        plugin;
+               WALAvailability walstate;
+               XLogSegNo       last_removed_seg;
                int                     i;
 
                if (!slot->in_use)
@@ -355,6 +357,40 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                else
                        nulls[i++] = true;
 
+               walstate = GetWALAvailability(restart_lsn);
+
+               switch (walstate)
+               {
+                       case WALAVAIL_INVALID_LSN:
+                               nulls[i++] = true;
+                               break;
+
+                       case WALAVAIL_NORMAL:
+                               values[i++] = CStringGetTextDatum("normal");
+                               break;
+
+                       case WALAVAIL_RESERVED:
+                               values[i++] = CStringGetTextDatum("reserved");
+                               break;
+
+                       case WALAVAIL_REMOVED:
+                               values[i++] = CStringGetTextDatum("lost");
+                               break;
+               }
+
+               if (max_slot_wal_keep_size_mb >= 0 &&
+                       (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+                       ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+               {
+                       XLogRecPtr      min_safe_lsn;
+
+                       XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
+                                                                       wal_segment_size, min_safe_lsn);
+                       values[i++] = Int64GetDatum(min_safe_lsn);
+               }
+               else
+                       nulls[i++] = true;
+
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
        LWLockRelease(ReplicationSlotControlLock);
@@ -377,6 +413,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
        XLogRecPtr      startlsn = MyReplicationSlot->data.restart_lsn;
        XLogRecPtr      retlsn = startlsn;
 
+       Assert(moveto != InvalidXLogRecPtr);
+
        if (startlsn < moveto)
        {
                SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -414,6 +452,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
        ResourceOwner old_resowner = CurrentResourceOwner;
        XLogRecPtr      retlsn;
 
+       Assert(moveto != InvalidXLogRecPtr);
+
        PG_TRY();
        {
                /*
@@ -552,7 +592,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
                moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
        /* Acquire the slot so we "own" it */
-       ReplicationSlotAcquire(NameStr(*slotname), true);
+       (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
 
        /* A slot whose restart_lsn has never been reserved cannot be advanced */
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
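
Since min_safe_lsn, as computed above, is the start of the oldest WAL segment
still on disk, the gap between a slot's restart_lsn and min_safe_lsn is the
headroom left before the slot risks invalidation; this is the same quantity the
new regression test watches.  A sketch:

    -- Remaining headroom per slot; values near zero mean the slot is close to
    -- losing required WAL.
    SELECT slot_name,
           wal_status,
           pg_size_pretty(pg_wal_lsn_diff(restart_lsn, min_safe_lsn)) AS headroom
      FROM pg_replication_slots
     WHERE restart_lsn IS NOT NULL AND min_safe_lsn IS NOT NULL;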
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9e5611574cc11f971e4e31f6de78b3db33be506d..06e8b7903601041fabee3291556a5c063ce9c182 100644
@@ -595,7 +595,7 @@ StartReplication(StartReplicationCmd *cmd)
 
        if (cmd->slotname)
        {
-               ReplicationSlotAcquire(cmd->slotname, true);
+               (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
                if (SlotIsLogical(MyReplicationSlot))
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1132,7 +1132,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        Assert(!MyReplicationSlot);
 
-       ReplicationSlotAcquire(cmd->slotname, true);
+       (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
 
        /*
         * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 03a22d71ac22e71ad962d9aba0ab2f219848645f..5bdc02fce2ff29741b3e17638ceb54d0426683d3 100644
@@ -2784,6 +2784,19 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+                       gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+                       gettext_noop("Replication slots will be marked as failed, and segments released "
+                                                "for deletion or recycling, if this much space is occupied by WAL "
+                                                "on disk."),
+                       GUC_UNIT_MB
+               },
+               &max_slot_wal_keep_size_mb,
+               -1, -1, MAX_KILOBYTES,
+               NULL, NULL, NULL
+       },
+
        {
                {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
                        gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1ae8b77306c1def768c461c2d089f1c31da512a4..995b6ca1554190bd02646480285af41d1d29b393 100644
 #max_wal_senders = 10          # max number of walsender processes
                                # (change requires restart)
 #wal_keep_segments = 0         # in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1   # in megabytes; -1 disables
 #wal_sender_timeout = 60s      # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7412caa5f228bb54eb642069d0775c532f52338a..f60ed2d36cb33a48b72a265187b8088aa2e7b639 100644
@@ -108,6 +108,7 @@ extern int  wal_segment_size;
 extern int     min_wal_size_mb;
 extern int     max_wal_size_mb;
 extern int     wal_keep_segments;
+extern int     max_slot_wal_keep_size_mb;
 extern int     XLOGbuffers;
 extern int     XLogArchiveTimeout;
 extern int     wal_retrieve_retry_interval;
@@ -255,6 +256,17 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * GetWALAvailability return codes
+ */
+typedef enum WALAvailability
+{
+       WALAVAIL_INVALID_LSN,           /* parameter error */
+       WALAVAIL_NORMAL,                        /* WAL segment is within max_wal_size */
+       WALAVAIL_RESERVED,                      /* WAL segment is reserved by a slot */
+       WALAVAIL_REMOVED                        /* WAL segment has been removed */
+} WALAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -305,6 +317,8 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern XLogRecPtr CalculateMaximumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 7830c02021a05ab1822bdbf42772d4cc02d4045d..27381d7874f259720a10839d83a990e200bccdcb 100644
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202004072
+#define CATALOG_VERSION_NO     202004073
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index c9902fa1234148f4c9b09ff108bb212cd42b747c..4bce3ad8de6a1485f59f8c33c140e31700d7e435 100644
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b334d444d463dc19ec5933e30f702f96..149210bae45a4fccfbeae94608fe3be0955d399d 100644
@@ -36,6 +36,14 @@ typedef enum ReplicationSlotPersistency
        RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/* For ReplicationSlotAcquire, q.v. */
+typedef enum SlotAcquireBehavior
+{
+       SAB_Error,
+       SAB_Block,
+       SAB_Inquire
+} SlotAcquireBehavior;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -184,7 +192,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern int     ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
@@ -198,6 +206,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
new file mode 100644
index 0000000..d6bc77e
--- /dev/null
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -0,0 +1,217 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+use strict;
+use warnings;
+
+use TestLib;
+use PostgresNode;
+
+use File::Path qw(rmtree);
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot's restart_lsn, wal_status and min_safe_lsn should all be null
+# before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check that the state of a non-reserved slot is null');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', "primary_slot_name = 'rep1'");
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+# Preparation done; the slot is in the "normal" state now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in the safe state.  The distance from min_safe_lsn to the
+# current LSN should be almost (max_slot_wal_keep_size - 1 segment),
+# i.e. 5MB here.
+
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn)  FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|2048 kB", 'check that min_safe_lsn gets close to the current LSN');
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+# Advance WAL again then checkpoint, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|8192 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# Advance WAL again without checkpoint, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+
+# Slot gets into 'reserved' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(restart_lsn - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|reserved|216 bytes", 'check that the slot state changes to "reserved"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can still connect to the master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+                               "requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again so that the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# A LOG message about the invalidation should be issued
+ok(find_in_log($node_master,
+                          "invalidating slot \"rep1\" because its restart_lsn [0-9A-F/]+ exceeds max_slot_wal_keep_size",
+                          $logstart),
+   'check that the invalidation message is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT slot_name, active, restart_lsn, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "rep1|f|||", 'check that the slot became inactive');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0; $i < 10000; $i++)
+{
+       if (find_in_log($node_standby,
+                                       "requested WAL segment [0-9A-F]+ has already been removed",
+                                       $logstart))
+       {
+               $failed = 1;
+               last;
+       }
+       usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+       my ($node, $n) = @_;
+
+       # Advance WAL on the node by $n segments
+       for (my $i = 0 ; $i < $n ; $i++)
+       {
+               $node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+       }
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+       my ($node) = @_;
+
+       return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = TestLib::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568f6c428e251117ccd3e680f80237769..ac31840739de92f47d1fb0e067080e8c083debac 100644
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.min_safe_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,