bool        got_toast = false;
    bool        got_date_is_int = false;
    bool        got_float8_pass_by_value = false;
+   bool        got_data_checksums = false;
    char       *lc_collate = NULL;
    char       *lc_ctype = NULL;
    char       *lc_monetary = NULL;
        got_float8_pass_by_value = true;
    }
 
+   /* Only in <= 9.2 */
+   if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
+   {
+       cluster->controldata.data_checksums = false;
+       got_data_checksums = true;
+   }
+
    /* we have the result of cmd in "output". so parse it line by line now */
    while (fgets(bufin, sizeof(bufin), output))
    {
            cluster->controldata.float8_pass_by_value = strstr(p, "by value") != NULL;
            got_float8_pass_by_value = true;
        }
+       else if ((p = strstr(bufin, "checksums")) != NULL)
+       {
+           p = strchr(p, ':');
+
+           if (p == NULL || strlen(p) <= 1)
+               pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
+
+           p++;                /* removing ':' char */
+           /* used later for contrib check */
+           cluster->controldata.data_checksums = strstr(p, "enabled") != NULL;
+           got_data_checksums = true;
+       }
        /* In pre-8.4 only */
        else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
        {
        !got_tli ||
        !got_align || !got_blocksz || !got_largesz || !got_walsz ||
        !got_walseg || !got_ident || !got_index || !got_toast ||
-       !got_date_is_int || !got_float8_pass_by_value)
+       !got_date_is_int || !got_float8_pass_by_value || !got_data_checksums)
    {
        pg_log(PG_REPORT,
            "The %s cluster lacks some required control information:\n",
        if (!got_float8_pass_by_value)
            pg_log(PG_REPORT, "  float8 argument passing method\n");
 
+       /* value added in Postgres 9.3 */
+       if (!got_data_checksums)
+           pg_log(PG_REPORT, "  data checksums\n");
+
        pg_log(PG_FATAL,
               "Cannot continue without required control information, terminating\n");
    }
               "--disable-integer-datetimes or get server binaries built with those\n"
               "options.\n");
    }
+
+   if (oldctrl->data_checksums != newctrl->data_checksums)
+   {
+       pg_log(PG_FATAL,
+              "old and new pg_controldata checksums settings are invalid or do not match\n");
+   }
 }
 
 
 
    uint32      toast;
    bool        date_is_int;
    bool        float8_pass_by_value;
+   bool        data_checksums;
    char       *lc_collate;
    char       *lc_ctype;
    char       *encoding;
 
       </listitem>
      </varlistentry>
 
+    <varlistentry id="guc-ignore-checksum-failure" xreflabel="ignore_checksum_failure">
+      <term><varname>ignore_checksum_failure</varname> (<type>boolean</type>)</term>
+      <indexterm>
+       <primary><varname>ignore_checksum_failure</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Only has effect if <xref linkend="app-initdb-data-checksums"> are enabled.
+       </para>
+       <para>
+        Detection of a checksum failure during a read normally causes
+        <productname>PostgreSQL</> to report an error, aborting the current
+        transaction.  Setting <varname>ignore_checksum_failure</> to on causes
+        the system to ignore the failure (but still report a warning), and
+        continue processing.  This behavior may <emphasis>cause crashes, propagate
+        or hide corruption, or other serious problems</>.  However, it may allow
+        you to get past the error and retrieve undamaged tuples that might still be
+        present in the table if the block header is still sane. If the header is
+        corrupt an error will be reported even if this option is enabled. The
+        default setting is <literal>off</>, and it can only be changed by a superuser.
+       </para>
+      </listitem>
+     </varlistentry>
+
     <varlistentry id="guc-zero-damaged-pages" xreflabel="zero_damaged_pages">
       <term><varname>zero_damaged_pages</varname> (<type>boolean</type>)</term>
       <indexterm>
 
       </listitem>
      </varlistentry>
 
+     <varlistentry id="app-initdb-data-checksums" xreflabel="data checksums">
+      <term><option>-k</option></term>
+      <term><option>--data-checksums</option></term>
+      <listitem>
+       <para>
+        Use checksums on data pages to help detect corruption by the
+        I/O system that would otherwise be silent. Enabling checksums
+        may incur a noticeable performance penalty. This option can only
+        be set during initialization, and cannot be changed later. If
+        set, checksums are calculated for all objects, in all databases.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--locale=<replaceable>locale</replaceable></option></term>
       <listitem>
 
            {
                /* Creating index-page GISTSearchItem */
                item->blkno = ItemPointerGetBlockNumber(&it->t_tid);
-               /* lsn of current page is lsn of parent page for child */
-               item->data.parentlsn = PageGetLSN(page);
+
+               /*
+                * LSN of current page is lsn of parent page for child. We only
+                * have a shared lock, so we need to get the LSN atomically.
+                */
+               item->data.parentlsn = BufferGetLSNAtomic(buffer);
            }
 
            /* Insert it into the queue using new distance data */
 
            ItemIdMarkDead(PageGetItemId(page, offnum));
 
            /*
-            * Since this can be redone later if needed, it's treated the same
-            * as a commit-hint-bit status update for heap tuples: we mark the
-            * buffer dirty but don't make a WAL log entry.
+            * Since this can be redone later if needed, mark as a hint.
             */
-           SetBufferCommitInfoNeedsSave(buf);
+           MarkBufferDirtyHint(buf);
        }
 
        /*
 
  * being marked all-visible, and vm_buffer is the buffer containing the
  * corresponding visibility map block. Both should have already been modified
  * and dirtied.
+ *
+ * If checksums are enabled, we also add the heap_buffer to the chain to
+ * protect it from being torn.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
+log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
                 TransactionId cutoff_xid)
 {
    xl_heap_visible xlrec;
    XLogRecPtr  recptr;
-   XLogRecData rdata[2];
+   XLogRecData rdata[3];
+
+   Assert(BufferIsValid(heap_buffer));
+   Assert(BufferIsValid(vm_buffer));
 
    xlrec.node = rnode;
-   xlrec.block = block;
+   xlrec.block = BufferGetBlockNumber(heap_buffer);
    xlrec.cutoff_xid = cutoff_xid;
 
    rdata[0].data = (char *) &xlrec;
    rdata[1].buffer_std = false;
    rdata[1].next = NULL;
 
+   if (DataChecksumsEnabled())
+   {
+       rdata[1].next = &(rdata[2]);
+
+       rdata[2].data = NULL;
+       rdata[2].len = 0;
+       rdata[2].buffer = heap_buffer;
+       rdata[2].buffer_std = true;
+       rdata[2].next = NULL;
+   }
+
    recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
 
    return recptr;
 heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 {
    xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
-   Buffer      buffer;
-   Page        page;
 
    /*
     * If there are any Hot Standby transactions running that have an xmin
        ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
 
    /*
-    * Read the heap page, if it still exists.  If the heap file has been
-    * dropped or truncated later in recovery, we don't need to update the
-    * page, but we'd better still update the visibility map.
+    * If heap block was backed up, restore it. This can only happen with
+    * checksums enabled.
     */
-   buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
-                                   RBM_NORMAL);
-   if (BufferIsValid(buffer))
+   if (record->xl_info & XLR_BKP_BLOCK(1))
    {
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-       page = (Page) BufferGetPage(buffer);
+       Assert(DataChecksumsEnabled());
+       (void) RestoreBackupBlock(lsn, record, 1, false, false);
+   }
+   else
+   {
+       Buffer      buffer;
+       Page        page;
 
        /*
-        * We don't bump the LSN of the heap page when setting the visibility
-        * map bit, because that would generate an unworkable volume of
-        * full-page writes.  This exposes us to torn page hazards, but since
-        * we're not inspecting the existing page contents in any way, we
-        * don't care.
-        *
-        * However, all operations that clear the visibility map bit *do* bump
-        * the LSN, and those operations will only be replayed if the XLOG LSN
-        * follows the page LSN.  Thus, if the page LSN has advanced past our
-        * XLOG record's LSN, we mustn't mark the page all-visible, because
-        * the subsequent update won't be replayed to clear the flag.
+        * Read the heap page, if it still exists. If the heap file has been
+        * dropped or truncated later in recovery, we don't need to update the
+        * page, but we'd better still update the visibility map.
         */
-       if (lsn > PageGetLSN(page))
+       buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
+                                       xlrec->block, RBM_NORMAL);
+       if (BufferIsValid(buffer))
        {
-           PageSetAllVisible(page);
-           MarkBufferDirty(buffer);
-       }
+           LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-       /* Done with heap page. */
-       UnlockReleaseBuffer(buffer);
+           page = (Page) BufferGetPage(buffer);
+
+           /*
+            * We don't bump the LSN of the heap page when setting the
+            * visibility map bit (unless checksums are enabled, in which case
+            * we must), because that would generate an unworkable volume of
+            * full-page writes.  This exposes us to torn page hazards, but
+            * since we're not inspecting the existing page contents in any
+            * way, we don't care.
+            *
+            * However, all operations that clear the visibility map bit *do*
+            * bump the LSN, and those operations will only be replayed if the
+            * XLOG LSN follows the page LSN.  Thus, if the page LSN has
+            * advanced past our XLOG record's LSN, we mustn't mark the page
+            * all-visible, because the subsequent update won't be replayed to
+            * clear the flag.
+            */
+           if (lsn > PageGetLSN(page))
+           {
+               PageSetAllVisible(page);
+               MarkBufferDirty(buffer);
+           }
+
+           /* Done with heap page. */
+           UnlockReleaseBuffer(buffer);
+       }
    }
 
    /*
         * real harm is done; and the next VACUUM will fix it.
         */
        if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
-           visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
+           visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
                              xlrec->cutoff_xid);
 
        ReleaseBuffer(vmbuffer);
 
        {
            ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
            PageClearFull(page);
-           SetBufferCommitInfoNeedsSave(buffer);
+           MarkBufferDirtyHint(buffer);
        }
    }
 
 
    /* Write the last page, if any */
    if (state->rs_buffer_valid)
    {
+       PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
+
        if (state->rs_use_wal)
            log_newpage(&state->rs_new_rel->rd_node,
                        MAIN_FORKNUM,
        {
            /* Doesn't fit, so write out the existing page */
 
+           PageSetChecksumInplace(page, state->rs_blockno);
+
            /* XLOG stuff */
            if (state->rs_use_wal)
                log_newpage(&state->rs_new_rel->rd_node,
 
  * marked all-visible; it is needed for Hot Standby, and can be
  * InvalidTransactionId if the page contains no tuples.
  *
+ * Caller is expected to set the heap page's PD_ALL_VISIBLE bit before calling
+ * this function. Except in recovery, caller should also pass the heap
+ * buffer. When checksums are enabled and we're not in recovery, we must add
+ * the heap buffer to the WAL chain to protect it from being torn.
+ *
  * You must pass a buffer containing the correct map page to this function.
  * Call visibilitymap_pin first to pin the right one. This function doesn't do
  * any I/O.
  */
 void
-visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
-                 Buffer buf, TransactionId cutoff_xid)
+visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
+                 XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
 {
    BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
    uint32      mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
 #endif
 
    Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
+   Assert(InRecovery || BufferIsValid(heapBuf));
 
-   /* Check that we have the right page pinned */
-   if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
-       elog(ERROR, "wrong buffer passed to visibilitymap_set");
+   /* Check that we have the right heap page pinned, if present */
+   if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
+       elog(ERROR, "wrong heap buffer passed to visibilitymap_set");
 
-   page = BufferGetPage(buf);
+   /* Check that we have the right VM page pinned */
+   if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock)
+       elog(ERROR, "wrong VM buffer passed to visibilitymap_set");
+
+   page = BufferGetPage(vmBuf);
    map = PageGetContents(page);
-   LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+   LockBuffer(vmBuf, BUFFER_LOCK_EXCLUSIVE);
 
    if (!(map[mapByte] & (1 << mapBit)))
    {
        START_CRIT_SECTION();
 
        map[mapByte] |= (1 << mapBit);
-       MarkBufferDirty(buf);
+       MarkBufferDirty(vmBuf);
 
        if (RelationNeedsWAL(rel))
        {
            if (XLogRecPtrIsInvalid(recptr))
-               recptr = log_heap_visible(rel->rd_node, heapBlk, buf,
+           {
+               Assert(!InRecovery);
+               recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
                                          cutoff_xid);
+
+               /*
+                * If data checksums are enabled, we need to protect the heap
+                * page from being torn.
+                */
+               if (DataChecksumsEnabled())
+               {
+                   Page heapPage = BufferGetPage(heapBuf);
+
+                   /* caller is expected to set PD_ALL_VISIBLE first */
+                   Assert(PageIsAllVisible(heapPage));
+                   PageSetLSN(heapPage, recptr);
+               }
+           }
            PageSetLSN(page, recptr);
        }
 
        END_CRIT_SECTION();
    }
 
-   LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+   LockBuffer(vmBuf, BUFFER_LOCK_UNLOCK);
 }
 
 /*
    /* Now extend the file */
    while (vm_nblocks_now < vm_nblocks)
    {
+       PageSetChecksumInplace(pg, vm_nblocks_now);
+
        smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
                   (char *) pg, false);
        vm_nblocks_now++;
 
                     */
                    ItemIdMarkDead(curitemid);
                    opaque->btpo_flags |= BTP_HAS_GARBAGE;
-                   /* be sure to mark the proper buffer dirty... */
+
+                   /*
+                    * Mark buffer with a dirty hint, since state is not
+                    * crucial. Be sure to mark the proper buffer dirty.
+                    */
                    if (nbuf != InvalidBuffer)
-                       SetBufferCommitInfoNeedsSave(nbuf);
+                       MarkBufferDirtyHint(nbuf);
                    else
-                       SetBufferCommitInfoNeedsSave(buf);
+                       MarkBufferDirtyHint(buf);
                }
            }
        }
 
    _bt_initmetapage(metapage, P_NONE, 0);
 
    /* Write the page.  If archiving/streaming, XLOG it. */
+   PageSetChecksumInplace(metapage, BTREE_METAPAGE);
    smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
              (char *) metapage, true);
    if (XLogIsNeeded())
                opaque->btpo_cycleid == vstate->cycleid)
            {
                opaque->btpo_cycleid = 0;
-               SetBufferCommitInfoNeedsSave(buf);
+               MarkBufferDirtyHint(buf);
            }
        }
 
 
    {
        if (!wstate->btws_zeropage)
            wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
+       /* don't set checksum for all-zero page */
        smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
                   wstate->btws_pages_written++,
                   (char *) wstate->btws_zeropage,
                   true);
    }
 
+   PageSetChecksumInplace(page, blkno);
+
    /*
     * Now write the page.  There's no need for smgr to schedule an fsync for
     * this write; we'll do it ourselves before ending the build.
 
    }
 
    /*
-    * Since this can be redone later if needed, it's treated the same as a
-    * commit-hint-bit status update for heap tuples: we mark the buffer dirty
-    * but don't make a WAL log entry.
+    * Since this can be redone later if needed, mark as dirty hint.
     *
     * Whenever we mark anything LP_DEAD, we also set the page's
     * BTP_HAS_GARBAGE flag, which is likewise just a hint.
    if (killedsomething)
    {
        opaque->btpo_flags |= BTP_HAS_GARBAGE;
-       SetBufferCommitInfoNeedsSave(so->currPos.buf);
+       MarkBufferDirtyHint(so->currPos.buf);
    }
 
    if (!haveLock)
 
        appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
 
    }
+   else if (info == XLOG_HINT)
+   {
+       appendStringInfo(buf, "page hint");
+   }
    else if (info == XLOG_BACKUP_END)
    {
        XLogRecPtr  startpoint;
 
    SpGistInitMetapage(page);
 
    /* Write the page.  If archiving/streaming, XLOG it. */
+   PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
    smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
              (char *) page, true);
    if (XLogIsNeeded())
    /* Likewise for the root page. */
    SpGistInitPage(page, SPGIST_LEAF);
 
+   PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
    smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
              (char *) page, true);
    if (XLogIsNeeded())
    /* Likewise for the null-tuples root page. */
    SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
+   PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
    smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
              (char *) page, true);
    if (XLogIsNeeded())
 
 
 4. Mark the shared buffer(s) as dirty with MarkBufferDirty().  (This must
 happen before the WAL record is inserted; see notes in SyncOneBuffer().)
+Note that marking a buffer dirty with MarkBufferDirty() should only
+happen iff you write a WAL record; see Writing Hints below.
 
 5. If the relation requires WAL-logging, build a WAL log record and pass it
 to XLogInsert(); then update the page's LSN using the returned XLOG
 consistency.  Such insertions occur after WAL is operational, so they can
 and should write WAL records for the additional generated actions.
 
+Writing Hints
+-------------
+
+In some cases, we write additional information to data blocks without
+writing a preceding WAL record. This should only happen iff the data can
+be reconstructed later following a crash and the action is simply a way
+of optimising for performance. When a hint is written we use
+MarkBufferDirtyHint() to mark the block dirty.
+
+If the buffer is clean and checksums are in use then
+MarkBufferDirtyHint() inserts an XLOG_HINT record to ensure that we
+take a full page image that includes the hint. We do this to avoid
+a partial page write, when we write the dirtied page. WAL is not
+written during recovery, so we simply skip dirtying blocks because
+of hints when in recovery.
+
+If you do decide to optimise away a WAL record, then any calls to
+MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
+otherwise you will expose the risk of partial page writes.
+
 
 Write-Ahead Logging for Filesystem Actions
 ------------------------------------------
 
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+extern bool bootstrap_data_checksums;
 
 /* File path names (all relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE  "recovery.conf"
    bool        updrqst;
    bool        doPageWrites;
    bool        isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+   bool        isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
    uint8       info_orig = info;
    static XLogRecord *rechdr;
 
        goto begin;
    }
 
+   /*
+    * If this is a hint record and we don't need a backup block then
+    * we have no more work to do and can exit quickly without inserting
+    * a WAL record at all. In that case return InvalidXLogRecPtr.
+    */
+   if (isHint && !(info & XLR_BKP_BLOCK_MASK))
+   {
+       LWLockRelease(WALInsertLock);
+       END_CRIT_SECTION();
+       return InvalidXLogRecPtr;
+   }
+
    /*
     * If the current page is completely full, the record goes to the next
     * page, right after the page header.
     * not. We don't need the buffer header lock for PageGetLSN because we
     * have exclusive lock on the page and/or the relation.
     */
-   *lsn = PageGetLSN(page);
+   *lsn = BufferGetLSNAtomic(rdata->buffer);
 
    if (doPageWrites &&
-       PageGetLSN(page) <= RedoRecPtr)
+       *lsn <= RedoRecPtr)
    {
        /*
         * The page needs to be backed up, so set up *bkpb
                       BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
            }
 
+           /*
+            * Any checksum set on this page will be invalid. We don't need
+            * to reset it here since it will be set before being written.
+            */
+
            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer);
 
    return ControlFile->system_identifier;
 }
 
+/*
+ * Are checksums enabled for data pages?
+ */
+bool
+DataChecksumsEnabled(void)
+{
+   Assert(ControlFile != NULL);
+   return ControlFile->data_checksums;
+}
+
 /*
  * Returns a fake LSN for unlogged relations.
  *
    ControlFile->max_prepared_xacts = max_prepared_xacts;
    ControlFile->max_locks_per_xact = max_locks_per_xact;
    ControlFile->wal_level = wal_level;
+   ControlFile->data_checksums = bootstrap_data_checksums;
 
    /* some additional ControlFile fields are set in WriteControlFile() */
 
    return RecPtr;
 }
 
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Deciding the "if needed" part is delicate and requires us to either
+ * grab WALInsertLock or check the info_lck spinlock. If we check the
+ * spinlock and it says Yes then we will need to get WALInsertLock as well,
+ * so the design choice here is to just go straight for the WALInsertLock
+ * and trust that calls to this function are minimised elsewhere.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * Possible that multiple concurrent backends could attempt to write
+ * WAL records. In that case, more than one backup block may be recorded
+ * though that isn't important to the outcome and the backup blocks are
+ * likely to be identical anyway.
+ */
+#define    XLOG_HINT_WATERMARK     13579
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer)
+{
+   /*
+    * Make an XLOG entry reporting the hint
+    */
+   XLogRecData rdata[2];
+   int         watermark = XLOG_HINT_WATERMARK;
+
+   /*
+    * Not allowed to have zero-length records, so use a small watermark
+    */
+   rdata[0].data = (char *) (&watermark);
+   rdata[0].len = sizeof(int);
+   rdata[0].buffer = InvalidBuffer;
+   rdata[0].buffer_std = false;
+   rdata[0].next = &(rdata[1]);
+
+   rdata[1].data = NULL;
+   rdata[1].len = 0;
+   rdata[1].buffer = buffer;
+   rdata[1].buffer_std = true;
+   rdata[1].next = NULL;
+
+   return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+}
+
 /*
  * Check if any of the GUC parameters that are critical for hot standby
  * have changed, and update the value in pg_control file if necessary.
 {
    uint8       info = record->xl_info & ~XLR_INFO_MASK;
 
-   /* Backup blocks are not used in xlog records */
-   Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+   /* Backup blocks are not used in most xlog records */
+   Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
 
    if (info == XLOG_NEXTOID)
    {
    {
        /* nothing to do here */
    }
+   else if (info == XLOG_HINT)
+   {
+#ifdef USE_ASSERT_CHECKING
+       int *watermark = (int *) XLogRecGetData(record);
+#endif
+
+       /* Check the watermark is correct for the hint record */
+       Assert(*watermark == XLOG_HINT_WATERMARK);
+
+       /* Backup blocks must be present for smgr hint records */
+       Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+
+       /*
+        * Hint records have no information that needs to be replayed.
+        * The sole purpose of them is to ensure that a hint bit does
+        * not cause a checksum invalidation if a hint bit write should
+        * cause a torn page. So the body of the record is empty but
+        * there must be one backup block.
+        *
+        * Since the only change in the backup block is a hint bit,
+        * there is no confict with Hot Standby.
+        *
+        * This also means there is no corresponding API call for this,
+        * so an smgr implementation has no need to implement anything.
+        * Which means nothing is needed in md.c etc
+        */
+       RestoreBackupBlock(lsn, record, 0, false, false);
+   }
    else if (info == XLOG_BACKUP_END)
    {
        XLogRecPtr  startpoint;
 
 extern int optind;
 extern char *optarg;
 
+bool bootstrap_data_checksums = false;
+
 
 #define ALLOC(t, c)        ((t *) calloc((unsigned)(c), sizeof(t)))
 
    /* If no -x argument, we are a CheckerProcess */
    MyAuxProcType = CheckerProcess;
 
-   while ((flag = getopt(argc, argv, "B:c:d:D:Fr:x:-:")) != -1)
+   while ((flag = getopt(argc, argv, "B:c:d:D:Fkr:x:-:")) != -1)
    {
        switch (flag)
        {
            case 'F':
                SetConfigOption("fsync", "false", PGC_POSTMASTER, PGC_S_ARGV);
                break;
+           case 'k':
+               bootstrap_data_checksums = true;
+               break;
            case 'r':
                strlcpy(OutputFileName, optarg, MAXPGPATH);
                break;
 
        log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
 
    RelationOpenSmgr(relation);
+
+   PageSetChecksumInplace(page, 0);
    smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
 
    pfree(page);
 
        HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
        seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
        seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
-       SetBufferCommitInfoNeedsSave(*buf);
+       MarkBufferDirtyHint(*buf);
    }
 
    seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
 
 
        smgrread(src, forkNum, blkno, buf);
 
+       PageSetChecksumInplace(page, blkno);
+
        /* XLOG stuff */
        if (use_wal)
            log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
 
            {
                PageSetAllVisible(page);
                MarkBufferDirty(buf);
-               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                 InvalidTransactionId);
+               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                 vmbuffer, InvalidTransactionId);
            }
 
            UnlockReleaseBuffer(buf);
            {
                PageSetAllVisible(page);
                MarkBufferDirty(buf);
-               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                 visibility_cutoff_xid);
+               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                 vmbuffer, visibility_cutoff_xid);
            }
            else if (!all_visible_according_to_vm)
            {
                 * allowed.  Set the visibility map bit as well so that we get
                 * back in sync.
                 */
-               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                 visibility_cutoff_xid);
+               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                 vmbuffer, visibility_cutoff_xid);
            }
        }
 
    {
        Assert(BufferIsValid(*vmbuffer));
        PageSetAllVisible(page);
-       visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
+       visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
                visibility_cutoff_xid);
    }
 
 
 #include <unistd.h>
 
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "common/relpath.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
    {
        /* new buffers are zero-filled */
        MemSet((char *) bufBlock, 0, BLCKSZ);
+       /* don't set checksum for all-zero page */
        smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
    }
    else
            }
 
            /* check for garbage data */
-           if (!PageHeaderIsValid((PageHeader) bufBlock))
+           if (!PageIsVerified((Page) bufBlock, blockNum))
            {
                if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
                {
                    ereport(WARNING,
                            (errcode(ERRCODE_DATA_CORRUPTED),
-                            errmsg("invalid page header in block %u of relation %s; zeroing out page",
+                            errmsg("invalid page in block %u of relation %s; zeroing out page",
                                    blockNum,
                                    relpath(smgr->smgr_rnode, forkNum))));
                    MemSet((char *) bufBlock, 0, BLCKSZ);
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_DATA_CORRUPTED),
-                    errmsg("invalid page header in block %u of relation %s",
+                    errmsg("invalid page in block %u of relation %s",
                            blockNum,
                            relpath(smgr->smgr_rnode, forkNum))));
            }
                 * victim.  We need lock to inspect the page LSN, so this
                 * can't be done inside StrategyGetBuffer.
                 */
-               if (strategy != NULL &&
-                   XLogNeedsFlush(BufferGetLSN(buf)) &&
-                   StrategyRejectBuffer(strategy, buf))
+               if (strategy != NULL)
                {
-                   /* Drop lock/pin and loop around for another buffer */
-                   LWLockRelease(buf->content_lock);
-                   UnpinBuffer(buf, true);
-                   continue;
+                   XLogRecPtr  lsn;
+
+                   /* Read the LSN while holding buffer header lock */
+                   LockBufHdr(buf);
+                   lsn = BufferGetLSN(buf);
+                   UnlockBufHdr(buf);
+
+                   if (XLogNeedsFlush(lsn) &&
+                       StrategyRejectBuffer(strategy, buf))
+                   {
+                       /* Drop lock/pin and loop around for another buffer */
+                       LWLockRelease(buf->content_lock);
+                       UnpinBuffer(buf, true);
+                       continue;
+                   }
                }
 
                /* OK, do the I/O */
    ErrorContextCallback errcallback;
    instr_time  io_start,
                io_time;
+   Block       bufBlock;
+   char        *bufToWrite;
 
    /*
     * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
                                        reln->smgr_rnode.node.dbNode,
                                        reln->smgr_rnode.node.relNode);
 
+   LockBufHdr(buf);
+
+   /*
+    * Run PageGetLSN while holding header lock, since we don't have the
+    * buffer locked exclusively in all cases.
+    */
+   recptr = BufferGetLSN(buf);
+
+   /* To check if block content changes while flushing. - vadim 01/17/97 */
+   buf->flags &= ~BM_JUST_DIRTIED;
+   UnlockBufHdr(buf);
+
    /*
     * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
     * rule that log updates must hit disk before any of the data-file changes
     * buffer isn't permanent.
     */
    if (buf->flags & BM_PERMANENT)
-   {
-       recptr = BufferGetLSN(buf);
        XLogFlush(recptr);
-   }
 
    /*
     * Now it's safe to write buffer to disk. Note that no one else should
     * we have the io_in_progress lock.
     */
 
-   /* To check if block content changes while flushing. - vadim 01/17/97 */
-   LockBufHdr(buf);
-   buf->flags &= ~BM_JUST_DIRTIED;
-   UnlockBufHdr(buf);
+   bufBlock = BufHdrGetBlock(buf);
+
+   bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
 
    if (track_io_timing)
        INSTR_TIME_SET_CURRENT(io_start);
 
+   /*
+    * bufToWrite is either the shared buffer or a copy, as appropriate.
+    */
    smgrwrite(reln,
              buf->tag.forkNum,
              buf->tag.blockNum,
-             (char *) BufHdrGetBlock(buf),
+             bufToWrite,
              false);
 
    if (track_io_timing)
    return (bufHdr->flags & BM_PERMANENT) != 0;
 }
 
+/*
+ * BufferGetLSNAtomic
+ *     Retrieves the LSN of the buffer atomically using a buffer header lock.
+ *     This is necessary for some callers who may not have an exclusive lock
+ *     on the buffer.
+ */
+XLogRecPtr
+BufferGetLSNAtomic(Buffer buffer)
+{
+   volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
+   char                *page = BufferGetPage(buffer);
+   XLogRecPtr           lsn;
+
+   /* Local buffers don't need a lock. */
+   if (BufferIsLocal(buffer))
+       return PageGetLSN(page);
+
+   /* Make sure we've got a real buffer, and that we hold a pin on it. */
+   Assert(BufferIsValid(buffer));
+   Assert(BufferIsPinned(buffer));
+
+   LockBufHdr(bufHdr);
+   lsn = PageGetLSN(page);
+   UnlockBufHdr(bufHdr);
+
+   return lsn;
+}
+
 /* ---------------------------------------------------------------------
  *     DropRelFileNodeBuffers
  *
            if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
                (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
            {
-               ErrorContextCallback errcallback;
+               ErrorContextCallback    errcallback;
+               Page                    localpage;
+
+               localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
                /* Setup error traceback support for ereport() */
                errcallback.callback = local_buffer_write_error_callback;
                errcallback.previous = error_context_stack;
                error_context_stack = &errcallback;
 
+               PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+
                smgrwrite(rel->rd_smgr,
                          bufHdr->tag.forkNum,
                          bufHdr->tag.blockNum,
-                         (char *) LocalBufHdrGetBlock(bufHdr),
+                         localpage,
                          false);
 
                bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
 }
 
 /*
- * SetBufferCommitInfoNeedsSave
+ * MarkBufferDirtyHint
  *
- * Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ * Mark a buffer dirty for non-critical changes.
  *
- * This is essentially the same as MarkBufferDirty, except that the caller
- * might have only share-lock instead of exclusive-lock on the buffer's
- * content lock.  We preserve the distinction mainly as a way of documenting
- * that the caller has not made a critical data change --- the status-bit
- * update could be redone by someone else just as easily.  Therefore, no WAL
- * log record need be generated, whereas calls to MarkBufferDirty really ought
- * to be associated with a WAL-entry-creating action.
+ * This is essentially the same as MarkBufferDirty, except:
+ *
+ * 1. The caller does not write WAL; so if checksums are enabled, we may need
+ *    to write an XLOG_HINT WAL record to protect against torn pages.
+ * 2. The caller might have only share-lock instead of exclusive-lock on the
+ *    buffer's content lock.
+ * 3. This function does not guarantee that the buffer is always marked dirty
+ *    (due to a race condition), so it cannot be used for important changes.
  */
 void
-SetBufferCommitInfoNeedsSave(Buffer buffer)
+MarkBufferDirtyHint(Buffer buffer)
 {
    volatile BufferDesc *bufHdr;
+   Page    page = BufferGetPage(buffer);
 
    if (!BufferIsValid(buffer))
        elog(ERROR, "bad buffer ID: %d", buffer);
    /*
     * This routine might get called many times on the same page, if we are
     * making the first scan after commit of an xact that added/deleted many
-    * tuples.  So, be as quick as we can if the buffer is already dirty.  We
-    * do this by not acquiring spinlock if it looks like the status bits are
-    * already.  Since we make this test unlocked, there's a chance we might
-    * fail to notice that the flags have just been cleared, and failed to
-    * reset them, due to memory-ordering issues.  But since this function is
-    * only intended to be used in cases where failing to write out the data
+    * tuples. So, be as quick as we can if the buffer is already dirty.  We do
+    * this by not acquiring spinlock if it looks like the status bits are
+    * already set.  Since we make this test unlocked, there's a chance we
+    * might fail to notice that the flags have just been cleared, and failed
+    * to reset them, due to memory-ordering issues.  But since this function
+    * is only intended to be used in cases where failing to write out the data
     * would be harmless anyway, it doesn't really matter.
     */
    if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
        (BM_DIRTY | BM_JUST_DIRTIED))
    {
+       XLogRecPtr  lsn = InvalidXLogRecPtr;
+       bool        dirtied = false;
+       bool        delayChkpt = false;
+
+       /*
+        * If checksums are enabled, and the buffer is permanent, then a full
+        * page image may be required even for some hint bit updates to protect
+        * against torn pages. This full page image is only necessary if the
+        * hint bit update is the first change to the page since the last
+        * checkpoint.
+        *
+        * We don't check full_page_writes here because that logic is
+        * included when we call XLogInsert() since the value changes
+        * dynamically.
+        */
+       if (DataChecksumsEnabled() && (bufHdr->flags & BM_PERMANENT))
+       {
+           /*
+            * If we're in recovery we cannot dirty a page because of a hint.
+            * We can set the hint, just not dirty the page as a result so
+            * the hint is lost when we evict the page or shutdown.
+            *
+            * See src/backend/storage/page/README for longer discussion.
+            */
+           if (RecoveryInProgress())
+               return;
+
+           /*
+            * If the block is already dirty because we either made a change
+            * or set a hint already, then we don't need to write a full page
+            * image.  Note that aggressive cleaning of blocks
+            * dirtied by hint bit setting would increase the call rate.
+            * Bulk setting of hint bits would reduce the call rate...
+            *
+            * We must issue the WAL record before we mark the buffer dirty.
+            * Otherwise we might write the page before we write the WAL.
+            * That causes a race condition, since a checkpoint might occur
+            * between writing the WAL record and marking the buffer dirty.
+            * We solve that with a kluge, but one that is already in use
+            * during transaction commit to prevent race conditions.
+            * Basically, we simply prevent the checkpoint WAL record from
+            * being written until we have marked the buffer dirty. We don't
+            * start the checkpoint flush until we have marked dirty, so our
+            * checkpoint must flush the change to disk successfully or the
+            * checkpoint never gets written, so crash recovery will fix.
+            *
+            * It's possible we may enter here without an xid, so it is
+            * essential that CreateCheckpoint waits for virtual transactions
+            * rather than full transactionids.
+            */
+           MyPgXact->delayChkpt = delayChkpt = true;
+           lsn = XLogSaveBufferForHint(buffer);
+       }
+
        LockBufHdr(bufHdr);
        Assert(bufHdr->refcount > 0);
        if (!(bufHdr->flags & BM_DIRTY))
        {
-           /* Do vacuum cost accounting */
+           dirtied = true;     /* Means "will be dirtied by this action" */
+
+           /*
+            * Set the page LSN if we wrote a backup block. We aren't
+            * supposed to set this when only holding a share lock but
+            * as long as we serialise it somehow we're OK. We choose to
+            * set LSN while holding the buffer header lock, which causes
+            * any reader of an LSN who holds only a share lock to also
+            * obtain a buffer header lock before using PageGetLSN().
+            * Fortunately, thats not too many places.
+            *
+            * If checksums are enabled, you might think we should reset the
+            * checksum here. That will happen when the page is written
+            * sometime later in this checkpoint cycle.
+            */
+           if (!XLogRecPtrIsInvalid(lsn))
+               PageSetLSN(page, lsn);
+       }
+       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       UnlockBufHdr(bufHdr);
+
+       if (delayChkpt)
+           MyPgXact->delayChkpt = false;
+
+       if (dirtied)
+       {
            VacuumPageDirty++;
            if (VacuumCostActive)
                VacuumCostBalance += VacuumCostPageDirty;
        }
-       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-       UnlockBufHdr(bufHdr);
    }
 }
 
 
     */
    if (bufHdr->flags & BM_DIRTY)
    {
-       SMgrRelation oreln;
+       SMgrRelation    oreln;
+       Page            localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
        /* Find smgr relation for buffer */
        oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
 
+       PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+
        /* And write... */
        smgrwrite(oreln,
                  bufHdr->tag.forkNum,
                  bufHdr->tag.blockNum,
-                 (char *) LocalBufHdrGetBlock(bufHdr),
+                 localpage,
                  false);
 
        /* Mark not-dirty now in case we error out below */
 
 --------
 
 The FSM is not explicitly WAL-logged. Instead, we rely on a bunch of
-self-correcting measures to repair possible corruption.
+self-correcting measures to repair possible corruption. As a result when
+we write to the FSM we treat that as a hint and thus use MarkBufferDirtyHint()
+rather than MarkBufferDirty().
 
 First of all, whenever a value is set on an FSM page, the root node of the
 page is compared against the new value after bubbling up the change is
 
        PageInit(page, BLCKSZ, 0);
 
    if (fsm_set_avail(page, slot, new_cat))
-       MarkBufferDirty(buf);
+       MarkBufferDirtyHint(buf);
    UnlockReleaseBuffer(buf);
 }
 
            return;             /* nothing to do; the FSM was already smaller */
        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
        fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
-       MarkBufferDirty(buf);
+       MarkBufferDirtyHint(buf);
        UnlockReleaseBuffer(buf);
 
        new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
 
    while (fsm_nblocks_now < fsm_nblocks)
    {
+       PageSetChecksumInplace(pg, fsm_nblocks_now);
+
        smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
                   (char *) pg, false);
        fsm_nblocks_now++;
    page = BufferGetPage(buf);
 
    if (fsm_set_avail(page, slot, newValue))
-       MarkBufferDirty(buf);
+       MarkBufferDirtyHint(buf);
 
    if (minValue != 0)
    {
            {
                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                fsm_set_avail(BufferGetPage(buf), slot, child_avail);
-               MarkBufferDirty(buf);
+               MarkBufferDirtyHint(buf);
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
            }
        }
 
                exclusive_lock_held = true;
            }
            fsm_rebuild_page(page);
-           MarkBufferDirty(buf);
+           MarkBufferDirtyHint(buf);
            goto restart;
        }
    }
 
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/xlog.h"
 
+bool ignore_checksum_failure = false;
+
+static char pageCopyData[BLCKSZ];  /* for checksum calculation */
+static Page pageCopy = pageCopyData;
+
+static uint16 PageCalcChecksum16(Page page, BlockNumber blkno);
 
 /* ----------------------------------------------------------------
  *                     Page support functions
 /*
  * PageInit
  *     Initializes the contents of a page.
+ *     Note that we don't calculate an initial checksum here; that's not done
+ *     until it's time to write.
  */
 void
 PageInit(Page page, Size pageSize, Size specialSize)
    /* Make sure all fields of page are zero, as well as unused space */
    MemSet(p, 0, pageSize);
 
-   /* p->pd_flags = 0;                             done by above MemSet */
+   p->pd_flags = 0;
    p->pd_lower = SizeOfPageHeaderData;
    p->pd_upper = pageSize - specialSize;
    p->pd_special = pageSize - specialSize;
 
 
 /*
- * PageHeaderIsValid
- *     Check that the header fields of a page appear valid.
+ * PageIsVerified
+ *     Check that the page header and checksum (if any) appear valid.
  *
  * This is called when a page has just been read in from disk. The idea is
  * to cheaply detect trashed pages before we go nuts following bogus item
  * will clean up such a page and make it usable.
  */
 bool
-PageHeaderIsValid(PageHeader page)
+PageIsVerified(Page page, BlockNumber blkno)
 {
+   PageHeader  p = (PageHeader) page;
    char       *pagebytes;
    int         i;
+   bool        checksum_failure = false;
+   bool        header_sane = false;
+   bool        all_zeroes = false;
+   uint16      checksum;
 
-   /* Check normal case */
-   if (PageGetPageSize(page) == BLCKSZ &&
-       PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
-       (page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
-       page->pd_lower >= SizeOfPageHeaderData &&
-       page->pd_lower <= page->pd_upper &&
-       page->pd_upper <= page->pd_special &&
-       page->pd_special <= BLCKSZ &&
-       page->pd_special == MAXALIGN(page->pd_special))
-       return true;
+   /*
+    * Don't verify page data unless the page passes basic non-zero test
+    */
+   if (!PageIsNew(page))
+   {
+       if (DataChecksumsEnabled())
+       {
+           checksum = PageCalcChecksum16(page, blkno);
+
+           if (checksum != p->pd_checksum)
+               checksum_failure = true;
+       }
+
+       /*
+        * The following checks don't prove the header is correct,
+        * only that it looks sane enough to allow into the buffer pool.
+        * Later usage of the block can still reveal problems,
+        * which is why we offer the checksum option.
+        */
+       if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
+            p->pd_lower <= p->pd_upper &&
+            p->pd_upper <= p->pd_special &&
+            p->pd_special <= BLCKSZ &&
+            p->pd_special == MAXALIGN(p->pd_special))
+           header_sane = true;
+
+       if (header_sane && !checksum_failure)
+           return true;
+   }
 
    /* Check all-zeroes case */
+   all_zeroes = true;
    pagebytes = (char *) page;
    for (i = 0; i < BLCKSZ; i++)
    {
        if (pagebytes[i] != 0)
-           return false;
+       {
+           all_zeroes = false;
+           break;
+       }
+   }
+
+   if (all_zeroes)
+       return true;
+
+   /*
+    * Throw a WARNING if the checksum fails, but only after we've checked for
+    * the all-zeroes case.
+    */
+   if (checksum_failure)
+   {
+       ereport(WARNING,
+               (ERRCODE_DATA_CORRUPTED,
+                errmsg("page verification failed, calculated checksum %u but expected %u",
+                       checksum, p->pd_checksum)));
+
+       if (header_sane && ignore_checksum_failure)
+           return true;
    }
-   return true;
+
+   return false;
 }
 
 
 
    pfree(itemidbase);
 }
+
+/*
+ * Set checksum for page in shared buffers.
+ *
+ * If checksums are disabled, or if the page is not initialized, just return
+ * the input. Otherwise, we must make a copy of the page before calculating the
+ * checksum, to prevent concurrent modifications (e.g. setting hint bits) from
+ * making the final checksum invalid.
+ *
+ * Returns a pointer to the block-sized data that needs to be written. Uses
+ * statically-allocated memory, so the caller must immediately write the
+ * returned page and not refer to it again.
+ */
+char *
+PageSetChecksumCopy(Page page, BlockNumber blkno)
+{
+   if (PageIsNew(page) || !DataChecksumsEnabled())
+       return (char *) page;
+
+   /*
+    * We make a copy iff we need to calculate a checksum because other
+    * backends may set hint bits on this page while we write, which
+    * would mean the checksum differs from the page contents. It doesn't
+    * matter if we include or exclude hints during the copy, as long
+    * as we write a valid page and associated checksum.
+    */
+   memcpy((char *) pageCopy, (char *) page, BLCKSZ);
+   PageSetChecksumInplace(pageCopy, blkno);
+   return (char *) pageCopy;
+}
+
+/*
+ * Set checksum for page in private memory.
+ *
+ * This is a simpler version of PageSetChecksumCopy(). The more explicit API
+ * allows us to more easily see if we're making the correct call and reduces
+ * the amount of additional code specific to page verification.
+ */
+void
+PageSetChecksumInplace(Page page, BlockNumber blkno)
+{
+   if (PageIsNew(page))
+       return;
+
+   if (DataChecksumsEnabled())
+   {
+       PageHeader  p = (PageHeader) page;
+       p->pd_checksum = PageCalcChecksum16(page, blkno);
+   }
+
+   return;
+}
+
+/*
+ * Calculate checksum for a PostgreSQL Page. This includes the block number (to
+ * detect the case when a page is somehow moved to a different location), the
+ * page header (excluding the checksum itself), and the page data.
+ *
+ * Note that if the checksum validation fails we cannot tell the difference
+ * between a transposed block and failure from direct on-block corruption,
+ * though that is better than just ignoring transposed blocks altogether.
+ */
+static uint16
+PageCalcChecksum16(Page page, BlockNumber blkno)
+{
+   pg_crc32            crc;
+   PageHeader  p = (PageHeader) page;
+
+   /* only calculate the checksum for properly-initialized pages */
+   Assert(!PageIsNew(page));
+
+   INIT_CRC32(crc);
+
+   /*
+    * Initialize the checksum calculation with the block number. This helps
+    * catch corruption from whole blocks being transposed with other whole
+    * blocks.
+    */
+   COMP_CRC32(crc, &blkno, sizeof(blkno));
+
+   /*
+    * Now add in the LSN, which is always the first field on the page.
+    */
+   COMP_CRC32(crc, page, sizeof(p->pd_lsn));
+
+   /*
+    * Now add the rest of the page, skipping the pd_checksum field.
+    */
+   COMP_CRC32(crc, page + sizeof(p->pd_lsn) + sizeof(p->pd_checksum),
+                 BLCKSZ - sizeof(p->pd_lsn) - sizeof(p->pd_checksum));
+
+   FIN_CRC32(crc);
+
+   return (uint16) crc;
+}
 
 extern int CommitSiblings;
 extern char *default_tablespace;
 extern char *temp_tablespaces;
+extern bool ignore_checksum_failure;
 extern bool synchronize_seqscans;
 extern int ssl_renegotiation_limit;
 extern char *SSLCipherSuites;
        true,
        NULL, NULL, NULL
    },
+   {
+       {"ignore_checksum_failure", PGC_SUSET, DEVELOPER_OPTIONS,
+           gettext_noop("Continues processing after a checksum failure."),
+           gettext_noop("Detection of a checksum failure normally causes PostgreSQL to "
+               "report an error, aborting the current transaction. Setting "
+                        "ignore_checksum_failure to true causes the system to ignore the failure "
+                        "(but still report a warning), and continue processing. This "
+                        "behavior could cause crashes or other serious problems. Only "
+                        "has an effect if checksums are enabled."),
+           GUC_NOT_IN_SAMPLE
+       },
+       &ignore_checksum_failure,
+       false,
+       NULL, NULL, NULL
+   },
    {
        {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
            gettext_noop("Continues processing past damaged page headers."),
 
  * NOTE: all the HeapTupleSatisfies routines will update the tuple's
  * "hint" status bits if we see that the inserting or deleting transaction
  * has now committed or aborted (and it is safe to set the hint bits).
- * If the hint bits are changed, SetBufferCommitInfoNeedsSave is called on
+ * If the hint bits are changed, MarkBufferDirtyHint is called on
  * the passed-in buffer.  The caller must hold not only a pin, but at least
  * shared buffer content lock on the buffer containing the tuple.
  *
    }
 
    tuple->t_infomask |= infomask;
-   SetBufferCommitInfoNeedsSave(buffer);
+   MarkBufferDirtyHint(buffer);
 }
 
 /*
 
 static bool do_sync = true;
 static bool sync_only = false;
 static bool show_setting = false;
+static bool data_checksums = false;
 static char *xlog_dir = "";
 
 
    unsetenv("PGCLIENTENCODING");
 
    snprintf(cmd, sizeof(cmd),
-            "\"%s\" --boot -x1 %s %s",
-            backend_exec, boot_options, talkargs);
+            "\"%s\" --boot -x1 %s %s %s",
+            backend_exec,
+            data_checksums ? "-k" : "",
+            boot_options, talkargs);
 
    PG_CMD_OPEN;
 
    printf(_("  -X, --xlogdir=XLOGDIR     location for the transaction log directory\n"));
    printf(_("\nLess commonly used options:\n"));
    printf(_("  -d, --debug               generate lots of debugging output\n"));
+   printf(_("  -k, --data-checksums      data page checksums\n"));
    printf(_("  -L DIRECTORY              where to find the input files\n"));
    printf(_("  -n, --noclean             do not clean up after errors\n"));
    printf(_("  -N, --nosync              do not wait for changes to be written safely to disk\n"));
        {"nosync", no_argument, NULL, 'N'},
        {"sync-only", no_argument, NULL, 'S'},
        {"xlogdir", required_argument, NULL, 'X'},
+       {"data-checksums", no_argument, NULL, 'k'},
        {NULL, 0, NULL, 0}
    };
 
 
    /* process command-line options */
 
-   while ((c = getopt_long(argc, argv, "dD:E:L:nNU:WA:sST:X:", long_options, &option_index)) != -1)
+   while ((c = getopt_long(argc, argv, "dD:E:kL:nNU:WA:sST:X:", long_options, &option_index)) != -1)
    {
        switch (c)
        {
            case 'S':
                sync_only = true;
                break;
+           case 'k':
+               data_checksums = true;
+               break;
            case 'L':
                share_path = pg_strdup(optarg);
                break;
    setup_locale_encoding();
 
    setup_text_search();
+
+   if (data_checksums)
+       printf(_("Data page checksums are enabled.\n"));
+   else
+       printf(_("Data page checksums are disabled.\n"));
    
    printf("\n");
 
 
           (ControlFile.float4ByVal ? _("by value") : _("by reference")));
    printf(_("Float8 argument passing:              %s\n"),
           (ControlFile.float8ByVal ? _("by value") : _("by reference")));
+   printf(_("Data page checksums:                  %s\n"),
+          (ControlFile.data_checksums ? _("enabled") : _("disabled")));
    return 0;
 }
 
           (ControlFile.float4ByVal ? _("by value") : _("by reference")));
    printf(_("Float8 argument passing:              %s\n"),
           (ControlFile.float8ByVal ? _("by value") : _("by reference")));
+   printf(_("Data page checksums:                  %s\n"),
+          (ControlFile.data_checksums ? _("enabled") : _("disabled")));
 }
 
 
 
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                TransactionId cutoff_xid, MultiXactId cutoff_multi,
                OffsetNumber *offsets, int offcnt);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block,
+extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
                 Buffer vm_buffer, TransactionId cutoff_xid);
 extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
            BlockNumber blk, Page page);
 
 extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
                  Buffer *vmbuf);
 extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
-extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
-                 XLogRecPtr recptr, Buffer vmbuf, TransactionId cutoff_xid);
+extern void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
+                 XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
 extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
 extern BlockNumber visibilitymap_count(Relation rel);
 extern void visibilitymap_truncate(Relation rel, BlockNumber nheapblocks);
 
 extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
 extern int XLogFileOpen(XLogSegNo segno);
 
+extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
+
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
 
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
+extern bool DataChecksumsEnabled(void);
 extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
 extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
 
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 935
+#define PG_CONTROL_VERSION 936
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
 #define XLOG_BACKUP_END                    0x50
 #define XLOG_PARAMETER_CHANGE          0x60
 #define XLOG_RESTORE_POINT             0x70
-#define XLOG_FPW_CHANGE                0x80
+#define XLOG_FPW_CHANGE                    0x80
 #define XLOG_END_OF_RECOVERY           0x90
+#define XLOG_HINT                      0xA0
 
 
 /*
    bool        float4ByVal;    /* float4 pass-by-value? */
    bool        float8ByVal;    /* float8, int8, etc pass-by-value? */
 
+   /* Are data pages protected by checksums? */
+   bool        data_checksums;
+
    /* CRC of all above ... MUST BE LAST! */
    pg_crc32    crc;
 } ControlFileData;
 
    RelationGetNumberOfBlocksInFork(reln, MAIN_FORKNUM)
 
 extern bool BufferIsPermanent(Buffer buffer);
+extern XLogRecPtr BufferGetLSNAtomic(Buffer buffer);
 
 #ifdef NOT_USED
 extern void PrintPinnedBufs(void);
 extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
             ForkNumber *forknum, BlockNumber *blknum);
 
-extern void SetBufferCommitInfoNeedsSave(Buffer buffer);
+extern void MarkBufferDirtyHint(Buffer buffer);
 
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
 
 #define BUFPAGE_H
 
 #include "access/xlogdefs.h"
+#include "storage/block.h"
 #include "storage/item.h"
 #include "storage/off.h"
 
  */
 
 extern void PageInit(Page page, Size pageSize, Size specialSize);
-extern bool PageHeaderIsValid(PageHeader page);
+extern bool PageIsVerified(Page page, BlockNumber blkno);
 extern OffsetNumber PageAddItem(Page page, Item item, Size size,
            OffsetNumber offsetNumber, bool overwrite, bool is_heap);
 extern Page PageGetTempPage(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
 extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
+extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
+extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
 
 #endif   /* BUFPAGE_H */