static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
int reqLen);
static void XLogReaderInvalReadState(XLogReaderState *state);
+static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
+/*
+ * Default size; large enough that typical users of XLogReader won't often need
+ * to use the 'oversized' memory allocation code path.
+ */
+#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
+
/*
* Construct a string in state->errormsg_buf explaining what's wrong with
* the current record being read.
va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args);
+
+ state->errormsg_deferred = true;
+}
+
+/*
+ * Configure the decode buffer before any decoding has taken place.
+ *
+ * 'size' is the capacity of the decode buffer.  'buffer' may point to a
+ * caller supplied memory region, in which case non-oversized records will be
+ * decoded there.  It is an error (checked by assertion) to call this once a
+ * decode buffer is already installed.
+ *
+ * The buffer starts out empty: head and tail both refer to its start.
+ */
+void
+XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
+{
+    /* Only legal while no decode buffer exists yet. */
+    Assert(state->decode_buffer == NULL);
+
+    /* Remember the region and its capacity. */
+    state->decode_buffer_size = size;
+    state->decode_buffer = buffer;
+
+    /* Nothing has been decoded yet, so head and tail coincide. */
+    state->decode_buffer_head = buffer;
+    state->decode_buffer_tail = buffer;
}
/*
/* initialize caller-provided support functions */
state->routine = *routine;
- state->max_block_id = -1;
-
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
void
XLogReaderFree(XLogReaderState *state)
{
- int block_id;
-
if (state->seg.ws_file != -1)
state->routine.segment_close(state);
- for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
- {
- if (state->blocks[block_id].data)
- pfree(state->blocks[block_id].data);
- }
- if (state->main_data)
- pfree(state->main_data);
+ if (state->decode_buffer && state->free_decode_buffer)
+ pfree(state->decode_buffer);
pfree(state->errormsg_buf);
if (state->readRecordBuf)
/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
+ state->NextRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
+ state->DecodeRecPtr = InvalidXLogRecPtr;
+}
+
+/*
+ * See if we can release the last record that was returned by
+ * XLogNextRecord(), if any, to free up space.
+ *
+ * Does nothing if state->record is not set. Otherwise the record is
+ * unlinked from the head of the decoded record queue, state->record is
+ * cleared, and the record's memory is reclaimed: oversized records are
+ * pfree()d, while records living in the decode buffer are released by
+ * moving decode_buffer_head forward (or by resetting head and tail to the
+ * start of the buffer when no other buffered record remains).
+ */
+void
+XLogReleasePreviousRecord(XLogReaderState *state)
+{
+ DecodedXLogRecord *record;
+
+ if (!state->record)
+ return; /* no record is currently handed out */
+
+ /*
+ * Remove it from the decoded record queue. It must be the oldest item
+ * decoded, decode_queue_head.
+ */
+ record = state->record;
+ Assert(record == state->decode_queue_head);
+ state->record = NULL;
+ state->decode_queue_head = record->next;
+
+ /* It might also be the newest item decoded, decode_queue_tail. */
+ if (state->decode_queue_tail == record)
+ state->decode_queue_tail = NULL;
+
+ /* Release the space. */
+ if (unlikely(record->oversized))
+ {
+ /* It's not in the decode buffer, so free it to release space. */
+ pfree(record);
+ }
+ else
+ {
+ /* It must be the head (oldest) record in the decode buffer. */
+ Assert(state->decode_buffer_head == (char *) record);
+
+ /*
+ * We need to update head to point to the next record that is in the
+ * decode buffer, if any, being careful to skip oversized ones
+ * (they're not in the decode buffer).
+ */
+ record = record->next;
+ while (unlikely(record && record->oversized))
+ record = record->next;
+
+ if (record)
+ {
+ /* Adjust head to release space up to the next record. */
+ state->decode_buffer_head = (char *) record;
+ }
+ else
+ {
+ /*
+ * Otherwise we might as well just reset head and tail to the
+ * start of the buffer space, because we're empty. This means
+ * we'll keep overwriting the same piece of memory if we're not
+ * doing any prefetching.
+ */
+ state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
+ }
+ }
+}
+
+/*
+ * Consume the next queued XLOG record, or the queued error.
+ *
+ * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
+ * called before the first call to XLogNextRecord().  This function does no
+ * reading or decoding of its own; it only returns records and errors that
+ * were put into an internal queue by XLogReadAhead().
+ *
+ * The record returned by the previous call is released first.  If a record
+ * is queued, it is returned and *errormsg is set to NULL.  If the queue is
+ * empty, NULL is returned; *errormsg then points to the deferred error
+ * message if there is a non-empty one, and is NULL otherwise.
+ *
+ * The returned record (or *errormsg) points to an internal buffer that's
+ * valid until the next call to XLogNextRecord.
+ */
+DecodedXLogRecord *
+XLogNextRecord(XLogReaderState *state, char **errormsg)
+{
+    DecodedXLogRecord *next;
+
+    /* Release the last record returned by XLogNextRecord(). */
+    XLogReleasePreviousRecord(state);
+
+    /* No message unless we find a deferred error below. */
+    *errormsg = NULL;
+
+    next = state->decode_queue_head;
+    if (next != NULL)
+    {
+        /*
+         * Remember this as the most recently returned record, so that it is
+         * released by the next call.  This is also what the traditional
+         * XLogRecXXX(xlogreader) macros look at, since for historical
+         * reasons they take the decoder rather than the record.
+         */
+        state->record = next;
+
+        /*
+         * Likewise, historical code expects the decoder itself to track the
+         * start and one-past-the-end positions of the record, instead of
+         * reading them from the record.
+         */
+        state->ReadRecPtr = next->lsn;
+        state->EndRecPtr = next->next_lsn;
+
+        return next;
+    }
+
+    /* The queue is empty; hand over a deferred error, if there is one. */
+    if (state->errormsg_deferred)
+    {
+        state->errormsg_deferred = false;
+        if (state->errormsg_buf[0] != '\0')
+            *errormsg = state->errormsg_buf;
+    }
+
+    /*
+     * state->EndRecPtr is expected to have been set by the last call to
+     * XLogBeginRead() or XLogNextRecord(), and is the location of the
+     * error.
+     */
+    Assert(!XLogRecPtrIsInvalid(state->EndRecPtr));
+
+    return NULL;
}
/*
*/
XLogRecord *
XLogReadRecord(XLogReaderState *state, char **errormsg)
+{
+    DecodedXLogRecord *rec;
+
+    /*
+     * The previously returned record, if any, has to go first; otherwise
+     * the emptiness test on the decode queue below would not be accurate.
+     */
+    XLogReleasePreviousRecord(state);
+
+    /*
+     * If nothing is queued, read ahead in blocking mode so that a record or
+     * an error becomes available.  The return value is not needed here,
+     * because the result is picked up from the queue just below.
+     */
+    if (!XLogReaderHasQueuedRecordOrError(state))
+        XLogReadAhead(state, false /* nonblocking */ );
+
+    /* Take the record or error at the head of the queue. */
+    rec = XLogNextRecord(state, errormsg);
+    if (rec == NULL)
+        return NULL;
+
+    /*
+     * Callers get a pointer to the record's header rather than to the
+     * decoded record.  They reach the decoded record through the
+     * XLogRecGetXXX() macros, which go via xlogreader->record.
+     */
+    Assert(state->record == rec);
+    return &rec->header;
+}
+
+/*
+ * Allocate space for a decoded record.  The only member of the returned
+ * object that is initialized is the 'oversized' flag, indicating that the
+ * decoded record wouldn't fit in the decode buffer and must eventually be
+ * freed explicitly.
+ *
+ * The caller is responsible for adjusting decode_buffer_tail with the real
+ * size after successfully decoding a record into this space.  This way, if
+ * decoding fails, then there is nothing to undo unless the 'oversized' flag
+ * was set and pfree() must be called.
+ *
+ * state: reader whose circular decode buffer is used (and allocated here on
+ * first use if none has been installed).
+ * xl_tot_len: total length taken from the record header; the space requested
+ * is DecodeXLogRecordRequiredSpace(xl_tot_len).
+ * allow_oversized: whether a separate allocation may be made when the decode
+ * buffer has no room.
+ *
+ * Return NULL if there is no space in the decode buffer and allow_oversized
+ * is false, or if memory allocation fails for an oversized buffer.
+ */
+static DecodedXLogRecord *
+XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
+{
+    size_t      required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
+    DecodedXLogRecord *decoded = NULL;
+
+    /* Allocate a circular decode buffer if we don't have one already. */
+    if (unlikely(state->decode_buffer == NULL))
+    {
+        if (state->decode_buffer_size == 0)
+            state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
+        state->decode_buffer = palloc(state->decode_buffer_size);
+        state->decode_buffer_head = state->decode_buffer;
+        state->decode_buffer_tail = state->decode_buffer;
+        state->free_decode_buffer = true;
+    }
+
+    /*
+     * Try to allocate space in the circular decode buffer.
+     *
+     * All checks compare byte counts instead of forming a pointer such as
+     * "tail + required_space".  required_space is derived from xl_tot_len
+     * and can be far larger than the buffer, in which case such a pointer
+     * would lie beyond the end of the buffer (undefined behavior) and could
+     * even wrap around the address space, making the comparison lie.
+     */
+    if (state->decode_buffer_tail >= state->decode_buffer_head)
+    {
+        /* Empty, or tail is to the right of head. */
+        size_t      tail_off = (size_t) (state->decode_buffer_tail -
+                                         state->decode_buffer);
+        size_t      head_off = (size_t) (state->decode_buffer_head -
+                                         state->decode_buffer);
+
+        Assert(tail_off <= state->decode_buffer_size);
+
+        if (required_space <= state->decode_buffer_size - tail_off)
+        {
+            /* There is space between tail and end. */
+            decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
+        }
+        else if (required_space < head_off)
+        {
+            /*
+             * There is space between start and head.  The comparison is
+             * strict so that tail can never catch up with head.
+             */
+            decoded = (DecodedXLogRecord *) state->decode_buffer;
+        }
+    }
+    else
+    {
+        /* Tail is to the left of head. */
+        size_t      gap = (size_t) (state->decode_buffer_head -
+                                    state->decode_buffer_tail);
+
+        if (required_space < gap)
+        {
+            /* There is space between tail and head. */
+            decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
+        }
+    }
+
+    if (decoded != NULL)
+    {
+        /* The record will live in the decode buffer. */
+        decoded->oversized = false;
+        return decoded;
+    }
+
+    /* Not enough space in the decode buffer.  Are we allowed to allocate? */
+    if (!allow_oversized)
+        return NULL;
+
+    /* MCXT_ALLOC_NO_OOM: report allocation failure by returning NULL. */
+    decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
+    if (decoded != NULL)
+        decoded->oversized = true;
+
+    return decoded;
+}
+
+static XLogPageReadResult
+XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
{
XLogRecPtr RecPtr;
XLogRecord *record;
bool assembled;
bool gotheader;
int readOff;
+ DecodedXLogRecord *decoded;
+ char *errormsg; /* not used */
/*
* randAccess indicates whether to verify the previous-record pointer of
randAccess = false;
/* reset error state */
- *errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ decoded = NULL;
- ResetDecoder(state);
state->abortedRecPtr = InvalidXLogRecPtr;
state->missingContrecPtr = InvalidXLogRecPtr;
- RecPtr = state->EndRecPtr;
+ RecPtr = state->NextRecPtr;
- if (state->ReadRecPtr != InvalidXLogRecPtr)
+ if (state->DecodeRecPtr != InvalidXLogRecPtr)
{
/* read the record after the one we just read */
/*
- * EndRecPtr is pointing to end+1 of the previous WAL record. If
+ * NextRecPtr is pointing to end+1 of the previous WAL record. If
* we're at a page boundary, no more records can fit on the current
* page. We must skip over the page header, but we can't do that until
* we've read in the page, since the header size is variable.
/*
* Caller supplied a position to start at.
*
- * In this case, EndRecPtr should already be pointing to a valid
+ * In this case, NextRecPtr should already be pointing to a valid
* record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
}
restart:
+ state->nonblocking = nonblocking;
state->currRecPtr = RecPtr;
assembled = false;
*/
readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
/*
*/
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+ if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
randAccess))
goto err;
gotheader = true;
gotheader = false;
}
+ /*
+ * Find space to decode this record. Don't allow oversized allocation if
+ * the caller requested nonblocking. Otherwise, we *have* to try to
+ * decode the record now because the caller has nothing else to do, so
+ * allow an oversized record to be palloc'd if that turns out to be
+ * necessary.
+ */
+ decoded = XLogReadRecordAlloc(state,
+ total_len,
+ !nonblocking /* allow_oversized */ );
+ if (decoded == NULL)
+ {
+ /*
+ * There is no space in the decode buffer. The caller should help
+ * with that problem by consuming some records.
+ */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
+ /* We failed to allocate memory for an oversized record. */
+ report_invalid_record(state,
+ "out of memory while trying to decode a record of length %u", total_len);
+ goto err;
+ }
+
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
if (total_len > len)
{
Min(total_len - gotlen + SizeOfXLogShortPHD,
XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
Assert(SizeOfXLogShortPHD <= readOff);
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
{
state->overwrittenRecPtr = RecPtr;
- ResetDecoder(state);
RecPtr = targetPagePtr;
goto restart;
}
if (!gotheader)
{
record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
+ if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
record, randAccess))
goto err;
gotheader = true;
goto err;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->ReadRecPtr = RecPtr;
- state->EndRecPtr = targetPagePtr + pageHeaderSize
+ state->DecodeRecPtr = RecPtr;
+ state->NextRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(pageHeader->xlp_rem_len);
}
else
/* Wait for the record data to become available */
readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + total_len, XLOG_BLCKSZ));
- if (readOff < 0)
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
goto err;
/* Record does not cross a page boundary */
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
- state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+ state->NextRecPtr = RecPtr + MAXALIGN(total_len);
- state->ReadRecPtr = RecPtr;
+ state->DecodeRecPtr = RecPtr;
}
/*
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->EndRecPtr += state->segcxt.ws_segsize - 1;
- state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
+ state->NextRecPtr += state->segcxt.ws_segsize - 1;
+ state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
}
- if (DecodeXLogRecord(state, record, errormsg))
- return record;
+ if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
+ {
+ /* Record the location of the next record. */
+ decoded->next_lsn = state->NextRecPtr;
+
+ /*
+ * If it's in the decode buffer, mark the decode buffer space as
+ * occupied.
+ */
+ if (!decoded->oversized)
+ {
+ /* The new decode buffer head must be MAXALIGNed. */
+ Assert(decoded->size == MAXALIGN(decoded->size));
+ if ((char *) decoded == state->decode_buffer)
+ state->decode_buffer_tail = state->decode_buffer + decoded->size;
+ else
+ state->decode_buffer_tail += decoded->size;
+ }
+
+ /* Insert it into the queue of decoded records. */
+ Assert(state->decode_queue_tail != decoded);
+ if (state->decode_queue_tail)
+ state->decode_queue_tail->next = decoded;
+ state->decode_queue_tail = decoded;
+ if (!state->decode_queue_head)
+ state->decode_queue_head = decoded;
+ return XLREAD_SUCCESS;
+ }
else
- return NULL;
+ return XLREAD_FAIL;
err:
if (assembled)
state->missingContrecPtr = targetPagePtr;
}
+ if (decoded && decoded->oversized)
+ pfree(decoded);
+
/*
* Invalidate the read state. We might read from a different source after
* failure.
*/
XLogReaderInvalReadState(state);
- if (state->errormsg_buf[0] != '\0')
- *errormsg = state->errormsg_buf;
+ /*
+ * If an error was written to errormsg_buf, it'll be returned to the caller
+ * of XLogReadRecord() after all successfully decoded records from the
+ * read queue.
+ */
+
+ return XLREAD_FAIL;
+}
+
+/*
+ * Try to decode the next available record, and return it. The record will
+ * also be returned to XLogNextRecord(), which must be called to 'consume'
+ * each record.
+ *
+ * If nonblocking is true, may return NULL due to lack of data or WAL decoding
+ * space.
+ *
+ * NULL is also returned, without attempting to decode anything, while a
+ * deferred error is still waiting to be consumed, and whenever
+ * XLogDecodeNextRecord() reports anything other than XLREAD_SUCCESS. On
+ * success the return value is the newest entry of the decoded record queue
+ * (decode_queue_tail).
+ */
+DecodedXLogRecord *
+XLogReadAhead(XLogReaderState *state, bool nonblocking)
+{
+ XLogPageReadResult result;
+
+ if (state->errormsg_deferred)
+ return NULL; /* an error is already queued; don't decode past it */
+
+ result = XLogDecodeNextRecord(state, nonblocking);
+ if (result == XLREAD_SUCCESS)
+ {
+ Assert(state->decode_queue_tail != NULL);
+ return state->decode_queue_tail;
+ }
return NULL;
}
* Read a single xlog page including at least [pageptr, reqLen] of valid data
* via the page_read() callback.
*
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns XLREAD_FAIL if the required page cannot be read for some
+ * reason; errormsg_buf is set in that case (unless the error occurs in the
+ * page_read callback).
+ *
+ * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
+ * waiting. This can be returned only if the installed page_read callback
+ * respects the state->nonblocking flag, and cannot read the requested data
+ * immediately.
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
/* we can be sure to have enough WAL available, we scrolled back */
readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
Assert(readLen <= XLOG_BLCKSZ);
readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr,
state->readBuf);
- if (readLen < 0)
+ if (readLen == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readLen < 0)
goto err;
}
return readLen;
err:
- XLogReaderInvalReadState(state);
- return -1;
+ if (state->errormsg_buf[0] != '\0')
+ {
+ state->errormsg_deferred = true;
+ XLogReaderInvalReadState(state);
+ }
+ return XLREAD_FAIL;
}
/*
Assert(!XLogRecPtrIsInvalid(RecPtr));
+ /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
+ state->nonblocking = false;
+
/*
* skip over potential continuation data, keeping in mind that it may span
* multiple pages
* ----------------------------------------
*/
-/* private function to reset the state between records */
+/*
+ * Private function to reset the state, forgetting all decoded records, if we
+ * are asked to move to a new read position.
+ *
+ * Oversized records in the queue are pfree()d one by one. Records that live
+ * in the decode buffer are not freed individually; the buffer's head and
+ * tail are simply rewound to its start. state->record is cleared, and any
+ * pending error message and the deferred-error flag are discarded as well.
+ */
static void
ResetDecoder(XLogReaderState *state)
{
- int block_id;
+ DecodedXLogRecord *r;
- state->decoded_record = NULL;
-
- state->main_data_len = 0;
-
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ /* Reset the decoded record queue, freeing any oversized records. */
+ while ((r = state->decode_queue_head) != NULL)
{
- state->blocks[block_id].in_use = false;
- state->blocks[block_id].has_image = false;
- state->blocks[block_id].has_data = false;
- state->blocks[block_id].apply_image = false;
+ state->decode_queue_head = r->next;
+ if (r->oversized)
+ pfree(r);
}
- state->max_block_id = -1;
+ state->decode_queue_tail = NULL;
+ state->decode_queue_head = NULL;
+ state->record = NULL;
+
+ /* Reset the decode buffer to empty. */
+ state->decode_buffer_tail = state->decode_buffer;
+ state->decode_buffer_head = state->decode_buffer;
+
+ /* Clear error state. */
+ state->errormsg_buf[0] = '\0';
+ state->errormsg_deferred = false;
}
/*
- * Decode the previously read record.
+ * Compute the maximum possible amount of space, including alignment padding,
+ * that could be required to decode a record, given xl_tot_len from the
+ * record's header. This is the amount of output buffer space that we need
+ * to decode a record, though we might not finish up using it all.
+ *
+ * This computation is pessimistic and assumes the maximum possible number of
+ * blocks, due to lack of better information.
+ *
+ * The result is the sum of: the fixed part of DecodedXLogRecord, a blocks
+ * array of XLR_MAX_BLOCK_ID + 1 entries, xl_tot_len bytes of raw data, and
+ * up to MAXIMUM_ALIGNOF - 1 bytes of padding before the main data, before
+ * each block's data, and at the end.
+ */
+size_t
+DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
+{
+ size_t size = 0;
+
+ /* Account for the fixed size part of the decoded record struct. */
+ size += offsetof(DecodedXLogRecord, blocks[0]);
+ /* Account for the flexible blocks array of maximum possible size. */
+ size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
+ /* Account for all the raw main and block data. */
+ size += xl_tot_len;
+ /* We might insert padding before main_data. */
+ size += (MAXIMUM_ALIGNOF - 1);
+ /* We might insert padding before each block's data. */
+ size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
+ /* We might insert padding at the end. */
+ size += (MAXIMUM_ALIGNOF - 1);
+
+ return size;
+}
+
+/*
+ * Decode a record. "decoded" must point to a MAXALIGNed memory area that has
+ * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On
+ * success, decoded->size contains the actual space occupied by the decoded
+ * record, which may turn out to be less.
+ *
+ * Only decoded->oversized member must be initialized already, and will not be
+ * modified. Other members will be initialized as required.
*
* On error, a human-readable error message is returned in *errormsg, and
* the return value is false.
*/
bool
-DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
+ char **errormsg)
{
/*
* read next _size bytes from record buffer, but check for overrun first.
} while(0)
char *ptr;
+ char *out;
uint32 remaining;
uint32 datatotal;
RelFileNode *rnode = NULL;
uint8 block_id;
- ResetDecoder(state);
-
- state->decoded_record = record;
- state->record_origin = InvalidRepOriginId;
- state->toplevel_xid = InvalidTransactionId;
-
+ decoded->header = *record;
+ decoded->lsn = lsn;
+ decoded->next = NULL;
+ decoded->record_origin = InvalidRepOriginId;
+ decoded->toplevel_xid = InvalidTransactionId;
+ decoded->main_data = NULL;
+ decoded->main_data_len = 0;
+ decoded->max_block_id = -1;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
remaining = record->xl_tot_len - SizeOfXLogRecord;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
uint32 main_data_len;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
}
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
- COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
+ COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
}
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
- COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+ COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
}
else if (block_id <= XLR_MAX_BLOCK_ID)
{
DecodedBkpBlock *blk;
uint8 fork_flags;
- if (block_id <= state->max_block_id)
+ /* mark any intervening block IDs as not in use */
+ for (int i = decoded->max_block_id + 1; i < block_id; ++i)
+ decoded->blocks[i].in_use = false;
+
+ if (block_id <= decoded->max_block_id)
{
report_invalid_record(state,
"out-of-order block_id %u at %X/%X",
LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err;
}
- state->max_block_id = block_id;
+ decoded->max_block_id = block_id;
- blk = &state->blocks[block_id];
+ blk = &decoded->blocks[block_id];
blk->in_use = true;
blk->apply_image = false;
/*
* Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data
- * left. Copy the data of each fragment to a separate buffer.
- *
- * We could just set up pointers into readRecordBuf, but we want to align
- * the data for the convenience of the callers. Backup images are not
- * copied, however; they don't need alignment.
+ * left. Copy the data of each fragment to contiguous space after the
+ * blocks array, inserting alignment padding before the data fragments so
+ * they can be cast to struct pointers by REDO routines.
*/
+ out = ((char *) decoded) +
+ offsetof(DecodedXLogRecord, blocks) +
+ sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
/* block data first */
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
{
- DecodedBkpBlock *blk = &state->blocks[block_id];
+ DecodedBkpBlock *blk = &decoded->blocks[block_id];
if (!blk->in_use)
continue;
if (blk->has_image)
{
- blk->bkp_image = ptr;
+ /* no need to align image */
+ blk->bkp_image = out;
+ memcpy(out, ptr, blk->bimg_len);
ptr += blk->bimg_len;
+ out += blk->bimg_len;
}
if (blk->has_data)
{
- if (!blk->data || blk->data_len > blk->data_bufsz)
- {
- if (blk->data)
- pfree(blk->data);
-
- /*
- * Force the initial request to be BLCKSZ so that we don't
- * waste time with lots of trips through this stanza as a
- * result of WAL compression.
- */
- blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
- blk->data = palloc(blk->data_bufsz);
- }
+ out = (char *) MAXALIGN(out);
+ blk->data = out;
memcpy(blk->data, ptr, blk->data_len);
ptr += blk->data_len;
+ out += blk->data_len;
}
}
/* and finally, the main data */
- if (state->main_data_len > 0)
+ if (decoded->main_data_len > 0)
{
- if (!state->main_data || state->main_data_len > state->main_data_bufsz)
- {
- if (state->main_data)
- pfree(state->main_data);
-
- /*
- * main_data_bufsz must be MAXALIGN'ed. In many xlog record
- * types, we omit trailing struct padding on-disk to save a few
- * bytes; but compilers may generate accesses to the xlog struct
- * that assume that padding bytes are present. If the palloc
- * request is not large enough to include such padding bytes then
- * we'll get valgrind complaints due to otherwise-harmless fetches
- * of the padding bytes.
- *
- * In addition, force the initial request to be reasonably large
- * so that we don't waste time with lots of trips through this
- * stanza. BLCKSZ / 2 seems like a good compromise choice.
- */
- state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
- BLCKSZ / 2));
- state->main_data = palloc(state->main_data_bufsz);
- }
- memcpy(state->main_data, ptr, state->main_data_len);
- ptr += state->main_data_len;
+ out = (char *) MAXALIGN(out);
+ decoded->main_data = out;
+ memcpy(decoded->main_data, ptr, decoded->main_data_len);
+ ptr += decoded->main_data_len;
+ out += decoded->main_data_len;
}
+ /* Report the actual size we used. */
+ decoded->size = MAXALIGN(out - (char *) decoded);
+ Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
+ decoded->size);
+
return true;
shortdata_err:
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (rnode)
*rnode = bkpb->rnode;
if (forknum)
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return NULL;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (!bkpb->has_data)
{
char *ptr;
PGAlignedBlock tmp;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- if (!record->blocks[block_id].has_image)
+ if (!record->record->blocks[block_id].has_image)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
ptr = bkpb->bkp_image;
if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
uint16 data_bufsz;
} DecodedBkpBlock;
+/*
+ * The decoded contents of a record. This occupies a contiguous region of
+ * memory, with main_data and blocks[n].data pointing to memory after the
+ * members declared here.
+ *
+ * blocks[] is a flexible array: DecodeXLogRecord() only fills in entries
+ * 0..max_block_id and places the copied data directly after them, so
+ * entries beyond max_block_id must not be accessed.
+ */
+typedef struct DecodedXLogRecord
+{
+ /* Private members used for resource management. */
+ size_t size; /* total size of decoded record, MAXALIGNed; set
+ * by DecodeXLogRecord() on success */
+ bool oversized; /* outside the regular decode buffer? */
+ struct DecodedXLogRecord *next; /* decoded record queue link */
+
+ /* Public members. */
+ XLogRecPtr lsn; /* location */
+ XLogRecPtr next_lsn; /* location of next record */
+ XLogRecord header; /* header */
+ RepOriginId record_origin; /* InvalidRepOriginId unless the record
+ * carries an origin */
+ TransactionId toplevel_xid; /* XID of top-level transaction */
+ char *main_data; /* record's main data portion */
+ uint32 main_data_len; /* main data portion's length */
+ int max_block_id; /* highest block_id in use (-1 if none) */
+ DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
+} DecodedXLogRecord;
+
struct XLogReaderState
{
/*
* Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid.
+ *
+ * Start and end point of last record returned by XLogReadRecord(). These
+ * are also available as record->lsn and record->next_lsn.
*/
XLogRecPtr ReadRecPtr; /* start of last record read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
* Use XLogRecGet* functions to investigate the record; these fields
* should not be accessed directly.
* ----------------------------------------
+ * Start and end point of the last record read and decoded by
+ * XLogDecodeNextRecord(). NextRecPtr is also used as the position to
+ * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
+ * the requested starting position.
*/
- XLogRecord *decoded_record; /* currently decoded record */
+ XLogRecPtr DecodeRecPtr; /* start of last record decoded */
+ XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
+ XLogRecPtr PrevRecPtr; /* start of previous record decoded */
- char *main_data; /* record's main data portion */
- uint32 main_data_len; /* main data portion's length */
- uint32 main_data_bufsz; /* allocated size of the buffer */
-
- RepOriginId record_origin;
-
- TransactionId toplevel_xid; /* XID of top-level transaction */
-
- /* information about blocks referenced by the record. */
- DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
-
- int max_block_id; /* highest block_id in use (-1 if none) */
+ /* Last record returned by XLogReadRecord(). */
+ DecodedXLogRecord *record;
/* ----------------------------------------
* private/internal state
* ----------------------------------------
*/
+ /*
+ * Buffer for decoded records. This is a circular buffer, though
+ * individual records can't be split in the middle, so some space is often
+ * wasted at the end. Oversized records that don't fit in this space are
+ * allocated separately.
+ */
+ char *decode_buffer;
+ size_t decode_buffer_size;
+ bool free_decode_buffer; /* need to free? */
+ char *decode_buffer_head; /* data is read from the head */
+ char *decode_buffer_tail; /* new data is written at the tail */
+
+ /*
+ * Queue of records that have been decoded. This is a linked list that
+ * usually consists of consecutive records in decode_buffer, but may also
+ * contain oversized records allocated with palloc().
+ */
+ DecodedXLogRecord *decode_queue_head; /* oldest decoded record */
+ DecodedXLogRecord *decode_queue_tail; /* newest decoded record */
+
/*
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
* readLen bytes)
/* Buffer to hold error message */
char *errormsg_buf;
+ bool errormsg_deferred;
+
+ /*
+ * Flag to indicate to XLogPageReadCB that it should not block waiting for
+ * data.
+ */
+ bool nonblocking;
};
+/*
+ * Report whether XLogNextRecord() has something to hand out right away:
+ * either a decoded record waiting in the queue, or a deferred error.
+ */
+static inline bool
+XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
+{
+    /* A queued record takes precedence over any deferred error. */
+    if (state->decode_queue_head != NULL)
+        return true;
+
+    return state->errormsg_deferred;
+}
+
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Optionally provide a circular decoding buffer to allow readahead. */
+extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
+ void *buffer,
+ size_t size);
+
/* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
+/*
+ * Return values from XLogPageReadCB.
+ *
+ * XLogDecodeNextRecord() also returns this type. The two failure codes are
+ * negative so that page-reading code, which otherwise returns a byte count,
+ * can return them in place of a length; callers test for
+ * XLREAD_WOULDBLOCK explicitly before treating any other negative value as
+ * a failure.
+ */
+typedef enum XLogPageReadResult
+{
+ XLREAD_SUCCESS = 0, /* record is successfully read */
+ XLREAD_FAIL = -1, /* failed during reading a record */
+ XLREAD_WOULDBLOCK = -2 /* nonblocking mode only, no data */
+} XLogPageReadResult;
+
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
char **errormsg);
+/* Consume the next record or error. */
+extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
+ char **errormsg);
+
+/* Release the previously returned record, if necessary. */
+extern void XLogReleasePreviousRecord(XLogReaderState *state);
+
+/* Try to read ahead, if there is data and space. */
+extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,
+ bool nonblocking);
+
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
/* Functions for decoding an XLogRecord */
-extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
+extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
+extern bool DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
char **errmsg);
-#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len)
-#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev)
-#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
-#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
-#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
-#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
-#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
-#define XLogRecGetData(decoder) ((decoder)->main_data)
-#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
-#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
-#define XLogRecHasBlockRef(decoder, block_id) \
- ((decoder)->blocks[block_id].in_use)
-#define XLogRecHasBlockImage(decoder, block_id) \
- ((decoder)->blocks[block_id].has_image)
-#define XLogRecBlockImageApply(decoder, block_id) \
- ((decoder)->blocks[block_id].apply_image)
+/*
+ * Macros that provide access to parts of the record most recently returned by
+ * XLogReadRecord() or XLogNextRecord().
+ *
+ * All of them dereference decoder->record, so they may only be used while a
+ * record is current; XLogReleasePreviousRecord() clears that pointer.
+ *
+ * XLogRecHasBlockRef() checks block_id against max_block_id before looking
+ * at the blocks array. XLogRecGetBlock(), XLogRecHasBlockImage() and
+ * XLogRecBlockImageApply() index the array without such a check, so they
+ * should only be used with a block_id for which XLogRecHasBlockRef() is
+ * true.
+ */
+#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
+#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
+#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
+#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
+#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
+#define XLogRecGetData(decoder) ((decoder)->record->main_data)
+#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
+#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
+#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
+#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
+#define XLogRecHasBlockRef(decoder, block_id) \
+ (((decoder)->record->max_block_id >= (block_id)) && \
+ ((decoder)->record->blocks[block_id].in_use))
+#define XLogRecHasBlockImage(decoder, block_id) \
+ ((decoder)->record->blocks[block_id].has_image)
+#define XLogRecBlockImageApply(decoder, block_id) \
+ ((decoder)->record->blocks[block_id].apply_image)
#ifndef FRONTEND
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);