Add info in WAL records in preparation for logical slot conflict handling
author     Andres Freund <[email protected]>
           Sun, 2 Apr 2023 19:32:19 +0000 (12:32 -0700)
committer  Andres Freund <[email protected]>
           Sun, 2 Apr 2023 19:32:19 +0000 (12:32 -0700)
This commit implements only one prerequisite part of allowing logical
decoding on standbys. The commit message contains an explanation of the
overall design, which later commits will refer back to.

Overall design:

1. We want to enable logical decoding on standbys, but replay of WAL
from the primary might remove data that is needed by logical decoding,
causing errors on the standby. To prevent those errors, a new replication
conflict scenario needs to be addressed (just as hot standby already
handles other kinds of conflicts).

2. Our chosen strategy for dealing with this type of replication conflict
is to invalidate logical slots for which needed data has been removed.

3. To do this we need the snapshotConflictHorizon (formerly known as
latestRemovedXid) for each change, just as we do for physical replication
conflicts, but we also need to know whether any particular change was to
data that logical decoding might access. That way, during WAL replay, we
know when there is a risk of conflict and, if so, whether an actual
conflict exists.

4. We can't rely on the standby's relcache entries for this purpose in
any way, because the startup process can't access catalog contents.

5. Therefore every WAL record that potentially removes data from the
index or heap must carry a flag indicating whether the relation it
modifies is one that might be accessed during logical decoding (see the
sketch below).
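
As a minimal sketch of that flag, here is the new layout of gistxlogDelete
from the diff below, together with how the flag is populated on the
primary; heaprel is the heap relation the index belongs to:

    typedef struct gistxlogDelete
    {
        TransactionId snapshotConflictHorizon;
        uint16        ntodelete;     /* number of deleted offsets */
        bool          isCatalogRel;  /* might logical decoding on a standby
                                      * need this data? */
        OffsetNumber  offsets[FLEXIBLE_ARRAY_MEMBER];
    } gistxlogDelete;

    /* On the primary, while assembling the WAL record: */
    xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);

RelationIsAccessibleInLogicalDecoding() ends up calling IsCatalogRelation(),
which is presumably why rel.h now includes catalog/catalog.h.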

Why do we need this for logical decoding on standby?

First, let's forget about logical decoding on standby and recall that
on a primary database, any catalog rows that may be needed by a logical
decoding replication slot are not removed.

This is ensured by the catalog_xmin associated with the logical
replication slot, which holds back removal of catalog rows the slot may
still need.

But, with logical decoding on standby, in the following cases:

- hot_standby_feedback is off
- hot_standby_feedback is on but there is no physical replication slot
  between the primary and the standby. Then, hot_standby_feedback will
  work, but only while the connection is alive (for example, a node
  restart would break it)

In these cases, the primary may delete system catalog rows that could be
needed by logical decoding on the standby (as it does not know about the
standby's catalog_xmin).

So it is mandatory to identify those rows and, if any slots may need them,
invalidate those slots. Identifying those rows is the purpose of this
commit.

Implementation:

When WAL replay on a standby indicates that a catalog table tuple is to
be deleted by an xid that is greater than a logical slot's catalog_xmin,
the slot's catalog_xmin conflicts with that xid, and the conflict needs to
be handled. While subsequent commits will do the actual conflict handling,
this commit adds a new field isCatalogRel to such WAL records (and a new
bit in the xl_heap_visible flags field) that is true for catalog tables,
so as to arrange for conflict handling.
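
The following is only a hedged sketch of the check this information makes
possible on the standby; the helper name is hypothetical and the exact
comparison will be settled by the follow-up commits that do the real
conflict handling:

    #include "access/transam.h"     /* TransactionId comparison helpers */

    /*
     * Hypothetical helper: would replaying this record conflict with a
     * logical slot whose oldest needed catalog xid is slot_catalog_xmin?
     */
    static bool
    record_conflicts_with_slot(bool isCatalogRel,
                               TransactionId snapshotConflictHorizon,
                               TransactionId slot_catalog_xmin)
    {
        if (!isCatalogRel)
            return false;   /* no catalog data removed, decoding is safe */
        if (!TransactionIdIsValid(snapshotConflictHorizon))
            return false;   /* the record removed nothing of interest */

        /* catalog rows the slot still needs may have been removed */
        return TransactionIdFollowsOrEquals(snapshotConflictHorizon,
                                            slot_catalog_xmin);
    }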

The affected WAL records are the ones that already contain the
snapshotConflictHorizon field, namely:

- gistxlogDelete
- gistxlogPageReuse
- xl_hash_vacuum_one_page
- xl_heap_prune
- xl_heap_freeze_page
- xl_heap_visible
- xl_btree_reuse_page
- xl_btree_delete
- spgxlogVacuumRedirect

Due to this new field, xl_hash_vacuum_one_page and gistxlogDelete now
contain the offsets to be deleted as a FLEXIBLE_ARRAY_MEMBER. This is
needed to ensure correct alignment; it is not needed in the other structs
to which isCatalogRel has been added.
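
Concretely (a sketch assuming the typical 4-byte TransactionId / 2-byte
OffsetNumber layout), adding the bool leaves the fixed part of
xl_hash_vacuum_one_page at an odd size, so the old "point just past the
struct" trick would hand PageIndexMultiDelete() a misaligned array; with
the flexible array member the compiler inserts the padding and redo can
address the offsets directly, as the hash_xlog_vacuum_one_page change
below does:

    typedef struct xl_hash_vacuum_one_page
    {
        TransactionId snapshotConflictHorizon;  /* offset 0, 4 bytes */
        uint16        ntuples;                  /* offset 4, 2 bytes */
        bool          isCatalogRel;             /* offset 6, 1 byte  */
        /* 1 byte of padding so the array below stays 2-byte aligned */
        OffsetNumber  offsets[FLEXIBLE_ARRAY_MEMBER];   /* offset 8 */
    } xl_hash_vacuum_one_page;

    /* Redo side: no more pointer arithmetic past SizeOfHashVacuumOnePage */
    toDelete = xldata->offsets;
    PageIndexMultiDelete(page, toDelete, xldata->ntuples);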

This commit just introduces the WAL format changes mentioned above. Handling
the actual conflicts will follow in future commits.

Bumps XLOG_PAGE_MAGIC, as several WAL records are changed.

Author: "Drouvot, Bertrand" <[email protected]>
Author: Andres Freund <[email protected]> (in an older version)
Author: Amit Khandekar <[email protected]>  (in an older version)
Reviewed-by: "Drouvot, Bertrand" <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Robert Haas <[email protected]>
Reviewed-by: Fabrízio de Royes Mello <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
15 files changed:
src/backend/access/gist/gistxlog.c
src/backend/access/hash/hash_xlog.c
src/backend/access/hash/hashinsert.c
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/spgist/spgvacuum.c
src/include/access/gistxlog.h
src/include/access/hash_xlog.h
src/include/access/heapam_xlog.h
src/include/access/nbtxlog.h
src/include/access/spgxlog.h
src/include/access/visibilitymapdefs.h
src/include/access/xlog_internal.h
src/include/utils/rel.h

index 4b52719765fe904617d938ab02c322b21a68d5bb..b7678f3c14409a66f98188780e13895a274527cc 100644 (file)
@@ -177,6 +177,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
        gistxlogDelete *xldata = (gistxlogDelete *) XLogRecGetData(record);
        Buffer          buffer;
        Page            page;
+       OffsetNumber *toDelete = xldata->offsets;
 
        /*
         * If we have any conflict processing to do, it must happen before we
@@ -203,14 +204,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
        {
                page = (Page) BufferGetPage(buffer);
 
-               if (XLogRecGetDataLen(record) > SizeOfGistxlogDelete)
-               {
-                       OffsetNumber *todelete;
-
-                       todelete = (OffsetNumber *) ((char *) xldata + SizeOfGistxlogDelete);
-
-                       PageIndexMultiDelete(page, todelete, xldata->ntodelete);
-               }
+               PageIndexMultiDelete(page, toDelete, xldata->ntodelete);
 
                GistClearPageHasGarbage(page);
                GistMarkTuplesDeleted(page);
@@ -609,6 +603,7 @@ gistXLogPageReuse(Relation rel, Relation heaprel,
         */
 
        /* XLOG stuff */
+       xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = deleteXid;
@@ -678,6 +673,7 @@ gistXLogDelete(Buffer buffer, OffsetNumber *todelete, int ntodelete,
        gistxlogDelete xlrec;
        XLogRecPtr      recptr;
 
+       xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
        xlrec.ntodelete = ntodelete;
 
index f38b42efb90f4da2d413f4fd00e82483fa9fd77a..f2dd9be8d3f0ae61456482596c84915779ac31ec 100644 (file)
@@ -980,8 +980,10 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
        Page            page;
        XLogRedoAction action;
        HashPageOpaque pageopaque;
+       OffsetNumber *toDelete;
 
        xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+       toDelete = xldata->offsets;
 
        /*
         * If we have any conflict processing to do, it must happen before we
@@ -1010,14 +1012,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
        {
                page = (Page) BufferGetPage(buffer);
 
-               if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
-               {
-                       OffsetNumber *unused;
-
-                       unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
-
-                       PageIndexMultiDelete(page, unused, xldata->ntuples);
-               }
+               PageIndexMultiDelete(page, toDelete, xldata->ntuples);
 
                /*
                 * Mark the page as not containing any LP_DEAD items. See comments in
index a604e3189196381b356bf314ab65444bad982458..22656b24e205131fe38d01fc5be0c18691128cbd 100644 (file)
@@ -432,6 +432,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
                        xl_hash_vacuum_one_page xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(hrel);
                        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
                        xlrec.ntuples = ndeletable;
 
index 9662e382549f833b4e475c590c0d7950846384f2..f7d9ce59a4790aef587beef073f37fdea8c8b449 100644 (file)
@@ -6698,6 +6698,7 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
                nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
 
                xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
+               xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
                xlrec.nplans = nplans;
 
                XLogBeginInsert();
@@ -8280,6 +8281,8 @@ log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 
        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
        xlrec.flags = vmflags;
+       if (RelationIsAccessibleInLogicalDecoding(rel))
+               xlrec.flags |= VISIBILITYMAP_XLOG_CATALOG_REL;
        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
 
@@ -8870,6 +8873,8 @@ heap_xlog_visible(XLogReaderState *record)
        BlockNumber blkno;
        XLogRedoAction action;
 
+       Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
        XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
 
        /*
@@ -8956,11 +8961,15 @@ heap_xlog_visible(XLogReaderState *record)
        {
                Page            vmpage = BufferGetPage(vmbuffer);
                Relation        reln;
+               uint8           vmbits;
 
                /* initialize the page if it was read as zeros */
                if (PageIsNew(vmpage))
                        PageInit(vmpage, BLCKSZ, 0);
 
+               /* remove VISIBILITYMAP_XLOG_* */
+               vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
                /*
                 * XLogReadBufferForRedoExtended locked the buffer. But
                 * visibilitymap_set will handle locking itself.
@@ -8971,7 +8980,7 @@ heap_xlog_visible(XLogReaderState *record)
                visibilitymap_pin(reln, blkno, &vmbuffer);
 
                visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-                                                 xlrec->snapshotConflictHorizon, xlrec->flags);
+                                                 xlrec->snapshotConflictHorizon, vmbits);
 
                ReleaseBuffer(vmbuffer);
                FreeFakeRelcacheEntry(reln);
index 4e65cbcadf8b6c652dfca5eedb711d49efb7fea3..3f0342351fbf1ad150b3794a1a2358769f6cc861 100644 (file)
@@ -418,6 +418,7 @@ heap_page_prune(Relation relation, Buffer buffer,
                        xl_heap_prune xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
                        xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
                        xlrec.nredirected = prstate.nredirected;
                        xlrec.ndead = prstate.ndead;
index ee996b56602c3e70681da8c77ddd14b85a7814f2..151ad37a542b50dff86bd8a63b360ddac4e286fa 100644 (file)
@@ -836,6 +836,7 @@ _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
         */
 
        /* XLOG stuff */
+       xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = safexid;
@@ -1358,6 +1359,7 @@ _bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
                XLogRecPtr      recptr;
                xl_btree_delete xlrec_delete;
 
+               xlrec_delete.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
                xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
                xlrec_delete.ndeleted = ndeletable;
                xlrec_delete.nupdated = nupdatable;
index 3cff71e7203d5ff344e6557414715cb023ec8a41..2f4a4aad24195872cf8879b952fad45948f61795 100644 (file)
@@ -503,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Relation heaprel, Buffer buffer)
        spgxlogVacuumRedirect xlrec;
        GlobalVisState *vistest;
 
+       xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
        xlrec.nToPlaceholder = 0;
        xlrec.snapshotConflictHorizon = InvalidTransactionId;
 
index 2ce9366277ded64bc7a339e313848e50c2c218c4..aff2ffbdcc242a1c6afb51dc7db9ad4d295c50f1 100644 (file)
@@ -51,11 +51,14 @@ typedef struct gistxlogDelete
 {
        TransactionId snapshotConflictHorizon;
        uint16          ntodelete;              /* number of deleted offsets */
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 
-       /* TODELETE OFFSET NUMBER ARRAY FOLLOWS */
+       /* TODELETE OFFSET NUMBERS */
+       OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
 } gistxlogDelete;
 
-#define SizeOfGistxlogDelete   (offsetof(gistxlogDelete, ntodelete) + sizeof(uint16))
+#define SizeOfGistxlogDelete   offsetof(gistxlogDelete, offsets)
 
 /*
  * Backup Blk 0: If this operation completes a page split, by inserting a
@@ -98,9 +101,11 @@ typedef struct gistxlogPageReuse
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 } gistxlogPageReuse;
 
-#define SizeOfGistxlogPageReuse        (offsetof(gistxlogPageReuse, snapshotConflictHorizon) + sizeof(FullTransactionId))
+#define SizeOfGistxlogPageReuse        (offsetof(gistxlogPageReuse, isCatalogRel) + sizeof(bool))
 
 extern void gist_redo(XLogReaderState *record);
 extern void gist_desc(StringInfo buf, XLogReaderState *record);
index 9894ab9afee20161198f18b97c5e4397ec61078d..b93619d1a814841166de71221be5a54414122ba4 100644 (file)
@@ -251,13 +251,15 @@ typedef struct xl_hash_init_bitmap_page
 typedef struct xl_hash_vacuum_one_page
 {
        TransactionId snapshotConflictHorizon;
-       uint16                  ntuples;
+       uint16          ntuples;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 
-       /* TARGET OFFSET NUMBERS FOLLOW AT THE END */
+       /* TARGET OFFSET NUMBERS */
+       OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
 } xl_hash_vacuum_one_page;
 
-#define SizeOfHashVacuumOnePage \
-       (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(uint16))
+#define SizeOfHashVacuumOnePage offsetof(xl_hash_vacuum_one_page, offsets)
 
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
index 42620bbdc9e53fba845ef509c5912f114e577f87..e71d84895225a125c5ee30e53b535bff09533515 100644 (file)
@@ -245,10 +245,12 @@ typedef struct xl_heap_prune
        TransactionId snapshotConflictHorizon;
        uint16          nredirected;
        uint16          ndead;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
        /* OFFSET NUMBERS are in the block reference 0 */
 } xl_heap_prune;
 
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, ndead) + sizeof(uint16))
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
 
 /*
  * The vacuum page record is similar to the prune record, but can only mark
@@ -344,13 +346,15 @@ typedef struct xl_heap_freeze_page
 {
        TransactionId snapshotConflictHorizon;
        uint16          nplans;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 
        /*
         * In payload of blk 0 : FREEZE PLANS and OFFSET NUMBER ARRAY
         */
 } xl_heap_freeze_page;
 
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) + sizeof(uint16))
+#define SizeOfHeapFreezePage   (offsetof(xl_heap_freeze_page, isCatalogRel) + sizeof(bool))
 
 /*
  * This is what we need to know about setting a visibility map bit
index 7dd67257f296193051ffd3dd62a3d0e84cd44acf..a0310c96fb1afba6b6b6ed85d8f7d473b006686f 100644 (file)
@@ -188,9 +188,11 @@ typedef struct xl_btree_reuse_page
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 } xl_btree_reuse_page;
 
-#define SizeOfBtreeReusePage   (sizeof(xl_btree_reuse_page))
+#define SizeOfBtreeReusePage   (offsetof(xl_btree_reuse_page, isCatalogRel) + sizeof(bool))
 
 /*
  * xl_btree_vacuum and xl_btree_delete records describe deletion of index
@@ -235,6 +237,8 @@ typedef struct xl_btree_delete
        TransactionId snapshotConflictHorizon;
        uint16          ndeleted;
        uint16          nupdated;
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 
        /*----
         * In payload of blk 0 :
@@ -245,7 +249,7 @@ typedef struct xl_btree_delete
         */
 } xl_btree_delete;
 
-#define SizeOfBtreeDelete      (offsetof(xl_btree_delete, nupdated) + sizeof(uint16))
+#define SizeOfBtreeDelete      (offsetof(xl_btree_delete, isCatalogRel) + sizeof(bool))
 
 /*
  * The offsets that appear in xl_btree_update metadata are offsets into the
index b9d6753533d5b9bf42d772aeafa09bcd17d9105f..a7f10bf2d3a2bd72e7bcf82a60246bdb341f9bc5 100644 (file)
@@ -240,6 +240,8 @@ typedef struct spgxlogVacuumRedirect
        uint16          nToPlaceholder; /* number of redirects to make placeholders */
        OffsetNumber firstPlaceholder;  /* first placeholder tuple to remove */
        TransactionId snapshotConflictHorizon;  /* newest XID of removed redirects */
+       bool            isCatalogRel;   /* to handle recovery conflict during logical
+                                                                * decoding on standby */
 
        /* offsets of redirect tuples to make placeholders follow */
        OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
index 9165b9456b9a04d37e44fb84d308ed70da38ec1a..8dfdbfa71efae59adccb377475e72263e7483f2c 100644 (file)
 #define VISIBILITYMAP_ALL_FROZEN       0x02
 #define VISIBILITYMAP_VALID_BITS       0x03    /* OR of all valid visibilitymap
                                                                                         * flags bits */
+/*
+ * To detect recovery conflicts during logical decoding on a standby, we need
+ * to know if a table is a user catalog table. For that we add an additional
+ * bit into xl_heap_visible.flags, in addition to the above.
+ *
+ * NB: VISIBILITYMAP_XLOG_* may not be passed to visibilitymap_set().
+ */
+#define VISIBILITYMAP_XLOG_CATALOG_REL 0x04
+#define VISIBILITYMAP_XLOG_VALID_BITS  (VISIBILITYMAP_VALID_BITS | VISIBILITYMAP_XLOG_CATALOG_REL)
 
 #endif                                                 /* VISIBILITYMAPDEFS_H */
index 8edae7bb079e1138359df61f0267082cd5ebc5b6..b0fd338a00ce7f63a2ac8b9ec3bd25169be61e9b 100644 (file)
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD112 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD113 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
index c0ddddb2f0da3979a82548e6597ca00df7ce9cac..31f84e90eb79d2ca7c93724b12365ecf07a514ce 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"