Use streaming read I/O in autoprewarm
authorMelanie Plageman <[email protected]>
Fri, 4 Apr 2025 19:25:45 +0000 (15:25 -0400)
committerMelanie Plageman <[email protected]>
Fri, 4 Apr 2025 19:28:54 +0000 (15:28 -0400)
Make a read stream for each valid fork of each valid relation
represented in the autoprewarm dump file and prewarm those blocks
through the read stream API instead of by directly invoking
ReadBuffer().

Co-authored-by: Nazir Bilal Yavuz <[email protected]>
Co-authored-by: Melanie Plageman <[email protected]>
Reviewed-by: Heikki Linnakangas <[email protected]>
Reviewed-by: Daniel Gustafsson <[email protected]>
Reviewed-by: Andrey M. Borodin <[email protected]> (earlier versions)
Reviewed-by: Kirill Reshke <[email protected]> (earlier versions)
Reviewed-by: Matheus Alcantara <[email protected]> (earlier versions)
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com

contrib/pg_prewarm/autoprewarm.c
src/tools/pgindent/typedefs.list

index 761f6a77926bd7b743ed0bdfeb4117ae94f0f9b8..bde0523f082cdfb4b8e270de22050706680bb28d 100644 (file)
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -75,6 +76,28 @@ typedef struct AutoPrewarmSharedState
    int         prewarmed_blocks;
 } AutoPrewarmSharedState;
 
+/*
+ * Private data passed through the read stream API for our use in the
+ * callback.
+ */
+typedef struct AutoPrewarmReadStreamData
+{
+   /* The array of records containing the blocks we should prewarm. */
+   BlockInfoRecord *block_info;
+
+   /*
+    * pos is the read stream callback's index into block_info. Because the
+    * read stream may read ahead, pos is likely to be ahead of the index in
+    * the main loop in autoprewarm_database_main().
+    */
+   int         pos;
+   Oid         tablespace;
+   RelFileNumber filenumber;
+   ForkNumber  forknum;
+   BlockNumber nblocks;
+} AutoPrewarmReadStreamData;
+
+
 PGDLLEXPORT void autoprewarm_main(Datum main_arg);
 PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
 
@@ -422,6 +445,54 @@ apw_load_buffers(void)
                        apw_state->prewarmed_blocks, num_elements)));
 }
 
+/*
+ * Return the next block number of a specific relation and fork to read
+ * according to the array of BlockInfoRecord.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+                          void *callback_private_data,
+                          void *per_buffer_data)
+{
+   AutoPrewarmReadStreamData *p = callback_private_data;
+
+   CHECK_FOR_INTERRUPTS();
+
+   while (p->pos < apw_state->prewarm_stop_idx)
+   {
+       BlockInfoRecord blk = p->block_info[p->pos];
+
+       if (!have_free_buffer())
+       {
+           p->pos = apw_state->prewarm_stop_idx;
+           return InvalidBlockNumber;
+       }
+
+       if (blk.tablespace != p->tablespace)
+           return InvalidBlockNumber;
+
+       if (blk.filenumber != p->filenumber)
+           return InvalidBlockNumber;
+
+       if (blk.forknum != p->forknum)
+           return InvalidBlockNumber;
+
+       p->pos++;
+
+       /*
+        * Check whether blocknum is valid and within fork file size.
+        * Fast-forward through any invalid blocks. We want p->pos to reflect
+        * the location of the next relation or fork before ending the stream.
+        */
+       if (blk.blocknum >= p->nblocks)
+           continue;
+
+       return blk.blocknum;
+   }
+
+   return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
  * those got grouped with this database).
@@ -462,8 +533,6 @@ autoprewarm_database_main(Datum main_arg)
        Oid         reloid;
        Relation    rel;
 
-       CHECK_FOR_INTERRUPTS();
-
        /*
         * All blocks between prewarm_start_idx and prewarm_stop_idx should
         * belong either to global objects or the same database.
@@ -510,6 +579,8 @@ autoprewarm_database_main(Datum main_arg)
        {
            ForkNumber  forknum = blk.forknum;
            BlockNumber nblocks;
+           struct AutoPrewarmReadStreamData p;
+           ReadStream *stream;
            Buffer      buf;
 
            /*
@@ -540,32 +611,41 @@ autoprewarm_database_main(Datum main_arg)
 
            nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
 
-           /* Prewarm buffers. */
-           while (i < apw_state->prewarm_stop_idx &&
-                  blk.tablespace == tablespace &&
-                  blk.filenumber == filenumber &&
-                  blk.forknum == forknum &&
-                  have_free_buffer())
+           p = (struct AutoPrewarmReadStreamData)
            {
-               CHECK_FOR_INTERRUPTS();
-
-               /* Check whether blocknum is valid and within fork file size. */
-               if (blk.blocknum >= nblocks)
-               {
-                   blk = block_info[++i];
-                   continue;
-               }
-
-               buf = ReadBufferExtended(rel, blk.forknum, blk.blocknum, RBM_NORMAL,
-                                        NULL);
-
-               blk = block_info[++i];
-               if (!BufferIsValid(buf))
-                   break;
+               .block_info = block_info,
+                   .pos = i,
+                   .tablespace = tablespace,
+                   .filenumber = filenumber,
+                   .forknum = forknum,
+                   .nblocks = nblocks,
+           };
+
+           stream = read_stream_begin_relation(READ_STREAM_DEFAULT |
+                                               READ_STREAM_USE_BATCHING,
+                                               NULL,
+                                               rel,
+                                               p.forknum,
+                                               apw_read_stream_next_block,
+                                               &p,
+                                               0);
 
+           /*
+            * Loop until we've prewarmed all the blocks from this fork. The
+            * read stream callback will check that we still have free buffers
+            * before requesting each block from the read stream API.
+            */
+           while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+           {
                apw_state->prewarmed_blocks++;
                ReleaseBuffer(buf);
            }
+
+           read_stream_end(stream);
+
+           /* Advance i past all the blocks just prewarmed. */
+           i = p.pos;
+           blk = block_info[i];
        }
 
        relation_close(rel, AccessShareLock);
index 229fbff47aede69d951b17b95bd7079366cd9157..b69b3b1520cb33858ecb020804ba94db2e2b86c1 100644 (file)
@@ -175,6 +175,7 @@ AttributeOpts
 AuthRequest
 AuthToken
 AutoPrewarmSharedState
+AutoPrewarmReadStreamData
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem