#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/procsignal.h"
+#include "storage/read_stream.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
int prewarmed_blocks;
} AutoPrewarmSharedState;
+/*
+ * Private data passed through the read stream API for our use in the
+ * callback.
+ */
+typedef struct AutoPrewarmReadStreamData
+{
+ /* The array of records containing the blocks we should prewarm. */
+ BlockInfoRecord *block_info;
+
+ /*
+ * pos is the read stream callback's index into block_info. Because the
+ * read stream may read ahead, pos is likely to be ahead of the index in
+ * the main loop in autoprewarm_database_main().
+ */
+ int pos;
+ Oid tablespace;
+ RelFileNumber filenumber;
+ ForkNumber forknum;
+ BlockNumber nblocks;
+} AutoPrewarmReadStreamData;
+
+
PGDLLEXPORT void autoprewarm_main(Datum main_arg);
PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
apw_state->prewarmed_blocks, num_elements)));
}
+/*
+ * Return the next block number of a specific relation and fork to read
+ * according to the array of BlockInfoRecord.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ AutoPrewarmReadStreamData *p = callback_private_data;
+
+ CHECK_FOR_INTERRUPTS();
+
+ while (p->pos < apw_state->prewarm_stop_idx)
+ {
+ BlockInfoRecord blk = p->block_info[p->pos];
+
+ if (!have_free_buffer())
+ {
+ p->pos = apw_state->prewarm_stop_idx;
+ return InvalidBlockNumber;
+ }
+
+ if (blk.tablespace != p->tablespace)
+ return InvalidBlockNumber;
+
+ if (blk.filenumber != p->filenumber)
+ return InvalidBlockNumber;
+
+ if (blk.forknum != p->forknum)
+ return InvalidBlockNumber;
+
+ p->pos++;
+
+ /*
+ * Check whether blocknum is valid and within fork file size.
+ * Fast-forward through any invalid blocks. We want p->pos to reflect
+ * the location of the next relation or fork before ending the stream.
+ */
+ if (blk.blocknum >= p->nblocks)
+ continue;
+
+ return blk.blocknum;
+ }
+
+ return InvalidBlockNumber;
+}
+
/*
* Prewarm all blocks for one database (and possibly also global objects, if
* those got grouped with this database).
Oid reloid;
Relation rel;
- CHECK_FOR_INTERRUPTS();
-
/*
* All blocks between prewarm_start_idx and prewarm_stop_idx should
* belong either to global objects or the same database.
{
ForkNumber forknum = blk.forknum;
BlockNumber nblocks;
+ struct AutoPrewarmReadStreamData p;
+ ReadStream *stream;
Buffer buf;
/*
nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
- /* Prewarm buffers. */
- while (i < apw_state->prewarm_stop_idx &&
- blk.tablespace == tablespace &&
- blk.filenumber == filenumber &&
- blk.forknum == forknum &&
- have_free_buffer())
+ p = (struct AutoPrewarmReadStreamData)
{
- CHECK_FOR_INTERRUPTS();
-
- /* Check whether blocknum is valid and within fork file size. */
- if (blk.blocknum >= nblocks)
- {
- blk = block_info[++i];
- continue;
- }
-
- buf = ReadBufferExtended(rel, blk.forknum, blk.blocknum, RBM_NORMAL,
- NULL);
-
- blk = block_info[++i];
- if (!BufferIsValid(buf))
- break;
+ .block_info = block_info,
+ .pos = i,
+ .tablespace = tablespace,
+ .filenumber = filenumber,
+ .forknum = forknum,
+ .nblocks = nblocks,
+ };
+
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT |
+ READ_STREAM_USE_BATCHING,
+ NULL,
+ rel,
+ p.forknum,
+ apw_read_stream_next_block,
+ &p,
+ 0);
+ /*
+ * Loop until we've prewarmed all the blocks from this fork. The
+ * read stream callback will check that we still have free buffers
+ * before requesting each block from the read stream API.
+ */
+ while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ {
apw_state->prewarmed_blocks++;
ReleaseBuffer(buf);
}
+
+ read_stream_end(stream);
+
+ /* Advance i past all the blocks just prewarmed. */
+ i = p.pos;
+ blk = block_info[i];
}
relation_close(rel, AccessShareLock);