#define DROP_RELS_BSEARCH_THRESHOLD        20
 
+typedef struct PrivateRefCountEntry /* backend-local pin count of one shared buffer */
+{
+   Buffer buffer;      /* tracked buffer; also the hashtable key. InvalidBuffer marks an unused array slot */
+   int32 refcount;     /* number of pins this backend currently holds on the buffer */
+} PrivateRefCountEntry;
+
+/* 64 bytes, about the size of a cache line on common systems */
+#define REFCOUNT_ARRAY_ENTRIES 8
+
 /* GUC variables */
 bool       zero_damaged_pages = false;
 int            bgwriter_lru_maxpages = 100;
 /* local state for LockBufferForCleanup */
 static volatile BufferDesc *PinCountWaitBuf = NULL;
 
+/*
+ * Backend-Private refcount management:
+ *
+ * Each buffer also has a private refcount that keeps track of the number of
+ * times the buffer is pinned in the current process.  This is so that the
+ * shared refcount needs to be modified only once if a buffer is pinned more
+ * than once by an individual backend.  It's also used to check that no buffers
+ * are still pinned at the end of transactions and when exiting.
+ *
+ *
+ * To avoid - as we used to - requiring an array with NBuffers entries to keep
+ * track of local buffers, we use a small sequentially searched array
+ * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
+ * keep track of backend local pins.
+ *
+ * As long as no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once,
+ * all refcounts are kept track of in the array; after that, new array entries
+ * displace old ones into the hash table. That way a frequently used entry
+ * can't get "stuck" in the hashtable while infrequent ones clog the array.
+ *
+ * Note that in most scenarios the number of pinned buffers will not exceed
+ * REFCOUNT_ARRAY_ENTRIES.
+ */
+static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES]; /* primary storage, searched sequentially */
+static HTAB *PrivateRefCountHash = NULL; /* overflow storage, keyed by buffer number */
+static int32 PrivateRefCountOverflowed = 0; /* number of entries currently held in the hashtable */
+static uint32 PrivateRefCountClock = 0; /* advances on each eviction; modulo array size picks the victim slot */
+
+static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move);
+static inline int32 GetPrivateRefCount(Buffer buffer);
+static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
+
+/*
+ * Return the PrivateRefCount entry for the passed buffer.
+ *
+ * Returns NULL if create = false is passed and the buffer doesn't have a
+ * PrivateRefCount entry; allocates a new PrivateRefCountEntry (with a
+ * refcount of 0) if currently none exists and create = true is passed.
+ *
+ * If do_move is true and the entry currently resides in the overflow
+ * hashtable, it is moved into the array - displacing another array entry
+ * into the hashtable if no array slot is free - so that later lookups find
+ * it in the sequential array search.  create = true requires do_move = true,
+ * since a newly created entry is always placed in the array.
+ *
+ * The returned pointer refers either to an array slot or to a hashtable
+ * entry.  A later call that creates or moves an entry can reuse that array
+ * slot for a different buffer or remove that hashtable entry, so the pointer
+ * should not be kept across such calls.
+ *
+ * When a returned refcount entry isn't used anymore it has to be forgotten,
+ * using ForgetPrivateRefCountEntry().
+ *
+ * Only works for shared buffers.
+ */
+static PrivateRefCountEntry*
+GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move)
+{
+   PrivateRefCountEntry *slot = NULL;      /* array slot the entry will live in */
+   PrivateRefCountEntry *hashent = NULL;   /* buffer's entry in the hashtable, if any */
+   bool        in_hash = false;
+   int         i;
+
+   Assert(!create || do_move);
+   Assert(BufferIsValid(buffer));
+   Assert(!BufferIsLocal(buffer));
+
+   /*
+    * First search for references in the array, that'll be sufficient in the
+    * majority of cases.
+    */
+   for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
+   {
+       PrivateRefCountEntry *cur = &PrivateRefCountArray[i];
+
+       if (cur->buffer == buffer)
+           return cur;
+
+       /* Remember where to put a new refcount, should it become necessary. */
+       if (slot == NULL && cur->buffer == InvalidBuffer)
+           slot = cur;
+   }
+
+   /*
+    * By here we know that the buffer, if already pinned, isn't residing in
+    * the array.  Look it up in the hashtable, but only if something has
+    * previously overflowed into it.
+    */
+   if (PrivateRefCountOverflowed > 0)
+   {
+       hashent = hash_search(PrivateRefCountHash,
+                             (void *) &buffer,
+                             HASH_FIND,
+                             &in_hash);
+   }
+
+   if (!in_hash)
+   {
+       /* Neither array nor hash have an entry; done unless we must create */
+       if (!create)
+           return NULL;
+   }
+   else if (!do_move)
+   {
+       /* Caller is content with the entry staying in the hashtable */
+       return hashent;
+   }
+
+   /*
+    * From here on the entry has to end up in the array, either freshly
+    * created or moved over from the hashtable.  If no slot is free, evict
+    * the entry at the current clock position into the hashtable first.
+    */
+   if (slot == NULL)
+   {
+       PrivateRefCountEntry *victim;
+       PrivateRefCountEntry *spilled;
+       bool        victim_in_hash;
+
+       /* select victim slot */
+       victim = &PrivateRefCountArray[
+           PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
+       Assert(victim->buffer != InvalidBuffer);
+
+       /* enter victim array entry into hashtable */
+       spilled = hash_search(PrivateRefCountHash,
+                             (void *) &victim->buffer,
+                             HASH_ENTER,
+                             &victim_in_hash);
+       Assert(!victim_in_hash);
+       spilled->refcount = victim->refcount;
+       PrivateRefCountOverflowed++;
+
+       slot = victim;
+   }
+
+   if (in_hash)
+   {
+       bool        removed;
+
+       /* move the buffer's entry from the hashtable into the array slot */
+       slot->buffer = buffer;
+       slot->refcount = hashent->refcount;
+
+       /* and delete the now stale hashtable entry */
+       hash_search(PrivateRefCountHash,
+                   (void *) &buffer,
+                   HASH_REMOVE,
+                   &removed);
+       Assert(removed);
+       Assert(PrivateRefCountOverflowed > 0);
+       PrivateRefCountOverflowed--;
+   }
+   else
+   {
+       /* brand new entry, not pinned yet */
+       slot->buffer = buffer;
+       slot->refcount = 0;
+   }
+
+   return slot;
+}
+
+/*
+ * Report how many pins this backend currently holds on the given buffer.
+ *
+ * A buffer that has no PrivateRefCount entry counts as pinned zero times.
+ *
+ * Must only be called for shared buffers, never for local ones.
+ */
+static inline int32
+GetPrivateRefCount(Buffer buffer)
+{
+   PrivateRefCountEntry *entry;
+
+   Assert(BufferIsValid(buffer));
+   Assert(!BufferIsLocal(buffer));
+
+   /* pure lookup: neither create a missing entry nor move an existing one */
+   entry = GetPrivateRefCountEntry(buffer, false, false);
+
+   return (entry != NULL) ? entry->refcount : 0;
+}
+
+/*
+ * Give back the tracking entry of a buffer this backend no longer has pinned
+ * and doesn't intend to pin again right away.
+ *
+ * The entry's refcount must already have dropped to zero.  Entries living in
+ * the array are simply marked unused; entries living in the overflow
+ * hashtable are removed from it.
+ */
+static void
+ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
+{
+   PrivateRefCountEntry *const array_begin = &PrivateRefCountArray[0];
+   PrivateRefCountEntry *const array_end =
+       &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
+   Buffer      key;
+   bool        removed;
+
+   Assert(ref->refcount == 0);
+
+   /* The pointer's location tells us which container owns the entry. */
+   if (ref >= array_begin && ref < array_end)
+   {
+       ref->buffer = InvalidBuffer;
+       return;
+   }
+
+   /* Copy the key first; the entry itself is about to go away. */
+   key = ref->buffer;
+   hash_search(PrivateRefCountHash,
+               (void *) &key,
+               HASH_REMOVE,
+               &removed);
+   Assert(removed);
+   Assert(PrivateRefCountOverflowed > 0);
+   PrivateRefCountOverflowed--;
+}
+
+/*
+ * BufferIsPinned
+ *     True iff the buffer number is valid and *this* backend holds a pin
+ *     on the buffer.  Pins held by other backends are not considered.
+ *
+ *     Local buffers are checked against LocalRefCount, shared buffers via
+ *     GetPrivateRefCount().
+ *
+ *     NOTE: the argument is evaluated more than once.
+ */
+#define BufferIsPinned(bufnum) \
+( \
+   BufferIsValid(bufnum) && \
+   ( \
+       BufferIsLocal(bufnum) ? \
+           (LocalRefCount[-(bufnum) - 1] > 0) \
+       : \
+           (GetPrivateRefCount(bufnum) > 0) \
+   ) \
+)
+
 
 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
                  ForkNumber forkNum, BlockNumber blockNum,
        UnlockBufHdr(buf);
        LWLockRelease(oldPartitionLock);
        /* safety check: should definitely not be our *own* pin */
-       if (PrivateRefCount[buf->buf_id] != 0)
+       /* buf_id is 0-based, whereas Buffer numbers start at 1 */
+       if (GetPrivateRefCount(buf->buf_id + 1) > 0)
            elog(ERROR, "buffer is pinned in InvalidateBuffer");
        WaitIO(buf);
        goto retry;
 
    bufHdr = &BufferDescriptors[buffer - 1];
 
-   Assert(PrivateRefCount[buffer - 1] > 0);
+   Assert(BufferIsPinned(buffer));
    /* unfortunately we can't check if the lock is held exclusively */
    Assert(LWLockHeldByMe(bufHdr->content_lock));
 
 
    if (BufferIsValid(buffer))
    {
+       Assert(BufferIsPinned(buffer));
        if (BufferIsLocal(buffer))
        {
-           Assert(LocalRefCount[-buffer - 1] > 0);
            bufHdr = &LocalBufferDescriptors[-buffer - 1];
            if (bufHdr->tag.blockNum == blockNum &&
                RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
        }
        else
        {
-           Assert(PrivateRefCount[buffer - 1] > 0);
            bufHdr = &BufferDescriptors[buffer - 1];
            /* we have pin, so it's ok to examine tag without spinlock */
            if (bufHdr->tag.blockNum == blockNum &&
 {
    int         b = buf->buf_id;
    bool        result;
+   PrivateRefCountEntry *ref;
 
-   if (PrivateRefCount[b] == 0)
+   ref = GetPrivateRefCountEntry(b + 1, true, true);
+
+   if (ref->refcount == 0)
    {
        LockBufHdr(buf);
        buf->refcount++;
        /* If we previously pinned the buffer, it must surely be valid */
        result = true;
    }
-   PrivateRefCount[b]++;
-   Assert(PrivateRefCount[b] > 0);
+
+   ref->refcount++;
+   Assert(ref->refcount > 0);
    ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                BufferDescriptorGetBuffer(buf));
    return result;
 PinBuffer_Locked(volatile BufferDesc *buf)
 {
    int         b = buf->buf_id;
+   PrivateRefCountEntry *ref;
+
+   ref = GetPrivateRefCountEntry(b + 1, true, true);
 
-   if (PrivateRefCount[b] == 0)
+   if (ref->refcount == 0)
        buf->refcount++;
    UnlockBufHdr(buf);
-   PrivateRefCount[b]++;
-   Assert(PrivateRefCount[b] > 0);
+   ref->refcount++;
+   Assert(ref->refcount > 0);
    ResourceOwnerRememberBuffer(CurrentResourceOwner,
                                BufferDescriptorGetBuffer(buf));
 }
 static void
 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
 {
+   PrivateRefCountEntry *ref;
    int         b = buf->buf_id;
 
+   ref = GetPrivateRefCountEntry(b + 1, false, false);
+   Assert(ref != NULL);
+
    if (fixOwner)
        ResourceOwnerForgetBuffer(CurrentResourceOwner,
                                  BufferDescriptorGetBuffer(buf));
 
-   Assert(PrivateRefCount[b] > 0);
-   PrivateRefCount[b]--;
-   if (PrivateRefCount[b] == 0)
+   Assert(ref->refcount > 0);
+   ref->refcount--;
+   if (ref->refcount == 0)
    {
        /* I'd better not still hold any locks on the buffer */
        Assert(!LWLockHeldByMe(buf->content_lock));
        }
        else
            UnlockBufHdr(buf);
+
+       ForgetPrivateRefCountEntry(ref);
    }
 }
 
 
 /*
  *     AtEOXact_Buffers - clean up at end of transaction.
+ *
+ *     As of PostgreSQL 8.0, buffer pins should get released by the
+ *     ResourceOwner mechanism.  This routine is just a debugging
+ *     cross-check that no pins remain.
  */
 void
 AtEOXact_Buffers(bool isCommit)
    CheckForBufferLeaks();
 
    AtEOXact_LocalBuffers(isCommit);
+
+   Assert(PrivateRefCountOverflowed == 0);
+}
+
+/*
+ * Initialize access to shared buffer pool
+ *
+ * This is called during backend startup (whether standalone or under the
+ * postmaster).  It sets up for this backend's access to the already-existing
+ * buffer pool.
+ *
+ * Here that means resetting the backend-local refcount array and creating
+ * the (initially empty) overflow hashtable for backend-local refcounts.
+ *
+ * NB: this is called before InitProcess(), so we do not have a PGPROC and
+ * cannot do LWLockAcquire; hence we can't actually access stuff in
+ * shared memory yet.  We are only initializing local data here.
+ * (See also InitBufferPoolBackend)
+ */
+void
+InitBufferPoolAccess(void)
+{
+   HASHCTL     hash_ctl;
+
+   /*
+    * Mark all array slots unused.
+    * NOTE(review): relies on InvalidBuffer being 0 - confirm.
+    */
+   memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
+
+   MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+   hash_ctl.keysize = sizeof(int32);
+   /* each hashtable element is one entry, not the whole array */
+   hash_ctl.entrysize = sizeof(PrivateRefCountEntry);
+   hash_ctl.hash = oid_hash; /* a bit more efficient than tag_hash */
+
+   PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
+                                     HASH_ELEM | HASH_FUNCTION);
 }
 
 /*
 {
 #ifdef USE_ASSERT_CHECKING
    int         RefCountErrors = 0;
-   Buffer      b;
+   PrivateRefCountEntry *res;
+   int         i;
+
+   /* check the array */
+   for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
+   {
+       res = &PrivateRefCountArray[i];
+
+       if (res->buffer != InvalidBuffer)
+       {
+           PrintBufferLeakWarning(res->buffer);
+           RefCountErrors++;
+       }
+   }
 
-   for (b = 1; b <= NBuffers; b++)
+   /* if necessary search the hash */
+   if (PrivateRefCountOverflowed)
    {
-       if (PrivateRefCount[b - 1] != 0)
+       HASH_SEQ_STATUS hstat;
+       hash_seq_init(&hstat, PrivateRefCountHash);
+       while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
        {
-           PrintBufferLeakWarning(b);
+           PrintBufferLeakWarning(res->buffer);
            RefCountErrors++;
        }
+
    }
+
    Assert(RefCountErrors == 0);
 #endif
 }
    else
    {
        buf = &BufferDescriptors[buffer - 1];
-       loccount = PrivateRefCount[buffer - 1];
+       loccount = GetPrivateRefCount(buffer);
        backend = InvalidBackendId;
    }
 
             i, buf->freeNext,
          relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
             buf->tag.blockNum, buf->flags,
-            buf->refcount, PrivateRefCount[i]);
+            buf->refcount, GetPrivateRefCount(i + 1)); /* i is 0-based, Buffer numbers are 1-based */
    }
 }
 #endif
 
    for (i = 0; i < NBuffers; ++i, ++buf)
    {
-       if (PrivateRefCount[i] > 0)
+       if (GetPrivateRefCount(i + 1) > 0)
        {
            /* theoretically we should lock the bufhdr here */
            elog(LOG,
                 i, buf->freeNext,
                 relpath(buf->tag.rnode, buf->tag.forkNum),
                 buf->tag.blockNum, buf->flags,
-                buf->refcount, PrivateRefCount[i]);
+                buf->refcount, GetPrivateRefCount(i + 1));
        }
    }
 }
 ReleaseBuffer(Buffer buffer)
 {
    volatile BufferDesc *bufHdr;
+   PrivateRefCountEntry *ref;
 
    if (!BufferIsValid(buffer))
        elog(ERROR, "bad buffer ID: %d", buffer);
 
    bufHdr = &BufferDescriptors[buffer - 1];
 
-   Assert(PrivateRefCount[buffer - 1] > 0);
+   ref = GetPrivateRefCountEntry(buffer, false, false);
+   Assert(ref != NULL);
+   Assert(ref->refcount > 0);
 
-   if (PrivateRefCount[buffer - 1] > 1)
-       PrivateRefCount[buffer - 1]--;
+   if (ref->refcount > 1)
+       ref->refcount--;
    else
        UnpinBuffer(bufHdr, false);
 }
    if (BufferIsLocal(buffer))
        LocalRefCount[-buffer - 1]++;
    else
