Replace polyphase merge algorithm with a simple balanced k-way merge.
authorHeikki Linnakangas <[email protected]>
Mon, 18 Oct 2021 11:33:42 +0000 (14:33 +0300)
committerHeikki Linnakangas <[email protected]>
Mon, 18 Oct 2021 11:46:01 +0000 (14:46 +0300)
The advantage of polyphase merge is that it can reuse the input tapes as
output tapes efficiently, but that is irrelevant on modern hardware, when
we can easily emulate any number of tape drives. The number of input tapes
we can/should use during merging is limited by work_mem, but output tapes
that we are not currently writing to only cost a little bit of memory, so
there is no need to skimp on them.

This makes sorts that need multiple merge passes faster.

Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi
Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor
src/backend/utils/sort/tuplesort.c

index d5930f258d9216bb2daa680d9d8e1a70c5e8b96b..d8e8ccad1ff4d5101602f7eae3c8cfefd91d7d78 100644 (file)
  * amounts are sorted using temporary files and a standard external sort
  * algorithm.
  *
- * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm.  Historically, we divided the input into sorted runs
- * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we always use
- * quicksort for run generation.  We merge the runs using polyphase merge,
- * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
- * implemented by logtape.c, which avoids space wastage by recycling disk
- * space as soon as each block is read from its "tape".
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms.  The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
+ * merge is better.  Knuth is assuming that tape drives are expensive
+ * beasts, and in particular that there will always be many more runs than
+ * tape drives.  The polyphase merge algorithm was good at keeping all the
+ * tape drives busy, but in our implementation a "tape drive" doesn't cost
+ * much more than a few Kb of memory buffers, so we can afford to have
+ * lots of them.  In particular, if we can have as many tape drives as
+ * sorted runs, we can eliminate any repeated I/O at all.
+ *
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
+ * for run generation.
  *
  * The approximate amount of memory allowed for any one sort operation
  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
  * tuples just by scanning the tuple array sequentially.  If we do exceed
  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
  * When tuples are dumped in batch after quicksorting, we begin a new run
- * with a new output tape (selected per Algorithm D).  After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run,
- * then merge the runs using Algorithm D.
+ * with a new output tape.  If we reach the max number of tapes, we write
+ * subsequent runs on the existing tapes in a round-robin fashion.  We will
+ * need multiple merge passes to finish the merge in that case.  After the
+ * end of the input is reached, we dump out remaining tuples in memory into
+ * a final run, then merge the runs.
  *
  * When merging runs, we use a heap containing just the frontmost tuple from
  * each source run; we repeatedly output the smallest tuple and replace it
  * accesses.  The pre-reading is handled by logtape.c, we just tell it how
  * much memory to use for the buffers.
  *
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above.  Nonetheless, with large workMem we
+ * can have many tapes.  The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
+ *
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
  * that we can access it randomly.  When the caller does not need random
  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
  * saves one cycle of writing all the data out to disk and reading it in.
  *
- * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
- * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
- * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
- * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives.  In our implementation a "tape drive"
- * doesn't cost much more than a few Kb of memory buffers, so we can afford
- * to have lots of them.  In particular, if we can have as many tape drives
- * as sorted runs, we can eliminate any repeated I/O at all.  In the current
- * code we determine the number of tapes M on the basis of workMem: we want
- * workMem/M to be large enough that we read a fair amount of data each time
- * we preread from a tape, so as to maintain the locality of access described
- * above.  Nonetheless, with large workMem we can have many tapes (but not
- * too many -- see the comments in tuplesort_merge_order).
- *
  * This module supports parallel sorting.  Parallel sorts involve coordination
  * among one or more worker processes, and a leader process, each with its own
  * tuplesort state.  The leader process (or, more accurately, the
@@ -223,8 +227,9 @@ typedef enum
  * worth of buffer space.  This ignores the overhead of all the other data
  * structures needed for each tape, but it's probably close enough.
  *
- * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
- * tape during a preread cycle (see discussion at top of file).
+ * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
+ * input tape, for pre-reading (see discussion at top of file).  This is *in
+ * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
  */
 #define MINORDER       6       /* minimum merge order */
 #define MAXORDER       500     /* maximum merge order */
@@ -249,8 +254,8 @@ struct Tuplesortstate
    bool        tuples;         /* Can SortTuple.tuple ever be set? */
    int64       availMem;       /* remaining memory available, in bytes */
    int64       allowedMem;     /* total memory allowed, in bytes */
-   int         maxTapes;       /* number of tapes (Knuth's T) */
-   int         tapeRange;      /* maxTapes-1 (Knuth's P) */
+   int         maxTapes;       /* max number of input tapes to merge in each
+                                * pass */
    int64       maxSpace;       /* maximum amount of space occupied among sort
                                 * of groups, either in-memory or on-disk */
    bool        isMaxSpaceDisk; /* true when maxSpace is value for on-disk
@@ -262,7 +267,6 @@ struct Tuplesortstate
    MemoryContext sortcontext;  /* memory context holding most sort data */
    MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
-   LogicalTape **tapes;
 
    /*
     * These function pointers decouple the routines that must know what kind
@@ -347,8 +351,8 @@ struct Tuplesortstate
    char       *slabMemoryEnd;  /* end of slab memory arena */
    SlabSlot   *slabFreeHead;   /* head of free list */
 
-   /* Buffer size to use for reading input tapes, during merge. */
-   size_t      read_buffer_size;
+   /* Memory used for input and output tape buffers. */
+   size_t      tape_buffer_mem;
 
    /*
     * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
@@ -365,36 +369,29 @@ struct Tuplesortstate
    int         currentRun;
 
    /*
-    * Unless otherwise noted, all pointer variables below are pointers to
-    * arrays of length maxTapes, holding per-tape data.
+    * Logical tapes, for merging.
+    *
+    * The initial runs are written in the output tapes.  In each merge pass,
+    * the output tapes of the previous pass become the input tapes, and new
+    * output tapes are created as needed.  When nInputTapes equals
+    * nInputRuns, there is only one merge pass left.
     */
+   LogicalTape **inputTapes;
+   int         nInputTapes;
+   int         nInputRuns;
 
-   /*
-    * This variable is only used during merge passes.  mergeactive[i] is true
-    * if we are reading an input run from (actual) tape number i and have not
-    * yet exhausted that run.
-    */
-   bool       *mergeactive;    /* active input run source? */
+   LogicalTape **outputTapes;
+   int         nOutputTapes;
+   int         nOutputRuns;
 
-   /*
-    * Variables for Algorithm D.  Note that destTape is a "logical" tape
-    * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
-    * "logical" and "actual" tape numbers straight!
-    */
-   int         Level;          /* Knuth's l */
-   int         destTape;       /* current output tape (Knuth's j, less 1) */
-   int        *tp_fib;         /* Target Fibonacci run counts (A[]) */
-   int        *tp_runs;        /* # of real runs on each tape */
-   int        *tp_dummy;       /* # of dummy runs for each tape (D[]) */
-   int        *tp_tapenum;     /* Actual tape numbers (TAPE[]) */
-   int         activeTapes;    /* # of active input tapes in merge pass */
+   LogicalTape *destTape;      /* current output tape */
 
    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
-   LogicalTape *result_tape;   /* tape of finished output */
+   LogicalTape *result_tape;   /* actual tape of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */
 
@@ -415,8 +412,9 @@ struct Tuplesortstate
     *
     * nParticipants is the number of worker Tuplesortstates known by the
     * leader to have actually been launched, which implies that they must
-    * finish a run leader can merge.  Typically includes a worker state held
-    * by the leader process itself.  Set in the leader Tuplesortstate only.
+    * finish a run that the leader needs to merge.  Typically includes a
+    * worker state held by the leader process itself.  Set in the leader
+    * Tuplesortstate only.
     */
    int         worker;
    Sharedsort *shared;
@@ -620,7 +618,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
@@ -885,8 +883,8 @@ tuplesort_begin_batch(Tuplesortstate *state)
    state->currentRun = 0;
 
    /*
-    * maxTapes, tapeRange, and Algorithm D variables will be initialized by
-    * inittapes(), if needed
+    * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+    * inittapes(), if needed.
     */
 
    state->result_tape = NULL;  /* flag that result tape has not been formed */
@@ -1408,6 +1406,10 @@ tuplesort_free(Tuplesortstate *state)
     *
     * Note: want to include this in reported total cost of sort, hence need
     * for two #ifdef TRACE_SORT sections.
+    *
+    * We don't bother to destroy the individual tapes here. They will go away
+    * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
+    * finished tapes already.)
     */
    if (state->tapeset)
        LogicalTapeSetClose(state->tapeset);
@@ -2130,7 +2132,7 @@ tuplesort_performsort(Tuplesortstate *state)
    {
        if (state->status == TSS_FINALMERGE)
            elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
-                state->worker, state->activeTapes,
+                state->worker, state->nInputTapes,
                 pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "performsort of worker %d done: %s",
@@ -2338,7 +2340,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
             */
            if (state->memtupcount > 0)
            {
-               int         srcTape = state->memtuples[0].srctape;
+               int         srcTapeIndex = state->memtuples[0].srctape;
+               LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
                SortTuple   newtup;
 
                *stup = state->memtuples[0];
@@ -2360,15 +2363,16 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                     * Remove the top node from the heap.
                     */
                    tuplesort_heap_delete_top(state);
+                   state->nInputRuns--;
 
                    /*
                     * Close the tape.  It'd go away at the end of the sort
                     * anyway, but better to release the memory early.
                     */
-                   LogicalTapeClose(state->tapes[srcTape]);
+                   LogicalTapeClose(srcTape);
                    return true;
                }
-               newtup.srctape = srcTape;
+               newtup.srctape = srcTapeIndex;
                tuplesort_heap_replace_top(state, &newtup);
                return true;
            }
@@ -2599,18 +2603,29 @@ tuplesort_merge_order(int64 allowedMem)
 {
    int         mOrder;
 
-   /*
-    * We need one tape for each merge input, plus another one for the output,
-    * and each of these tapes needs buffer space.  In addition we want
-    * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
-    * count).
+   /*----------
+    * In the merge phase, we need buffer space for each input and output tape.
+    * Each pass in the balanced merge algorithm reads from M input tapes, and
+    * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
+    * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
+    * input tape.
+    *
+    * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
+    *            N * TAPE_BUFFER_OVERHEAD
+    *
+    * Except for the last and next-to-last merge passes, where there can be
+    * fewer tapes left to process, M = N.  We choose M so that we have the
+    * desired amount of memory available for the input buffers
+    * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
+    * available for the tape buffers (allowedMem).
     *
     * Note: you might be thinking we need to account for the memtuples[]
     * array in this calculation, but we effectively treat that as part of the
     * MERGE_BUFFER_SIZE workspace.
+    *----------
     */
-   mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
-       (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
+   mOrder = allowedMem /
+       (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
 
    /*
     * Even in minimum memory, use at least a MINORDER merge.  On the other
@@ -2620,7 +2635,7 @@ tuplesort_merge_order(int64 allowedMem)
     * which in turn can cause the same sort to need more runs, which makes
     * merging slower even if it can still be done in a single pass.  Also,
     * high order merges are quite slow due to CPU cache effects; it can be
-    * faster to pay the I/O cost of a polyphase merge than to perform a
+    * faster to pay the I/O cost of a multi-pass merge than to perform a
     * single merge pass across many hundreds of tapes.
     */
    mOrder = Max(mOrder, MINORDER);
@@ -2629,6 +2644,42 @@ tuplesort_merge_order(int64 allowedMem)
    return mOrder;
 }
 
+/*
+ * Helper function to calculate how much memory to allocate for the read buffer
+ * of each input tape in a merge pass.
+ *
+ * 'avail_mem' is the amount of memory available for the buffers of all the
+ *     tapes, both input and output.
+ * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
+ * 'maxOutputTapes' is the max. number of output tapes we should produce.
+ */
+static int64
+merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
+                      int maxOutputTapes)
+{
+   int         nOutputRuns;
+   int         nOutputTapes;
+
+   /*
+    * How many output tapes will we produce in this pass?
+    *
+    * This is nInputRuns / nInputTapes, rounded up.
+    */
+   nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
+
+   nOutputTapes = Min(nOutputRuns, maxOutputTapes);
+
+   /*
+    * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
+    * remaining memory is divided evenly between the input tapes.
+    *
+    * This also follows from the formula in tuplesort_merge_order, but here
+    * we derive the input buffer size from the amount of memory available,
+    * and M and N.
+    */
+   return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
+}
+
 /*
  * inittapes - initialize for tape sorting.
  *
@@ -2637,58 +2688,49 @@ tuplesort_merge_order(int64 allowedMem)
 static void
 inittapes(Tuplesortstate *state, bool mergeruns)
 {
-   int         maxTapes,
-               j;
-
    Assert(!LEADER(state));
 
    if (mergeruns)
    {
-       /* Compute number of tapes to use: merge order plus 1 */
-       maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+       /* Compute number of input tapes to use when merging */
+       state->maxTapes = tuplesort_merge_order(state->allowedMem);
    }
    else
    {
        /* Workers can sometimes produce single run, output without merge */
        Assert(WORKER(state));
-       maxTapes = MINORDER + 1;
+       state->maxTapes = MINORDER;
    }
 
 #ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d switching to external sort with %d tapes: %s",
-            state->worker, maxTapes, pg_rusage_show(&state->ru_start));
+            state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
-   /* Create the tape set and allocate the per-tape data arrays */
-   inittapestate(state, maxTapes);
+   /* Create the tape set */
+   inittapestate(state, state->maxTapes);
    state->tapeset =
        LogicalTapeSetCreate(false,
                             state->shared ? &state->shared->fileset : NULL,
                             state->worker);
-   state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
-   for (j = 0; j < maxTapes; j++)
-       state->tapes[j] = LogicalTapeCreate(state->tapeset);
 
    state->currentRun = 0;
 
    /*
-    * Initialize variables of Algorithm D (step D1).
+    * Initialize logical tape arrays.
     */
-   for (j = 0; j < maxTapes; j++)
-   {
-       state->tp_fib[j] = 1;
-       state->tp_runs[j] = 0;
-       state->tp_dummy[j] = 1;
-       state->tp_tapenum[j] = j;
-   }
-   state->tp_fib[state->tapeRange] = 0;
-   state->tp_dummy[state->tapeRange] = 0;
+   state->inputTapes = NULL;
+   state->nInputTapes = 0;
+   state->nInputRuns = 0;
 
-   state->Level = 1;
-   state->destTape = 0;
+   state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
+   state->nOutputTapes = 0;
+   state->nOutputRuns = 0;
 
    state->status = TSS_BUILDRUNS;
+
+   selectnewtape(state);
 }
 
 /*
@@ -2719,52 +2761,37 @@ inittapestate(Tuplesortstate *state, int maxTapes)
     * called already, but it doesn't matter if it is called a second time.
     */
    PrepareTempTablespaces();
-
-   state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-   state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
-   /* Record # of tapes allocated (for duration of sort) */
-   state->maxTapes = maxTapes;
-   /* Record maximum # of tapes usable as inputs when merging */
-   state->tapeRange = maxTapes - 1;
 }
 
 /*
- * selectnewtape -- select new tape for new initial run.
+ * selectnewtape -- select next tape to output to.
  *
  * This is called after finishing a run when we know another run
- * must be started.  This implements steps D3, D4 of Algorithm D.
+ * must be started.  This is used both when building the initial
+ * runs, and during merge passes.
  */
 static void
 selectnewtape(Tuplesortstate *state)
 {
-   int         j;
-   int         a;
-
-   /* Step D3: advance j (destTape) */
-   if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
-   {
-       state->destTape++;
-       return;
-   }
-   if (state->tp_dummy[state->destTape] != 0)
+   if (state->nOutputRuns < state->maxTapes)
    {
-       state->destTape = 0;
-       return;
+       /* Create a new tape to hold the next run */
+       Assert(state->outputTapes[state->nOutputRuns] == NULL);
+       Assert(state->nOutputRuns == state->nOutputTapes);
+       state->destTape = LogicalTapeCreate(state->tapeset);
+       state->outputTapes[state->nOutputRuns] = state->destTape;
+       state->nOutputTapes++;
+       state->nOutputRuns++;
    }
-
-   /* Step D4: increase level */
-   state->Level++;
-   a = state->tp_fib[0];
-   for (j = 0; j < state->tapeRange; j++)
+   else
    {
-       state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
-       state->tp_fib[j] = a + state->tp_fib[j + 1];
+       /*
+        * We have reached the max number of tapes.  Append to an existing
+        * tape.
+        */
+       state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+       state->nOutputRuns++;
    }
-   state->destTape = 0;
 }
 
 /*
@@ -2803,18 +2830,13 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
 /*
  * mergeruns -- merge all the completed initial runs.
  *
- * This implements steps D5, D6 of Algorithm D.  All input data has
+ * This implements the Balanced k-Way Merge Algorithm.  All input data has
  * already been written to initial runs on tape (see dumptuples).
  */
 static void
 mergeruns(Tuplesortstate *state)
 {
-   int         tapenum,
-               svTape,
-               svRuns,
-               svDummy;
-   int         numTapes;
-   int         numInputTapes;
+   int         tapenum;
 
    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);
@@ -2849,99 +2871,111 @@ mergeruns(Tuplesortstate *state)
    pfree(state->memtuples);
    state->memtuples = NULL;
 
