From b060dbe0001a1d6bf26cd294710f3cb203868d46 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 8 May 2020 15:30:34 -0400 Subject: [PATCH] Rework XLogReader callback system MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7. XLogReader's system for opening and closing segments had gotten too complicated, with callbacks being passed at both the XLogReaderAllocate level (read_page) as well as at the WALRead level (segment_open). This was confusing and hard to follow, so restructure things so that these callbacks are passed together at XLogReaderAllocate time, and add another callback to the set (segment_close) to make it a coherent whole. Also, ensure XLogReaderState is an argument to all the callbacks, so that they can grab at the ->private data if necessary. Document the whole arrangement more clearly. Author: Álvaro Herrera Reviewed-by: Kyotaro Horiguchi Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql --- src/backend/access/transam/twophase.c | 5 +- src/backend/access/transam/xlog.c | 10 +- src/backend/access/transam/xlogreader.c | 51 ++++---- src/backend/access/transam/xlogutils.c | 24 ++-- src/backend/replication/logical/logical.c | 20 +-- .../replication/logical/logicalfuncs.c | 4 +- src/backend/replication/slotfuncs.c | 10 +- src/backend/replication/walsender.c | 36 ++++-- src/bin/pg_rewind/parsexlog.c | 9 +- src/bin/pg_waldump/pg_waldump.c | 30 +++-- src/include/access/xlogreader.h | 119 +++++++++++------- src/include/access/xlogutils.h | 5 + src/include/replication/logical.h | 4 +- 13 files changed, 214 insertions(+), 113 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2f7d4ed59a..e1904877fa 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) char *errormsg; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &read_local_xlog_page, NULL); + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0d3d670928..a53e6d9633 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata, if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - NULL, NULL); + XL_ROUTINE(), NULL); if (!debug_reader) { @@ -6478,8 +6478,12 @@ StartupXLOG(void) /* Set up XLOG reader facility */ MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &XLogPageRead, &private); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474..7cee8b92c9 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, void *private_data) + XLogReaderRoutine *routine, void *private_data) { XLogReaderState *state; @@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, if (!state) return NULL; + /* initialize caller-provided support functions */ + state->routine = *routine; + state->max_block_id = -1; /* @@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ @@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state) int block_id; if (state->seg.ws_file != -1) - close(state->seg.ws_file); + state->routine.segment_close(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the read_page callback fails to read the requested data, NULL is + * If the page_read callback fails to read the requested data, NULL is * returned. The callback is expected to have reported the error; errormsg * is set to NULL. * @@ -559,10 +561,10 @@ err: /* * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * via the page_read() callback. * * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * is set in that case (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * Data is not in our buffer. * * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the read_page callback might + * it before, we need to do verification as the page_read callback might * now be rereading data from a different source. * * Whenever switching to a new WAL segment, we read the first page of the @@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * First, read the requested data length, but at least a short page header * so that we can validate it. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* still not enough */ if (readLen < XLogPageHeaderSize(hdr)) { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; } @@ -1041,11 +1043,14 @@ err: #endif /* FRONTEND */ /* + * Helper function to ease writing of XLogRoutine->page_read callbacks. + * If this function is used, caller must supply an open_segment callback in + * 'state', as that is used here. + * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback - * to open the next segment, if necessary. + * 'seg/segcxt' identify the last segment used. * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. @@ -1054,9 +1059,10 @@ err: * WAL buffers when possible. */ bool -WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, +WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, - WALSegmentOpen openSegment, WALReadError *errinfo) + WALReadError *errinfo) { char *p; XLogRecPtr recptr; @@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, XLogSegNo nextSegNo; if (seg->ws_file >= 0) - close(seg->ws_file); + state->routine.segment_close(state); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + seg->ws_file = state->routine.segment_open(state, nextSegNo, + segcxt, &tli); /* Update the current segment info. */ seg->ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 6cb143e161..bbd801513a 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } -/* openSegment callback for WALRead */ -static int -wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, - TimeLineID *tli_p) +/* XLogReaderRoutine->segment_open callback for local pg_wal files */ +int +wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, + WALSegmentContext *segcxt, TimeLineID *tli_p) { TimeLineID tli = *tli_p; char path[MAXPGPATH]; @@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, return -1; /* keep compiler quiet */ } +/* stock XLogReaderRoutine->segment_close callback */ +void +wal_segment_close(XLogReaderState *state) +{ + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; +} + /* - * read_page callback for reading local xlog files + * XLogReaderRoutine->page_read callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. @@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, - &state->segcxt, wal_segment_open, &errinfo)) + if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, + &state->seg, &state->segcxt, + &errinfo)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583..dc69e5ce5f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options, * Otherwise, we set for decoding to start from the given LSN without * marking WAL reserved beforehand. In that scenario, it's up to the * caller to guarantee that WAL remains available. - * read_page, prepare_write, do_write, update_progress -- + * xl_routine -- XLogReaderRoutine for underlying XLogReader + * prepare_write, do_write, update_progress -- * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived @@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - read_page, prepare_write, do_write, + xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin, * fast_forward * bypass the generation of logical changes. * - * read_page, prepare_write, do_write, update_progress + * xl_routine + * XLogReaderRoutine used by underlying xlogreader + * + * prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, * actual work. * @@ -372,7 +376,7 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, read_page, prepare_write, + fast_forward, xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fded8e8290..b99c94e848 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx = CreateDecodingContext(InvalidXLogRecPtr, options, false, - read_local_xlog_page, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ae751e94e7..26890dffb4 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin, ctx = CreateInitDecodingContext(plugin, NIL, false, /* just catalogs is OK */ restart_lsn, - read_local_xlog_page, NULL, NULL, - NULL); + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); /* * If caller needs us to determine the decoding start point, do so now. @@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, true, /* fast_forward */ - read_local_xlog_page, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), NULL, NULL, NULL); /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8b55bbfcb2..d18475b854 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -54,8 +54,8 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogreader.h" #include "access/xlogutils.h" - #include "catalog/pg_authid.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, - TimeLineID *tli_p); +static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + WALSegmentContext *segcxt, TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); @@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd) } /* - * read_page callback for logical decoding contexts, as a walsender process. + * XLogReaderRoutine->page_read callback for logical decoding contexts, as a + * walsender process. * * Inside the walsender we can do better than read_local_xlog_page, * which has to do a plain sleep/busy loop, because the walsender's latch gets @@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - if (!WALRead(cur_page, + if (!WALRead(state, + cur_page, targetPagePtr, XLOG_BLCKSZ, sendSeg->ws_tli, /* Pass the current TLI because only @@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req * TLI is needed. */ sendSeg, sendCxt, - WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); @@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, - logical_read_xlog_page, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, - logical_read_xlog_page, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* walsender's openSegment callback for WALRead */ +/* XLogReaderRoutine->segment_open callback */ static int -WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, +WalSndSegmentOpen(XLogReaderState *state, + XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { char path[MAXPGPATH]; @@ -2531,6 +2537,12 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; + static XLogReaderState fake_xlogreader = + { + /* Fake xlogreader state for WALRead */ + .routine.segment_open = WalSndSegmentOpen, + .routine.segment_close = wal_segment_close + }; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2748,7 +2760,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(&output_message.data[output_message.len], + if (!WALRead(&fake_xlogreader, + &output_message.data[output_message.len], startptr, nbytes, sendSeg->ws_tli, /* Pass the current TLI because only @@ -2756,7 +2769,6 @@ retry: * TLI is needed. */ sendSeg, sendCxt, - WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index c51b5db315..d637f5eb77 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, private.tliIndex = tliIndex; private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, + XL_ROUTINE(.page_read = &SimpleXLogPageRead), &private); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, private.tliIndex = tliIndex; private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, + XL_ROUTINE(.page_read = &SimpleXLogPageRead), &private); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, private.tliIndex = tliIndex; private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, + XL_ROUTINE(.page_read = &SimpleXLogPageRead), &private); if (xlogreader == NULL) pg_fatal("out of memory"); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index d7bd9ccac2..e29f65500f 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname) return NULL; /* not reached */ } -/* pg_waldump's openSegment callback for WALRead */ +/* pg_waldump's XLogReaderRoutine->segment_open callback */ static int -WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, +WALDumpOpenSegment(XLogReaderState *state, + XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { TimeLineID tli = *tli_p; @@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, } /* - * XLogReader read_page callback + * pg_waldump's XLogReaderRoutine->segment_close callback. Same as + * wal_segment_close */ +static void +WALDumpCloseSegment(XLogReaderState *state) +{ + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; +} + +/* pg_waldump's XLogReaderRoutine->page_read callback */ static int WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) @@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } } - if (!WALRead(readBuff, targetPagePtr, count, private->timeline, - &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo)) + if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, + &state->seg, &state->segcxt, + &errinfo)) { WALOpenSegment *seg = &errinfo.wre_seg; char fname[MAXPGPATH]; @@ -1031,8 +1043,12 @@ main(int argc, char **argv) /* done with argument parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage, - &private); + xlogreader_state = + XLogReaderAllocate(WalSegSz, waldir, + XL_ROUTINE(.page_read = WALDumpReadPage, + .segment_open = WALDumpOpenSegment, + .segment_close = WALDumpCloseSegment), + &private); if (!xlogreader_state) fatal_error("out of memory"); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 4582196e18..81af200f5e 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -17,6 +17,13 @@ * XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord() * until it returns NULL. * + * Callers supply a page_read callback if they want to to call + * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL + * otherwise. The WALRead function can be used as a helper to write + * page_read callbacks, but it is not mandatory; callers that use it, + * must supply open_segment callbacks. The close_segment callback + * must always be supplied. + * * After reading a record with XLogReadRecord(), it's decomposed into * the per-block and main data parts, and the parts can be accessed * with the XLogRec* macros and functions. You can also decode a @@ -50,12 +57,69 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; -/* Function type definition for the read_page callback */ +/* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); +typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); + +typedef struct XLogReaderRoutine +{ + /* + * Data input callback + * + * This callback shall read at least reqLen valid bytes of the xlog page + * starting at targetPagePtr, and store them in readBuf. The callback + * shall return the number of bytes read (never more than XLOG_BLCKSZ), or + * -1 on failure. The callback shall sleep, if necessary, to wait for the + * requested bytes to become available. The callback will not be invoked + * again for the same page unless more than the returned number of bytes + * are needed. + * + * targetRecPtr is the position of the WAL record we're reading. Usually + * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs + * to read and verify the page or segment header, before it reads the + * actual WAL record it's interested in. In that case, targetRecPtr can + * be used to determine which timeline to read the page from. + * + * The callback shall set ->seg.ws_tli to the TLI of the file the page was + * read from. + */ + XLogPageReadCB page_read; + + /* + * Callback to open the specified WAL segment for reading. The file + * descriptor of the opened segment shall be returned. In case of + * failure, an error shall be raised by the callback and it shall not + * return. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "segcxt" is additional information about the segment. + * + * "tli_p" is an input/output argument. XLogRead() uses it to pass the + * timeline in which the new segment should be found, but the callback can + * use it to return the TLI that it actually opened. + * + * BasicOpenFile() is the preferred way to open the segment file in + * backend code, whereas open(2) should be used in frontend. + */ + WALSegmentOpenCB segment_open; + + /* + * WAL segment close callback. ->seg.ws_file shall be set to a negative + * number. + */ + WALSegmentCloseCB segment_close; +} XLogReaderRoutine; + +#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__} typedef struct { @@ -88,33 +152,16 @@ typedef struct struct XLogReaderState { + /* + * Operational callbacks + */ + XLogReaderRoutine routine; + /* ---------------------------------------- * Public parameters * ---------------------------------------- */ - /* - * Data input callback (mandatory). - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set ->seg.ws_tli to the TLI of the file the page was - * read from. - */ - XLogPageReadCB read_page; - /* * System identifier of the xlog files we're about to read. Set to zero * (the default value) if unknown or unimportant. @@ -214,30 +261,13 @@ struct XLogReaderState /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, + XLogReaderRoutine *routine, void *private_data); +extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); -/* - * Callback to open the specified WAL segment for reading. Returns a valid - * file descriptor when the file was opened successfully. - * - * "nextSegNo" is the number of the segment to be opened. - * - * "segcxt" is additional information about the segment. - * - * "tli_p" is an input/output argument. XLogRead() uses it to pass the - * timeline in which the new segment should be found, but the callback can use - * it to return the TLI that it actually opened. - * - * BasicOpenFile() is the preferred way to open the segment file in backend - * code, whereas open(2) should be used in frontend. - */ -typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt, - TimeLineID *tli_p); - /* Initialize supporting structures */ extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); @@ -269,9 +299,10 @@ typedef struct WALReadError WALOpenSegment wre_seg; /* Segment we tried to read from. */ } WALReadError; -extern bool WALRead(char *buf, XLogRecPtr startptr, Size count, +extern bool WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, - WALSegmentContext *segcxt, WALSegmentOpen openSegment, + WALSegmentContext *segcxt, WALReadError *errinfo); /* Functions for decoding an XLogRecord */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..68ce815476 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); +extern int wal_segment_open(XLogReaderState *state, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); +extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 3b7ca7f1da..c2f2475e5d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); -- 2.30.2