* amounts are sorted using temporary files and a standard external sort
* algorithm.
*
- * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm. Historically, we divided the input into sorted runs
- * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we always use
- * quicksort for run generation. We merge the runs using polyphase merge,
- * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
- * implemented by logtape.c, which avoids space wastage by recycling disk
- * space as soon as each block is read from its "tape".
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms. The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
+ * merge is better. Knuth is assuming that tape drives are expensive
+ * beasts, and in particular that there will always be many more runs than
+ * tape drives. The polyphase merge algorithm was good at keeping all the
+ * tape drives busy, but in our implementation a "tape drive" doesn't cost
+ * much more than a few KB of memory buffers, so we can afford to have
+ * lots of them. In particular, if we can have as many tape drives as
+ * sorted runs, we need no repeated I/O at all.
+ *
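+ * As a worked example (illustrative numbers): a balanced M-way merge of
+ * R initial runs needs ceil(log_M(R)) passes over the data, because each
+ * pass reduces the number of runs by a factor of M. With M = 100 input
+ * tapes, 1000 initial runs are merged in two passes: the first produces
+ * 10 longer runs, and the second merges those into the final result.
+ *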
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
+ * for run generation.
*
* The approximate amount of memory allowed for any one sort operation
* is specified in kilobytes by the caller (most pass work_mem). Initially,
* we absorb tuples and simply store them in an unsorted array as long as
* we haven't exceeded workMem. If we reach the end of the input without
* exceeding workMem, we sort the array using qsort() and subsequently return
* tuples just by scanning the tuple array sequentially. If we do exceed
* workMem, we begin to emit tuples into sorted runs in temporary tapes.
* When tuples are dumped in batch after quicksorting, we begin a new run
- * with a new output tape (selected per Algorithm D). After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run,
- * then merge the runs using Algorithm D.
+ * with a new output tape. If we reach the max number of tapes, we write
+ * subsequent runs on the existing tapes in a round-robin fashion. We will
+ * need multiple merge passes to finish the merge in that case. After the
+ * end of the input is reached, we dump out remaining tuples in memory into
+ * a final run, then merge the runs.
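+ *
+ * For example (illustrative numbers): with maxTapes = 6 and 14 initial
+ * runs, runs 1-6 each start a fresh output tape; run 7 is then appended
+ * to tape 1, run 8 to tape 2, and so on. The first merge pass reads
+ * those 14 runs from 6 input tapes and produces 3 longer runs; a second
+ * pass merges the 3 runs into the final result.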
*
* When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and replace it
* accesses. The pre-reading is handled by logtape.c; we just tell it how
* much memory to use for the buffers.
*
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above. Nonetheless, with large workMem we
+ * can have many tapes. The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
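+ *
+ * For example, with the default 8 kB block size (so TAPE_BUFFER_OVERHEAD
+ * is 8 kB and MERGE_BUFFER_SIZE is 256 kB; see the defines below), a
+ * workMem of 64 MB allows roughly M = 240 input tapes; see
+ * tuplesort_merge_order() for the exact formula.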
+ *
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* that we can access it randomly. When the caller does not need random
* access, we return from tuplesort_performsort() as soon as we are down
* to one run per logical tape. The final merge is then performed
* on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
* saves one cycle of writing all the data out to disk and reading it in.
*
- * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
- * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
- * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
- * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives. In our implementation a "tape drive"
- * doesn't cost much more than a few Kb of memory buffers, so we can afford
- * to have lots of them. In particular, if we can have as many tape drives
- * as sorted runs, we can eliminate any repeated I/O at all. In the current
- * code we determine the number of tapes M on the basis of workMem: we want
- * workMem/M to be large enough that we read a fair amount of data each time
- * we preread from a tape, so as to maintain the locality of access described
- * above. Nonetheless, with large workMem we can have many tapes (but not
- * too many -- see the comments in tuplesort_merge_order).
- *
* This module supports parallel sorting. Parallel sorts involve coordination
* among one or more worker processes, and a leader process, each with its own
* tuplesort state. The leader process (or, more accurately, the
* worth of buffer space. This ignores the overhead of all the other data
* structures needed for each tape, but it's probably close enough.
*
- * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
- * tape during a preread cycle (see discussion at top of file).
+ * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
+ * input tape, for pre-reading (see discussion at top of file). This is *in
+ * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
*/
#define MINORDER 6 /* minimum merge order */
#define MAXORDER 500 /* maximum merge order */
#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
- int maxTapes; /* number of tapes (Knuth's T) */
- int tapeRange; /* maxTapes-1 (Knuth's P) */
+ int maxTapes; /* max number of input tapes to merge in each
+ * pass */
int64 maxSpace; /* maximum amount of space occupied among sort
* of groups, either in-memory or on-disk */
bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
MemoryContext sortcontext; /* memory context holding most sort data */
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
- LogicalTape **tapes;
/*
* These function pointers decouple the routines that must know what kind
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
- /* Buffer size to use for reading input tapes, during merge. */
- size_t read_buffer_size;
+ /* Memory used for input and output tape buffers. */
+ size_t tape_buffer_mem;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
int currentRun;
/*
- * Unless otherwise noted, all pointer variables below are pointers to
- * arrays of length maxTapes, holding per-tape data.
+ * Logical tapes, for merging.
+ *
+ * The initial runs are written to the output tapes. In each merge pass,
+ * the output tapes of the previous pass become the input tapes, and new
+ * output tapes are created as needed. When nInputTapes equals
+ * nInputRuns, there is only one merge pass left.
*/
+ LogicalTape **inputTapes;
+ int nInputTapes;
+ int nInputRuns;
- /*
- * This variable is only used during merge passes. mergeactive[i] is true
- * if we are reading an input run from (actual) tape number i and have not
- * yet exhausted that run.
- */
- bool *mergeactive; /* active input run source? */
+ LogicalTape **outputTapes;
+ int nOutputTapes;
+ int nOutputRuns;
- /*
- * Variables for Algorithm D. Note that destTape is a "logical" tape
- * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
- * "logical" and "actual" tape numbers straight!
- */
- int Level; /* Knuth's l */
- int destTape; /* current output tape (Knuth's j, less 1) */
- int *tp_fib; /* Target Fibonacci run counts (A[]) */
- int *tp_runs; /* # of real runs on each tape */
- int *tp_dummy; /* # of dummy runs for each tape (D[]) */
- int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
- int activeTapes; /* # of active input tapes in merge pass */
+ LogicalTape *destTape; /* current output tape */
/*
* These variables are used after completion of sorting to keep track of
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
- LogicalTape *result_tape; /* tape of finished output */
+ LogicalTape *result_tape; /* actual tape of finished output */
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
*
* nParticipants is the number of worker Tuplesortstates known by the
* leader to have actually been launched, which implies that they must
- * finish a run leader can merge. Typically includes a worker state held
- * by the leader process itself. Set in the leader Tuplesortstate only.
+ * finish a run that the leader needs to merge. Typically includes a
+ * worker state held by the leader process itself. Set in the leader
+ * Tuplesortstate only.
*/
int worker;
Sharedsort *shared;
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
state->currentRun = 0;
/*
- * maxTapes, tapeRange, and Algorithm D variables will be initialized by
- * inittapes(), if needed
+ * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+ * inittapes(), if needed.
*/
state->result_tape = NULL; /* flag that result tape has not been formed */
*
* Note: want to include this in reported total cost of sort, hence need
* for two #ifdef TRACE_SORT sections.
+ *
+ * We don't bother to destroy the individual tapes here. They will go away
+ * with the sortcontext. (In TSS_FINALMERGE state, we have closed
+ * finished tapes already.)
*/
if (state->tapeset)
LogicalTapeSetClose(state->tapeset);
{
if (state->status == TSS_FINALMERGE)
elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
- state->worker, state->activeTapes,
+ state->worker, state->nInputTapes,
pg_rusage_show(&state->ru_start));
else
elog(LOG, "performsort of worker %d done: %s",
*/
if (state->memtupcount > 0)
{
- int srcTape = state->memtuples[0].srctape;
+ int srcTapeIndex = state->memtuples[0].srctape;
+ LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
SortTuple newtup;
*stup = state->memtuples[0];
* Remove the top node from the heap.
*/
tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
/*
* Close the tape. It'd go away at the end of the sort
* anyway, but better to release the memory early.
*/
- LogicalTapeClose(state->tapes[srcTape]);
+ LogicalTapeClose(srcTape);
return true;
}
- newtup.srctape = srcTape;
+ newtup.srctape = srcTapeIndex;
tuplesort_heap_replace_top(state, &newtup);
return true;
}
{
int mOrder;
- /*
- * We need one tape for each merge input, plus another one for the output,
- * and each of these tapes needs buffer space. In addition we want
- * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
- * count).
+ /*----------
+ * In the merge phase, we need buffer space for each input and output tape.
+ * Each pass in the balanced merge algorithm reads from M input tapes, and
+ * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
+ * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
+ * input tape.
+ *
+ * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
+ * N * TAPE_BUFFER_OVERHEAD
+ *
+ * Except for the last and next-to-last merge passes, where there can be
+ * fewer tapes left to process, M = N. We choose M so that we have the
+ * desired amount of memory available for the input buffers
+ * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
+ * available for the tape buffers (allowedMem).
*
* Note: you might be thinking we need to account for the memtuples[]
* array in this calculation, but we effectively treat that as part of the
* MERGE_BUFFER_SIZE workspace.
+ *----------
*/
- mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
- (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
+ mOrder = allowedMem /
+ (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
/*
* Even in minimum memory, use at least a MINORDER merge. On the other
* which in turn can cause the same sort to need more runs, which makes
* merging slower even if it can still be done in a single pass. Also,
* high order merges are quite slow due to CPU cache effects; it can be
- * faster to pay the I/O cost of a polyphase merge than to perform a
+ * faster to pay the I/O cost of a multi-pass merge than to perform a
* single merge pass across many hundreds of tapes.
*/
mOrder = Max(mOrder, MINORDER);
return mOrder;
}
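+
+/*
+ * Illustrative sketch, not part of tuplesort.c: the arithmetic of the
+ * formula above as a standalone program, assuming the default BLCKSZ of
+ * 8192 bytes (so TAPE_BUFFER_OVERHEAD is 8 kB and MERGE_BUFFER_SIZE is
+ * 256 kB). Compile it separately to see how the merge order scales with
+ * workMem.
+ */
+#include <stdio.h>
+
+int
+main(void)
+{
+	const long	tape_overhead = 8192;		/* TAPE_BUFFER_OVERHEAD */
+	const long	merge_buffer = 32 * 8192;	/* MERGE_BUFFER_SIZE */
+	const long	work_mem_kb[] = {4 * 1024, 64 * 1024, 1024 * 1024};
+	int			i;
+
+	for (i = 0; i < 3; i++)
+	{
+		long		allowed_mem = work_mem_kb[i] * 1024L;
+		long		morder = allowed_mem / (2 * tape_overhead + merge_buffer);
+
+		/* clamp to MINORDER..MAXORDER, like tuplesort_merge_order */
+		if (morder < 6)
+			morder = 6;
+		if (morder > 500)
+			morder = 500;
+		printf("workMem = %ld kB -> merge order %ld\n",
+			   work_mem_kb[i], morder);
+	}
+	return 0;
+}
+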
+/*
+ * Helper function to calculate how much memory to allocate for the read buffer
+ * of each input tape in a merge pass.
+ *
+ * 'avail_mem' is the amount of memory available for the buffers of all the
+ * tapes, both input and output.
+ * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
+ * 'maxOutputTapes' is the max. number of output tapes we should produce.
+ */
+static int64
+merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
+ int maxOutputTapes)
+{
+ int nOutputRuns;
+ int nOutputTapes;
+
+ /*
+ * How many output tapes will we produce in this pass?
+ *
+ * This is nInputRuns / nInputTapes, rounded up.
+ */
+ nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
+
+ nOutputTapes = Min(nOutputRuns, maxOutputTapes);
+
+ /*
+ * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
+ * remaining memory is divided evenly among the input tapes.
+ *
+ * This also follows from the formula in tuplesort_merge_order, but here
+ * we derive the input buffer size from the amount of memory available,
+ * and M and N.
+ */
+ return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
+}
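+
+/*
+ * Worked example (illustrative numbers): with avail_mem = 32 MB,
+ * nInputTapes = 16, nInputRuns = 100 and maxOutputTapes = 16, the pass
+ * produces nOutputRuns = ceil(100 / 16) = 7 runs on 7 output tapes.
+ * Assuming the default 8 kB TAPE_BUFFER_OVERHEAD, each input tape then
+ * gets (32 MB - 7 * 8 kB) / 16, i.e. about 2 MB of read buffer.
+ */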
+
/*
* inittapes - initialize for tape sorting.
*
static void
inittapes(Tuplesortstate *state, bool mergeruns)
{
- int maxTapes,
- j;
-
Assert(!LEADER(state));
if (mergeruns)
{
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ /* Compute number of input tapes to use when merging */
+ state->maxTapes = tuplesort_merge_order(state->allowedMem);
}
else
{
/* Workers can sometimes produce single run, output without merge */
Assert(WORKER(state));
- maxTapes = MINORDER + 1;
+ state->maxTapes = MINORDER;
}
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d switching to external sort with %d tapes: %s",
- state->worker, maxTapes, pg_rusage_show(&state->ru_start));
+ state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
#endif
- /* Create the tape set and allocate the per-tape data arrays */
- inittapestate(state, maxTapes);
+ /* Create the tape set */
+ inittapestate(state, state->maxTapes);
state->tapeset =
LogicalTapeSetCreate(false,
state->shared ? &state->shared->fileset : NULL,
state->worker);
- state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
- for (j = 0; j < maxTapes; j++)
- state->tapes[j] = LogicalTapeCreate(state->tapeset);
state->currentRun = 0;
/*
- * Initialize variables of Algorithm D (step D1).
+ * Initialize logical tape arrays.
*/
- for (j = 0; j < maxTapes; j++)
- {
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 0;
- state->tp_dummy[j] = 1;
- state->tp_tapenum[j] = j;
- }
- state->tp_fib[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 0;
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
- state->Level = 1;
- state->destTape = 0;
+ state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
state->status = TSS_BUILDRUNS;
+
+ selectnewtape(state);
}
/*
* called already, but it doesn't matter if it is called a second time.
*/
PrepareTempTablespaces();
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
- /* Record # of tapes allocated (for duration of sort) */
- state->maxTapes = maxTapes;
- /* Record maximum # of tapes usable as inputs when merging */
- state->tapeRange = maxTapes - 1;
}
/*
- * selectnewtape -- select new tape for new initial run.
+ * selectnewtape -- select next tape to output to.
*
* This is called after finishing a run when we know another run
- * must be started. This implements steps D3, D4 of Algorithm D.
+ * must be started. This is used both when building the initial
+ * runs, and during merge passes.
*/
static void
selectnewtape(Tuplesortstate *state)
{
- int j;
- int a;
-
- /* Step D3: advance j (destTape) */
- if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
- {
- state->destTape++;
- return;
- }
- if (state->tp_dummy[state->destTape] != 0)
+ if (state->nOutputRuns < state->maxTapes)
{
- state->destTape = 0;
- return;
+ /* Create a new tape to hold the next run */
+ Assert(state->outputTapes[state->nOutputRuns] == NULL);
+ Assert(state->nOutputRuns == state->nOutputTapes);
+ state->destTape = LogicalTapeCreate(state->tapeset);
+ state->outputTapes[state->nOutputRuns] = state->destTape;
+ state->nOutputTapes++;
+ state->nOutputRuns++;
}
-
- /* Step D4: increase level */
- state->Level++;
- a = state->tp_fib[0];
- for (j = 0; j < state->tapeRange; j++)
+ else
{
- state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
- state->tp_fib[j] = a + state->tp_fib[j + 1];
+ /*
+ * We have reached the max number of tapes. Append to an existing
+ * tape.
+ */
+ state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+ state->nOutputRuns++;
}
- state->destTape = 0;
}
/*
/*
* mergeruns -- merge all the completed initial runs.
*
- * This implements steps D5, D6 of Algorithm D. All input data has
+ * This implements the Balanced k-Way Merge Algorithm. All input data has
* already been written to initial runs on tape (see dumptuples).
*/
static void
mergeruns(Tuplesortstate *state)
{
- int tapenum,
- svTape,
- svRuns,
- svDummy;
- int numTapes;
- int numInputTapes;
+ int tapenum;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
pfree(state->memtuples);
state->memtuples = NULL;
- /*
- * If we had fewer runs than tapes, refund the memory that we imagined we
- * would need for the tape buffers of the unused tapes.
- *
- * numTapes and numInputTapes reflect the actual number of tapes we will
- * use. Note that the output tape's tape number is maxTapes - 1, so the
- * tape numbers of the used tapes are not consecutive, and you cannot just
- * loop from 0 to numTapes to visit all used tapes!
- */
- if (state->Level == 1)
- {
- numInputTapes = state->currentRun;
- numTapes = numInputTapes + 1;
- FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
- }
- else
- {
- numInputTapes = state->tapeRange;
- numTapes = state->maxTapes;
- }
-
/*
* Initialize the slab allocator. We need one slab slot per input tape,
* for the tuples in the heap, plus one to hold the tuple last returned
* from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
* however, we don't need to allocate anything.)
*
+ * In a multi-pass merge, we could shrink this allocation for the last
+ * merge pass, if it has fewer tapes than previous passes, but we don't
+ * bother.
+ *
* From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
* to track memory usage of individual tuples.
*/
if (state->tuples)
- init_slab_allocator(state, numInputTapes + 1);
+ init_slab_allocator(state, state->nOutputTapes + 1);
else
init_slab_allocator(state, 0);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
* from each input tape.
+ *
+ * We could shrink this, too, between passes in a multi-pass merge, but we
+ * don't bother. (The initial input tapes are still in outputTapes. The
+ * number of input tapes will not increase between passes.)
*/
- state->memtupsize = numInputTapes;
+ state->memtupsize = state->nOutputTapes;
state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
- numInputTapes * sizeof(SortTuple));
+ state->nOutputTapes * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
/*
- * Use all the remaining memory we have available for read buffers among
- * the input tapes.
- *
- * We don't try to "rebalance" the memory among tapes, when we start a new
- * merge phase, even if some tapes are inactive in the new phase. That
- * would be hard, because logtape.c doesn't know where one run ends and
- * another begins. When a new merge phase begins, and a tape doesn't
- * participate in it, its buffer nevertheless already contains tuples from
- * the next run on same tape, so we cannot release the buffer. That's OK
- * in practice, merge performance isn't that sensitive to the amount of
- * buffers used, and most merge phases use all or almost all tapes,
- * anyway.
+ * Use all the remaining memory we have available for tape buffers. At
+ * the beginning of each merge pass, we will divide this memory between
+ * the input and output tapes in the pass.
*/
+ state->tape_buffer_mem = state->availMem;
+ USEMEM(state, state->availMem);
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- state->worker, state->availMem / 1024, numInputTapes);
+ elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for tape buffers",
+ state->worker, state->tape_buffer_mem / 1024);
#endif
- state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
- USEMEM(state, state->read_buffer_size * numInputTapes);
-
- /* End of step D2: rewind all output tapes to prepare for merging */
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
-
for (;;)
{
/*
- * At this point we know that tape[T] is empty. If there's just one
- * (real or dummy) run left on each input tape, then only one merge
- * pass remains. If we don't have to produce a materialized sorted
- * tape, we can stop at this point and do the final merge on-the-fly.
+ * On the first iteration, or if we have read all the runs from the
+ * input tapes in a multi-pass merge, it's time to start a new pass.
+ * Rewind all the output tapes, and make them inputs for the next
+ * pass.
*/
- if (!state->randomAccess && !WORKER(state))
+ if (state->nInputRuns == 0)
{
- bool allOneRun = true;
+ int64 input_buffer_size;
- Assert(state->tp_runs[state->tapeRange] == 0);
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
+ /* Close the old, emptied, input tapes */
+ if (state->nInputTapes > 0)
{
- if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
- {
- allOneRun = false;
- break;
- }
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
+ pfree(state->inputTapes);
}
- if (allOneRun)
+
+ /* Previous pass's outputs become next pass's inputs. */
+ state->inputTapes = state->outputTapes;
+ state->nInputTapes = state->nOutputTapes;
+ state->nInputRuns = state->nOutputRuns;
+
+ /*
+ * Reset output tape variables. The actual LogicalTapes will be
+ * created as needed; here we only allocate the array to hold
+ * them.
+ */
+ state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
+
+ /*
+ * Redistribute the memory allocated for tape buffers, among the
+ * new input and output tapes.
+ */
+ input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
+ state->nInputTapes,
+ state->nInputRuns,
+ state->maxTapes);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
+ state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ /* Prepare the new input tapes for merge pass. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
+
+ /*
+ * If there's just one run left on each input tape, then only one
+ * merge pass remains. If we don't have to produce a materialized
+ * sorted tape, we can stop at this point and do the final merge
+ * on-the-fly.
+ */
+ if (!state->randomAccess && state->nInputRuns <= state->nInputTapes &&
+ !WORKER(state))
{
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
}
}
- /* Step D5: merge runs onto tape[T] until tape[P] is empty */
- while (state->tp_runs[state->tapeRange - 1] ||
- state->tp_dummy[state->tapeRange - 1])
- {
- bool allDummy = true;
-
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] == 0)
- {
- allDummy = false;
- break;
- }
- }
-
- if (allDummy)
- {
- state->tp_dummy[state->tapeRange]++;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- state->tp_dummy[tapenum]--;
- }
- else
- mergeonerun(state);
- }
-
- /* Step D6: decrease level */
- if (--state->Level == 0)
- break;
-
- /* rewind output tape T to use as new input */
- LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
- state->read_buffer_size);
+ /* Select an output tape */
+ selectnewtape(state);
- /* close used-up input tape P, and create a new one for write pass */
- LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
- state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
- state->tp_runs[state->tapeRange - 1] = 0;
+ /* Merge one run from each input tape. */
+ mergeonerun(state);
/*
- * reassign tape units per step D6; note we no longer care about A[]
+ * If the input tapes are empty and we have produced only one output
+ * run, we're done. The current output tape contains the final result.
*/
- svTape = state->tp_tapenum[state->tapeRange];
- svDummy = state->tp_dummy[state->tapeRange];
- svRuns = state->tp_runs[state->tapeRange];
- for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
- {
- state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
- state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
- state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
- }
- state->tp_tapenum[0] = svTape;
- state->tp_dummy[0] = svDummy;
- state->tp_runs[0] = svRuns;
+ if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+ break;
}
/*
- * Done. Knuth says that the result is on TAPE[1], but since we exited
- * the loop without performing the last iteration of step D6, we have not
- * rearranged the tape unit assignment, and therefore the result is on
- * TAPE[T]. We need to do it this way so that we can freeze the final
- * output tape while rewinding it. The last iteration of step D6 would be
- * a waste of cycles anyway...
+ * Done. The result is on a single run on a single tape.
*/
- state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
+ state->result_tape = state->outputTapes[0];
if (!WORKER(state))
LogicalTapeFreeze(state->result_tape, NULL);
else
worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
- /* Close all the other tapes, to release their read buffers. */
- for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
- {
- if (state->tapes[tapenum] != state->result_tape)
- {
- LogicalTapeClose(state->tapes[tapenum]);
- state->tapes[tapenum] = NULL;
- }
- }
+ /* Close all the now-empty input tapes, to release their read buffers. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
}
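+
+/*
+ * Illustrative sketch, not part of tuplesort.c: the pass structure of the
+ * balanced merge in mergeruns(), reduced to run counting. Each pass turns
+ * nruns runs into ceil(nruns / ntapes) runs, where ntapes is capped at
+ * max_tapes, until a single sorted run remains. For example,
+ * count_merge_passes(1000, 100) returns 2, matching the worked example in
+ * the file header comment.
+ */
+static int
+count_merge_passes(int nruns, int max_tapes)
+{
+	int			passes = 0;
+
+	while (nruns > 1)
+	{
+		int			ntapes = (nruns < max_tapes) ? nruns : max_tapes;
+
+		nruns = (nruns + ntapes - 1) / ntapes;	/* runs left after pass */
+		passes++;
+	}
+	return passes;
+}
+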
/*
- * Merge one run from each input tape, except ones with dummy runs.
- *
- * This is the inner loop of Algorithm D step D5. We know that the
- * output tape is TAPE[T].
+ * Merge one run from each input tape.
*/
static void
mergeonerun(Tuplesortstate *state)
{
- int destTapeNum = state->tp_tapenum[state->tapeRange];
- LogicalTape *destTape = state->tapes[destTapeNum];
- int srcTape;
+ int srcTapeIndex;
+ LogicalTape *srcTape;
/*
* Start the merge by loading one tuple from each active source tape into
- * the heap. We can also decrease the input run/dummy run counts.
+ * the heap.
*/
beginmerge(state);
SortTuple stup;
/* write the tuple to destTape */
- srcTape = state->memtuples[0].srctape;
- WRITETUP(state, destTape, &state->memtuples[0]);
+ srcTapeIndex = state->memtuples[0].srctape;
+ srcTape = state->inputTapes[srcTapeIndex];
+ WRITETUP(state, state->destTape, &state->memtuples[0]);
/* recycle the slot of the tuple we just wrote out, for the next read */
if (state->memtuples[0].tuple)
*/
if (mergereadnext(state, srcTape, &stup))
{
- stup.srctape = srcTape;
+ stup.srctape = srcTapeIndex;
tuplesort_heap_replace_top(state, &stup);
}
else
+ {
tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
+ }
}
/*
* When the heap empties, we're done. Write an end-of-run marker on the
- * output tape, and increment its count of real runs.
+ * output tape.
*/
- markrunend(destTape);
- state->tp_runs[state->tapeRange]++;
-
-#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
- state->activeTapes, pg_rusage_show(&state->ru_start));
-#endif
+ markrunend(state->destTape);
}
/*
* beginmerge - initialize for a merge pass
*
- * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[]. Then, fill the
- * merge heap with the first tuple from each active tape.
+ * Fill the merge heap with the first tuple from each input tape.
*/
static void
beginmerge(Tuplesortstate *state)
{
int activeTapes;
- int tapenum;
- int srcTape;
+ int srcTapeIndex;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
- /* Adjust run counts and mark the active tapes */
- memset(state->mergeactive, 0,
- state->maxTapes * sizeof(*state->mergeactive));
- activeTapes = 0;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] > 0)
- state->tp_dummy[tapenum]--;
- else
- {
- Assert(state->tp_runs[tapenum] > 0);
- state->tp_runs[tapenum]--;
- srcTape = state->tp_tapenum[tapenum];
- state->mergeactive[srcTape] = true;
- activeTapes++;
- }
- }
- Assert(activeTapes > 0);
- state->activeTapes = activeTapes;
+ activeTapes = Min(state->nInputTapes, state->nInputRuns);
- /* Load the merge heap with the first tuple from each input tape */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
{
SortTuple tup;
- if (mergereadnext(state, srcTape, &tup))
+ if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
{
- tup.srctape = srcTape;
+ tup.srctape = srcTapeIndex;
tuplesort_heap_insert(state, &tup);
}
}
* Returns false on EOF.
*/
static bool
-mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
{
- LogicalTape *srcTape = state->tapes[srcTapeIndex];
unsigned int tuplen;
- if (!state->mergeactive[srcTapeIndex])
- return false; /* tape's run is already exhausted */
-
/* read next tuple, if any */
if ((tuplen = getlen(srcTape, true)) == 0)
- {
- state->mergeactive[srcTapeIndex] = false;
return false;
- }
READTUP(state, stup, srcTape, tuplen);
return true;
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
- LogicalTape *destTape;
int memtupwrite;
int i;
* Final call might require no sorting, in rare cases where we just so
* happen to have previously LACKMEM()'d at the point where exactly all
* remaining tuples are loaded into memory, just before input was
- * exhausted.
- *
- * In general, short final runs are quite possible. Rather than allowing
- * a special case where there was a superfluous selectnewtape() call (i.e.
- * a call with no subsequent run actually written to destTape), we prefer
- * to write out a 0 tuple run.
- *
- * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
- * the tape inactive for the merge when called from beginmerge(). This
- * case is therefore similar to the case where mergeonerun() finds a dummy
- * run for the tape, and so doesn't need to merge a run from the tape (or
- * conceptually "merges" the dummy run, if you prefer). According to
- * Knuth, Algorithm D "isn't strictly optimal" in its method of
- * distribution and dummy run assignment; this edge case seems very
- * unlikely to make that appreciably worse.
+ * exhausted. In general, short final runs are quite possible, but avoid
+ * creating a completely empty run. In a worker, though, we must produce
+ * at least one tape, even if it's empty.
*/
+ if (state->memtupcount == 0 && state->currentRun > 0)
+ return;
+
Assert(state->status == TSS_BUILDRUNS);
/*
errmsg("cannot have more than %d runs for an external sort",
INT_MAX)));
+ if (state->currentRun > 0)
+ selectnewtape(state);
+
state->currentRun++;
#ifdef TRACE_SORT
#endif
memtupwrite = state->memtupcount;
- destTape = state->tapes[state->tp_tapenum[state->destTape]];
for (i = 0; i < memtupwrite; i++)
{
- WRITETUP(state, destTape, &state->memtuples[i]);
+ WRITETUP(state, state->destTape, &state->memtuples[i]);
state->memtupcount--;
}
*/
MemoryContextReset(state->tuplecontext);
- markrunend(destTape);
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ markrunend(state->destTape);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d finished writing run %d to tape %d: %s",
- state->worker, state->currentRun, state->destTape,
+ state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
pg_rusage_show(&state->ru_start));
#endif
-
- if (!alltuples)
- selectnewtape(state);
}
/*
{
Assert(WORKER(state));
Assert(state->result_tape == NULL);
+ Assert(state->nOutputRuns == 1);
- state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
+ state->result_tape = state->destTape;
worker_freeze_result_tape(state);
}
* Create the tapeset from worker tapes, including a leader-owned tape at
* the end. Parallel workers are far more expensive than logical tapes,
* so the number of tapes allocated here should never be excessive.
- *
- * We still have a leader tape, though it's not possible to write to it
- * due to restrictions in the shared fileset infrastructure used by
- * logtape.c. It will never be written to in practice because
- * randomAccess is disallowed for parallel sorts.
*/
- inittapestate(state, nParticipants + 1);
- state->tapeset = LogicalTapeSetCreate(false,
- &shared->fileset,
- state->worker);
- state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
- for (j = 0; j < nParticipants; j++)
- state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
- /* tapes[nParticipants] represents the "leader tape", which is not used */
+ inittapestate(state, nParticipants);
+ state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
- /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ /*
+ * Set currentRun to reflect the number of runs we will merge (it's not
+ * used for anything; this is just pro forma).
+ */
state->currentRun = nParticipants;
/*
- * Initialize variables of Algorithm D to be consistent with runs from
- * workers having been generated in the leader.
+ * Initialize the state to look the same as after building the initial
+ * runs.
*
* There will always be exactly 1 run per worker, and exactly one input
* tape per run, because workers always output exactly 1 run, even when
* there were no input tuples for workers to sort.
*/
- for (j = 0; j < state->maxTapes; j++)
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
+
+ state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
+ state->nOutputTapes = nParticipants;
+ state->nOutputRuns = nParticipants;
+
+ for (j = 0; j < nParticipants; j++)
{
- /* One real run; no dummy runs for worker tapes */
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 1;
- state->tp_dummy[j] = 0;
- state->tp_tapenum[j] = j;
+ state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
}
- /* Leader tape gets one dummy run, and no real runs */
- state->tp_fib[state->tapeRange] = 0;
- state->tp_runs[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 1;
-
- state->Level = 1;
- state->destTape = 0;
state->status = TSS_BUILDRUNS;
}