-   /*
-    * If we had fewer runs than tapes, refund the memory that we imagined we
-    * would need for the tape buffers of the unused tapes.
-    *
-    * numTapes and numInputTapes reflect the actual number of tapes we will
-    * use.  Note that the output tape's tape number is maxTapes - 1, so the
-    * tape numbers of the used tapes are not consecutive, and you cannot just
-    * loop from 0 to numTapes to visit all used tapes!
-    */
-   if (state->Level == 1)
-   {
-       numInputTapes = state->currentRun;
-       numTapes = numInputTapes + 1;
-       FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
-   }
-   else
-   {
-       numInputTapes = state->tapeRange;
-       numTapes = state->maxTapes;
-   }
-
    /*
     * Initialize the slab allocator.  We need one slab slot per input tape,
     * for the tuples in the heap, plus one to hold the tuple last returned
     * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
     * however, we don't need to do allocate anything.)
     *
+    * In a multi-pass merge, we could shrink this allocation for the last
+    * merge pass, if it has fewer tapes than previous passes, but we don't
+    * bother.
+    *
     * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
     * to track memory usage of individual tuples.
     */
    if (state->tuples)
-       init_slab_allocator(state, numInputTapes + 1);
+       init_slab_allocator(state, state->nOutputTapes + 1);
    else
        init_slab_allocator(state, 0);
 
    /*
     * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
     * from each input tape.
+    *
+    * We could shrink this, too, between passes in a multi-pass merge, but we
+    * don't bother.  (The initial input tapes are still in outputTapes.  The
+    * number of input tapes will not increase between passes.)
     */
-   state->memtupsize = numInputTapes;
+   state->memtupsize = state->nOutputTapes;
    state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
-                                                       numInputTapes * sizeof(SortTuple));
+                                                       state->nOutputTapes * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
    /*
-    * Use all the remaining memory we have available for read buffers among
-    * the input tapes.
-    *
-    * We don't try to "rebalance" the memory among tapes, when we start a new
-    * merge phase, even if some tapes are inactive in the new phase.  That
-    * would be hard, because logtape.c doesn't know where one run ends and
-    * another begins.  When a new merge phase begins, and a tape doesn't
-    * participate in it, its buffer nevertheless already contains tuples from
-    * the next run on same tape, so we cannot release the buffer.  That's OK
-    * in practice, merge performance isn't that sensitive to the amount of
-    * buffers used, and most merge phases use all or almost all tapes,
-    * anyway.
+    * Use all the remaining memory we have available for tape buffers among
+    * all the input tapes.  At the beginning of each merge pass, we will
+    * divide this memory between the input and output tapes in the pass.
     */
+   state->tape_buffer_mem = state->availMem;
+   USEMEM(state, state->availMem);
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
-            state->worker, state->availMem / 1024, numInputTapes);
+       elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for tape buffers",
+            state->worker, state->tape_buffer_mem / 1024);
 #endif
 
-   state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
-   USEMEM(state, state->read_buffer_size * numInputTapes);
-
-   /* End of step D2: rewind all output tapes to prepare for merging */
-   for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-       LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
-
    for (;;)
    {
        /*
-        * At this point we know that tape[T] is empty.  If there's just one
-        * (real or dummy) run left on each input tape, then only one merge
-        * pass remains.  If we don't have to produce a materialized sorted
-        * tape, we can stop at this point and do the final merge on-the-fly.
+        * On the first iteration, or if we have read all the runs from the
+        * input tapes in a multi-pass merge, it's time to start a new pass.
+        * Rewind all the output tapes, and make them inputs for the next
+        * pass.
         */
-       if (!state->randomAccess && !WORKER(state))
+       if (state->nInputRuns == 0 && !WORKER(state))
        {
-           bool        allOneRun = true;
+           int64       input_buffer_size;
 
-           Assert(state->tp_runs[state->tapeRange] == 0);
-           for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
+           /* Close the old, emptied, input tapes */
+           if (state->nInputTapes > 0)
            {
-               if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
-               {
-                   allOneRun = false;
-                   break;
-               }
+               for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+                   LogicalTapeClose(state->inputTapes[tapenum]);
+               pfree(state->inputTapes);
            }
-           if (allOneRun)
+
+           /* Previous pass's outputs become next pass's inputs. */
+           state->inputTapes = state->outputTapes;
+           state->nInputTapes = state->nOutputTapes;
+           state->nInputRuns = state->nOutputRuns;
+
+           /*
+            * Reset output tape variables.  The actual LogicalTapes will be
+            * created as needed, here we only allocate the array to hold
+            * them.
+            */
+           state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+           state->nOutputTapes = 0;
+           state->nOutputRuns = 0;
+
+           /*
+            * Redistribute the memory allocated for tape buffers, among the
+            * new input and output tapes.
+            */
+           input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
+                                                      state->nInputTapes,
+                                                      state->nInputRuns,
+                                                      state->maxTapes);
+
+#ifdef TRACE_SORT
+           if (trace_sort)
+               elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
+                    state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
+                    pg_rusage_show(&state->ru_start));
+#endif
+
+           /* Prepare the new input tapes for merge pass. */
+           for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+               LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
+
+           /*
+            * If there's just one run left on each input tape, then only one
+            * merge pass remains.  If we don't have to produce a materialized
+            * sorted tape, we can stop at this point and do the final merge
+            * on-the-fly.
+            */
+           if (!state->randomAccess && state->nInputRuns <= state->nInputTapes)
            {
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
@@ -2952,103 +2986,47 @@ mergeruns(Tuplesortstate *state)
            }
        }
 
