Fix hot standby latestRemovedXid processing.
authorHeikki Linnakangas <[email protected]>
Mon, 19 May 2014 15:22:01 +0000 (18:22 +0300)
committerHeikki Linnakangas <[email protected]>
Mon, 19 May 2014 15:22:01 +0000 (18:22 +0300)
plus readme updates

12 files changed:
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/nbtree/nbtxlog.c
src/backend/access/rmgrdesc/heapdesc.c
src/backend/access/rmgrdesc/standbydesc.c
src/backend/access/transam/README
src/backend/commands/vacuumlazy.c
src/backend/storage/ipc/procarray.c
src/backend/storage/ipc/standby.c
src/include/access/heapam.h
src/include/access/heapam_xlog.h
src/include/storage/standby.h

index d78f0b1b929199f9d987ff8da866fc693a1c9190..4f746fa5c180f4ff2b444df3273871cf5f4c4c84 100644 (file)
@@ -40,6 +40,7 @@
  */
 #include "postgres.h"
 
+#include "access/clog.h"
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
 #include "access/hio.h"
@@ -6395,8 +6396,8 @@ heap_restrpos(HeapScanDesc scan)
  * with queries.
  */
 void
-HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
-                                                                          TransactionId *latestRemovedXid)
+HeapTupleHeaderAdvanceLatestRemoved(HeapTupleHeader tuple,
+                                                                       XLogRecPtr *latestRemovedCommitLSN)
 {
        TransactionId xmin = HeapTupleHeaderGetXmin(tuple);
        TransactionId xmax = HeapTupleHeaderGetUpdateXid(tuple);
@@ -6404,8 +6405,11 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 
        if (tuple->t_infomask & HEAP_MOVED)
        {
-               if (TransactionIdPrecedes(*latestRemovedXid, xvac))
-                       *latestRemovedXid = xvac;
+               XLogRecPtr commitlsn = TransactionIdGetCommitLSN(xvac);
+
+               if (COMMITLSN_IS_NORMAL(commitlsn) &&
+                       commitlsn > *latestRemovedCommitLSN)
+                       *latestRemovedCommitLSN = commitlsn;
        }
 
        /*
@@ -6416,15 +6420,16 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
         * This needs to work on both master and standby, where it is used to
         * assess btree delete records.
         */
-       if (HeapTupleHeaderXminCommitted(tuple) ||
-               (!HeapTupleHeaderXminInvalid(tuple) && TransactionIdDidCommit(xmin)))
+       if (!HeapTupleHeaderXminInvalid(tuple) && xmax != xmin)
        {
-               if (xmax != xmin &&
-                       TransactionIdFollows(xmax, *latestRemovedXid))
-                       *latestRemovedXid = xmax;
+               XLogRecPtr commitlsn = TransactionIdGetCommitLSN(xmin);
+
+               if (COMMITLSN_IS_NORMAL(commitlsn) &&
+                       commitlsn > *latestRemovedCommitLSN)
+                       *latestRemovedCommitLSN = commitlsn;
        }
 
-       /* *latestRemovedXid may still be invalid at end */
+       /* *latestRemovedCommitLSN may still be invalid at end */
 }
 
 /*
@@ -6434,14 +6439,14 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(RelFileNode rnode, XLogRecPtr latestRemovedCommitLSN)
 {
        xl_heap_cleanup_info xlrec;
        XLogRecPtr      recptr;
        XLogRecData rdata;
 
        xlrec.node = rnode;
-       xlrec.latestRemovedXid = latestRemovedXid;
+       xlrec.latestRemovedCommitLSN = latestRemovedCommitLSN;
 
        rdata.data = (char *) &xlrec;
        rdata.len = SizeOfHeapCleanupInfo;
@@ -6470,7 +6475,7 @@ log_heap_clean(Relation reln, Buffer buffer,
                           OffsetNumber *redirected, int nredirected,
                           OffsetNumber *nowdead, int ndead,
                           OffsetNumber *nowunused, int nunused,
-                          TransactionId latestRemovedXid)
+                          XLogRecPtr latestRemovedCommitLSN)
 {
        xl_heap_clean xlrec;
        uint8           info;
@@ -6482,7 +6487,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 
        xlrec.node = reln->rd_node;
        xlrec.block = BufferGetBlockNumber(buffer);
-       xlrec.latestRemovedXid = latestRemovedXid;
+       xlrec.latestRemovedCommitLSN = latestRemovedCommitLSN;
        xlrec.nredirected = nredirected;
        xlrec.ndead = ndead;
 
@@ -7228,7 +7233,7 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
        xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
        if (InHotStandby)
-               ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+               ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedCommitLSN, xlrec->node);
 
        /*
         * Actual operation is a no-op. Record type exists to provide a means for
@@ -7266,8 +7271,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
         * conflict on the records that cause MVCC failures for user queries. If
         * latestRemovedXid is invalid, skip conflict processing.
         */
