int recovery_prefetch = RECOVERY_PREFETCH_TRY;
#ifdef USE_PREFETCH
-#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#define RecoveryPrefetchEnabled() \
+ (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
+ maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
DecodedXLogRecord *record;
+ XLogRecPtr replayed_up_to;
/*
* See if it's time to reset the prefetching machinery, because a relevant
if (RecoveryPrefetchEnabled())
{
- max_inflight = Max(maintenance_io_concurrency, 2);
+ Assert(maintenance_io_concurrency > 0);
+ max_inflight = maintenance_io_concurrency;
max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
}
else
}
/*
- * Release last returned record, if there is one. We need to do this so
- * that we can check for empty decode queue accurately.
+ * Release last returned record, if there is one, as it's now been
+ * replayed.
*/
- XLogReleasePreviousRecord(prefetcher->reader);
+ replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
- /* If there's nothing queued yet, then start prefetching. */
+ /*
+ * Can we drop any filters yet? If we were waiting for a relation to be
+ * created or extended, it is now OK to access blocks in the covered
+ * range.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
+
+ /*
+ * All IO initiated by earlier WAL is now completed. This might trigger
+ * further prefetching.
+ */
+ lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
+
+ /*
+ * If there's nothing queued yet, then start prefetching to cause at least
+ * one record to be queued.
+ */
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+ {
+ Assert(lrq_inflight(prefetcher->streaming_read) == 0);
+ Assert(lrq_completed(prefetcher->streaming_read) == 0);
lrq_prefetch(prefetcher->streaming_read);
+ }
/* Read the next record. */
record = XLogNextRecord(prefetcher->reader, errmsg);
Assert(record == prefetcher->reader->record);
/*
- * Can we drop any prefetch filters yet, given the record we're about to
- * return? This assumes that any records with earlier LSNs have been
- * replayed, so if we were waiting for a relation to be created or
- * extended, it is now OK to access blocks in the covered range.
+ * If maintenance_io_concurrency is set very low, we might have started
+ * prefetching some but not all of the blocks referenced in the record
+ * we're about to return. Forget about the rest of the blocks in this
+ * record by dropping the prefetcher's reference to it.
*/
- XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+ if (record == prefetcher->record)
+ prefetcher->record = NULL;
/*
* See if it's time to compute some statistics, because enough WAL has
if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
XLogPrefetcherComputeStats(prefetcher);
- /*
- * The caller is about to replay this record, so we can now report that
- * all IO initiated because of early WAL must be finished. This may
- * trigger more readahead.
- */
- lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
-
Assert(record == prefetcher->reader->record);
return &record->header;
}
/*
- * See if we can release the last record that was returned by
- * XLogNextRecord(), if any, to free up space.
+ * Release the last record that was returned by XLogNextRecord(), if any, to
+ * free up space. Returns the LSN past the end of the record.
*/
-void
+XLogRecPtr
XLogReleasePreviousRecord(XLogReaderState *state)
{
DecodedXLogRecord *record;
+ XLogRecPtr next_lsn;
if (!state->record)
- return;
+ return InvalidXLogRecPtr;
/*
* Remove it from the decoded record queue. It must be the oldest item
* decoded, decode_queue_head.
*/
record = state->record;
+ next_lsn = record->next_lsn;
Assert(record == state->decode_queue_head);
state->record = NULL;
state->decode_queue_head = record->next;
state->decode_buffer_tail = state->decode_buffer;
}
}
+
+ return next_lsn;
}
/*
*/
state->abortedRecPtr = RecPtr;
state->missingContrecPtr = targetPagePtr;
+
+ /*
+ * If we got here without reporting an error, report one now so that
+ * XLogPrefetcherReadRecord() doesn't bring us back a second time and
+ * clobber the above state. Otherwise, the existing error takes
+ * precedence.
+ */
+ if (!state->errormsg_buf[0])
+ report_invalid_record(state,
+ "missing contrecord at %X/%X",
+ LSN_FORMAT_ARGS(RecPtr));
}
if (decoded && decoded->oversized)