-       PrivateRefCount[buffer - 1]++;
+   {
+       PrivateRefCountEntry *ref;
+       ref = GetPrivateRefCountEntry(buffer, false, true);
+       Assert(ref != NULL);
+       ref->refcount++;
+   }
 }
 
 /*
 
    bufHdr = &BufferDescriptors[buffer - 1];
 
-   Assert(PrivateRefCount[buffer - 1] > 0);
+   Assert(GetPrivateRefCount(buffer) > 0);
    /* here, either share or exclusive lock is OK */
    Assert(LWLockHeldByMe(bufHdr->content_lock));
 
    }
 
    /* There should be exactly one local pin */
-   if (PrivateRefCount[buffer - 1] != 1)
+   if (GetPrivateRefCount(buffer) != 1)
        elog(ERROR, "incorrect local pin count: %d",
-            PrivateRefCount[buffer - 1]);
+            GetPrivateRefCount(buffer));
 
    bufHdr = &BufferDescriptors[buffer - 1];
 
    if (bufid < 0)
        return false;
 
-   if (PrivateRefCount[bufid] > 0)
+   if (GetPrivateRefCount(bufid + 1) > 0)
        return true;
 
    return false;
    }
 
    /* There should be exactly one local pin */
-   Assert(PrivateRefCount[buffer - 1] > 0);
-   if (PrivateRefCount[buffer - 1] != 1)
+   Assert(GetPrivateRefCount(buffer) > 0);
+   if (GetPrivateRefCount(buffer) != 1)
        return false;
 
    /* Try to acquire lock */