/* Do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-                                  bloomBuildCallback, (void *) &buildstate);
+                                  bloomBuildCallback, (void *) &buildstate,
+                                  NULL);
 
    /*
     * There could be some items in the cached page.  Flush this page if
 
 
         <para>
          When changing this value, consider also adjusting
-         <xref linkend="guc-max-parallel-workers"/> and
+         <xref linkend="guc-max-parallel-workers"/>,
+         <xref linkend="guc-max-parallel-workers-maintenance"/>, and
          <xref linkend="guc-max-parallel-workers-per-gather"/>.
         </para>
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-parallel-workers-maintenance" xreflabel="max_parallel_maintenance_workers">
+       <term><varname>max_parallel_maintenance_workers</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_parallel_maintenance_workers</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum number of parallel workers that can be
+         started by a single utility command.  Currently, the only
+         parallel utility command that supports the use of parallel
+         workers is <command>CREATE INDEX</command>, and only when
+         building a B-tree index.  Parallel workers are taken from the
+         pool of processes established by <xref
+         linkend="guc-max-worker-processes"/>, limited by <xref
+         linkend="guc-max-parallel-workers"/>.  Note that the requested
+         number of workers may not actually be available at runtime.
+         If this occurs, the utility operation will run with fewer
+         workers than expected.  The default value is 2.  Setting this
+         value to 0 disables the use of parallel workers by utility
+         commands.
+        </para>
+
+        <para>
+         Note that parallel utility commands should not consume
+         substantially more memory than equivalent non-parallel
+         operations.  This strategy differs from that of parallel
+         query, where resource limits generally apply per worker
+         process.  Parallel utility commands treat the resource limit
+         <varname>maintenance_work_mem</varname> as a limit to be applied to
+         the entire utility command, regardless of the number of
+         parallel worker processes.  However, parallel utility
+         commands may still consume substantially more CPU resources
+         and I/O bandwidth.
+        </para>
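+
+        <para>
+         As a minimal illustration (the table and index names are
+         hypothetical), the following disables parallel workers for a
+         single index build in the current session:
+        </para>
+<programlisting>
+SET max_parallel_maintenance_workers = 0;
+CREATE INDEX measurements_ts_idx ON measurements (ts);  -- hypothetical names
+</programlisting>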
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
        <term><varname>max_parallel_workers</varname> (<type>integer</type>)
        <indexterm>
        <listitem>
         <para>
          Sets the maximum number of workers that the system can support for
-         parallel queries.  The default value is 8.  When increasing or
+         parallel operations.  The default value is 8.  When increasing or
          decreasing this value, consider also adjusting
+         <xref linkend="guc-max-parallel-workers-maintenance"/> and
          <xref linkend="guc-max-parallel-workers-per-gather"/>.
          Also, note that a setting for this value which is higher than
          <xref linkend="guc-max-worker-processes"/> will have no effect,
 
          <entry>Waiting in an extension.</entry>
         </row>
         <row>
-         <entry morerows="32"><literal>IPC</literal></entry>
+         <entry morerows="33"><literal>IPC</literal></entry>
          <entry><literal>BgWorkerShutdown</literal></entry>
          <entry>Waiting for background worker to shut down.</entry>
         </row>
          <entry><literal>ParallelBitmapScan</literal></entry>
          <entry>Waiting for parallel bitmap scan to become initialized.</entry>
         </row>
+        <row>
+         <entry><literal>ParallelCreateIndexScan</literal></entry>
+         <entry>Waiting for parallel <command>CREATE INDEX</command> workers to finish heap scan.</entry>
+        </row>
         <row>
          <entry><literal>ProcArrayGroupUpdate</literal></entry>
          <entry>Waiting for group leader to clear transaction id at transaction end.</entry>
     </row>
     <row>
      <entry><literal>sort-start</literal></entry>
-     <entry><literal>(int, bool, int, int, bool)</literal></entry>
+     <entry><literal>(int, bool, int, int, bool, int)</literal></entry>
      <entry>Probe that fires when a sort operation is started.
       arg0 indicates heap, index or datum sort.
       arg1 is true for unique-value enforcement.
       arg2 is the number of key columns.
       arg3 is the number of kilobytes of work memory allowed.
-      arg4 is true if random access to the sort result is required.</entry>
+      arg4 is true if random access to the sort result is required.
+      arg5 indicates serial when <literal>0</literal>, parallel worker when
+      <literal>1</literal>, or parallel leader when <literal>2</literal>.</entry>
     </row>
     <row>
      <entry><literal>sort-done</literal></entry>
 
    which would drive the machine into swapping.
   </para>
 
+  <para>
+   <productname>PostgreSQL</productname> can build indexes while
+   leveraging multiple CPUs in order to process the table rows faster.
+   This feature is known as <firstterm>parallel index
+   build</firstterm>.  For index methods that support building indexes
+   in parallel (currently, only B-tree),
+   <varname>maintenance_work_mem</varname> specifies the maximum
+   amount of memory that can be used by each index build operation as
+   a whole, regardless of how many worker processes were started.
+   Generally, a cost model automatically determines how many worker
+   processes should be requested, if any.
+  </para>
+
+  <para>
+   Parallel index builds may benefit from increasing
+   <varname>maintenance_work_mem</varname> where an equivalent serial
+   index build will see little or no benefit.  Note that
+   <varname>maintenance_work_mem</varname> may influence the number of
+   worker processes requested, since parallel workers must have at
+   least a <literal>32MB</literal> share of the total
+   <varname>maintenance_work_mem</varname> budget.  There must also be
+   a remaining <literal>32MB</literal> share for the leader process.
+   Increasing <xref linkend="guc-max-parallel-workers-maintenance"/>
+   may allow more workers to be used, which will reduce the time
+   needed for index creation, so long as the index build is not
+   already I/O bound.  Of course, there should also be sufficient
+   CPU capacity that would otherwise lie idle.
+  </para>
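+
+  <para>
+   As a rough illustration only (the table and index names here are
+   hypothetical), a session might raise both limits before building a
+   large index:
+  </para>
+
+<programlisting>
+SET maintenance_work_mem = '1GB';
+SET max_parallel_maintenance_workers = 4;
+CREATE INDEX measurements_ts_idx ON measurements (ts);  -- hypothetical names
+</programlisting>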
+
+  <para>
+   Setting a value for <literal>parallel_workers</literal> via <xref
+   linkend="sql-altertable"/> directly controls how many parallel
+   worker processes will be requested by a <command>CREATE
+   INDEX</command> against the table.  This bypasses the cost model
+   completely, and prevents <varname>maintenance_work_mem</varname>
+   from affecting how many parallel workers are requested.  Setting
+   <literal>parallel_workers</literal> to 0 via <command>ALTER
+   TABLE</command> will disable parallel index builds on the table in
+   all cases.
+  </para>
+
+  <tip>
+   <para>
+    You might want to reset <literal>parallel_workers</literal> after
+    setting it as part of tuning an index build.  This avoids
+    inadvertent changes to query plans, since
+    <literal>parallel_workers</literal> affects
+    <emphasis>all</emphasis> parallel table scans.
+   </para>
+  </tip>
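+
+  <para>
+   For example, to request a fixed number of workers for one index build on
+   a hypothetical table and then restore the default behavior (a minimal
+   sketch; the names are not from any real schema):
+  </para>
+
+<programlisting>
+ALTER TABLE measurements SET (parallel_workers = 4);
+CREATE INDEX measurements_ts_idx ON measurements (ts);  -- hypothetical names
+ALTER TABLE measurements RESET (parallel_workers);
+</programlisting>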
+
+  <para>
+   While <command>CREATE INDEX</command> with the
+   <literal>CONCURRENTLY</literal> option supports parallel builds
+   without special restrictions, only the first table scan is actually
+   performed in parallel.
+  </para>
+
   <para>
    Use <xref linkend="sql-dropindex"/>
    to remove an index.
 
       This sets the number of workers that should be used to assist a parallel
       scan of this table.  If not set, the system will determine a value based
       on the relation size.  The actual number of workers chosen by the planner
-      may be less, for example due to
-      the setting of <xref linkend="guc-max-worker-processes"/>.
+      or by utility statements that use parallel scans may be less, for example
+      due to the setting of <xref linkend="guc-max-worker-processes"/>.
      </para>
     </listitem>
    </varlistentry>
 
     * heap blocks in physical order.
     */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-                                  brinbuildCallback, (void *) state);
+                                  brinbuildCallback, (void *) state, NULL);
 
    /* process the final batch */
    form_and_insert_tuple(state);
    state->bs_currRangeStart = heapBlk;
    IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
                            heapBlk, scanNumBlks,
-                           brinbuildCallback, (void *) state);
+                           brinbuildCallback, (void *) state, NULL);
 
    /*
     * Now we update the values obtained by the scan with the placeholder
 
     * prefers to receive tuples in TID order.
     */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-                                  ginBuildCallback, (void *) &buildstate);
+                                  ginBuildCallback, (void *) &buildstate, NULL);
 
    /* dump remaining entries to the index */
    oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
 
     * Do the heap scan.
     */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-                                  gistBuildCallback, (void *) &buildstate);
+                                  gistBuildCallback, (void *) &buildstate, NULL);
 
    /*
     * If buffering was used, flush out all the tuples that are still in the
 
 
    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-                                  hashbuildCallback, (void *) &buildstate);
+                                  hashbuildCallback, (void *) &buildstate, NULL);
 
    if (buildstate.spool)
    {
 
                                                   hspool->low_mask,
                                                   hspool->max_buckets,
                                                   maintenance_work_mem,
+                                                  NULL,
                                                   false);
 
    return hspool;
 
    SpinLockInit(&target->phs_mutex);
    target->phs_startblock = InvalidBlockNumber;
    pg_atomic_init_u64(&target->phs_nallocated, 0);
-   SerializeSnapshot(snapshot, target->phs_snapshot_data);
+   if (IsMVCCSnapshot(snapshot))
+   {
+       SerializeSnapshot(snapshot, target->phs_snapshot_data);
+       target->phs_snapshot_any = false;
+   }
+   else
+   {
+       Assert(snapshot == SnapshotAny);
+       target->phs_snapshot_any = true;
+   }
 }
 
 /* ----------------
    Snapshot    snapshot;
 
    Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
-   snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
-   RegisterSnapshot(snapshot);
+
+   if (!parallel_scan->phs_snapshot_any)
+   {
+       /* Snapshot was serialized -- restore it */
+       snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+       RegisterSnapshot(snapshot);
+   }
+   else
+   {
+       /* SnapshotAny passed by caller (not serialized) */
+       snapshot = SnapshotAny;
+   }
 
    return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
-                                  true, true, true, false, false, true);
+                                  true, true, true, false, false,
+                                  !parallel_scan->phs_snapshot_any);
 }
 
 /* ----------------
 
 #include "access/nbtree.h"
 #include "access/relscan.h"
 #include "access/xlog.h"
-#include "catalog/index.h"
 #include "commands/vacuum.h"
+#include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
-#include "tcop/tcopprot.h"     /* pgrminclude ignore */
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 
 
-/* Working state for btbuild and its callback */
-typedef struct
-{
-   bool        isUnique;
-   bool        haveDead;
-   Relation    heapRel;
-   BTSpool    *spool;
-
-   /*
-    * spool2 is needed only when the index is a unique index. Dead tuples are
-    * put into spool2 instead of spool in order to avoid uniqueness check.
-    */
-   BTSpool    *spool2;
-   double      indtuples;
-} BTBuildState;
-
 /* Working state needed by btvacuumpage */
 typedef struct
 {
 typedef struct BTParallelScanDescData *BTParallelScanDesc;
 
 
-static void btbuildCallback(Relation index,
-               HeapTuple htup,
-               Datum *values,
-               bool *isnull,
-               bool tupleIsAlive,
-               void *state);
 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
             IndexBulkDeleteCallback callback, void *callback_state,
             BTCycleId cycleid);
    PG_RETURN_POINTER(amroutine);
 }
 
-/*
- * btbuild() -- build a new btree index.
- */
-IndexBuildResult *
-btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
-{
-   IndexBuildResult *result;
-   double      reltuples;
-   BTBuildState buildstate;
-
-   buildstate.isUnique = indexInfo->ii_Unique;
-   buildstate.haveDead = false;
-   buildstate.heapRel = heap;
-   buildstate.spool = NULL;
-   buildstate.spool2 = NULL;
-   buildstate.indtuples = 0;
-
-#ifdef BTREE_BUILD_STATS
-   if (log_btree_build_stats)
-       ResetUsage();
-#endif                         /* BTREE_BUILD_STATS */
-
-   /*
-    * We expect to be called exactly once for any index relation. If that's
-    * not the case, big trouble's what we have.
-    */
-   if (RelationGetNumberOfBlocks(index) != 0)
-       elog(ERROR, "index \"%s\" already contains data",
-            RelationGetRelationName(index));
-
-   buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
-
-   /*
-    * If building a unique index, put dead tuples in a second spool to keep
-    * them out of the uniqueness check.
-    */
-   if (indexInfo->ii_Unique)
-       buildstate.spool2 = _bt_spoolinit(heap, index, false, true);
-
-   /* do the heap scan */
-   reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-                                  btbuildCallback, (void *) &buildstate);
-
-   /* okay, all heap tuples are indexed */
-   if (buildstate.spool2 && !buildstate.haveDead)
-   {
-       /* spool2 turns out to be unnecessary */
-       _bt_spooldestroy(buildstate.spool2);
-       buildstate.spool2 = NULL;
-   }
-
-   /*
-    * Finish the build by (1) completing the sort of the spool file, (2)
-    * inserting the sorted tuples into btree pages and (3) building the upper
-    * levels.
-    */
-   _bt_leafbuild(buildstate.spool, buildstate.spool2);
-   _bt_spooldestroy(buildstate.spool);
-   if (buildstate.spool2)
-       _bt_spooldestroy(buildstate.spool2);
-
-#ifdef BTREE_BUILD_STATS
-   if (log_btree_build_stats)
-   {
-       ShowUsage("BTREE BUILD STATS");
-       ResetUsage();
-   }
-#endif                         /* BTREE_BUILD_STATS */
-
-   /*
-    * Return statistics
-    */
-   result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
-
-   result->heap_tuples = reltuples;
-   result->index_tuples = buildstate.indtuples;
-
-   return result;
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-btbuildCallback(Relation index,
-               HeapTuple htup,
-               Datum *values,
-               bool *isnull,
-               bool tupleIsAlive,
-               void *state)
-{
-   BTBuildState *buildstate = (BTBuildState *) state;
-
-   /*
-    * insert the index tuple into the appropriate spool file for subsequent
-    * processing
-    */
-   if (tupleIsAlive || buildstate->spool2 == NULL)
-       _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
-   else
-   {
-       /* dead tuples are put into spool2 */
-       buildstate->haveDead = true;
-       _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
-   }
-
-   buildstate->indtuples += 1;
-}
-
 /*
  * btbuildempty() -- build an empty btree index in the initialization fork
  */
 
 #include "postgres.h"
 
 #include "access/nbtree.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
+#include "catalog/index.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "storage/smgr.h"
-#include "tcop/tcopprot.h"
+#include "tcop/tcopprot.h"     /* pgrminclude ignore */
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
 #include "utils/tuplesort.h"
 
 
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BTREE_SHARED      UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT         UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_TUPLESORT_SPOOL2  UINT64CONST(0xA000000000000003)
+
+/*
+ * DISABLE_LEADER_PARTICIPATION disables the leader's participation in
+ * parallel index builds.  This may be useful as a debugging aid.
+#undef DISABLE_LEADER_PARTICIPATION
+ */
+
 /*
  * Status record for spooling/sorting phase.  (Note we may have two of
  * these due to the special requirements for uniqueness-checking with
  * dead tuples.)
  */
-struct BTSpool
+typedef struct BTSpool
 {
    Tuplesortstate *sortstate;  /* state data for tuplesort.c */
    Relation    heap;
    Relation    index;
    bool        isunique;
-};
+} BTSpool;
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.  Note that there is a separate tuplesort TOC
+ * entry, private to tuplesort.c but allocated by this module on its behalf.
+ */
+typedef struct BTShared
+{
+   /*
+    * These fields are not modified during the sort.  They primarily exist
+    * for the benefit of worker processes that need to create BTSpool state
+    * corresponding to that used by the leader.
+    */
+   Oid         heaprelid;
+   Oid         indexrelid;
+   bool        isunique;
+   bool        isconcurrent;
+   int         scantuplesortstates;
+
+   /*
+    * workersdonecv is used to monitor the progress of workers.  All parallel
+    * participants must indicate that they are done before leader can use
+    * mutable state that workers maintain during scan (and before leader can
+    * proceed to tuplesort_performsort()).
+    */
+   ConditionVariable workersdonecv;
+
+   /*
+    * mutex protects all fields before heapdesc.
+    *
+    * These fields contain status information of interest to B-Tree index
+    * builds that must work just the same when an index is built in parallel.
+    */
+   slock_t     mutex;
+
+   /*
+    * Mutable state that is maintained by workers, and reported back to
+    * leader at end of parallel scan.
+    *
+    * nparticipantsdone is number of worker processes finished.
+    *
+    * reltuples is the total number of input heap tuples.
+    *
+    * havedead indicates if RECENTLY_DEAD tuples were encountered during
+    * build.
+    *
+    * indtuples is the total number of tuples that made it into the index.
+    *
+    * brokenhotchain indicates if any worker detected a broken HOT chain
+    * during build.
+    */
+   int         nparticipantsdone;
+   double      reltuples;
+   bool        havedead;
+   double      indtuples;
+   bool        brokenhotchain;
+
+   /*
+    * This variable-sized field must come last.
+    *
+    * See _bt_parallel_estimate_shared().
+    */
+   ParallelHeapScanDescData heapdesc;
+} BTShared;
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BTLeader
+{
+   /* parallel context itself */
+   ParallelContext *pcxt;
+
+   /*
+    * nparticipanttuplesorts is the exact number of worker processes
+    * successfully launched, plus one leader process if it participates as a
+    * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+    * participating as a worker).
+    */
+   int         nparticipanttuplesorts;
+
+   /*
+    * Leader process convenience pointers to shared state (leader avoids TOC
+    * lookups).
+    *
+    * btshared is the shared state for entire build.  sharedsort is the
+    * shared, tuplesort-managed state passed to each process tuplesort.
+    * sharedsort2 is the corresponding btspool2 shared state, used only when
+    * building unique indexes.  snapshot is the snapshot used by the scan iff
+    * an MVCC snapshot is required.
+    */
+   BTShared   *btshared;
+   Sharedsort *sharedsort;
+   Sharedsort *sharedsort2;
+   Snapshot    snapshot;
+} BTLeader;
+
+/*
+ * Working state for btbuild and its callback.
+ *
+ * When parallel CREATE INDEX is used, there is a BTBuildState for each
+ * participant.
+ */
+typedef struct BTBuildState
+{
+   bool        isunique;
+   bool        havedead;
+   Relation    heap;
+   BTSpool    *spool;
+
+   /*
+    * spool2 is needed only when the index is a unique index. Dead tuples are
+    * put into spool2 instead of spool in order to avoid uniqueness check.
+    */
+   BTSpool    *spool2;
+   double      indtuples;
+
+   /*
+    * btleader is only present when a parallel index build is performed, and
+    * only in the leader process. (Actually, only the leader has a
+    * BTBuildState.  Workers have their own spool and spool2, though.)
+    */
+   BTLeader   *btleader;
+} BTBuildState;
 
 /*
  * Status record for a btree page being built.  We have one of these
 } BTWriteState;
 
 
+static double _bt_spools_heapscan(Relation heap, Relation index,
+                   BTBuildState *buildstate, IndexInfo *indexInfo);
+static void _bt_spooldestroy(BTSpool *btspool);
+static void _bt_spool(BTSpool *btspool, ItemPointer self,
+         Datum *values, bool *isnull);
+static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
+static void _bt_build_callback(Relation index, HeapTuple htup, Datum *values,
+                  bool *isnull, bool tupleIsAlive, void *state);
 static Page _bt_blnewpage(uint32 level);
 static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
 static void _bt_slideleft(Page page);
 static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
 static void _bt_load(BTWriteState *wstate,
         BTSpool *btspool, BTSpool *btspool2);
+static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent,
+                  int request);
+static void _bt_end_parallel(BTLeader *btleader);
+static Size _bt_parallel_estimate_shared(Snapshot snapshot);
+static double _bt_parallel_heapscan(BTBuildState *buildstate,
+                     bool *brokenhotchain);
+static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
+static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+                          BTShared *btshared, Sharedsort *sharedsort,
+                          Sharedsort *sharedsort2, int sortmem);
 
 
 /*
- * Interface routines
+ * btbuild() -- build a new btree index.
  */
+IndexBuildResult *
+btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
+{
+   IndexBuildResult *result;
+   BTBuildState buildstate;
+   double      reltuples;
+
+#ifdef BTREE_BUILD_STATS
+   if (log_btree_build_stats)
+       ResetUsage();
+#endif                         /* BTREE_BUILD_STATS */
+
+   buildstate.isunique = indexInfo->ii_Unique;
+   buildstate.havedead = false;
+   buildstate.heap = heap;
+   buildstate.spool = NULL;
+   buildstate.spool2 = NULL;
+   buildstate.indtuples = 0;
+   buildstate.btleader = NULL;
+
+   /*
+    * We expect to be called exactly once for any index relation. If that's
+    * not the case, big trouble's what we have.
+    */
+   if (RelationGetNumberOfBlocks(index) != 0)
+       elog(ERROR, "index \"%s\" already contains data",
+            RelationGetRelationName(index));
+
+   reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo);
+
+   /*
+    * Finish the build by (1) completing the sort of the spool file, (2)
+    * inserting the sorted tuples into btree pages and (3) building the upper
+    * levels.  Finally, it may also be necessary to end use of parallelism.
+    */
+   _bt_leafbuild(buildstate.spool, buildstate.spool2);
+   _bt_spooldestroy(buildstate.spool);
+   if (buildstate.spool2)
+       _bt_spooldestroy(buildstate.spool2);
+   if (buildstate.btleader)
+       _bt_end_parallel(buildstate.btleader);
+
+   result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
+
+   result->heap_tuples = reltuples;
+   result->index_tuples = buildstate.indtuples;
+
+#ifdef BTREE_BUILD_STATS
+   if (log_btree_build_stats)
+   {
+       ShowUsage("BTREE BUILD STATS");
+       ResetUsage();
+   }
+#endif                         /* BTREE_BUILD_STATS */
 
+   return result;
+}
 
 /*
- * create and initialize a spool structure
+ * Create and initialize one or two spool structures, and save them in caller's
+ * buildstate argument.  May also fill in fields within indexInfo used by index
+ * builds.
+ *
+ * Scans the heap, possibly in parallel, filling spools with IndexTuples.  This
+ * routine encapsulates all aspects of managing parallelism.  Caller need only
+ * call _bt_end_parallel() in parallel case after it is done with spool/spool2.
+ *
+ * Returns the total number of heap tuples scanned.
  */
-BTSpool *
-_bt_spoolinit(Relation heap, Relation index, bool isunique, bool isdead)
+static double
+_bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
+                   IndexInfo *indexInfo)
 {
    BTSpool    *btspool = (BTSpool *) palloc0(sizeof(BTSpool));
-   int         btKbytes;
+   SortCoordinate coordinate = NULL;
+   double      reltuples = 0;
 
+   /*
+    * We size the sort area as maintenance_work_mem rather than work_mem to
+    * speed index creation.  This should be OK since a single backend can't
+    * run multiple index creations in parallel (see also: notes on
+    * parallelism and maintenance_work_mem below).
+    */
    btspool->heap = heap;
    btspool->index = index;
-   btspool->isunique = isunique;
+   btspool->isunique = indexInfo->ii_Unique;
+
+   /* Save as primary spool */
+   buildstate->spool = btspool;
+
+   /* Attempt to launch parallel worker scan when required */
+   if (indexInfo->ii_ParallelWorkers > 0)
+       _bt_begin_parallel(buildstate, indexInfo->ii_Concurrent,
+                          indexInfo->ii_ParallelWorkers);
 
    /*
-    * We size the sort area as maintenance_work_mem rather than work_mem to
-    * speed index creation.  This should be OK since a single backend can't
-    * run multiple index creations in parallel.  Note that creation of a
-    * unique index actually requires two BTSpool objects.  We expect that the
-    * second one (for dead tuples) won't get very full, so we give it only
-    * work_mem.
+    * If parallel build requested and at least one worker process was
+    * successfully launched, set up coordination state
     */
-   btKbytes = isdead ? work_mem : maintenance_work_mem;
-   btspool->sortstate = tuplesort_begin_index_btree(heap, index, isunique,
-                                                    btKbytes, false);
+   if (buildstate->btleader)
+   {
+       coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+       coordinate->isWorker = false;
+       coordinate->nParticipants =
+           buildstate->btleader->nparticipanttuplesorts;
+       coordinate->sharedsort = buildstate->btleader->sharedsort;
+   }
 
-   return btspool;
+   /*
+    * Begin serial/leader tuplesort.
+    *
+    * In cases where parallelism is involved, the leader receives the same
+    * share of maintenance_work_mem as a serial sort (it is generally treated
+    * in the same way as a serial sort once we return).  Parallel worker
+    * Tuplesortstates will have received only a fraction of
+    * maintenance_work_mem, though.
+    *
+    * We rely on the lifetime of the Leader Tuplesortstate almost not
+    * overlapping with any worker Tuplesortstate's lifetime.  There may be
+    * some small overlap, but that's okay because we rely on leader
+    * Tuplesortstate only allocating a small, fixed amount of memory here.
+    * When its tuplesort_performsort() is called (by our caller), and
+    * significant amounts of memory are likely to be used, all workers must
+    * have already freed almost all memory held by their Tuplesortstates
+    * (they are about to go away completely, too).  The overall effect is
+    * that maintenance_work_mem always represents an absolute high watermark
+    * on the amount of memory used by a CREATE INDEX operation, regardless of
+    * the use of parallelism or any other factor.
+    */
+   buildstate->spool->sortstate =
+       tuplesort_begin_index_btree(heap, index, buildstate->isunique,
+                                   maintenance_work_mem, coordinate,
+                                   false);
+
+   /*
+    * If building a unique index, put dead tuples in a second spool to keep
+    * them out of the uniqueness check.  We expect that the second spool (for
+    * dead tuples) won't get very full, so we give it only work_mem.
+    */
+   if (indexInfo->ii_Unique)
+   {
+       BTSpool    *btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+       SortCoordinate coordinate2 = NULL;
+
+       /* Initialize secondary spool */
+       btspool2->heap = heap;
+       btspool2->index = index;
+       btspool2->isunique = false;
+       /* Save as secondary spool */
+       buildstate->spool2 = btspool2;
+
+       if (buildstate->btleader)
+       {
+           /*
+            * Set up non-private state that is passed to
+            * tuplesort_begin_index_btree() about the basic high level
+            * coordination of a parallel sort.
+            */
+           coordinate2 = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+           coordinate2->isWorker = false;
+           coordinate2->nParticipants =
+               buildstate->btleader->nparticipanttuplesorts;
+           coordinate2->sharedsort = buildstate->btleader->sharedsort2;
+       }
+
+       /*
+        * We expect that the second one (for dead tuples) won't get very
+        * full, so we give it only work_mem
+        */
+       buildstate->spool2->sortstate =
+           tuplesort_begin_index_btree(heap, index, false, work_mem,
+                                       coordinate2, false);
+   }
+
+   /* Fill spool using either serial or parallel heap scan */
+   if (!buildstate->btleader)
+       reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
+                                      _bt_build_callback, (void *) buildstate,
+                                      NULL);
+   else
+       reltuples = _bt_parallel_heapscan(buildstate,
+                                         &indexInfo->ii_BrokenHotChain);
+
+   /* okay, all heap tuples are spooled */
+   if (buildstate->spool2 && !buildstate->havedead)
+   {
+       /* spool2 turns out to be unnecessary */
+       _bt_spooldestroy(buildstate->spool2);
+       buildstate->spool2 = NULL;
+   }
+
+   return reltuples;
 }
 
 /*
  * clean up a spool structure and its substructures.
  */
-void
+static void
 _bt_spooldestroy(BTSpool *btspool)
 {
    tuplesort_end(btspool->sortstate);
 /*
  * spool an index entry into the sort file.
  */
-void
+static void
 _bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull)
 {
    tuplesort_putindextuplevalues(btspool->sortstate, btspool->index,
  * given a spool loaded by successive calls to _bt_spool,
  * create an entire btree.
  */
-void
+static void
 _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 {
    BTWriteState wstate;
    _bt_load(&wstate, btspool, btspool2);
 }
 
-
 /*
- * Internal routines.
+ * Per-tuple callback from IndexBuildHeapScan
  */
+static void
+_bt_build_callback(Relation index,
+                  HeapTuple htup,
+                  Datum *values,
+                  bool *isnull,
+                  bool tupleIsAlive,
+                  void *state)
+{
+   BTBuildState *buildstate = (BTBuildState *) state;
 
+   /*
+    * insert the index tuple into the appropriate spool file for subsequent
+    * processing
+    */
+   if (tupleIsAlive || buildstate->spool2 == NULL)
+       _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
+   else
+   {
+       /* dead tuples are put into spool2 */
+       buildstate->havedead = true;
+       _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
+   }
+
+   buildstate->indtuples += 1;
+}
 
 /*
  * allocate workspace for a new, clean btree page, not linked to any siblings.
        smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM);
    }
 }
+
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized (with the exception of the
+ * tuplesort state in spools, which may later be created based on shared
+ * state initially set up here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's BTLeader, which caller must use to shut down parallel
+ * mode by passing it to _bt_end_parallel() at the very end of its index
+ * build.  If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
+static void
+_bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
+{
+   ParallelContext *pcxt;
+   int         scantuplesortstates;
+   Snapshot    snapshot;
+   Size        estbtshared;
+   Size        estsort;
+   BTShared   *btshared;
+   Sharedsort *sharedsort;
+   Sharedsort *sharedsort2;
+   BTSpool    *btspool = buildstate->spool;
+   BTLeader   *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+   bool        leaderparticipates = true;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+   leaderparticipates = false;
+#endif
+
+   /*
+    * Enter parallel mode, and create context for parallel build of btree
+    * index
+    */
+   EnterParallelMode();
+   Assert(request > 0);
+   pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
+                                request, true);
+   scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+   /*
+    * Prepare for scan of the base relation.  In a normal index build, we use
+    * SnapshotAny because we must retrieve all tuples and do our own time
+    * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+    * concurrent build, we take a regular MVCC snapshot and index whatever's
+    * live according to that.
+    */
+   if (!isconcurrent)
+       snapshot = SnapshotAny;
+   else
+       snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+   /*
+    * Estimate size for at least two keys -- our own
+    * PARALLEL_KEY_BTREE_SHARED workspace, and PARALLEL_KEY_TUPLESORT
+    * tuplesort workspace
+    */
+   estbtshared = _bt_parallel_estimate_shared(snapshot);
+   shm_toc_estimate_chunk(&pcxt->estimator, estbtshared);
+   estsort = tuplesort_estimate_shared(scantuplesortstates);
+   shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+   /*
+    * Unique case requires a second spool, and so we may have to account for
+    * a third shared workspace -- PARALLEL_KEY_TUPLESORT_SPOOL2
+    */
+   if (!btspool->isunique)
+       shm_toc_estimate_keys(&pcxt->estimator, 2);
+   else
+   {
+       shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+       shm_toc_estimate_keys(&pcxt->estimator, 3);
+   }
+
+   /* Everyone's had a chance to ask for space, so now create the DSM */
+   InitializeParallelDSM(pcxt);
+
+   /* Store shared build state, for which we reserved space */
+   btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared);
+   /* Initialize immutable state */
+   btshared->heaprelid = RelationGetRelid(btspool->heap);
+   btshared->indexrelid = RelationGetRelid(btspool->index);
+   btshared->isunique = btspool->isunique;
+   btshared->isconcurrent = isconcurrent;
+   btshared->scantuplesortstates = scantuplesortstates;
+   ConditionVariableInit(&btshared->workersdonecv);
+   SpinLockInit(&btshared->mutex);
+   /* Initialize mutable state */
+   btshared->nparticipantsdone = 0;
+   btshared->reltuples = 0.0;
+   btshared->havedead = false;
+   btshared->indtuples = 0.0;
+   btshared->brokenhotchain = false;
+   heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
+
+   /*
+    * Store shared tuplesort-private state, for which we reserved space.
+    * Then, initialize opaque state using tuplesort routine.
+    */
+   sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+   tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+                               pcxt->seg);
+
+   shm_toc_insert(pcxt->toc, PARALLEL_KEY_BTREE_SHARED, btshared);
+   shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+   /* Unique case requires a second spool, and associated shared state */
+   if (!btspool->isunique)
+       sharedsort2 = NULL;
+   else
+   {
+       /*
+        * Store additional shared tuplesort-private state, for which we
+        * reserved space.  Then, initialize opaque state using tuplesort
+        * routine.
+        */
+       sharedsort2 = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+       tuplesort_initialize_shared(sharedsort2, scantuplesortstates,
+                                   pcxt->seg);
+
+       shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT_SPOOL2, sharedsort2);
+   }
+
+   /* Launch workers, saving status for leader/caller */
+   LaunchParallelWorkers(pcxt);
+   btleader->pcxt = pcxt;
+   btleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+   if (leaderparticipates)
+       btleader->nparticipanttuplesorts++;
+   btleader->btshared = btshared;
+   btleader->sharedsort = sharedsort;
+   btleader->sharedsort2 = sharedsort2;
+   btleader->snapshot = snapshot;
+
+   /* If no workers were successfully launched, back out (do serial build) */
+   if (pcxt->nworkers_launched == 0)
+   {
+       _bt_end_parallel(btleader);
+       return;
+   }
+
+   /* Save leader state now that it's clear build will be parallel */
+   buildstate->btleader = btleader;
+
+   /* Join heap scan ourselves */
+   if (leaderparticipates)
+       _bt_leader_participate_as_worker(buildstate);
+
+   /*
+    * Caller needs to wait for all launched workers when we return.  Make
+    * sure that the failure-to-start case will not hang forever.
+    */
+   WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_bt_end_parallel(BTLeader *btleader)
+{
+   /* Shutdown worker processes */
+   WaitForParallelWorkersToFinish(btleader->pcxt);
+   /* Free last reference to MVCC snapshot, if one was used */
+   if (IsMVCCSnapshot(btleader->snapshot))
+       UnregisterSnapshot(btleader->snapshot);
+   DestroyParallelContext(btleader->pcxt);
+   ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * btree index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_bt_parallel_estimate_shared(Snapshot snapshot)
+{
+   if (!IsMVCCSnapshot(snapshot))
+   {
+       Assert(snapshot == SnapshotAny);
+       return sizeof(BTShared);
+   }
+
+   return add_size(offsetof(BTShared, heapdesc) +
+                   offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+                   EstimateSnapshotSpace(snapshot));
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _bt_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Fills in fields needed for ambuild statistics, and lets caller set
+ * field indicating that some worker encountered a broken HOT chain.
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain)
+{
+   BTShared   *btshared = buildstate->btleader->btshared;
+   int         nparticipanttuplesorts;
+   double      reltuples;
+
+   nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts;
+   for (;;)
+   {
+       SpinLockAcquire(&btshared->mutex);
+       if (btshared->nparticipantsdone == nparticipanttuplesorts)
+       {
+           buildstate->havedead = btshared->havedead;
+           buildstate->indtuples = btshared->indtuples;
+           *brokenhotchain = btshared->brokenhotchain;
+           reltuples = btshared->reltuples;
+           SpinLockRelease(&btshared->mutex);
+           break;
+       }
+       SpinLockRelease(&btshared->mutex);
+
+       ConditionVariableSleep(&btshared->workersdonecv,
+                              WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+   }
+
+   ConditionVariableCancelSleep();
+
+   return reltuples;
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_bt_leader_participate_as_worker(BTBuildState *buildstate)
+{
+   BTLeader   *btleader = buildstate->btleader;
+   BTSpool    *leaderworker;
+   BTSpool    *leaderworker2;
+   int         sortmem;
+
+   /* Allocate memory and initialize private spool */
+   leaderworker = (BTSpool *) palloc0(sizeof(BTSpool));
+   leaderworker->heap = buildstate->spool->heap;
+   leaderworker->index = buildstate->spool->index;
+   leaderworker->isunique = buildstate->spool->isunique;
+
+   /* Initialize second spool, if required */
+   if (!btleader->btshared->isunique)
+       leaderworker2 = NULL;
+   else
+   {
+       /* Allocate memory for worker's own private secondary spool */
+       leaderworker2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+       /* Initialize worker's own secondary spool */
+       leaderworker2->heap = leaderworker->heap;
+       leaderworker2->index = leaderworker->index;
+       leaderworker2->isunique = false;
+   }
+
+   /*
+    * Might as well use the reliable figure when doling out
+    * maintenance_work_mem (when fewer workers than requested were launched,
+    * this will be somewhat higher than it is for other workers).
+    */
+   sortmem = maintenance_work_mem / btleader->nparticipanttuplesorts;
+
+   /* Perform work common to all participants */
+   _bt_parallel_scan_and_sort(leaderworker, leaderworker2, btleader->btshared,
+                              btleader->sharedsort, btleader->sharedsort2,
+                              sortmem);
+
+#ifdef BTREE_BUILD_STATS
+   if (log_btree_build_stats)
+   {
+       ShowUsage("BTREE BUILD (Leader Partial Spool) STATISTICS");
+       ResetUsage();
+   }
+#endif                         /* BTREE_BUILD_STATS */
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+   BTSpool    *btspool;
+   BTSpool    *btspool2;
+   BTShared   *btshared;
+   Sharedsort *sharedsort;
+   Sharedsort *sharedsort2;
+   Relation    heapRel;
+   Relation    indexRel;
+   LOCKMODE    heapLockmode;
+   LOCKMODE    indexLockmode;
+   int         sortmem;
+
+#ifdef BTREE_BUILD_STATS
+   if (log_btree_build_stats)
+       ResetUsage();
+#endif                         /* BTREE_BUILD_STATS */
+
+   /* Look up shared state */
+   btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false);
+
+   /* Open relations using lock modes known to be obtained by index.c */
+   if (!btshared->isconcurrent)
+   {
+       heapLockmode = ShareLock;
+       indexLockmode = AccessExclusiveLock;
+   }
+   else
+   {
+       heapLockmode = ShareUpdateExclusiveLock;
+       indexLockmode = RowExclusiveLock;
+   }
+
+   /* Open relations within worker */
+   heapRel = heap_open(btshared->heaprelid, heapLockmode);
+   indexRel = index_open(btshared->indexrelid, indexLockmode);
+
+   /* Initialize worker's own spool */
+   btspool = (BTSpool *) palloc0(sizeof(BTSpool));
+   btspool->heap = heapRel;
+   btspool->index = indexRel;
+   btspool->isunique = btshared->isunique;
+
+   /* Look up shared state private to tuplesort.c */
+   sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+   tuplesort_attach_shared(sharedsort, seg);
+   if (!btshared->isunique)
+   {
+       btspool2 = NULL;
+       sharedsort2 = NULL;
+   }
+   else
+   {
+       /* Allocate memory for worker's own private secondary spool */
+       btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+       /* Initialize worker's own secondary spool */
+       btspool2->heap = btspool->heap;
+       btspool2->index = btspool->index;
+       btspool2->isunique = false;
+       /* Look up shared state private to tuplesort.c */
+       sharedsort2 = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT_SPOOL2, false);
+       tuplesort_attach_shared(sharedsort2, seg);
+   }
+
+   /* Perform sorting of spool, and possibly a spool2 */
+   sortmem = maintenance_work_mem / btshared->scantuplesortstates;
+   _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
+                              sharedsort2, sortmem);
+
+#ifdef BTREE_BUILD_STATS
+   if (log_btree_build_stats)
+   {
+       ShowUsage("BTREE BUILD (Worker Partial Spool) STATISTICS");
+       ResetUsage();
+   }
+#endif                         /* BTREE_BUILD_STATS */
+
+   index_close(indexRel, indexLockmode);
+   heap_close(heapRel, heapLockmode);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for passed btspool, and a second tuplesort
+ * state if a second btspool is needed (i.e. for unique index builds).  All
+ * other spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+                          BTShared *btshared, Sharedsort *sharedsort,
+                          Sharedsort *sharedsort2, int sortmem)
+{
+   SortCoordinate coordinate;
+   BTBuildState buildstate;
+   HeapScanDesc scan;
+   double      reltuples;
+   IndexInfo  *indexInfo;
+
+   /* Initialize local tuplesort coordination state */
+   coordinate = palloc0(sizeof(SortCoordinateData));
+   coordinate->isWorker = true;
+   coordinate->nParticipants = -1;
+   coordinate->sharedsort = sharedsort;
+
+   /* Begin "partial" tuplesort */
+   btspool->sortstate = tuplesort_begin_index_btree(btspool->heap,
+                                                    btspool->index,
+                                                    btspool->isunique,
+                                                    sortmem, coordinate,
+                                                    false);
+
+   /*
+    * Just as with serial case, there may be a second spool.  If so, a
+    * second, dedicated spool2 partial tuplesort is required.
+    */
+   if (btspool2)
+   {
+       SortCoordinate coordinate2;
+
+       /*
+        * We expect that the second one (for dead tuples) won't get very
+        * full, so we give it only work_mem (unless sortmem is less for
+        * worker).  Worker processes are generally permitted to allocate
+        * work_mem independently.
+        */
+       coordinate2 = palloc0(sizeof(SortCoordinateData));
+       coordinate2->isWorker = true;
+       coordinate2->nParticipants = -1;
+       coordinate2->sharedsort = sharedsort2;
+       btspool2->sortstate =
+           tuplesort_begin_index_btree(btspool->heap, btspool->index, false,
+                                       Min(sortmem, work_mem), coordinate2,
+                                       false);
+   }
+
+   /* Fill in buildstate for _bt_build_callback() */
+   buildstate.isunique = btshared->isunique;
+   buildstate.havedead = false;
+   buildstate.heap = btspool->heap;
+   buildstate.spool = btspool;
+   buildstate.spool2 = btspool2;
+   buildstate.indtuples = 0;
+   buildstate.btleader = NULL;
+
+   /* Join parallel scan */
+   indexInfo = BuildIndexInfo(btspool->index);
+   indexInfo->ii_Concurrent = btshared->isconcurrent;
+   scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
+   reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
+                                  true, _bt_build_callback,
+                                  (void *) &buildstate, scan);
+
+   /*
+    * Execute this worker's part of the sort.
+    *
+    * Unlike leader and serial cases, we cannot avoid calling
+    * tuplesort_performsort() for spool2 if it ends up containing no dead
+    * tuples (this is disallowed for workers by tuplesort).
+    */
+   tuplesort_performsort(btspool->sortstate);
+   if (btspool2)
+       tuplesort_performsort(btspool2->sortstate);
+
+   /*
+    * Done.  Record ambuild statistics, and whether we encountered a broken
+    * HOT chain.
+    */
+   SpinLockAcquire(&btshared->mutex);
+   btshared->nparticipantsdone++;
+   btshared->reltuples += reltuples;
+   if (buildstate.havedead)
+       btshared->havedead = true;
+   btshared->indtuples += buildstate.indtuples;
+   if (indexInfo->ii_BrokenHotChain)
+       btshared->brokenhotchain = true;
+   SpinLockRelease(&btshared->mutex);
+
+   /* Notify leader */
+   ConditionVariableSignal(&btshared->workersdonecv);
+
+   /* We can end tuplesorts immediately */
+   tuplesort_end(btspool->sortstate);
+   if (btspool2)
+       tuplesort_end(btspool2->sortstate);
+}
 
                                              ALLOCSET_DEFAULT_SIZES);
 
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-                                  spgistBuildCallback, (void *) &buildstate);
+                                  spgistBuildCallback, (void *) &buildstate,
+                                  NULL);
 
    MemoryContextDelete(buildstate.tmpCtx);
 
 
 
 #include "postgres.h"
 
+#include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
 #include "access/xact.h"
 {
    {
        "ParallelQueryMain", ParallelQueryMain
+   },
+   {
+       "_bt_parallel_build_main", _bt_parallel_build_main
    }
 };
 
  */
 ParallelContext *
 CreateParallelContext(const char *library_name, const char *function_name,
-                     int nworkers)
+                     int nworkers, bool serializable_okay)
 {
    MemoryContext oldcontext;
    ParallelContext *pcxt;
    /*
     * If we are running under serializable isolation, we can't use parallel
     * workers, at least not until somebody enhances that mechanism to be
-    * parallel-aware.
+    * parallel-aware.  Utility statement callers may ask us to ignore this
+    * restriction because they're always able to safely ignore the fact that
+    * SIREAD locks do not work with parallelism.
     */
-   if (IsolationIsSerializable())
+   if (IsolationIsSerializable() && !serializable_okay)
        nworkers = 0;
 
    /* We might be running in a short-lived memory context. */
 
        heap = heap_open(ILHead->il_heap, NoLock);
        ind = index_open(ILHead->il_ind, NoLock);
 
-       index_build(heap, ind, ILHead->il_info, false, false);
+       index_build(heap, ind, ILHead->il_info, false, false, false);
 
        index_close(ind, NoLock);
        heap_close(heap, NoLock);
 
 
        /* Initialize the index and rebuild */
        /* Note: we do not need to re-establish pkey setting */
-       index_build(heapRelation, currentIndex, indexInfo, false, true);
+       index_build(heapRelation, currentIndex, indexInfo, false, true, false);
 
        /* We're done with this index */
        index_close(currentIndex, NoLock);
 
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
+#include "optimizer/planner.h"
 #include "parser/parser.h"
 #include "rewrite/rewriteManip.h"
 #include "storage/bufmgr.h"
    Assert(indexRelationId == RelationGetRelid(indexRelation));
 
    /*
-    * Obtain exclusive lock on it.  Although no other backends can see it
+    * Obtain exclusive lock on it.  Although no other transactions can see it
     * until we commit, this prevents deadlock-risk complaints from lock
     * manager in cases such as CLUSTER.
     */
    }
    else
    {
-       index_build(heapRelation, indexRelation, indexInfo, isprimary, false);
+       index_build(heapRelation, indexRelation, indexInfo, isprimary, false,
+                   true);
    }
 
    /*
    /* initialize index-build state to default */
    ii->ii_Concurrent = false;
    ii->ii_BrokenHotChain = false;
+   ii->ii_ParallelWorkers = 0;
 
    /* set up for possible use by index AM */
    ii->ii_Am = index->rd_rel->relam;
  *
  * isprimary tells whether to mark the index as a primary-key index.
  * isreindex indicates we are recreating a previously-existing index.
+ * parallel indicates if parallelism may be useful.
  *
  * Note: when reindexing an existing index, isprimary can be false even if
  * the index is a PK; it's already properly marked and need not be re-marked.
            Relation indexRelation,
            IndexInfo *indexInfo,
            bool isprimary,
-           bool isreindex)
+           bool isreindex,
+           bool parallel)
 {
    IndexBuildResult *stats;
    Oid         save_userid;
    Assert(PointerIsValid(indexRelation->rd_amroutine->ambuild));
    Assert(PointerIsValid(indexRelation->rd_amroutine->ambuildempty));
 
-   ereport(DEBUG1,
-           (errmsg("building index \"%s\" on table \"%s\"",
-                   RelationGetRelationName(indexRelation),
-                   RelationGetRelationName(heapRelation))));
+   /*
+    * Determine worker process details for parallel CREATE INDEX.  Currently,
+    * only btree has support for parallel builds.
+    *
+    * Note that planner considers parallel safety for us.
+    */
+   if (parallel && IsNormalProcessingMode() &&
+       indexRelation->rd_rel->relam == BTREE_AM_OID)
+       indexInfo->ii_ParallelWorkers =
+           plan_create_index_workers(RelationGetRelid(heapRelation),
+                                     RelationGetRelid(indexRelation));
+
+   if (indexInfo->ii_ParallelWorkers == 0)
+       ereport(DEBUG1,
+               (errmsg("building index \"%s\" on table \"%s\" serially",
+                       RelationGetRelationName(indexRelation),
+                       RelationGetRelationName(heapRelation))));
+   else
+       ereport(DEBUG1,
+               (errmsg_plural("building index \"%s\" on table \"%s\" with request for %d parallel worker",
+                              "building index \"%s\" on table \"%s\" with request for %d parallel workers",
+                              indexInfo->ii_ParallelWorkers,
+                              RelationGetRelationName(indexRelation),
+                              RelationGetRelationName(heapRelation),
+                              indexInfo->ii_ParallelWorkers)));
 
    /*
     * Switch to the table owner's userid, so that any index functions are run
                   IndexInfo *indexInfo,
                   bool allow_sync,
                   IndexBuildCallback callback,
-                  void *callback_state)
+                  void *callback_state,
+                  HeapScanDesc scan)
 {
    return IndexBuildHeapRangeScan(heapRelation, indexRelation,
                                   indexInfo, allow_sync,
                                   false,
                                   0, InvalidBlockNumber,
-                                  callback, callback_state);
+                                  callback, callback_state, scan);
 }
 
 /*
                        BlockNumber start_blockno,
                        BlockNumber numblocks,
                        IndexBuildCallback callback,
-                       void *callback_state)
+                       void *callback_state,
+                       HeapScanDesc scan)
 {
    bool        is_system_catalog;
    bool        checking_uniqueness;
-   HeapScanDesc scan;
    HeapTuple   heapTuple;
    Datum       values[INDEX_MAX_KEYS];
    bool        isnull[INDEX_MAX_KEYS];
    EState     *estate;
    ExprContext *econtext;
    Snapshot    snapshot;
+   bool        need_unregister_snapshot = false;
    TransactionId OldestXmin;
    BlockNumber root_blkno = InvalidBlockNumber;
    OffsetNumber root_offsets[MaxHeapTuplesPerPage];
     * concurrent build, or during bootstrap, we take a regular MVCC snapshot
     * and index whatever's live according to that.
     */
-   if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
-   {
-       snapshot = RegisterSnapshot(GetTransactionSnapshot());
-       OldestXmin = InvalidTransactionId;  /* not used */
+   OldestXmin = InvalidTransactionId;
+
+   /* okay to ignore lazy VACUUMs here */
+   if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+       OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
 
-       /* "any visible" mode is not compatible with this */
-       Assert(!anyvisible);
+   if (!scan)
+   {
+       /*
+        * Serial index build.
+        *
+        * Must begin our own heap scan in this case.  We may also need to
+        * register a snapshot whose lifetime is under our direct control.
+        */
+       if (!TransactionIdIsValid(OldestXmin))
+       {
+           snapshot = RegisterSnapshot(GetTransactionSnapshot());
+           need_unregister_snapshot = true;
+       }
+       else
+           snapshot = SnapshotAny;
+
+       scan = heap_beginscan_strat(heapRelation,   /* relation */
+                                   snapshot,   /* snapshot */
+                                   0,  /* number of keys */
+                                   NULL,   /* scan key */
+                                   true,   /* buffer access strategy OK */
+                                   allow_sync);    /* syncscan OK? */
    }
    else
    {
-       snapshot = SnapshotAny;
-       /* okay to ignore lazy VACUUMs here */
-       OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+       /*
+        * Parallel index build.
+        *
+        * Parallel case never registers/unregisters own snapshot.  Snapshot
+        * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+        * snapshot, based on same criteria as serial case.
+        */
+       Assert(!IsBootstrapProcessingMode());
+       Assert(allow_sync);
+       snapshot = scan->rs_snapshot;
    }
 
-   scan = heap_beginscan_strat(heapRelation,   /* relation */
-                               snapshot,   /* snapshot */
-                               0,  /* number of keys */
-                               NULL,   /* scan key */
-                               true,   /* buffer access strategy OK */
-                               allow_sync);    /* syncscan OK? */
+   /*
+    * Must call GetOldestXmin() with SnapshotAny.  Should never call
+    * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+    * this for parallel builds, since ambuild routines that support parallel
+    * builds must work these details out for themselves.)
+    */
+   Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+   Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+          !TransactionIdIsValid(OldestXmin));
+   Assert(snapshot == SnapshotAny || !anyvisible);
 
    /* set our scan endpoints */
    if (!allow_sync)
 
    heap_endscan(scan);
 
-   /* we can now forget our snapshot, if set */
-   if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
+   /* we can now forget our snapshot, if set and registered by us */
+   if (need_unregister_snapshot)
        UnregisterSnapshot(snapshot);
 
    ExecDropSingleTupleTableSlot(slot);
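
The new scan argument lets a parallel-aware ambuild routine start its own heap scan (typically a parallel scan shared among participants) and hand it in, instead of having IndexBuildHeapRangeScan begin a serial scan itself. A rough sketch of how a participant might do that, assuming the existing heap_parallelscan_initialize()/heap_beginscan_parallel() APIs; "pscan", "mycallback" and "mystate" are illustrative names, not part of the patch:

    /* Sketch only.  pscan is a ParallelHeapScanDesc living in shared memory. */
    HeapScanDesc scan;
    double       reltuples;

    /* Leader, before launching workers (non-concurrent build uses SnapshotAny): */
    heap_parallelscan_initialize(pscan, heapRelation, SnapshotAny);

    /* Each participant (leader included): */
    scan = heap_beginscan_parallel(heapRelation, pscan);
    reltuples = IndexBuildHeapScan(heapRelation, indexRelation, indexInfo,
                                   true,            /* allow_sync */
                                   mycallback, (void *) mystate,
                                   scan);           /* caller-supplied scan */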
    state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator,
                                            InvalidOid, false,
                                            maintenance_work_mem,
-                                           false);
+                                           NULL, false);
    state.htups = state.itups = state.tups_inserted = 0;
 
    (void) index_bulk_delete(&ivinfo, NULL,
 
        /* Initialize the index and rebuild */
        /* Note: we do not need to re-establish pkey setting */
-       index_build(heapRelation, iRel, indexInfo, false, true);
+       index_build(heapRelation, iRel, indexInfo, false, true, true);
    }
    PG_CATCH();
    {
 static void
 ResetReindexProcessing(void)
 {
-   if (IsInParallelMode())
-       elog(ERROR, "cannot modify reindex state during a parallel operation");
+   /* This may be called in leader error path */
    currentlyReindexedHeap = InvalidOid;
    currentlyReindexedIndex = InvalidOid;
 }
 
    indexInfo->ii_ReadyForInserts = true;
    indexInfo->ii_Concurrent = false;
    indexInfo->ii_BrokenHotChain = false;
+   indexInfo->ii_ParallelWorkers = 0;
    indexInfo->ii_Am = BTREE_AM_OID;
    indexInfo->ii_AmCache = NULL;
    indexInfo->ii_Context = CurrentMemoryContext;
 
    /* Set up sorting if wanted */
    if (use_sort)
        tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
-                                           maintenance_work_mem, false);
+                                           maintenance_work_mem,
+                                           NULL, false);
    else
        tuplesort = NULL;
 
 
     * this will typically require the caller to have already locked the
     * relation.  To avoid lock upgrade hazards, that lock should be at least
     * as strong as the one we take here.
+    *
+    * NB: If the lock strength here ever changes, code that is run by
+    * parallel workers under the control of certain particular ambuild
+    * functions will need to be updated, too.
     */
    lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock;
    rel = heap_open(relationId, lockmode);
    indexInfo->ii_ReadyForInserts = !stmt->concurrent;
    indexInfo->ii_Concurrent = stmt->concurrent;
    indexInfo->ii_BrokenHotChain = false;
+   indexInfo->ii_ParallelWorkers = 0;
    indexInfo->ii_Am = accessMethodId;
    indexInfo->ii_AmCache = NULL;
    indexInfo->ii_Context = CurrentMemoryContext;
    indexInfo->ii_BrokenHotChain = false;
 
    /* Now build the index */
-   index_build(rel, indexRelation, indexInfo, stmt->primary, false);
+   index_build(rel, indexRelation, indexInfo, stmt->primary, false, true);
 
    /* Close both the relations, but keep the locks */
    heap_close(rel, NoLock);
 
    pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
    /* Create a parallel context. */
-   pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
+   pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
    pei->pcxt = pcxt;
 
    /*
 
                                                  sortnode->collations,
                                                  sortnode->nullsFirst,
                                                  work_mem,
-                                                 false);
+                                                 NULL, false);
    }
 
    aggstate->current_phase = newphase;
                                      pertrans->sortOperators[0],
                                      pertrans->sortCollations[0],
                                      pertrans->sortNullsFirst[0],
-                                     work_mem, false);
+                                     work_mem, NULL, false);
        }
        else
            pertrans->sortstates[aggstate->current_set] =
                                     pertrans->sortOperators,
                                     pertrans->sortCollations,
                                     pertrans->sortNullsFirst,
-                                    work_mem, false);
+                                    work_mem, NULL, false);
    }
 
    /*
 
                                              plannode->collations,
                                              plannode->nullsFirst,
                                              work_mem,
-                                             node->randomAccess);
+                                             NULL, node->randomAccess);
        if (node->bounded)
            tuplesort_set_bound(tuplesortstate, node->bound);
        node->tuplesortstate = (void *) tuplesortstate;
 
 {
    int         parallel_workers;
 
-   parallel_workers = compute_parallel_worker(rel, rel->pages, -1);
+   parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+                                              max_parallel_workers_per_gather);
 
    /* If any limit was set to zero, the user doesn't want a parallel scan. */
    if (parallel_workers <= 0)
    pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
                                         NULL, NULL);
 
-   parallel_workers = compute_parallel_worker(rel, pages_fetched, -1);
+   parallel_workers = compute_parallel_worker(rel, pages_fetched, -1,
+                                              max_parallel_workers_per_gather);
 
    if (parallel_workers <= 0)
        return;
  *
  * "index_pages" is the number of pages from the index that we expect to scan, or
  * -1 if we don't expect to scan any.
+ *
+ * "max_workers" is caller's limit on the number of workers.  This typically
+ * comes from a GUC.
  */
 int
-compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages)
+compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages,
+                       int max_workers)
 {
    int         parallel_workers = 0;
 
        }
    }
 
-   /*
-    * In no case use more than max_parallel_workers_per_gather workers.
-    */
-   parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+   /* In no case use more than caller supplied maximum number of workers */
+   parallel_workers = Min(parallel_workers, max_workers);
 
    return parallel_workers;
 }
 
         * order.
         */
        path->path.parallel_workers = compute_parallel_worker(baserel,
-                                                             rand_heap_pages, index_pages);
+                                                             rand_heap_pages,
+                                                             index_pages,
+                                                             max_parallel_workers_per_gather);
 
        /*
         * Fall out if workers can't be assigned for parallel scan, because in
 
    return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost);
 }
 
+/*
+ * plan_create_index_workers
+ *     Use the planner to decide how many parallel worker processes
+ *     CREATE INDEX should request for use
+ *
+ * tableOid is the table on which the index is to be built.  indexOid is the
+ * OID of an index to be created or reindexed (which must be a btree index).
+ *
+ * Return value is the number of parallel worker processes to request.  A
+ * return value of 0 means a parallel build should not be attempted (it may
+ * not even be safe, e.g. with non-parallel-safe index expressions).  Note
+ * that this does not include the leader participating as a worker (the
+ * value is always a number of parallel worker processes).
+ *
+ * Note: caller had better already hold some type of lock on the table and
+ * index.
+ */
+int
+plan_create_index_workers(Oid tableOid, Oid indexOid)
+{
+   PlannerInfo *root;
+   Query      *query;
+   PlannerGlobal *glob;
+   RangeTblEntry *rte;
+   Relation    heap;
+   Relation    index;
+   RelOptInfo *rel;
+   int         parallel_workers;
+   BlockNumber heap_blocks;
+   double      reltuples;
+   double      allvisfrac;
+
+   /* Return immediately when parallelism disabled */
+   if (max_parallel_maintenance_workers == 0)
+       return 0;
+
+   /* Set up largely-dummy planner state */
+   query = makeNode(Query);
+   query->commandType = CMD_SELECT;
+
+   glob = makeNode(PlannerGlobal);
+
+   root = makeNode(PlannerInfo);
+   root->parse = query;
+   root->glob = glob;
+   root->query_level = 1;
+   root->planner_cxt = CurrentMemoryContext;
+   root->wt_param_id = -1;
+
+   /*
+    * Build a minimal RTE.
+    *
+    * Set the target's table to be an inheritance parent.  This is a kludge
+    * that prevents problems within get_relation_info(), which does not
+    * expect that any IndexOptInfo is currently undergoing REINDEX.
+    */
+   rte = makeNode(RangeTblEntry);
+   rte->rtekind = RTE_RELATION;
+   rte->relid = tableOid;
+   rte->relkind = RELKIND_RELATION;    /* Don't be too picky. */
+   rte->lateral = false;
+   rte->inh = true;
+   rte->inFromCl = true;
+   query->rtable = list_make1(rte);
+
+   /* Set up RTE/RelOptInfo arrays */
+   setup_simple_rel_arrays(root);
+
+   /* Build RelOptInfo */
+   rel = build_simple_rel(root, 1, NULL);
+
+   heap = heap_open(tableOid, NoLock);
+   index = index_open(indexOid, NoLock);
+
+   /*
+    * Determine if it's safe to proceed.
+    *
+    * Currently, parallel workers can't access the leader's temporary tables.
+    * Furthermore, any index predicate or index expressions must be parallel
+    * safe.
+    */
+   if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP ||
+       !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) ||
+       !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index)))
+   {
+       parallel_workers = 0;
+       goto done;
+   }
+
+   /*
+    * If parallel_workers storage parameter is set for the table, accept that
+    * as the number of parallel worker processes to launch (though still cap
+    * at max_parallel_maintenance_workers).  Note that we deliberately do not
+    * consider any other factor (e.g., memory use by workers) when
+    * parallel_workers is set.
+    */
+   if (rel->rel_parallel_workers != -1)
+   {
+       parallel_workers = Min(rel->rel_parallel_workers,
+                              max_parallel_maintenance_workers);
+       goto done;
+   }
+
+   /*
+    * Estimate heap relation size ourselves, since rel->pages cannot be
+    * trusted (heap RTE was marked as inheritance parent)
+    */
+   estimate_rel_size(heap, NULL, &heap_blocks, &reltuples, &allvisfrac);
+
+   /*
+    * Determine number of workers to scan the heap relation using generic
+    * model
+    */
+   parallel_workers = compute_parallel_worker(rel, heap_blocks, -1,
+                                              max_parallel_maintenance_workers);
+
+   /*
+    * Cap workers based on available maintenance_work_mem as needed.
+    *
+    * Note that each tuplesort participant receives an even share of the
+    * total maintenance_work_mem budget.  Aim to leave participants
+    * (including the leader as a participant) with no less than 32MB of
+    * memory.  This leaves cases where maintenance_work_mem is set to 64MB
+    * immediately past the threshold of being capable of launching a single
+    * parallel worker to sort.
+    */
+   while (parallel_workers > 0 &&
+          maintenance_work_mem / (parallel_workers + 1) < 32768L)
+       parallel_workers--;
+
+done:
+   index_close(index, NoLock);
+   heap_close(heap, NoLock);
+
+   return parallel_workers;
+}
+
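
To make the maintenance_work_mem cap above concrete: the GUC is measured in kilobytes, so 32768L is 32MB. With maintenance_work_mem = 65536 (64MB) and two workers initially proposed, 65536 / 3 = 21845KB falls below 32MB, so the request drops to one worker; 65536 / 2 = 32768KB then satisfies the threshold. A standalone illustration of the same loop (not PostgreSQL code):

    #include <stdio.h>

    int
    main(void)
    {
        long    maintenance_work_mem = 65536;   /* KB, i.e. 64MB */
        int     parallel_workers = 2;           /* from compute_parallel_worker() */

        /* Same capping loop as in plan_create_index_workers() */
        while (parallel_workers > 0 &&
               maintenance_work_mem / (parallel_workers + 1) < 32768L)
            parallel_workers--;

        printf("workers requested: %d\n", parallel_workers);    /* prints 1 */
        return 0;
    }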
 /*
  * get_partitioned_child_rels
  *     Returns a list of the RT indexes of the partitioned child relations
 
        case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
            event_name = "ParallelBitmapScan";
            break;
+       case WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN:
+           event_name = "ParallelCreateIndexScan";
+           break;
        case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
            event_name = "ProcArrayGroupUpdate";
            break;
 
  * Open a file that was previously created in another backend (or this one)
  * with BufFileCreateShared in the same SharedFileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
- * BufFileExport() to make sure that it is ready to be opened by other
+ * BufFileExportShared() to make sure that it is ready to be opened by other
  * backends and render it read-only.
  */
 BufFile *
 }
 
 #endif
+
+/*
+ * Return the current file size.  Counts any holes left behind by
+ * BufFileViewAppend as part of the size.
+ */
+off_t
+BufFileSize(BufFile *file)
+{
+   return ((file->numFiles - 1) * (off_t) MAX_PHYSICAL_FILESIZE) +
+       FileGetSize(file->files[file->numFiles - 1]);
+}
+
+/*
+ * Append the contents of source file (managed within shared fileset) to
+ * end of target file (managed within same shared fileset).
+ *
+ * Note that operation subsumes ownership of underlying resources from
+ * "source".  Caller should never call BufFileClose against source having
+ * called here first.  Resource owners for source and target must match,
+ * too.
+ *
+ * This operation works by manipulating lists of segment files, so the
+ * file content is always appended at a MAX_PHYSICAL_FILESIZE-aligned
+ * boundary, typically creating empty holes before the boundary.  These
+ * areas do not contain any interesting data, and cannot be read from by
+ * caller.
+ *
+ * Returns the block number within target where the contents of source
+ * begins.  Caller should apply this as an offset when working off block
+ * positions that are in terms of the original BufFile space.
+ */
+long
+BufFileAppend(BufFile *target, BufFile *source)
+{
+   long        startBlock = target->numFiles * BUFFILE_SEG_SIZE;
+   int         newNumFiles = target->numFiles + source->numFiles;
+   int         i;
+
+   Assert(target->fileset != NULL);
+   Assert(source->readOnly);
+   Assert(!source->dirty);
+   Assert(source->fileset != NULL);
+
+   if (target->resowner != source->resowner)
+       elog(ERROR, "could not append BufFile with non-matching resource owner");
+
+   target->files = (File *)
+       repalloc(target->files, sizeof(File) * newNumFiles);
+   target->offsets = (off_t *)
+       repalloc(target->offsets, sizeof(off_t) * newNumFiles);
+   for (i = target->numFiles; i < newNumFiles; i++)
+   {
+       target->files[i] = source->files[i - target->numFiles];
+       target->offsets[i] = 0L;
+   }
+   target->numFiles = newNumFiles;
+
+   return startBlock;
+}
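
A hedged sketch of how a leader might use the two new buffile.c entry points together: BufFileSize() to learn how much a worker wrote, and BufFileAppend() to glue the worker's file onto its own, remembering the returned block offset for later reads. Variable names here are illustrative only.

    /* Illustrative only.  "target" and "source" are BufFiles in the same
     * SharedFileSet; source has already been exported read-only. */
    off_t   worker_bytes;
    long    offset_blocks;

    worker_bytes = BufFileSize(source);
    offset_blocks = BufFileAppend(target, source);

    /*
     * Block N of the worker's original file is now block offset_blocks + N
     * within "target"; block numbers recorded by the worker must be adjusted
     * by offset_blocks when reading them back.
     */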
 
    return VfdCache[file].fileMode;
 }
 
+/*
+ * FileGetSize - returns the size of file
+ */
+off_t
+FileGetSize(File file)
+{
+   Assert(FileIsValid(file));
+   return VfdCache[file].fileSize;
+}
+
 /*
  * Make room for another allocatedDescs[] array entry if needed and possible.
  * Returns true if an array element is available.
 
                                                   qstate->sortCollations,
                                                   qstate->sortNullsFirsts,
                                                   work_mem,
+                                                  NULL,
                                                   qstate->rescan_needed);
    else
        osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
                                                    qstate->sortCollation,
                                                    qstate->sortNullsFirst,
                                                    work_mem,
+                                                   NULL,
                                                    qstate->rescan_needed);
 
    osastate->number_of_rows = 0;
 
 bool       allowSystemTableMods = false;
 int            work_mem = 1024;
 int            maintenance_work_mem = 16384;
+int            max_parallel_maintenance_workers = 2;
 
 /*
  * Primary determinants of sizes of shared-memory structures.
 
        check_autovacuum_max_workers, NULL, NULL
    },
 
+   {
+       {"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+           gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
+           NULL
+       },
+       &max_parallel_maintenance_workers,
+       2, 0, 1024,
+       NULL, NULL, NULL
+   },
+
    {
        {"max_parallel_workers_per_gather", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
            gettext_noop("Sets the maximum number of parallel processes per executor node."),
 
 
 #effective_io_concurrency = 1      # 1-1000; 0 disables prefetching
 #max_worker_processes = 8      # (change requires restart)
+#max_parallel_maintenance_workers = 2  # taken from max_parallel_workers
 #max_parallel_workers_per_gather = 2   # taken from max_parallel_workers
 #parallel_leader_participation = on
 #max_parallel_workers = 8      # maximum number of max_worker_processes that
-                   # can be used in parallel queries
+                   # can be used in parallel operations
 #old_snapshot_threshold = -1       # 1min-60d; -1 disables; 0 is immediate
                    # (change requires restart)
 #backend_flush_after = 0       # measured in pages, 0 disables
 
    probe query__done(const char *);
    probe statement__status(const char *);
 
-   probe sort__start(int, bool, int, int, bool);
+   probe sort__start(int, bool, int, int, bool, int);
    probe sort__done(bool, long);
 
    probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
 
  * care that all calls for a single LogicalTapeSet are made in the same
  * palloc context.
  *
+ * To support parallel sort operations involving coordinated callers to
+ * tuplesort.c routines across multiple workers, it is necessary to
+ * concatenate each worker BufFile/tapeset into one single logical tapeset
+ * managed by the leader.  Workers should have produced one final
+ * materialized tape (their entire output) by the time this happens in the
+ * leader.
+ * There will always be the same number of runs as input tapes, and the same
+ * number of input tapes as participants (worker Tuplesortstates).
+ *
  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
 #include "postgres.h"
 
 #include "storage/buffile.h"
+#include "utils/builtins.h"
 #include "utils/logtape.h"
 #include "utils/memutils.h"
 
     * a frozen tape.  (When reading from an unfrozen tape, we use a larger
     * read buffer that holds multiple blocks, so the "current" block is
     * ambiguous.)
+    *
+    * When concatenation of worker tape BufFiles is performed, an offset to
+    * the first block in the unified BufFile space is applied during reads.
     */
    long        firstBlockNumber;
    long        curBlockNumber;
    long        nextBlockNumber;
+   long        offsetBlockNumber;
 
    /*
     * Buffer for current data block(s).
     */
    char       *buffer;         /* physical buffer (separately palloc'd) */
    int         buffer_size;    /* allocated size of the buffer */
+   int         max_size;       /* highest useful, safe buffer_size */
    int         pos;            /* next read/write position in buffer */
    int         nbytes;         /* total # of valid bytes in buffer */
 } LogicalTape;
     * by ltsGetFreeBlock(), and it is always greater than or equal to
     * nBlocksWritten.  Blocks between nBlocksAllocated and nBlocksWritten are
     * blocks that have been allocated for a tape, but have not been written
-    * to the underlying file yet.
+    * to the underlying file yet.  nHoleBlocks tracks the total number of
+    * blocks that are in unused holes between worker spaces following BufFile
+    * concatenation.
     */
    long        nBlocksAllocated;   /* # of blocks allocated */
    long        nBlocksWritten; /* # of blocks used in underlying file */
+   long        nHoleBlocks;    /* # of "hole" blocks left */
 
    /*
     * We store the numbers of recycled-and-available blocks in freeBlocks[].
 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
+                    SharedFileSet *fileset);
 
 
 /*
     * previous tape isn't flushed to disk until the end of the sort, so you
     * get one-block hole, where the last block of the previous tape will
     * later go.
+    *
+    * Note that BufFile concatenation can leave "holes" in BufFile between
+    * worker-owned block ranges.  These are tracked for reporting purposes
+    * only.  We never read from nor write to these hole blocks, and so they
+    * are not considered here.
     */
    while (blocknum > lts->nBlocksWritten)
    {
    do
    {
        char       *thisbuf = lt->buffer + lt->nbytes;
+       long        datablocknum = lt->nextBlockNumber;
 
        /* Fetch next block number */
-       if (lt->nextBlockNumber == -1L)
+       if (datablocknum == -1L)
            break;              /* EOF */
+       /* Apply worker offset, needed for leader tapesets */
+       datablocknum += lt->offsetBlockNumber;
 
        /* Read the block */
-       ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
+       ltsReadBlock(lts, datablocknum, (void *) thisbuf);
        if (!lt->frozen)
-           ltsReleaseBlock(lts, lt->nextBlockNumber);
+           ltsReleaseBlock(lts, datablocknum);
        lt->curBlockNumber = lt->nextBlockNumber;
 
        lt->nbytes += TapeBlockGetNBytes(thisbuf);
        lts->blocksSorted = false;
 }
 
+/*
+ * Claim ownership of a set of logical tapes from existing shared BufFiles.
+ *
+ * Caller should be leader process.  Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
+ */
+static void
+ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
+                    SharedFileSet *fileset)
+{
+   LogicalTape *lt = NULL;
+   long        tapeblocks;
+   long        nphysicalblocks = 0L;
+   int         i;
+
+   /* Should have at least one worker tape, plus leader's tape */
+   Assert(lts->nTapes >= 2);
+
+   /*
+    * Build concatenated view of all BufFiles, remembering the block number
+    * where each source file begins.  No changes are needed for leader/last
+    * tape.
+    */
+   for (i = 0; i < lts->nTapes - 1; i++)
+   {
+       char        filename[MAXPGPATH];
+       BufFile    *file;
+
+       lt = &lts->tapes[i];
+
+       pg_itoa(i, filename);
+       file = BufFileOpenShared(fileset, filename);
+
+       /*
+        * Stash first BufFile, and concatenate subsequent BufFiles to that.
+        * Store block offset into each tape as we go.
+        */
+       lt->firstBlockNumber = shared[i].firstblocknumber;
+       if (i == 0)
+       {
+           lts->pfile = file;
+           lt->offsetBlockNumber = 0L;
+       }
+       else
+       {
+           lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+       }
+       /* Don't allocate more for read buffer than could possibly help */
+       lt->max_size = Min(MaxAllocSize, shared[i].buffilesize);
+       tapeblocks = shared[i].buffilesize / BLCKSZ;
+       nphysicalblocks += tapeblocks;
+   }
+
+   /*
+    * Set # of allocated blocks, as well as # blocks written.  Use extent of
+    * new BufFile space (from 0 to end of last worker's tape space) for this.
+    * Allocated/written blocks should include space used by holes left
+    * between concatenated BufFiles.
+    */
+   lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+   lts->nBlocksWritten = lts->nBlocksAllocated;
+
+   /*
+    * Compute number of hole blocks so that we can later work backwards, and
+    * instrument number of physical blocks.  We don't simply use physical
+    * blocks directly for instrumentation because this would break if we ever
+    * subsequently wrote to worker tape.
+    *
+    * Working backwards like this keeps our options open.  If shared BufFiles
+    * ever support being written to post-export, logtape.c can automatically
+    * take advantage of that.  We'd then support writing to the leader tape
+    * while recycling space from worker tapes, because the leader tape has a
+    * zero offset (write routines won't need to have extra logic to apply an
+    * offset).
+    *
+    * The only thing that currently prevents writing to the leader tape from
+    * working is the fact that BufFiles opened using BufFileOpenShared() are
+    * read-only by definition, but that could be changed if it seemed
+    * worthwhile.  For now, writing to the leader tape will raise a "Bad file
+    * descriptor" error, so tuplesort must avoid writing to the leader tape
+    * altogether.
+    */
+   lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
+}
+
 /*
  * Create a set of logical tapes in a temporary underlying file.
  *
- * Each tape is initialized in write state.
+ * Each tape is initialized in write state.  Serial callers pass ntapes,
+ * NULL argument for shared, and -1 for worker.  Parallel worker callers
+ * pass ntapes, a shared file handle, NULL shared argument, and their own
+ * worker number.  Leader callers, which claim shared worker tapes here,
+ * must supply non-sentinel values for all arguments except worker number,
+ * which should be -1.
+ *
+ * Leader caller is passing back an array of metadata each worker captured
+ * when LogicalTapeFreeze() was called for their final result tapes.  Passed
+ * tapes array is actually sized ntapes - 1, because it includes only
+ * worker tapes, whereas leader requires its own leader tape.  Note that we
+ * rely on the assumption that reclaimed worker tapes will only be read
+ * from once by leader, and never written to again (tapes are initialized
+ * for writing, but that's only to be consistent).  Leader may not write to
+ * its own tape purely due to a restriction in the shared buffile
+ * infrastructure that may be lifted in the future.
  */
 LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes)
+LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
+                    int worker)
 {
    LogicalTapeSet *lts;
    LogicalTape *lt;
    Assert(ntapes > 0);
    lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
                                    ntapes * sizeof(LogicalTape));
-   lts->pfile = BufFileCreateTemp(false);
    lts->nBlocksAllocated = 0L;
    lts->nBlocksWritten = 0L;
+   lts->nHoleBlocks = 0L;
    lts->forgetFreeSpace = false;
    lts->blocksSorted = true;   /* a zero-length array is sorted ... */
    lts->freeBlocksLen = 32;    /* reasonable initial guess */
        lt->dirty = false;
        lt->firstBlockNumber = -1L;
        lt->curBlockNumber = -1L;
+       lt->nextBlockNumber = -1L;
+       lt->offsetBlockNumber = 0L;
        lt->buffer = NULL;
        lt->buffer_size = 0;
+       /* palloc() larger than MaxAllocSize would fail */
+       lt->max_size = MaxAllocSize;
        lt->pos = 0;
        lt->nbytes = 0;
    }
+
+   /*
+    * Create temp BufFile storage as required.
+    *
+    * Leader concatenates worker tapes, which requires special adjustment to
+    * final tapeset data.  Things are simpler for the worker case and the
+    * serial case, though.  They are generally very similar -- workers use a
+    * shared fileset, whereas serial sorts use a conventional serial BufFile.
+    */
+   if (shared)
+       ltsConcatWorkerTapes(lts, shared, fileset);
+   else if (fileset)
+   {
+       char        filename[MAXPGPATH];
+
+       pg_itoa(worker, filename);
+       lts->pfile = BufFileCreateShared(fileset, filename);
+   }
+   else
+       lts->pfile = BufFileCreateTemp(false);
+
    return lts;
 }
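
The header comment above describes three calling patterns for the revised LogicalTapeSetCreate(). As a sketch, using only the signature introduced here, the three cases would look roughly like this; "shared", "worker" and "nworkers" are assumed to come from the caller's parallel-sort state:

    /* Serial sort: private temp file, no shared state. */
    lts = LogicalTapeSetCreate(maxTapes, NULL, NULL, -1);

    /* Parallel worker: write into the shared fileset under its own number. */
    lts = LogicalTapeSetCreate(maxTapes, NULL, &shared->fileset, worker);

    /* Leader: claim the nworkers frozen worker tapes, plus its own tape. */
    lts = LogicalTapeSetCreate(nworkers + 1, shared->tapes,
                               &shared->fileset, -1);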
 
    Assert(tapenum >= 0 && tapenum < lts->nTapes);
    lt = &lts->tapes[tapenum];
    Assert(lt->writing);
+   Assert(lt->offsetBlockNumber == 0L);
 
    /* Allocate data buffer and first block on first write */
    if (lt->buffer == NULL)
        if (buffer_size < BLCKSZ)
            buffer_size = BLCKSZ;
 
-       /*
-        * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
-        * buffer is unlikely to be helpful, anyway)
-        */
-       if (buffer_size > MaxAllocSize)
-           buffer_size = MaxAllocSize;
+       /* palloc() larger than max_size is unlikely to be helpful */
+       if (buffer_size > lt->max_size)
+           buffer_size = lt->max_size;
 
        /* round down to BLCKSZ boundary */
        buffer_size -= buffer_size % BLCKSZ;
  * tape is rewound (after rewind is too late!).  It performs a rewind
  * and switch to read mode "for free".  An immediately following rewind-
  * for-read call is OK but not necessary.
+ *
+ * share output argument is set with details of storage used for tape after
+ * freezing, which may be passed to LogicalTapeSetCreate within leader
+ * process later.  This metadata is only of interest to worker callers
+ * freezing their final output for leader (single materialized tape).
+ * Serial sorts should set share to NULL.
  */
 void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 {
    LogicalTape *lt;
 
    Assert(tapenum >= 0 && tapenum < lts->nTapes);
    lt = &lts->tapes[tapenum];
    Assert(lt->writing);
+   Assert(lt->offsetBlockNumber == 0L);
 
    /*
     * Completion of a write phase.  Flush last partial data block, and rewind
    else
        lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
    lt->nbytes = TapeBlockGetNBytes(lt->buffer);
+
+   /* Handle extra steps when caller is to share its tapeset */
+   if (share)
+   {
+       BufFileExportShared(lts->pfile);
+       share->firstblocknumber = lt->firstBlockNumber;
+       share->buffilesize = BufFileSize(lts->pfile);
+   }
 }
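
On the worker side, the new share argument is how the metadata later consumed by ltsConcatWorkerTapes() gets captured. A sketch of a worker freezing its single result tape and publishing it for the leader; "state" and "shared" stand in for the worker's tuplesort and Sharedsort state, and the finished-worker bookkeeping follows the Sharedsort comments below:

    /* Illustrative worker-side use of the new third argument. */
    TapeShare   output;

    LogicalTapeFreeze(state->tapeset, state->result_tape, &output);

    SpinLockAcquire(&shared->mutex);
    shared->tapes[state->worker] = output;  /* leader reads this later */
    shared->workersFinished++;              /* one more finished run */
    SpinLockRelease(&shared->mutex);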
 
 /*
 
    Assert(tapenum >= 0 && tapenum < lts->nTapes);
    lt = &lts->tapes[tapenum];
+   Assert(lt->offsetBlockNumber == 0L);
 
    /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
    Assert(lt->buffer_size == BLCKSZ);
 long
 LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
-   return lts->nBlocksAllocated;
+   return lts->nBlocksAllocated - lts->nHoleBlocks;
 }
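
A worked illustration of the hole accounting, assuming the default 8kB BLCKSZ and 1GB BufFile segments (so one segment is 131072 blocks): if worker 0 wrote 3 blocks and worker 1 wrote 5, worker 1's tape is appended at the next segment boundary, leaving a hole that nHoleBlocks subtracts back out of the instrumentation. Standalone arithmetic sketch:

    #include <stdio.h>

    int
    main(void)
    {
        long    seg_blocks = (1024L * 1024 * 1024) / 8192;  /* 131072 */
        long    worker0_blocks = 3;
        long    worker1_blocks = 5;

        long    offset = seg_blocks;                    /* BufFileAppend result */
        long    allocated = offset + worker1_blocks;    /* 131077 */
        long    holes = allocated - (worker0_blocks + worker1_blocks);

        /* LogicalTapeSetBlocks() reports allocated - holes */
        printf("%ld\n", allocated - holes);             /* prints 8 */
        return 0;
    }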
 
  * above.  Nonetheless, with large workMem we can have many tapes (but not
  * too many -- see the comments in tuplesort_merge_order).
  *
+ * This module supports parallel sorting.  Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state.  The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes with one run to merge; a run for every
+ * worker process.  This is then merged.  Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
+ *
  *
  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
 #define DATUM_SORT     2
 #define CLUSTER_SORT   3
 
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state)   ((state)->shared == NULL ? 0 : \
+                                (state)->worker >= 0 ? 1 : 2)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool       trace_sort = false;
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */
 
+   /*
+    * These variables are used during parallel sorting.
+    *
+    * worker is our worker identifier.  Follows the general convention that
+    * -1 value relates to a leader tuplesort, and values >= 0 worker
+    * tuplesorts. (-1 can also be a serial tuplesort.)
+    *
+    * shared is mutable shared memory state, which is used to coordinate
+    * parallel sorts.
+    *
+    * nParticipants is the number of worker Tuplesortstates known by the
+    * leader to have actually been launched, which implies that they must
+    * finish a run leader can merge.  Typically includes a worker state held
+    * by the leader process itself.  Set in the leader Tuplesortstate only.
+    */
+   int         worker;
+   Sharedsort *shared;
+   int         nParticipants;
+
    /*
     * The sortKeys variable is used by every case other than the hash index
     * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
 #endif
 };
 
+/*
+ * Private mutable state of tuplesort-parallel-operation.  This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+   /* mutex protects all fields prior to tapes */
+   slock_t     mutex;
+
+   /*
+    * currentWorker generates ordinal identifier numbers for parallel sort
+    * workers.  These start from 0, and are always gapless.
+    *
+    * Workers increment workersFinished to indicate having finished.  If this
+    * is equal to state.nParticipants within the leader, leader is ready to
+    * merge worker runs.
+    */
+   int         currentWorker;
+   int         workersFinished;
+
+   /* Temporary file space */
+   SharedFileSet fileset;
+
+   /* Size of tapes flexible array */
+   int         nTapes;
+
+   /*
+    * Tapes array used by workers to report back information needed by the
+    * leader to concatenate all worker tapes into one for merging
+    */
+   TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /*
  * Is the given tuple allocated from the slab memory arena?
  */
 #define LACKMEM(state)     ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 #define USEMEM(state,amt)  ((state)->availMem -= (amt))
 #define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state)      ((state)->shared == NULL)
+#define WORKER(state)      ((state)->shared && (state)->worker != -1)
+#define LEADER(state)      ((state)->shared && (state)->worker == -1)
 
 /*
  * NOTES about on-tape representation of tuples:
    } while(0)
 
 
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+                      SortCoordinate coordinate,
+                      bool randomAccess);
 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
 static bool consider_abort_common(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
 static void selectnewtape(Tuplesortstate *state);
 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
               SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
  */
 
 static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+                      bool randomAccess)
 {
    Tuplesortstate *state;
    MemoryContext sortcontext;
    MemoryContext tuplecontext;
    MemoryContext oldcontext;
 
+   /* See leader_takeover_tapes() remarks on randomAccess support */
+   if (coordinate && randomAccess)
+       elog(ERROR, "random access disallowed under parallel sort");
+
    /*
     * Create a working memory context for this sort operation. All data
     * needed by the sort will live inside this context.
    state->bounded = false;
    state->tuples = true;
    state->boundUsed = false;
-   state->allowedMem = workMem * (int64) 1024;
+
+   /*
+    * workMem is forced to be at least 64KB, the current minimum valid value
+    * for the work_mem GUC.  This is a defense against parallel sort callers
+    * that divide out memory among many workers in a way that leaves each
+    * with very little memory.
+    */
+   state->allowedMem = Max(workMem, 64) * (int64) 1024;
    state->availMem = state->allowedMem;
    state->sortcontext = sortcontext;
    state->tuplecontext = tuplecontext;
 
    state->result_tape = -1;    /* flag that result tape has not been formed */
 
+   /*
+    * Initialize parallel-related state based on coordination information
+    * from caller
+    */
+   if (!coordinate)
+   {
+       /* Serial sort */
+       state->shared = NULL;
+       state->worker = -1;
+       state->nParticipants = -1;
+   }
+   else if (coordinate->isWorker)
+   {
+       /* Parallel worker produces exactly one final run from all input */
+       state->shared = coordinate->sharedsort;
+       state->worker = worker_get_identifier(state);
+       state->nParticipants = -1;
+   }
+   else
+   {
+       /* Parallel leader state only used for final merge */
+       state->shared = coordinate->sharedsort;
+       state->worker = -1;
+       state->nParticipants = coordinate->nParticipants;
+       Assert(state->nParticipants >= 1);
+   }
+
    MemoryContextSwitchTo(oldcontext);
 
    return state;
                     int nkeys, AttrNumber *attNums,
                     Oid *sortOperators, Oid *sortCollations,
                     bool *nullsFirstFlags,
-                    int workMem, bool randomAccess)
+                    int workMem, SortCoordinate coordinate, bool randomAccess)
 {
-   Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+   Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                  randomAccess);
    MemoryContext oldcontext;
    int         i;
 
                                false,  /* no unique check */
                                nkeys,
                                workMem,
-                               randomAccess);
+                               randomAccess,
+                               PARALLEL_SORT(state));
 
    state->comparetup = comparetup_heap;
    state->copytup = copytup_heap;
 Tuplesortstate *
 tuplesort_begin_cluster(TupleDesc tupDesc,
                        Relation indexRel,
-                       int workMem, bool randomAccess)
+                       int workMem,
+                       SortCoordinate coordinate, bool randomAccess)
 {
-   Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+   Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                  randomAccess);
    ScanKey     indexScanKey;
    MemoryContext oldcontext;
    int         i;
                                false,  /* no unique check */
                                state->nKeys,
                                workMem,
-                               randomAccess);
+                               randomAccess,
+                               PARALLEL_SORT(state));
 
    state->comparetup = comparetup_cluster;
    state->copytup = copytup_cluster;
 tuplesort_begin_index_btree(Relation heapRel,
                            Relation indexRel,
                            bool enforceUnique,
-                           int workMem, bool randomAccess)
+                           int workMem,
+                           SortCoordinate coordinate,
+                           bool randomAccess)
 {
-   Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+   Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                  randomAccess);
    ScanKey     indexScanKey;
    MemoryContext oldcontext;
    int         i;
                                enforceUnique,
                                state->nKeys,
                                workMem,
-                               randomAccess);
+                               randomAccess,
+                               PARALLEL_SORT(state));
 
    state->comparetup = comparetup_index_btree;
    state->copytup = copytup_index;
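
For callers, the only visible change in these begin routines is the extra SortCoordinate argument. A hedged sketch of how a parallel CREATE INDEX worker might fill one in before starting its sort; the struct tag SortCoordinateData and the surrounding setup are assumptions here (only the fields used by tuplesort_begin_common() above are taken from the patch), and workMem is this participant's share of maintenance_work_mem:

    /* Illustrative only. */
    SortCoordinateData coordinate;
    Tuplesortstate *sortstate;

    coordinate.isWorker = true;
    coordinate.nParticipants = -1;      /* only the leader sets this */
    coordinate.sharedsort = sharedsort; /* Sharedsort from shared memory */

    sortstate = tuplesort_begin_index_btree(heapRel, indexRel,
                                            enforceUnique,
                                            workMem, &coordinate,
                                            false); /* no randomAccess */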
                           uint32 high_mask,
                           uint32 low_mask,
                           uint32 max_buckets,
-                          int workMem, bool randomAccess)
+                          int workMem,
+                          SortCoordinate coordinate,
+                          bool randomAccess)
 {
-   Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+   Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                  randomAccess);
    MemoryContext oldcontext;
 
    oldcontext = MemoryContextSwitchTo(state->sortcontext);
 
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
-                     bool nullsFirstFlag,
-                     int workMem, bool randomAccess)
+                     bool nullsFirstFlag, int workMem,
+                     SortCoordinate coordinate, bool randomAccess)
 {
-   Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+   Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                  randomAccess);
    MemoryContext oldcontext;
    int16       typlen;
    bool        typbyval;
                                false,  /* no unique check */
                                1,
                                workMem,
-                               randomAccess);
+                               randomAccess,
+                               PARALLEL_SORT(state));
 
    state->comparetup = comparetup_datum;
    state->copytup = copytup_datum;
  * delayed calls at the moment.)
  *
  * This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested.  Parallel leader tuplesorts will always ignore the hint.
  */
 void
 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
    Assert(state->status == TSS_INITIAL);
    Assert(state->memtupcount == 0);
    Assert(!state->bounded);
+   Assert(!WORKER(state));
 
 #ifdef DEBUG_BOUNDED_SORT
    /* Honor GUC setting that disables the feature (for easy testing) */
        return;
 #endif
 
+   /* Parallel leader ignores hint */
+   if (LEADER(state))
+       return;
+
    /* We want to be able to compute bound * 2, so limit the setting */
    if (bound > (int64) (INT_MAX / 2))
        return;
    if (trace_sort)
    {
        if (state->tapeset)
-           elog(LOG, "external sort ended, %ld disk blocks used: %s",
-                spaceUsed, pg_rusage_show(&state->ru_start));
+           elog(LOG, "%s of %d ended, %ld disk blocks used: %s",
+                SERIAL(state) ? "external sort" : "parallel external sort",
+                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        else
-           elog(LOG, "internal sort ended, %ld KB used: %s",
-                spaceUsed, pg_rusage_show(&state->ru_start));
+           elog(LOG, "%s of %d ended, %ld KB used: %s",
+                SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
    }
 
    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
 static void
 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 {
+   Assert(!LEADER(state));
+
    switch (state->status)
    {
        case TSS_INITIAL:
            /*
             * Nope; time to switch to tape-based operation.
             */
-           inittapes(state);
+           inittapes(state, true);
 
            /*
             * Dump all tuples.
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "performsort starting: %s",
-            pg_rusage_show(&state->ru_start));
+       elog(LOG, "performsort of %d starting: %s",
+            state->worker, pg_rusage_show(&state->ru_start));
 #endif
 
    switch (state->status)
 
            /*
             * We were able to accumulate all the tuples within the allowed
-            * amount of memory.  Just qsort 'em and we're done.
+            * amount of memory, or the leader is ready to take over worker
+            * tapes.
             */
-           tuplesort_sort_memtuples(state);
+           if (SERIAL(state))
+           {
+               /* Just qsort 'em and we're done */
+               tuplesort_sort_memtuples(state);
+               state->status = TSS_SORTEDINMEM;
+           }
+           else if (WORKER(state))
+           {
+               /*
+                * Parallel workers must still dump out tuples to tape.  No
+                * merge is required to produce a single output run, though.
+                */
+               inittapes(state, false);
+               dumptuples(state, true);
+               worker_nomergeruns(state);
+               state->status = TSS_SORTEDONTAPE;
+           }
+           else
+           {
+               /*
+                * Leader will take over worker tapes and merge worker runs.
+                * Note that mergeruns sets the correct state->status.
+                */
+               leader_takeover_tapes(state);
+               mergeruns(state);
+           }
            state->current = 0;
            state->eof_reached = false;
+           state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
-           state->status = TSS_SORTEDINMEM;
            break;
 
        case TSS_BOUNDED:
            /*
             * Finish tape-based sort.  First, flush all tuples remaining in
             * memory out to tape; then merge until we have a single remaining
-            * run (or, if !randomAccess, one run per tape). Note that
-            * mergeruns sets the correct state->status.
+            * run (or, if !randomAccess and !WORKER(), one run per tape).
+            * Note that mergeruns sets the correct state->status.
             */
            dumptuples(state, true);
            mergeruns(state);
    if (trace_sort)
    {
        if (state->status == TSS_FINALMERGE)
-           elog(LOG, "performsort done (except %d-way final merge): %s",
-                state->activeTapes,
+           elog(LOG, "performsort of %d done (except %d-way final merge): %s",
+                state->worker, state->activeTapes,
                 pg_rusage_show(&state->ru_start));
        else
-           elog(LOG, "performsort done: %s",
-                pg_rusage_show(&state->ru_start));
+           elog(LOG, "performsort of %d done: %s",
+                state->worker, pg_rusage_show(&state->ru_start));
    }
 #endif
 
    unsigned int tuplen;
    size_t      nmoved;
 
+   Assert(!WORKER(state));
+
    switch (state->status)
    {
        case TSS_SORTEDINMEM:
     */
    Assert(forward);
    Assert(ntuples >= 0);
+   Assert(!WORKER(state));
 
    switch (state->status)
    {
 /*
  * inittapes - initialize for tape sorting.
  *
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
  */
 static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
 {
    int         maxTapes,
                j;
-   int64       tapeSpace;
 
-   /* Compute number of tapes to use: merge order plus 1 */
-   maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+   Assert(!LEADER(state));
 
-   state->maxTapes = maxTapes;
-   state->tapeRange = maxTapes - 1;
+   if (mergeruns)
+   {
+       /* Compute number of tapes to use: merge order plus 1 */
+       maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+   }
+   else
+   {
+       /* Workers can sometimes produce a single run, output without merge */
+       Assert(WORKER(state));
+       maxTapes = MINORDER + 1;
+   }
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "switching to external sort with %d tapes: %s",
-            maxTapes, pg_rusage_show(&state->ru_start));
+       elog(LOG, "%d switching to external sort with %d tapes: %s",
+            state->worker, maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
-   /*
-    * Decrease availMem to reflect the space needed for tape buffers, when
-    * writing the initial runs; but don't decrease it to the point that we
-    * have no room for tuples.  (That case is only likely to occur if sorting
-    * pass-by-value Datums; in all other scenarios the memtuples[] array is
-    * unlikely to occupy more than half of allowedMem.  In the pass-by-value
-    * case it's not important to account for tuple space, so we don't care if
-    * LACKMEM becomes inaccurate.)
-    */
-   tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
-
-   if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
-       USEMEM(state, tapeSpace);
-
-   /*
-    * Make sure that the temp file(s) underlying the tape set are created in
-    * suitable temp tablespaces.
-    */
-   PrepareTempTablespaces();
-
-   /*
-    * Create the tape set and allocate the per-tape data arrays.
-    */
-   state->tapeset = LogicalTapeSetCreate(maxTapes);
-
-   state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-   state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
-   state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+   /* Create the tape set and allocate the per-tape data arrays */
+   inittapestate(state, maxTapes);
+   state->tapeset =
+       LogicalTapeSetCreate(maxTapes, NULL,
+                            state->shared ? &state->shared->fileset : NULL,
+                            state->worker);
 
    state->currentRun = 0;
 
    state->status = TSS_BUILDRUNS;
 }
 
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+   int64       tapeSpace;
+
+   /*
+    * Decrease availMem to reflect the space needed for tape buffers; but
+    * don't decrease it to the point that we have no room for tuples. (That
+    * case is only likely to occur if sorting pass-by-value Datums; in all
+    * other scenarios the memtuples[] array is unlikely to occupy more than
+    * half of allowedMem.  In the pass-by-value case it's not important to
+    * account for tuple space, so we don't care if LACKMEM becomes
+    * inaccurate.)
+    */
+   tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+   if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+       USEMEM(state, tapeSpace);
+
+   /*
+    * Make sure that the temp file(s) underlying the tape set are created in
+    * suitable temp tablespaces.  For parallel sorts, this should have been
+    * called already, but it doesn't matter if it is called a second time.
+    */
+   PrepareTempTablespaces();
+
+   state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+   state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+   state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+   state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+   state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+   /* Record # of tapes allocated (for duration of sort) */
+   state->maxTapes = maxTapes;
+   /* Record maximum # of tapes usable as inputs when merging */
+   state->tapeRange = maxTapes - 1;
+}
+
 /*
  * selectnewtape -- select new tape for new initial run.
  *
     */
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
-            (state->availMem) / 1024, numInputTapes);
+       elog(LOG, "%d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+            state->worker, state->availMem / 1024, numInputTapes);
 #endif
 
    state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
         * pass remains.  If we don't have to produce a materialized sorted
         * tape, we can stop at this point and do the final merge on-the-fly.
         */
-       if (!state->randomAccess)
+       if (!state->randomAccess && !WORKER(state))
        {
            bool        allOneRun = true;
 
     * a waste of cycles anyway...
     */
    state->result_tape = state->tp_tapenum[state->tapeRange];
-   LogicalTapeFreeze(state->tapeset, state->result_tape);
+   if (!WORKER(state))
+       LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+   else
+       worker_freeze_result_tape(state);
    state->status = TSS_SORTEDONTAPE;
 
    /* Release the read buffers of all the other tapes, by rewinding them. */
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
-            pg_rusage_show(&state->ru_start));
+       elog(LOG, "%d finished %d-way merge step: %s", state->worker,
+            state->activeTapes, pg_rusage_show(&state->ru_start));
 #endif
 }
 
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "starting quicksort of run %d: %s",
-            state->currentRun, pg_rusage_show(&state->ru_start));
+       elog(LOG, "%d starting quicksort of run %d: %s",
+            state->worker, state->currentRun,
+            pg_rusage_show(&state->ru_start));
 #endif
 
    /*
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "finished quicksort of run %d: %s",
-            state->currentRun, pg_rusage_show(&state->ru_start));
+       elog(LOG, "%d finished quicksort of run %d: %s",
+            state->worker, state->currentRun,
+            pg_rusage_show(&state->ru_start));
 #endif
 
    memtupwrite = state->memtupcount;
 
 #ifdef TRACE_SORT
    if (trace_sort)
-       elog(LOG, "finished writing run %d to tape %d: %s",
-            state->currentRun, state->destTape,
+       elog(LOG, "%d finished writing run %d to tape %d: %s",
+            state->worker, state->currentRun, state->destTape,
             pg_rusage_show(&state->ru_start));
 #endif
 
    Assert(state->status == TSS_INITIAL);
    Assert(state->bounded);
    Assert(tupcount >= state->bound);
+   Assert(SERIAL(state));
 
    /* Reverse sort direction so largest entry will be at root */
    reversedirection(state);
    Assert(state->status == TSS_BOUNDED);
    Assert(state->bounded);
    Assert(tupcount == state->bound);
+   Assert(SERIAL(state));
 
    /*
     * We can unheapify in place because each delete-top call will remove the
 static void
 tuplesort_sort_memtuples(Tuplesortstate *state)
 {
+   Assert(!LEADER(state));
+
    if (state->memtupcount > 1)
    {
        /* Can we use the single-key sort function? */
                             &tuplen, sizeof(tuplen));
 }
 
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+   Size        tapesSize;
+
+   Assert(nWorkers > 0);
+
+   /* Make sure that BufFile shared state is MAXALIGN'd */
+   tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+   tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+   return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates.  nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+   int         i;
+
+   Assert(nWorkers > 0);
+
+   SpinLockInit(&shared->mutex);
+   shared->currentWorker = 0;
+   shared->workersFinished = 0;
+   SharedFileSetInit(&shared->fileset, seg);
+   shared->nTapes = nWorkers;
+   for (i = 0; i < nWorkers; i++)
+   {
+       shared->tapes[i].firstblocknumber = 0L;
+       shared->tapes[i].buffilesize = 0;
+   }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+   /* Attach to SharedFileSet */
+   SharedFileSetAttach(&shared->fileset, seg);
+}
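
A minimal editorial sketch of how the three routines above fit together (not
part of the patch): the leader sizes and initializes the shared state before
launching workers, and each worker attaches from inside its parallel main
function, which receives "seg" and "toc".  The variables "pcxt" (a
ParallelContext), "nworkers", and the TOC key PARALLEL_KEY_TUPLESORT are
assumptions supplied by the calling code.

    /* Leader, before launching workers: size and initialize shared state */
    Size        estsort = tuplesort_estimate_shared(nworkers);
    Sharedsort *sharedsort;

    shm_toc_estimate_chunk(&pcxt->estimator, estsort);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    InitializeParallelDSM(pcxt);

    sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
    tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

    /* Worker, inside its parallel main function: look up and attach */
    sharedsort = (Sharedsort *) shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
    tuplesort_attach_shared(sharedsort, seg);
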
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless.  logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber, to avoid making any assumption about the
+ * caller's requirements.  However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1.  This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+   Sharedsort *shared = state->shared;
+   int         worker;
+
+   Assert(WORKER(state));
+
+   SpinLockAcquire(&shared->mutex);
+   worker = shared->currentWorker++;
+   SpinLockRelease(&shared->mutex);
+
+   return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * Workers call this just after their result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly, because workers require
+ * a few additional steps beyond the similar serial TSS_SORTEDONTAPE
+ * external sort case.  Those extra steps, which also happen here, involve
+ * freeing now-unneeded resources and advertising to the leader that the
+ * worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, consisting of
+ * all tuples that were originally input into that worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+   Sharedsort *shared = state->shared;
+   TapeShare   output;
+
+   Assert(WORKER(state));
+   Assert(state->result_tape != -1);
+   Assert(state->memtupcount == 0);
+
+   /*
+    * Free most remaining memory, in case the caller is sensitive to our
+    * holding on to it.  memtuples may not be a tiny merge heap at this point.
+    */
+   pfree(state->memtuples);
+   /* Be tidy */
+   state->memtuples = NULL;
+   state->memtupsize = 0;
+
+   /*
+    * The parallel worker's result tape metadata must be stored in shared
+    * memory for the leader.
+    */
+   LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+   /* Store properties of output tape, and update finished worker count */
+   SpinLockAcquire(&shared->mutex);
+   shared->tapes[state->worker] = output;
+   shared->workersFinished++;
+   SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called in a worker, as an alternative to mergeruns(), when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+   Assert(WORKER(state));
+   Assert(state->result_tape == -1);
+
+   state->result_tape = state->tp_tapenum[state->destTape];
+   worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, the leader Tuplesortstate has performed no actual sorting.  By
+ * now, all sorting has occurred in workers, all of which must have already
+ * returned from tuplesort_performsort().
+ *
+ * When this returns, the leader process is left in a state that is
+ * virtually indistinguishable from having generated the runs itself, as a
+ * serial external sort would have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+   Sharedsort *shared = state->shared;
+   int         nParticipants = state->nParticipants;
+   int         workersFinished;
+   int         j;
+
+   Assert(LEADER(state));
+   Assert(nParticipants >= 1);
+
+   SpinLockAcquire(&shared->mutex);
+   workersFinished = shared->workersFinished;
+   SpinLockRelease(&shared->mutex);
+
+   if (nParticipants != workersFinished)
+       elog(ERROR, "cannot take over tapes before all workers finish");
+
+   /*
+    * Create the tapeset from worker tapes, including a leader-owned tape at
+    * the end.  Parallel workers are far more expensive than logical tapes,
+    * so the number of tapes allocated here should never be excessive.
+    *
+    * We still have a leader tape, though it's not possible to write to it
+    * due to restrictions in the shared fileset infrastructure used by
+    * logtape.c.  It will never be written to in practice because
+    * randomAccess is disallowed for parallel sorts.
+    */
+   inittapestate(state, nParticipants + 1);
+   state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+                                         &shared->fileset, state->worker);
+
+   /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+   state->currentRun = nParticipants;
+
+   /*
+    * Initialize variables of Algorithm D to be consistent with runs from
+    * workers having been generated in the leader.
+    *
+    * There will always be exactly one run per worker, and exactly one input
+    * tape per run, because workers always output exactly one run, even when
+    * there were no input tuples for them to sort.
+    */
+   for (j = 0; j < state->maxTapes; j++)
+   {
+       /* One real run; no dummy runs for worker tapes */
+       state->tp_fib[j] = 1;
+       state->tp_runs[j] = 1;
+       state->tp_dummy[j] = 0;
+       state->tp_tapenum[j] = j;
+   }
+   /* Leader tape gets one dummy run, and no real runs */
+   state->tp_fib[state->tapeRange] = 0;
+   state->tp_runs[state->tapeRange] = 0;
+   state->tp_dummy[state->tapeRange] = 1;
+
+   state->Level = 1;
+   state->destTape = 0;
+
+   state->status = TSS_BUILDRUNS;
+}
+
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */
 
 #include "catalog/pg_index.h"
 #include "lib/stringinfo.h"
 #include "storage/bufmgr.h"
+#include "storage/shm_toc.h"
 
 /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
 typedef uint16 BTCycleId;
 /*
  * external entry points for btree, in nbtree.c
  */
-extern IndexBuildResult *btbuild(Relation heap, Relation index,
-       struct IndexInfo *indexInfo);
 extern void btbuildempty(Relation index);
 extern bool btinsert(Relation rel, Datum *values, bool *isnull,
         ItemPointer ht_ctid, Relation heapRel,
 /*
  * prototypes for functions in nbtsort.c
  */
-typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
-
-extern BTSpool *_bt_spoolinit(Relation heap, Relation index,
-             bool isunique, bool isdead);
-extern void _bt_spooldestroy(BTSpool *btspool);
-extern void _bt_spool(BTSpool *btspool, ItemPointer self,
-         Datum *values, bool *isnull);
-extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
+extern IndexBuildResult *btbuild(Relation heap, Relation index,
+       struct IndexInfo *indexInfo);
+extern void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc);
 
 #endif                         /* NBTREE_H */
 
 
 #define        IsParallelWorker()      (ParallelWorkerNumber >= 0)
 
-extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name,
+                     const char *function_name, int nworkers,
+                     bool serializable_okay);
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
 
    BlockNumber phs_startblock; /* starting block number */
    pg_atomic_uint64 phs_nallocated;    /* number of blocks allocated to
                                         * workers so far. */
+   bool        phs_snapshot_any;   /* SnapshotAny, not phs_snapshot_data? */
    char        phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 }          ParallelHeapScanDescData;
 
 
            Relation indexRelation,
            IndexInfo *indexInfo,
            bool isprimary,
-           bool isreindex);
+           bool isreindex,
+           bool parallel);
 
 extern double IndexBuildHeapScan(Relation heapRelation,
                   Relation indexRelation,
                   IndexInfo *indexInfo,
                   bool allow_sync,
                   IndexBuildCallback callback,
-                  void *callback_state);
+                  void *callback_state,
+                  HeapScanDesc scan);
 extern double IndexBuildHeapRangeScan(Relation heapRelation,
                        Relation indexRelation,
                        IndexInfo *indexInfo,
                        BlockNumber start_blockno,
                        BlockNumber end_blockno,
                        IndexBuildCallback callback,
-                       void *callback_state);
+                       void *callback_state,
+                       HeapScanDesc scan);
 
 extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
 
 
 extern PGDLLIMPORT bool allowSystemTableMods;
 extern PGDLLIMPORT int work_mem;
 extern PGDLLIMPORT int maintenance_work_mem;
+extern PGDLLIMPORT int max_parallel_maintenance_workers;
 
 extern int VacuumCostPageHit;
 extern int VacuumCostPageMiss;
 
  *     ReadyForInserts     is it valid for inserts?
  *     Concurrent          are we doing a concurrent index build?
  *     BrokenHotChain      did we detect any broken HOT chains?
+ *     ParallelWorkers     # of workers requested (excludes leader)
  *     AmCache             private cache area for index AM
  *     Context             memory context holding this IndexInfo
  *
- * ii_Concurrent and ii_BrokenHotChain are used only during index build;
- * they're conventionally set to false otherwise.
+ * ii_Concurrent, ii_BrokenHotChain, and ii_ParallelWorkers are used only
+ * during index build; they're conventionally zeroed otherwise.
  * ----------------
  */
 typedef struct IndexInfo
    bool        ii_ReadyForInserts;
    bool        ii_Concurrent;
    bool        ii_BrokenHotChain;
+   int         ii_ParallelWorkers;
    Oid         ii_Am;
    void       *ii_AmCache;
    MemoryContext ii_Context;
 
 
 extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages,
-                       double index_pages);
+                       double index_pages, int max_workers);
 extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
                            Path *bitmapqual);
 extern void generate_partition_wise_join_paths(PlannerInfo *root,
 
 extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr);
 
 extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
+extern int plan_create_index_workers(Oid tableOid, Oid indexOid);
 
 extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti,
                           bool *part_cols_updated);
 
    WAIT_EVENT_MQ_SEND,
    WAIT_EVENT_PARALLEL_FINISH,
    WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN,
    WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
    WAIT_EVENT_CLOG_GROUP_UPDATE,
    WAIT_EVENT_REPLICATION_ORIGIN_DROP,
 
 extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
 extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
 extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern off_t BufFileSize(BufFile *file);
+extern long BufFileAppend(BufFile *target, BufFile *source);
 
 extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
 extern void BufFileExportShared(BufFile *file);
 
 extern int FileGetRawDesc(File file);
 extern int FileGetRawFlags(File file);
 extern mode_t FileGetRawMode(File file);
+extern off_t FileGetSize(File file);
 
 /* Operations used for sharing named temporary files */
 extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
 
 #ifndef LOGTAPE_H
 #define LOGTAPE_H
 
+#include "storage/sharedfileset.h"
+
 /* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
 
 typedef struct LogicalTapeSet LogicalTapeSet;
 
+/*
+ * The approach tuplesort.c takes to parallel external sorts is that each
+ * worker, whose state is almost the same as that of an independent serial
+ * sort, is made to produce a final materialized tape of sorted output in all
+ * cases.  This tape is frozen, just like in any case requiring a final
+ * materialized tape.  The one difference is that freezing also exports the
+ * underlying shared fileset BufFile for sharing.  When this happens, freezing
+ * produces TapeShare metadata for the worker, which is passed along through
+ * shared memory to the leader.
+ *
+ * The leader process can then pass an array of TapeShare metadata (one per
+ * worker participant) to LogicalTapeSetCreate(), alongside a handle to a
+ * shared fileset, which is sufficient to construct a new logical tapeset that
+ * consists of each of the tapes materialized by workers.
+ *
+ * Note that while logtape.c does create an empty leader tape at the end of the
+ * tapeset in the leader case, it can never be written to due to a restriction
+ * in the shared buffile infrastructure.
+ */
+typedef struct TapeShare
+{
+   /*
+    * firstblocknumber is the first block that should be read from the
+    * materialized tape.
+    *
+    * buffilesize is the size of the associated BufFile following freezing.
+    */
+   long        firstblocknumber;
+   off_t       buffilesize;
+} TapeShare;
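
A brief editorial sketch of the handoff described above (not part of the
patch; it mirrors worker_freeze_result_tape() and leader_takeover_tapes() in
tuplesort.c, so "state", "shared", and "nworkers" stand in for that code's
local state): the worker freezes its result tape and captures TapeShare
metadata, and the leader later hands the collected array back to logtape.c.

    /* Worker: freeze result tape, exporting its BufFile and capturing metadata */
    TapeShare   share;

    LogicalTapeFreeze(state->tapeset, state->result_tape, &share);
    /* ... "share" is then copied into shared memory for the leader ... */

    /* Leader: build a single tapeset over all worker tapes plus its own tape */
    state->tapeset = LogicalTapeSetCreate(nworkers + 1, shared->tapes,
                                          &shared->fileset, state->worker);
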
+
 /*
  * prototypes for functions in logtape.c
  */
 
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
+                    SharedFileSet *fileset, int worker);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
 extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
                         size_t buffer_size);
 extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
+                 TapeShare *share);
 extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
                     size_t size);
 extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 
  * if necessary).  It works efficiently for both small and large amounts
  * of data.  Small amounts are sorted in-memory using qsort().  Large
  * amounts are sorted using temporary files and a standard external sort
- * algorithm.
+ * algorithm.  Parallel sorts use a variant of this external sort
+ * algorithm, and are typically only used for large amounts of data.
  *
  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
 #include "access/itup.h"
 #include "executor/tuptable.h"
 #include "fmgr.h"
+#include "storage/dsm.h"
 #include "utils/relcache.h"
 
 
-/* Tuplesortstate is an opaque type whose details are not known outside
- * tuplesort.c.
+/*
+ * Tuplesortstate and Sharedsort are opaque types whose details are not
+ * known outside tuplesort.c.
  */
 typedef struct Tuplesortstate Tuplesortstate;
+typedef struct Sharedsort Sharedsort;
+
+/*
+ * Tuplesort parallel coordination state, allocated by each participant in
+ * local memory.  Participant caller initializes everything.  See usage notes
+ * below.
+ */
+typedef struct SortCoordinateData
+{
+   /* Worker process?  If not, must be leader. */
+   bool        isWorker;
+
+   /*
+    * Number of participants known to have been launched, as passed in by
+    * the leader process (workers set this to -1).  The count includes the
+    * leader's own worker-style Tuplesortstate, if it participates as one.
+    */
+   int         nParticipants;
+
+   /* Private opaque state (points to shared memory) */
+   Sharedsort *sharedsort;
+} SortCoordinateData;
+
+typedef struct SortCoordinateData *SortCoordinate;
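
A minimal editorial sketch of filling in this structure in a worker, assuming
"sharedsort" points at the Sharedsort allocated in the DSM segment; the
leader would instead set isWorker to false and pass the number of launched
participants.

    SortCoordinate coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));

    coordinate->isWorker = true;         /* false in the leader */
    coordinate->nParticipants = -1;      /* leader passes # of launched participants */
    coordinate->sharedsort = sharedsort; /* Sharedsort in shared memory */
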
 
 /*
  * Data structures for reporting sort statistics.  Note that
  * sorting HeapTuples and two more for sorting IndexTuples.  Yet another
  * API supports sorting bare Datums.
  *
+ * Serial sort callers should pass NULL for their coordinate argument.
+ *
  * The "heap" API actually stores/sorts MinimalTuples, which means it doesn't
  * preserve the system columns (tuple identity and transaction visibility
  * info).  The sort keys are specified by column numbers within the tuples
  *
  * The "index_hash" API is similar to index_btree, but the tuples are
  * actually sorted by their hash codes not the raw data.
+ *
+ * Parallel sort callers are required to coordinate multiple tuplesort states
+ * in a leader process and one or more worker processes.  The leader process
+ * must launch workers, and have each perform an independent "partial"
+ * tuplesort, typically fed by the parallel heap interface.  The leader later
+ * produces the final output (internally, it merges runs output by workers).
+ *
+ * Callers must do the following to perform a sort in parallel using multiple
+ * worker processes:
+ *
+ * 1. Request tuplesort-private shared memory for n workers.  Use
+ *    tuplesort_estimate_shared() to get the required size.
+ * 2. Have leader process initialize allocated shared memory using
+ *    tuplesort_initialize_shared().  Launch workers.
+ * 3. Initialize a coordinate argument within both the leader process, and
+ *    for each worker process.  This has a pointer to the shared
+ *    tuplesort-private structure, as well as some caller-initialized fields.
+ *    Leader's coordinate argument reliably indicates number of workers
+ *    launched (this is unused by workers).
+ * 4. Begin a tuplesort using some appropriate tuplesort_begin* routine
+ *    (passing the coordinate argument) within each worker.  The workMem
+ *    arguments need not be identical.  All other arguments should match
+ *    exactly, though.
+ * 5. tuplesort_attach_shared() should be called by all workers.  Feed tuples
+ *    to each worker, and call tuplesort_performsort() within each when input
+ *    is exhausted.
+ * 6. Call tuplesort_end() in each worker process.  Worker processes can shut
+ *    down once tuplesort_end() returns.
+ * 7. Begin a tuplesort in the leader using the same tuplesort_begin*
+ *    routine, passing a leader-appropriate coordinate argument (this can
+ *    happen as early as during step 3, actually, since we only need to know
+ *    the number of workers successfully launched).  The leader must now wait
+ *    for workers to finish.  Caller must use own mechanism for ensuring that
+ *    next step isn't reached until all workers have called and returned from
+ *    tuplesort_performsort().  (Note that it's okay if workers have already
+ *    also called tuplesort_end() by then.)
+ * 8. Call tuplesort_performsort() in leader.  Consume output using the
+ *    appropriate tuplesort_get* routine.  Leader can skip this step if
+ *    tuplesort turns out to be unnecessary.
+ * 9. Call tuplesort_end() in leader.
+ *
+ * This division of labor assumes nothing about how input tuples are produced,
+ * but does require that caller combine the state of multiple tuplesorts for
+ * any purpose other than producing the final output.  For example, callers
+ * must consider that tuplesort_get_stats() reports on only one worker's role
+ * in a sort (or the leader's role), and not statistics for the sort as a
+ * whole.
+ *
+ * Note that callers may use the leader process to sort runs as if it were an
+ * independent worker process (prior to the process performing a leader sort
+ * to produce the final sorted output).  Doing so only requires a second
+ * "partial" tuplesort within the leader process, initialized like that of a
+ * worker process.  The steps above don't touch on this directly.  The only
+ * difference is that the tuplesort_attach_shared() call is never needed
+ * within the leader process, because the backend as a whole holds the shared
+ * fileset reference.  A worker Tuplesortstate in the leader is expected to do
+ * exactly the same amount of total initial processing work as a worker
+ * process Tuplesortstate, since the leader process has nothing else to do
+ * before workers finish.
+ *
+ * Note that only a very small amount of memory will be allocated prior to
+ * the leader state first consuming input, and that workers will free the
+ * vast majority of their memory upon returning from tuplesort_performsort().
+ * Callers can rely on this to arrange for memory to be used in a way that
+ * respects a workMem-style budget across an entire parallel sort operation.
+ *
+ * Callers are responsible for parallel safety in general.  However, they
+ * can at least rely on there being no parallel safety hazards within
+ * tuplesort, because tuplesort thinks of the sort as several independent
+ * sorts whose results are combined.  Since, in general, the behavior of
+ * sort operators is immutable, the caller need only worry about the parallel
+ * safety of the process through which input tuples are generated (typically,
+ * a parallel heap scan).
  */
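
As a compressed editorial illustration of the numbered protocol above (a
sketch, not part of the patch), using the btree variant of the begin routine;
the wait mechanism, the tuple-feeding loop, and the surrounding variables
(heapRel, indexRel, enforceUnique, workMem, seg, sharedsort, and the two
coordinate structs) are assumed to be set up by the caller.

    /* Worker: attach to shared state, sort its portion of the input, then exit */
    tuplesort_attach_shared(sharedsort, seg);
    btsortstate = tuplesort_begin_index_btree(heapRel, indexRel, enforceUnique,
                                              workMem, worker_coordinate, false);
    /* ... feed tuples, e.g. via tuplesort_putindextuplevalues() ... */
    tuplesort_performsort(btsortstate);
    tuplesort_end(btsortstate);

    /* Leader: wait for all workers (caller's mechanism), then merge and consume */
    btsortstate = tuplesort_begin_index_btree(heapRel, indexRel, enforceUnique,
                                              workMem, leader_coordinate, false);
    tuplesort_performsort(btsortstate);
    /* ... read sorted output, e.g. via tuplesort_getindextuple() ... */
    tuplesort_end(btsortstate);
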
 
 extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
                     int nkeys, AttrNumber *attNums,
                     Oid *sortOperators, Oid *sortCollations,
                     bool *nullsFirstFlags,
-                    int workMem, bool randomAccess);
+                    int workMem, SortCoordinate coordinate,
+                    bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
-                       Relation indexRel,
-                       int workMem, bool randomAccess);
+                       Relation indexRel, int workMem,
+                       SortCoordinate coordinate, bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
                            Relation indexRel,
                            bool enforceUnique,
-                           int workMem, bool randomAccess);
+                           int workMem, SortCoordinate coordinate,
+                           bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
                           Relation indexRel,
                           uint32 high_mask,
                           uint32 low_mask,
                           uint32 max_buckets,
-                          int workMem, bool randomAccess);
+                          int workMem, SortCoordinate coordinate,
+                          bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
                      Oid sortOperator, Oid sortCollation,
                      bool nullsFirstFlag,
-                     int workMem, bool randomAccess);
+                     int workMem, SortCoordinate coordinate,
+                     bool randomAccess);
 
 extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
 
 
 extern int tuplesort_merge_order(int64 allowedMem);
 
+extern Size tuplesort_estimate_shared(int nworkers);
+extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
+                           dsm_segment *seg);
+extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg);
+
 /*
  * These routines may only be called if randomAccess was specified 'true'.
  * Likewise, backwards scan in gettuple/getdatum is only allowed if
- * randomAccess was specified.
+ * randomAccess was specified.  Note that parallel sorts do not support
+ * randomAccess.
  */
 
 extern void tuplesort_rescan(Tuplesortstate *state);
 
 BTBuildState
 BTCycleId
 BTIndexStat
+BTLeader
 BTMetaPageData
 BTOneVacInfo
 BTPS_State
 BTScanPos
 BTScanPosData
 BTScanPosItem
+BTShared
 BTSortArrayContext
 BTSpool
 BTStack
 SharedTuplestore
 SharedTuplestoreAccessor
 SharedTypmodTableEntry
+Sharedsort
 ShellTypeInfo
 ShippableCacheEntry
 ShippableCacheKey
 SortBy
 SortByDir
 SortByNulls
+SortCoordinate
+SortCoordinateData
 SortGroupClause
 SortItem
 SortPath
 TablespaceList
 TablespaceListCell
 TapeBlockTrailer
+TapeShare
 TarMethodData
 TarMethodFile
 TargetEntry