*
* Spilled data is written to logical tapes. These provide better control
* over memory usage, disk space, and the number of files than if we were
- * to use a BufFile for each spill.
+ * to use a BufFile for each spill. We don't know the number of tapes needed
+ * at the start of the algorithm (because it can recurse), so a tape set is
+ * allocated at the beginning, and individual tapes are created as needed.
+ * As a particular tape is read, logtape.c recycles its disk space. When a
+ * tape is read to completion, it is destroyed entirely.
+ *
+ * Tapes' buffers can take up substantial memory when many tapes are open at
+ * once. We only need one tape open at a time in read mode (using a buffer
+ * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ * requiring a buffer of size BLCKSZ) for each partition.
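+ *
+ * As a rough sketch of the lifecycle (mirroring the calls made in this file,
+ * not literal code), each spill partition's tape is used like this:
+ *
+ *   tapeset = LogicalTapeSetCreate(true, NULL, -1);
+ *   tape = LogicalTapeCreate(tapeset);
+ *   LogicalTapeWrite(tape, ptr, len);        (repeated for each spilled tuple)
+ *   LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
+ *   LogicalTapeRead(tape, ptr, len);         (repeated for each spilled tuple)
+ *   LogicalTapeClose(tape);
+ *   LogicalTapeSetClose(tapeset);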
*
* Note that it's possible for transition states to start small but then
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
*/
#define CHUNKHDRSZ 16
-/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
- LogicalTapeSet *tapeset;
- int ntapes;
- int *freetapes;
- int nfreetapes;
- int freetapes_alloc;
-} HashTapeInfo;
-
/*
* Represents partitioned spill data for a single hashtable. Contains the
* necessary information to route tuples to the correct partition, and to
*/
typedef struct HashAggSpill
{
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int npartitions; /* number of partitions */
- int *partitions; /* spill partition tape numbers */
+ LogicalTape **partitions; /* spill partition tapes */
int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */
{
int setno; /* grouping set */
int used_bits; /* number of bits of hash already used */
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
- int input_tapenum; /* input partition tape */
+ LogicalTape *input_tape; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */
double input_card; /* estimated group cardinality */
} HashAggBatch;
int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
- int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card,
int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
int used_bits, double input_groups,
double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
- int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
if (!aggstate->hash_ever_spilled)
{
- Assert(aggstate->hash_tapeinfo == NULL);
+ Assert(aggstate->hash_tapeset == NULL);
Assert(aggstate->hash_spills == NULL);
aggstate->hash_ever_spilled = true;
- hashagg_tapeinfo_init(aggstate);
+ aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
AggStatePerHash perhash = &aggstate->perhash[setno];
HashAggSpill *spill = &aggstate->hash_spills[setno];
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
}
aggstate->hash_mem_peak = total_mem;
/* update disk usage */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+ uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
if (aggstate->hash_disk_used < disk_used)
aggstate->hash_disk_used = disk_used;
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
if (spill->partitions == NULL)
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
HashAggBatch *batch;
AggStatePerHash perhash;
HashAggSpill spill;
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+ LogicalTapeSet *tapeset = aggstate->hash_tapeset;
bool spill_initialized = false;
if (aggstate->hash_batches == NIL)
* that we don't assign tapes that will never be used.
*/
spill_initialized = true;
- hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+ hashagg_spill_init(&spill, tapeset, batch->used_bits,
batch->input_card, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
ResetExprContext(aggstate->tmpcontext);
}
- hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+ LogicalTapeClose(batch->input_tape);
/* change back to phase 0 */
aggstate->current_phase = 0;
return NULL;
}
-/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
- HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
- int init_tapes = 16; /* expanded dynamically */
-
- tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
- tapeinfo->ntapes = init_tapes;
- tapeinfo->nfreetapes = init_tapes;
- tapeinfo->freetapes_alloc = init_tapes;
- tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
- for (int i = 0; i < init_tapes; i++)
- tapeinfo->freetapes[i] = i;
-
- aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
- int npartitions)
-{
- int partidx = 0;
-
- /* use free tapes if available */
- while (partidx < npartitions && tapeinfo->nfreetapes > 0)
- partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
- if (partidx < npartitions)
- {
- LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
- while (partidx < npartitions)
- partitions[partidx++] = tapeinfo->ntapes++;
- }
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
- /* rewinding frees the buffer while not in use */
- LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
- if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
- {
- tapeinfo->freetapes_alloc <<= 1;
- tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
- tapeinfo->freetapes_alloc * sizeof(int));
- }
- tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
/*
* hashagg_spill_init
*
* of partitions to create, and initializes them.
*/
static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
double input_groups, double hashentrysize)
{
int npartitions;
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
used_bits, &partition_bits);
- spill->partitions = palloc0(sizeof(int) * npartitions);
+ spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
- hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+ for (int i = 0; i < npartitions; i++)
+ spill->partitions[i] = LogicalTapeCreate(tapeset);
- spill->tapeset = tapeinfo->tapeset;
spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift;
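+
+ /*
+  * Worked example (illustrative): with used_bits = 0 and partition_bits = 2,
+  * npartitions is 4, shift is 30 and mask is 0xC0000000, so a tuple's
+  * partition is simply the top two bits of its 32-bit hash value.
+  */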
spill->npartitions = npartitions;
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *inputslot, uint32 hash)
{
- LogicalTapeSet *tapeset = spill->tapeset;
TupleTableSlot *spillslot;
int partition;
MinimalTuple tuple;
- int tapenum;
+ LogicalTape *tape;
int total_written = 0;
bool shouldFree;
*/
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
- tapenum = spill->partitions[partition];
+ tape = spill->partitions[partition];
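+
+ /*
+  * On-tape record layout, as written here and read back by
+  * hashagg_batch_read(): the 32-bit hash value, followed by the whole
+  * MinimalTuple, whose leading uint32 is its own total length.
+  */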
- LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+ LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
total_written += sizeof(uint32);
- LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+ LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
total_written += tuple->t_len;
if (shouldFree)
* be done.
*/
static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card, int used_bits)
{
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
batch->setno = setno;
batch->used_bits = used_bits;
- batch->tapeset = tapeset;
- batch->input_tapenum = tapenum;
+ batch->input_tape = input_tape;
batch->input_tuples = input_tuples;
batch->input_card = input_card;
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
- LogicalTapeSet *tapeset = batch->tapeset;
- int tapenum = batch->input_tapenum;
+ LogicalTape *tape = batch->input_tape;
MinimalTuple tuple;
uint32 t_len;
size_t nread;
uint32 hash;
- nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+ nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
if (nread == 0)
return NULL;
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
if (hashp != NULL)
*hashp = hash;
- nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+ nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
tuple = (MinimalTuple) palloc(t_len);
tuple->t_len = t_len;
- nread = LogicalTapeRead(tapeset, tapenum,
+ nread = LogicalTapeRead(tape,
(void *) ((char *) tuple + sizeof(uint32)),
t_len - sizeof(uint32));
if (nread != t_len - sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, t_len - sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, t_len - sizeof(uint32), nread)));
return tuple;
}
for (i = 0; i < spill->npartitions; i++)
{
- LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
- int tapenum = spill->partitions[i];
+ LogicalTape *tape = spill->partitions[i];
HashAggBatch *new_batch;
double cardinality;
freeHyperLogLog(&spill->hll_card[i]);
/* rewinding frees the buffer while not in use */
- LogicalTapeRewindForRead(tapeset, tapenum,
- HASHAGG_READ_BUFFER_SIZE);
+ LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
- new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+ new_batch = hashagg_batch_new(tape, setno,
spill->ntuples[i], cardinality,
used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
aggstate->hash_batches = NIL;
/* close tape set */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
- LogicalTapeSetClose(tapeinfo->tapeset);
- pfree(tapeinfo->freetapes);
- pfree(tapeinfo);
- aggstate->hash_tapeinfo = NULL;
+ LogicalTapeSetClose(aggstate->hash_tapeset);
+ aggstate->hash_tapeset = NULL;
}
}
* there is an annoying problem: the peak space usage is at least twice
* the volume of actual data to be sorted. (This must be so because each
* datum will appear in both the input and output tapes of the final
- * merge pass. For seven-tape polyphase merge, which is otherwise a
- * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ * merge pass.)
*
* We can work around this problem by recognizing that any one tape
* dataset (with the possible exception of the final output) is written
*/
typedef struct LogicalTape
{
+ LogicalTapeSet *tapeSet; /* tape set this tape is part of */
+
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
* This data structure represents a set of related "logical tapes" sharing
* space in a single underlying file. (But that "file" may be multiple files
* if needed to escape OS limits on file size; buffile.c handles that for us.)
- * The number of tapes is fixed at creation.
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
*/
struct LogicalTapeSet
{
BufFile *pfile; /* underlying file for whole tape set */
+ SharedFileSet *fileset; /* shared fileset, or NULL if not shared across processes */
+ int worker; /* worker # if shared, -1 for leader/serial */
/*
* File size tracking. nBlocksWritten is the size of the underlying file,
long nFreeBlocks; /* # of currently free blocks */
Size freeBlocksLen; /* current allocated length of freeBlocks[] */
bool enable_prealloc; /* preallocate write blocks? */
-
- /* The array of logical tapes. */
- int nTapes; /* # of logical tapes in set */
- LogicalTape *tapes; /* has nTapes nentries */
};
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
- SharedFileSet *fileset);
-static void ltsInitTape(LogicalTape *lt);
-static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsInitReadBuffer(LogicalTape *lt);
/*
* Returns true if anything was read, 'false' on EOF.
*/
static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
{
lt->pos = 0;
lt->nbytes = 0;
datablocknum += lt->offsetBlockNumber;
/* Read the block */
- ltsReadBlock(lts, datablocknum, (void *) thisbuf);
+ ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
+ ltsReleaseBlock(lt->tapeSet, datablocknum);
lt->curBlockNumber = lt->nextBlockNumber;
lt->nbytes += TapeBlockGetNBytes(thisbuf);
}
}
-/*
- * Claim ownership of a set of logical tapes from existing shared BufFiles.
- *
- * Caller should be leader process. Though tapes are marked as frozen in
- * workers, they are not frozen when opened within leader, since unfrozen tapes
- * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
- * for random access.)
- */
-static void
-ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
- SharedFileSet *fileset)
-{
- LogicalTape *lt = NULL;
- long tapeblocks = 0L;
- long nphysicalblocks = 0L;
- int i;
-
- /* Should have at least one worker tape, plus leader's tape */
- Assert(lts->nTapes >= 2);
-
- /*
- * Build concatenated view of all BufFiles, remembering the block number
- * where each source file begins. No changes are needed for leader/last
- * tape.
- */
- for (i = 0; i < lts->nTapes - 1; i++)
- {
- char filename[MAXPGPATH];
- BufFile *file;
- int64 filesize;
-
- lt = &lts->tapes[i];
-
- pg_itoa(i, filename);
- file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
- filesize = BufFileSize(file);
-
- /*
- * Stash first BufFile, and concatenate subsequent BufFiles to that.
- * Store block offset into each tape as we go.
- */
- lt->firstBlockNumber = shared[i].firstblocknumber;
- if (i == 0)
- {
- lts->pfile = file;
- lt->offsetBlockNumber = 0L;
- }
- else
- {
- lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
- }
- /* Don't allocate more for read buffer than could possibly help */
- lt->max_size = Min(MaxAllocSize, filesize);
- tapeblocks = filesize / BLCKSZ;
- nphysicalblocks += tapeblocks;
- }
-
- /*
- * Set # of allocated blocks, as well as # blocks written. Use extent of
- * new BufFile space (from 0 to end of last worker's tape space) for this.
- * Allocated/written blocks should include space used by holes left
- * between concatenated BufFiles.
- */
- lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
- lts->nBlocksWritten = lts->nBlocksAllocated;
-
- /*
- * Compute number of hole blocks so that we can later work backwards, and
- * instrument number of physical blocks. We don't simply use physical
- * blocks directly for instrumentation because this would break if we ever
- * subsequently wrote to the leader tape.
- *
- * Working backwards like this keeps our options open. If shared BufFiles
- * ever support being written to post-export, logtape.c can automatically
- * take advantage of that. We'd then support writing to the leader tape
- * while recycling space from worker tapes, because the leader tape has a
- * zero offset (write routines won't need to have extra logic to apply an
- * offset).
- *
- * The only thing that currently prevents writing to the leader tape from
- * working is the fact that BufFiles opened using BufFileOpenFileSet() are
- * read-only by definition, but that could be changed if it seemed
- * worthwhile. For now, writing to the leader tape will raise a "Bad file
- * descriptor" error, so tuplesort must avoid writing to the leader tape
- * altogether.
- */
- lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
-}
-
-/*
- * Initialize per-tape struct. Note we allocate the I/O buffer lazily.
- */
-static void
-ltsInitTape(LogicalTape *lt)
-{
- lt->writing = true;
- lt->frozen = false;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->nextBlockNumber = -1L;
- lt->offsetBlockNumber = 0L;
- lt->buffer = NULL;
- lt->buffer_size = 0;
- /* palloc() larger than MaxAllocSize would fail */
- lt->max_size = MaxAllocSize;
- lt->pos = 0;
- lt->nbytes = 0;
- lt->prealloc = NULL;
- lt->nprealloc = 0;
- lt->prealloc_size = 0;
-}
-
/*
* Lazily allocate and initialize the read buffer. This avoids waste when many
* tapes are open at once, but not all are active between rewinding and
* reading.
*/
static void
-ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsInitReadBuffer(LogicalTape *lt)
{
Assert(lt->buffer_size > 0);
lt->buffer = palloc(lt->buffer_size);
lt->nextBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
- ltsReadFillBuffer(lts, lt);
+ ltsReadFillBuffer(lt);
}
/*
- * Create a set of logical tapes in a temporary underlying file.
+ * Create a tape set, backed by a temporary underlying file.
*
- * Each tape is initialized in write state. Serial callers pass ntapes,
- * NULL argument for shared, and -1 for worker. Parallel worker callers
- * pass ntapes, a shared file handle, NULL shared argument, and their own
- * worker number. Leader callers, which claim shared worker tapes here,
- * must supply non-sentinel values for all arguments except worker number,
- * which should be -1.
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
*
- * Leader caller is passing back an array of metadata each worker captured
- * when LogicalTapeFreeze() was called for their final result tapes. Passed
- * tapes array is actually sized ntapes - 1, because it includes only
- * worker tapes, whereas leader requires its own leader tape. Note that we
- * rely on the assumption that reclaimed worker tapes will only be read
- * from once by leader, and never written to again (tapes are initialized
- * for writing, but that's only to be consistent). Leader may not write to
- * its own tape purely due to a restriction in the shared buffile
- * infrastructure that may be lifted in the future.
+ * Serial callers pass NULL for fileset, and -1 for worker. Parallel worker
+ * callers pass a shared fileset and their own worker number.
+ *
+ * Leader callers pass a shared file handle and -1 for worker. After creating
+ * the tape set, use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set; it does
+ * not create tapes of its own, although in principle that should work.
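+ *
+ * As a sketch of the three calling conventions (here 'fileset' and
+ * 'worker_number' stand for the caller's parallel-sort state, and
+ * 'shared_tapes' for its array of per-worker TapeShare structs):
+ *
+ *   serial: lts = LogicalTapeSetCreate(false, NULL, -1);
+ *   worker: lts = LogicalTapeSetCreate(false, fileset, worker_number);
+ *   leader: lts = LogicalTapeSetCreate(false, fileset, -1);
+ *           tape = LogicalTapeImport(lts, worker_number, &shared_tapes[worker_number]);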
*/
LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
- SharedFileSet *fileset, int worker)
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
{
LogicalTapeSet *lts;
- int i;
/*
- * Create top-level struct including per-tape LogicalTape structs.
+ * Create top-level struct.
*/
- Assert(ntapes > 0);
lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
lts->nBlocksAllocated = 0L;
lts->nBlocksWritten = 0L;
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
lts->nFreeBlocks = 0;
lts->enable_prealloc = preallocate;
- lts->nTapes = ntapes;
- lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
- for (i = 0; i < ntapes; i++)
- ltsInitTape(&lts->tapes[i]);
+ lts->fileset = fileset;
+ lts->worker = worker;
/*
* Create temp BufFile storage as required.
*
- * Leader concatenates worker tapes, which requires special adjustment to
- * final tapeset data. Things are simpler for the worker case and the
+ * In leader, we hijack the BufFile of the first tape that's imported, and
+ * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+ * create a BufFile here. Things are simpler for the worker case and the
* serial case, though. They are generally very similar -- workers use a
* shared fileset, whereas serial sorts use a conventional serial BufFile.
*/
- if (shared)
- ltsConcatWorkerTapes(lts, shared, fileset);
+ if (fileset && worker == -1)
+ lts->pfile = NULL;
else if (fileset)
{
char filename[MAXPGPATH];
}
/*
- * Close a logical tape set and release all resources.
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process. Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
*/
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
{
LogicalTape *lt;
- int i;
+ long tapeblocks;
+ char filename[MAXPGPATH];
+ BufFile *file;
+ int64 filesize;
- BufFileClose(lts->pfile);
- for (i = 0; i < lts->nTapes; i++)
+ lt = ltsCreateTape(lts);
+
+ /*
+ * build concatenated view of all buffiles, remembering the block number
+ * where each source file begins.
+ */
+ pg_itoa(worker, filename);
+ file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
+ filesize = BufFileSize(file);
+
+ /*
+ * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+ * block offset into each tape as we go.
+ */
+ lt->firstBlockNumber = shared->firstblocknumber;
+ if (lts->pfile == NULL)
{
- lt = &lts->tapes[i];
- if (lt->buffer)
- pfree(lt->buffer);
+ lts->pfile = file;
+ lt->offsetBlockNumber = 0L;
}
- pfree(lts->tapes);
+ else
+ {
+ lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+ }
+ /* Don't allocate more for read buffer than could possibly help */
+ lt->max_size = Min(MaxAllocSize, filesize);
+ tapeblocks = filesize / BLCKSZ;
+
+ /*
+ * Update # of allocated blocks and # blocks written to reflect the
+ * imported BufFile. Allocated/written blocks include space used by holes
+ * left between concatenated BufFiles. Also track the number of hole
+ * blocks so that we can later work backwards to calculate the number of
+ * physical blocks for instrumentation.
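+ *
+ * For illustration: if earlier imports left nBlocksAllocated at 100 but this
+ * tape's data begins at block 128 of the concatenated view, blocks 100..127
+ * are a hole and nHoleBlocks grows by 28.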
+ */
+ lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+ lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+ lts->nBlocksWritten = lts->nBlocksAllocated;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes! You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+ BufFileClose(lts->pfile);
pfree(lts->freeBlocks);
pfree(lts);
}
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+ /*
+ * The only thing that currently prevents creating new tapes in leader is
+ * the fact that BufFiles opened using BufFileOpenFileSet() are read-only
+ * by definition, but that could be changed if it seemed worthwhile. For
+ * now, writing to the leader tape will raise a "Bad file descriptor"
+ * error, so tuplesort must avoid writing to the leader tape altogether.
+ */
+ if (lts->fileset && lts->worker == -1)
+ elog(ERROR, "cannot create new tapes in leader process");
+
+ return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+ LogicalTape *lt;
+
+ /*
+ * Create per-tape struct. Note we allocate the I/O buffer lazily.
+ */
+ lt = palloc(sizeof(LogicalTape));
+ lt->tapeSet = lts;
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ /* palloc() larger than MaxAllocSize would fail */
+ lt->max_size = MaxAllocSize;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ lt->prealloc = NULL;
+ lt->nprealloc = 0;
+ lt->prealloc_size = 0;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list! You must read
+ * the tape to the end first, to reuse the space. In current use, though,
+ * we only close tapes after fully reading them.
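+ * (For example, nodeAgg.c closes a batch's input tape only after
+ * hashagg_batch_read() has returned NULL, and tuplesort.c closes a merge
+ * input tape only once its run is exhausted.)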
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+ if (lt->buffer)
+ pfree(lt->buffer);
+ pfree(lt);
+}
+
/*
* Mark a logical tape set as not needing management of free space anymore.
*
* There are no error returns; we ereport() on failure.
*/
void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
+ LogicalTapeSet *lts = lt->tapeSet;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
Assert(lt->offsetBlockNumber == 0L);
* First allocate the next block, so that we can store it in the
* 'next' pointer of this block.
*/
- nextBlockNumber = ltsGetBlock(lts, lt);
+ nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
/* set the next-pointer and dump the current block. */
TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
/* initialize the prev-pointer of the next block */
TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
* byte buffer is used.
*/
void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
+ LogicalTapeSet *lts = lt->tapeSet;
/*
* Round and cap buffer_size if needed.
lt->buffer_size - lt->nbytes);
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
}
}
}
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data will not have been freed. We could add more code to free
- * any unread blocks, but in current usage of this module it'd be useless
- * code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
- Assert(!lt->writing && !lt->frozen);
- lt->writing = true;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->pos = 0;
- lt->nbytes = 0;
- if (lt->buffer)
- pfree(lt->buffer);
- lt->buffer = NULL;
- lt->buffer_size = 0;
-}
-
/*
* Read from a logical tape.
*
* Early EOF is indicated by return value less than #bytes requested.
*/
size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
size_t nread = 0;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(!lt->writing);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
while (size > 0)
{
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- if (!ltsReadFillBuffer(lts, lt))
+ if (!ltsReadFillBuffer(lt))
break; /* EOF */
}
* Serial sorts should set share to NULL.
*/
void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
{
- LogicalTape *lt;
+ LogicalTapeSet *lts = lt->tapeSet;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
Assert(lt->offsetBlockNumber == 0L);
lt->buffer_size - lt->nbytes);
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
- lt->writing = false;
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
lt->frozen = true;
if (lt->firstBlockNumber == -1L)
lt->nextBlockNumber = -1L;
- ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
if (TapeBlockIsLast(lt->buffer))
lt->nextBlockNumber = -1L;
else
}
}
-/*
- * Add additional tapes to this tape set. Not intended to be used when any
- * tapes are frozen.
- */
-void
-LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
-{
- int i;
- int nTapesOrig = lts->nTapes;
-
- lts->nTapes += nAdditional;
-
- lts->tapes = (LogicalTape *) repalloc(lts->tapes,
- lts->nTapes * sizeof(LogicalTape));
-
- for (i = nTapesOrig; i < lts->nTapes; i++)
- ltsInitTape(&lts->tapes[i]);
-}
-
/*
* Backspace the tape a given number of bytes. (We also support a more
* general seek interface, see below.)
* that case.
*/
size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
{
- LogicalTape *lt;
size_t seekpos = 0;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(lt->buffer_size == BLCKSZ);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
/*
* Easy case for seek within current block.
return seekpos;
}
- ltsReadBlock(lts, prev, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
* LogicalTapeTell().
*/
void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
Assert(lt->buffer_size == BLCKSZ);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
if (blocknum != lt->curBlockNumber)
{
- ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
lt->curBlockNumber = blocknum;
lt->nbytes = TapeBlockPayloadSize;
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
* the position for a seek after freezing. Not clear if anyone needs that.
*/
void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
Assert(lt->offsetBlockNumber == 0L);
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
-#ifdef USE_ASSERT_CHECKING
- for (int i = 0; i < lts->nTapes; i++)
- {
- LogicalTape *lt = &lts->tapes[i];
-
- Assert(!lt->writing || lt->buffer == NULL);
- }
-#endif
return lts->nBlocksWritten - lts->nHoleBlocks;
}
MemoryContext sortcontext; /* memory context holding most sort data */
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+ LogicalTape **tapes; /* logical tapes, indexed by tape number */
/*
* These function pointers decouple the routines that must know what kind
* SortTuple struct!), and increase state->availMem by the amount of
* memory space thereby released.
*/
- void (*writetup) (Tuplesortstate *state, int tapenum,
+ void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
/*
* from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
- int result_tape; /* actual tape number of finished output */
+ LogicalTape *result_tape; /* tape of finished output */
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
*/
/* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
do { \
- if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+ if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
elog(ERROR, "unexpected end of data"); \
} while(0)
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
* inittapes(), if needed
*/
- state->result_tape = -1; /* flag that result tape has not been formed */
+ state->result_tape = NULL; /* flag that result tape has not been formed */
MemoryContextSwitchTo(oldcontext);
}
if (state->eof_reached)
return false;
- if ((tuplen = getlen(state, state->result_tape, true)) != 0)
+ if ((tuplen = getlen(state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
2 * sizeof(unsigned int));
if (nmoved == 0)
return false;
* Back up and fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
sizeof(unsigned int));
if (nmoved == 0)
return false;
else if (nmoved != sizeof(unsigned int))
elog(ERROR, "unexpected tape position");
- tuplen = getlen(state, state->result_tape, false);
+ tuplen = getlen(state->result_tape, false);
/*
* Back up to get ending length word of tuple before it.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
tuplen + 2 * sizeof(unsigned int));
if (nmoved == tuplen + sizeof(unsigned int))
{
elog(ERROR, "bogus tuple length in backward scan");
}
- tuplen = getlen(state, state->result_tape, false);
+ tuplen = getlen(state->result_tape, false);
/*
* Now we have the length of the prior tuple, back up and read it.
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
tuplen);
if (nmoved != tuplen)
elog(ERROR, "bogus tuple length in backward scan");
tuplesort_heap_delete_top(state);
/*
- * Rewind to free the read buffer. It'd go away at the
- * end of the sort anyway, but better to release the
- * memory early.
+ * Close the tape. It'd go away at the end of the sort
+ * anyway, but better to release the memory early.
*/
- LogicalTapeRewindForWrite(state->tapeset, srcTape);
+ LogicalTapeClose(state->tapes[srcTape]);
return true;
}
newtup.srctape = srcTape;
/* Create the tape set and allocate the per-tape data arrays */
inittapestate(state, maxTapes);
state->tapeset =
- LogicalTapeSetCreate(maxTapes, false, NULL,
+ LogicalTapeSetCreate(false,
state->shared ? &state->shared->fileset : NULL,
state->worker);
+ state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
+ for (j = 0; j < maxTapes; j++)
+ state->tapes[j] = LogicalTapeCreate(state->tapeset);
state->currentRun = 0;
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+ LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
for (;;)
{
/* Step D6: decrease level */
if (--state->Level == 0)
break;
+
/* rewind output tape T to use as new input */
- LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+ LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
state->read_buffer_size);
- /* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
+
+ /* close used-up input tape P, and create a new one for write pass */
+ LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
+ state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
state->tp_runs[state->tapeRange - 1] = 0;
/*
* output tape while rewinding it. The last iteration of step D6 would be
* a waste of cycles anyway...
*/
- state->result_tape = state->tp_tapenum[state->tapeRange];
+ state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
if (!WORKER(state))
- LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+ LogicalTapeFreeze(state->result_tape, NULL);
else
worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
- /* Release the read buffers of all the other tapes, by rewinding them. */
+ /* Close all the other tapes, to release their read buffers. */
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
- if (tapenum != state->result_tape)
- LogicalTapeRewindForWrite(state->tapeset, tapenum);
+ if (state->tapes[tapenum] != state->result_tape)
+ {
+ LogicalTapeClose(state->tapes[tapenum]);
+ state->tapes[tapenum] = NULL;
+ }
}
}
static void
mergeonerun(Tuplesortstate *state)
{
- int destTape = state->tp_tapenum[state->tapeRange];
+ int destTapeNum = state->tp_tapenum[state->tapeRange];
+ LogicalTape *destTape = state->tapes[destTapeNum];
int srcTape;
/*
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape, and increment its count of real runs.
*/
- markrunend(state, destTape);
+ markrunend(destTape);
state->tp_runs[state->tapeRange]++;
#ifdef TRACE_SORT
* Returns false on EOF.
*/
static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
{
+ LogicalTape *srcTape = state->tapes[srcTapeIndex];
unsigned int tuplen;
- if (!state->mergeactive[srcTape])
+ if (!state->mergeactive[srcTapeIndex])
return false; /* tape's run is already exhausted */
/* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
+ if ((tuplen = getlen(srcTape, true)) == 0)
{
- state->mergeactive[srcTape] = false;
+ state->mergeactive[srcTapeIndex] = false;
return false;
}
READTUP(state, stup, srcTape, tuplen);
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
+ LogicalTape *destTape;
int memtupwrite;
int i;
#endif
memtupwrite = state->memtupcount;
+ destTape = state->tapes[state->tp_tapenum[state->destTape]];
for (i = 0; i < memtupwrite; i++)
{
- WRITETUP(state, state->tp_tapenum[state->destTape],
- &state->memtuples[i]);
+ WRITETUP(state, destTape, &state->memtuples[i]);
state->memtupcount--;
}
*/
MemoryContextReset(state->tuplecontext);
- markrunend(state, state->tp_tapenum[state->destTape]);
+ markrunend(destTape);
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeRewindForRead(state->tapeset,
- state->result_tape,
- 0);
+ LogicalTapeRewindForRead(state->result_tape, 0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = state->eof_reached;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeTell(state->tapeset,
- state->result_tape,
+ LogicalTapeTell(state->result_tape,
&state->markpos_block,
&state->markpos_offset);
state->markpos_eof = state->eof_reached;
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeSeek(state->tapeset,
- state->result_tape,
+ LogicalTapeSeek(state->result_tape,
state->markpos_block,
state->markpos_offset);
state->eof_reached = state->markpos_eof;
*/
static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(LogicalTape *tape, bool eofOK)
{
unsigned int len;
- if (LogicalTapeRead(state->tapeset, tapenum,
+ if (LogicalTapeRead(tape,
&len, sizeof(len)) != sizeof(len))
elog(ERROR, "unexpected end of tape");
if (len == 0 && !eofOK)
}
static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(LogicalTape *tape)
{
unsigned int len = 0;
- LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+ LogicalTapeWrite(tape, (void *) &len, sizeof(len));
}
/*
}
static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
MinimalTuple tuple = (MinimalTuple) stup->tuple;
/* total on-disk footprint: */
unsigned int tuplen = tupbodylen + sizeof(int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tupbody, tupbodylen);
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
/* read in the tuple proper */
tuple->t_len = tuplen;
- LogicalTapeReadExact(state->tapeset, tapenum,
- tupbody, tupbodylen);
+ LogicalTapeReadExact(tape, tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
}
static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
HeapTuple tuple = (HeapTuple) stup->tuple;
unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
/* We need to store t_self, but not other fields of HeapTupleData */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
- LogicalTapeWrite(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
static void
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int tuplen)
+ LogicalTape *tape, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
tuple->t_len = t_len;
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
/* We don't currently bother to reconstruct t_tableOid */
tuple->t_tableOid = InvalidOid;
/* Read in the tuple body */
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value, if it's a simple column */
if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
}
static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
IndexTuple tuple = (IndexTuple) stup->tuple;
unsigned int tuplen;
tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tuple, IndexTupleSize(tuple));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple, tuplen);
+ LogicalTapeReadExact(tape, tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
stup->datum1 = index_getattr(tuple,
}
static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
void *waddr;
unsigned int tuplen;
writtenlen = tuplen + sizeof(unsigned int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
- LogicalTapeWrite(state->tapeset, tapenum,
- waddr, tuplen);
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, waddr, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
if (!state->slabAllocatorUsed && stup->tuple)
{
static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
else if (!state->tuples)
{
Assert(tuplen == sizeof(Datum));
- LogicalTapeReadExact(state->tapeset, tapenum,
- &stup->datum1, tuplen);
+ LogicalTapeReadExact(tape, &stup->datum1, tuplen);
stup->isnull1 = false;
stup->tuple = NULL;
}
{
void *raddr = readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- raddr, tuplen);
+ LogicalTapeReadExact(tape, raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
}
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
}
/*
TapeShare output;
Assert(WORKER(state));
- Assert(state->result_tape != -1);
+ Assert(state->result_tape != NULL);
Assert(state->memtupcount == 0);
/*
* Parallel worker requires result tape metadata, which is to be stored in
* shared memory for leader
*/
- LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+ LogicalTapeFreeze(state->result_tape, &output);
/* Store properties of output tape, and update finished worker count */
SpinLockAcquire(&shared->mutex);
worker_nomergeruns(Tuplesortstate *state)
{
Assert(WORKER(state));
- Assert(state->result_tape == -1);
+ Assert(state->result_tape == NULL);
- state->result_tape = state->tp_tapenum[state->destTape];
+ state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
worker_freeze_result_tape(state);
}
* randomAccess is disallowed for parallel sorts.
*/
inittapestate(state, nParticipants + 1);
- state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
- shared->tapes, &shared->fileset,
+ state->tapeset = LogicalTapeSetCreate(false,
+ &shared->fileset,
state->worker);
+ state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
+ for (j = 0; j < nParticipants; j++)
+ state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+ /* tapes[nParticipants] represents the "leader tape", which is not used */
/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
state->currentRun = nParticipants;
struct RangeTblEntry; /* avoid including parsenodes.h here */
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
struct CopyMultiInsertBuffer;
+struct LogicalTapeSet;
/* ----------------
bool table_filled; /* hash table filled yet? */
int num_hashes;
MemoryContext hash_metacxt; /* memory for hash table itself */
- struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+ struct LogicalTapeSet *hash_tapeset; /* tape set for hash spill tapes */
struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
* exists only during first pass */
TupleTableSlot *hash_spill_rslot; /* for reading spill files */
#include "storage/sharedfileset.h"
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
/*
* The approach tuplesort.c takes to parallel external sorts is that workers,
* prototypes for functions in logtape.c
*/
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
- TapeShare *shared,
+extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
SharedFileSet *fileset, int worker);
+extern void LogicalTapeClose(LogicalTape *lt);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
- size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
- TapeShare *share);
-extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
- size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */