Buffer *buffers,
                     BlockNumber blockNum,
                     int *nblocks,
-                    int flags)
+                    int flags,
+                    bool allow_forwarding)
 {
    int         actual_nblocks = *nblocks;
-   int         io_buffers_len = 0;
    int         maxcombine = 0;
 
+   Assert(*nblocks == 1 || allow_forwarding);
    Assert(*nblocks > 0);
    Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
 
    {
        bool        found;
 
-       buffers[i] = PinBufferForBlock(operation->rel,
-                                      operation->smgr,
-                                      operation->persistence,
-                                      operation->forknum,
-                                      blockNum + i,
-                                      operation->strategy,
-                                      &found);
+       if (allow_forwarding && buffers[i] != InvalidBuffer)
+       {
+           BufferDesc *bufHdr;
+
+           /*
+            * This is a buffer that was pinned by an earlier call to
+            * StartReadBuffers(), but couldn't be handled in one operation at
+            * that time.  The operation was split, and the caller has passed
+            * an already pinned buffer back to us to handle the rest of the
+            * operation.  It must continue at the expected block number.
+            */
+           Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+           /*
+            * It might be an already valid buffer (a hit) that followed the
+            * final contiguous block of an earlier I/O (a miss) marking the
+            * end of it, or a buffer that some other backend has since made
+            * valid by performing the I/O for us, in which case we can handle
+            * it as a hit now.  It is safe to check for a BM_VALID flag with
+            * a relaxed load, because we got a fresh view of it while pinning
+            * it in the previous call.
+            *
+            * On the other hand if we don't see BM_VALID yet, it must be an
+            * I/O that was split by the previous call and we need to try to
+            * start a new I/O from this block.  We're also racing against any
+            * other backend that might start the I/O or even manage to mark
+            * it BM_VALID after this check, but StartBufferIO() will handle
+            * those cases.
+            */
+           if (BufferIsLocal(buffers[i]))
+               bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+           else
+               bufHdr = GetBufferDescriptor(buffers[i] - 1);
+           Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
+           found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+       }
+       else
+       {
+           buffers[i] = PinBufferForBlock(operation->rel,
+                                          operation->smgr,
+                                          operation->persistence,
+                                          operation->forknum,
+                                          blockNum + i,
+                                          operation->strategy,
+                                          &found);
+       }
 
        if (found)
        {
            /*
-            * Terminate the read as soon as we get a hit.  It could be a
-            * single buffer hit, or it could be a hit that follows a readable
-            * range.  We don't want to create more than one readable range,
-            * so we stop here.
+            * We have a hit.  If it's the first block in the requested range,
+            * we can return it immediately and report that WaitReadBuffers()
+            * does not need to be called.  If the initial value of *nblocks
+            * was larger, the caller will have to call again for the rest.
             */
-           actual_nblocks = i + 1;
+           if (i == 0)
+           {
+               *nblocks = 1;
+               return false;
+           }
+
+           /*
+            * Otherwise we already have an I/O to perform, but this block
+            * can't be included as it is already valid.  Split the I/O here.
+            * There may or may not be more blocks requiring I/O after this
+            * one, we haven't checked, but they can't be contiguous with this
+            * one in the way.  We'll leave this buffer pinned, forwarding it
+            * to the next call, avoiding the need to unpin it here and re-pin
+            * it in the next call.
+            */
+           actual_nblocks = i;
            break;
        }
        else
        {
-           /* Extend the readable range to cover this block. */
-           io_buffers_len++;
-
            /*
             * Check how many blocks we can cover with the same IO. The smgr
             * implementation might e.g. be limited due to a segment boundary.
    }
    *nblocks = actual_nblocks;
 
-   if (likely(io_buffers_len == 0))
-       return false;
-
    /* Populate information needed for I/O. */
    operation->buffers = buffers;
    operation->blocknum = blockNum;
    operation->flags = flags;
    operation->nblocks = actual_nblocks;
-   operation->io_buffers_len = io_buffers_len;
 
    if (flags & READ_BUFFERS_ISSUE_ADVICE)
    {
        smgrprefetch(operation->smgr,
                     operation->forknum,
                     blockNum,
-                    operation->io_buffers_len);
+                    actual_nblocks);
    }
 
    /* Indicate that WaitReadBuffers() should be called. */
 
 /*
  * Begin reading a range of blocks beginning at blockNum and extending for
- * *nblocks.  On return, up to *nblocks pinned buffers holding those blocks
- * are written into the buffers array, and *nblocks is updated to contain the
- * actual number, which may be fewer than requested.  Caller sets some of the
- * members of operation; see struct definition.
- *
- * If false is returned, no I/O is necessary.  If true is returned, one I/O
- * has been started, and WaitReadBuffers() must be called with the same
- * operation object before the buffers are accessed.  Along with the operation
- * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * *nblocks.  *nblocks and the buffers array are in/out parameters.  On entry,
+ * the buffers elements covered by *nblocks must hold either InvalidBuffer or
+ * buffers forwarded by an earlier call to StartReadBuffers() that was split
+ * and is now being continued.  On return, *nblocks holds the number of blocks
+ * accepted by this operation.  If it is less than the original number then
+ * this operation has been split, but buffer elements up to the original
+ * requested size may hold forwarded buffers to be used for a continuing
+ * operation.  The caller must either start a new I/O beginning at the block
+ * immediately following the blocks accepted by this call and pass those
+ * buffers back in, or release them if it chooses not to.  It shouldn't make
+ * any other use of or assumptions about forwarded buffers.
+ *
+ * If false is returned, no I/O is necessary and the buffers covered by
+ * *nblocks on exit are valid and ready to be accessed.  If true is returned,
+ * an I/O has been started, and WaitReadBuffers() must be called with the same
+ * operation object before the buffers covered by *nblocks on exit can be
+ * accessed.  Along with the operation object, the caller-supplied array of
+ * buffers must remain valid until WaitReadBuffers() is called, and any
+ * forwarded buffers must also be preserved for a continuing call unless
+ * they are explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
                 int *nblocks,
                 int flags)
 {
-   return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+   return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+                               true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers().  This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block and *buffer is ignored on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
    int         nblocks = 1;
    bool        result;
 
-   result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+   result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+                                 false /* single block, no forwarding */ );
    Assert(nblocks == 1);       /* single block can't be short */
 
    return result;
    IOObject    io_object;
    char        persistence;
 
-   /*
-    * Currently operations are only allowed to include a read of some range,
-    * with an optional extra buffer that is already pinned at the end.  So
-    * nblocks can be at most one more than io_buffers_len.
-    */
-   Assert((operation->nblocks == operation->io_buffers_len) ||
-          (operation->nblocks == operation->io_buffers_len + 1));
-
    /* Find the range of the physical read we need to perform. */
-   nblocks = operation->io_buffers_len;
-   if (nblocks == 0)
-       return;                 /* nothing to do */
-
+   nblocks = operation->nblocks;
    buffers = &operation->buffers[0];
    blocknum = operation->blocknum;
    forknum = operation->forknum;
    persistence = operation->persistence;
 
+   Assert(nblocks > 0);
+   Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
    if (persistence == RELPERSISTENCE_TEMP)
    {
        io_context = IOCONTEXT_NORMAL;