Trial group locking code for ProcSleep. group_locking
authorRobert Haas <[email protected]>
Tue, 14 Oct 2014 22:30:28 +0000 (18:30 -0400)
committerRobert Haas <[email protected]>
Wed, 15 Oct 2014 02:53:15 +0000 (22:53 -0400)
src/backend/storage/lmgr/proc.c

index b671800bd46678b119ba21724cd6ef0e81eb4b4b..1af985115a0c71d8336634301e7d3872aaf7deba 100644 (file)
@@ -1037,18 +1037,53 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
        bool            early_deadlock = false;
        bool            allow_autovacuum_cancel = true;
        int                     myWaitStatus;
-       PGPROC     *proc;
+       PGPROC     *proc = NULL;
        int                     i;
+       PGPROC     *groupLeader = LockGroupLeader;
+
+       /*
+        * Ignore trivial lock groups.
+        *
+        * We read MyProc->lockGroupMembers here without a lock.  The read itself
+        * is atomic; while the value could be changing under us, it can't change
+        * from a value < 2 to a value >= 2 while any group locks are actually
+        * present.  Similarly, when iterating over the wait queue, we needn't
+        * worry that the lock group membership of a process will change under us:
+        * that's not allowed while a process holds any locks.
+        */
+       if (MyProc == groupLeader && MyProc->lockGroupMembers >= 2)
+               groupLeader = NULL;
 
        /*
         * Determine where to add myself in the wait queue.
         *
-        * Normally I should go at the end of the queue.  However, if I already
-        * hold locks that conflict with the request of any previous waiter, put
-        * myself in the queue just in front of the first such waiter. This is not
-        * a necessary step, since deadlock detection would move me to before that
-        * waiter anyway; but it's relatively cheap to detect such a conflict
-        * immediately, and avoid delaying till deadlock timeout.
+        * Normally I should go at the end of the queue.  However, if I'm a
+        * member of a lock group, and some other member of the lock group is
+        * already waiting for a lock, then add add myself just after the
+        * existing waiters.  This is necessary for correctness; any code that
+        * scans the wait queue is entitled to assume that lockers from the same
+        * group are in consecutive positions in the queue.
+        */
+       if (groupLeader != NULL)
+       {
+               PGPROC *cproc = (PGPROC *) waitQueue->links.next;
+
+               for (i = 0; i < waitQueue->size; i++)
+               {
+                       if (cproc->lockGroupLeader == LockGroupLeader)
+                               proc = cproc;
+                       else if (proc != NULL)
+                               break;
+                       cproc = (PGPROC *) cproc->links.next;
+               }
+       }
+
+       /*
+        * If I already hold locks that conflict with the request of any previous
+        * waiter, put myself in the queue just in front of the first such waiter.
+        * This is not a necessary step, since deadlock detection would move me
+        * to before that waiter anyway; but it's relatively cheap to detect such
+        * a conflict immediately, and avoid delaying till deadlock timeout.
         *
         * Special case: if I find I should go in front of some waiter, check to
         * see if I conflict with already-held locks or the requests before that
@@ -1059,16 +1094,24 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
         */
        if (myHeldLocks != 0)
        {
+               PGPROC *cproc = (PGPROC *) waitQueue->links.next;
                LOCKMASK        aheadRequests = 0;
 
-               proc = (PGPROC *) waitQueue->links.next;
                for (i = 0; i < waitQueue->size; i++)
                {
+                       /*
+                        * If we reached our own lock group in the wait queue without
+                        * finding a conflict, we aren't going to find one at all prior
+                        * to the insertion point, so bail out.
+                        */
+                       if (groupLeader != NULL && cproc->lockGroupLeader == groupLeader)
+                               break;
+
                        /* Must he wait for me? */
-                       if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
+                       if (lockMethodTable->conflictTab[cproc->waitLockMode] & myHeldLocks)
                        {
                                /* Must I wait for him ? */
-                               if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks)
+                               if (lockMethodTable->conflictTab[lockmode] & cproc->heldLocks)
                                {
                                        /*
                                         * Yes, so we have a deadlock.  Easiest way to clean up
@@ -1077,7 +1120,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                                         * a flag to check below, and break out of loop.  Also,
                                         * record deadlock info for later message.
                                         */
-                                       RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
+                                       RememberSimpleDeadLock(MyProc, lockmode, lock, cproc);
                                        early_deadlock = true;
                                        break;
                                }
@@ -1093,22 +1136,19 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                                        GrantAwaitedLock();
                                        return STATUS_OK;
                                }
-                               /* Break out of loop to put myself before him */
+                               /* Break out of loop and put myself before him */
+                               proc = cproc;
                                break;
                        }
                        /* Nope, so advance to next waiter */
-                       aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
-                       proc = (PGPROC *) proc->links.next;
+                       aheadRequests |= LOCKBIT_ON(cproc->waitLockMode);
+                       cproc = (PGPROC *) cproc->links.next;
                }
-
-               /*
-                * If we fall out of loop normally, proc points to waitQueue head, so
-                * we will insert at tail of queue as desired.
-                */
        }
-       else
+
+       if (proc == NULL)
        {
-               /* I hold no locks, so I can't push in front of anyone. */
+               /* No special case applies, so I can't push in front of anyone. */
                proc = (PGPROC *) &(waitQueue->links);
        }