Avoid creating archive status ".ready" files too early
authorAlvaro Herrera <[email protected]>
Mon, 23 Aug 2021 19:50:35 +0000 (15:50 -0400)
committerAlvaro Herrera <[email protected]>
Mon, 23 Aug 2021 19:50:35 +0000 (15:50 -0400)
WAL records may span multiple segments, but XLogWrite() does not
wait for the entire record to be written out to disk before
creating archive status files.  Instead, as soon as the last WAL page of
the segment is written, the archive status file is created, and the
archiver may process it.  If PostgreSQL crashes before it is able to
write and flush the rest of the record (in the next WAL segment), the
wrong version of the first segment file lingers in the archive, which
causes operations such as point-in-time restores to fail.

To fix this, keep track of records that span across segments and ensure
that segments are only marked ready-for-archival once such records have
been completely written to disk.

This has always been wrong, so backpatch all the way back.

Author: Nathan Bossart <[email protected]>
Reviewed-by: Kyotaro Horiguchi <[email protected]>
Reviewed-by: Ryo Matsumura <[email protected]>
Reviewed-by: Andrey Borodin <[email protected]>
Discussion: https://postgr.es/m/CBDDFA01-6E40-46BB-9F98-9340F4379505@amazon.com

src/backend/access/transam/timeline.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogarchive.c
src/backend/postmaster/walwriter.c
src/backend/replication/walreceiver.c
src/include/access/xlog.h
src/include/access/xlogarchive.h
src/include/access/xlogdefs.h

index 8d0903c1756a2e1c351640c630fe8c30e0cc4b3b..acd5c2431da0ec9db442e98d1e61a95ef66d6dff 100644 (file)
@@ -452,7 +452,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
    if (XLogArchivingActive())
    {
        TLHistoryFileName(histfname, newTLI);
-       XLogArchiveNotify(histfname);
+       XLogArchiveNotify(histfname, true);
    }
 }
 
index e51a7a749da949997f95546d38ee0af06b626c5d..24165ab03eced9e2358120c68c2195aae4f2c58a 100644 (file)
@@ -724,6 +724,18 @@ typedef struct XLogCtlData
    XLogRecPtr  lastFpwDisableRecPtr;
 
    slock_t     info_lck;       /* locks shared variables shown above */
+
+   /*
+    * Variables used to track segment-boundary-crossing WAL records.  See
+    * RegisterSegmentBoundary.  Protected by segtrack_lck.
+    */
+   XLogSegNo   lastNotifiedSeg;
+   XLogSegNo   earliestSegBoundary;
+   XLogRecPtr  earliestSegBoundaryEndPtr;
+   XLogSegNo   latestSegBoundary;
+   XLogRecPtr  latestSegBoundaryEndPtr;
+
+   slock_t     segtrack_lck;   /* locks shared variables shown above */
 } XLogCtlData;
 
 static XLogCtlData *XLogCtl = NULL;
@@ -920,6 +932,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
                           XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1154,23 +1167,56 @@ XLogInsertRecord(XLogRecData *rdata,
    END_CRIT_SECTION();
 
    /*
-    * Update shared LogwrtRqst.Write, if we crossed page boundary.
+    * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
+    * segment boundary, register that and wake up walwriter.
     */
    if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
    {
+       XLogSegNo   StartSeg;
+       XLogSegNo   EndSeg;
+
+       XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+       XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+       /*
+        * Register our crossing the segment boundary if that occurred.
+        *
+        * Note that we did not use XLByteToPrevSeg() for determining the
+        * ending segment.  This is so that a record that fits perfectly into
+        * the end of the segment causes the latter to get marked ready for
+        * archival immediately.
+        */
+       if (StartSeg != EndSeg && XLogArchivingActive())
+           RegisterSegmentBoundary(EndSeg, EndPos);
+
+       /*
+        * Advance LogwrtRqst.Write so that it includes new block(s).
+        *
+        * We do this after registering the segment boundary so that the
+        * comparison with the flushed pointer below can use the latest value
+        * known globally.
+        */
        SpinLockAcquire(&XLogCtl->info_lck);
-       /* advance global request to include new block(s) */
        if (XLogCtl->LogwrtRqst.Write < EndPos)
            XLogCtl->LogwrtRqst.Write = EndPos;
        /* update local result copy while I have the chance */
        LogwrtResult = XLogCtl->LogwrtResult;
        SpinLockRelease(&XLogCtl->info_lck);
+
+       /*
+        * There's a chance that the record was already flushed to disk and we
+        * missed marking segments as ready for archive.  If this happens, we
+        * nudge the WALWriter, which will take care of notifying segments as
+        * needed.
+        */
+       if (StartSeg != EndSeg && XLogArchivingActive() &&
+           LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch)
+           SetLatch(ProcGlobal->walwriterLatch);
    }
 
    /*
     * If this was an XLOG_SWITCH record, flush the record and the empty
-    * padding space that fills the rest of the segment, and perform
-    * end-of-segment actions (eg, notifying archiver).
+    * padding space that fills the rest of the segment.
     */
    if (isLogSwitch)
    {
@@ -2421,6 +2467,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
    /* We should always be inside a critical section here */
    Assert(CritSectionCount > 0);
+   Assert(LWLockHeldByMe(WALWriteLock));
 
    /*
     * Update local LogwrtResult (caller probably did this already, but...)
@@ -2586,11 +2633,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
             * later. Doing it here ensures that one and only one backend will
             * perform this fsync.
             *
-            * This is also the right place to notify the Archiver that the
-            * segment is ready to copy to archival storage, and to update the
-            * timer for archive_timeout, and to signal for a checkpoint if
-            * too many logfile segments have been used since the last
-            * checkpoint.
+            * If WAL archiving is active, we attempt to notify the archiver
+            * of any segments that are now ready for archival.
+            *
+            * This is also the right place to update the timer for
+            * archive_timeout and to signal for a checkpoint if too many
+            * logfile segments have been used since the last checkpoint.
             */
            if (finishing_seg)
            {
@@ -2602,7 +2650,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                LogwrtResult.Flush = LogwrtResult.Write;    /* end of page */
 
                if (XLogArchivingActive())
-                   XLogArchiveNotifySeg(openLogSegNo);
+                   NotifySegmentsReadyForArchive(LogwrtResult.Flush);
 
                XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
                XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2690,6 +2738,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
            XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
        SpinLockRelease(&XLogCtl->info_lck);
    }
+
+   if (XLogArchivingActive())
+       NotifySegmentsReadyForArchive(LogwrtResult.Flush);
 }
 
 /*
@@ -4328,6 +4379,131 @@ ValidateXLOGDirectoryStructure(void)
    }
 }
 
+/*
+ * RegisterSegmentBoundary
+ *
+ * WAL records that are split across a segment boundary require special
+ * treatment for archiving: the initial segment must not be archived until
+ * the end segment has been flushed, in case we crash before we have
+ * the chance to flush the end segment (because after recovery we would
+ * overwrite that WAL record with a different one, and so the file we
+ * archived no longer represents truth.)  This also applies to streaming
+ * physical replication.
+ *
+ * To handle this, we keep track of the LSN of WAL records that cross
+ * segment boundaries.  Two such are sufficient: the ones with the
+ * earliest and the latest end pointers we know about, since the flush
+ * position advances monotonically.  WAL record writers register
+ * boundary-crossing records here, which is used by .ready file creation
+ * to delay until the end segment is known flushed.
+ */
+static void
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+{
+   XLogSegNo   segno PG_USED_FOR_ASSERTS_ONLY;
+
+   /* verify caller computed segment number correctly */
+   AssertArg((XLByteToSeg(endpos, segno, wal_segment_size), segno == seg));
+
+   SpinLockAcquire(&XLogCtl->segtrack_lck);
+
+   /*
+    * If no segment boundaries are registered, store the new segment boundary
+    * in earliestSegBoundary.  Otherwise, store the greater segment
+    * boundaries in latestSegBoundary.
+    */
+   if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
+   {
+       XLogCtl->earliestSegBoundary = seg;
+       XLogCtl->earliestSegBoundaryEndPtr = endpos;
+   }
+   else if (seg > XLogCtl->earliestSegBoundary &&
+            (XLogCtl->latestSegBoundary == MaxXLogSegNo ||
+             seg > XLogCtl->latestSegBoundary))
+   {
+       XLogCtl->latestSegBoundary = seg;
+       XLogCtl->latestSegBoundaryEndPtr = endpos;
+   }
+
+   SpinLockRelease(&XLogCtl->segtrack_lck);
+}
+
+/*
+ * NotifySegmentsReadyForArchive
+ *
+ * Mark segments as ready for archival, given that it is safe to do so.
+ * This function is idempotent.
+ */
+void
+NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
+{
+   XLogSegNo   latest_boundary_seg;
+   XLogSegNo   last_notified;
+   XLogSegNo   flushed_seg;
+   XLogSegNo   seg;
+   bool        keep_latest;
+
+   XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size);
+
+   SpinLockAcquire(&XLogCtl->segtrack_lck);
+
+   if (XLogCtl->latestSegBoundary <= flushed_seg &&
+       XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr)
+   {
+       latest_boundary_seg = XLogCtl->latestSegBoundary;
+       keep_latest = false;
+   }
+   else if (XLogCtl->earliestSegBoundary <= flushed_seg &&
+            XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr)
+   {
+       latest_boundary_seg = XLogCtl->earliestSegBoundary;
+       keep_latest = true;
+   }
+   else
+   {
+       SpinLockRelease(&XLogCtl->segtrack_lck);
+       return;
+   }
+
+   last_notified = XLogCtl->lastNotifiedSeg;
+
+   /*
+    * Update shared memory and discard segment boundaries that are no longer
+    * needed.
+    *
+    * It is safe to update shared memory before we attempt to create the
+    * .ready files.  If our calls to XLogArchiveNotifySeg() fail,
+    * RemoveOldXlogFiles() will retry it as needed.
+    */
+   if (last_notified < latest_boundary_seg - 1)
+       XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
+
+   if (keep_latest)
+   {
+       XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+       XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
+   }
+   else
+   {
+       XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+       XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
+   }
+
+   XLogCtl->latestSegBoundary = MaxXLogSegNo;
+   XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
+
+   SpinLockRelease(&XLogCtl->segtrack_lck);
+
+   /*
+    * Notify archiver about segments that are ready for archival (by creating
+    * the corresponding .ready files).
+    */
+   for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
+       XLogArchiveNotifySeg(seg, false);
+
+   PgArchWakeup();
+}
+
 /*
  * Remove previous backup history files.  This also retries creation of
  * .ready files for any backup history files for which XLogArchiveNotify
@@ -5230,9 +5406,17 @@ XLOGShmemInit(void)
 
    SpinLockInit(&XLogCtl->Insert.insertpos_lck);
    SpinLockInit(&XLogCtl->info_lck);
+   SpinLockInit(&XLogCtl->segtrack_lck);
    SpinLockInit(&XLogCtl->ulsn_lck);
    InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
    ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
+
+   /* Initialize stuff for marking segments as ready for archival. */
+   XLogCtl->lastNotifiedSeg = MaxXLogSegNo;
+   XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+   XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
+   XLogCtl->latestSegBoundary = MaxXLogSegNo;
+   XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 }
 
 /*
@@ -7873,6 +8057,20 @@ StartupXLOG(void)
    XLogCtl->LogwrtRqst.Write = EndOfLog;
    XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+   /*
+    * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
+    */
+   if (XLogArchivingActive())
+   {
+       XLogSegNo   EndOfLogSeg;
+
+       XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size);
+
+       SpinLockAcquire(&XLogCtl->segtrack_lck);
+       XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1;
+       SpinLockRelease(&XLogCtl->segtrack_lck);
+   }
+
    /*
     * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
     * record before resource manager writes cleanup WAL records or checkpoint
@@ -8000,7 +8198,7 @@ StartupXLOG(void)
                XLogArchiveCleanup(partialfname);
 
                durable_rename(origpath, partialpath, ERROR);
-               XLogArchiveNotify(partialfname);
+               XLogArchiveNotify(partialfname, true);
            }
        }
    }
index 26b023e754b00309ec39285356c858d375e00433..b9c19b20856287ebfec094513e143bcf09aee73e 100644 (file)
@@ -433,7 +433,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
    if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
        XLogArchiveForceDone(xlogfname);
    else
-       XLogArchiveNotify(xlogfname);
+       XLogArchiveNotify(xlogfname, true);
 
    /*
     * If the existing file was replaced, since walsenders might have it open,
@@ -462,9 +462,12 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
  * by the archiver, e.g. we write 0000000100000001000000C6.ready
  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
  * then when complete, rename it to 0000000100000001000000C6.done
+ *
+ * Optionally, nudge the archiver process so that it'll notice the file we
+ * create.
  */
 void