-       /* Step D5: merge runs onto tape[T] until tape[P] is empty */
-       while (state->tp_runs[state->tapeRange - 1] ||
-              state->tp_dummy[state->tapeRange - 1])
-       {
-           bool        allDummy = true;
-
-           for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-           {
-               if (state->tp_dummy[tapenum] == 0)
-               {
-                   allDummy = false;
-                   break;
-               }
-           }
-
-           if (allDummy)
-           {
-               state->tp_dummy[state->tapeRange]++;
-               for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-                   state->tp_dummy[tapenum]--;
-           }
-           else
-               mergeonerun(state);
-       }
-
-       /* Step D6: decrease level */
-       if (--state->Level == 0)
-           break;
-
-       /* rewind output tape T to use as new input */
-       LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
-                                state->read_buffer_size);
+       /* Select an output tape */
+       selectnewtape(state);
 
-       /* close used-up input tape P, and create a new one for write pass */
-       LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
-       state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
-       state->tp_runs[state->tapeRange - 1] = 0;
+       /* Merge one run from each input tape. */
+       mergeonerun(state);
 
        /*
-        * reassign tape units per step D6; note we no longer care about A[]
+        * If the input tapes are empty, and we output only one output run,
+        * we're done.  The current output tape contains the final result.
         */
-       svTape = state->tp_tapenum[state->tapeRange];
-       svDummy = state->tp_dummy[state->tapeRange];
-       svRuns = state->tp_runs[state->tapeRange];
-       for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
-       {
-           state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
-           state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
-           state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
-       }
-       state->tp_tapenum[0] = svTape;
-       state->tp_dummy[0] = svDummy;
-       state->tp_runs[0] = svRuns;
+       if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+           break;
    }
 
    /*
-    * Done.  Knuth says that the result is on TAPE[1], but since we exited
-    * the loop without performing the last iteration of step D6, we have not
-    * rearranged the tape unit assignment, and therefore the result is on
-    * TAPE[T].  We need to do it this way so that we can freeze the final
-    * output tape while rewinding it.  The last iteration of step D6 would be
-    * a waste of cycles anyway...
+    * Done.  The result is on a single run on a single tape.
     */
-   state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
+   state->result_tape = state->outputTapes[0];
    if (!WORKER(state))
        LogicalTapeFreeze(state->result_tape, NULL);
    else
        worker_freeze_result_tape(state);
    state->status = TSS_SORTEDONTAPE;
 
-   /* Close all the other tapes, to release their read buffers. */
-   for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
-   {
-       if (state->tapes[tapenum] != state->result_tape)
-       {
-           LogicalTapeClose(state->tapes[tapenum]);
-           state->tapes[tapenum] = NULL;
-       }
-   }
+   /* Close all the now-empty input tapes, to release their read buffers. */
+   for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+       LogicalTapeClose(state->inputTapes[tapenum]);
 }
 
 /*
- * Merge one run from each input tape, except ones with dummy runs.
- *
- * This is the inner loop of Algorithm D step D5.  We know that the
- * output tape is TAPE[T].
+ * Merge one run from each input tape.
  */
 static void
 mergeonerun(Tuplesortstate *state)
 {
-   int         destTapeNum = state->tp_tapenum[state->tapeRange];
-   LogicalTape *destTape = state->tapes[destTapeNum];
-   int         srcTape;
+   int         srcTapeIndex;
+   LogicalTape *srcTape;
 
    /*
     * Start the merge by loading one tuple from each active source tape into
-    * the heap.  We can also decrease the input run/dummy run counts.
+    * the heap.
     */
    beginmerge(state);
 
@@ -3062,8 +3040,9 @@ mergeonerun(Tuplesortstate *state)
        SortTuple   stup;
 
        /* write the tuple to destTape */
-       srcTape = state->memtuples[0].srctape;
-       WRITETUP(state, destTape, &state->memtuples[0]);
+       srcTapeIndex = state->memtuples[0].srctape;
+       srcTape = state->inputTapes[srcTapeIndex];
+       WRITETUP(state, state->destTape, &state->memtuples[0]);
 
        /* recycle the slot of the tuple we just wrote out, for the next read */
        if (state->memtuples[0].tuple)
@@ -3075,72 +3054,47 @@ mergeonerun(Tuplesortstate *state)
         */
        if (mergereadnext(state, srcTape, &stup))
        {
-           stup.srctape = srcTape;
+           stup.srctape = srcTapeIndex;
            tuplesort_heap_replace_top(state, &stup);
+
        }
        else
+       {
            tuplesort_heap_delete_top(state);
+           state->nInputRuns--;
+       }
    }
 
    /*
     * When the heap empties, we're done.  Write an end-of-run marker on the
-    * output tape, and increment its count of real runs.
+    * output tape.
     */
-   markrunend(destTape);
-   state->tp_runs[state->tapeRange]++;
-
-#ifdef TRACE_SORT
-   if (trace_sort)
-       elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
-            state->activeTapes, pg_rusage_show(&state->ru_start));
-#endif
+   markrunend(state->destTape);
 }
 
 /*
  * beginmerge - initialize for a merge pass
  *
- * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[].  Then, fill the
- * merge heap with the first tuple from each active tape.
+ * Fill the merge heap with the first tuple from each input tape.
  */
 static void
 beginmerge(Tuplesortstate *state)
 {
    int         activeTapes;
-   int         tapenum;
-   int         srcTape;
+   int         srcTapeIndex;
 
    /* Heap should be empty here */
    Assert(state->memtupcount == 0);
 
-   /* Adjust run counts and mark the active tapes */
-   memset(state->mergeactive, 0,
-          state->maxTapes * sizeof(*state->mergeactive));
-   activeTapes = 0;
-   for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-   {
-       if (state->tp_dummy[tapenum] > 0)
-           state->tp_dummy[tapenum]--;
-       else
-       {
-           Assert(state->tp_runs[tapenum] > 0);
-           state->tp_runs[tapenum]--;
-           srcTape = state->tp_tapenum[tapenum];
-           state->mergeactive[srcTape] = true;
-           activeTapes++;
-       }
-   }
-   Assert(activeTapes > 0);
-   state->activeTapes = activeTapes;
+   activeTapes = Min(state->nInputTapes, state->nInputRuns);
 
-   /* Load the merge heap with the first tuple from each input tape */
-   for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+   for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    {
        SortTuple   tup;
 
-       if (mergereadnext(state, srcTape, &tup))
+       if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
        {
-           tup.srctape = srcTape;
+           tup.srctape = srcTapeIndex;
            tuplesort_heap_insert(state, &tup);
        }
    }
@@ -3152,20 +3106,13 @@ beginmerge(Tuplesortstate *state)
  * Returns false on EOF.
  */
 static bool
-mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
 {
-   LogicalTape *srcTape = state->tapes[srcTapeIndex];
    unsigned int tuplen;
 
-   if (!state->mergeactive[srcTapeIndex])
-       return false;           /* tape's run is already exhausted */
-
    /* read next tuple, if any */
    if ((tuplen = getlen(srcTape, true)) == 0)
-   {
-       state->mergeactive[srcTapeIndex] = false;
        return false;
-   }
    READTUP(state, stup, srcTape, tuplen);
 
    return true;
@@ -3180,7 +3127,6 @@ mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
-   LogicalTape *destTape;
    int         memtupwrite;
    int         i;
 
@@ -3196,22 +3142,13 @@ dumptuples(Tuplesortstate *state, bool alltuples)
     * Final call might require no sorting, in rare cases where we just so
     * happen to have previously LACKMEM()'d at the point where exactly all
     * remaining tuples are loaded into memory, just before input was
-    * exhausted.
-    *
-    * In general, short final runs are quite possible.  Rather than allowing
-    * a special case where there was a superfluous selectnewtape() call (i.e.
-    * a call with no subsequent run actually written to destTape), we prefer
-    * to write out a 0 tuple run.
-    *
-    * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
-    * the tape inactive for the merge when called from beginmerge().  This
-    * case is therefore similar to the case where mergeonerun() finds a dummy
-    * run for the tape, and so doesn't need to merge a run from the tape (or
-    * conceptually "merges" the dummy run, if you prefer).  According to
-    * Knuth, Algorithm D "isn't strictly optimal" in its method of
-    * distribution and dummy run assignment; this edge case seems very
-    * unlikely to make that appreciably worse.
+    * exhausted.  In general, short final runs are quite possible, but avoid
+    * creating a completely empty run.  In a worker, though, we must produce
+    * at least one tape, even if it's empty.
     */
+   if (state->memtupcount == 0 && state->currentRun > 0)
+       return;
+
    Assert(state->status == TSS_BUILDRUNS);
 
    /*
@@ -3224,6 +3161,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
                 errmsg("cannot have more than %d runs for an external sort",
                        INT_MAX)));
 
+   if (state->currentRun > 0)
+       selectnewtape(state);
+
    state->currentRun++;
 
 #ifdef TRACE_SORT
@@ -3247,10 +3187,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 #endif
 
    memtupwrite = state->memtupcount;
-   destTape = state->tapes[state->tp_tapenum[state->destTape]];
    for (i = 0; i < memtupwrite; i++)
    {
-       WRITETUP(state, destTape, &state->memtuples[i]);
+       WRITETUP(state, state->destTape, &state->memtuples[i]);
        state->memtupcount--;
    }
 
@@ -3263,19 +3202,14 @@ dumptuples(Tuplesortstate *state, bool alltuples)
     */
    MemoryContextReset(state->tuplecontext);
 
-   markrunend(destTape);
-   state->tp_runs[state->destTape]++;
-   state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+   markrunend(state->destTape);
 
 #ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d finished writing run %d to tape %d: %s",
-            state->worker, state->currentRun, state->destTape,
+            state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
             pg_rusage_show(&state->ru_start));
 #endif
-
-   if (!alltuples)
-       selectnewtape(state);
 }
 
 /*
@@ -4669,8 +4603,9 @@ worker_nomergeruns(Tuplesortstate *state)
 {
    Assert(WORKER(state));
    Assert(state->result_tape == NULL);
+   Assert(state->nOutputRuns == 1);
 
-   state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
+   state->result_tape = state->destTape;
    worker_freeze_result_tape(state);
 }
 
@@ -4707,47 +4642,36 @@ leader_takeover_tapes(Tuplesortstate *state)
     * Create the tapeset from worker tapes, including a leader-owned tape at
     * the end.  Parallel workers are far more expensive than logical tapes,
     * so the number of tapes allocated here should never be excessive.
-    *
-    * We still have a leader tape, though it's not possible to write to it
-    * due to restrictions in the shared fileset infrastructure used by
-    * logtape.c.  It will never be written to in practice because
-    * randomAccess is disallowed for parallel sorts.
     */
-   inittapestate(state, nParticipants + 1);
-   state->tapeset = LogicalTapeSetCreate(false,
-                                         &shared->fileset,
-                                         state->worker);
-   state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
-   for (j = 0; j < nParticipants; j++)
-       state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
-   /* tapes[nParticipants] represents the "leader tape", which is not used */
+   inittapestate(state, nParticipants);
+   state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
 
-   /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+   /*
+    * Set currentRun to reflect the number of runs we will merge (it's not
+    * used for anything, this is just pro forma)
+    */
    state->currentRun = nParticipants;
 
    /*
-    * Initialize variables of Algorithm D to be consistent with runs from
-    * workers having been generated in the leader.
+    * Initialize the state to look the same as after building the initial
+    * runs.
     *
     * There will always be exactly 1 run per worker, and exactly one input
     * tape per run, because workers always output exactly 1 run, even when
     * there were no input tuples for workers to sort.
     */
-   for (j = 0; j < state->maxTapes; j++)
+   state->inputTapes = NULL;
+   state->nInputTapes = 0;
+   state->nInputRuns = 0;
+
+   state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
+   state->nOutputTapes = nParticipants;
+   state->nOutputRuns = nParticipants;
+
+   for (j = 0; j < nParticipants; j++)
    {
-       /* One real run; no dummy runs for worker tapes */
-       state->tp_fib[j] = 1;
-       state->tp_runs[j] = 1;
-       state->tp_dummy[j] = 0;
-       state->tp_tapenum[j] = j;
+       state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    }
-   /* Leader tape gets one dummy run, and no real runs */
-   state->tp_fib[state->tapeRange] = 0;
-   state->tp_runs[state->tapeRange] = 0;
-   state->tp_dummy[state->tapeRange] = 1;
-
-   state->Level = 1;
-   state->destTape = 0;
 
    state->status = TSS_BUILDRUNS;
 }