-       if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-               ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+       if (InHotStandby && xlrec->latestRemovedCommitLSN != InvalidXLogRecPtr)
+               ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedCommitLSN,
                                                                                        xlrec->node);
 
        /*
index 605945a45f1318db58aad9ce2a94e73c380a82de..f6e2599777093d4a3b564d0098a8d0c67f4bf3a1 100644 (file)
@@ -31,8 +31,8 @@
 typedef struct
 {
        TransactionId new_prune_xid;    /* new prune hint value for page */
-       TransactionId latestRemovedXid;         /* latest xid to be removed by this
-                                                                                * prune */
+       XLogRecPtr latestRemovedCommitLSN;      /* latest commit LSN of a tuple
+                                                                                * removed by this prune */
        int                     nredirected;    /* numbers of entries in arrays below */
        int                     ndead;
        int                     nunused;
@@ -141,7 +141,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
                 */
                if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
                {
-                       TransactionId ignore = InvalidTransactionId;            /* return value not
+                       XLogRecPtr ignore = InvalidXLogRecPtr;          /* return value not
                                                                                                                                 * needed */
 
                        /* OK to prune */
@@ -174,7 +174,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
  */
 int
 heap_page_prune(Relation relation, Buffer buffer, XLogRecPtr OldestSnapshot,
-                               bool report_stats, TransactionId *latestRemovedXid)
+                               bool report_stats, XLogRecPtr *latestRemovedCommitLSN)
 {
        int                     ndeleted = 0;
        Page            page = BufferGetPage(buffer);
@@ -194,7 +194,7 @@ heap_page_prune(Relation relation, Buffer buffer, XLogRecPtr OldestSnapshot,
         * initialize the rest of our working state.
         */
        prstate.new_prune_xid = InvalidTransactionId;
-       prstate.latestRemovedXid = *latestRemovedXid;
+       prstate.latestRemovedCommitLSN = *latestRemovedCommitLSN;
        prstate.nredirected = prstate.ndead = prstate.nunused = 0;
        memset(prstate.marked, 0, sizeof(prstate.marked));
 
@@ -262,7 +262,7 @@ heap_page_prune(Relation relation, Buffer buffer, XLogRecPtr OldestSnapshot,
                                                                        prstate.redirected, prstate.nredirected,
                                                                        prstate.nowdead, prstate.ndead,
                                                                        prstate.nowunused, prstate.nunused,
-                                                                       prstate.latestRemovedXid);
+                                                                       prstate.latestRemovedCommitLSN);
 
                        PageSetLSN(BufferGetPage(buffer), recptr);
                }
@@ -297,7 +297,7 @@ heap_page_prune(Relation relation, Buffer buffer, XLogRecPtr OldestSnapshot,
        if (report_stats && ndeleted > prstate.ndead)
                pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);
 
-       *latestRemovedXid = prstate.latestRemovedXid;
+       *latestRemovedCommitLSN = prstate.latestRemovedCommitLSN;
 
        /*
         * XXX Should we update the FSM information of this page ?
@@ -402,8 +402,8 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
                                == HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
                        {
                                heap_prune_record_unused(prstate, rootoffnum);
-                               HeapTupleHeaderAdvanceLatestRemovedXid(htup,
-                                                                                                &prstate->latestRemovedXid);
+                               HeapTupleHeaderAdvanceLatestRemoved(htup,
+                                                                                                       &prstate->latestRemovedCommitLSN);
                                ndeleted++;
                        }
 
@@ -535,8 +535,8 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
                if (tupdead)
                {
                        latestdead = offnum;
-                       HeapTupleHeaderAdvanceLatestRemovedXid(htup,
-                                                                                                &prstate->latestRemovedXid);
+                       HeapTupleHeaderAdvanceLatestRemoved(htup,
+                                                                                               &prstate->latestRemovedCommitLSN);
                }
                else if (!recent_dead)
                        break;
index 5f9fc49e78ca1388ab482e24c8b5a873238ae0b6..df3f08b84ddce4f6eabcb201c9cfbdb173e9ca5e 100644 (file)
@@ -580,7 +580,7 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
 }
 
 /*
- * Get the latestRemovedXid from the heap pages pointed at by the index
+ * Get the latestRemovedCommitLSN from the heap pages pointed at by the index
  * tuples being deleted. This puts the work for calculating latestRemovedXid
  * into the recovery path rather than the primary path.
  *
@@ -591,7 +591,7 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
  * XXX optimise later with something like XLogPrefetchBuffer()
  */
 static TransactionId
-btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
+btree_xlog_delete_get_latestRemovedCommitLSN(xl_btree_delete *xlrec)
 {
        OffsetNumber *unused;
        Buffer          ibuffer,
@@ -604,7 +604,7 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
        HeapTupleHeader htuphdr;
        BlockNumber hblkno;
        OffsetNumber hoffnum;
-       TransactionId latestRemovedXid = InvalidTransactionId;
+       XLogRecPtr latestRemovedCommitLSN = InvalidXLogRecPtr;
        int                     i;
 
        /*
@@ -619,7 +619,7 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
         * coding will result in throwing a conflict anyway.
         */
        if (CountDBBackends(InvalidOid) == 0)
-               return latestRemovedXid;
+               return InvalidXLogRecPtr;
 
        /*
         * In what follows, we have to examine the previous state of the index
@@ -629,17 +629,17 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
         * won't have let in any user sessions before we reach consistency.
         */
        if (!reachedConsistency)
-               elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");
+               elog(PANIC, "btree_xlog_delete_get_latestRemovedCommitLSN: cannot operate with inconsistent data");
 
        /*
         * Get index page.  If the DB is consistent, this should not fail, nor
         * should any of the heap page fetches below.  If one does, we return
-        * InvalidTransactionId to cancel all HS transactions.  That's probably
+        * InvalidXLogRecPtr to cancel all HS transactions.  That's probably
         * overkill, but it's safe, and certainly better than panicking here.
         */
        ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
        if (!BufferIsValid(ibuffer))
-               return InvalidTransactionId;
+               return InvalidXLogRecPtr;
        ipage = (Page) BufferGetPage(ibuffer);
 
        /*
@@ -664,7 +664,7 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
                if (!BufferIsValid(hbuffer))
                {
                        UnlockReleaseBuffer(ibuffer);
-                       return InvalidTransactionId;
+                       return InvalidXLogRecPtr;
                }
                hpage = (Page) BufferGetPage(hbuffer);
 
@@ -697,7 +697,7 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
                {
                        htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
 
-                       HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+                       HeapTupleHeaderAdvanceLatestRemoved(htuphdr, &latestRemovedCommitLSN);
                }
                else if (ItemIdIsDead(hitemid))
                {
@@ -718,12 +718,12 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
 
        /*
         * If all heap tuples were LP_DEAD then we will be returning
-        * InvalidTransactionId here, which avoids conflicts. This matches
+        * InvalidXLogRecPtr here, which avoids conflicts. This matches
         * existing logic which assumes that LP_DEAD tuples must already be older
         * than the latestRemovedXid on the cleanup record that set them as
         * LP_DEAD, hence must already have generated a conflict.
         */
-       return latestRemovedXid;
+       return latestRemovedCommitLSN;
 }
 
 static void
@@ -747,9 +747,9 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
         */
        if (InHotStandby)
        {
-               TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);
+               TransactionId latestRemovedCommitLSN = btree_xlog_delete_get_latestRemovedCommitLSN(xlrec);
 
-               ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
+               ResolveRecoveryConflictWithSnapshot(latestRemovedCommitLSN, xlrec->node);
        }
 
        /* If we have a full-page image, restore it and we're done */
index c8a61669dd292c3fd7345fcbd8b274487cbe9f6b..38103b3bd8bde9682f3118a3b1145831930b819f 100644 (file)
@@ -135,10 +135,11 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
        {
                xl_heap_clean *xlrec = (xl_heap_clean *) rec;
 
-               appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u remxid %u",
+               appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u remlsn %X/%X",
                                                 xlrec->node.spcNode, xlrec->node.dbNode,
                                                 xlrec->node.relNode, xlrec->block,
-                                                xlrec->latestRemovedXid);
+                                                (uint32) (xlrec->latestRemovedCommitLSN >> 32),
+                                                (uint32) xlrec->latestRemovedCommitLSN);
        }
        else if (info == XLOG_HEAP2_FREEZE_PAGE)
        {
@@ -157,8 +158,9 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
        {
                xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) rec;
 
-               appendStringInfo(buf, "cleanup info: remxid %u",
-                                                xlrec->latestRemovedXid);
+               appendStringInfo(buf, "cleanup info: remxid %X/%X",
+                                                (uint32) (xlrec->latestRemovedCommitLSN >> 32),
+                                                (uint32) xlrec->latestRemovedCommitLSN);
        }
        else if (info == XLOG_HEAP2_VISIBLE)
        {
index aba9a94aee9d1fb0db48078b33052e8b47c62bfe..0717b612baddb5b332f0a1f7cf385138f8ced50e 100644 (file)
@@ -19,9 +19,8 @@
 static void
 standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
-       appendStringInfo(buf, " nextXid %u latestCompletedXid %u oldestRunningXid %u",
+       appendStringInfo(buf, " nextXid %u oldestRunningXid %u",
                                         xlrec->nextXid,
-                                        xlrec->latestCompletedXid,
                                         xlrec->oldestRunningXid);
 }
 
index bafd5576723749e7548b19f9d41ef2e97d46d463..a2be640759cbd64ebcaa41c741f7c45a230ea370 100644 (file)
@@ -245,12 +245,12 @@ committed".
 
 What we actually enforce is strict serialization of commits and rollbacks
 with snapshot-taking. We use the LSNs generated by Write-Ahead-Logging as
-a convenient monotonically-increasing counter, to serialize commits with
+a convenient monotonically increasing counter, to serialize commits with
 snapshots. Each commit is naturally assigned an LSN; it's the LSN of the
 commit WAL record. Snapshots are also represented by an LSN; all commits
 with a commit record's LSN <= the snapshot's LSN are considered as visible
-to the snapshot. Therefore acquiring a snapshot is a matter of reading the
-current WAL insert location.
+to the snapshot. Acquiring a snapshot is a matter of reading the current
+WAL insert location.
 
 That means that we need to be able to look up the commit LSN of each
 transaction, by XID. For that purpose, we store the commit LSN of each
@@ -268,13 +268,13 @@ XactLockTableWait(). That's quite heavy-weight, but the race should
 happen rarely.
 
 So, a snapshot is simply an LSN, such that all transactions that committed
-before that LSN are visible, and everything later is still considered
-as in-progress. However, to avoid consulting the clog every time the
-visibility of a tuple is checked, we also record a lower and upper bound of
-the XIDs considered visible by the snapshot, in SnapshotData. When a snapshot
-is taken, xmin is set to the current nextXid value; any transaction that
-begins after the snapshot is surely still running. The xmin is tracked
-lazily in shared memory, by AdvanceGlobalXmin().
+before that LSN are visible, and everything later is still considered as
+in-progress. However, to avoid consulting the clog every time the visibilty
+of a tuple is checked, we also record a lower and upper bound of the XIDs
+considered visible by the snapshot, in SnapshotData. When a snapshot is
+taken, xmax is set to the current nextXid value; any transaction that begins
+after the snapshot is surely still running. The xmin is tracked lazily in
+shared memory, by AdvanceGlobalXmin().
 
 We allow GetNewTransactionId to store the XID into MyPgXact->xid (or the
 subxid array) without taking ProcArrayLock.  This was once necessary to
index 322c60fc34a3cdfa0e2b7b27565ad91832150b69..e54c58e56451b449ac1aeaace5f8e024c8eaf9a6 100644 (file)
@@ -118,7 +118,7 @@ typedef struct LVRelStats
        int                     max_dead_tuples;        /* # slots allocated in array */
        ItemPointer dead_tuples;        /* array of ItemPointerData */
        int                     num_index_scans;
-       TransactionId latestRemovedXid;
+       XLogRecPtr      latestRemovedCommitLSN;
        bool            lock_waiter_detected;
 } LVRelStats;
 
@@ -396,8 +396,8 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
        /*
         * No need to write the record at all unless it contains a valid value
         */
-       if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-               (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+       if (vacrelstats->latestRemovedCommitLSN != InvalidXLogRecPtr)
+               (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedCommitLSN);
 }
 
 /*
@@ -456,7 +456,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
        vacrelstats->rel_pages = nblocks;
        vacrelstats->scanned_pages = 0;
        vacrelstats->nonempty_pages = 0;
-       vacrelstats->latestRemovedXid = InvalidTransactionId;
+       vacrelstats->latestRemovedCommitLSN = InvalidXLogRecPtr;
 
        lazy_space_alloc(vacrelstats, nblocks);
        frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
@@ -735,7 +735,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                 * We count tuples removed by the pruning step as removed by VACUUM.
                 */
                tups_vacuumed += heap_page_prune(onerel, buf, OldestSnapshot, false,
-                                                                                &vacrelstats->latestRemovedXid);
+                                                                                &vacrelstats->latestRemovedCommitLSN);
 
                /*
                 * Now scan the page to collect vacuumable items and check for tuples
@@ -892,8 +892,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        if (tupgone)
                        {
                                lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
-                               HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
-                                                                                        &vacrelstats->latestRemovedXid);
+                               HeapTupleHeaderAdvanceLatestRemoved(tuple.t_data,
+                                                                                        &vacrelstats->latestRemovedCommitLSN);
                                tups_vacuumed += 1;
                                has_dead_tuples = true;
                        }
@@ -1237,7 +1237,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
                recptr = log_heap_clean(onerel, buffer,
                                                                NULL, 0, NULL, 0,
                                                                unused, uncnt,
-                                                               vacrelstats->latestRemovedXid);
+                                                               vacrelstats->latestRemovedCommitLSN);
                PageSetLSN(page, recptr);
        }
 
index a77b724c7aa277d805ff79f8ce5bfe62ae8bf4ba..22225afe1e0b29753d5608f65306cc9746bd98c1 100644 (file)
@@ -289,11 +289,8 @@ ProcArrayEndTransaction(PGPROC *proc)
 {
        PGXACT     *pgxact = &allPgXact[proc->pgprocno];
 
-       /*
-        * We don't need to lock, because we're only changing our own struct.
-        * FIXME: Do we need to enforce some specific order for these stores
-        * anyway? Resetting snapshotlsn is not atomic, for example..
-        */
+       /* A shared lock is enough to modify our own fields */
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
        pgxact->xid = InvalidTransactionId;
        proc->lxid = InvalidLocalTransactionId;
        pgxact->snapshotlsn = InvalidXLogRecPtr;
@@ -301,6 +298,7 @@ ProcArrayEndTransaction(PGPROC *proc)
        pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
        pgxact->delayChkpt = false;             /* be sure this is cleared in abort */
        proc->recoveryConflictPending = false;
+       LWLockRelease(ProcArrayLock);
 }
 
 
@@ -317,12 +315,8 @@ ProcArrayClearTransaction(PGPROC *proc)
 {
        PGXACT     *pgxact = &allPgXact[proc->pgprocno];
 
-       /*
-        * We can skip locking ProcArrayLock here, because this action does not
-        * actually change anyone's view of the set of running XIDs: our entry is
-        * duplicate with the gxact that has already been inserted into the
-        * ProcArray.
-        */
+       /* A shared lock is enough to modify our own fields. */
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
        pgxact->xid = InvalidTransactionId;
        proc->lxid = InvalidLocalTransactionId;
        pgxact->snapshotlsn = InvalidXLogRecPtr;
@@ -331,6 +325,7 @@ ProcArrayClearTransaction(PGPROC *proc)
        /* redundant, but just in case */
        pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
        pgxact->delayChkpt = false;
+       LWLockRelease(ProcArrayLock);
 }
 
 /*
@@ -680,9 +675,6 @@ GetOldestSnapshotLSN(Relation rel, bool ignoreVacuum)
                        proc->databaseId == MyDatabaseId ||
                        proc->databaseId == 0)          /* always include WalSender */
                {
-                       /* FIXME: is this atomic? If not, do we have enough locking? If it
-                        * is, could we get away with less locking?
-                        */
                        XLogRecPtr snapshotlsn = pgxact->snapshotlsn;
 
                        if (snapshotlsn < result &&     snapshotlsn != InvalidXLogRecPtr)
@@ -813,8 +805,8 @@ ProcArrayInstallImportedSnapshot(XLogRecPtr snapshotlsn, TransactionId sourcexid
        if (!TransactionIdIsNormal(sourcexid))
                return false;
 
-       /* Get lock so source xact can't end while we're doing this */
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       /* Get exclusive lock so source xact can't end while we're doing this */
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1282,23 +1274,15 @@ IsBackendPid(int pid)
  *
  * The arguments allow filtering the set of VXIDs returned.  Our own process
  * is always skipped.  In addition:
- *     If limitSnapshotLSN is not InvalidTransactionId, skip processes with
+ *     If limitSnapshotLSN is not InvalidXLogRecPtr, skip processes with
  *             snapshotlsn > limitSnapshotLSN, or no snapshot at all .
  *     If allDbs is false, skip processes attached to other databases.
  *     If excludeVacuum isn't zero, skip processes for which
  *             (vacuumFlags & excludeVacuum) is not zero.
  *
- * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
+ * Note: the purpose of the limitSnapshotLSN parameter is to
  * allow skipping backends whose oldest live snapshot is no older than
- * some snapshot we have.  Since we examine the procarray with only shared
- * lock, there are race conditions: a backend could set its xmin just after
- * we look.  Indeed, on multiprocessors with weak memory ordering, the
- * other backend could have set its xmin *before* we look.  We know however
- * that such a backend must have held shared ProcArrayLock overlapping our
- * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
- * any snapshot the other backend is taking concurrently with our scan cannot
- * consider any transactions as still running that we think are committed
- * (since backends must hold ProcArrayLock exclusive to commit).
+ * some snapshot we have.
  */
 VirtualTransactionId *
 GetCurrentVirtualXIDs(XLogRecPtr limitSnapshotLSN,
@@ -1314,7 +1298,11 @@ GetCurrentVirtualXIDs(XLogRecPtr limitSnapshotLSN,
        vxids = (VirtualTransactionId *)
                palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       /*
+        * Take an exclusive lock, to prevent backends from changing their
+        * snapshotlsn values while we're reading them.
+        */
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1330,11 +1318,6 @@ GetCurrentVirtualXIDs(XLogRecPtr limitSnapshotLSN,
 
                if (allDbs || proc->databaseId == MyDatabaseId)
                {
-                       /*
-                        * Fetch LSN just once - might change on us
-                        *
-                        * FIXME: really?
-                        */
                        XLogRecPtr snapshotlsn = pgxact->snapshotlsn;
 
                        if (limitSnapshotLSN == InvalidXLogRecPtr ||
@@ -1362,25 +1345,10 @@ GetCurrentVirtualXIDs(XLogRecPtr limitSnapshotLSN,
  * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
  * in cases where we cannot accurately determine a value for latestRemovedXid.
  *
- * If limitSnapshotLSN is InvalidTransactionId then we want to kill everybody,
+ * If limitSnapshotLSN is InvalidXLogRecPtr then we want to kill everybody,
  * so we're not worried if they have a snapshot or not. Otherwise the result
- * only includes those backends that hold onto a snapshot that still considers
- * transactions committed before limitSnapshotLSN as in-progress.
- *
- * FIXME: the below comment is obsolete (or belons somewhere else now)
- * All callers that are checking xmins always now supply a valid and useful
- * value for limitXmin. The limitXmin is always lower than the lowest
- * numbered KnownAssignedXid that is not already a FATAL error. This is
- * because we only care about cleanup records that are cleaning up tuple
- * versions from committed transactions. In that case they will only occur
- * at the point where the record is less than the lowest running xid. That
- * allows us to say that if any backend takes a snapshot concurrently with
- * us then the conflict assessment made here would never include the snapshot
- * that is being derived. So we take LW_SHARED on the ProcArray and allow
- * concurrent snapshots when limitXmin is valid. We might think about adding
- *      Assert(limitXmin < lowest(KnownAssignedXids))
- * but that would not be true in the case of FATAL errors lagging in array,
- * but we already know those are bogus anyway, so we skip that test.
+ * only includes those backends that hold onto a snapshot that's older than
+ * limitSnapshotLSN.
  *
  * If dbOid is valid we skip backends attached to other databases.
  *
@@ -1410,7 +1378,11 @@ GetConflictingVirtualXIDs(XLogRecPtr limitSnapshotLSN, Oid dbOid)
                                         errmsg("out of memory")));
        }
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       /*
+        * Take an exclusive lock, to prevent backends from changing their
+        * snapshotlsn values while we're reading them.
+        */
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1425,8 +1397,6 @@ GetConflictingVirtualXIDs(XLogRecPtr limitSnapshotLSN, Oid dbOid)
                if (!OidIsValid(dbOid) ||
                        proc->databaseId == dbOid)
                {
-                       /* Fetch LSN just once - can't change on us, but good coding */
-                       /* FIXME: this ain't really atomic... */
                        XLogRecPtr snapshotlsn = pgxact->snapshotlsn;
 
                        /*
index 35fde9867403d9d47b76b956c00863315a1c8a40..0322eba79afe8334c26b68f38b8a4b38670e665a 100644 (file)
@@ -772,7 +772,6 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
                RunningTransactionsData running;
 
                running.nextXid = xlrec->nextXid;
-               running.latestCompletedXid = xlrec->latestCompletedXid;
                running.oldestRunningXid = xlrec->oldestRunningXid;
 
                ProcArrayApplyRecoveryInfo(&running);
@@ -915,7 +914,6 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 
        xlrec.nextXid = CurrRunningXacts->nextXid;
        xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
-       xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
 
        /* Header */
        rdata.data = (char *) (&xlrec);
index bbd4a7c11b68bf6eeba539aa18a6aca2c04635be..aebd0944c4c4f32bf8f6fc2341de5c84fc0c44d6 100644 (file)
@@ -167,7 +167,7 @@ extern void heap_sync(Relation relation);
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
 extern int heap_page_prune(Relation relation, Buffer buffer,
                                XLogRecPtr OldestSnapshot,
-                               bool report_stats, TransactionId *latestRemovedXid);
+                               bool report_stats, XLogRecPtr *latestRemovedCommitLSN);
 extern void heap_page_prune_execute(Buffer buffer,
                                                OffsetNumber *redirected, int nredirected,
                                                OffsetNumber *nowdead, int ndead,
index cfdd1ffbefc37246a8243149be4a2cd04e28e5f3..866f000d1aaa1a5eb7fe9a0a16ed7ec3f390e093 100644 (file)
@@ -216,7 +216,7 @@ typedef struct xl_heap_clean
 {
        RelFileNode node;
        BlockNumber block;
-       TransactionId latestRemovedXid;
+       XLogRecPtr      latestRemovedCommitLSN;
        uint16          nredirected;
        uint16          ndead;
        /* OFFSET NUMBERS FOLLOW */
@@ -226,13 +226,13 @@ typedef struct xl_heap_clean
 
 /*
  * Cleanup_info is required in some cases during a lazy VACUUM.
- * Used for reporting the results of HeapTupleHeaderAdvanceLatestRemovedXid()
+ * Used for reporting the results of HeapTupleHeaderAdvanceLatestRemoved()
  * see vacuumlazy.c for full explanation
  */
 typedef struct xl_heap_cleanup_info
 {
        RelFileNode node;
-       TransactionId latestRemovedXid;
+       XLogRecPtr latestRemovedCommitLSN;
 } xl_heap_cleanup_info;
 
 #define SizeOfHeapCleanupInfo (sizeof(xl_heap_cleanup_info))
@@ -364,8 +364,8 @@ typedef struct xl_heap_rewrite_mapping
        XLogRecPtr      start_lsn;              /* Insert LSN at begin of rewrite */
 } xl_heap_rewrite_mapping;
 
-extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
-                                                                          TransactionId *latestRemovedXid);
+extern void HeapTupleHeaderAdvanceLatestRemoved(HeapTupleHeader tuple,
+                                                                          XLogRecPtr *latestRemovedCommitLSN);
 
 extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
 extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec);
@@ -374,12 +374,12 @@ extern void heap2_desc(StringInfo buf, uint8 xl_info, char *rec);
 extern void heap_xlog_logical_rewrite(XLogRecPtr lsn, XLogRecord *r);
 
 extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
-                                         TransactionId latestRemovedXid);
+                                         XLogRecPtr latestRemovedCommitLSN);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
                           OffsetNumber *redirected, int nredirected,
                           OffsetNumber *nowdead, int ndead,
                           OffsetNumber *nowunused, int nunused,
-                          TransactionId latestRemovedXid);
+                          XLogRecPtr latestRemovedCommitLSN);
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                                TransactionId cutoff_xid, xl_heap_freeze_tuple *tuples,
                                int ntuples);
index e7cdd633f53a98b10f4932b7474d6279d9cf4d79..163bc43450c20d707442e9efb78432c2919bf6c1 100644 (file)
@@ -69,10 +69,9 @@ typedef struct xl_running_xacts
 {
        TransactionId nextXid;          /* copy of ShmemVariableCache->nextXid */
        TransactionId oldestRunningXid;         /* *not* oldestXmin */
-       TransactionId latestCompletedXid;       /* so we can set xmax */
 } xl_running_xacts;
 
-#define MinSizeOfXactRunningXacts (offsetof(xl_running_xacts, latestCompletedXid) + sizeof(TransactionId))
+#define MinSizeOfXactRunningXacts (offsetof(xl_running_xacts, oldestRunningXid) + sizeof(TransactionId))
 
 
 /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */