int16 ios_in_progress;
int16 queue_size;
int16 max_pinned_buffers;
+ int16 forwarded_buffers;
int16 pinned_buffers;
int16 distance;
+ int16 initialized_buffers;
bool advice_enabled;
bool temporary;
read_stream_start_pending_read(ReadStream *stream)
{
bool need_wait;
+ int requested_nblocks;
int nblocks;
int flags;
+ int forwarded;
int16 io_index;
int16 overflow;
int16 buffer_index;
}
}
- /* How many more buffers is this backend allowed? */
+ /*
+ * How many more buffers is this backend allowed?
+ *
+ * Forwarded buffers are already pinned and map to the leading blocks of
+ * the pending read (the remaining portion of an earlier short read that
+ * we're about to continue). They are not counted in pinned_buffers, but
+ * they are counted as pins already held by this backend according to the
+ * buffer manager, so they must be added to the limit it grants us.
+ */
if (stream->temporary)
buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
else
buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+ Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+ buffer_limit += stream->forwarded_buffers;
if (buffer_limit == 0 && stream->pinned_buffers == 0)
buffer_limit = 1; /* guarantee progress */
/*
* We say how many blocks we want to read, but it may be smaller on return
- * if the buffer manager decides to shorten the read.
+ * if the buffer manager decides to shorten the read. Initialize buffers
+ * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
+ * and keep the original nblocks number so we can check for forwarded
+ * buffers as output, below.
*/
buffer_index = stream->next_buffer_index;
io_index = stream->next_io_index;
+ while (stream->initialized_buffers < buffer_index + nblocks)
+ stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+ requested_nblocks = nblocks;
need_wait = StartReadBuffers(&stream->ios[io_index].op,
&stream->buffers[buffer_index],
stream->pending_read_blocknum,
stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
}
+ /*
+ * How many pins were acquired but forwarded to the next call? These need
+ * to be passed to the next StartReadBuffers() call by leaving them
+ * exactly where they are in the queue, or released if the stream ends
+ * early. We need the number for accounting purposes, since they are not
+ * counted in stream->pinned_buffers but we already hold them.
+ */
+ forwarded = 0;
+ while (nblocks + forwarded < requested_nblocks &&
+ stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+ forwarded++;
+ stream->forwarded_buffers = forwarded;
+
/*
* We gave a contiguous range of buffer space to StartReadBuffers(), but
- * we want it to wrap around at queue_size. Slide overflowing buffers to
- * the front of the array.
+ * we want it to wrap around at queue_size. Copy overflowing buffers to
+ * the front of the array where they'll be consumed, but also leave a copy
+ * in the overflow zone which the I/O operation has a pointer to (it needs
+ * a contiguous array). Both copies will be cleared when the buffers are
+ * handed to the consumer.
*/
- overflow = (buffer_index + nblocks) - stream->queue_size;
+ overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
if (overflow > 0)
- memmove(&stream->buffers[0],
- &stream->buffers[stream->queue_size],
- sizeof(stream->buffers[0]) * overflow);
+ {
+ Assert(overflow < stream->queue_size); /* can't overlap */
+ memcpy(&stream->buffers[0],
+ &stream->buffers[stream->queue_size],
+ sizeof(stream->buffers[0]) * overflow);
+ }
/* Compute location of start of next read, without using % operator. */
buffer_index += nblocks;
/* Fast path assumptions. */
Assert(stream->ios_in_progress == 0);
+ Assert(stream->forwarded_buffers == 0);
Assert(stream->pinned_buffers == 1);
Assert(stream->distance == 1);
Assert(stream->pending_read_nblocks == 0);
Assert(stream->per_buffer_data_size == 0);
+ Assert(stream->initialized_buffers > stream->oldest_buffer_index);
/* We're going to return the buffer we pinned last time. */
oldest_buffer_index = stream->oldest_buffer_index;
stream->distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
+ stream->buffers[oldest_buffer_index] = InvalidBuffer;
}
stream->fast_path = false;
stream->seq_until_processed = InvalidBlockNumber;
}
-#ifdef CLOBBER_FREED_MEMORY
- /* Clobber old buffer for debugging purposes. */
+ /*
+ * We must zap this queue entry, or else it would appear as a forwarded
+ * buffer. If it's potentially in the overflow zone (ie from a
+ * multi-block I/O that wrapped around the queue), also zap the copy.
+ */
stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+ if (oldest_buffer_index < stream->io_combine_limit - 1)
+ stream->buffers[stream->queue_size + oldest_buffer_index] =
+ InvalidBuffer;
#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
#ifndef READ_STREAM_DISABLE_FAST_PATH
/* See if we can take the fast path for all-cached scans next time. */
if (stream->ios_in_progress == 0 &&
+ stream->forwarded_buffers == 0 &&
stream->pinned_buffers == 1 &&
stream->distance == 1 &&
stream->pending_read_nblocks == 0 &&
void
read_stream_reset(ReadStream *stream)
{
+ int16 index;
Buffer buffer;
/* Stop looking ahead. */
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
ReleaseBuffer(buffer);
+ /* Unpin any unused forwarded buffers. */
+ index = stream->next_buffer_index;
+ while (index < stream->initialized_buffers &&
+ (buffer = stream->buffers[index]) != InvalidBuffer)
+ {
+ Assert(stream->forwarded_buffers > 0);
+ stream->forwarded_buffers--;
+ ReleaseBuffer(buffer);
+
+ stream->buffers[index] = InvalidBuffer;
+ if (index < stream->io_combine_limit - 1)
+ stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+ if (++index == stream->queue_size)
+ index = 0;
+ }
+
+ Assert(stream->forwarded_buffers == 0);
Assert(stream->pinned_buffers == 0);
Assert(stream->ios_in_progress == 0);