Refactor LogicalTapeSet/LogicalTape interface.
authorHeikki Linnakangas <[email protected]>
Mon, 18 Oct 2021 11:30:00 +0000 (14:30 +0300)
committerHeikki Linnakangas <[email protected]>
Mon, 18 Oct 2021 11:46:01 +0000 (14:46 +0300)
All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now
take a LogicalTape as argument, instead of LogicalTapeSet+tape number.
You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront, when you create the tape set.

This makes the tape management in hash agg spilling in nodeAgg.c simpler.

Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi
Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor
src/backend/executor/nodeAgg.c
src/backend/utils/sort/logtape.c
src/backend/utils/sort/tuplesort.c
src/include/nodes/execnodes.h
src/include/utils/logtape.h

index 39bea204d16ae2790588cb7fe7825aa84a366858..c99a0de4ddb757a83a528dea6c32ee98bf37f722 100644 (file)
  *
  *   Spilled data is written to logical tapes. These provide better control
  *   over memory usage, disk space, and the number of files than if we were
- *   to use a BufFile for each spill.
+ *   to use a BufFile for each spill.  We don't know the number of tapes needed
+ *   at the start of the algorithm (because it can recurse), so a tape set is
+ *   allocated at the beginning, and individual tapes are created as needed.
+ *   As a particular tape is read, logtape.c recycles its disk space. When a
+ *   tape is read to completion, it is destroyed entirely.
+ *
+ *   Tapes' buffers can take up substantial memory when many tapes are open at
+ *   once. We only need one tape open at a time in read mode (using a buffer
+ *   that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ *   requiring a buffer of size BLCKSZ) for each partition.
  *
  *   Note that it's possible for transition states to start small but then
  *   grow very large; for instance in the case of ARRAY_AGG. In such cases,
  */
 #define CHUNKHDRSZ 16
 
-/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
-   LogicalTapeSet *tapeset;
-   int         ntapes;
-   int        *freetapes;
-   int         nfreetapes;
-   int         freetapes_alloc;
-} HashTapeInfo;
-
 /*
  * Represents partitioned spill data for a single hashtable. Contains the
  * necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
  */
 typedef struct HashAggSpill
 {
-   LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
    int         npartitions;    /* number of partitions */
-   int        *partitions;     /* spill partition tape numbers */
+   LogicalTape **partitions;   /* spill partition tapes */
    int64      *ntuples;        /* number of tuples in each partition */
    uint32      mask;           /* mask to find partition from hash value */
    int         shift;          /* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
 {
    int         setno;          /* grouping set */
    int         used_bits;      /* number of bits of hash already used */
-   LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
-   int         input_tapenum;  /* input partition tape */
+   LogicalTape *input_tape;    /* input partition tape */
    int64       input_tuples;   /* number of tuples in this batch */
    double      input_card;     /* estimated group cardinality */
 } HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
                                    int npartitions);
 static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
-                                      int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
                                       int64 input_tuples, double input_card,
                                       int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
                               int used_bits, double input_groups,
                               double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                                TupleTableSlot *slot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
                                 int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
-                                   int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
                                      AggState *aggstate, EState *estate,
@@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 
    if (!aggstate->hash_ever_spilled)
    {
-       Assert(aggstate->hash_tapeinfo == NULL);
+       Assert(aggstate->hash_tapeset == NULL);
        Assert(aggstate->hash_spills == NULL);
 
        aggstate->hash_ever_spilled = true;
 
-       hashagg_tapeinfo_init(aggstate);
+       aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
 
        aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
 
@@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
            AggStatePerHash perhash = &aggstate->perhash[setno];
            HashAggSpill *spill = &aggstate->hash_spills[setno];
 
-           hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+           hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
                               perhash->aggnode->numGroups,
                               aggstate->hashentrysize);
        }
@@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
        aggstate->hash_mem_peak = total_mem;
 
    /* update disk usage */
-   if (aggstate->hash_tapeinfo != NULL)
+   if (aggstate->hash_tapeset != NULL)
    {
-       uint64      disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+       uint64      disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
 
        if (aggstate->hash_disk_used < disk_used)
            aggstate->hash_disk_used = disk_used;
@@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
            TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
 
            if (spill->partitions == NULL)
-               hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+               hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
                                   perhash->aggnode->numGroups,
                                   aggstate->hashentrysize);
 
@@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
    HashAggBatch *batch;
    AggStatePerHash perhash;
    HashAggSpill spill;
-   HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+   LogicalTapeSet *tapeset = aggstate->hash_tapeset;
    bool        spill_initialized = false;
 
    if (aggstate->hash_batches == NIL)
@@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
                 * that we don't assign tapes that will never be used.
                 */
                spill_initialized = true;
-               hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+               hashagg_spill_init(&spill, tapeset, batch->used_bits,
                                   batch->input_card, aggstate->hashentrysize);
            }
            /* no memory for a new group, spill */
@@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
        ResetExprContext(aggstate->tmpcontext);
    }
 
-   hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+   LogicalTapeClose(batch->input_tape);
 
    /* change back to phase 0 */
    aggstate->current_phase = 0;
@@ -2884,67 +2865,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
    return NULL;
 }
 
-/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
-   HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
-   int         init_tapes = 16;    /* expanded dynamically */
-
-   tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
-   tapeinfo->ntapes = init_tapes;
-   tapeinfo->nfreetapes = init_tapes;
-   tapeinfo->freetapes_alloc = init_tapes;
-   tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
-   for (int i = 0; i < init_tapes; i++)
-       tapeinfo->freetapes[i] = i;
-
-   aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
-                       int npartitions)
-{
-   int         partidx = 0;
-
-   /* use free tapes if available */
-   while (partidx < npartitions && tapeinfo->nfreetapes > 0)
-       partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
-   if (partidx < npartitions)
-   {
-       LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
-       while (partidx < npartitions)
-           partitions[partidx++] = tapeinfo->ntapes++;
-   }
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
-   /* rewinding frees the buffer while not in use */
-   LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
-   if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
-   {
-       tapeinfo->freetapes_alloc <<= 1;
-       tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
-                                      tapeinfo->freetapes_alloc * sizeof(int));
-   }
-   tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
 /*
  * hashagg_spill_init
  *
@@ -2952,7 +2872,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  * of partitions to create, and initializes them.
  */
 static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
                   double input_groups, double hashentrysize)
 {
    int         npartitions;
@@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
    npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
                                             used_bits, &partition_bits);
 
