Rework XLogReader callback system
authorAlvaro Herrera <[email protected]>
Fri, 8 May 2020 19:30:34 +0000 (15:30 -0400)
committerAlvaro Herrera <[email protected]>
Fri, 8 May 2020 19:40:11 +0000 (15:40 -0400)
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7.

XLogReader's system for opening and closing segments had gotten too
complicated, with callbacks being passed at both the XLogReaderAllocate
level (read_page) as well as at the WALRead level (segment_open).  This
was confusing and hard to follow, so restructure things so that these
callbacks are passed together at XLogReaderAllocate time, and add
another callback to the set (segment_close) to make it a coherent whole.
Also, ensure XLogReaderState is an argument to all the callbacks, so
that they can grab at the ->private data if necessary.

Document the whole arrangement more clearly.

Author: Álvaro Herrera <[email protected]>
Reviewed-by: Kyotaro Horiguchi <[email protected]>
Discussion: https://postgr.es/m/20200422175754[email protected]

13 files changed:
src/backend/access/transam/twophase.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c
src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/bin/pg_rewind/parsexlog.c
src/bin/pg_waldump/pg_waldump.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h
src/include/replication/logical.h

index 2f7d4ed59a87ce4756d9543a2f3288009f320256..e1904877faa5514de427568ae28d2b57544ad174 100644 (file)
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
        char       *errormsg;
 
        xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-                                                                       &read_local_xlog_page, NULL);
+                                                                       XL_ROUTINE(.page_read = &read_local_xlog_page,
+                                                                                          .segment_open = &wal_segment_open,
+                                                                                          .segment_close = &wal_segment_close),
+                                                                       NULL);
        if (!xlogreader)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
index 0d3d670928430e7972e2c37ac3fe0a689b8b1bbe..a53e6d963346d0e471d6203ae20e8bffd0b7c32e 100644 (file)
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
 
                if (!debug_reader)
                        debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
-                                                                                         NULL, NULL);
+                                                                                         XL_ROUTINE(), NULL);
 
                if (!debug_reader)
                {
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
 
        /* Set up XLOG reader facility */
        MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-       xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-                                                                       &XLogPageRead, &private);
+       xlogreader =
+               XLogReaderAllocate(wal_segment_size, NULL,
+                                                  XL_ROUTINE(.page_read = &XLogPageRead,
+                                                                         .segment_open = NULL,
+                                                                         .segment_close = wal_segment_close),
+                                                  &private);
        if (!xlogreader)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
index 79ff976474c76858cd6948311522ef7baa04ea78..7cee8b92c905235dff50946407d989588aabe8cb 100644 (file)
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  */
 XLogReaderState *
 XLogReaderAllocate(int wal_segment_size, const char *waldir,
-                                  XLogPageReadCB pagereadfunc, void *private_data)
+                                  XLogReaderRoutine *routine, void *private_data)
 {
        XLogReaderState *state;
 
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
        if (!state)
                return NULL;
 
+       /* initialize caller-provided support functions */
+       state->routine = *routine;
+
        state->max_block_id = -1;
 
        /*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
        WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
                                           waldir);
 
-       state->read_page = pagereadfunc;
        /* system_identifier initialized to zeroes above */
        state->private_data = private_data;
        /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
        int                     block_id;
 
        if (state->seg.ws_file != -1)
-               close(state->seg.ws_file);
+               state->routine.segment_close(state);
 
        for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
        {
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
  * to XLogReadRecord().
  *
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
  * returned.  The callback is expected to have reported the error; errormsg
  * is set to NULL.
  *
@@ -559,10 +561,10 @@ err:
 
 /*
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
  *
  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
         * Data is not in our buffer.
         *
         * Every time we actually read the segment, even if we looked at parts of
-        * it before, we need to do verification as the read_page callback might
+        * it before, we need to do verification as the page_read callback might
         * now be rereading data from a different source.
         *
         * Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
        {
                XLogRecPtr      targetSegmentPtr = pageptr - targetPageOff;
 
-               readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
-                                                                  state->currRecPtr,
-                                                                  state->readBuf);
+               readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+                                                                                  state->currRecPtr,
+                                                                                  state->readBuf);
                if (readLen < 0)
                        goto err;
 
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
         * First, read the requested data length, but at least a short page header
         * so that we can validate it.
         */
-       readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-                                                          state->currRecPtr,
-                                                          state->readBuf);
+       readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+                                                                          state->currRecPtr,
+                                                                          state->readBuf);
        if (readLen < 0)
                goto err;
 
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
        /* still not enough */
        if (readLen < XLogPageHeaderSize(hdr))
        {
-               readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
-                                                                  state->currRecPtr,
-                                                                  state->readBuf);
+               readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+                                                                                  state->currRecPtr,
+                                                                                  state->readBuf);
                if (readLen < 0)
                        goto err;
        }
@@ -1041,11 +1043,14 @@ err:
 #endif                                                 /* FRONTEND */
 
 /*
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ * If this function is used, caller must supply an open_segment callback in
+ * 'state', as that is used here.
+ *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ err:
  * WAL buffers when possible.
  */
 bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+               char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
                WALOpenSegment *seg, WALSegmentContext *segcxt,
-               WALSegmentOpen openSegment, WALReadError *errinfo)
+               WALReadError *errinfo)
 {
        char       *p;
        XLogRecPtr      recptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
                        XLogSegNo       nextSegNo;
 
                        if (seg->ws_file >= 0)
-                               close(seg->ws_file);
+                               state->routine.segment_close(state);
 
                        XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-                       seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+                       seg->ws_file = state->routine.segment_open(state, nextSegNo,
+                                                                                                          segcxt, &tli);
 
                        /* Update the current segment info. */
                        seg->ws_tli = tli;
index 6cb143e161d144a3c5885038d208c1bcd5d42d3c..bbd801513a89fb1fb97de092ecf25e700938c352 100644 (file)
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
        }
 }
 
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
-                                TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+                                WALSegmentContext *segcxt, TimeLineID *tli_p)
 {
        TimeLineID      tli = *tli_p;
        char            path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
        return -1;                                      /* keep compiler quiet */
 }
 
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+       close(state->seg.ws_file);
+       /* need to check errno? */
+       state->seg.ws_file = -1;
+}
+
 /*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
         * as 'count', read the whole page anyway. It's guaranteed to be
         * zero-padded up to the page boundary if it's incomplete.
         */
-       if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
-                                &state->segcxt, wal_segment_open, &errinfo))
+       if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+                                &state->seg, &state->segcxt,
+                                &errinfo))
                WALReadRaiseError(&errinfo);
 
        /* number of valid bytes in the buffer */
index 5adf253583b1cfc7dc4e0586afd834a1f4bf68bd..dc69e5ce5f32cdedb8e18c720c579f20e929997f 100644 (file)
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
                                           TransactionId xmin_horizon,
                                           bool need_full_snapshot,
                                           bool fast_forward,
-                                          XLogPageReadCB read_page,
+                                          XLogReaderRoutine *xl_routine,
                                           LogicalOutputPluginWriterPrepareWrite prepare_write,
                                           LogicalOutputPluginWriterWrite do_write,
                                           LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
 
        ctx->slot = slot;
 
-       ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+       ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
        if (!ctx->reader)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
  *             Otherwise, we set for decoding to start from the given LSN without
  *             marking WAL reserved beforehand.  In that scenario, it's up to the
  *             caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
  *             callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
                                                  bool need_full_snapshot,
                                                  XLogRecPtr restart_lsn,
-                                                 XLogPageReadCB read_page,
+                                                 XLogReaderRoutine *xl_routine,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  LogicalOutputPluginWriterWrite do_write,
                                                  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
 
        ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
                                                                 need_full_snapshot, false,
-                                                                read_page, prepare_write, do_write,
+                                                                xl_routine, prepare_write, do_write,
                                                                 update_progress);
 
        /* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
  * fast_forward
  *             bypass the generation of logical changes.
  *
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ *             XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
  *             callbacks that have to be filled to perform the use-case dependent,
  *             actual work.
  *
@@ -372,7 +376,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
                                          List *output_plugin_options,
                                          bool fast_forward,
-                                         XLogPageReadCB read_page,
+                                         XLogReaderRoutine *xl_routine,
                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
                                          LogicalOutputPluginWriterWrite do_write,
                                          LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
        ctx = StartupDecodingContext(output_plugin_options,
                                                                 start_lsn, InvalidTransactionId, false,
-                                                                fast_forward, read_page, prepare_write,
+                                                                fast_forward, xl_routine, prepare_write,
                                                                 do_write, update_progress);
 
        /* call output plugin initialization callback */
index fded8e82908b9aa19e5d9243007d03da9fb54711..b99c94e84891a4f9d7cd88c6af667743a64fb25b 100644 (file)
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                                                        options,
                                                                        false,
-                                                                       read_local_xlog_page,
+                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
+                                                                                          .segment_open = wal_segment_open,
+                                                                                          .segment_close = wal_segment_close),
                                                                        LogicalOutputPrepareWrite,
                                                                        LogicalOutputWrite, NULL);
 
index ae751e94e765f2333ddf9a40675f13c9661d12c5..26890dffb456de506bd011c7026e537afc7a0851 100644 (file)
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
        ctx = CreateInitDecodingContext(plugin, NIL,
                                                                        false,  /* just catalogs is OK */
                                                                        restart_lsn,
-                                                                       read_local_xlog_page, NULL, NULL,
-                                                                       NULL);
+                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
+                                                                                          .segment_open = wal_segment_open,
+                                                                                          .segment_close = wal_segment_close),
+                                                                       NULL, NULL, NULL);
 
        /*
         * If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
                ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                                                        NIL,
                                                                        true,   /* fast_forward */
-                                                                       read_local_xlog_page,
+                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
+                                                                                          .segment_open = wal_segment_open,
+                                                                                          .segment_close = wal_segment_close),
                                                                        NULL, NULL, NULL);
 
                /*
index 8b55bbfcb2ec31b2859bf03f87d5bbc9b19c0412..d18475b8540605eca44a22b52e29658726c9ade2 100644 (file)
@@ -54,8 +54,8 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
-
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int     WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-                                                         TimeLineID *tli_p);
+static int     WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+                                                         WALSegmentContext *segcxt, TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 
@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd)
 }
 
 /*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
  *
  * Inside the walsender we can do better than read_local_xlog_page,
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                count = flushptr - targetPagePtr;       /* part of the page available */
 
        /* now actually read the data, we know it's there */
-       if (!WALRead(cur_page,
+       if (!WALRead(state,
+                                cur_page,
                                 targetPagePtr,
                                 XLOG_BLCKSZ,
                                 sendSeg->ws_tli,       /* Pass the current TLI because only
@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                                                                         * TLI is needed. */
                                 sendSeg,
                                 sendCxt,
-                                WalSndSegmentOpen,
                                 &errinfo))
                WALReadRaiseError(&errinfo);
 
@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
                ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
                                                                                InvalidXLogRecPtr,
-                                                                               logical_read_xlog_page,
+                                                                               XL_ROUTINE(.page_read = logical_read_xlog_page,
+                                                                                                  .segment_open = WalSndSegmentOpen,
+                                                                                                  .segment_close = wal_segment_close),
                                                                                WalSndPrepareWrite, WalSndWriteData,
                                                                                WalSndUpdateProgress);
 
@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
         */
        logical_decoding_ctx =
                CreateDecodingContext(cmd->startpoint, cmd->options, false,
-                                                         logical_read_xlog_page,
+                                                         XL_ROUTINE(.page_read = logical_read_xlog_page,
+                                                                                .segment_open = WalSndSegmentOpen,
+                                                                                .segment_close = wal_segment_close),
                                                          WalSndPrepareWrite, WalSndWriteData,
                                                          WalSndUpdateProgress);
 
@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg)
        SpinLockRelease(&walsnd->mutex);
 }
 
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
 static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+                                 XLogSegNo nextSegNo, WALSegmentContext *segcxt,
                                  TimeLineID *tli_p)
 {
        char            path[MAXPGPATH];
@@ -2531,6 +2537,12 @@ XLogSendPhysical(void)
        Size            nbytes;
        XLogSegNo       segno;
        WALReadError errinfo;
+       static XLogReaderState fake_xlogreader =
+       {
+               /* Fake xlogreader state for WALRead */
+               .routine.segment_open = WalSndSegmentOpen,
+               .routine.segment_close = wal_segment_close
+       };
 
        /* If requested switch the WAL sender to the stopping state. */
        if (got_STOPPING)
@@ -2748,7 +2760,8 @@ XLogSendPhysical(void)
        enlargeStringInfo(&output_message, nbytes);
 
 retry:
-       if (!WALRead(&output_message.data[output_message.len],
+       if (!WALRead(&fake_xlogreader,
+                                &output_message.data[output_message.len],
                                 startptr,
                                 nbytes,
                                 sendSeg->ws_tli,       /* Pass the current TLI because only
@@ -2756,7 +2769,6 @@ retry:
                                                                         * TLI is needed. */
                                 sendSeg,
                                 sendCxt,
-                                WalSndSegmentOpen,
                                 &errinfo))
                WALReadRaiseError(&errinfo);
 
index c51b5db315aab812f00a7afb7f8de4a7632e94da..d637f5eb7715ad000ff954c4ae8bd4c9809611be 100644 (file)
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
        private.tliIndex = tliIndex;
        private.restoreCommand = restoreCommand;
-       xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+       xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+                                                                       XL_ROUTINE(.page_read = &SimpleXLogPageRead),
                                                                        &private);
        if (xlogreader == NULL)
                pg_fatal("out of memory");
@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 
        private.tliIndex = tliIndex;
        private.restoreCommand = restoreCommand;
-       xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+       xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+                                                                       XL_ROUTINE(.page_read = &SimpleXLogPageRead),
                                                                        &private);
        if (xlogreader == NULL)
                pg_fatal("out of memory");
@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 
        private.tliIndex = tliIndex;
        private.restoreCommand = restoreCommand;
-       xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+       xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+                                                                       XL_ROUTINE(.page_read = &SimpleXLogPageRead),
                                                                        &private);
        if (xlogreader == NULL)
                pg_fatal("out of memory");
index d7bd9ccac24d6d1c9ac3a853b5e3b9e13a970e17..e29f65500fb630afde0b9eea1a888e866ac95243 100644 (file)
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
        return NULL;                            /* not reached */
 }
 
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
 static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+                                  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
                                   TimeLineID *tli_p)
 {
        TimeLineID      tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 }
 
 /*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback.  Same as
+ * wal_segment_close
  */
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+       close(state->seg.ws_file);
+       /* need to check errno? */
+       state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
 static int
 WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                                XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                }
        }
 
-       if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
-                                &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+       if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+                                &state->seg, &state->segcxt,
+                                &errinfo))
        {
                WALOpenSegment *seg = &errinfo.wre_seg;
                char            fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
        /* done with argument parsing, do the actual work */
 
        /* we have everything we need, start reading */
-       xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
-                                                                                 &private);
+       xlogreader_state =
+               XLogReaderAllocate(WalSegSz, waldir,
+                                                  XL_ROUTINE(.page_read = WALDumpReadPage,
+                                                                         .segment_open = WALDumpOpenSegment,
+                                                                         .segment_close = WALDumpCloseSegment),
+                                                  &private);
        if (!xlogreader_state)
                fatal_error("out of memory");
 
index 4582196e1851ad7f5454c901c46e1c434300e277..81af200f5e644192479aadf989a384ff894dc2bb 100644 (file)
  *             XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
  *             until it returns NULL.
  *
+ *             Callers supply a page_read callback if they want to to call
+ *             XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ *             otherwise.  The WALRead function can be used as a helper to write
+ *             page_read callbacks, but it is not mandatory; callers that use it,
+ *             must supply open_segment callbacks.  The close_segment callback
+ *             must always be supplied.
+ *
  *             After reading a record with XLogReadRecord(), it's decomposed into
  *             the per-block and main data parts, and the parts can be accessed
  *             with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,69 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
                                                           XLogRecPtr targetPagePtr,
                                                           int reqLen,
                                                           XLogRecPtr targetRecPtr,
                                                           char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+                                                                XLogSegNo nextSegNo,
+                                                                WALSegmentContext *segcxt,
+                                                                TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+       /*
+        * Data input callback
+        *
+        * This callback shall read at least reqLen valid bytes of the xlog page
+        * starting at targetPagePtr, and store them in readBuf.  The callback
+        * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+        * -1 on failure.  The callback shall sleep, if necessary, to wait for the
+        * requested bytes to become available.  The callback will not be invoked
+        * again for the same page unless more than the returned number of bytes
+        * are needed.
+        *
+        * targetRecPtr is the position of the WAL record we're reading.  Usually
+        * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+        * to read and verify the page or segment header, before it reads the
+        * actual WAL record it's interested in.  In that case, targetRecPtr can
+        * be used to determine which timeline to read the page from.
+        *
+        * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+        * read from.
+        */
+       XLogPageReadCB page_read;
+
+       /*
+        * Callback to open the specified WAL segment for reading.  The file
+        * descriptor of the opened segment shall be returned.  In case of
+        * failure, an error shall be raised by the callback and it shall not
+        * return.
+        *
+        * "nextSegNo" is the number of the segment to be opened.
+        *
+        * "segcxt" is additional information about the segment.
+        *
+        * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+        * timeline in which the new segment should be found, but the callback can
+        * use it to return the TLI that it actually opened.
+        *
+        * BasicOpenFile() is the preferred way to open the segment file in
+        * backend code, whereas open(2) should be used in frontend.
+        */
+       WALSegmentOpenCB segment_open;
+
+       /*
+        * WAL segment close callback.  ->seg.ws_file shall be set to a negative
+        * number.
+        */
+       WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
 
 typedef struct
 {
@@ -88,33 +152,16 @@ typedef struct
 
 struct XLogReaderState
 {
+       /*
+        * Operational callbacks
+        */
+       XLogReaderRoutine routine;
+
        /* ----------------------------------------
         * Public parameters
         * ----------------------------------------
         */
 
-       /*
-        * Data input callback (mandatory).
-        *
-        * This callback shall read at least reqLen valid bytes of the xlog page
-        * starting at targetPagePtr, and store them in readBuf.  The callback
-        * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-        * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-        * requested bytes to become available.  The callback will not be invoked
-        * again for the same page unless more than the returned number of bytes
-        * are needed.
-        *
-        * targetRecPtr is the position of the WAL record we're reading.  Usually
-        * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
-        * to read and verify the page or segment header, before it reads the
-        * actual WAL record it's interested in.  In that case, targetRecPtr can
-        * be used to determine which timeline to read the page from.
-        *
-        * The callback shall set ->seg.ws_tli to the TLI of the file the page was
-        * read from.
-        */
-       XLogPageReadCB read_page;
-
        /*
         * System identifier of the xlog files we're about to read.  Set to zero
         * (the default value) if unknown or unimportant.
@@ -214,30 +261,13 @@ struct XLogReaderState
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
                                                                                   const char *waldir,
-                                                                                  XLogPageReadCB pagereadfunc,
+                                                                                  XLogReaderRoutine *routine,
                                                                                   void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/*
- * Callback to open the specified WAL segment for reading.  Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-                                                          TimeLineID *tli_p);
-
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                                                           int segsize, const char *waldir);
@@ -269,9 +299,10 @@ typedef struct WALReadError
        WALOpenSegment wre_seg;         /* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+                                       char *buf, XLogRecPtr startptr, Size count,
                                        TimeLineID tli, WALOpenSegment *seg,
-                                       WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+                                       WALSegmentContext *segcxt,
                                        WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
index 5181a077d964d6201dece864b0aeb7675e922166..68ce815476ca618f642ccc945a6f4172f8ba9eeb 100644 (file)
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int     read_local_xlog_page(XLogReaderState *state,
                                                                 XLogRecPtr targetPagePtr, int reqLen,
                                                                 XLogRecPtr targetRecPtr, char *cur_page);
+extern int     wal_segment_open(XLogReaderState *state,
+                                                        XLogSegNo nextSegNo,
+                                                        WALSegmentContext *segcxt,
+                                                        TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
                                                                          XLogRecPtr wantPage, uint32 wantLength);
index 3b7ca7f1da47e814702b7aca2ad653b7190758c1..c2f2475e5d3e5f193f46e9a5ceaf3a017deaca91 100644 (file)
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                                                                                                                 List *output_plugin_options,
                                                                                                                 bool need_full_snapshot,
                                                                                                                 XLogRecPtr restart_lsn,
-                                                                                                                XLogPageReadCB read_page,
+                                                                                                                XLogReaderRoutine *xl_routine,
                                                                                                                 LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                                                                                 LogicalOutputPluginWriterWrite do_write,
                                                                                                                 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
                                                                                                         List *output_plugin_options,
                                                                                                         bool fast_forward,
-                                                                                                        XLogPageReadCB read_page,
+                                                                                                        XLogReaderRoutine *xl_routine,
                                                                                                         LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                                                                         LogicalOutputPluginWriterWrite do_write,
                                                                                                         LogicalOutputPluginWriterUpdateProgress update_progress);