static void UnpinBuffer(BufferDesc *buf);
static void UnpinBufferNoOwner(BufferDesc *buf);
static void BufferSync(int flags);
-static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
bool from_ring;
/*
- * Ensure, while the spinlock's not yet held, that there's a free refcount
- * entry, and a resource owner slot for the pin.
+ * Ensure, before we pin a victim buffer, that there's a free refcount
+ * entry and resource owner slot for the pin.
*/
ReservePrivateRefCountEntry();
ResourceOwnerEnlarge(CurrentResourceOwner);
again:
/*
- * Select a victim buffer. The buffer is returned with its header
- * spinlock still held!
+ * Select a victim buffer. The buffer is returned pinned and owned by
+ * this backend.
*/
buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
buf = BufferDescriptorGetBuffer(buf_hdr);
- Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
-
- /* Pin the buffer and then release the buffer spinlock */
- PinBuffer_Locked(buf_hdr);
-
/*
* We shouldn't have any other pins for this buffer.
*/
{
result = (buf_state & BM_VALID) != 0;
- ref = NewPrivateRefCountEntry(b);
+ TrackNewBufferPin(b);
/*
* Assume that we acquired a buffer pin for the purposes of
* cannot meddle with that.
*/
result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
+
+ Assert(ref->refcount > 0);
+ ref->refcount++;
+ ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
}
- ref->refcount++;
- Assert(ref->refcount > 0);
- ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
return result;
}
static void
PinBuffer_Locked(BufferDesc *buf)
{
- Buffer b;
- PrivateRefCountEntry *ref;
uint32 buf_state;
/*
buf_state += BUF_REFCOUNT_ONE;
UnlockBufHdr(buf, buf_state);
- b = BufferDescriptorGetBuffer(buf);
-
- ref = NewPrivateRefCountEntry(b);
- ref->refcount++;
-
- ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
}
/*
}
}
+inline void
+TrackNewBufferPin(Buffer buf)
+{
+ PrivateRefCountEntry *ref;
+
+ ref = NewPrivateRefCountEntry(buf);
+ ref->refcount++;
+
+ ResourceOwnerRememberBuffer(CurrentResourceOwner, buf);
+}
+
#define ST_SORT sort_checkpoint_bufferids
#define ST_ELEMENT_TYPE CkptSortItem
#define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
* Obviously the buffer could be locked by the time the value is returned, so
* this is primarily useful in CAS style loops.
*/
-static uint32
+pg_noinline uint32
WaitBufHdrUnlocked(BufferDesc *buf)
{
SpinDelayStatus delayStatus;
* StrategyGetBuffer
*
* Called by the bufmgr to get the next candidate buffer to use in
- * BufferAlloc(). The only hard requirement BufferAlloc() has is that
+ * GetVictimBuffer(). The only hard requirement GetVictimBuffer() has is that
* the selected buffer must not currently be pinned by anyone.
*
* strategy is a BufferAccessStrategy object, or NULL for default strategy.
*
- * To ensure that no one else can pin the buffer before we do, we must
- * return the buffer with the buffer header spinlock still held.
+ * It is the callers responsibility to ensure the buffer ownership can be
+ * tracked via TrackNewBufferPin().
+ *
+ * The buffer is pinned and marked as owned, using TrackNewBufferPin(),
+ * before returning.
*/
BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring)
BufferDesc *buf;
int bgwprocno;
int trycounter;
- uint32 local_buf_state; /* to avoid repeated (de-)referencing */
*from_ring = false;
trycounter = NBuffers;
for (;;)
{
+ uint32 old_buf_state;
+ uint32 local_buf_state;
+
buf = GetBufferDescriptor(ClockSweepTick());
/*
- * If the buffer is pinned or has a nonzero usage_count, we cannot use
- * it; decrement the usage_count (unless pinned) and keep scanning.
+ * Check whether the buffer can be used and pin it if so. Do this
+ * using a CAS loop, to avoid having to lock the buffer header.
*/
- local_buf_state = LockBufHdr(buf);
-
- if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
{
+ local_buf_state = old_buf_state;
+
+ /*
+ * If the buffer is pinned or has a nonzero usage_count, we cannot
+ * use it; decrement the usage_count (unless pinned) and keep
+ * scanning.
+ */
+
+ if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0)
+ {
+ if (--trycounter == 0)
+ {
+ /*
+ * We've scanned all the buffers without making any state
+ * changes, so all the buffers are pinned (or were when we
+ * looked at them). We could hope that someone will free
+ * one eventually, but it's probably better to fail than
+ * to risk getting stuck in an infinite loop.
+ */
+ elog(ERROR, "no unpinned buffers available");
+ }
+ break;
+ }
+
+ if (unlikely(local_buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(buf);
+ continue;
+ }
+
if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
{
local_buf_state -= BUF_USAGECOUNT_ONE;
- trycounter = NBuffers;
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ trycounter = NBuffers;
+ break;
+ }
}
else
{
- /* Found a usable buffer */
- if (strategy != NULL)
- AddBufferToRing(strategy, buf);
- *buf_state = local_buf_state;
- return buf;
+ /* pin the buffer if the CAS succeeds */
+ local_buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ /* Found a usable buffer */
+ if (strategy != NULL)
+ AddBufferToRing(strategy, buf);
+ *buf_state = local_buf_state;
+
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+
+ return buf;
+ }
}
+
}
- else if (--trycounter == 0)
- {
- /*
- * We've scanned all the buffers without making any state changes,
- * so all the buffers are pinned (or were when we looked at them).
- * We could hope that someone will free one eventually, but it's
- * probably better to fail than to risk getting stuck in an
- * infinite loop.
- */
- UnlockBufHdr(buf, local_buf_state);
- elog(ERROR, "no unpinned buffers available");
- }
- UnlockBufHdr(buf, local_buf_state);
}
}
* GetBufferFromRing -- returns a buffer from the ring, or NULL if the
* ring is empty / not usable.
*
- * The bufhdr spin lock is held on the returned buffer.
+ * The buffer is pinned and marked as owned, using TrackNewBufferPin(), before
+ * returning.
*/
static BufferDesc *
GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
{
BufferDesc *buf;
Buffer bufnum;
+ uint32 old_buf_state;
uint32 local_buf_state; /* to avoid repeated (de-)referencing */
if (bufnum == InvalidBuffer)
return NULL;
+ buf = GetBufferDescriptor(bufnum - 1);
+
/*
- * If the buffer is pinned we cannot use it under any circumstances.
- *
- * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
- * since our own previous usage of the ring element would have left it
- * there, but it might've been decremented by clock-sweep since then). A
- * higher usage_count indicates someone else has touched the buffer, so we
- * shouldn't re-use it.
+ * Check whether the buffer can be used and pin it if so. Do this using a
+ * CAS loop, to avoid having to lock the buffer header.
*/
- buf = GetBufferDescriptor(bufnum - 1);
- local_buf_state = LockBufHdr(buf);
- if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
- && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1)
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
{
- *buf_state = local_buf_state;
- return buf;
+ local_buf_state = old_buf_state;
+
+ /*
+ * If the buffer is pinned we cannot use it under any circumstances.
+ *
+ * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
+ * since our own previous usage of the ring element would have left it
+ * there, but it might've been decremented by clock-sweep since then).
+ * A higher usage_count indicates someone else has touched the buffer,
+ * so we shouldn't re-use it.
+ */
+ if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0
+ || BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1)
+ break;
+
+ if (unlikely(local_buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(buf);
+ continue;
+ }
+
+ /* pin the buffer if the CAS succeeds */
+ local_buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ *buf_state = local_buf_state;
+
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+ return buf;
+ }
}
- UnlockBufHdr(buf, local_buf_state);
/*
* Tell caller to allocate a new buffer with the normal allocation