</entry>
</row>
+ <row>
+ <entry><structfield>wal_status</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry></entry>
+
+ <entry>Availability of WAL files claimed by this slot.
+ Possible values are:
+ <simplelist>
+ <member>
+ <literal>normal</literal> means that the claimed files
+ are within <varname>max_wal_size</varname>
+ </member>
+ <member>
+ <literal>reserved</literal> means that <varname>max_wal_size</varname>
+ is exceeded but the files are still held, either by some replication
+ slot or by <varname>wal_keep_segments</varname>
+ </member>
+ <member>
+ <literal>lost</literal> means that some WAL files are definitely lost
+ and this slot cannot be used to resume replication anymore.
+ </member>
+ </simplelist>
+ The last two states are seen only when
+ <xref linkend="guc-max-slot-wal-keep-size"/> is
+ non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+ field is null.
+ </entry>
+ </row>
+
+ <row>
+ <entry><structfield>min_safe_lsn</structfield></entry>
+ <entry><type>pg_lsn</type></entry>
+ <entry></entry>
+ <entry>
+ The minimum LSN currently available for walsenders.
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</table>
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+ <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specify the maximum size of WAL files
+ that <link linkend="streaming-replication-slots">replication
+ slots</link> are allowed to retain in the <filename>pg_wal</filename>
+ directory at checkpoint time.
+ If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+ replication slots may retain an unlimited amount of WAL files. Otherwise,
+ if <structfield>restart_lsn</structfield> of a replication slot falls
+ behind the current LSN by more than the given size, the standby using
+ the slot may no longer be able to continue replication due to removal
+ of required WAL files. You can see the WAL availability of replication
+ slots in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
<term><varname>wal_sender_timeout</varname> (<type>integer</type>)
<indexterm>
<xref linkend="guc-archive-command"/>.
However, these methods often result in retaining more WAL segments than
required, whereas replication slots retain only the number of segments
- known to be needed. An advantage of these methods is that they bound
- the space requirement for <literal>pg_wal</literal>; there is currently no way
- to do this using replication slots.
+ known to be needed. On the other hand, replication slots can retain so
+ many WAL segments that they fill up the space allocated
+ for <literal>pg_wal</literal>;
+ <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+ retained by replication slots.
</para>
<para>
Similarly, <xref linkend="guc-hot-standby-feedback"/>
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
+int max_slot_wal_keep_size_mb = -1;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
*/
#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
#define ConvertToXSegs(x, segsize) \
(x / ((segsize) / (1024 * 1024)))
return lastRemovedSegNo;
}
+
/*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
*/
static void
UpdateLastRemovedPtr(char *filename)
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
+ InvalidateObsoleteReplicationSlots(_logSegNo);
_logSegNo--;
RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
+ InvalidateObsoleteReplicationSlots(_logSegNo);
_logSegNo--;
/*
return true;
}
+/*
+ * Report availability of WAL for the given target LSN
+ *		(typically a slot's restart_lsn)
+ *
+ * Returns one of the following enum values:
+ *
+ * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
+ *   of max_wal_size (or of wal_keep_segments, whichever reaches further
+ *   back).
+ *
+ * * WALAVAIL_RESERVED means it is still available by preserving extra
+ *   segments beyond max_wal_size.
+ *
+ * * WALAVAIL_REMOVED means it is definitely lost.  A replication stream on
+ *   a slot with this LSN cannot continue.
+ *
+ * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WALAvailability
+GetWALAvailability(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currpos;		/* current write LSN */
+	XLogSegNo	currSeg;		/* segid of currpos */
+	XLogSegNo	targetSeg;		/* segid of targetLSN */
+	XLogSegNo	oldestSeg;		/* actual oldest segid */
+	XLogSegNo	oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
+	/* starts out invalid so that KeepLogSeg stores its result as is */
+	XLogSegNo	oldestSlotSeg = InvalidXLogRecPtr;	/* oldest segid kept by
+													 * slot */
+	uint64		keepSegs;
+
+	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	if (XLogRecPtrIsInvalid(targetLSN))
+		return WALAVAIL_INVALID_LSN;
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* calculate oldest segment currently needed by slots */
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+	KeepLogSeg(currpos, &oldestSlotSeg);
+
+	/*
+	 * Find the oldest extant segment file.  We get 1 until a checkpoint
+	 * removes the first WAL segment file since startup, which makes the
+	 * status wrong under certain abnormal conditions, but that doesn't
+	 * actually do any harm.
+	 */
+	oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+	/*
+	 * Calculate oldest segment by max_wal_size and wal_keep_segments.
+	 *
+	 * max_wal_size_mb is measured in megabytes while wal_keep_segments is
+	 * already a segment count, so convert the former before comparing the
+	 * two; taking Max() of the raw values would mix units.
+	 */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	keepSegs = Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
+				   wal_keep_segments) + 1;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg > keepSegs)
+		oldestSegMaxWalSize = currSeg - keepSegs;
+	else
+		oldestSegMaxWalSize = 1;
+
+	/*
+	 * If max_slot_wal_keep_size has changed after the last call, the segment
+	 * that would have been kept by the current setting might have been lost
+	 * by the previous setting.  No point in showing normal or reserved status
+	 * values if the targetSeg is known to be lost.
+	 */
+	if (targetSeg >= oldestSeg)
+	{
+		/*
+		 * Show "normal" when targetSeg is within max_wal_size.
+		 *
+		 * NOTE(review): this branch is skipped whenever
+		 * 0 < max_slot_wal_keep_size < max_wal_size, so such slots report
+		 * "reserved" even while still inside max_wal_size -- confirm that
+		 * this is the intended behavior.
+		 */
+		if ((max_slot_wal_keep_size_mb <= 0 ||
+			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+			oldestSegMaxWalSize <= targetSeg)
+			return WALAVAIL_NORMAL;
+
+		/* being retained by slots */
+		if (oldestSlotSeg <= targetSeg)
+			return WALAVAIL_RESERVED;
+	}
+
+	/* Definitely lost */
+	return WALAVAIL_REMOVED;
+}
+
+
+
/*
* Retreat *logSegNo to the last segment that we need to retain because of
* either wal_keep_segments or replication slots.
*
* This is calculated by subtracting wal_keep_segments from the given xlog
* location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots. For the latter criterion we do consider
+ * the effects of max_slot_wal_keep_size: reserve at most that much space back
+ * from recptr.
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
{
+ XLogSegNo currSegNo;
XLogSegNo segno;
XLogRecPtr keep;
- XLByteToSeg(recptr, segno, wal_segment_size);
- keep = XLogGetReplicationSlotMinimumLSN();
+ XLByteToSeg(recptr, currSegNo, wal_segment_size);
+ segno = currSegNo;
- /* compute limit for wal_keep_segments first */
- if (wal_keep_segments > 0)
+ /*
+ * Calculate how many segments are kept by slots first, adjusting for
+ * max_slot_wal_keep_size.
+ */
+ keep = XLogGetReplicationSlotMinimumLSN();
+ if (keep != InvalidXLogRecPtr)
{
- /* avoid underflow, don't go below 1 */
- if (segno <= wal_keep_segments)
- segno = 1;
- else
- segno = segno - wal_keep_segments;
- }
+ XLByteToSeg(keep, segno, wal_segment_size);
- /* then check whether slots limit removal further */
- if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
- {
- XLogSegNo slotSegNo;
+ /* Cap by max_slot_wal_keep_size ... */
+ if (max_slot_wal_keep_size_mb >= 0)
+ {
+ /* NOTE(review): this holds a segment count despite its XLogRecPtr type */
+ XLogRecPtr slot_keep_segs;
- XLByteToSeg(keep, slotSegNo, wal_segment_size);
+ slot_keep_segs =
+ ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
- if (slotSegNo <= 0)
+ /*
+ * Slots want more than the cap allows: retreat only slot_keep_segs
+ * segments from currSegNo.  NOTE(review): assumes the slots' segment
+ * is not ahead of currSegNo -- confirm.
+ */
+ if (currSegNo - segno > slot_keep_segs)
+ segno = currSegNo - slot_keep_segs;
+ }
+ }
+
+ /* but, keep at least wal_keep_segments if that's set */
+ if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
+ {
+ /* avoid underflow, don't go below 1 */
+ if (currSegNo <= wal_keep_segments)
segno = 1;
- else if (slotSegNo < segno)
- segno = slotSegNo;
+ else
+ segno = currSegNo - wal_keep_segments;
}
/* don't delete WAL segments newer than the calculated segment */
+ /* an invalid *logSegNo carries no earlier limit, so segno is stored as is */
- if (segno < *logSegNo)
+ if (XLogRecPtrIsInvalid(*logSegNo) || segno < *logSegNo)
*logSegNo = segno;
}
L.xmin,
L.catalog_xmin,
L.restart_lsn,
- L.confirmed_flush_lsn
+ L.confirmed_flush_lsn,
+ L.wal_status,
+ L.min_safe_lsn
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
- ReplicationSlotAcquire(NameStr(*name), true);
+ (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
PG_TRY();
{
/*
* Find a previously created slot and mark it as used by this backend.
+ *
+ * The return value is only useful if behavior is SAB_Inquire, in which
+ * it's zero if we successfully acquired the slot, or the PID of the
+ * owning process otherwise. If behavior is SAB_Error, then trying to
+ * acquire an owned slot is an error. If SAB_Block, we sleep until the
+ * slot is released by the owning process.
*/
-void
-ReplicationSlotAcquire(const char *name, bool nowait)
+int
+ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
ReplicationSlot *slot;
int active_pid;
*/
if (active_pid != MyProcPid)
{
- if (nowait)
+ if (behavior == SAB_Error)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
name, active_pid)));
+ else if (behavior == SAB_Inquire)
+ return active_pid;
/* Wait here until we get signaled, and then restart */
ConditionVariableSleep(&slot->active_cv,
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
+
+ /* success */
+ return 0;
}
/*
{
Assert(MyReplicationSlot == NULL);
- ReplicationSlotAcquire(name, nowait);
+ (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
ReplicationSlotDropAcquired();
}
/*
* Compute the oldest restart LSN across all slots and inform xlog module.
+ *
+ * Note: while max_slot_wal_keep_size is theoretically relevant for this
+ * purpose, we don't try to account for that, because this module doesn't
+ * know what to compare against.
*/
void
ReplicationSlotsComputeRequiredLSN(void)
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
+ if (restart_lsn == InvalidXLogRecPtr)
+ continue;
+
if (result == InvalidXLogRecPtr ||
restart_lsn < result)
result = restart_lsn;
}
}
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * oldestSegno is the oldest segment that is going to be kept.  Every in-use
+ * slot whose restart_lsn is valid and lies before the start of that segment
+ * is acquired (terminating the process that currently owns it, if any) and
+ * has its restart_lsn reset to InvalidXLogRecPtr.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+	XLogRecPtr	oldestLSN;
+
+	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		NameData	slotname;
+
+		if (!s->in_use)
+			continue;
+
+		SpinLockAcquire(&s->mutex);
+		if (s->data.restart_lsn == InvalidXLogRecPtr ||
+			s->data.restart_lsn >= oldestLSN)
+		{
+			SpinLockRelease(&s->mutex);
+			continue;
+		}
+
+		/*
+		 * Take a private copy of the name with a plain struct assignment:
+		 * nothing is allocated while the spinlock is held, and there is
+		 * nothing to free when we loop back to "restart".
+		 */
+		slotname = s->data.name;
+		restart_lsn = s->data.restart_lsn;
+
+		SpinLockRelease(&s->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		/* Keep signalling the owner until we manage to acquire the slot */
+		for (;;)
+		{
+			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
+													   SAB_Inquire);
+
+			/* no walsender? success! */
+			if (wspid == 0)
+				break;
+
+			ereport(LOG,
+					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
+							wspid, NameStr(slotname))));
+			(void) kill(wspid, SIGTERM);
+
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+		}
+		ConditionVariableCancelSleep();
+
+		ereport(LOG,
+				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+						NameStr(slotname),
+						(uint32) (restart_lsn >> 32),
+						(uint32) restart_lsn)));
+
+		/*
+		 * NOTE(review): the slot is neither marked dirty nor saved here, so
+		 * presumably the invalidation does not survive a restart -- confirm
+		 * whether it should be persisted.
+		 */
+		SpinLockAcquire(&s->mutex);
+		s->data.restart_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&s->mutex);
+		ReplicationSlotRelease();
+
+		/* if we did anything, start from scratch */
+		CHECK_FOR_INTERRUPTS();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
/*
* Flush all replication slots to disk.
*
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
Oid database;
NameData slot_name;
NameData plugin;
+ WALAvailability walstate;
+ XLogSegNo last_removed_seg;
int i;
if (!slot->in_use)
else
nulls[i++] = true;
+ walstate = GetWALAvailability(restart_lsn);
+
+ switch (walstate)
+ {
+ case WALAVAIL_INVALID_LSN:
+ nulls[i++] = true;
+ break;
+
+ case WALAVAIL_NORMAL:
+ values[i++] = CStringGetTextDatum("normal");
+ break;
+
+ case WALAVAIL_RESERVED:
+ values[i++] = CStringGetTextDatum("reserved");
+ break;
+
+ case WALAVAIL_REMOVED:
+ values[i++] = CStringGetTextDatum("lost");
+ break;
+ }
+
+ if (max_slot_wal_keep_size_mb >= 0 &&
+ (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+ ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+ {
+ XLogRecPtr min_safe_lsn;
+
+ XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
+ wal_segment_size, min_safe_lsn);
+ values[i++] = Int64GetDatum(min_safe_lsn);
+ }
+ else
+ nulls[i++] = true;
+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
LWLockRelease(ReplicationSlotControlLock);
XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
XLogRecPtr retlsn = startlsn;
+ Assert(moveto != InvalidXLogRecPtr);
+
if (startlsn < moveto)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr retlsn;
+ Assert(moveto != InvalidXLogRecPtr);
+
PG_TRY();
{
/*
moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
/* Acquire the slot so we "own" it */
- ReplicationSlotAcquire(NameStr(*slotname), true);
+ (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
/* A slot whose restart_lsn has never been reserved cannot be advanced */
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
if (cmd->slotname)
{
- ReplicationSlotAcquire(cmd->slotname, true);
+ (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
Assert(!MyReplicationSlot);
- ReplicationSlotAcquire(cmd->slotname, true);
+ (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
/*
* Force a disconnect, so that the decoding code doesn't need to care
NULL, NULL, NULL
},
+ {
+ {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+ gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+ gettext_noop("Replication slots will be marked as failed, and segments released "
+ "for deletion or recycling, if this much space is occupied by WAL "
+ "on disk."),
+ GUC_UNIT_MB
+ },
+ &max_slot_wal_keep_size_mb,
+ -1, -1, MAX_KILOBYTES,
+ NULL, NULL, NULL
+ },
+
{
{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),
#max_wal_senders = 10 # max number of walsender processes
# (change requires restart)
#wal_keep_segments = 0 # in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
#max_replication_slots = 10 # max number of replication slots
extern int min_wal_size_mb;
extern int max_wal_size_mb;
extern int wal_keep_segments;
+extern int max_slot_wal_keep_size_mb;
extern int XLOGbuffers;
extern int XLogArchiveTimeout;
extern int wal_retrieve_retry_interval;
extern CheckpointStatsData CheckpointStats;
+/*
+ * GetWALAvailability return codes
+ *
+ * These describe whether the WAL at a given LSN (typically a slot's
+ * restart_lsn) can still be read.
+ */
+typedef enum WALAvailability
+{
+ WALAVAIL_INVALID_LSN, /* given LSN is invalid: the slot does not
+ * reserve WAL */
+ WALAVAIL_NORMAL, /* WAL segment is within max_wal_size */
+ WALAVAIL_RESERVED, /* WAL segment is reserved by a slot */
+ WALAVAIL_REMOVED /* WAL segment has been removed */
+} WALAvailability;
+
struct XLogRecData;
extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
extern void InitXLOGAccess(void);
extern void CreateCheckPoint(int flags);
extern bool CreateRestartPoint(int flags);
+extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern XLogRecPtr CalculateMaxmumSafeLSN(void);
extern void XLogPutNextOid(Oid nextOid);
extern XLogRecPtr XLogRestorePoint(const char *rpName);
extern void UpdateFullPageWrites(void);
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202004072
+#define CATALOG_VERSION_NO 202004073
#endif
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
RS_TEMPORARY
} ReplicationSlotPersistency;
+/*
+ * For ReplicationSlotAcquire, q.v.
+ *
+ * Selects what happens when the requested slot is owned by another process.
+ */
+typedef enum SlotAcquireBehavior
+{
+ SAB_Error, /* raise an error */
+ SAB_Block, /* sleep until the owning process releases it */
+ SAB_Inquire /* return the PID of the owning process */
+} SlotAcquireBehavior;
+
/*
* On-Disk data of a replication slot, preserved across restarts.
*/
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
--- /dev/null
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+use strict;
+use warnings;
+
+use TestLib;
+use PostgresNode;
+
+use File::Path qw(rmtree);
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# wal_status and min_safe_lsn should be null before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', "primary_slot_name = 'rep1'");
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+# Preparation done, the slot is the state "normal" now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "normal" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "normal" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in "normal" state. The distance from min_safe_lsn to the
+# current LSN should be almost (max_slot_wal_keep_size - 1) times the
+# segment size
+
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|2048 kB", 'check that min_safe_lsn gets close to the current LSN');
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+# Advance WAL again without checkpoint, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|8192 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# Advance WAL again without checkpoint, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+
+# Slot gets into 'reserved' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(restart_lsn - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|reserved|216 bytes", 'check that the slot state changes to "reserved"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby still can connect to master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+ "requested WAL segment [0-9A-F]+ has already been removed"),
+ 'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The invalidation should be reported in the server log
+ok(find_in_log($node_master,
+ "invalidating slot \"rep1\" because its restart_lsn [0-9A-F/]+ exceeds max_slot_wal_keep_size",
+ $logstart),
+ 'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT slot_name, active, restart_lsn, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "rep1|f|||", 'check that the slot became inactive');
+
+# The standby no longer can connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0; $i < 10000; $i++)
+{
+ if (find_in_log($node_standby,
+ "requested WAL segment [0-9A-F]+ has already been removed",
+ $logstart))
+ {
+ $failed = 1;
+ last;
+ }
+ usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Each round writes a little WAL and then switches to a new segment,
+	# so the WAL position moves forward by one segment per round.
+	my $round = 0;
+	while ($round < $n)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+		$round++;
+	}
+}
+
+
+# Size in bytes of the server logfile of $node, as reported by stat()
+sub get_log_size
+{
+	my ($node) = @_;
+	my $size_field = 7;
+
+	return (stat($node->logfile))[$size_field];
+}
+
+
+# Find $pat in the logfile of $node, skipping the first $off bytes
+# ($off defaults to 0).
+#
+# Returns 1 if the pattern matches and 0 otherwise.  The result is always a
+# single scalar, so it can be passed directly as the first argument of ok().
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	# A bare match returned in list context yields an empty list on failure,
+	# which would silently vanish from the caller's argument list (turning
+	# ok(find_in_log(...), 'name') into ok('name')).  Force a plain 0/1.
+	return ($log =~ m/$pat/) ? 1 : 0;
+}
l.xmin,
l.catalog_xmin,
l.restart_lsn,
- l.confirmed_flush_lsn
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+ l.confirmed_flush_lsn,
+ l.wal_status,
+ l.min_safe_lsn
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,