-   spill->partitions = palloc0(sizeof(int) * npartitions);
+   spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
    spill->ntuples = palloc0(sizeof(int64) * npartitions);
    spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
-   hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+   for (int i = 0; i < npartitions; i++)
+       spill->partitions[i] = LogicalTapeCreate(tapeset);
 
-   spill->tapeset = tapeinfo->tapeset;
    spill->shift = 32 - used_bits - partition_bits;
    spill->mask = (npartitions - 1) << spill->shift;
    spill->npartitions = npartitions;
@@ -2986,11 +2906,10 @@ static Size
 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                    TupleTableSlot *inputslot, uint32 hash)
 {
-   LogicalTapeSet *tapeset = spill->tapeset;
    TupleTableSlot *spillslot;
    int         partition;
    MinimalTuple tuple;
-   int         tapenum;
+   LogicalTape *tape;
    int         total_written = 0;
    bool        shouldFree;
 
@@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
     */
    addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
 
-   tapenum = spill->partitions[partition];
+   tape = spill->partitions[partition];
 
-   LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+   LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
    total_written += sizeof(uint32);
 
-   LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+   LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
    total_written += tuple->t_len;
 
    if (shouldFree)
@@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  * be done.
  */
 static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
                  int64 input_tuples, double input_card, int used_bits)
 {
    HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
    batch->setno = setno;
    batch->used_bits = used_bits;
-   batch->tapeset = tapeset;
-   batch->input_tapenum = tapenum;
+   batch->input_tape = input_tape;
    batch->input_tuples = input_tuples;
    batch->input_card = input_card;
 
@@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
 static MinimalTuple
 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
 {
-   LogicalTapeSet *tapeset = batch->tapeset;
-   int         tapenum = batch->input_tapenum;
+   LogicalTape *tape = batch->input_tape;
    MinimalTuple tuple;
    uint32      t_len;
    size_t      nread;
    uint32      hash;
 
-   nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+   nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
    if (nread == 0)
        return NULL;
    if (nread != sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
-                errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-                       tapenum, sizeof(uint32), nread)));
+                errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+                       tape, sizeof(uint32), nread)));
    if (hashp != NULL)
        *hashp = hash;
 
-   nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+   nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
    if (nread != sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
-                errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-                       tapenum, sizeof(uint32), nread)));
+                errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+                       tape, sizeof(uint32), nread)));
 
    tuple = (MinimalTuple) palloc(t_len);
    tuple->t_len = t_len;
 
-   nread = LogicalTapeRead(tapeset, tapenum,
+   nread = LogicalTapeRead(tape,
                            (void *) ((char *) tuple + sizeof(uint32)),
                            t_len - sizeof(uint32));
    if (nread != t_len - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
-                errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-                       tapenum, t_len - sizeof(uint32), nread)));
+                errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+                       tape, t_len - sizeof(uint32), nread)));
 
    return tuple;
 }
@@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
    for (i = 0; i < spill->npartitions; i++)
    {
-       LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
-       int         tapenum = spill->partitions[i];
+       LogicalTape *tape = spill->partitions[i];
        HashAggBatch *new_batch;
        double      cardinality;
 
@@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
        freeHyperLogLog(&spill->hll_card[i]);
 
        /* rewinding frees the buffer while not in use */
-       LogicalTapeRewindForRead(tapeset, tapenum,
-                                HASHAGG_READ_BUFFER_SIZE);
+       LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
 
-       new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+       new_batch = hashagg_batch_new(tape, setno,
                                      spill->ntuples[i], cardinality,
                                      used_bits);
        aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
    aggstate->hash_batches = NIL;
 
    /* close tape set */
-   if (aggstate->hash_tapeinfo != NULL)
+   if (aggstate->hash_tapeset != NULL)
    {
-       HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
-       LogicalTapeSetClose(tapeinfo->tapeset);
-       pfree(tapeinfo->freetapes);
-       pfree(tapeinfo);
-       aggstate->hash_tapeinfo = NULL;
+       LogicalTapeSetClose(aggstate->hash_tapeset);
+       aggstate->hash_tapeset = NULL;
    }
 }
 
index debf12e1b0bd4e2cc12489e5e9e46766b06aa0c0..6d7f862fb5c400db9ed8695f1b2c858be3e964b8 100644 (file)
@@ -9,8 +9,7 @@
  * there is an annoying problem: the peak space usage is at least twice
  * the volume of actual data to be sorted.  (This must be so because each
  * datum will appear in both the input and output tapes of the final
- * merge pass.  For seven-tape polyphase merge, which is otherwise a
- * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ * merge pass.)
  *
  * We can work around this problem by recognizing that any one tape
  * dataset (with the possible exception of the final output) is written
@@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer
  */
 typedef struct LogicalTape
 {
+   LogicalTapeSet *tapeSet;    /* tape set this tape is part of */
+
    bool        writing;        /* T while in write phase */
    bool        frozen;         /* T if blocks should not be freed when read */
    bool        dirty;          /* does buffer need to be written? */
@@ -180,11 +181,14 @@ typedef struct LogicalTape
  * This data structure represents a set of related "logical tapes" sharing
  * space in a single underlying file.  (But that "file" may be multiple files
  * if needed to escape OS limits on file size; buffile.c handles that for us.)
- * The number of tapes is fixed at creation.
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
  */
 struct LogicalTapeSet
 {
    BufFile    *pfile;          /* underlying file for whole tape set */
+   SharedFileSet *fileset;
+   int         worker;         /* worker # if shared, -1 for leader/serial */
 
    /*
     * File size tracking.  nBlocksWritten is the size of the underlying file,
@@ -213,22 +217,16 @@ struct LogicalTapeSet
    long        nFreeBlocks;    /* # of currently free blocks */
    Size        freeBlocksLen;  /* current allocated length of freeBlocks[] */
    bool        enable_prealloc;    /* preallocate write blocks? */
-
-   /* The array of logical tapes. */
-   int         nTapes;         /* # of logical tapes in set */
-   LogicalTape *tapes;         /* has nTapes nentries */
 };
 
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-                                SharedFileSet *fileset);
-static void ltsInitTape(LogicalTape *lt);
-static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsInitReadBuffer(LogicalTape *lt);
 
 
 /*
@@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
  * Returns true if anything was read, 'false' on EOF.
  */
 static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
 {
    lt->pos = 0;
    lt->nbytes = 0;
@@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
        datablocknum += lt->offsetBlockNumber;
 
        /* Read the block */
-       ltsReadBlock(lts, datablocknum, (void *) thisbuf);
+       ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
        if (!lt->frozen)
-           ltsReleaseBlock(lts, datablocknum);
+           ltsReleaseBlock(lt->tapeSet, datablocknum);
        lt->curBlockNumber = lt->nextBlockNumber;
 
        lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -530,126 +528,13 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
    }
 }
 
