#include "access/twophase.h"
#include "miscadmin.h"
#include "storage/procarray.h"
+#include "storage/procarraylock.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
ProcArrayStruct *arrayP = procArray;
int index;
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
if (arrayP->numProcs >= arrayP->maxProcs)
{
* fixed supply of PGPROC structs too, and so we should have failed
* earlier.)
*/
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
arrayP->pgprocnos[index] = proc->pgprocno;
arrayP->numProcs++;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
/*
DisplayXidCache();
#endif
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
if (TransactionIdIsValid(latestXid))
{
(arrayP->numProcs - index - 1) * sizeof (int));
arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return;
}
}
/* Ooops */
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
elog(LOG, "failed to find proc %p in ProcArray", proc);
}
if (TransactionIdIsValid(latestXid))
{
- /*
- * We must lock ProcArrayLock while clearing our advertised XID, so
- * that we do not exit the set of "running" transactions while someone
- * else is taking a snapshot. See discussion in
- * src/backend/access/transam/README.
- */
- Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
-
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
- pgxact->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
- pgxact->xmin = InvalidTransactionId;
- /* must be cleared with xid/xmin: */
- pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- pgxact->inCommit = false; /* be sure this is cleared in abort */
- proc->recoveryConflictPending = false;
-
- /* Clear the subtransaction-XID cache too while holding the lock */
- pgxact->nxids = 0;
- pgxact->overflowed = false;
-
- /* Also advance global latestCompletedXid while holding the lock */
- if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
- latestXid))
- ShmemVariableCache->latestCompletedXid = latestXid;
-
- LWLockRelease(ProcArrayLock);
+ Assert(proc == MyProc);
+ ProcArrayLockClearTransaction(latestXid);
}
else
{
- /*
- * If we have no XID, we don't need to lock, since we won't affect
- * anyone else's calculation of a snapshot. We might change their
- * estimate of global xmin, but that's OK.
- */
- Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
-
- proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- pgxact->inCommit = false; /* be sure this is cleared in abort */
- proc->recoveryConflictPending = false;
-
- Assert(pgxact->nxids == 0);
- Assert(pgxact->overflowed == false);
}
+
+ proc->lxid = InvalidLocalTransactionId;
+ pgxact->inCommit = false; /* be sure this is cleared in abort */
+ proc->recoveryConflictPending = false;
}
/*
* Nobody else is running yet, but take locks anyhow
*/
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
/*
* KnownAssignedXids is sorted so we cannot just add the xids, we have to
Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
if (standbyState == STANDBY_SNAPSHOT_READY)
/*
* Uses same locking as transaction commit
*/
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
/*
* Remove subxids from known-assigned-xacts.
if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
procArray->lastOverflowedXid = max_xid;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
/*
errmsg("out of memory")));
}
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
/*
* Now that we have the lock, we can check latestCompletedXid; if the
*/
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
{
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
xc_by_latest_xid_inc();
return true;
}
*/
if (TransactionIdEquals(pxid, xid))
{
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
xc_by_main_xid_inc();
return true;
}
if (TransactionIdEquals(cxid, xid))
{
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
xc_by_child_xid_inc();
return true;
}
if (KnownAssignedXidExists(xid))
{
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
xc_by_known_assigned_inc();
return true;
}
nxids = KnownAssignedXidsGet(xids, xid);
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
/*
* If none of the relevant caches overflowed, we know the Xid is not
if (TransactionIdPrecedes(xid, RecentXmin))
return false;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (i = 0; i < arrayP->numProcs; i++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return result;
}
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
/*
* We initialize the MIN() calculation with latestCompletedXid + 1. This
*/
TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
if (TransactionIdIsNormal(kaxmin) &&
TransactionIdPrecedes(kaxmin, result))
/*
* No other information needed, so release the lock immediately.
*/
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
/*
* Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
*/
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
/* xmax is always latestCompletedXid + 1 */
xmax = ShmemVariableCache->latestCompletedXid;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
/*
* Update globalxmin to include actual process xids. This is a slightly
return false;
/* Get lock so source xact can't end while we're doing this */
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
break;
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return result;
}
* Ensure that no xids enter or leave the procarray while we obtain
* snapshot.
*/
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
LWLockAcquire(XidGenLock, LW_SHARED);
latestCompletedXid = ShmemVariableCache->latestCompletedXid;
CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
/* We don't release XidGenLock here, the caller is responsible for that */
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
Assert(!RecoveryInProgress());
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
oldestRunningXid = ShmemVariableCache->nextXid;
*/
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return oldestRunningXid;
}
xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
nxids = 0;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
xids[nxids++] = pxid;
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
*xids_p = xids;
return nxids;
ProcArrayStruct *arrayP = procArray;
int index;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return result;
}
if (pid == 0) /* never match dummy PGPROCs */
return NULL;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return result;
}
if (xid == InvalidTransactionId) /* never match invalid xid */
return 0;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return result;
}
vxids = (VirtualTransactionId *)
palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
*nvxids = count;
return vxids;
errmsg("out of memory")));
}
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
/* add the terminator */
vxids[count].backendId = InvalidBackendId;
int index;
pid_t pid = 0;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return pid;
}
int count = 0;
int index;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
count++;
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return count;
}
pid_t pid = 0;
/* tell all backends to die */
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
/*
int count = 0;
int index;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
count++;
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
return count;
}
*nbackends = *nprepared = 0;
- LWLockAcquire(ProcArrayLock, LW_SHARED);
+ ProcArrayLockAcquire(PAL_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
{
}
}
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
if (!found)
return false; /* no conflicting backends, so done */
* to abort subtransactions, but pending closer analysis we'd best be
* conservative.
*/
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
/*
* Under normal circumstances xid and xids[] will be in increasing order,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
#ifdef XIDCACHE_DEBUG
/*
* Uses same locking as transaction commit
*/
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
max_xid))
ShmemVariableCache->latestCompletedXid = max_xid;
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
/*
void
ExpireAllKnownAssignedTransactionIds(void)
{
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
KnownAssignedXidsRemovePreceding(InvalidTransactionId);
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
/*
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
KnownAssignedXidsRemovePreceding(xid);
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
}
{
/* must hold lock to compress */
if (!exclusive_lock)
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayLockAcquire(PAL_EXCLUSIVE);
KnownAssignedXidsCompress(true);
/* note: we no longer care about the tail pointer */
if (!exclusive_lock)
- LWLockRelease(ProcArrayLock);
+ ProcArrayLockRelease();
/*
* If it still won't fit then we're out of memory
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * procarraylock.c
+ * Lock management for the ProcArray
+ *
+ * Because the ProcArray data structure is highly trafficked, it is
+ * critical that mutual exclusion for ProcArray options be as efficient
+ * as possible. A particular problem is transaction end (commit or abort)
+ * which cannot be done in parallel with snapshot acquisition. We
+ * therefore include some special hacks to deal with this case efficiently.
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/lmgr/procarraylock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "access/transam.h"
+#include "storage/flexlock_internals.h"
+#include "storage/ipc.h"
+#include "storage/procarraylock.h"
+#include "storage/proc.h"
+#include "storage/spin.h"
+
+typedef struct ProcArrayLockStruct
+{
+ FlexLock flex; /* common FlexLock infrastructure */
+ char exclusive; /* # of exclusive holders (0 or 1) */
+ int shared; /* # of shared holders (0..MaxBackends) */
+ PGPROC *ending; /* transactions wishing to clear state */
+ TransactionId latest_ending_xid; /* latest ending XID */
+} ProcArrayLockStruct;
+
+/* There is only one ProcArrayLock. */
+#define ProcArrayLockPointer() \
+ (AssertMacro(FlexLockArray[ProcArrayLock].flex.locktype == \
+ FLEXLOCK_TYPE_PROCARRAYLOCK), \
+ (volatile ProcArrayLockStruct *) &FlexLockArray[ProcArrayLock])
+
+/*
+ * ProcArrayLockAcquire - acquire a lightweight lock in the specified mode
+ *
+ * If the lock is not available, sleep until it is.
+ *
+ * Side effect: cancel/die interrupts are held off until lock release.
+ */
+void
+ProcArrayLockAcquire(ProcArrayLockMode mode)
+{
+ volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+ PGPROC *proc = MyProc;
+ bool retry = false;
+ int extraWaits = 0;
+
+ /*
+ * We can't wait if we haven't got a PGPROC. This should only occur
+ * during bootstrap or shared memory initialization. Put an Assert here
+ * to catch unsafe coding practices.
+ */
+ Assert(!(proc == NULL && IsUnderPostmaster));
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the ProcArrayLock. This ensures that interrupts will not interfere
+ * with manipulations of data structures in shared memory.
+ */
+ HOLD_INTERRUPTS();
+
+ /*
+ * Loop here to try to acquire lock after each time we are signaled by
+ * ProcArrayLockRelease. See comments in LWLockAcquire for an explanation
+ * of why do we not attempt to hand off the lock directly.
+ */
+ for (;;)
+ {
+ bool mustwait;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&lock->flex.mutex);
+
+ /* If retrying, allow LWLockRelease to release waiters again */
+ if (retry)
+ lock->flex.releaseOK = true;
+
+ /* If I can get the lock, do so quickly. */
+ if (mode == PAL_EXCLUSIVE)
+ {
+ if (lock->exclusive == 0 && lock->shared == 0)
+ {
+ lock->exclusive++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+ }
+ else
+ {
+ if (lock->exclusive == 0)
+ {
+ lock->shared++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+ }
+
+ if (!mustwait)
+ break; /* got the lock */
+
+ /* Add myself to wait queue. */
+ FlexLockJoinWaitQueue(lock, (int) mode);
+
+ /* Can release the mutex now */
+ SpinLockRelease(&lock->flex.mutex);
+
+ /* Wait until awakened. */
+ extraWaits += FlexLockWait(ProcArrayLock, mode);
+
+ /* Now loop back and try to acquire lock again. */
+ retry = true;
+ }
+
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease(&lock->flex.mutex);
+
+ TRACE_POSTGRESQL_FLEXLOCK_ACQUIRE(lockid, mode);
+
+ /* Add lock to list of locks held by this backend */
+ FlexLockRemember(ProcArrayLock);
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+}
+
+/*
+ * ProcArrayLockClearTransaction - safely clear transaction details
+ *
+ * This can't be done while ProcArrayLock is held, but it's so fast that
+ * we can afford to do it while holding the spinlock, rather than acquiring
+ * and releasing the lock.
+ */
+void
+ProcArrayLockClearTransaction(TransactionId latestXid)
+{
+ volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+ PGPROC *proc = MyProc;
+ int extraWaits = 0;
+ bool mustwait;
+
+ HOLD_INTERRUPTS();
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&lock->flex.mutex);
+
+ if (lock->exclusive == 0 && lock->shared == 0)
+ {
+ {
+ volatile PGPROC *vproc = proc;
+ volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno];
+ /* If there are no lockers, clear the critical PGPROC fields. */
+ pgxact->xid = InvalidTransactionId;
+ pgxact->xmin = InvalidTransactionId;
+ /* must be cleared with xid/xmin: */
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->nxids = 0;
+ pgxact->overflowed = false;
+ }
+ mustwait = false;
+
+ /* Also advance global latestCompletedXid while holding the lock */
+ if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+ latestXid))
+ ShmemVariableCache->latestCompletedXid = latestXid;
+ }
+ else
+ {
+ /* Rats, must wait. */
+ proc->flWaitLink = lock->ending;
+ lock->ending = proc;
+ if (!TransactionIdIsValid(lock->latest_ending_xid) ||
+ TransactionIdPrecedes(lock->latest_ending_xid, latestXid))
+ lock->latest_ending_xid = latestXid;
+ mustwait = true;
+ }
+
+ /* Can release the mutex now */
+ SpinLockRelease(&lock->flex.mutex);
+
+ /*
+ * If we were not able to perfom the operation immediately, we must wait.
+ * But we need not retry after being awoken, because the last lock holder
+ * to release the lock will do the work first, on our behalf.
+ */
+ if (mustwait)
+ {
+ extraWaits += FlexLockWait(ProcArrayLock, 2);
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+ }
+
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * ProcArrayLockRelease - release a previously acquired lock
+ */
+void
+ProcArrayLockRelease(void)
+{
+ volatile ProcArrayLockStruct *lock = ProcArrayLockPointer();
+ PGPROC *head;
+ PGPROC *ending = NULL;
+ PGPROC *proc;
+
+ FlexLockForget(ProcArrayLock);
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&lock->flex.mutex);
+
+ /* Release my hold on lock */
+ if (lock->exclusive > 0)
+ lock->exclusive--;
+ else
+ {
+ Assert(lock->shared > 0);
+ lock->shared--;
+ }
+
+ /*
+ * If the lock is now free, but there are some transactions trying to
+ * end, we must clear the critical PGPROC fields for them, and save a
+ * list of them so we can wake them up.
+ */
+ if (lock->exclusive == 0 && lock->shared == 0 && lock->ending != NULL)
+ {
+ volatile PGPROC *vproc;
+
+ ending = lock->ending;
+ vproc = ending;
+
+ while (vproc != NULL)
+ {
+ volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno];
+
+ pgxact->xid = InvalidTransactionId;
+ pgxact->xmin = InvalidTransactionId;
+ /* must be cleared with xid/xmin: */
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->nxids = 0;
+ pgxact->overflowed = false;
+ vproc = vproc->flWaitLink;
+ }
+
+ /* Also advance global latestCompletedXid */
+ if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+ lock->latest_ending_xid))
+ ShmemVariableCache->latestCompletedXid = lock->latest_ending_xid;
+
+ /* Reset lock state. */
+ lock->ending = NULL;
+ lock->latest_ending_xid = InvalidTransactionId;
+ }
+
+ /*
+ * See if I need to awaken any waiters. If I released a non-last shared
+ * hold, there cannot be anything to do. Also, do not awaken any waiters
+ * if someone has already awakened waiters that haven't yet acquired the
+ * lock.
+ */
+ head = lock->flex.head;
+ if (head != NULL)
+ {
+ if (lock->exclusive == 0 && lock->shared == 0 && lock->flex.releaseOK)
+ {
+ /*
+ * Remove the to-be-awakened PGPROCs from the queue. If the front
+ * waiter wants exclusive lock, awaken him only. Otherwise awaken
+ * as many waiters as want shared access.
+ */
+ proc = head;
+ if (proc->flWaitMode != LW_EXCLUSIVE)
+ {
+ while (proc->flWaitLink != NULL &&
+ proc->flWaitLink->flWaitMode != LW_EXCLUSIVE)
+ proc = proc->flWaitLink;
+ }
+ /* proc is now the last PGPROC to be released */
+ lock->flex.head = proc->flWaitLink;
+ proc->flWaitLink = NULL;
+ /* prevent additional wakeups until retryer gets to run */
+ lock->flex.releaseOK = false;
+ }
+ else
+ {
+ /* lock is still held, can't awaken anything */
+ head = NULL;
+ }
+ }
+
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease(&lock->flex.mutex);
+
+ TRACE_POSTGRESQL_FLEXLOCK_RELEASE(lockid);
+
+ /*
+ * Awaken any waiters I removed from the queue.
+ */
+ while (head != NULL)
+ {
+ FlexLockDebug("LWLockRelease", lockid, "release waiter");
+ proc = head;
+ head = proc->flWaitLink;
+ proc->flWaitLink = NULL;
+ proc->flWaitResult = 1; /* any non-zero value will do */
+ PGSemaphoreUnlock(&proc->sem);
+ }
+
+ /*
+ * Also awaken any processes whose critical PGPROC fields I cleared
+ */
+ while (ending != NULL)
+ {
+ FlexLockDebug("LWLockRelease", lockid, "release ending");
+ proc = ending;
+ ending = proc->flWaitLink;
+ proc->flWaitLink = NULL;
+ proc->flWaitResult = 1; /* any non-zero value will do */
+ PGSemaphoreUnlock(&proc->sem);
+ }
+
+ /*
+ * Now okay to allow cancel/die interrupts.
+ */
+ RESUME_INTERRUPTS();
+}