Back-patch fix to grab read lock on a buffer while it is written out.
authorTom Lane <[email protected]>
Mon, 25 Sep 2000 04:34:10 +0000 (04:34 +0000)
committerTom Lane <[email protected]>
Mon, 25 Sep 2000 04:34:10 +0000 (04:34 +0000)
src/backend/storage/buffer/bufmgr.c

index aa93a04b3c174953eb981a74d28b6c9740125216..a1edc3a3db1f8fb3331f0cfaa4e6b27ef2842598 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.81 2000/05/19 03:22:28 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.81.2.1 2000/09/25 04:34:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -75,7 +75,6 @@ static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
 static void StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf);
 static void ContinueBufferIO(BufferDesc *buf, bool forInput);
-extern void InitBufferIO(void);
 extern void AbortBufferIO(void);
 
 /*
@@ -430,17 +429,10 @@ BufferAlloc(Relation reln,
    inProgress = FALSE;
    for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
    {
-
-       /* GetFreeBuffer will abort if it can't find a free buffer */
        buf = GetFreeBuffer();
 
-       /*
-        * But it can return buf == NULL if we are in aborting transaction
-        * now and so elog(ERROR,...) in GetFreeBuffer will not abort
-        * again.
-        */
-       if (buf == NULL)
-           return NULL;
+       /* GetFreeBuffer will abort if it can't find a free buffer */
+       Assert(buf);
 
        /*
         * There should be exactly one pin on the buffer after it is
@@ -790,11 +782,21 @@ FlushBuffer(Buffer buffer, bool release)
    WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
    bufHdr->flags &= ~BM_JUST_DIRTIED;
    StartBufferIO(bufHdr, false);       /* output IO start */
+
    SpinRelease(BufMgrLock);
 
+   /*
+    * Grab a read lock on the buffer to ensure that no
+    * other backend changes its contents while we write it;
+    * see comments in BufferSync().
+    */
+   LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
+
    status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
                       (char *) MAKE_PTR(bufHdr->data));
 
+   LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
+
    /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
    RelationDecrementReferenceCount(bufrel);
 
@@ -1018,19 +1020,6 @@ ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
  *     that have been dirtied by the current xact and flush them to disk.
  *     We do *not* flush dirty buffers that have been dirtied by other xacts.
  *     (This is a substantial change from pre-7.0 behavior.)
- *
- * OLD COMMENTS (do these still apply?)
- *
- *     Also, we need to be sure that no other transaction is
- *     modifying the page as we flush it.  This is only a problem for objects
- *     that use a non-two-phase locking protocol, like btree indices.  For
- *     those objects, we would like to set a write lock for the duration of
- *     our IO.  Another possibility is to code updates to btree pages
- *     carefully, so that writing them out out of order cannot cause
- *     any unrecoverable errors.
- *
- *     I don't want to think hard about this right now, so I will try
- *     to come back to it later.
  */
 static void
 BufferSync()
@@ -1113,15 +1102,28 @@ BufferSync()
                    bufHdr->flags &= ~BM_JUST_DIRTIED;
                    StartBufferIO(bufHdr, false);       /* output IO start */
 
+                   SpinRelease(BufMgrLock);
+
+                   /*
+                    * Grab a read lock on the buffer to ensure that no
+                    * other backend changes its contents while we write it;
+                    * otherwise we could write a non-self-consistent page
+                    * image to disk, which'd be bad news if the other
+                    * transaction aborts before writing its changes.
+                    *
+                    * Note that we still need the BM_JUST_DIRTIED mechanism
+                    * in case someone dirties the buffer just before we
+                    * grab this lock or just after we release it.
+                    */
+                   LockBuffer(BufferDescriptorGetBuffer(bufHdr),
+                              BUFFER_LOCK_SHARE);
+
                    /*
                     * If we didn't have the reldesc in our local cache,
                     * write this page out using the 'blind write' storage
                     * manager routine.  If we did find it, use the
                     * standard interface.
                     */
-#ifndef OPTIMIZE_SINGLE
-                   SpinRelease(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
                    if (reln == (Relation) NULL)
                    {
                        status = smgrblindwrt(DEFAULT_SMGR,
@@ -1138,9 +1140,14 @@ BufferSync()
                                           bufHdr->tag.blockNum,
                                        (char *) MAKE_PTR(bufHdr->data));
                    }
-#ifndef OPTIMIZE_SINGLE
+
+                   /*
+                    * Release the per-buffer readlock, reacquire BufMgrLock.
+                    */
+                   LockBuffer(BufferDescriptorGetBuffer(bufHdr),
+                              BUFFER_LOCK_UNLOCK);
+
                    SpinAcquire(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
 
                    UnpinBuffer(bufHdr);
                    if (status == SM_FAIL)
@@ -1523,9 +1530,14 @@ BufferReplace(BufferDesc *bufHdr)
    /* To check if block content changed while flushing. - vadim 01/17/97 */
    bufHdr->flags &= ~BM_JUST_DIRTIED;
 
-#ifndef OPTIMIZE_SINGLE
    SpinRelease(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
+
+   /*
+    * Grab a read lock on the buffer to ensure that no
+    * other backend changes its contents while we write it;
+    * see comments in BufferSync().
+    */
+   LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
 
    if (reln != (Relation) NULL)
    {
@@ -1541,9 +1553,9 @@ BufferReplace(BufferDesc *bufHdr)
                              false);   /* no fsync */
    }
 
-#ifndef OPTIMIZE_SINGLE
+   LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
+
    SpinAcquire(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
 
    /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
    if (reln != (Relation) NULL)
@@ -2488,11 +2500,13 @@ ContinueBufferIO(BufferDesc *buf, bool forInput)
    IsForInput = forInput;
 }
 
+#ifdef NOT_USED
 void
 InitBufferIO(void)
 {
    InProgressBuf = (BufferDesc *) 0;
 }
+#endif
 
 /*
  * This function is called from ProcReleaseSpins().