-/*
- * Claim ownership of a set of logical tapes from existing shared BufFiles.
- *
- * Caller should be leader process.  Though tapes are marked as frozen in
- * workers, they are not frozen when opened within leader, since unfrozen tapes
- * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
- * for random access.)
- */
-static void
-ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-                    SharedFileSet *fileset)
-{
-   LogicalTape *lt = NULL;
-   long        tapeblocks = 0L;
-   long        nphysicalblocks = 0L;
-   int         i;
-
-   /* Should have at least one worker tape, plus leader's tape */
-   Assert(lts->nTapes >= 2);
-
-   /*
-    * Build concatenated view of all BufFiles, remembering the block number
-    * where each source file begins.  No changes are needed for leader/last
-    * tape.
-    */
-   for (i = 0; i < lts->nTapes - 1; i++)
-   {
-       char        filename[MAXPGPATH];
-       BufFile    *file;
-       int64       filesize;
-
-       lt = &lts->tapes[i];
-
-       pg_itoa(i, filename);
-       file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
-       filesize = BufFileSize(file);
-
-       /*
-        * Stash first BufFile, and concatenate subsequent BufFiles to that.
-        * Store block offset into each tape as we go.
-        */
-       lt->firstBlockNumber = shared[i].firstblocknumber;
-       if (i == 0)
-       {
-           lts->pfile = file;
-           lt->offsetBlockNumber = 0L;
-       }
-       else
-       {
-           lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
-       }
-       /* Don't allocate more for read buffer than could possibly help */
-       lt->max_size = Min(MaxAllocSize, filesize);
-       tapeblocks = filesize / BLCKSZ;
-       nphysicalblocks += tapeblocks;
-   }
-
-   /*
-    * Set # of allocated blocks, as well as # blocks written.  Use extent of
-    * new BufFile space (from 0 to end of last worker's tape space) for this.
-    * Allocated/written blocks should include space used by holes left
-    * between concatenated BufFiles.
-    */
-   lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
-   lts->nBlocksWritten = lts->nBlocksAllocated;
-
-   /*
-    * Compute number of hole blocks so that we can later work backwards, and
-    * instrument number of physical blocks.  We don't simply use physical
-    * blocks directly for instrumentation because this would break if we ever
-    * subsequently wrote to the leader tape.
-    *
-    * Working backwards like this keeps our options open.  If shared BufFiles
-    * ever support being written to post-export, logtape.c can automatically
-    * take advantage of that.  We'd then support writing to the leader tape
-    * while recycling space from worker tapes, because the leader tape has a
-    * zero offset (write routines won't need to have extra logic to apply an
-    * offset).
-    *
-    * The only thing that currently prevents writing to the leader tape from
-    * working is the fact that BufFiles opened using BufFileOpenFileSet() are
-    * read-only by definition, but that could be changed if it seemed
-    * worthwhile.  For now, writing to the leader tape will raise a "Bad file
-    * descriptor" error, so tuplesort must avoid writing to the leader tape
-    * altogether.
-    */
-   lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
-}
-
-/*
- * Initialize per-tape struct.  Note we allocate the I/O buffer lazily.
- */
-static void
-ltsInitTape(LogicalTape *lt)
-{
-   lt->writing = true;
-   lt->frozen = false;
-   lt->dirty = false;
-   lt->firstBlockNumber = -1L;
-   lt->curBlockNumber = -1L;
-   lt->nextBlockNumber = -1L;
-   lt->offsetBlockNumber = 0L;
-   lt->buffer = NULL;
-   lt->buffer_size = 0;
-   /* palloc() larger than MaxAllocSize would fail */
-   lt->max_size = MaxAllocSize;
-   lt->pos = 0;
-   lt->nbytes = 0;
-   lt->prealloc = NULL;
-   lt->nprealloc = 0;
-   lt->prealloc_size = 0;
-}
-
 /*
  * Lazily allocate and initialize the read buffer. This avoids waste when many
  * tapes are open at once, but not all are active between rewinding and
  * reading.
  */
 static void
-ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsInitReadBuffer(LogicalTape *lt)
 {
    Assert(lt->buffer_size > 0);
    lt->buffer = palloc(lt->buffer_size);
@@ -658,40 +543,32 @@ ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
    lt->nextBlockNumber = lt->firstBlockNumber;
    lt->pos = 0;
    lt->nbytes = 0;
-   ltsReadFillBuffer(lts, lt);
+   ltsReadFillBuffer(lt);
 }
 
 /*
- * Create a set of logical tapes in a temporary underlying file.
+ * Create a tape set, backed by a temporary underlying file.
  *
- * Each tape is initialized in write state.  Serial callers pass ntapes,
- * NULL argument for shared, and -1 for worker.  Parallel worker callers
- * pass ntapes, a shared file handle, NULL shared argument,  and their own
- * worker number.  Leader callers, which claim shared worker tapes here,
- * must supply non-sentinel values for all arguments except worker number,
- * which should be -1.
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
  *
- * Leader caller is passing back an array of metadata each worker captured
- * when LogicalTapeFreeze() was called for their final result tapes.  Passed
- * tapes array is actually sized ntapes - 1, because it includes only
- * worker tapes, whereas leader requires its own leader tape.  Note that we
- * rely on the assumption that reclaimed worker tapes will only be read
- * from once by leader, and never written to again (tapes are initialized
- * for writing, but that's only to be consistent).  Leader may not write to
- * its own tape purely due to a restriction in the shared buffile
- * infrastructure that may be lifted in the future.
+ * Serial callers pass NULL argument for shared, and -1 for worker.  Parallel
+ * worker callers pass a shared file handle and their own worker number.
+ *
+ * Leader callers pass a shared file handle and -1 for worker. After creating
+ * the tape set, use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set, it does
+ * not create tapes of its own, although in principle that should work.
  */
 LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
