#include "access/xlogreader.h"
#include "access/xlogrecord.h"
+#include "access/xlog_internal.h"
#include "access/transam.h"
#include "common/fe_memutils.h"
#include "getopt_long.h"
* Store per-rmgr and per-record statistics for a given record.
*/
static void
-XLogDumpCountRecord(XLogDumpConfig *config, XLogDumpStats *stats, XLogRecPtr ReadRecPtr, XLogRecord *record)
+XLogDumpCountRecord(XLogDumpConfig *config, XLogDumpStats *stats,
+ XLogReaderState *record)
{
RmgrId rmid;
uint8 recid;
+ uint32 rec_len;
+ uint32 fpi_len;
stats->count++;
/* Update per-rmgr statistics */
- rmid = record->xl_rmid;
+ rmid = XLogRecGetRmid(record);
+ rec_len = XLogRecGetDataLen(record) + SizeOfXLogRecord;
+ fpi_len = record->decoded_record->xl_tot_len - rec_len;
stats->rmgr_stats[rmid].count++;
- stats->rmgr_stats[rmid].rec_len +=
- record->xl_len + SizeOfXLogRecord;
- stats->rmgr_stats[rmid].fpi_len +=
- record->xl_tot_len - (record->xl_len + SizeOfXLogRecord);
+ stats->rmgr_stats[rmid].rec_len += rec_len;
+ stats->rmgr_stats[rmid].fpi_len += fpi_len;
/*
* Update per-record statistics, where the record is identified by a
- * combination of the RmgrId and the four bits of the xl_info field
- * that are the rmgr's domain (resulting in sixteen possible entries
- * per RmgrId).
+ * combination of the RmgrId and the four bits of the xl_info field that
+ * are the rmgr's domain (resulting in sixteen possible entries per
+ * RmgrId).
*/
- recid = record->xl_info >> 4;
+ recid = XLogRecGetInfo(record) >> 4;
stats->record_stats[rmid][recid].count++;
- stats->record_stats[rmid][recid].rec_len +=
- record->xl_len + SizeOfXLogRecord;
- stats->record_stats[rmid][recid].fpi_len +=
- record->xl_tot_len - (record->xl_len + SizeOfXLogRecord);
+ stats->record_stats[rmid][recid].rec_len += rec_len;
+ stats->record_stats[rmid][recid].fpi_len += fpi_len;
}
/*
* Print a record to stdout
*/
static void
-XLogDumpDisplayRecord(XLogDumpConfig *config, XLogRecPtr ReadRecPtr, XLogRecord *record)
+XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
{
- const char *id;
- const RmgrDescData *desc = &RmgrDescTable[record->xl_rmid];
-
- id = desc->rm_identify(record->xl_info);
+ const char *id;
+ const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+ RelFileNode rnode;
+ ForkNumber forknum;
+ BlockNumber blk;
+ int block_id;
+ uint8 info = XLogRecGetInfo(record);
+ XLogRecPtr xl_prev = XLogRecGetPrev(record);
+
+ id = desc->rm_identify(info);
if (id == NULL)
- id = psprintf("UNKNOWN (%x)", record->xl_info & ~XLR_INFO_MASK);
+ id = psprintf("UNKNOWN (%x)", info & ~XLR_INFO_MASK);
- printf("rmgr: %-11s len (rec/tot): %6u/%6u, tx: %10u, lsn: %X/%08X, prev %X/%08X, bkp: %u%u%u%u, desc: %s ",
+ printf("rmgr: %-11s len (rec/tot): %6u/%6u, tx: %10u, lsn: %X/%08X, prev %X/%08X, ",
desc->rm_name,
- record->xl_len, record->xl_tot_len,
- record->xl_xid,
- (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
- (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- !!(XLR_BKP_BLOCK(0) & record->xl_info),
- !!(XLR_BKP_BLOCK(1) & record->xl_info),
- !!(XLR_BKP_BLOCK(2) & record->xl_info),
- !!(XLR_BKP_BLOCK(3) & record->xl_info),
- id);
+ XLogRecGetDataLen(record), XLogRecGetTotalLen(record),
+ XLogRecGetXid(record),
+ (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+ (uint32) (xl_prev >> 32), (uint32) xl_prev);
+ printf("desc: %s ", id);
/* the desc routine will printf the description directly to stdout */
desc->rm_desc(NULL, record);
- putchar('\n');
-
- if (config->bkp_details)
+ if (!config->bkp_details)
{
- int bkpnum;
- char *blk = (char *) XLogRecGetData(record) + record->xl_len;
-
- for (bkpnum = 0; bkpnum < XLR_MAX_BKP_BLOCKS; bkpnum++)
+ /* print block references (short format) */
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
- BkpBlock bkpb;
-
- if (!(XLR_BKP_BLOCK(bkpnum) & record->xl_info))
+ if (!XLogRecHasBlockRef(record, block_id))
continue;
- memcpy(&bkpb, blk, sizeof(BkpBlock));
- blk += sizeof(BkpBlock);
- blk += BLCKSZ - bkpb.hole_length;
+ XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blk);
+ if (forknum != MAIN_FORKNUM)
+ printf(", blkref #%u: rel %u/%u/%u fork %s blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ forkNames[forknum],
+ blk);
+ else
+ printf(", blkref #%u: rel %u/%u/%u blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ blk);
+ if (XLogRecHasBlockImage(record, block_id))
+ printf(" FPW");
+ }
+ putchar('\n');
+ }
+ else
+ {
+ /* print block references (detailed format) */
+ putchar('\n');
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ {
+ if (!XLogRecHasBlockRef(record, block_id))
+ continue;
- printf("\tbackup bkp #%u; rel %u/%u/%u; fork: %s; block: %u; hole: offset: %u, length: %u\n",
- bkpnum,
- bkpb.node.spcNode, bkpb.node.dbNode, bkpb.node.relNode,
- forkNames[bkpb.fork],
- bkpb.block, bkpb.hole_offset, bkpb.hole_length);
+ XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blk);
+ printf("\tblkref #%u: rel %u/%u/%u fork %s blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ forkNames[forknum],
+ blk);
+ if (XLogRecHasBlockImage(record, block_id))
+ {
+ printf(" (FPW); hole: offset: %u, length: %u\n",
+ record->blocks[block_id].hole_offset,
+ record->blocks[block_id].hole_length);
+ }
+ putchar('\n');
}
}
}
/* process the record */
if (config.stats == true)
- XLogDumpCountRecord(&config, &stats, xlogreader_state->ReadRecPtr, record);
+ XLogDumpCountRecord(&config, &stats, xlogreader_state);
else
- XLogDumpDisplayRecord(&config, xlogreader_state->ReadRecPtr, record);
+ XLogDumpDisplayRecord(&config, xlogreader_state);
/* check whether we printed enough */
config.already_displayed_records++;
typedef struct RmgrDescData
{
const char *rm_name;
- void (*rm_desc) (StringInfo buf, XLogRecord *record);
+ void (*rm_desc) (StringInfo buf, XLogReaderState *record);
const char *(*rm_identify) (uint8 info);
} RmgrDescData;
{
xl_brin_createidx xlrec;
XLogRecPtr recptr;
- XLogRecData rdata;
Page page;
- xlrec.node = index->rd_node;
xlrec.version = BRIN_CURRENT_VERSION;
xlrec.pagesPerRange = BrinGetPagesPerRange(index);
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = SizeOfBrinCreateIdx;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
+ XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);
- recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX, &rdata);
+ recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
page = BufferGetPage(meta);
PageSetLSN(page, recptr);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
- BlockNumber blk = BufferGetBlockNumber(oldbuf);
xl_brin_samepage_update xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
uint8 info = XLOG_BRIN_SAMEPAGE_UPDATE;
- xlrec.node = idxrel->rd_node;
- ItemPointerSetBlockNumber(&xlrec.tid, blk);
- ItemPointerSetOffsetNumber(&xlrec.tid, oldoff);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBrinSamepageUpdate;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ xlrec.offnum = oldoff;
- rdata[1].data = (char *) newtup;
- rdata[1].len = newsz;
- rdata[1].buffer = oldbuf;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBrinSamepageUpdate);
- recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+ XLogRegisterBuffer(0, oldbuf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) newtup, newsz);
+
+ recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(oldpage, recptr);
}
{
xl_brin_update xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[4];
uint8 info;
info = XLOG_BRIN_UPDATE | (extended ? XLOG_BRIN_INIT_PAGE : 0);
- xlrec.insert.node = idxrel->rd_node;
- ItemPointerSet(&xlrec.insert.tid, BufferGetBlockNumber(newbuf), newoff);
+ xlrec.insert.offnum = newoff;
xlrec.insert.heapBlk = heapBlk;
- xlrec.insert.tuplen = newsz;
- xlrec.insert.revmapBlk = BufferGetBlockNumber(revmapbuf);
xlrec.insert.pagesPerRange = pagesPerRange;
- ItemPointerSet(&xlrec.oldtid, BufferGetBlockNumber(oldbuf), oldoff);
+ xlrec.oldOffnum = oldoff;
+
+ XLogBeginInsert();
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBrinUpdate;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ /* new page */
+ XLogRegisterData((char *) &xlrec, SizeOfBrinUpdate);
- rdata[1].data = (char *) newtup;
- rdata[1].len = newsz;
- rdata[1].buffer = extended ? InvalidBuffer : newbuf;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBuffer(0, newbuf, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
+ XLogRegisterBufData(0, (char *) newtup, newsz);
- rdata[2].data = (char *) NULL;
- rdata[2].len = 0;
- rdata[2].buffer = revmapbuf;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
+ /* revmap page */
+ XLogRegisterBuffer(1, revmapbuf, REGBUF_STANDARD);
- rdata[3].data = (char *) NULL;
- rdata[3].len = 0;
- rdata[3].buffer = oldbuf;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
+ /* old page */
+ XLogRegisterBuffer(2, oldbuf, REGBUF_STANDARD);
- recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+ recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(oldpage, recptr);
PageSetLSN(newpage, recptr);
{
xl_brin_insert xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[3];
uint8 info;
info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
- xlrec.node = idxrel->rd_node;
xlrec.heapBlk = heapBlk;
xlrec.pagesPerRange = pagesPerRange;
- xlrec.revmapBlk = BufferGetBlockNumber(revmapbuf);
- xlrec.tuplen = itemsz;
- ItemPointerSet(&xlrec.tid, blk, off);
-
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBrinInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].buffer_std = false;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = (char *) tup;
- rdata[1].len = itemsz;
- rdata[1].buffer = extended ? InvalidBuffer : *buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = (char *) NULL;
- rdata[2].len = 0;
- rdata[2].buffer = revmapbuf;
- rdata[2].buffer_std = false;
- rdata[2].next = NULL;
-
- recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+ xlrec.offnum = off;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBrinInsert);
+
+ XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
+ XLogRegisterBufData(0, (char *) tup, itemsz);
+
+ XLogRegisterBuffer(1, revmapbuf, 0);
+
+ recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(page, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
{
xl_brin_revmap_extend xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.node = revmap->rm_irel->rd_node;
xlrec.targetBlk = mapBlk;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBrinRevmapExtend;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].buffer_std = false;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = (char *) NULL;
- rdata[1].len = 0;
- rdata[1].buffer = revmap->rm_metaBuf;
- rdata[1].buffer_std = false;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND, rdata);
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBrinRevmapExtend);
+ XLogRegisterBuffer(0, revmap->rm_metaBuf, 0);
+
+ XLogRegisterBuffer(1, buf, REGBUF_WILL_INIT);
+
+ recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND);
PageSetLSN(metapage, recptr);
PageSetLSN(page, recptr);
}
* xlog replay routines
*/
static void
-brin_xlog_createidx(XLogRecPtr lsn, XLogRecord *record)
+brin_xlog_createidx(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_brin_createidx *xlrec = (xl_brin_createidx *) XLogRecGetData(record);
Buffer buf;
Page page;
- /* Backup blocks are not used in create_index records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
/* create the index' metapage */
- buf = XLogReadBuffer(xlrec->node, BRIN_METAPAGE_BLKNO, true);
+ buf = XLogInitBufferForRedo(record, 0);
Assert(BufferIsValid(buf));
page = (Page) BufferGetPage(buf);
brin_metapage_init(page, xlrec->pagesPerRange, xlrec->version);
* revmap.
*/
static void
-brin_xlog_insert_update(XLogRecPtr lsn, XLogRecord *record,
- xl_brin_insert *xlrec, BrinTuple *tuple)
+brin_xlog_insert_update(XLogReaderState *record,
+ xl_brin_insert *xlrec)
{
- BlockNumber blkno;
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
XLogRedoAction action;
- blkno = ItemPointerGetBlockNumber(&xlrec->tid);
-
/*
* If we inserted the first and only tuple on the page, re-initialize the
* page from scratch.
*/
- if (record->xl_info & XLOG_BRIN_INIT_PAGE)
+ if (XLogRecGetInfo(record) & XLOG_BRIN_INIT_PAGE)
{
- /*
- * No full-page image here. Don't try to read it, because there
- * might be one for the revmap buffer, below.
- */
- buffer = XLogReadBuffer(xlrec->node, blkno, true);
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
brin_page_init(page, BRIN_PAGETYPE_REGULAR);
action = BLK_NEEDS_REDO;
}
else
{
- action = XLogReadBufferForRedo(lsn, record, 0,
- xlrec->node, blkno, &buffer);
+ action = XLogReadBufferForRedo(record, 0, &buffer);
}
/* insert the index item into the page */
if (action == BLK_NEEDS_REDO)
{
OffsetNumber offnum;
+ BrinTuple *tuple;
+ Size tuplen;
+
+ tuple = (BrinTuple *) XLogRecGetBlockData(record, 0, &tuplen);
Assert(tuple->bt_blkno == xlrec->heapBlk);
page = (Page) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
+ offnum = xlrec->offnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_insert_update: invalid max offset number");
- offnum = PageAddItem(page, (Item) tuple, xlrec->tuplen, offnum, true,
- false);
+ offnum = PageAddItem(page, (Item) tuple, tuplen, offnum, true, false);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "brin_xlog_insert_update: failed to add tuple");
UnlockReleaseBuffer(buffer);
/* update the revmap */
- action = XLogReadBufferForRedo(lsn, record,
- record->xl_info & XLOG_BRIN_INIT_PAGE ? 0 : 1,
- xlrec->node,
- xlrec->revmapBlk, &buffer);
+ action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
+ ItemPointerData tid;
+ BlockNumber blkno = BufferGetBlockNumber(buffer);
+
+ ItemPointerSet(&tid, blkno, xlrec->offnum);
page = (Page) BufferGetPage(buffer);
brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk,
- xlrec->tid);
+ tid);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
* replay a BRIN index insertion
*/
static void
-brin_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
+brin_xlog_insert(XLogReaderState *record)
{
xl_brin_insert *xlrec = (xl_brin_insert *) XLogRecGetData(record);
- BrinTuple *newtup;
- newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinInsert);
-
- brin_xlog_insert_update(lsn, record, xlrec, newtup);
+ brin_xlog_insert_update(record, xlrec);
}
/*
* replay a BRIN index update
*/
static void
-brin_xlog_update(XLogRecPtr lsn, XLogRecord *record)
+brin_xlog_update(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_brin_update *xlrec = (xl_brin_update *) XLogRecGetData(record);
- BlockNumber blkno;
Buffer buffer;
- BrinTuple *newtup;
XLogRedoAction action;
- newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinUpdate);
-
/* First remove the old tuple */
- blkno = ItemPointerGetBlockNumber(&(xlrec->oldtid));
- action = XLogReadBufferForRedo(lsn, record, 2, xlrec->insert.node,
- blkno, &buffer);
+ action = XLogReadBufferForRedo(record, 2, &buffer);
if (action == BLK_NEEDS_REDO)
{
Page page;
page = (Page) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->oldtid));
+ offnum = xlrec->oldOffnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_update: invalid max offset number");
}
/* Then insert the new tuple and update revmap, like in an insertion. */
- brin_xlog_insert_update(lsn, record, &xlrec->insert, newtup);
+ brin_xlog_insert_update(record, &xlrec->insert);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
* Update a tuple on a single page.
*/
static void
-brin_xlog_samepage_update(XLogRecPtr lsn, XLogRecord *record)
+brin_xlog_samepage_update(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_brin_samepage_update *xlrec;
- BlockNumber blkno;
Buffer buffer;
XLogRedoAction action;
xlrec = (xl_brin_samepage_update *) XLogRecGetData(record);
- blkno = ItemPointerGetBlockNumber(&(xlrec->tid));
- action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node, blkno,
- &buffer);
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
- int tuplen;
+ Size tuplen;
BrinTuple *mmtuple;
Page page;
OffsetNumber offnum;
- tuplen = record->xl_len - SizeOfBrinSamepageUpdate;
- mmtuple = (BrinTuple *) ((char *) xlrec + SizeOfBrinSamepageUpdate);
+ mmtuple = (BrinTuple *) XLogRecGetBlockData(record, 0, &tuplen);
page = (Page) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
+ offnum = xlrec->offnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_samepage_update: invalid max offset number");
* Replay a revmap page extension
*/
static void
-brin_xlog_revmap_extend(XLogRecPtr lsn, XLogRecord *record)
+brin_xlog_revmap_extend(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_brin_revmap_extend *xlrec;
Buffer metabuf;
Buffer buf;
Page page;
+ BlockNumber targetBlk;
XLogRedoAction action;
xlrec = (xl_brin_revmap_extend *) XLogRecGetData(record);
+
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &targetBlk);
+ Assert(xlrec->targetBlk == targetBlk);
+
/* Update the metapage */
- action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node,
- BRIN_METAPAGE_BLKNO, &metabuf);
+ action = XLogReadBufferForRedo(record, 0, &metabuf);
if (action == BLK_NEEDS_REDO)
{
Page metapg;
* image here.
*/
- buf = XLogReadBuffer(xlrec->node, xlrec->targetBlk, true);
+ buf = XLogInitBufferForRedo(record, 1);
page = (Page) BufferGetPage(buf);
brin_page_init(page, BRIN_PAGETYPE_REVMAP);
}
void
-brin_redo(XLogRecPtr lsn, XLogRecord *record)
+brin_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info & XLOG_BRIN_OPMASK)
{
case XLOG_BRIN_CREATE_INDEX:
- brin_xlog_createidx(lsn, record);
+ brin_xlog_createidx(record);
break;
case XLOG_BRIN_INSERT:
- brin_xlog_insert(lsn, record);
+ brin_xlog_insert(record);
break;
case XLOG_BRIN_UPDATE:
- brin_xlog_update(lsn, record);
+ brin_xlog_update(record);
break;
case XLOG_BRIN_SAMEPAGE_UPDATE:
- brin_xlog_samepage_update(lsn, record);
+ brin_xlog_samepage_update(record);
break;
case XLOG_BRIN_REVMAP_EXTEND:
- brin_xlog_revmap_extend(lsn, record);
+ brin_xlog_revmap_extend(record);
break;
default:
elog(PANIC, "brin_redo: unknown op code %u", info);
Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
- XLogRecData *payloadrdata;
GinPlaceToPageRC rc;
uint16 xlflags = 0;
Page childpage = NULL;
/*
* Try to put the incoming tuple on the page. placeToPage will decide if
* the page needs to be split.
+ *
+ * WAL-logging this operation is a bit funny:
+ *
+ * We're responsible for calling XLogBeginInsert() and XLogInsert().
+ * XLogBeginInsert() must be called before placeToPage, because
+ * placeToPage can register some data to the WAL record.
+ *
+ * If placeToPage returns INSERTED, placeToPage has already called
+ * START_CRIT_SECTION(), and we're responsible for calling
+ * END_CRIT_SECTION. When it returns INSERTED, it is also responsible for
+ * registering any data required to replay the operation with
+ * XLogRegisterData(0, ...). It may only add data to block index 0; the
+ * main data of the WAL record is reserved for this function.
+ *
+ * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
+ * Splits happen infrequently, so we just make a full-page image of all
+ * the pages involved.
*/
+
+ if (RelationNeedsWAL(btree->index))
+ XLogBeginInsert();
+
rc = btree->placeToPage(btree, stack->buffer, stack,
insertdata, updateblkno,
- &payloadrdata, &newlpage, &newrpage);
+ &newlpage, &newrpage);
if (rc == UNMODIFIED)
+ {
+ XLogResetInsertion();
return true;
+ }
else if (rc == INSERTED)
{
/* placeToPage did START_CRIT_SECTION() */
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
- XLogRecData rdata[3];
ginxlogInsert xlrec;
BlockIdData childblknos[2];
- xlrec.node = btree->index->rd_node;
- xlrec.blkno = BufferGetBlockNumber(stack->buffer);
+ /*
+ * placetopage already registered stack->buffer as block 0.
+ */
xlrec.flags = xlflags;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(ginxlogInsert);
+ if (childbuf != InvalidBuffer)
+ XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
+
+ XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
/*
* Log information about child if this was an insertion of a
*/
if (childbuf != InvalidBuffer)
{
- rdata[0].next = &rdata[1];
-
BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) childblknos;
- rdata[1].len = sizeof(BlockIdData) * 2;
- rdata[1].next = &rdata[2];
-
- rdata[2].buffer = childbuf;
- rdata[2].buffer_std = false;
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].next = payloadrdata;
+ XLogRegisterData((char *) childblknos,
+ sizeof(BlockIdData) * 2);
}
- else
- rdata[0].next = payloadrdata;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
PageSetLSN(page, recptr);
if (childbuf != InvalidBuffer)
PageSetLSN(childpage, recptr);
}
else if (rc == SPLIT)
{
- /* Didn't fit, have to split */
+ /* Didn't fit, had to split */
Buffer rbuffer;
BlockNumber savedRightLink;
- XLogRecData rdata[2];
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
*/
data.node = btree->index->rd_node;
- data.rblkno = BufferGetBlockNumber(rbuffer);
data.flags = xlflags;
if (childbuf != InvalidBuffer)
{
else
data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
-
- if (childbuf != InvalidBuffer)
- {
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = childbuf;
- rdata[1].buffer_std = false;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = payloadrdata;
- }
- else
- rdata[0].next = payloadrdata;
-
if (stack->parent == NULL)
{
/*
buildStats->nEntryPages++;
}
- /*
- * root never has a right-link, so we borrow the rrlink field to
- * store the root block number.
- */
- data.rrlink = BufferGetBlockNumber(stack->buffer);
- data.lblkno = BufferGetBlockNumber(lbuffer);
+ data.rrlink = InvalidBlockNumber;
data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
{
/* split non-root page */
data.rrlink = savedRightLink;
- data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ /*
+ * We just take full page images of all the split pages. Splits
+ * are uncommon enough that it's not worth complicating the code
+ * to be more efficient.
+ */
+ if (stack->parent == NULL)
+ {
+ XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ }
+ else
+ {
+ XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ }
+ if (BufferIsValid(childbuf))
+ XLogRegisterBuffer(3, childbuf, 0);
+
+ XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
PageSetLSN(BufferGetPage(stack->buffer), recptr);
PageSetLSN(BufferGetPage(rbuffer), recptr);
if (stack->parent == NULL)
static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- XLogRecData **prdata, Page *newlpage, Page *newrpage);
+ Page *newlpage, Page *newrpage);
static disassembledLeaf *disassembleLeaf(Page page);
static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
int nNewItems);
-static XLogRecData *constructLeafRecompressWALData(Buffer buf,
- disassembledLeaf *leaf);
+static void registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf);
static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
static void dataPlaceToPageLeafSplit(Buffer buf,
disassembledLeaf *leaf,
ItemPointerData lbound, ItemPointerData rbound,
- XLogRecData **prdata, Page lpage, Page rpage);
+ Page lpage, Page rpage);
/*
* Read TIDs from leaf data page to single uncompressed array. The TIDs are
*/
static GinPlaceToPageRC
dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
- void *insertdata, XLogRecData **prdata,
- Page *newlpage, Page *newrpage)
+ void *insertdata, Page *newlpage, Page *newrpage)
{
GinBtreeDataLeafInsertData *items = insertdata;
ItemPointer newItems = &items->items[items->curitem];
*/
MemoryContextSwitchTo(oldCxt);
if (RelationNeedsWAL(btree->index))
- *prdata = constructLeafRecompressWALData(buf, leaf);
- else
- *prdata = NULL;
+ registerLeafRecompressWALData(buf, leaf);
START_CRIT_SECTION();
dataPlaceToPageLeafRecompress(buf, leaf);
*newrpage = MemoryContextAlloc(oldCxt, BLCKSZ);
dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound,
- prdata, *newlpage, *newrpage);
+ *newlpage, *newrpage);
Assert(GinPageRightMost(page) ||
ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
*/
if (removedsomething)
{
- XLogRecData *payloadrdata = NULL;
bool modified;
/*
}
if (RelationNeedsWAL(indexrel))
- payloadrdata = constructLeafRecompressWALData(buffer, leaf);
+ {
+ XLogBeginInsert();
+ registerLeafRecompressWALData(buffer, leaf);
+ }
START_CRIT_SECTION();
dataPlaceToPageLeafRecompress(buffer, leaf);
if (RelationNeedsWAL(indexrel))
{
XLogRecPtr recptr;
- XLogRecData rdata;
- ginxlogVacuumDataLeafPage xlrec;
- xlrec.node = indexrel->rd_node;
- xlrec.blkno = BufferGetBlockNumber(buffer);
-
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = offsetof(ginxlogVacuumDataLeafPage, data);
- rdata.next = payloadrdata;
-
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE, &rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
PageSetLSN(page, recptr);
}
* Construct a ginxlogRecompressDataLeaf record representing the changes
* in *leaf.
*/
-static XLogRecData *
-constructLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf)
+static void
+registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf)
{
int nmodified = 0;
char *walbufbegin;
char *walbufend;
- XLogRecData *rdata;
dlist_iter iter;
int segno;
ginxlogRecompressDataLeaf *recompress_xlog;
nmodified++;
}
- walbufbegin = palloc(
- sizeof(ginxlogRecompressDataLeaf) +
- BLCKSZ + /* max size needed to hold the segment
- * data */
- nmodified * 2 + /* (segno + action) per action */
- sizeof(XLogRecData));
+ walbufbegin =
+ palloc(sizeof(ginxlogRecompressDataLeaf) +
+ BLCKSZ + /* max size needed to hold the segment data */
+ nmodified * 2 /* (segno + action) per action */
+ );
walbufend = walbufbegin;
recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend;
segno++;
}
- rdata = (XLogRecData *) MAXALIGN(walbufend);
- rdata->buffer = buf;
- rdata->buffer_std = TRUE;
- rdata->data = walbufbegin;
- rdata->len = walbufend - walbufbegin;
- rdata->next = NULL;
- return rdata;
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, walbufbegin, walbufend - walbufbegin);
+
}
/*
static void
dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf,
ItemPointerData lbound, ItemPointerData rbound,
- XLogRecData **prdata, Page lpage, Page rpage)
+ Page lpage, Page rpage)
{
char *ptr;
int segsize;
dlist_node *firstright;
leafSegmentInfo *seginfo;
- /* these must be static so they can be returned to caller */
- static ginxlogSplitDataLeaf split_xlog;
- static XLogRecData rdata[3];
-
/* Initialize temporary pages to hold the new left and right pages */
GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
Assert(rsize == leaf->rsize);
GinDataPageSetDataSize(rpage, rsize);
*GinDataPageGetRightBound(rpage) = rbound;
-
- /* Create WAL record */
- split_xlog.lsize = lsize;
- split_xlog.rsize = rsize;
- split_xlog.lrightbound = lbound;
- split_xlog.rrightbound = rbound;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &split_xlog;
- rdata[0].len = sizeof(ginxlogSplitDataLeaf);
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) GinDataLeafPageGetPostingList(lpage);
- rdata[1].len = lsize;
- rdata[1].next = &rdata[2];
-
- rdata[2].buffer = InvalidBuffer;
- rdata[2].data = (char *) GinDataLeafPageGetPostingList(rpage);
- rdata[2].len = rsize;
- rdata[2].next = NULL;
-
- *prdata = rdata;
}
/*
*
* In addition to inserting the given item, the downlink of the existing item
* at 'off' is updated to point to 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
*/
static GinPlaceToPageRC
dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- XLogRecData **prdata, Page *newlpage, Page *newrpage)
+ Page *newlpage, Page *newrpage)
{
Page page = BufferGetPage(buf);
OffsetNumber off = stack->off;
PostingItem *pitem;
- /* these must be static so they can be returned to caller */
- static XLogRecData rdata;
+ /* this must be static so it can be returned to caller */
static ginxlogInsertDataInternal data;
/* split if we have to */
if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
{
dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
- prdata, newlpage, newrpage);
+ newlpage, newrpage);
return SPLIT;
}
- *prdata = &rdata;
Assert(GinPageIsData(page));
START_CRIT_SECTION();
pitem = (PostingItem *) insertdata;
GinDataPageAddPostingItem(page, pitem, off);
- data.offset = off;
- data.newitem = *pitem;
+ if (RelationNeedsWAL(btree->index))
+ {
+ data.offset = off;
+ data.newitem = *pitem;
- rdata.buffer = buf;
- rdata.buffer_std = TRUE;
- rdata.data = (char *) &data;
- rdata.len = sizeof(ginxlogInsertDataInternal);
- rdata.next = NULL;
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) &data,
+ sizeof(ginxlogInsertDataInternal));
+ }
return INSERTED;
}
static GinPlaceToPageRC
dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- XLogRecData **prdata,
Page *newlpage, Page *newrpage)
{
Page page = BufferGetPage(buf);
if (GinPageIsLeaf(page))
return dataPlaceToPageLeaf(btree, buf, stack, insertdata,
- prdata, newlpage, newrpage);
+ newlpage, newrpage);
else
return dataPlaceToPageInternal(btree, buf, stack,
insertdata, updateblkno,
- prdata, newlpage, newrpage);
+ newlpage, newrpage);
}
/*
dataSplitPageInternal(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- XLogRecData **prdata, Page *newlpage, Page *newrpage)
+ Page *newlpage, Page *newrpage)
{
Page oldpage = BufferGetPage(origbuf);
OffsetNumber off = stack->off;
Page lpage;
Page rpage;
OffsetNumber separator;
-
- /* these must be static so they can be returned to caller */
- static ginxlogSplitDataInternal data;
- static XLogRecData rdata[4];
- static PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
+ PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
lpage = PageGetTempPage(oldpage);
rpage = PageGetTempPage(oldpage);
GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
- *prdata = rdata;
-
/*
* First construct a new list of PostingItems, which includes all the old
* items, and the new item.
/* set up right bound for right page */
*GinDataPageGetRightBound(rpage) = oldbound;
- data.separator = separator;
- data.nitem = nitems;
- data.rightbound = oldbound;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplitDataInternal);
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) allitems;
- rdata[1].len = nitems * sizeof(PostingItem);
- rdata[1].next = NULL;
-
*newlpage = lpage;
*newrpage = rpage;
}
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
- XLogRecData rdata[2];
ginxlogCreatePostingTree data;
- data.node = index->rd_node;
- data.blkno = blkno;
data.size = rootsize;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogCreatePostingTree);
- rdata[0].next = &rdata[1];
+ XLogBeginInsert();
+ XLogRegisterData((char *) &data, sizeof(ginxlogCreatePostingTree));
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) GinDataLeafPageGetPostingList(page);
- rdata[1].len = rootsize;
- rdata[1].next = NULL;
+ XLogRegisterData((char *) GinDataLeafPageGetPostingList(page),
+ rootsize);
+ XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE);
PageSetLSN(page, recptr);
}
static void entrySplitPage(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertPayload,
- BlockNumber updateblkno, XLogRecData **prdata,
+ BlockNumber updateblkno,
Page *newlpage, Page *newrpage);
/*
* On insertion to an internal node, in addition to inserting the given item,
* the downlink of the existing item at 'off' is updated to point to
* 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
*/
static GinPlaceToPageRC
entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
void *insertPayload, BlockNumber updateblkno,
- XLogRecData **prdata, Page *newlpage, Page *newrpage)
+ Page *newlpage, Page *newrpage)
{
GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf);
OffsetNumber off = stack->off;
OffsetNumber placed;
- int cnt = 0;
- /* these must be static so they can be returned to caller */
- static XLogRecData rdata[3];
+ /* this must be static so it can be returned to caller. */
static ginxlogInsertEntry data;
/* quick exit if it doesn't fit */
if (!entryIsEnoughSpace(btree, buf, off, insertData))
{
entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
- prdata, newlpage, newrpage);
+ newlpage, newrpage);
return SPLIT;
}
START_CRIT_SECTION();
- *prdata = rdata;
entryPreparePage(btree, page, off, insertData, updateblkno);
placed = PageAddItem(page,
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
- data.isDelete = insertData->isDelete;
- data.offset = off;
-
- rdata[cnt].buffer = buf;
- rdata[cnt].buffer_std = true;
- rdata[cnt].data = (char *) &data;
- rdata[cnt].len = offsetof(ginxlogInsertEntry, tuple);
- rdata[cnt].next = &rdata[cnt + 1];
- cnt++;
-
- rdata[cnt].buffer = buf;
- rdata[cnt].buffer_std = true;
- rdata[cnt].data = (char *) insertData->entry;
- rdata[cnt].len = IndexTupleSize(insertData->entry);
- rdata[cnt].next = NULL;
+ if (RelationNeedsWAL(btree->index))
+ {
+ data.isDelete = insertData->isDelete;
+ data.offset = off;
+
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) &data,
+ offsetof(ginxlogInsertEntry, tuple));
+ XLogRegisterBufData(0, (char *) insertData->entry,
+ IndexTupleSize(insertData->entry));
+ }
return INSERTED;
}
entrySplitPage(GinBtree btree, Buffer origbuf,
GinBtreeStack *stack,
void *insertPayload,
- BlockNumber updateblkno, XLogRecData **prdata,
+ BlockNumber updateblkno,
Page *newlpage, Page *newrpage)
{
GinBtreeEntryInsertData *insertData = insertPayload;
maxoff,
separator = InvalidOffsetNumber;
Size totalsize = 0;
- Size tupstoresize;
Size lsize = 0,
size;
char *ptr;
Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Size pageSize = PageGetPageSize(lpage);
+ char tupstore[2 * BLCKSZ];
- /* these must be static so they can be returned to caller */
- static XLogRecData rdata[2];
- static ginxlogSplitEntry data;
- static char tupstore[2 * BLCKSZ];
-
- *prdata = rdata;
entryPreparePage(btree, lpage, off, insertData, updateblkno);
/*
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
- tupstoresize = ptr - tupstore;
/*
* Initialize the left and right pages, and copy all the tuples back to
ptr += MAXALIGN(IndexTupleSize(itup));
}
- data.separator = separator;
- data.nitem = maxoff;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplitEntry);
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = tupstore;
- rdata[1].len = tupstoresize;
- rdata[1].next = NULL;
-
*newlpage = lpage;
*newrpage = rpage;
}
if (RelationNeedsWAL(index))
{
- XLogRecData rdata[2];
ginxlogInsertListPage data;
XLogRecPtr recptr;
- data.node = index->rd_node;
- data.blkno = BufferGetBlockNumber(buffer);
data.rightlink = rightlink;
data.ntuples = ntuples;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogInsertListPage);
- rdata[0].next = rdata + 1;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = workspace;
- rdata[1].len = size;
- rdata[1].next = NULL;
+ XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
+ XLogRegisterBufData(0, workspace, size);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
PageSetLSN(page, recptr);
}
Buffer metabuffer;
Page metapage;
GinMetaPageData *metadata = NULL;
- XLogRecData rdata[2];
Buffer buffer = InvalidBuffer;
Page page = NULL;
ginxlogUpdateMeta data;
bool separateList = false;
bool needCleanup = false;
int cleanupSize;
+ bool needWal;
if (collector->ntuples == 0)
return;
+ needWal = RelationNeedsWAL(index);
+
data.node = index->rd_node;
data.ntuples = 0;
data.newRightlink = data.prevTail = InvalidBlockNumber;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogUpdateMeta);
- rdata[0].next = NULL;
-
metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
metapage = BufferGetPage(metabuffer);
memset(&sublist, 0, sizeof(GinMetaPageData));
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+ if (needWal)
+ XLogBeginInsert();
+
/*
* metapage was unlocked, see above
*/
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
- rdata[0].next = rdata + 1;
-
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = NULL;
-
Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
START_CRIT_SECTION();
metadata->nPendingPages += sublist.nPendingPages;
metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
+
+ if (needWal)
+ XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
}
}
else
int i,
tupsize;
char *ptr;
+ char *collectordata;
buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
- rdata[0].next = rdata + 1;
-
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- ptr = rdata[1].data = (char *) palloc(collector->sumsize);
- rdata[1].len = collector->sumsize;
- rdata[1].next = NULL;
+ collectordata = ptr = (char *) palloc(collector->sumsize);
data.ntuples = collector->ntuples;
+ if (needWal)
+ XLogBeginInsert();
+
START_CRIT_SECTION();
/*
off++;
}
- Assert((ptr - rdata[1].data) <= collector->sumsize);
+ Assert((ptr - collectordata) <= collector->sumsize);
+ if (needWal)
+ {
+ XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(1, collectordata, collector->sumsize);
+ }
metadata->tailFreeSize = PageGetExactFreeSpace(page);
*/
MarkBufferDirty(metabuffer);
- if (RelationNeedsWAL(index))
+ if (needWal)
{
XLogRecPtr recptr;
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
+ XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
+ XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
PageSetLSN(metapage, recptr);
if (buffer != InvalidBuffer)
int i;
int64 nDeletedHeapTuples = 0;
ginxlogDeleteListPages data;
- XLogRecData rdata[1];
Buffer buffers[GIN_NDELETE_AT_ONCE];
- data.node = index->rd_node;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogDeleteListPages);
- rdata[0].next = NULL;
-
data.ndeleted = 0;
while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
{
- data.toDelete[data.ndeleted] = blknoToDelete;
buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
page = BufferGetPage(buffers[data.ndeleted]);
if (stats)
stats->pages_deleted += data.ndeleted;
+ /*
+ * This operation touches an unusually large number of pages, so
+ * prepare the XLogInsert machinery for that before entering the
+ * critical section.
+ */
+ XLogEnsureRecordSpace(data.ndeleted, 0);
+
START_CRIT_SECTION();
metadata->head = blknoToDelete;
{
XLogRecPtr recptr;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
+ for (i = 0; i < data.ndeleted; i++)
+ XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
+
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
+ XLogRegisterData((char *) &data,
+ sizeof(ginxlogDeleteListPages));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
PageSetLSN(metapage, recptr);
for (i = 0; i < data.ndeleted; i++)
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
- XLogRecData rdata;
Page page;
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &(index->rd_node);
- rdata.len = sizeof(RelFileNode);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX, &rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX);
page = BufferGetPage(RootBuffer);
PageSetLSN(page, recptr);
{
XLogRecPtr recptr;
ginxlogUpdateMeta data;
- XLogRecData rdata;
data.node = index->rd_node;
data.ntuples = 0;
data.newRightlink = data.prevTail = InvalidBlockNumber;
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &data;
- rdata.len = sizeof(ginxlogUpdateMeta);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
+ XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, &rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
PageSetLSN(metapage, recptr);
}
{
Page page = BufferGetPage(buffer);
XLogRecPtr recptr;
- XLogRecData rdata[3];
- ginxlogVacuumPage xlrec;
- uint16 lower;
- uint16 upper;
/* This is only used for entry tree leaf pages. */
Assert(!GinPageIsData(page));
if (!RelationNeedsWAL(index))
return;
- xlrec.node = index->rd_node;
- xlrec.blkno = BufferGetBlockNumber(buffer);
-
- /* Assume we can omit data between pd_lower and pd_upper */
- lower = ((PageHeader) page)->pd_lower;
- upper = ((PageHeader) page)->pd_upper;
-
- Assert(lower < BLCKSZ);
- Assert(upper < BLCKSZ);
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- xlrec.hole_offset = lower;
- xlrec.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- xlrec.hole_offset = 0;
- xlrec.hole_length = 0;
- }
-
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(ginxlogVacuumPage);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &rdata[1];
-
- if (xlrec.hole_length == 0)
- {
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdata[1].data = (char *) page;
- rdata[1].len = xlrec.hole_offset;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &rdata[2];
-
- rdata[2].data = (char *) page + (xlrec.hole_offset + xlrec.hole_length);
- rdata[2].len = BLCKSZ - (xlrec.hole_offset + xlrec.hole_length);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
- }
+ /*
+ * Always create a full image, we don't track the changes on the page at
+ * any more fine-grained level. This could obviously be improved...
+ */
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE);
PageSetLSN(page, recptr);
}
if (RelationNeedsWAL(gvs->index))
{
XLogRecPtr recptr;
- XLogRecData rdata[4];
ginxlogDeletePage data;
- data.node = gvs->index->rd_node;
- data.blkno = deleteBlkno;
- data.parentBlkno = parentBlkno;
+ /*
+ * We can't pass REGBUF_STANDARD for the deleted page, because we
+ * didn't set pd_lower on pre-9.4 versions. The page might've been
+ * binary-upgraded from an older version, and hence not have pd_lower
+ * set correctly. Ditto for the left page, but removing the item from
+ * the parent updated its pd_lower, so we know that's OK at this
+ * point.
+ */
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, dBuffer, 0);
+ XLogRegisterBuffer(1, pBuffer, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, lBuffer, 0);
+
data.parentOffset = myoff;
- data.leftBlkno = leftBlkno;
data.rightLink = GinPageGetOpaque(page)->rightlink;
- /*
- * We can't pass buffer_std = TRUE, because we didn't set pd_lower on
- * pre-9.4 versions. The page might've been binary-upgraded from an
- * older version, and hence not have pd_lower set correctly. Ditto for
- * the left page, but removing the item from the parent updated its
- * pd_lower, so we know that's OK at this point.
- */
- rdata[0].buffer = dBuffer;
- rdata[0].buffer_std = FALSE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = rdata + 1;
-
- rdata[1].buffer = pBuffer;
- rdata[1].buffer_std = TRUE;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = rdata + 2;
-
- rdata[2].buffer = lBuffer;
- rdata[2].buffer_std = FALSE;
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].next = rdata + 3;
-
- rdata[3].buffer = InvalidBuffer;
- rdata[3].buffer_std = FALSE;
- rdata[3].len = sizeof(ginxlogDeletePage);
- rdata[3].data = (char *) &data;
- rdata[3].next = NULL;
-
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata);
+ XLogRegisterData((char *) &data, sizeof(ginxlogDeletePage));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE);
PageSetLSN(page, recptr);
PageSetLSN(parentPage, recptr);
PageSetLSN(BufferGetPage(lBuffer), recptr);
static MemoryContext opCtx; /* working memory for operations */
static void
-ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record,
- int block_index,
- RelFileNode node, BlockNumber blkno)
+ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
{
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
- if (XLogReadBufferForRedo(lsn, record, block_index, node, blkno, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
-
GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
}
static void
-ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
+ginRedoCreateIndex(XLogReaderState *record)
{
- RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer RootBuffer,
MetaBuffer;
Page page;
- /* Backup blocks are not used in create_index records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
- MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
- Assert(BufferIsValid(MetaBuffer));
+ MetaBuffer = XLogInitBufferForRedo(record, 0);
+ Assert(BufferGetBlockNumber(MetaBuffer) == GIN_METAPAGE_BLKNO);
page = (Page) BufferGetPage(MetaBuffer);
GinInitMetabuffer(MetaBuffer);
PageSetLSN(page, lsn);
MarkBufferDirty(MetaBuffer);
- RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
- Assert(BufferIsValid(RootBuffer));
+ RootBuffer = XLogInitBufferForRedo(record, 1);
+ Assert(BufferGetBlockNumber(RootBuffer) == GIN_ROOT_BLKNO);
page = (Page) BufferGetPage(RootBuffer);
GinInitBuffer(RootBuffer, GIN_LEAF);
}
static void
-ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
+ginRedoCreatePTree(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
char *ptr;
Buffer buffer;
Page page;
- /* Backup blocks are not used in create_ptree records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
- buffer = XLogReadBuffer(data->node, data->blkno, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
}
static void
-ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+ginRedoInsert(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
Buffer buffer;
- char *payload;
+#ifdef NOT_USED
BlockNumber leftChildBlkno = InvalidBlockNumber;
+#endif
BlockNumber rightChildBlkno = InvalidBlockNumber;
bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
- payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
-
/*
* First clear incomplete-split flag on child page if this finishes a
* split.
*/
if (!isLeaf)
{
+ char *payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
+
+#ifdef NOT_USED
leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+#endif
payload += sizeof(BlockIdData);
rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
payload += sizeof(BlockIdData);
- ginRedoClearIncompleteSplit(lsn, record, 0, data->node, leftChildBlkno);
+ ginRedoClearIncompleteSplit(record, 1);
}
- if (XLogReadBufferForRedo(lsn, record, isLeaf ? 0 : 1, data->node,
- data->blkno, &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
Page page = BufferGetPage(buffer);
+ Size len;
+ char *payload = XLogRecGetBlockData(record, 0, &len);
/* How to insert the payload is tree-type specific */
if (data->flags & GIN_INSERT_ISDATA)
}
static void
-ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
-{
- ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
- IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
- OffsetNumber i;
-
- for (i = 0; i < data->separator; i++)
- {
- if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to gin index page");
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
-
- for (i = data->separator; i < data->nitem; i++)
- {
- if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to gin index page");
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
-}
-
-static void
-ginRedoSplitData(Page lpage, Page rpage, void *rdata)
-{
- bool isleaf = GinPageIsLeaf(lpage);
-
- if (isleaf)
- {
- ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata;
- Pointer lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf);
- Pointer rptr = lptr + data->lsize;
-
- Assert(data->lsize > 0 && data->lsize <= GinDataPageMaxDataSize);
- Assert(data->rsize > 0 && data->rsize <= GinDataPageMaxDataSize);
-
- memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize);
- memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize);
-
- GinDataPageSetDataSize(lpage, data->lsize);
- GinDataPageSetDataSize(rpage, data->rsize);
- *GinDataPageGetRightBound(lpage) = data->lrightbound;
- *GinDataPageGetRightBound(rpage) = data->rrightbound;
- }
- else
- {
- ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata;
- PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal));
- OffsetNumber i;
- OffsetNumber maxoff;
-
- for (i = 0; i < data->separator; i++)
- GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
- for (i = data->separator; i < data->nitem; i++)
- GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
-
- /* set up right key */
- maxoff = GinPageGetOpaque(lpage)->maxoff;
- *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key;
- *GinDataPageGetRightBound(rpage) = data->rightbound;
- }
-}
-
-static void
-ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
+ginRedoSplit(XLogReaderState *record)
{
ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
Buffer lbuffer,
- rbuffer;
- Page lpage,
- rpage;
- uint32 flags;
- uint32 lflags,
- rflags;
- char *payload;
+ rbuffer,
+ rootbuf;
bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
- bool isData = (data->flags & GIN_INSERT_ISDATA) != 0;
bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
- payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
-
/*
* First clear incomplete-split flag on child page if this finishes a
* split
*/
if (!isLeaf)
- ginRedoClearIncompleteSplit(lsn, record, 0, data->node, data->leftChildBlkno);
-
- flags = 0;
- if (isLeaf)
- flags |= GIN_LEAF;
- if (isData)
- flags |= GIN_DATA;
- if (isLeaf && isData)
- flags |= GIN_COMPRESSED;
-
- lflags = rflags = flags;
- if (!isRoot)
- lflags |= GIN_INCOMPLETE_SPLIT;
-
- lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
- Assert(BufferIsValid(lbuffer));
- lpage = (Page) BufferGetPage(lbuffer);
- GinInitBuffer(lbuffer, lflags);
-
- rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
- Assert(BufferIsValid(rbuffer));
- rpage = (Page) BufferGetPage(rbuffer);
- GinInitBuffer(rbuffer, rflags);
-
- GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
- GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
-
- /* Do the tree-type specific portion to restore the page contents */
- if (isData)
- ginRedoSplitData(lpage, rpage, payload);
- else
- ginRedoSplitEntry(lpage, rpage, payload);
+ ginRedoClearIncompleteSplit(record, 3);
- PageSetLSN(rpage, lsn);
- MarkBufferDirty(rbuffer);
+ if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED)
+ elog(ERROR, "GIN split record did not contain a full-page image of left page");
- PageSetLSN(lpage, lsn);
- MarkBufferDirty(lbuffer);
+ if (XLogReadBufferForRedo(record, 1, &rbuffer) != BLK_RESTORED)
+ elog(ERROR, "GIN split record did not contain a full-page image of right page");
if (isRoot)
{
- BlockNumber rootBlkno = data->rrlink;
- Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
- Page rootPage = BufferGetPage(rootBuf);
-
- GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED);
-
- if (isData)
- {
- Assert(rootBlkno != GIN_ROOT_BLKNO);
- ginDataFillRoot(NULL, BufferGetPage(rootBuf),
- BufferGetBlockNumber(lbuffer),
- BufferGetPage(lbuffer),
- BufferGetBlockNumber(rbuffer),
- BufferGetPage(rbuffer));
- }
- else
- {
- Assert(rootBlkno == GIN_ROOT_BLKNO);
- ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
- BufferGetBlockNumber(lbuffer),
- BufferGetPage(lbuffer),
- BufferGetBlockNumber(rbuffer),
- BufferGetPage(rbuffer));
- }
-
- PageSetLSN(rootPage, lsn);
-
- MarkBufferDirty(rootBuf);
- UnlockReleaseBuffer(rootBuf);
+ if (XLogReadBufferForRedo(record, 2, &rootbuf) != BLK_RESTORED)
+ elog(ERROR, "GIN split record did not contain a full-page image of root page");
+ UnlockReleaseBuffer(rootbuf);
}
UnlockReleaseBuffer(rbuffer);
* a XLOG_FPI record.
*/
static void
-ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
+ginRedoVacuumPage(XLogReaderState *record)
{
- ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
- char *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage);
Buffer buffer;
- Page page;
-
- Assert(xlrec->hole_offset < BLCKSZ);
- Assert(xlrec->hole_length < BLCKSZ);
-
- /* Backup blocks are not used, we'll re-initialize the page always. */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
- if (!BufferIsValid(buffer))
- return;
- page = (Page) BufferGetPage(buffer);
-
- if (xlrec->hole_length == 0)
+ if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
{
- memcpy((char *) page, blk, BLCKSZ);
+ elog(ERROR, "replay of gin entry tree page vacuum did not restore the page");
}
- else
- {
- memcpy((char *) page, blk, xlrec->hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length);
- memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length),
- blk + xlrec->hole_offset,
- BLCKSZ - (xlrec->hole_offset + xlrec->hole_length));
- }
-
- PageSetLSN(page, lsn);
-
- MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void
-ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
+ginRedoVacuumDataLeafPage(XLogReaderState *record)
{
- ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
Page page = BufferGetPage(buffer);
+ Size len;
+ ginxlogVacuumDataLeafPage *xlrec;
+
+ xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetBlockData(record, 0, &len);
Assert(GinPageIsLeaf(page));
Assert(GinPageIsData(page));
}
static void
-ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
+ginRedoDeletePage(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
Buffer dbuffer;
Buffer pbuffer;
Buffer lbuffer;
Page page;
- if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->blkno, &dbuffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &dbuffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(dbuffer);
-
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn);
MarkBufferDirty(dbuffer);
}
- if (XLogReadBufferForRedo(lsn, record, 1, data->node, data->parentBlkno,
- &pbuffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &pbuffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(pbuffer);
-
Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page));
GinPageDeletePostingItem(page, data->parentOffset);
MarkBufferDirty(pbuffer);
}
- if (XLogReadBufferForRedo(lsn, record, 2, data->node, data->leftBlkno,
- &lbuffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 2, &lbuffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(lbuffer);
-
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn);
}
static void
-ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
+ginRedoUpdateMetapage(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
Buffer metabuffer;
Page metapage;
* image, so restore the metapage unconditionally without looking at the
* LSN, to avoid torn page hazards.
*/
- metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
- if (!BufferIsValid(metabuffer))
- return; /* assume index was deleted, nothing to do */
+ metabuffer = XLogInitBufferForRedo(record, 0);
+ Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
metapage = BufferGetPage(metabuffer);
memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
/*
* insert into tail page
*/
- if (XLogReadBufferForRedo(lsn, record, 0, data->node,
- data->metadata.tail, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
Page page = BufferGetPage(buffer);
OffsetNumber off;
int i;
Size tupsize;
+ char *payload;
IndexTuple tuples;
+ Size totaltupsize;
- tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
+ payload = XLogRecGetBlockData(record, 1, &totaltupsize);
+ tuples = (IndexTuple) payload;
if (PageIsEmpty(page))
off = FirstOffsetNumber;
off++;
}
+ Assert(payload + totaltupsize == (char *) tuples);
/*
* Increase counter of heap tuples
/*
* New tail
*/
- if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->prevTail,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
Page page = BufferGetPage(buffer);
}
static void
-ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
+ginRedoInsertListPage(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
Buffer buffer;
Page page;
off = FirstOffsetNumber;
int i,
tupsize;
- IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
-
- /*
- * Backup blocks are not used, we always re-initialize the page.
- */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ char *payload;
+ IndexTuple tuples;
+ Size totaltupsize;
- buffer = XLogReadBuffer(data->node, data->blkno, true);
- Assert(BufferIsValid(buffer));
+ /* We always re-initialize the page. */
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
GinInitBuffer(buffer, GIN_LIST);
GinPageGetOpaque(page)->maxoff = 0;
}
+ payload = XLogRecGetBlockData(record, 0, &totaltupsize);
+
+ tuples = (IndexTuple) payload;
for (i = 0; i < data->ntuples; i++)
{
tupsize = IndexTupleSize(tuples);
tuples = (IndexTuple) (((char *) tuples) + tupsize);
off++;
}
+ Assert((char *) tuples == payload + totaltupsize);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
static void
-ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
+ginRedoDeleteListPages(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
Buffer metabuffer;
Page metapage;
int i;
- /* Backup blocks are not used in delete_listpage records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
- metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
- if (!BufferIsValid(metabuffer))
- return; /* assume index was deleted, nothing to do */
+ metabuffer = XLogInitBufferForRedo(record, 0);
+ Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
metapage = BufferGetPage(metabuffer);
+ GinInitPage(metapage, GIN_META, BufferGetPageSize(metabuffer));
+
memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
PageSetLSN(metapage, lsn);
MarkBufferDirty(metabuffer);
Buffer buffer;
Page page;
- buffer = XLogReadBuffer(data->node, data->toDelete[i], true);
+ buffer = XLogInitBufferForRedo(record, i + 1);
page = BufferGetPage(buffer);
GinInitBuffer(buffer, GIN_DELETED);
}
void
-gin_redo(XLogRecPtr lsn, XLogRecord *record)
+gin_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
MemoryContext oldCtx;
/*
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
- ginRedoCreateIndex(lsn, record);
+ ginRedoCreateIndex(record);
break;
case XLOG_GIN_CREATE_PTREE:
- ginRedoCreatePTree(lsn, record);
+ ginRedoCreatePTree(record);
break;
case XLOG_GIN_INSERT:
- ginRedoInsert(lsn, record);
+ ginRedoInsert(record);
break;
case XLOG_GIN_SPLIT:
- ginRedoSplit(lsn, record);
+ ginRedoSplit(record);
break;
case XLOG_GIN_VACUUM_PAGE:
- ginRedoVacuumPage(lsn, record);
+ ginRedoVacuumPage(record);
break;
case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
- ginRedoVacuumDataLeafPage(lsn, record);
+ ginRedoVacuumDataLeafPage(record);
break;
case XLOG_GIN_DELETE_PAGE:
- ginRedoDeletePage(lsn, record);
+ ginRedoDeletePage(record);
break;
case XLOG_GIN_UPDATE_META_PAGE:
- ginRedoUpdateMetapage(lsn, record);
+ ginRedoUpdateMetapage(record);
break;
case XLOG_GIN_INSERT_LISTPAGE:
- ginRedoInsertListPage(lsn, record);
+ ginRedoInsertListPage(record);
break;
case XLOG_GIN_DELETE_LISTPAGE:
- ginRedoDeleteListPages(lsn, record);
+ ginRedoDeleteListPages(record);
break;
default:
elog(PANIC, "gin_redo: unknown op code %u", info);
#include "access/genam.h"
#include "access/gist_private.h"
+#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
GistPageSetNSN(ptr->page, oldnsn);
}
+ /*
+ * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
+ * insertion for that. NB: The number of pages and data segments
+ * specified here must match the calculations in gistXLogSplit()!
+ */
+ if (RelationNeedsWAL(rel))
+ XLogEnsureRecordSpace(npage, 1 + npage * 2);
+
START_CRIT_SECTION();
/*
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
- XLogRecData rdata;
- rdata.data = (char *) &(index->rd_node);
- rdata.len = sizeof(RelFileNode);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX);
PageSetLSN(page, recptr);
}
else
#include "access/xlogutils.h"
#include "utils/memutils.h"
-typedef struct
-{
- gistxlogPage *header;
- IndexTuple *itup;
-} NewPage;
-
-typedef struct
-{
- gistxlogPageSplit *data;
- NewPage *page;
-} PageSplitRecord;
-
static MemoryContext opCtx; /* working memory for operations */
/*
* action.)
*/
static void
-gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
- RelFileNode node, BlockNumber childblkno)
+gistRedoClearFollowRight(XLogReaderState *record, uint8 block_id)
{
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
XLogRedoAction action;
* Note that we still update the page even if it was restored from a full
* page image, because the updated NSN is not included in the image.
*/
- action = XLogReadBufferForRedo(lsn, record, block_index, node, childblkno,
- &buffer);
+ action = XLogReadBufferForRedo(record, block_id, &buffer);
if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
{
page = BufferGetPage(buffer);
* redo any page update (except page split)
*/
static void
-gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
+gistRedoPageUpdateRecord(XLogReaderState *record)
{
- char *begin = XLogRecGetData(record);
- gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
+ XLogRecPtr lsn = record->EndRecPtr;
+ gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
Buffer buffer;
Page page;
- char *data;
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- page = (Page) BufferGetPage(buffer);
+ char *begin;
+ char *data;
+ Size datalen;
+ int ninserted = 0;
- data = begin + sizeof(gistxlogPageUpdate);
+ data = begin = XLogRecGetBlockData(record, 0, &datalen);
+
+ page = (Page) BufferGetPage(buffer);
/* Delete old tuples */
if (xldata->ntodelete > 0)
}
/* add tuples */
- if (data - begin < record->xl_len)
+ if (data - begin < datalen)
{
OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
- while (data - begin < record->xl_len)
+ while (data - begin < datalen)
{
IndexTuple itup = (IndexTuple) data;
Size sz = IndexTupleSize(itup);
elog(ERROR, "failed to add item to GiST index page, size %d bytes",
(int) sz);
off++;
+ ninserted++;
}
}
+ Assert(ninserted == xldata->ntoinsert);
+
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
* that even if the target page no longer exists, we still attempt to
* replay the change on the child page.
*/
- if (BlockNumberIsValid(xldata->leftchild))
- gistRedoClearFollowRight(lsn, record, 1,
- xldata->node, xldata->leftchild);
+ if (XLogRecHasBlockRef(record, 1))
+ gistRedoClearFollowRight(record, 1);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
-static void
-decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
+/*
+ * Returns an array of index pointers.
+ */
+static IndexTuple *
+decodePageSplitRecord(char *begin, int len, int *n)
{
- char *begin = XLogRecGetData(record),
- *ptr;
- int j,
- i = 0;
+ char *ptr;
+ int i = 0;
+ IndexTuple *tuples;
+
+ /* extract the number of tuples */
+ memcpy(n, begin, sizeof(int));
+ ptr = begin + sizeof(int);
- decoded->data = (gistxlogPageSplit *) begin;
- decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
+ tuples = palloc(*n * sizeof(IndexTuple));
- ptr = begin + sizeof(gistxlogPageSplit);
- for (i = 0; i < decoded->data->npage; i++)
+ for (i = 0; i < *n; i++)
{
- Assert(ptr - begin < record->xl_len);
- decoded->page[i].header = (gistxlogPage *) ptr;
- ptr += sizeof(gistxlogPage);
-
- decoded->page[i].itup = (IndexTuple *)
- palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
- j = 0;
- while (j < decoded->page[i].header->num)
- {
- Assert(ptr - begin < record->xl_len);
- decoded->page[i].itup[j] = (IndexTuple) ptr;
- ptr += IndexTupleSize((IndexTuple) ptr);
- j++;
- }
+ Assert(ptr - begin < len);
+ tuples[i] = (IndexTuple) ptr;
+ ptr += IndexTupleSize((IndexTuple) ptr);
}
+ Assert(ptr - begin == len);
+
+ return tuples;
}
static void
-gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
+gistRedoPageSplitRecord(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
- PageSplitRecord xlrec;
Buffer firstbuffer = InvalidBuffer;
Buffer buffer;
Page page;
int i;
bool isrootsplit = false;
- decodePageSplitRecord(&xlrec, record);
-
/*
* We must hold lock on the first-listed page throughout the action,
* including while updating the left child page (if any). We can unlock
*/
/* loop around all pages */
- for (i = 0; i < xlrec.data->npage; i++)
+ for (i = 0; i < xldata->npage; i++)
{
- NewPage *newpage = xlrec.page + i;
int flags;
-
- if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ char *data;
+ Size datalen;
+ int num;
+ BlockNumber blkno;
+ IndexTuple *tuples;
+
+ XLogRecGetBlockTag(record, i + 1, NULL, NULL, &blkno);
+ if (blkno == GIST_ROOT_BLKNO)
{
Assert(i == 0);
isrootsplit = true;
}
- buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, i + 1);
page = (Page) BufferGetPage(buffer);
+ data = XLogRecGetBlockData(record, i + 1, &datalen);
+
+ tuples = decodePageSplitRecord(data, datalen, &num);
/* ok, clear buffer */
- if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
+ if (xldata->origleaf && blkno != GIST_ROOT_BLKNO)
flags = F_LEAF;
else
flags = 0;
GISTInitBuffer(buffer, flags);
/* and fill it */
- gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+ gistfillbuffer(page, tuples, num, FirstOffsetNumber);
- if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ if (blkno == GIST_ROOT_BLKNO)
{
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
GistPageSetNSN(page, xldata->orignsn);
}
else
{
- if (i < xlrec.data->npage - 1)
- GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+ if (i < xldata->npage - 1)
+ {
+ BlockNumber nextblkno;
+
+ XLogRecGetBlockTag(record, i + 2, NULL, NULL, &nextblkno);
+ GistPageGetOpaque(page)->rightlink = nextblkno;
+ }
else
GistPageGetOpaque(page)->rightlink = xldata->origrlink;
GistPageSetNSN(page, xldata->orignsn);
- if (i < xlrec.data->npage - 1 && !isrootsplit &&
+ if (i < xldata->npage - 1 && !isrootsplit &&
xldata->markfollowright)
GistMarkFollowRight(page);
else
}
/* Fix follow-right data on left child page, if any */
- if (BlockNumberIsValid(xldata->leftchild))
- gistRedoClearFollowRight(lsn, record, 0,
- xldata->node, xldata->leftchild);
+ if (XLogRecHasBlockRef(record, 0))
+ gistRedoClearFollowRight(record, 0);
/* Finally, release lock on the first page */
UnlockReleaseBuffer(firstbuffer);
}
static void
-gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
+gistRedoCreateIndex(XLogReaderState *record)
{
- RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
- /* Backup blocks are not used in create_index records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
- buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
+ Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = (Page) BufferGetPage(buffer);
GISTInitBuffer(buffer, F_LEAF);
}
void
-gist_redo(XLogRecPtr lsn, XLogRecord *record)
+gist_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
MemoryContext oldCxt;
/*
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
- gistRedoPageUpdateRecord(lsn, record);
+ gistRedoPageUpdateRecord(record);
break;
case XLOG_GIST_PAGE_SPLIT:
- gistRedoPageSplitRecord(lsn, record);
+ gistRedoPageSplitRecord(record);
break;
case XLOG_GIST_CREATE_INDEX:
- gistRedoCreateIndex(lsn, record);
+ gistRedoCreateIndex(record);
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
BlockNumber origrlink, GistNSN orignsn,
Buffer leftchildbuf, bool markfollowright)
{
- XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2];
gistxlogPageSplit xlrec;
SplitedPageLayout *ptr;
- int npage = 0,
- cur;
+ int npage = 0;
XLogRecPtr recptr;
+ int i;
for (ptr = dist; ptr; ptr = ptr->next)
npage++;
- /*
- * the caller should've checked this already, but doesn't hurt to check
- * again.
- */
- if (npage > GIST_MAX_SPLIT_PAGES)
- elog(ERROR, "GiST page split into too many halves");
-
- xlrec.node = node;
- xlrec.origblkno = blkno;
xlrec.origrlink = origrlink;
xlrec.orignsn = orignsn;
xlrec.origleaf = page_is_leaf;
xlrec.npage = (uint16) npage;
- xlrec.leftchild =
- BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
xlrec.markfollowright = markfollowright;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(gistxlogPageSplit);
- rdata[0].buffer = InvalidBuffer;
-
- cur = 1;
+ XLogBeginInsert();
/*
* Include a full page image of the child buf. (only necessary if a
* checkpoint happened since the child page was split)
*/
if (BufferIsValid(leftchildbuf))
- {
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].data = NULL;
- rdata[cur].len = 0;
- rdata[cur].buffer = leftchildbuf;
- rdata[cur].buffer_std = true;
- cur++;
- }
+ XLogRegisterBuffer(0, leftchildbuf, REGBUF_STANDARD);
+ /*
+ * NOTE: We register a lot of data. The caller must've called
+ * XLogEnsureRecordSpace() to prepare for that. We cannot do it here,
+ * because we're already in a critical section. If you change the number
+ * of buffer or data registrations here, make sure you modify the
+ * XLogEnsureRecordSpace() calls accordingly!
+ */
+ XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageSplit));
+
+ i = 1;
for (ptr = dist; ptr; ptr = ptr->next)
{
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char *) &(ptr->block);
- rdata[cur].len = sizeof(gistxlogPage);
- cur++;
-
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char *) (ptr->list);
- rdata[cur].len = ptr->lenlist;
- cur++;
+ XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT);
+ XLogRegisterBufData(i, (char *) &(ptr->block.num), sizeof(int));
+ XLogRegisterBufData(i, (char *) ptr->list, ptr->lenlist);
+ i++;
}
- rdata[cur - 1].next = NULL;
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT);
return recptr;
}
*
* Note that both the todelete array and the tuples are marked as belonging
* to the target buffer; they need not be stored in XLOG if XLogInsert decides
- * to log the whole buffer contents instead. Also, we take care that there's
- * at least one rdata item referencing the buffer, even when ntodelete and
- * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
+ * to log the whole buffer contents instead.
*/
XLogRecPtr
gistXLogUpdate(RelFileNode node, Buffer buffer,
IndexTuple *itup, int ituplen,
Buffer leftchildbuf)
{
- XLogRecData rdata[MaxIndexTuplesPerPage + 3];
gistxlogPageUpdate xlrec;
- int cur,
- i;
+ int i;
XLogRecPtr recptr;
- xlrec.node = node;
- xlrec.blkno = BufferGetBlockNumber(buffer);
xlrec.ntodelete = ntodelete;
- xlrec.leftchild =
- BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
-
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(gistxlogPageUpdate);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ xlrec.ntoinsert = ituplen;
- rdata[1].data = (char *) todelete;
- rdata[1].len = sizeof(OffsetNumber) * ntodelete;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));
- cur = 2;
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);
/* new tuples */
for (i = 0; i < ituplen; i++)
- {
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].data = (char *) (itup[i]);
- rdata[cur].len = IndexTupleSize(itup[i]);
- rdata[cur].buffer = buffer;
- rdata[cur].buffer_std = true;
- cur++;
- }
+ XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));
/*
* Include a full page image of the child buf. (only necessary if a
* checkpoint happened since the child page was split)
*/
if (BufferIsValid(leftchildbuf))
- {
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].data = NULL;
- rdata[cur].len = 0;
- rdata[cur].buffer = leftchildbuf;
- rdata[cur].buffer_std = true;
- cur++;
- }
- rdata[cur - 1].next = NULL;
+ XLogRegisterBuffer(1, leftchildbuf, REGBUF_STANDARD);
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE);
return recptr;
}
void
-hash_redo(XLogRecPtr lsn, XLogRecord *record)
+hash_redo(XLogReaderState *record)
{
elog(PANIC, "hash_redo: unimplemented");
}
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
- XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
- bool need_tuple_data;
+ int bufflags = 0;
/*
- * For logical decoding, we need the tuple even if we're doing a full
- * page write, so make sure to log it separately. (XXX We could
- * alternatively store a pointer into the FPW).
- *
- * Also, if this is a catalog, we need to transmit combocids to
- * properly decode, so log that as well.
+ * If this is a catalog, we need to transmit combocids to properly
+ * decode, so log that as well.
*/
- need_tuple_data = RelationIsLogicallyLogged(relation);
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, heaptup);
- xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = heaptup->t_self;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
- xlhdr.t_infomask = heaptup->t_data->t_infomask;
- xlhdr.t_hoff = heaptup->t_data->t_hoff;
-
/*
- * note we mark rdata[1] as belonging to buffer; if XLogInsert decides
- * to write the whole page to the xlog, we don't need to store
- * xl_heap_header in the xlog.
+ * If this is the single and first tuple on page, we can reinit the
+ * page instead of restoring the whole thing. Set flag, and hide
+ * buffer references from XLogInsert.
*/
- rdata[1].data = (char *) &xlhdr;
- rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
+ if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+ {
+ info |= XLOG_HEAP_INIT_PAGE;
+ bufflags |= REGBUF_WILL_INIT;
+ }
- /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+ Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
/*
- * Make a separate rdata entry for the tuple's buffer if we're doing
- * logical decoding, so that an eventual FPW doesn't remove the
- * tuple's data.
+ * For logical decoding, we need the tuple even if we're doing a full
+ * page write, so make sure it's included even if we take a full-page
+ * image. (XXX We could alternatively store a pointer into the FPW).
*/
- if (need_tuple_data)
+ if (RelationIsLogicallyLogged(relation))
{
- rdata[2].next = &(rdata[3]);
-
- rdata[3].data = NULL;
- rdata[3].len = 0;
- rdata[3].buffer = buffer;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
-
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ bufflags |= REGBUF_KEEP_DATA;
}
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
+
+ xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
+ xlhdr.t_infomask = heaptup->t_data->t_infomask;
+ xlhdr.t_hoff = heaptup->t_data->t_hoff;
+
/*
- * If this is the single and first tuple on page, we can reinit the
- * page instead of restoring the whole thing. Set flag, and hide
- * buffer references from XLogInsert.
+ * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
+ * write the whole page to the xlog, we don't need to store
+ * xl_heap_header in the xlog.
*/
- if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
- PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
- {
- info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
- }
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+ XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterBufData(0,
+ (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits));
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr);
}
break;
RelationPutHeapTuple(relation, buffer, heaptup);
+
+ /*
+ * We don't use heap_multi_insert for catalog tuples yet, but
+ * better be prepared...
+ */
+ if (needwal && need_cids)
+ log_heap_new_cid(relation, heaptup);
}
if (PageIsAllVisible(page))
{
XLogRecPtr recptr;
xl_heap_multi_insert *xlrec;
- XLogRecData rdata[3];
uint8 info = XLOG_HEAP2_MULTI_INSERT;
char *tupledata;
int totaldatalen;
char *scratchptr = scratch;
bool init;
+ int bufflags = 0;
/*
* If the page was previously empty, we can reinit the page
tupledata = scratchptr;
xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
- xlrec->node = relation->rd_node;
- xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntuples = nthispage;
/*
datalen);
tuphdr->datalen = datalen;
scratchptr += datalen;
-
- /*
- * We don't use heap_multi_insert for catalog tuples yet, but
- * better be prepared...
- */
- if (need_cids)
- log_heap_new_cid(relation, heaptup);
}
totaldatalen = scratchptr - tupledata;
Assert((scratchptr - scratch) < BLCKSZ);
- rdata[0].data = (char *) xlrec;
- rdata[0].len = tupledata - scratch;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &rdata[1];
-
- rdata[1].data = tupledata;
- rdata[1].len = totaldatalen;
- rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
-
- /*
- * Make a separate rdata entry for the tuple's buffer if we're
- * doing logical decoding, so that an eventual FPW doesn't remove
- * the tuple's data.
- */
if (need_tuple_data)
- {
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
- }
/*
- * If we're going to reinitialize the whole page using the WAL
- * record, hide buffer reference from XLogInsert.
+ * Signal that this is the last xl_heap_multi_insert record
+ * emitted by this call to heap_multi_insert(). Needed for logical
+ * decoding so it knows when to cleanup temporary data.
*/
+ if (ndone + nthispage == ntuples)
+ xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT;
+
if (init)
{
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
info |= XLOG_HEAP_INIT_PAGE;
+ bufflags |= REGBUF_WILL_INIT;
}
/*
- * Signal that this is the last xl_heap_multi_insert record
- * emitted by this call to heap_multi_insert(). Needed for logical
- * decoding so it knows when to cleanup temporary data.
+ * If we're doing logical decoding, include the new tuple data
+ * even if we take a full-page image of the page.
*/
- if (ndone + nthispage == ntuples)
- xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT;
+ if (need_tuple_data)
+ bufflags |= REGBUF_KEEP_DATA;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlrec, tupledata - scratch);
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
- recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ XLogRegisterBufData(0, tupledata, totaldatalen);
+ recptr = XLogInsert(RM_HEAP2_ID, info);
PageSetLSN(page, recptr);
}
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[4];
/* For logical decode we need combocids to properly decode the catalog */
if (RelationIsAccessibleInLogicalDecoding(relation))
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tp.t_self;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
xlrec.xmax = new_xmax;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapDelete;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ if (old_key_tuple != NULL)
+ {
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
/*
* Log replica identity of the deleted tuple if there is one
xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
- rdata[1].next = &(rdata[2]);
- rdata[2].data = (char *) &xlhdr;
- rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
-
- rdata[2].next = &(rdata[3]);
- rdata[3].data = (char *) old_key_tuple->t_data
- + offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].len = old_key_tuple->t_len
- - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = InvalidBuffer;
- rdata[3].next = NULL;
-
- if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) old_key_tuple->t_data
+ + offsetof(HeapTupleHeaderData, t_bits),
+ old_key_tuple->t_len
+ - offsetof(HeapTupleHeaderData, t_bits));
}
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
PageSetLSN(page, recptr);
}
{
xl_heap_lock xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tuple->t_self;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
+
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
xlrec.locking_xid = xid;
xlrec.infobits_set = compute_infobits(new_infomask,
tuple->t_data->t_infomask2);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapLock;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = *buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
}
{
xl_heap_lock_updated xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
Page page = BufferGetPage(buf);
- xlrec.target.node = rel->rd_node;
- xlrec.target.tid = mytup.t_self;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+
+ xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self);
xlrec.xmax = new_xmax;
xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapLockUpdated;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogRegisterData((char *) &xlrec, SizeOfHeapLockUpdated);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = buf;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED);
PageSetLSN(page, recptr);
}
{
xl_heap_inplace xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tuple->t_self;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapInplace;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapInplace);
- rdata[1].data = (char *) htup + htup->t_hoff;
- rdata[1].len = newlen;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(page, recptr);
}
{
xl_heap_cleanup_info xlrec;
XLogRecPtr recptr;
- XLogRecData rdata;
xlrec.node = rnode;
xlrec.latestRemovedXid = latestRemovedXid;
- rdata.data = (char *) &xlrec;
- rdata.len = SizeOfHeapCleanupInfo;
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapCleanupInfo);
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, &rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO);
return recptr;
}
TransactionId latestRemovedXid)
{
xl_heap_clean xlrec;
- uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[4];
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
- xlrec.node = reln->rd_node;
- xlrec.block = BufferGetBlockNumber(buffer);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.nredirected = nredirected;
xlrec.ndead = ndead;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapClean;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapClean);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
/*
* The OffsetNumber arrays are not actually in the buffer, but we pretend
* even if no item pointers changed state.
*/
if (nredirected > 0)
- {
- rdata[1].data = (char *) redirected;
- rdata[1].len = nredirected * sizeof(OffsetNumber) * 2;
- }
- else
- {
- rdata[1].data = NULL;
- rdata[1].len = 0;
- }
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBufData(0, (char *) redirected,
+ nredirected * sizeof(OffsetNumber) * 2);
if (ndead > 0)
- {
- rdata[2].data = (char *) nowdead;
- rdata[2].len = ndead * sizeof(OffsetNumber);
- }
- else
- {
- rdata[2].data = NULL;
- rdata[2].len = 0;
- }
- rdata[2].buffer = buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
+ XLogRegisterBufData(0, (char *) nowdead,
+ ndead * sizeof(OffsetNumber));
if (nunused > 0)
- {
- rdata[3].data = (char *) nowunused;
- rdata[3].len = nunused * sizeof(OffsetNumber);
- }
- else
- {
- rdata[3].data = NULL;
- rdata[3].len = 0;
- }
- rdata[3].buffer = buffer;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
+ XLogRegisterBufData(0, (char *) nowunused,
+ nunused * sizeof(OffsetNumber));
- info = XLOG_HEAP2_CLEAN;
- recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEAN);
return recptr;
}
{
xl_heap_freeze_page xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
/* nor when there are no tuples to freeze */
Assert(ntuples > 0);
- xlrec.node = reln->rd_node;
- xlrec.block = BufferGetBlockNumber(buffer);
xlrec.cutoff_xid = cutoff_xid;
xlrec.ntuples = ntuples;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapFreezePage;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
/*
* The freeze plan array is not actually in the buffer, but pretend that
* it is. When XLogInsert stores the whole buffer, the freeze plan need
* not be stored too.
*/
- rdata[1].data = (char *) tuples;
- rdata[1].len = ntuples * sizeof(xl_heap_freeze_tuple);
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) tuples,
+ ntuples * sizeof(xl_heap_freeze_tuple));
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
return recptr;
}
* corresponding visibility map block. Both should have already been modified
* and dirtied.
*
- * If checksums are enabled, we also add the heap_buffer to the chain to
- * protect it from being torn.
+ * If checksums are enabled, we also generate a full-page image of
+ * heap_buffer, if necessary.
*/
XLogRecPtr
log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
{
xl_heap_visible xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ uint8 flags;
Assert(BufferIsValid(heap_buffer));
Assert(BufferIsValid(vm_buffer));
- xlrec.node = rnode;
- xlrec.block = BufferGetBlockNumber(heap_buffer);
xlrec.cutoff_xid = cutoff_xid;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapVisible;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogRegisterBuffer(0, vm_buffer, 0);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = vm_buffer;
- rdata[1].buffer_std = false;
- rdata[1].next = NULL;
+ flags = REGBUF_STANDARD;
+ if (!XLogHintBitIsNeeded())
+ flags |= REGBUF_NO_IMAGE;
+ XLogRegisterBuffer(1, heap_buffer, flags);
- if (XLogHintBitIsNeeded())
- {
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = heap_buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
- }
-
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE);
return recptr;
}
bool all_visible_cleared, bool new_all_visible_cleared)
{
xl_heap_update xlrec;
- xl_heap_header_len xlhdr;
- xl_heap_header_len xlhdr_idx;
+ xl_heap_header xlhdr;
+ xl_heap_header xlhdr_idx;
uint8 info;
uint16 prefix_suffix[2];
uint16 prefixlen = 0,
suffixlen = 0;
XLogRecPtr recptr;
- XLogRecData rdata[9];
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
- int nr;
- Buffer newbufref;
+ bool init;
+ int bufflags;
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
+ XLogBeginInsert();
+
if (HeapTupleIsHeapOnly(newtup))
info = XLOG_HEAP_HOT_UPDATE;
else
suffixlen = 0;
}
- xlrec.target.node = reln->rd_node;
- xlrec.target.tid = oldtup->t_self;
- xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
- xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
- oldtup->t_data->t_infomask2);
- xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
+ /* Prepare main WAL data chain */
xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
- xlrec.newtid = newtup->t_self;
if (new_all_visible_cleared)
xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
if (prefixlen > 0)
xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
if (suffixlen > 0)
xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
+ if (need_tuple_data)
+ {
+ xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ if (old_key_tuple)
+ {
+ if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+ }
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- newbufref = InvalidBuffer;
+ init = true;
}
else
- newbufref = newbuf;
+ init = false;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].buffer = oldbuf;
- rdata[0].buffer_std = true;
- rdata[0].next = &(rdata[1]);
+ /* Prepare WAL data for the old page */
+ xlrec.old_offnum = ItemPointerGetOffsetNumber(&oldtup->t_self);
+ xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
+ xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
+ oldtup->t_data->t_infomask2);
+
+ /* Prepare WAL data for the new page */
+ xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
+ xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
+
+ bufflags = REGBUF_STANDARD;
+ if (init)
+ bufflags |= REGBUF_WILL_INIT;
+ if (need_tuple_data)
+ bufflags |= REGBUF_KEEP_DATA;
- rdata[1].data = (char *) &xlrec;
- rdata[1].len = SizeOfHeapUpdate;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBuffer(0, newbuf, bufflags);
+ if (oldbuf != newbuf)
+ XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
- /* prefix and/or suffix length fields */
+ XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
+
+ /*
+ * Prepare WAL data for the new tuple.
+ */
if (prefixlen > 0 || suffixlen > 0)
{
if (prefixlen > 0 && suffixlen > 0)
{
prefix_suffix[0] = prefixlen;
prefix_suffix[1] = suffixlen;
- rdata[2].data = (char *) &prefix_suffix;
- rdata[2].len = 2 * sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2);
}
else if (prefixlen > 0)
{
- rdata[2].data = (char *) &prefixlen;
- rdata[2].len = sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
}
else
{
- rdata[2].data = (char *) &suffixlen;
- rdata[2].len = sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
}
- rdata[2].buffer = newbufref;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
- nr = 3;
}
- else
- nr = 2;
-
- xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
- xlhdr.header.t_infomask = newtup->t_data->t_infomask;
- xlhdr.header.t_hoff = newtup->t_data->t_hoff;
- Assert(offsetof(HeapTupleHeaderData, t_bits) +prefixlen + suffixlen <= newtup->t_len);
- xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -prefixlen - suffixlen;
- /*
- * As with insert records, we need not store this rdata segment if we
- * decide to store the whole buffer instead, unless we're doing logical
- * decoding.
- */
- rdata[nr].data = (char *) &xlhdr;
- rdata[nr].len = SizeOfHeapHeaderLen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = &(rdata[nr + 1]);
- nr++;
+ xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
+ xlhdr.t_infomask = newtup->t_data->t_infomask;
+ xlhdr.t_hoff = newtup->t_data->t_hoff;
+ Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len);
/*
* PG73FORMAT: write bitmap [+ padding] [+ oid] + data
*
* The 'data' doesn't include the common prefix or suffix.
*/
+ XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
if (prefixlen == 0)
{
- rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits),
+ newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen);
}
else
{
/* bitmap [+ padding] [+ oid] */
if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) >0)
{
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits),
+ newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits));
}
/* data after common prefix */
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen;
- rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen,
+ newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen);
}
- /*
- * Separate storage for the FPW buffer reference of the new page in the
- * wal_level >= logical case.
- */
- if (need_tuple_data)
+ /* We need to log a tuple identity */
+ if (need_tuple_data && old_key_tuple)
{
- rdata[nr - 1].next = &(rdata[nr]);
-
- rdata[nr].data = NULL,
- rdata[nr].len = 0;
- rdata[nr].buffer = newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
-
- xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ /* don't really need this, but its more comfy to decode */
+ xlhdr_idx.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+ xlhdr_idx.t_infomask = old_key_tuple->t_data->t_infomask;
+ xlhdr_idx.t_hoff = old_key_tuple->t_data->t_hoff;
- /* We need to log a tuple identity */
- if (old_key_tuple)
- {
- /* don't really need this, but its more comfy to decode */
- xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
- xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
- xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
- xlhdr_idx.t_len = old_key_tuple->t_len;
-
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = (char *) &xlhdr_idx;
- rdata[nr].len = SizeOfHeapHeaderLen;
- rdata[nr].buffer = InvalidBuffer;
- rdata[nr].next = &(rdata[nr + 1]);
- nr++;
-
- /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[nr].data = (char *) old_key_tuple->t_data
- + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = old_key_tuple->t_len
- - offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].buffer = InvalidBuffer;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterData((char *) &xlhdr_idx, SizeOfHeapHeader);
- if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
- }
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterData((char *) old_key_tuple->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ old_key_tuple->t_len - offsetof(HeapTupleHeaderData, t_bits));
}
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, info);
return recptr;
}
xl_heap_new_cid xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[1];
HeapTupleHeader hdr = tup->t_data;
Assert(ItemPointerIsValid(&tup->t_self));
Assert(tup->t_tableOid != InvalidOid);
xlrec.top_xid = GetTopTransactionId();
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tup->t_self;
+ xlrec.target_node = relation->rd_node;
+ xlrec.target_tid = tup->t_self;
/*
* If the tuple got inserted & deleted in the same TX we definitely have a
xlrec.combocid = InvalidCommandId;
}
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapNewCid;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = NULL;
+ /*
+ * Note that we don't need to register the buffer here, because this
+ * operation does not modify the page. The insert/update/delete that
+ * called us certainly did, but that's WAL-logged separately.
+ */
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
return recptr;
}
* Handles CLEANUP_INFO
*/
static void
-heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_cleanup_info(XLogReaderState *record)
{
xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
*/
/* Backup blocks are not used in cleanup_info records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
}
/*
* Handles HEAP2_CLEAN record type
*/
static void
-heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_clean(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
Buffer buffer;
Size freespace = 0;
BlockNumber blkno;
XLogRedoAction action;
- rnode = xlrec->node;
- blkno = xlrec->block;
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
/*
* We're about to remove tuples. In Hot Standby mode, ensure that there's
* If we have a full-page image, restore it (using a cleanup lock) and
* we're done.
*/
- action = XLogReadBufferForRedoExtended(lsn, record, 0,
- rnode, MAIN_FORKNUM, blkno,
- RBM_NORMAL, true, &buffer);
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
+ &buffer);
if (action == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buffer);
int nredirected;
int ndead;
int nunused;
+ Size datalen;
+
+ redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
nredirected = xlrec->nredirected;
ndead = xlrec->ndead;
- end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
- redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+ end = (OffsetNumber *) ((char *) redirected + datalen);
nowdead = redirected + (nredirected * 2);
nowunused = nowdead + ndead;
nunused = (end - nowunused);
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO)
- XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
+ XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
/*
* page modification would fail to clear the visibility map bit.
*/
static void
-heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_visible(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+ Buffer vmbuffer = InvalidBuffer;
Buffer buffer;
Page page;
RelFileNode rnode;
BlockNumber blkno;
XLogRedoAction action;
- rnode = xlrec->node;
- blkno = xlrec->block;
+ XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno);
/*
* If there are any Hot Standby transactions running that have an xmin
* truncated later in recovery, we don't need to update the page, but we'd
* better still update the visibility map.
*/
- action = XLogReadBufferForRedo(lsn, record, 1, rnode, blkno, &buffer);
+ action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
/*
* the visibility map bit does so before checking the page LSN, so any
* bits that need to be cleared will still be cleared.
*/
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+ &vmbuffer) == BLK_NEEDS_REDO)
{
+ Page vmpage = BufferGetPage(vmbuffer);
Relation reln;
- Buffer vmbuffer = InvalidBuffer;
+
+ /* initialize the page if it was read as zeros */
+ if (PageIsNew(vmpage))
+ PageInit(vmpage, BLCKSZ, 0);
+
+ /*
+ * XLogReplayBufferExtended locked the buffer. But visibilitymap_set
+ * will handle locking itself.
+ */
+ LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
reln = CreateFakeRelcacheEntry(rnode);
visibilitymap_pin(reln, blkno, &vmbuffer);
* we did for the heap page. If this results in a dropped bit, no
* real harm is done; and the next VACUUM will fix it.
*/
- if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
+ if (lsn > PageGetLSN(vmpage))
visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
xlrec->cutoff_xid);
ReleaseBuffer(vmbuffer);
FreeFakeRelcacheEntry(reln);
}
+ else if (BufferIsValid(vmbuffer))
+ UnlockReleaseBuffer(vmbuffer);
}
/*
* Replay XLOG_HEAP2_FREEZE_PAGE records
*/
static void
-heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_freeze_page(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
TransactionId cutoff_xid = xlrec->cutoff_xid;
Buffer buffer;
- Page page;
int ntup;
/*
* consider the frozen xids as running.
*/
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
+ {
+ RelFileNode rnode;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
- &buffer) == BLK_NEEDS_REDO)
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+ ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode);
+ }
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- page = BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
+ xl_heap_freeze_tuple *tuples;
+
+ tuples = (xl_heap_freeze_tuple *) XLogRecGetBlockData(record, 0, NULL);
/* now execute freeze plan for each frozen tuple */
for (ntup = 0; ntup < xlrec->ntuples; ntup++)
ItemId lp;
HeapTupleHeader tuple;
- xlrec_tp = &xlrec->tuples[ntup];
+ xlrec_tp = &tuples[ntup];
lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */
tuple = (HeapTupleHeader) PageGetItem(page, lp);
}
static void
-heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_delete(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
Buffer buffer;
Page page;
- OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleHeader htup;
BlockNumber blkno;
RelFileNode target_node;
+ ItemPointerData target_tid;
- blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- target_node = xlrec->target.node;
+ XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
/*
* The visibility map may need to be fixed even if the heap page is
FreeFakeRelcacheEntry(reln);
}
- if (XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- page = (Page) BufferGetPage(buffer);
+ page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
+ if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+ lp = PageGetItemId(page, xlrec->offnum);
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
elog(PANIC, "heap_delete_redo: invalid lp");
htup = (HeapTupleHeader) PageGetItem(page, lp);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Mark the page as a candidate for pruning */
- PageSetPrunable(page, record->xl_xid);
+ PageSetPrunable(page, XLogRecGetXid(record));
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* Make sure there is no forward chain link in t_ctid */
- htup->t_ctid = xlrec->target.tid;
+ htup->t_ctid = target_tid;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
}
static void
-heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_insert(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
Buffer buffer;
Page page;
- OffsetNumber offnum;
struct
{
HeapTupleHeaderData hdr;
Size freespace = 0;
RelFileNode target_node;
BlockNumber blkno;
+ ItemPointerData target_tid;
XLogRedoAction action;
- target_node = xlrec->target.node;
- blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
/*
* The visibility map may need to be fixed even if the heap page is
}
/*
- * If we inserted the first and only tuple on the page, re-initialize
- * the page from scratch.
+ * If we inserted the first and only tuple on the page, re-initialize the
+ * page from scratch.
*/
- if (record->xl_info & XLOG_HEAP_INIT_PAGE)
+ if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
{
- XLogReadBufferForRedoExtended(lsn, record, 0,
- target_node, MAIN_FORKNUM, blkno,
- RBM_ZERO_AND_LOCK, false, &buffer);
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 0, target_node, blkno,
- &buffer);
-
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
+ Size datalen;
+ char *data;
+
page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
elog(PANIC, "heap_insert_redo: invalid max offset number");
- newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
- Assert(newlen <= MaxHeapTupleSize);
- memcpy((char *) &xlhdr,
- (char *) xlrec + SizeOfHeapInsert,
- SizeOfHeapHeader);
+ data = XLogRecGetBlockData(record, 0, &datalen);
+
+ newlen = datalen - SizeOfHeapHeader;
+ Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+ memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+ data += SizeOfHeapHeader;
+
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
+ data,
newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
htup->t_infomask2 = xlhdr.t_infomask2;
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, FirstCommandId);
- htup->t_ctid = xlrec->target.tid;
+ htup->t_ctid = target_tid;
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
- if (offnum == InvalidOffsetNumber)
+ if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+ true, true) == InvalidOffsetNumber)
elog(PANIC, "heap_insert_redo: failed to add tuple");
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
+ XLogRecordPageWithFreeSpace(target_node, blkno, freespace);
}
/*
* Handles MULTI_INSERT record type.
*/
static void
-heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_multi_insert(XLogReaderState *record)
{
- char *recdata = XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_multi_insert *xlrec;
RelFileNode rnode;
BlockNumber blkno;
uint32 newlen;
Size freespace = 0;
int i;
- bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+ bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
XLogRedoAction action;
/*
* Insertion doesn't overwrite MVCC data, so no conflict processing is
* required.
*/
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
- xlrec = (xl_heap_multi_insert *) recdata;
- recdata += SizeOfHeapMultiInsert;
-
- rnode = xlrec->node;
- blkno = xlrec->blkno;
-
- /*
- * If we're reinitializing the page, the tuples are stored in order from
- * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
- * record.
- */
- if (!isinit)
- recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
/*
* The visibility map may need to be fixed even if the heap page is
if (isinit)
{
- XLogReadBufferForRedoExtended(lsn, record, 0,
- rnode, MAIN_FORKNUM, blkno,
- RBM_ZERO_AND_LOCK, false, &buffer);
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 0, rnode, blkno, &buffer);
-
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
- page = BufferGetPage(buffer);
+ char *tupdata;
+ char *endptr;
+ Size len;
+
+ /* Tuples are stored as block data */
+ tupdata = XLogRecGetBlockData(record, 0, &len);
+ endptr = tupdata + len;
+
+ page = (Page) BufferGetPage(buffer);
+
for (i = 0; i < xlrec->ntuples; i++)
{
OffsetNumber offnum;
xl_multi_insert_tuple *xlhdr;
+ /*
+ * If we're reinitializing the page, the tuples are stored in
+ * order from FirstOffsetNumber. Otherwise there's an array of
+ * offsets in the WAL record, and the tuples come after that.
+ */
if (isinit)
offnum = FirstOffsetNumber + i;
else
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
- recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+ tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
newlen = xlhdr->datalen;
&nbs