-XLogArchiveNotify(const char *xlog)
+XLogArchiveNotify(const char *xlog, bool nudge)
 {
    char        archiveStatusPath[MAXPGPATH];
    FILE       *fd;
@@ -489,8 +492,8 @@ XLogArchiveNotify(const char *xlog)
        return;
    }
 
-   /* Notify archiver that it's got something to do */
-   if (IsUnderPostmaster)
+   /* If caller requested, let archiver know it's got work to do */
+   if (nudge)
        PgArchWakeup();
 }
 
@@ -498,12 +501,12 @@ XLogArchiveNotify(const char *xlog)
  * Convenience routine to notify using segment number representation of filename
  */
 void
-XLogArchiveNotifySeg(XLogSegNo segno)
+XLogArchiveNotifySeg(XLogSegNo segno, bool nudge)
 {
    char        xlog[MAXFNAMELEN];
 
    XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
-   XLogArchiveNotify(xlog);
+   XLogArchiveNotify(xlog, nudge);
 }
 
 /*
@@ -608,7 +611,7 @@ XLogArchiveCheckDone(const char *xlog)
        return true;
 
    /* Retry creation of the .ready file */
-   XLogArchiveNotify(xlog);
+   XLogArchiveNotify(xlog, true);
    return false;
 }
 
index 626fae8454ca4d38cacc9fd4e6c82ce596990908..6a1e16edc23ab806f1321b26d4458f0845706ccb 100644 (file)
@@ -248,6 +248,13 @@ WalWriterMain(void)
        /* Process any signals received recently */
        HandleWalWriterInterrupts();
 
+       /*
+        * Notify the archiver of any WAL segments that are ready.  We do this
+        * here to handle a race condition where WAL is flushed to disk prior
+        * to registering the segment boundary.
+        */
+       NotifySegmentsReadyForArchive(GetFlushRecPtr());
+
        /*
         * Do what we're here for; then, if XLogBackgroundFlush() found useful
         * work to do, reset hibernation counter.
index 9a2bc37fd711b74c211f5db512880d8742950ce1..60de3be92c2cc057d3e4123f2754d1da4da6d2bf 100644 (file)
@@ -622,7 +622,7 @@ WalReceiverMain(void)
            if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                XLogArchiveForceDone(xlogfname);
            else
-               XLogArchiveNotify(xlogfname);
+               XLogArchiveNotify(xlogfname, true);
        }
        recvFile = -1;
 
@@ -760,7 +760,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
            if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                XLogArchiveForceDone(fname);
            else
-               XLogArchiveNotify(fname);
+               XLogArchiveNotify(fname, true);
 
            pfree(fname);
            pfree(content);
@@ -915,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
                if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                    XLogArchiveForceDone(xlogfname);
                else
-                   XLogArchiveNotify(xlogfname);
+                   XLogArchiveNotify(xlogfname, true);
            }
            recvFile = -1;
 
index 0a8ede700defc658925d3d60383ac94fd4e25988..6b6ae81c2d5c9e3d935f4c7016dd55b97372d834 100644 (file)
@@ -315,6 +315,7 @@ extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
+extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
 
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
index 3edd1a976c1b48ba38a9778346e04bd49adbb75e..935b4cb02d51186cdeb08b87223236afad5aad88 100644 (file)
@@ -23,8 +23,8 @@ extern bool RestoreArchivedFile(char *path, const char *xlogfname,
 extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
                                   bool failOnSignal);
 extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
-extern void XLogArchiveNotify(const char *xlog);
-extern void XLogArchiveNotifySeg(XLogSegNo segno);
+extern void XLogArchiveNotify(const char *xlog, bool nudge);
+extern void XLogArchiveNotifySeg(XLogSegNo segno, bool nudge);
 extern void XLogArchiveForceDone(const char *xlog);
 extern bool XLogArchiveCheckDone(const char *xlog);
 extern bool XLogArchiveIsBusy(const char *xlog);
index 60348d1850964238ab3c2ceb3af2abd7669b7511..9b455e88e3193f3f8ac91dccae737e9affd7852e 100644 (file)
@@ -46,6 +46,7 @@ typedef uint64 XLogRecPtr;
  * XLogSegNo - physical log file sequence number.
  */
 typedef uint64 XLogSegNo;
+#define MaxXLogSegNo   ((XLogSegNo) 0xFFFFFFFFFFFFFFFF)
 
 /*
  * TimeLineID (TLI) - identifies different database histories to prevent