Reimplement ProcArrayLock as a new type of FlexLock. flexlock
author Robert Haas <[email protected]>
Mon, 14 Nov 2011 15:04:55 +0000 (10:04 -0500)
committer Robert Haas <[email protected]>
Fri, 2 Dec 2011 11:35:30 +0000 (06:35 -0500)
By providing some custom handling for ProcArrayEndTransaction, we can
avoid the need for ending transactions to repeatedly acquire the
spinlock.  The amount of work that needs to be done while holding the
lock is so small that we can do it while holding the spinlock, or
(when the lock is contended) make the last person to release the lock
do it on behalf of the ending backend.  This greatly improves
performance for unlogged tables at high client counts; permanent
tables also benefit, but performance is still severely throttled by
WALInsertLock contention.

src/backend/commands/analyze.c
src/backend/commands/vacuum.c
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/Makefile
src/backend/storage/lmgr/flexlock.c
src/backend/storage/lmgr/proc.c
src/backend/storage/lmgr/procarraylock.c [new file with mode: 0644]
src/include/storage/flexlock_internals.h
src/include/storage/procarraylock.h [new file with mode: 0644]

index 314324618a8216e7adc1eb2c18175f9a6b2ae5dd..2e972ec2806794a83a96533e9119c50d843812d3 100644 (file)
@@ -40,6 +40,7 @@
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/procarraylock.h"
 #include "utils/acl.h"
 #include "utils/attoptcache.h"
 #include "utils/datum.h"
@@ -222,9 +223,9 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
        /*
         * OK, let's do it.  First let other backends know I'm in ANALYZE.
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
        MyPgXact->vacuumFlags |= PROC_IN_ANALYZE;
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        /*
         * Do the normal non-recursive ANALYZE.
@@ -249,9 +250,9 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
         * Reset my PGPROC flag.  Note: we need this here, and not in vacuum_rel,
         * because the vacuum flag is cleared by the end-of-xact code.
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
        MyPgXact->vacuumFlags &= ~PROC_IN_ANALYZE;
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
index e70dbedbd056369cccc9b55acbca3a0209203b23..09aa32b95ae9186f3120521931503fd1016b7a03 100644 (file)
@@ -39,6 +39,7 @@
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/procarraylock.h"
 #include "utils/acl.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
@@ -895,11 +896,11 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound)
                 * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
                 * which is probably Not Good.
                 */
-               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+               ProcArrayLockAcquire(PAL_EXCLUSIVE);
                MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
                if (for_wraparound)
                        MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
        }
 
        /*
index 19ff524a6040b3453a53b4d94941e9ff20ba3e8d..d457e3f95748bab41962bdf13c9b91d839ffc2ef 100644 (file)
@@ -52,6 +52,7 @@
 #include "access/twophase.h"
 #include "miscadmin.h"
 #include "storage/procarray.h"
+#include "storage/procarraylock.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
@@ -261,7 +262,7 @@ ProcArrayAdd(PGPROC *proc)
        ProcArrayStruct *arrayP = procArray;
        int                     index;
 
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        if (arrayP->numProcs >= arrayP->maxProcs)
        {
@@ -270,7 +271,7 @@ ProcArrayAdd(PGPROC *proc)
                 * fixed supply of PGPROC structs too, and so we should have failed
                 * earlier.)
                 */
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
                ereport(FATAL,
                                (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                                 errmsg("sorry, too many clients already")));
@@ -300,7 +301,7 @@ ProcArrayAdd(PGPROC *proc)
        arrayP->pgprocnos[index] = proc->pgprocno;
        arrayP->numProcs++;
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
@@ -325,7 +326,7 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
                DisplayXidCache();
 #endif
 
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        if (TransactionIdIsValid(latestXid))
        {
@@ -351,13 +352,13 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
                                        (arrayP->numProcs - index - 1) * sizeof (int));
                        arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
                        arrayP->numProcs--;
-                       LWLockRelease(ProcArrayLock);
+                       ProcArrayLockRelease();
                        return;
                }
        }
 
        /* Ooops */
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        elog(LOG, "failed to find proc %p in ProcArray", proc);
 }
@@ -383,54 +384,19 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
        if (TransactionIdIsValid(latestXid))
        {
-               /*
-                * We must lock ProcArrayLock while clearing our advertised XID, so
-                * that we do not exit the set of "running" transactions while someone
-                * else is taking a snapshot.  See discussion in
-                * src/backend/access/transam/README.
-                */
-               Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
-
-               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
-               pgxact->xid = InvalidTransactionId;
-               proc->lxid = InvalidLocalTransactionId;
-               pgxact->xmin = InvalidTransactionId;
-               /* must be cleared with xid/xmin: */
-               pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-               pgxact->inCommit = false; /* be sure this is cleared in abort */
-               proc->recoveryConflictPending = false;
-
-               /* Clear the subtransaction-XID cache too while holding the lock */
-               pgxact->nxids = 0;
-               pgxact->overflowed = false;
-
-               /* Also advance global latestCompletedXid while holding the lock */
-               if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
-                                                                 latestXid))
-                       ShmemVariableCache->latestCompletedXid = latestXid;
-
-               LWLockRelease(ProcArrayLock);
+               Assert(proc == MyProc);
+               ProcArrayLockClearTransaction(latestXid);               
        }
        else
        {
-               /*
-                * If we have no XID, we don't need to lock, since we won't affect
-                * anyone else's calculation of a snapshot.  We might change their
-                * estimate of global xmin, but that's OK.
-                */
-               Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
-
-               proc->lxid = InvalidLocalTransactionId;
                pgxact->xmin = InvalidTransactionId;
                /* must be cleared with xid/xmin: */
                pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-               pgxact->inCommit = false; /* be sure this is cleared in abort */
-               proc->recoveryConflictPending = false;
-
-               Assert(pgxact->nxids == 0);
-               Assert(pgxact->overflowed == false);
        }
+
+       proc->lxid = InvalidLocalTransactionId;
+       pgxact->inCommit = false; /* be sure this is cleared in abort */
+       proc->recoveryConflictPending = false;
 }
 
 
@@ -562,7 +528,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
        /*
         * Nobody else is running yet, but take locks anyhow
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        /*
         * KnownAssignedXids is sorted so we cannot just add the xids, we have to
@@ -669,7 +635,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
        Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
        Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
        if (standbyState == STANDBY_SNAPSHOT_READY)
@@ -724,7 +690,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
        /*
         * Uses same locking as transaction commit
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        /*
         * Remove subxids from known-assigned-xacts.
@@ -737,7 +703,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
        if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
                procArray->lastOverflowedXid = max_xid;
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
@@ -829,7 +795,7 @@ TransactionIdIsInProgress(TransactionId xid)
                                         errmsg("out of memory")));
        }
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        /*
         * Now that we have the lock, we can check latestCompletedXid; if the
@@ -837,7 +803,7 @@ TransactionIdIsInProgress(TransactionId xid)
         */
        if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
        {
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
                xc_by_latest_xid_inc();
                return true;
        }
@@ -865,7 +831,7 @@ TransactionIdIsInProgress(TransactionId xid)
                 */
                if (TransactionIdEquals(pxid, xid))
                {
-                       LWLockRelease(ProcArrayLock);
+                       ProcArrayLockRelease();
                        xc_by_main_xid_inc();
                        return true;
                }
@@ -887,7 +853,7 @@ TransactionIdIsInProgress(TransactionId xid)
 
                        if (TransactionIdEquals(cxid, xid))
                        {
-                               LWLockRelease(ProcArrayLock);
+                               ProcArrayLockRelease();
                                xc_by_child_xid_inc();
                                return true;
                        }
@@ -915,7 +881,7 @@ TransactionIdIsInProgress(TransactionId xid)
 
                if (KnownAssignedXidExists(xid))
                {
-                       LWLockRelease(ProcArrayLock);
+                       ProcArrayLockRelease();
                        xc_by_known_assigned_inc();
                        return true;
                }
@@ -931,7 +897,7 @@ TransactionIdIsInProgress(TransactionId xid)
                        nxids = KnownAssignedXidsGet(xids, xid);
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        /*
         * If none of the relevant caches overflowed, we know the Xid is not
@@ -997,7 +963,7 @@ TransactionIdIsActive(TransactionId xid)
        if (TransactionIdPrecedes(xid, RecentXmin))
                return false;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (i = 0; i < arrayP->numProcs; i++)
        {
@@ -1022,7 +988,7 @@ TransactionIdIsActive(TransactionId xid)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return result;
 }
@@ -1085,7 +1051,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
        /* Cannot look for individual databases during recovery */
        Assert(allDbs || !RecoveryInProgress());
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        /*
         * We initialize the MIN() calculation with latestCompletedXid + 1. This
@@ -1140,7 +1106,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                 */
                TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
 
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
 
                if (TransactionIdIsNormal(kaxmin) &&
                        TransactionIdPrecedes(kaxmin, result))
@@ -1151,7 +1117,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                /*
                 * No other information needed, so release the lock immediately.
                 */
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
 
                /*
                 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
@@ -1280,7 +1246,7 @@ GetSnapshotData(Snapshot snapshot)
         * It is sufficient to get shared lock on ProcArrayLock, even if we are
         * going to set MyProc->xmin.
         */
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        /* xmax is always latestCompletedXid + 1 */
        xmax = ShmemVariableCache->latestCompletedXid;
@@ -1418,7 +1384,7 @@ GetSnapshotData(Snapshot snapshot)
 
        if (!TransactionIdIsValid(MyPgXact->xmin))
                MyPgXact->xmin = TransactionXmin = xmin;
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        /*
         * Update globalxmin to include actual process xids.  This is a slightly
@@ -1475,7 +1441,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
                return false;
 
        /* Get lock so source xact can't end while we're doing this */
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1521,7 +1487,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
                break;
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return result;
 }
@@ -1595,7 +1561,7 @@ GetRunningTransactionData(void)
         * Ensure that no xids enter or leave the procarray while we obtain
         * snapshot.
         */
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
        LWLockAcquire(XidGenLock, LW_SHARED);
 
        latestCompletedXid = ShmemVariableCache->latestCompletedXid;
@@ -1658,7 +1624,7 @@ GetRunningTransactionData(void)
        CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
        /* We don't release XidGenLock here, the caller is responsible for that */
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
        Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
@@ -1691,7 +1657,7 @@ GetOldestActiveTransactionId(void)
 
        Assert(!RecoveryInProgress());
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        oldestRunningXid = ShmemVariableCache->nextXid;
 
@@ -1720,7 +1686,7 @@ GetOldestActiveTransactionId(void)
                 */
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return oldestRunningXid;
 }
@@ -1753,7 +1719,7 @@ GetTransactionsInCommit(TransactionId **xids_p)
        xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
        nxids = 0;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1768,7 +1734,7 @@ GetTransactionsInCommit(TransactionId **xids_p)
                        xids[nxids++] = pxid;
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        *xids_p = xids;
        return nxids;
@@ -1790,7 +1756,7 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids)
        ProcArrayStruct *arrayP = procArray;
        int                     index;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1818,7 +1784,7 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return result;
 }
@@ -1840,7 +1806,7 @@ BackendPidGetProc(int pid)
        if (pid == 0)                           /* never match dummy PGPROCs */
                return NULL;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1853,7 +1819,7 @@ BackendPidGetProc(int pid)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return result;
 }
@@ -1881,7 +1847,7 @@ BackendXidGetPid(TransactionId xid)
        if (xid == InvalidTransactionId)        /* never match invalid xid */
                return 0;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1896,7 +1862,7 @@ BackendXidGetPid(TransactionId xid)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return result;
 }
@@ -1951,7 +1917,7 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
        vxids = (VirtualTransactionId *)
                palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -1989,7 +1955,7 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        *nvxids = count;
        return vxids;
@@ -2048,7 +2014,7 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
                                         errmsg("out of memory")));
        }
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -2083,7 +2049,7 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        /* add the terminator */
        vxids[count].backendId = InvalidBackendId;
@@ -2104,7 +2070,7 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
        int                     index;
        pid_t           pid = 0;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -2131,7 +2097,7 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return pid;
 }
@@ -2207,7 +2173,7 @@ CountDBBackends(Oid databaseid)
        int                     count = 0;
        int                     index;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -2221,7 +2187,7 @@ CountDBBackends(Oid databaseid)
                        count++;
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return count;
 }
@@ -2237,7 +2203,7 @@ CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
        pid_t           pid = 0;
 
        /* tell all backends to die */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -2263,7 +2229,7 @@ CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
                }
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
@@ -2276,7 +2242,7 @@ CountUserBackends(Oid roleid)
        int                     count = 0;
        int                     index;
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       ProcArrayLockAcquire(PAL_SHARED);
 
        for (index = 0; index < arrayP->numProcs; index++)
        {
@@ -2289,7 +2255,7 @@ CountUserBackends(Oid roleid)
                        count++;
        }
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 
        return count;
 }
@@ -2337,7 +2303,7 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
 
                *nbackends = *nprepared = 0;
 
-               LWLockAcquire(ProcArrayLock, LW_SHARED);
+               ProcArrayLockAcquire(PAL_SHARED);
 
                for (index = 0; index < arrayP->numProcs; index++)
                {
@@ -2363,7 +2329,7 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
                        }
                }
 
-               LWLockRelease(ProcArrayLock);
+               ProcArrayLockRelease();
 
                if (!found)
                        return false;           /* no conflicting backends, so done */
@@ -2416,7 +2382,7 @@ XidCacheRemoveRunningXids(TransactionId xid,
         * to abort subtransactions, but pending closer analysis we'd best be
         * conservative.
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        /*
         * Under normal circumstances xid and xids[] will be in increasing order,
@@ -2464,7 +2430,7 @@ XidCacheRemoveRunningXids(TransactionId xid,
                                                          latestXid))
                ShmemVariableCache->latestCompletedXid = latestXid;
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 #ifdef XIDCACHE_DEBUG
@@ -2631,7 +2597,7 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
        /*
         * Uses same locking as transaction commit
         */
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
        KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
 
@@ -2640,7 +2606,7 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
                                                          max_xid))
                ShmemVariableCache->latestCompletedXid = max_xid;
 
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
@@ -2650,9 +2616,9 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 void
 ExpireAllKnownAssignedTransactionIds(void)
 {
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
        KnownAssignedXidsRemovePreceding(InvalidTransactionId);
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 /*
@@ -2662,9 +2628,9 @@ ExpireAllKnownAssignedTransactionIds(void)
 void
 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 {
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       ProcArrayLockAcquire(PAL_EXCLUSIVE);
        KnownAssignedXidsRemovePreceding(xid);
-       LWLockRelease(ProcArrayLock);
+       ProcArrayLockRelease();
 }
 
 
@@ -2886,7 +2852,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
        {
                /* must hold lock to compress */
                if (!exclusive_lock)
-                       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+                       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
                KnownAssignedXidsCompress(true);
 
@@ -2894,7 +2860,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
                /* note: we no longer care about the tail pointer */
 
                if (!exclusive_lock)
-                       LWLockRelease(ProcArrayLock);
+                       ProcArrayLockRelease();
 
                /*
                 * If it still won't fit then we're out of memory
index 3730e51c7e42ad10bf19a561ffe9681be9a436d1..27eaa97020a3882dbf46bfd0702c30537556575b 100644 (file)
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = flexlock.o lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o \
-       predicate.o
+       procarraylock.o predicate.o
 
 include $(top_srcdir)/src/backend/common.mk
 
index 1bd3dc727e2c4af6f29ad183774a22ee8a692e9e..614595100b3ed22b811a67de54e750bb15a89395 100644 (file)
@@ -30,6 +30,7 @@
 #include "storage/flexlock.h"
 #include "storage/flexlock_internals.h"
 #include "storage/predicate.h"
+#include "storage/procarraylock.h"
 #include "storage/spin.h"
 
 /*
@@ -176,9 +177,14 @@ CreateFlexLocks(void)
 
        FlexLockArray = (FlexLockPadded *) ptr;
 
-       /* All of the "fixed" FlexLocks are LWLocks. */
+       /* All of the "fixed" FlexLocks are LWLocks - except ProcArrayLock. */
        for (id = 0, lock = FlexLockArray; id < NumFixedFlexLocks; id++, lock++)
-               FlexLockInit(&lock->flex, FLEXLOCK_TYPE_LWLOCK);
+       {
+               if (id == ProcArrayLock)
+                       FlexLockInit(&lock->flex, FLEXLOCK_TYPE_PROCARRAYLOCK);
+               else
+                       FlexLockInit(&lock->flex, FLEXLOCK_TYPE_LWLOCK);
+       }
 
        /*
         * Initialize the dynamic-allocation counter, which is stored just before
@@ -322,13 +328,20 @@ FlexLockReleaseAll(void)
 {
        while (num_held_flexlocks > 0)
        {
+               FlexLockId      id;
+               FlexLock   *flex;
+
                HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
 
-               /*
-                * FLEXTODO: When we have multiple types of flex locks, this will
-                * need to call the appropriate release function for each lock type.
-                */
-               LWLockRelease(held_flexlocks[num_held_flexlocks - 1]);
+               id = held_flexlocks[num_held_flexlocks - 1];
+               flex = &FlexLockArray[id].flex;
+               if (flex->locktype == FLEXLOCK_TYPE_LWLOCK)
+                       LWLockRelease(id);
+               else
+               {
+                       Assert(id == ProcArrayLock);
+                       ProcArrayLockRelease();
+               }
        }
 }
 
index b402999d8ec8c206bd2a5bc39455ae60c7757aea..10ec83b26f3780b91b46b954d20f27621d6ce3e4 100644 (file)
@@ -46,6 +46,7 @@
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/procarraylock.h"
 #include "storage/procsignal.h"
 #include "storage/spin.h"
 #include "utils/timestamp.h"
@@ -1083,7 +1084,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                        PGPROC     *autovac = GetBlockingAutoVacuumPgproc();
                        PGXACT     *autovac_pgxact = &ProcGlobal->allPgXact[autovac->pgprocno];
 
-                       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+                       ProcArrayLockAcquire(PAL_EXCLUSIVE);
 
                        /*
                         * Only do it if the worker is not working to protect against Xid
@@ -1099,7 +1100,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                                         pid);
 
                                /* don't hold the lock across the kill() syscall */
-                               LWLockRelease(ProcArrayLock);
+                               ProcArrayLockRelease();
 
                                /* send the autovacuum worker Back to Old Kent Road */
                                if (kill(pid, SIGINT) < 0)
@@ -1111,7 +1112,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                                }
                        }
                        else
-                               LWLockRelease(ProcArrayLock);
+                               ProcArrayLockRelease();
 
                        /* prevent signal from being resent more than once */
                        allow_autovacuum_cancel = false;
diff --git a/src/backend/storage/lmgr/procarraylock.c b/src/backend/storage/lmgr/procarraylock.c
new file mode 100644 (file)
index 0000000..7cd4b6b
--- /dev/null
@@ -0,0 +1,344 @@
+/*-------------------------------------------------------------------------
+ *
+ * procarraylock.c
+ *       Lock management for the ProcArray
+ *
+ * Because the ProcArray data structure is highly trafficked, it is
+ * critical that mutual exclusion for ProcArray operations be as efficient
+ * as possible.  A particular problem is transaction end (commit or abort)
+ * which cannot be done in parallel with snapshot acquisition.  We
+ * therefore include some special hacks to deal with this case efficiently.
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/storage/lmgr/procarraylock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "access/transam.h"
+#include "storage/flexlock_internals.h"
+#include "storage/ipc.h"
+#include "storage/procarraylock.h"
+#include "storage/proc.h"
+#include "storage/spin.h"
+
+typedef struct ProcArrayLockStruct
+{
+       FlexLock        flex;                   /* common FlexLock infrastructure: mutex,
+                                                * wait-queue head, releaseOK, locktype */
+       char            exclusive;              /* # of exclusive holders (0 or 1) */
+       int                     shared;                 /* # of shared holders (0..MaxBackends) */
+       PGPROC     *ending;                     /* list, linked through flWaitLink, of
+                                                * backends waiting for the lock to become
+                                                * free so that their transaction state
+                                                * can be cleared on their behalf */
+       TransactionId   latest_ending_xid;      /* newest latestXid reported by the
+                                                * backends on the "ending" list; reset to
+                                                * InvalidTransactionId when that list is
+                                                * processed */
+} ProcArrayLockStruct;
+
+/*
+ * There is only one ProcArrayLock.  ProcArrayLockPointer() evaluates to a
+ * volatile ProcArrayLockStruct pointer to its slot in FlexLockArray, after
+ * checking (via AssertMacro) that the slot's locktype is
+ * FLEXLOCK_TYPE_PROCARRAYLOCK.
+ */
+#define        ProcArrayLockPointer() \
+       (AssertMacro(FlexLockArray[ProcArrayLock].flex.locktype == \
+               FLEXLOCK_TYPE_PROCARRAYLOCK), \
+        (volatile ProcArrayLockStruct *) &FlexLockArray[ProcArrayLock])
+
+/*
+ * ProcArrayLockAcquire - acquire ProcArrayLock in the specified mode
+ *
+ * mode is PAL_EXCLUSIVE or PAL_SHARED.  An exclusive request is granted only
+ * when there are no holders at all; a shared request is granted whenever
+ * there is no exclusive holder.
+ *
+ * If the lock is not available, sleep until it is.
+ *
+ * Side effect: cancel/die interrupts are held off until lock release.
+ */
+void
+ProcArrayLockAcquire(ProcArrayLockMode mode)
+{
+       volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+       PGPROC     *proc = MyProc;
+       bool            retry = false;
+       int                     extraWaits = 0;
+
+       /*
+        * We can't wait if we haven't got a PGPROC.  This should only occur
+        * during bootstrap or shared memory initialization.  Put an Assert here
+        * to catch unsafe coding practices.
+        */
+       Assert(!(proc == NULL && IsUnderPostmaster));
+
+       /*
+        * Lock out cancel/die interrupts until we exit the code section protected
+        * by the ProcArrayLock.  This ensures that interrupts will not interfere
+        * with manipulations of data structures in shared memory.
+        */
+       HOLD_INTERRUPTS();
+
+       /*
+        * Loop here to try to acquire lock after each time we are signaled by
+        * ProcArrayLockRelease.  See comments in LWLockAcquire for an explanation
+        * of why we do not attempt to hand off the lock directly.
+        */
+       for (;;)
+       {
+               bool            mustwait;
+
+               /* Acquire mutex.  Time spent holding mutex should be short! */
+               SpinLockAcquire(&lock->flex.mutex);
+
+               /* If retrying, allow ProcArrayLockRelease to release waiters again */
+               if (retry)
+                       lock->flex.releaseOK = true;
+
+               /* If I can get the lock, do so quickly. */
+               if (mode == PAL_EXCLUSIVE)
+               {
+                       if (lock->exclusive == 0 && lock->shared == 0)
+                       {
+                               lock->exclusive++;
+                               mustwait = false;
+                       }
+                       else
+                               mustwait = true;
+               }
+               else
+               {
+                       if (lock->exclusive == 0)
+                       {
+                               lock->shared++;
+                               mustwait = false;
+                       }
+                       else
+                               mustwait = true;
+               }
+
+               /* On success we leave the loop still holding the mutex. */
+               if (!mustwait)
+                       break;                          /* got the lock */
+
+               /* Add myself to wait queue. */
+               FlexLockJoinWaitQueue(lock, (int) mode);
+
+               /* Can release the mutex now */
+               SpinLockRelease(&lock->flex.mutex);
+
+               /* Wait until awakened. */
+               extraWaits += FlexLockWait(ProcArrayLock, mode);
+
+               /* Now loop back and try to acquire lock again. */
+               retry = true;
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&lock->flex.mutex);
+
+       /*
+        * There is only one ProcArrayLock, so report its ID directly; this
+        * function has no lock-ID variable of its own.
+        */
+       TRACE_POSTGRESQL_FLEXLOCK_ACQUIRE(ProcArrayLock, mode);
+
+       /* Add lock to list of locks held by this backend */
+       FlexLockRemember(ProcArrayLock);
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+}
+
+/*
+ * ProcArrayLockClearTransaction - safely clear transaction details
+ *
+ * Clears the calling backend's xid, xmin, vacuum-state flags and subxid
+ * bookkeeping in its PGXACT entry, and advances
+ * ShmemVariableCache->latestCompletedXid to latestXid if latestXid is newer.
+ *
+ * This can't be done while any backend holds ProcArrayLock, but it's so
+ * fast that we can afford to do it while holding the spinlock, rather than
+ * acquiring and releasing the lock.  If the lock is currently held, we
+ * instead add ourselves to the lock's "ending" list and sleep; the work is
+ * then done for us by ProcArrayLockRelease once the lock becomes free.
+ *
+ * Interrupts are held off for the duration of the call.
+ */
+void
+ProcArrayLockClearTransaction(TransactionId latestXid)
+{
+       volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+       PGPROC     *proc = MyProc;
+       int                     extraWaits = 0;
+       bool            mustwait;
+
+       HOLD_INTERRUPTS();
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&lock->flex.mutex);
+
+       if (lock->exclusive == 0 && lock->shared == 0)
+       {
+               {
+                       volatile PGPROC *vproc = proc;
+                       volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno];
+                       /*
+                        * There are no lockers, so clear the critical per-backend
+                        * transaction fields (kept in our PGXACT entry) right here.
+                        */
+                       pgxact->xid = InvalidTransactionId;
+               pgxact->xmin = InvalidTransactionId;
+               /* must be cleared with xid/xmin: */
+               pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+                       pgxact->nxids = 0;
+                       pgxact->overflowed = false;
+               }
+               mustwait = false;
+
+        /* Also advance global latestCompletedXid while holding the mutex */
+        if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+                                  latestXid))
+            ShmemVariableCache->latestCompletedXid = latestXid;
+       }
+       else
+       {
+               /*
+                * Rats, must wait.  Push ourselves onto the front of the "ending"
+                * list and fold our latestXid into latest_ending_xid, so that
+                * whoever frees the lock can clear our state and advance
+                * latestCompletedXid for the whole list at once.
+                */
+               proc->flWaitLink = lock->ending;
+               lock->ending = proc;
+               if (!TransactionIdIsValid(lock->latest_ending_xid) ||
+                               TransactionIdPrecedes(lock->latest_ending_xid, latestXid)) 
+                       lock->latest_ending_xid = latestXid;
+               mustwait = true;
+       }
+
+       /* Can release the mutex now */
+       SpinLockRelease(&lock->flex.mutex);
+
+       /*
+        * If we were not able to perform the operation immediately, we must wait.
+        * But we need not retry after being awoken, because the last lock holder
+        * to release the lock will do the work first, on our behalf.
+        *
+        * As in ProcArrayLockAcquire, re-post the semaphore once for each extra
+        * wakeup that FlexLockWait reports having absorbed.
+        *
+        * NOTE(review): the wait mode 2 passed to FlexLockWait is neither
+        * PAL_EXCLUSIVE nor PAL_SHARED; presumably it tags an "ending" waiter
+        * for tracing purposes -- confirm against FlexLockWait.
+        */
+       if (mustwait)
+       {
+               extraWaits += FlexLockWait(ProcArrayLock, 2);
+               while (extraWaits-- > 0)
+                       PGSemaphoreUnlock(&proc->sem);
+       }
+
+       RESUME_INTERRUPTS();
+}
+
+/*
+ * ProcArrayLockRelease - release a previously acquired ProcArrayLock
+ *
+ * Drops the caller's exclusive or shared hold.  If that leaves the lock
+ * free, this also clears the transaction state of every backend queued by
+ * ProcArrayLockClearTransaction and advances latestCompletedXid on their
+ * behalf.  Afterwards it wakes those backends, plus any lock waiters that
+ * were removed from the wait queue.
+ *
+ * Side effect: re-enables the cancel/die interrupts that were held off by
+ * ProcArrayLockAcquire.
+ */
+void
+ProcArrayLockRelease(void)
+{
+       volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+       PGPROC     *head;
+       PGPROC     *ending = NULL;
+       PGPROC     *proc;
+
+       FlexLockForget(ProcArrayLock);
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&lock->flex.mutex);
+
+       /* Release my hold on lock */
+       if (lock->exclusive > 0)
+               lock->exclusive--;
+       else
+       {
+               Assert(lock->shared > 0);
+               lock->shared--;
+       }
+
+       /*
+        * If the lock is now free, but there are some transactions trying to
+        * end, we must clear the critical PGPROC fields for them, and save a
+        * list of them so we can wake them up.
+        */
+       if (lock->exclusive == 0 && lock->shared == 0 && lock->ending != NULL)
+       {
+               volatile PGPROC *vproc;
+
+               ending = lock->ending;
+               vproc = ending;
+
+               while (vproc != NULL)
+               {
+                       volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno];
+
+                       pgxact->xid = InvalidTransactionId;
+                       pgxact->xmin = InvalidTransactionId;
+                       /* must be cleared with xid/xmin: */
+                       pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+                       pgxact->nxids = 0;
+                       pgxact->overflowed = false;
+                       vproc = vproc->flWaitLink;
+               }
+
+               /* Also advance global latestCompletedXid */
+               if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+                                                                 lock->latest_ending_xid))
+                       ShmemVariableCache->latestCompletedXid = lock->latest_ending_xid;
+
+               /* Reset lock state. */
+               lock->ending = NULL;
+               lock->latest_ending_xid = InvalidTransactionId;
+       }
+
+       /*
+        * See if I need to awaken any waiters.  If I released a non-last shared
+        * hold, there cannot be anything to do.  Also, do not awaken any waiters
+        * if someone has already awakened waiters that haven't yet acquired the
+        * lock.
+        */
+       head = lock->flex.head;
+       if (head != NULL)
+       {
+               if (lock->exclusive == 0 && lock->shared == 0 && lock->flex.releaseOK)
+               {
+                       /*
+                        * Remove the to-be-awakened PGPROCs from the queue.  If the front
+                        * waiter wants exclusive lock, awaken him only. Otherwise awaken
+                        * as many waiters as want shared access.
+                        *
+                        * Waiters are queued with ProcArrayLockMode values, so compare
+                        * against PAL_EXCLUSIVE rather than the LWLock mode constant.
+                        */
+                       proc = head;
+                       if (proc->flWaitMode != PAL_EXCLUSIVE)
+                       {
+                               while (proc->flWaitLink != NULL &&
+                                          proc->flWaitLink->flWaitMode != PAL_EXCLUSIVE)
+                                       proc = proc->flWaitLink;
+                       }
+                       /* proc is now the last PGPROC to be released */
+                       lock->flex.head = proc->flWaitLink;
+                       proc->flWaitLink = NULL;
+                       /* prevent additional wakeups until retryer gets to run */
+                       lock->flex.releaseOK = false;
+               }
+               else
+               {
+                       /* lock is still held, can't awaken anything */
+                       head = NULL;
+               }
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&lock->flex.mutex);
+
+       /*
+        * There is only one ProcArrayLock, so report its ID directly; this
+        * function has no lock-ID variable of its own.
+        */
+       TRACE_POSTGRESQL_FLEXLOCK_RELEASE(ProcArrayLock);
+
+       /*
+        * Awaken any waiters I removed from the queue.
+        */
+       while (head != NULL)
+       {
+               FlexLockDebug("ProcArrayLockRelease", ProcArrayLock, "release waiter");
+               proc = head;
+               head = proc->flWaitLink;
+               proc->flWaitLink = NULL;
+               proc->flWaitResult = 1;         /* any non-zero value will do */
+               PGSemaphoreUnlock(&proc->sem);
+       }
+
+       /*
+        * Also awaken any processes whose critical PGPROC fields I cleared
+        */
+       while (ending != NULL)
+       {
+               FlexLockDebug("ProcArrayLockRelease", ProcArrayLock, "release ending");
+               proc = ending;
+               ending = proc->flWaitLink;
+               proc->flWaitLink = NULL;
+               proc->flWaitResult = 1;         /* any non-zero value will do */
+               PGSemaphoreUnlock(&proc->sem);
+       }
+
+       /*
+        * Now okay to allow cancel/die interrupts.
+        */
+       RESUME_INTERRUPTS();
+}
index 4fcb3423ddec40404b07d2eb4337f51f72fc595a..a5c571177d63886ccabbaf6c480e77c178c1a393 100644 (file)
@@ -41,6 +41,7 @@ typedef struct FlexLock
 } FlexLock;
 
 #define FLEXLOCK_TYPE_LWLOCK                   'l'
+#define FLEXLOCK_TYPE_PROCARRAYLOCK            'p'
 
 typedef union FlexLockPadded
 {
diff --git a/src/include/storage/procarraylock.h b/src/include/storage/procarraylock.h
new file mode 100644 (file)
index 0000000..678ca6f
--- /dev/null
@@ -0,0 +1,28 @@
+/*-------------------------------------------------------------------------
+ *
+ * procarraylock.h
+ *       Lock management for the ProcArray
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/procarraylock.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PROCARRAYLOCK_H
+#define PROCARRAYLOCK_H
+
+#include "storage/flexlock.h"
+
+typedef enum ProcArrayLockMode
+{
+       PAL_EXCLUSIVE,          /* granted only when there are no holders at all */
+       PAL_SHARED                      /* granted whenever there is no exclusive holder */
+} ProcArrayLockMode;
+
+extern void ProcArrayLockAcquire(ProcArrayLockMode mode);      /* sleeps until granted; holds off
+                                                                * interrupts until release */
+extern void ProcArrayLockClearTransaction(TransactionId latestXid);    /* clear caller's xid/xmin
+                                                                        * state once lock is free */
+extern void ProcArrayLockRelease(void);        /* release hold taken by ProcArrayLockAcquire */
+
+#endif   /* PROCARRAYLOCK_H */