-                    SharedFileSet *fileset, int worker)
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 {
    LogicalTapeSet *lts;
-   int         i;
 
    /*
     * Create top-level struct including per-tape LogicalTape structs.
     */
-   Assert(ntapes > 0);
    lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
    lts->nBlocksAllocated = 0L;
    lts->nBlocksWritten = 0L;
@@ -701,22 +578,21 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
    lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
    lts->nFreeBlocks = 0;
    lts->enable_prealloc = preallocate;
-   lts->nTapes = ntapes;
-   lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
 
-   for (i = 0; i < ntapes; i++)
-       ltsInitTape(&lts->tapes[i]);
+   lts->fileset = fileset;
+   lts->worker = worker;
 
    /*
     * Create temp BufFile storage as required.
     *
-    * Leader concatenates worker tapes, which requires special adjustment to
-    * final tapeset data.  Things are simpler for the worker case and the
+    * In leader, we hijack the BufFile of the first tape that's imported, and
+    * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+    * create a BufFile here. Things are simpler for the worker case and the
     * serial case, though.  They are generally very similar -- workers use a
     * shared fileset, whereas serial sorts use a conventional serial BufFile.
     */
-   if (shared)
-       ltsConcatWorkerTapes(lts, shared, fileset);
+   if (fileset && worker == -1)
+       lts->pfile = NULL;
    else if (fileset)
    {
        char        filename[MAXPGPATH];
@@ -731,26 +607,145 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 }
 
 /*
- * Close a logical tape set and release all resources.
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process.  Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
  */
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
 {
    LogicalTape *lt;
-   int         i;
+   long        tapeblocks;
+   char        filename[MAXPGPATH];
+   BufFile    *file;
+   int64       filesize;
 
-   BufFileClose(lts->pfile);
-   for (i = 0; i < lts->nTapes; i++)
+   lt = ltsCreateTape(lts);
+
+   /*
+    * build concatenated view of all buffiles, remembering the block number
+    * where each source file begins.
+    */
+   pg_itoa(worker, filename);
+   file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
+   filesize = BufFileSize(file);
+
+   /*
+    * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+    * block offset into each tape as we go.
+    */
+   lt->firstBlockNumber = shared->firstblocknumber;
+   if (lts->pfile == NULL)
    {
-       lt = &lts->tapes[i];
-       if (lt->buffer)
-           pfree(lt->buffer);
+       lts->pfile = file;
+       lt->offsetBlockNumber = 0L;
    }
-   pfree(lts->tapes);
+   else
+   {
+       lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+   }
+   /* Don't allocate more for read buffer than could possibly help */
+   lt->max_size = Min(MaxAllocSize, filesize);
+   tapeblocks = filesize / BLCKSZ;
+
+   /*
+    * Update # of allocated blocks and # blocks written to reflect the
+    * imported BufFile.  Allocated/written blocks include space used by holes
+    * left between concatenated BufFiles.  Also track the number of hole
+    * blocks so that we can later work backwards to calculate the number of
+    * physical blocks for instrumentation.
+    */
+   lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+   lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+   lts->nBlocksWritten = lts->nBlocksAllocated;
+
+   return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes!  You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+   BufFileClose(lts->pfile);
    pfree(lts->freeBlocks);
    pfree(lts);
 }
 
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+   /*
+    * The only thing that currently prevents creating new tapes in leader is
+    * the fact that BufFiles opened using BufFileOpenShared() are read-only
+    * by definition, but that could be changed if it seemed worthwhile.  For
+    * now, writing to the leader tape will raise a "Bad file descriptor"
+    * error, so tuplesort must avoid writing to the leader tape altogether.
+    */
+   if (lts->fileset && lts->worker == -1)
+       elog(ERROR, "cannot create new tapes in leader process");
+
+   return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+   LogicalTape *lt;
+
+   /*
+    * Create per-tape struct.  Note we allocate the I/O buffer lazily.
+    */
+   lt = palloc(sizeof(LogicalTape));
+   lt->tapeSet = lts;
+   lt->writing = true;
+   lt->frozen = false;
+   lt->dirty = false;
+   lt->firstBlockNumber = -1L;
+   lt->curBlockNumber = -1L;
+   lt->nextBlockNumber = -1L;
+   lt->offsetBlockNumber = 0L;
+   lt->buffer = NULL;
+   lt->buffer_size = 0;
+   /* palloc() larger than MaxAllocSize would fail */
+   lt->max_size = MaxAllocSize;
+   lt->pos = 0;
+   lt->nbytes = 0;
+   lt->prealloc = NULL;
+   lt->nprealloc = 0;
+   lt->prealloc_size = 0;
+
+   return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list!  You must read
+ * the tape to the end first, to reuse the space.  In current use, though,
+ * we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+   if (lt->buffer)
+       pfree(lt->buffer);
+   pfree(lt);
+}
+
 /*
  * Mark a logical tape set as not needing management of free space anymore.
  *
@@ -772,14 +767,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
  * There are no error returns; we ereport() on failure.
  */
 void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-                void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
 {
-   LogicalTape *lt;
+   LogicalTapeSet *lts = lt->tapeSet;
    size_t      nthistime;
 
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
    Assert(lt->writing);
    Assert(lt->offsetBlockNumber == 0L);
 
@@ -818,11 +810,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
             * First allocate the next block, so that we can store it in the
             * 'next' pointer of this block.
             */
-           nextBlockNumber = ltsGetBlock(lts, lt);
+           nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
 
            /* set the next-pointer and dump the current block. */
            TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
-           ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+           ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 
            /* initialize the prev-pointer of the next block */
            TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
@@ -860,12 +852,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
  * byte buffer is used.
  */
 void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
 {
-   LogicalTape *lt;
-
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
+   LogicalTapeSet *lts = lt->tapeSet;
 
    /*
     * Round and cap buffer_size if needed.
@@ -907,7 +896,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
                                      lt->buffer_size - lt->nbytes);
 
            TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-           ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+           ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
        }
        lt->writing = false;
    }
@@ -939,61 +928,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
    }
 }
 
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data will not have been freed. We could add more code to free
- * any unread blocks, but in current usage of this module it'd be useless
- * code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
-   LogicalTape *lt;
-
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
-
-   Assert(!lt->writing && !lt->frozen);
-   lt->writing = true;
-   lt->dirty = false;
-   lt->firstBlockNumber = -1L;
-   lt->curBlockNumber = -1L;
-   lt->pos = 0;
-   lt->nbytes = 0;
-   if (lt->buffer)
-       pfree(lt->buffer);
-   lt->buffer = NULL;
-   lt->buffer_size = 0;
-}
-
 /*
  * Read from a logical tape.
  *
  * Early EOF is indicated by return value less than #bytes requested.
  */
 size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-               void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
 {
-   LogicalTape *lt;
    size_t      nread = 0;
    size_t      nthistime;
 
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
    Assert(!lt->writing);
 
    if (lt->buffer == NULL)
-       ltsInitReadBuffer(lts, lt);
+       ltsInitReadBuffer(lt);
 
    while (size > 0)
    {
        if (lt->pos >= lt->nbytes)
        {
            /* Try to load more data into buffer. */
-           if (!ltsReadFillBuffer(lts, lt))
+           if (!ltsReadFillBuffer(lt))
                break;          /* EOF */
        }
 
@@ -1031,12 +987,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
  * Serial sorts should set share to NULL.
  */
 void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
 {
-   LogicalTape *lt;
+   LogicalTapeSet *lts = lt->tapeSet;
 
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
    Assert(lt->writing);
    Assert(lt->offsetBlockNumber == 0L);
 
@@ -1058,8 +1012,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
                                  lt->buffer_size - lt->nbytes);
 
        TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-       ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
-       lt->writing = false;
+       ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
    }
    lt->writing = false;
    lt->frozen = true;
@@ -1086,7 +1039,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 
    if (lt->firstBlockNumber == -1L)
        lt->nextBlockNumber = -1L;
-   ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+   ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
    if (TapeBlockIsLast(lt->buffer))
        lt->nextBlockNumber = -1L;
    else
@@ -1101,25 +1054,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
    }
 }
 
-/*
- * Add additional tapes to this tape set. Not intended to be used when any
- * tapes are frozen.
- */
-void
-LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
-{
-   int         i;
-   int         nTapesOrig = lts->nTapes;
-
-   lts->nTapes += nAdditional;
-
-   lts->tapes = (LogicalTape *) repalloc(lts->tapes,
-                                         lts->nTapes * sizeof(LogicalTape));
-
-   for (i = nTapesOrig; i < lts->nTapes; i++)
-       ltsInitTape(&lts->tapes[i]);
-}
-
 /*
  * Backspace the tape a given number of bytes.  (We also support a more
  * general seek interface, see below.)
@@ -1134,18 +1068,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
  * that case.
  */
 size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
 {
-   LogicalTape *lt;
    size_t      seekpos = 0;
 
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
    Assert(lt->frozen);
    Assert(lt->buffer_size == BLCKSZ);
 
    if (lt->buffer == NULL)
-       ltsInitReadBuffer(lts, lt);
+       ltsInitReadBuffer(lt);
 
    /*
     * Easy case for seek within current block.
@@ -1175,7 +1106,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
            return seekpos;
        }
 
-       ltsReadBlock(lts, prev, (void *) lt->buffer);
+       ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
 
        if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
            elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
@@ -1208,23 +1139,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
  * LogicalTapeTell().
  */
 void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-               long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
 {
-   LogicalTape *lt;
-
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
    Assert(lt->frozen);
    Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
    Assert(lt->buffer_size == BLCKSZ);
 
    if (lt->buffer == NULL)
-       ltsInitReadBuffer(lts, lt);
+       ltsInitReadBuffer(lt);
 
    if (blocknum != lt->curBlockNumber)
    {
-       ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+       ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
        lt->curBlockNumber = blocknum;
        lt->nbytes = TapeBlockPayloadSize;
        lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
@@ -1242,16 +1168,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
  * the position for a seek after freezing.  Not clear if anyone needs that.
  */
 void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-               long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
 {
-   LogicalTape *lt;
-
-   Assert(tapenum >= 0 && tapenum < lts->nTapes);
-   lt = &lts->tapes[tapenum];
-
    if (lt->buffer == NULL)
-       ltsInitReadBuffer(lts, lt);
+       ltsInitReadBuffer(lt);
 
    Assert(lt->offsetBlockNumber == 0L);
 
@@ -1271,13 +1191,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 long
 LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
-#ifdef USE_ASSERT_CHECKING
-   for (int i = 0; i < lts->nTapes; i++)
-   {
-       LogicalTape *lt = &lts->tapes[i];
-
-       Assert(!lt->writing || lt->buffer == NULL);
-   }
-#endif
    return lts->nBlocksWritten - lts->nHoleBlocks;
 }
index b17347b21419ed2ce8cc6d94c1edfa26e575e675..d5930f258d9216bb2daa680d9d8e1a70c5e8b96b 100644 (file)
@@ -262,6 +262,7 @@ struct Tuplesortstate
    MemoryContext sortcontext;  /* memory context holding most sort data */
    MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
+   LogicalTape **tapes;
 
    /*
     * These function pointers decouple the routines that must know what kind
@@ -290,7 +291,7 @@ struct Tuplesortstate
     * SortTuple struct!), and increase state->availMem by the amount of
     * memory space thereby released.
     */
-   void        (*writetup) (Tuplesortstate *state, int tapenum,
+   void        (*writetup) (Tuplesortstate *state, LogicalTape *tape,
                             SortTuple *stup);
 
    /*
@@ -299,7 +300,7 @@ struct Tuplesortstate
     * from the slab memory arena, or is palloc'd, see readtup_alloc().
     */
    void        (*readtup) (Tuplesortstate *state, SortTuple *stup,
-                           int tapenum, unsigned int len);
+                           LogicalTape *tape, unsigned int len);
 
    /*
     * This array holds the tuples now in sort memory.  If we are in state
@@ -393,7 +394,7 @@ struct Tuplesortstate
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
-   int         result_tape;    /* actual tape number of finished output */
+   LogicalTape *result_tape;   /* tape of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */
 
@@ -599,9 +600,9 @@ struct Sharedsort
  */
 
 /* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
    do { \
-       if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+       if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
            elog(ERROR, "unexpected end of data"); \
    } while(0)
 
@@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
@@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_delete_top(Tuplesortstate *state);
 static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                            Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
                          SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
-                        int tapenum, unsigned int len);
+                        LogicalTape *tape, unsigned int len);
 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
                               Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
                             SortTuple *stup);
 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-                           int tapenum, unsigned int len);
+                           LogicalTape *tape, unsigned int len);
 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state);
 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                  Tuplesortstate *state);
 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
                           SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
-                         int tapenum, unsigned int len);
+                         LogicalTape *tape, unsigned int len);
 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                             Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
                           SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
-                         int tapenum, unsigned int len);
+                         LogicalTape *tape, unsigned int len);
 static int worker_get_identifier(Tuplesortstate *state);
 static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
@@ -888,7 +889,7 @@ tuplesort_begin_batch(Tuplesortstate *state)
     * inittapes(), if needed
     */
 
-   state->result_tape = -1;    /* flag that result tape has not been formed */
+   state->result_tape = NULL;  /* flag that result tape has not been formed */
 
    MemoryContextSwitchTo(oldcontext);
 }
@@ -2221,7 +2222,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                if (state->eof_reached)
                    return false;
 
-               if ((tuplen = getlen(state, state->result_tape, true)) != 0)
+               if ((tuplen = getlen(state->result_tape, true)) != 0)
                {
                    READTUP(state, stup, state->result_tape, tuplen);
 
@@ -2254,8 +2255,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                 * end of file; back up to fetch last tuple's ending length
                 * word.  If seek fails we must have a completely empty file.
                 */
-               nmoved = LogicalTapeBackspace(state->tapeset,
-                                             state->result_tape,
+               nmoved = LogicalTapeBackspace(state->result_tape,
                                              2 * sizeof(unsigned int));
                if (nmoved == 0)
                    return false;
@@ -2269,20 +2269,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                 * Back up and fetch previously-returned tuple's ending length
                 * word.  If seek fails, assume we are at start of file.
                 */
-               nmoved = LogicalTapeBackspace(state->tapeset,
-                                             state->result_tape,
+               nmoved = LogicalTapeBackspace(state->result_tape,
                                              sizeof(unsigned int));
                if (nmoved == 0)
                    return false;
                else if (nmoved != sizeof(unsigned int))
                    elog(ERROR, "unexpected tape position");
-               tuplen = getlen(state, state->result_tape, false);
+               tuplen = getlen(state->result_tape, false);
 
                /*
                 * Back up to get ending length word of tuple before it.
                 */
-               nmoved = LogicalTapeBackspace(state->tapeset,
-                                             state->result_tape,
+               nmoved = LogicalTapeBackspace(state->result_tape,
                                              tuplen + 2 * sizeof(unsigned int));
                if (nmoved == tuplen + sizeof(unsigned int))
                {
@@ -2299,15 +2297,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                    elog(ERROR, "bogus tuple length in backward scan");
            }
 
-           tuplen = getlen(state, state->result_tape, false);
+           tuplen = getlen(state->result_tape, false);
 
            /*
             * Now we have the length of the prior tuple, back up and read it.
             * Note: READTUP expects we are positioned after the initial
             * length word of the tuple, so back up to that point.
             */
-           nmoved = LogicalTapeBackspace(state->tapeset,
-                                         state->result_tape,
+           nmoved = LogicalTapeBackspace(state->result_tape,
                                          tuplen);
            if (nmoved != tuplen)
                elog(ERROR, "bogus tuple length in backward scan");
@@ -2365,11 +2362,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                    tuplesort_heap_delete_top(state);
 
                    /*
-                    * Rewind to free the read buffer.  It'd go away at the
-                    * end of the sort anyway, but better to release the
-                    * memory early.
+                    * Close the tape.  It'd go away at the end of the sort
+                    * anyway, but better to release the memory early.
                     */
-                   LogicalTapeRewindForWrite(state->tapeset, srcTape);
+                   LogicalTapeClose(state->tapes[srcTape]);
                    return true;
                }
                newtup.srctape = srcTape;
@@ -2667,9 +2663,12 @@ inittapes(Tuplesortstate *state, bool mergeruns)
    /* Create the tape set and allocate the per-tape data arrays */
    inittapestate(state, maxTapes);
    state->tapeset =
-       LogicalTapeSetCreate(maxTapes, false, NULL,
+       LogicalTapeSetCreate(false,
                             state->shared ? &state->shared->fileset : NULL,
                             state->worker);
+   state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
+   for (j = 0; j < maxTapes; j++)
+       state->tapes[j] = LogicalTapeCreate(state->tapeset);
 
    state->currentRun = 0;
 
@@ -2919,7 +2918,7 @@ mergeruns(Tuplesortstate *state)
 
    /* End of step D2: rewind all output tapes to prepare for merging */
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-       LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+       LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
 
    for (;;)
    {
@@ -2981,11 +2980,14 @@ mergeruns(Tuplesortstate *state)
        /* Step D6: decrease level */
        if (--state->Level == 0)
            break;
+
        /* rewind output tape T to use as new input */
-       LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+       LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
                                 state->read_buffer_size);
-       /* rewind used-up input tape P, and prepare it for write pass */
-       LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
+
+       /* close used-up input tape P, and create a new one for write pass */
+       LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
+       state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
        state->tp_runs[state->tapeRange - 1] = 0;
 
        /*
@@ -3013,18 +3015,21 @@ mergeruns(Tuplesortstate *state)
     * output tape while rewinding it.  The last iteration of step D6 would be
     * a waste of cycles anyway...
     */
-   state->result_tape = state->tp_tapenum[state->tapeRange];
+   state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
    if (!WORKER(state))
-       LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+       LogicalTapeFreeze(state->result_tape, NULL);
    else
        worker_freeze_result_tape(state);
    state->status = TSS_SORTEDONTAPE;
 
-   /* Release the read buffers of all the other tapes, by rewinding them. */
+   /* Close all the other tapes, to release their read buffers. */
    for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
    {
-       if (tapenum != state->result_tape)
-           LogicalTapeRewindForWrite(state->tapeset, tapenum);
+       if (state->tapes[tapenum] != state->result_tape)
+       {
+           LogicalTapeClose(state->tapes[tapenum]);
+           state->tapes[tapenum] = NULL;
+       }
    }
 }
 
@@ -3037,7 +3042,8 @@ mergeruns(Tuplesortstate *state)
 static void
 mergeonerun(Tuplesortstate *state)
 {
-   int         destTape = state->tp_tapenum[state->tapeRange];
+   int         destTapeNum = state->tp_tapenum[state->tapeRange];
+   LogicalTape *destTape = state->tapes[destTapeNum];
    int         srcTape;
 
    /*
@@ -3080,7 +3086,7 @@ mergeonerun(Tuplesortstate *state)
     * When the heap empties, we're done.  Write an end-of-run marker on the
     * output tape, and increment its count of real runs.
     */
-   markrunend(state, destTape);
+   markrunend(destTape);
    state->tp_runs[state->tapeRange]++;
 
 #ifdef TRACE_SORT
@@ -3146,17 +3152,18 @@ beginmerge(Tuplesortstate *state)
  * Returns false on EOF.
  */
 static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
 {
+   LogicalTape *srcTape = state->tapes[srcTapeIndex];
    unsigned int tuplen;
 
-   if (!state->mergeactive[srcTape])
+   if (!state->mergeactive[srcTapeIndex])
        return false;           /* tape's run is already exhausted */
 
    /* read next tuple, if any */
-   if ((tuplen = getlen(state, srcTape, true)) == 0)
+   if ((tuplen = getlen(srcTape, true)) == 0)
    {
-       state->mergeactive[srcTape] = false;
+       state->mergeactive[srcTapeIndex] = false;
        return false;
    }
    READTUP(state, stup, srcTape, tuplen);
@@ -3173,6 +3180,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
+   LogicalTape *destTape;
    int         memtupwrite;
    int         i;
 
@@ -3239,10 +3247,10 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 #endif
 
    memtupwrite = state->memtupcount;
+   destTape = state->tapes[state->tp_tapenum[state->destTape]];
    for (i = 0; i < memtupwrite; i++)
    {
-       WRITETUP(state, state->tp_tapenum[state->destTape],
-                &state->memtuples[i]);
+       WRITETUP(state, destTape, &state->memtuples[i]);
        state->memtupcount--;
    }
 
@@ -3255,7 +3263,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
     */
    MemoryContextReset(state->tuplecontext);
 
-   markrunend(state, state->tp_tapenum[state->destTape]);
+   markrunend(destTape);
    state->tp_runs[state->destTape]++;
    state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
 
@@ -3289,9 +3297,7 @@ tuplesort_rescan(Tuplesortstate *state)
            state->markpos_eof = false;
            break;
        case TSS_SORTEDONTAPE:
-           LogicalTapeRewindForRead(state->tapeset,
-                                    state->result_tape,
-                                    0);
+           LogicalTapeRewindForRead(state->result_tape, 0);
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
@@ -3322,8 +3328,7 @@ tuplesort_markpos(Tuplesortstate *state)
            state->markpos_eof = state->eof_reached;
            break;
        case TSS_SORTEDONTAPE:
-           LogicalTapeTell(state->tapeset,
-                           state->result_tape,
+           LogicalTapeTell(state->result_tape,
                            &state->markpos_block,
                            &state->markpos_offset);
            state->markpos_eof = state->eof_reached;
@@ -3354,8 +3359,7 @@ tuplesort_restorepos(Tuplesortstate *state)
            state->eof_reached = state->markpos_eof;
            break;
        case TSS_SORTEDONTAPE:
-           LogicalTapeSeek(state->tapeset,
-                           state->result_tape,
+           LogicalTapeSeek(state->result_tape,
                            state->markpos_block,
                            state->markpos_offset);
            state->eof_reached = state->markpos_eof;
@@ -3697,11 +3701,11 @@ reversedirection(Tuplesortstate *state)
  */
 
 static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(LogicalTape *tape, bool eofOK)
 {
    unsigned int len;
 
-   if (LogicalTapeRead(state->tapeset, tapenum,
+   if (LogicalTapeRead(tape,
                        &len, sizeof(len)) != sizeof(len))
        elog(ERROR, "unexpected end of tape");
    if (len == 0 && !eofOK)
@@ -3710,11 +3714,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
 }
 
 static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(LogicalTape *tape)
 {
    unsigned int len = 0;
 
-   LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+   LogicalTapeWrite(tape, (void *) &len, sizeof(len));
 }
 
 /*
@@ -3892,7 +3896,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
    MinimalTuple tuple = (MinimalTuple) stup->tuple;
 
@@ -3903,13 +3907,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
    /* total on-disk footprint: */
    unsigned int tuplen = tupbodylen + sizeof(int);
 
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    (void *) &tuplen, sizeof(tuplen));
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    (void *) tupbody, tupbodylen);
+   LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+   LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeWrite(state->tapeset, tapenum,
-                        (void *) &tuplen, sizeof(tuplen));
+       LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
    if (!state->slabAllocatorUsed)
    {
@@ -3920,7 +3921,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_heap(Tuplesortstate *state, SortTuple *stup,
-            int tapenum, unsigned int len)
+            LogicalTape *tape, unsigned int len)
 {
    unsigned int tupbodylen = len - sizeof(int);
    unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
@@ -3930,11 +3931,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 
    /* read in the tuple proper */
    tuple->t_len = tuplen;
-   LogicalTapeReadExact(state->tapeset, tapenum,
-                        tupbody, tupbodylen);
+   LogicalTapeReadExact(tape, tupbody, tupbodylen);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            &tuplen, sizeof(tuplen));
+       LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
    stup->tuple = (void *) tuple;
    /* set up first-column key value */
    htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
@@ -4135,21 +4134,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
    HeapTuple   tuple = (HeapTuple) stup->tuple;
    unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
 
    /* We need to store t_self, but not other fields of HeapTupleData */
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    &tuplen, sizeof(tuplen));
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    &tuple->t_self, sizeof(ItemPointerData));
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    tuple->t_data, tuple->t_len);
+   LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+   LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+   LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeWrite(state->tapeset, tapenum,
-                        &tuplen, sizeof(tuplen));
+       LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
 
    if (!state->slabAllocatorUsed)
    {
@@ -4160,7 +4155,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-               int tapenum, unsigned int tuplen)
+               LogicalTape *tape, unsigned int tuplen)
 {
    unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
    HeapTuple   tuple = (HeapTuple) readtup_alloc(state,
@@ -4169,16 +4164,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
    /* Reconstruct the HeapTupleData header */
    tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
    tuple->t_len = t_len;
-   LogicalTapeReadExact(state->tapeset, tapenum,
-                        &tuple->t_self, sizeof(ItemPointerData));
+   LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
    /* We don't currently bother to reconstruct t_tableOid */
    tuple->t_tableOid = InvalidOid;
    /* Read in the tuple body */
-   LogicalTapeReadExact(state->tapeset, tapenum,
-                        tuple->t_data, tuple->t_len);
+   LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            &tuplen, sizeof(tuplen));
+       LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
    stup->tuple = (void *) tuple;
    /* set up first-column key value, if it's a simple column */
    if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
@@ -4392,19 +4384,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
    IndexTuple  tuple = (IndexTuple) stup->tuple;
    unsigned int tuplen;
 
    tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    (void *) &tuplen, sizeof(tuplen));
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    (void *) tuple, IndexTupleSize(tuple));
+   LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+   LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeWrite(state->tapeset, tapenum,
-                        (void *) &tuplen, sizeof(tuplen));
+       LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
    if (!state->slabAllocatorUsed)
    {
@@ -4415,16 +4404,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_index(Tuplesortstate *state, SortTuple *stup,
-             int tapenum, unsigned int len)
+             LogicalTape *tape, unsigned int len)
 {
    unsigned int tuplen = len - sizeof(unsigned int);
    IndexTuple  tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
-   LogicalTapeReadExact(state->tapeset, tapenum,
-                        tuple, tuplen);
+   LogicalTapeReadExact(tape, tuple, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            &tuplen, sizeof(tuplen));
+       LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
    stup->tuple = (void *) tuple;
    /* set up first-column key value */
    stup->datum1 = index_getattr(tuple,
@@ -4466,7 +4453,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
    void       *waddr;
    unsigned int tuplen;
@@ -4491,13 +4478,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
    writtenlen = tuplen + sizeof(unsigned int);
 
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    (void *) &writtenlen, sizeof(writtenlen));
-   LogicalTapeWrite(state->tapeset, tapenum,
-                    waddr, tuplen);
+   LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+   LogicalTapeWrite(tape, waddr, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeWrite(state->tapeset, tapenum,
-                        (void *) &writtenlen, sizeof(writtenlen));
+       LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
 
    if (!state->slabAllocatorUsed && stup->tuple)
    {
@@ -4508,7 +4492,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_datum(Tuplesortstate *state, SortTuple *stup,
-             int tapenum, unsigned int len)
+             LogicalTape *tape, unsigned int len)
 {
    unsigned int tuplen = len - sizeof(unsigned int);
 
@@ -4522,8 +4506,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
    else if (!state->tuples)
    {
        Assert(tuplen == sizeof(Datum));
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            &stup->datum1, tuplen);
+       LogicalTapeReadExact(tape, &stup->datum1, tuplen);
        stup->isnull1 = false;
        stup->tuple = NULL;
    }
@@ -4531,16 +4514,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
    {
        void       *raddr = readtup_alloc(state, tuplen);
 
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            raddr, tuplen);
+       LogicalTapeReadExact(tape, raddr, tuplen);
        stup->datum1 = PointerGetDatum(raddr);
        stup->isnull1 = false;
        stup->tuple = raddr;
    }
 
    if (state->randomAccess)    /* need trailing length word? */
-       LogicalTapeReadExact(state->tapeset, tapenum,
-                            &tuplen, sizeof(tuplen));
+       LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 }
 
 /*
@@ -4652,7 +4633,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
    TapeShare   output;
 
    Assert(WORKER(state));
-   Assert(state->result_tape != -1);
+   Assert(state->result_tape != NULL);
    Assert(state->memtupcount == 0);
 
    /*
@@ -4668,7 +4649,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
     * Parallel worker requires result tape metadata, which is to be stored in
     * shared memory for leader
     */
-   LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+   LogicalTapeFreeze(state->result_tape, &output);
 
    /* Store properties of output tape, and update finished worker count */
    SpinLockAcquire(&shared->mutex);
@@ -4687,9 +4668,9 @@ static void
 worker_nomergeruns(Tuplesortstate *state)
 {
    Assert(WORKER(state));
-   Assert(state->result_tape == -1);
+   Assert(state->result_tape == NULL);
 
-   state->result_tape = state->tp_tapenum[state->destTape];
+   state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
    worker_freeze_result_tape(state);
 }
 
@@ -4733,9 +4714,13 @@ leader_takeover_tapes(Tuplesortstate *state)
     * randomAccess is disallowed for parallel sorts.
     */
    inittapestate(state, nParticipants + 1);
-   state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
-                                         shared->tapes, &shared->fileset,
+   state->tapeset = LogicalTapeSetCreate(false,
+                                         &shared->fileset,
                                          state->worker);
+   state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
+   for (j = 0; j < nParticipants; j++)
+       state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+   /* tapes[nParticipants] represents the "leader tape", which is not used */
 
    /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
    state->currentRun = nParticipants;
index 37cb4f3d59a56c6c2fed0bf8bebf06e8fbb52ab6..2e8cbee69ff6ebbcf080deed0fc68976150e7920 100644 (file)
@@ -41,6 +41,7 @@ struct ExprContext;
 struct RangeTblEntry;          /* avoid including parsenodes.h here */
 struct ExprEvalStep;           /* avoid including execExpr.h everywhere */
 struct CopyMultiInsertBuffer;
+struct LogicalTapeSet;
 
 
 /* ----------------
@@ -2316,7 +2317,7 @@ typedef struct AggState
    bool        table_filled;   /* hash table filled yet? */
    int         num_hashes;
    MemoryContext hash_metacxt; /* memory for hash table itself */
-   struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+   struct LogicalTapeSet *hash_tapeset;    /* tape set for hash spill tapes */
    struct HashAggSpill *hash_spills;   /* HashAggSpill for each grouping set,
                                         * exists only during first pass */
    TupleTableSlot *hash_spill_rslot;   /* for reading spill files */
index 85d2e03c631c6eb199fcd280cf330ffaa90b0d3d..758a19779c689328c6c502bdfd9231bd39b5f71e 100644 (file)
 
 #include "storage/sharedfileset.h"
 
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
 typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
 
 /*
  * The approach tuplesort.c takes to parallel external sorts is that workers,
@@ -54,27 +58,20 @@ typedef struct TapeShare
  * prototypes for functions in logtape.c
  */
 
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
-                                           TapeShare *shared,
+extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
                                            SharedFileSet *fileset, int worker);
+extern void LogicalTapeClose(LogicalTape *lt);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-                             void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-                            void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
-                                    size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
-                             TapeShare *share);
-extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
-                                  size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-                           long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-                           long *blocknum, int *offset);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif                         /* LOGTAPE_H */