Support parallel bitmap heap scans.
authorRobert Haas <[email protected]>
Wed, 8 Mar 2017 17:05:43 +0000 (12:05 -0500)
committerRobert Haas <[email protected]>
Wed, 8 Mar 2017 17:05:43 +0000 (12:05 -0500)
The index is scanned by a single process, but then all cooperating
processes can iterate jointly over the resulting set of heap blocks.
In the future, we might also want to support using a parallel bitmap
index scan to set up for a parallel bitmap heap scan, but that's a
job for another day.

Dilip Kumar, with some corrections and cosmetic changes by me.  The
larger patch set of which this is a part has been reviewed and tested
by (at least) Andres Freund, Amit Khandekar, Tushar Ahuja, Rafia
Sabih, Haribabu Kommi, Thomas Munro, and me.

Discussion: http://postgr.es/m/CAFiTN-uc4=0WxRGfCzs-xfkMYcSEWUC-Fon6thkJGjkh9i=13A@mail.gmail.com

24 files changed:
doc/src/sgml/monitoring.sgml
src/backend/access/heap/heapam.c
src/backend/executor/execParallel.c
src/backend/executor/nodeBitmapHeapscan.c
src/backend/executor/nodeBitmapIndexscan.c
src/backend/executor/nodeBitmapOr.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/path/indxpath.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/util/pathnode.c
src/backend/postmaster/pgstat.c
src/include/access/heapam.h
src/include/executor/nodeBitmapHeapscan.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/include/optimizer/pathnode.h
src/include/optimizer/paths.h
src/include/pgstat.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 27ed35f0a7b756632af4f730a9acfa8ea495c92b..4d03531cc154b9bd06b75280b8d3314b1972f6f4 100644 (file)
@@ -1211,7 +1211,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting in an extension.</entry>
         </row>
         <row>
-         <entry morerows="10"><literal>IPC</></entry>
+         <entry morerows="11"><literal>IPC</></entry>
          <entry><literal>BgWorkerShutdown</></entry>
          <entry>Waiting for background worker to shut down.</entry>
         </row>
@@ -1247,6 +1247,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>ParallelFinish</></entry>
          <entry>Waiting for parallel workers to finish computing.</entry>
         </row>
+        <row>
+         <entry><literal>ParallelBitmapPopulate</></entry>
+         <entry>Waiting for the leader to populate the TidBitmap.</entry>
+        </row>
         <row>
          <entry><literal>SafeSnapshot</></entry>
          <entry>Waiting for a snapshot for a <literal>READ ONLY DEFERRABLE</> transaction.</entry>
index af258366a207dcecdd22d29a640d0e35d06c3c77..bffc971d6894412b5acce3fe1f465f9a2401222f 100644 (file)
@@ -1753,6 +1753,22 @@ retry:
    return page;
 }
 
+/* ----------------
+ *     heap_update_snapshot
+ *
+ *     Update snapshot info in heap scan descriptor.
+ * ----------------
+ */
+void
+heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
+{
+   Assert(IsMVCCSnapshot(snapshot));
+
+   RegisterSnapshot(snapshot);
+   scan->rs_snapshot = snapshot;
+   scan->rs_temp_snap = true;
+}
+
 /* ----------------
  *     heap_getnext    - retrieve next tuple in scan
  *
index de0e2bafe605b9117f57b66d5e304599d8b9ea6c..a1289e5f12ef513d4b573d8b9c3f6ccd7ffdf11c 100644 (file)
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeSeqscan.h"
@@ -217,6 +218,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                ExecCustomScanEstimate((CustomScanState *) planstate,
                                       e->pcxt);
                break;
+           case T_BitmapHeapScanState:
+               ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
+                                      e->pcxt);
+               break;
            default:
                break;
        }
@@ -277,6 +282,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
                ExecCustomScanInitializeDSM((CustomScanState *) planstate,
                                            d->pcxt);
                break;
+           case T_BitmapHeapScanState:
+               ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
+                                           d->pcxt);
+               break;
+
            default:
                break;
        }
@@ -775,6 +785,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
                ExecCustomScanInitializeWorker((CustomScanState *) planstate,
                                               toc);
                break;
+           case T_BitmapHeapScanState:
+               ExecBitmapHeapInitializeWorker(
+                                    (BitmapHeapScanState *) planstate, toc);
+               break;
            default:
                break;
        }
index c1aa9f13bdf5b3df609ddef3138f34c532762e87..833a93e1b7d36317b1576bdff009d17375fd29d8 100644 (file)
 
 static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
 static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
+static inline void BitmapDoneInitializingSharedState(
+                                 ParallelBitmapHeapState *pstate);
 static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
                             TBMIterateResult *tbmres);
 static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node);
 static inline void BitmapPrefetch(BitmapHeapScanState *node,
               HeapScanDesc scan);
+static bool BitmapShouldInitializeSharedState(
+                                 ParallelBitmapHeapState *pstate);
 
 
 /* ----------------------------------------------------------------
@@ -73,9 +77,12 @@ BitmapHeapNext(BitmapHeapScanState *node)
    HeapScanDesc scan;
    TIDBitmap  *tbm;
    TBMIterator *tbmiterator;
+   TBMSharedIterator *shared_tbmiterator;
    TBMIterateResult *tbmres;
    OffsetNumber targoffset;
    TupleTableSlot *slot;
+   ParallelBitmapHeapState *pstate = node->pstate;
+   dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
 
    /*
     * extract necessary information from index scan node
@@ -84,7 +91,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
    slot = node->ss.ss_ScanTupleSlot;
    scan = node->ss.ss_currentScanDesc;
    tbm = node->tbm;
-   tbmiterator = node->tbmiterator;
+   if (pstate == NULL)
+       tbmiterator = node->tbmiterator;
+   else
+       shared_tbmiterator = node->shared_tbmiterator;
    tbmres = node->tbmres;
 
    /*
@@ -99,25 +109,82 @@ BitmapHeapNext(BitmapHeapScanState *node)
     * node->prefetch_maximum.  This is to avoid doing a lot of prefetching in
     * a scan that stops after a few tuples because of a LIMIT.
     */
-   if (tbm == NULL)
+   if (!node->initialized)
    {
-       tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+       if (!pstate)
+       {
+           tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
 
-       if (!tbm || !IsA(tbm, TIDBitmap))
-           elog(ERROR, "unrecognized result from subplan");
+           if (!tbm || !IsA(tbm, TIDBitmap))
+               elog(ERROR, "unrecognized result from subplan");
 
-       node->tbm = tbm;
-       node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
-       node->tbmres = tbmres = NULL;
+           node->tbm = tbm;
+           node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+           node->tbmres = tbmres = NULL;
 
 #ifdef USE_PREFETCH
-       if (node->prefetch_maximum > 0)
-       {
-           node->prefetch_iterator = tbm_begin_iterate(tbm);
-           node->prefetch_pages = 0;
-           node->prefetch_target = -1;
+           if (node->prefetch_maximum > 0)
+           {
+               node->prefetch_iterator = tbm_begin_iterate(tbm);
+               node->prefetch_pages = 0;
+               node->prefetch_target = -1;
+           }
+#endif   /* USE_PREFETCH */
        }
+       else
+       {
+           /*
+            * The leader will immediately come out of the function, but
+            * others will be blocked until leader populates the TBM and wakes
+            * them up.
+            */
+           if (BitmapShouldInitializeSharedState(pstate))
+           {
+               tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+               if (!tbm || !IsA(tbm, TIDBitmap))
+                   elog(ERROR, "unrecognized result from subplan");
+
+               node->tbm = tbm;
+
+               /*
+                * Prepare to iterate over the TBM. This will return the
+                * dsa_pointer of the iterator state which will be used by
+                * multiple processes to iterate jointly.
+                */
+               pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
+#ifdef USE_PREFETCH
+               if (node->prefetch_maximum > 0)
+               {
+                   pstate->prefetch_iterator =
+                       tbm_prepare_shared_iterate(tbm);
+
+                   /*
+                    * We don't need the mutex here as we haven't yet woke up
+                    * others.
+                    */
+                   pstate->prefetch_pages = 0;
+                   pstate->prefetch_target = -1;
+               }
+#endif
+
+               /* We have initialized the shared state so wake up others. */
+               BitmapDoneInitializingSharedState(pstate);
+           }
+
+           /* Allocate a private iterator and attach the shared state to it */
+           node->shared_tbmiterator = shared_tbmiterator =
+               tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
+           node->tbmres = tbmres = NULL;
+
+#ifdef USE_PREFETCH
+           if (node->prefetch_maximum > 0)
+           {
+               node->shared_prefetch_iterator =
+                   tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
+           }
 #endif   /* USE_PREFETCH */
+       }
+       node->initialized = true;
    }
 
    for (;;)
@@ -130,7 +197,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
         */
        if (tbmres == NULL)
        {
-           node->tbmres = tbmres = tbm_iterate(tbmiterator);
+           if (!pstate)
+               node->tbmres = tbmres = tbm_iterate(tbmiterator);
+           else
+               node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
            if (tbmres == NULL)
            {
                /* no more entries in the bitmap */
@@ -182,8 +252,19 @@ BitmapHeapNext(BitmapHeapScanState *node)
             * Try to prefetch at least a few pages even before we get to the
             * second page if we don't stop reading after the first tuple.
             */
-           if (node->prefetch_target < node->prefetch_maximum)
-               node->prefetch_target++;
+           if (!pstate)
+           {
+               if (node->prefetch_target < node->prefetch_maximum)
+                   node->prefetch_target++;
+           }
+           else if (pstate->prefetch_target < node->prefetch_maximum)
+           {
+               /* take spinlock while updating shared state */
+               SpinLockAcquire(&pstate->mutex);
+               if (pstate->prefetch_target < node->prefetch_maximum)
+                   pstate->prefetch_target++;
+               SpinLockRelease(&pstate->mutex);
+           }
 #endif   /* USE_PREFETCH */
        }
 
@@ -361,6 +442,21 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
    scan->rs_ntuples = ntup;
 }
 
+/*
+ * BitmapDoneInitializingSharedState - Shared state is initialized
+ *
+ * By this time the leader has already populated the TBM and initialized the
+ * shared state so wake up other processes.
+ */
+static inline void
+BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate)
+{
+   SpinLockAcquire(&pstate->mutex);
+   pstate->state = BM_FINISHED;
+   SpinLockRelease(&pstate->mutex);
+   ConditionVariableBroadcast(&pstate->cv);
+}
+
 /*
  * BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
  */
@@ -369,20 +465,53 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
                             TBMIterateResult *tbmres)
 {
 #ifdef USE_PREFETCH
-   TBMIterator *prefetch_iterator = node->prefetch_iterator;
+   ParallelBitmapHeapState *pstate = node->pstate;
 
-   if (node->prefetch_pages > 0)
+   if (pstate == NULL)
    {
-       /* The main iterator has closed the distance by one page */
-       node->prefetch_pages--;
+       TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+       if (node->prefetch_pages > 0)
+       {
+           /* The main iterator has closed the distance by one page */
+           node->prefetch_pages--;
+       }
+       else if (prefetch_iterator)
+       {
+           /* Do not let the prefetch iterator get behind the main one */
+           TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+           if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+               elog(ERROR, "prefetch and main iterators are out of sync");
+       }
+       return;
    }
-   else if (prefetch_iterator)
+
+   if (node->prefetch_maximum > 0)
    {
-       /* Do not let the prefetch iterator get behind the main one */
-       TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+       TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+       SpinLockAcquire(&pstate->mutex);
+       if (pstate->prefetch_pages > 0)
+       {
+           node->prefetch_pages--;
+           SpinLockRelease(&pstate->mutex);
+       }
+       else
+       {
+           /* Release the mutex before iterating */
+           SpinLockRelease(&pstate->mutex);
 
-       if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
-           elog(ERROR, "prefetch and main iterators are out of sync");
+           /*
+            * In case of shared mode, we can not ensure that the current
+            * blockno of the main iterator and that of the prefetch iterator
+            * are same.  It's possible that whatever blockno we are
+            * prefetching will be processed by another process.  Therefore, we
+            * don't validate the blockno here as we do in non-parallel case.
+            */
+           if (prefetch_iterator)
+               tbm_shared_iterate(prefetch_iterator);
+       }
    }
 #endif   /* USE_PREFETCH */
 }
@@ -399,14 +528,35 @@ static inline void
 BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
 {
 #ifdef USE_PREFETCH
-   if (node->prefetch_target >= node->prefetch_maximum)
-        /* don't increase any further */ ;
-   else if (node->prefetch_target >= node->prefetch_maximum / 2)
-       node->prefetch_target = node->prefetch_maximum;
-   else if (node->prefetch_target > 0)
-       node->prefetch_target *= 2;
-   else
-       node->prefetch_target++;
+   ParallelBitmapHeapState *pstate = node->pstate;
+
+   if (pstate == NULL)
+   {
+       if (node->prefetch_target >= node->prefetch_maximum)
+            /* don't increase any further */ ;
+       else if (node->prefetch_target >= node->prefetch_maximum / 2)
+           node->prefetch_target = node->prefetch_maximum;
+       else if (node->prefetch_target > 0)
+           node->prefetch_target *= 2;
+       else
+           node->prefetch_target++;
+       return;
+   }
+
+   /* Do an unlocked check first to save spinlock acquisitions. */
+   if (pstate->prefetch_target < node->prefetch_maximum)
+   {
+       SpinLockAcquire(&pstate->mutex);
+       if (pstate->prefetch_target >= node->prefetch_maximum)
+            /* don't increase any further */ ;
+       else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
+           pstate->prefetch_target = node->prefetch_maximum;
+       else if (pstate->prefetch_target > 0)
+           pstate->prefetch_target *= 2;
+       else
+           pstate->prefetch_target++;
+       SpinLockRelease(&pstate->mutex);
+   }
 #endif   /* USE_PREFETCH */
 }
 
@@ -417,23 +567,70 @@ static inline void
 BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan)
 {
 #ifdef USE_PREFETCH
-   TBMIterator *prefetch_iterator = node->prefetch_iterator;
+   ParallelBitmapHeapState *pstate = node->pstate;
 
-   if (prefetch_iterator)
+   if (pstate == NULL)
    {
-       while (node->prefetch_pages < node->prefetch_target)
+       TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+       if (prefetch_iterator)
        {
-           TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+           while (node->prefetch_pages < node->prefetch_target)
+           {
+               TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+               if (tbmpre == NULL)
+               {
+                   /* No more pages to prefetch */
+                   tbm_end_iterate(prefetch_iterator);
+                   node->prefetch_iterator = NULL;
+                   break;
+               }
+               node->prefetch_pages++;
+               PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+           }
+       }
+
+       return;
+   }
 
-           if (tbmpre == NULL)
+   if (pstate->prefetch_pages < pstate->prefetch_target)
+   {
+       TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+       if (prefetch_iterator)
+       {
+           while (1)
            {
-               /* No more pages to prefetch */
-               tbm_end_iterate(prefetch_iterator);
-               node->prefetch_iterator = NULL;
-               break;
+               TBMIterateResult *tbmpre;
+               bool        do_prefetch = false;
+
+               /*
+                * Recheck under the mutex. If some other process has already
+                * done enough prefetching then we need not to do anything.
+                */
+               SpinLockAcquire(&pstate->mutex);
+               if (pstate->prefetch_pages < pstate->prefetch_target)
+               {
+                   pstate->prefetch_pages++;
+                   do_prefetch = true;
+               }
+               SpinLockRelease(&pstate->mutex);
+
+               if (!do_prefetch)
+                   return;
+
+               tbmpre = tbm_shared_iterate(prefetch_iterator);
+               if (tbmpre == NULL)
+               {
+                   /* No more pages to prefetch */
+                   tbm_end_shared_iterate(prefetch_iterator);
+                   node->shared_prefetch_iterator = NULL;
+                   break;
+               }
+
+               PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
            }
-           node->prefetch_pages++;
-           PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
        }
    }
 #endif   /* USE_PREFETCH */
@@ -488,12 +685,36 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
        tbm_end_iterate(node->tbmiterator);
    if (node->prefetch_iterator)
        tbm_end_iterate(node->prefetch_iterator);
+   if (node->shared_tbmiterator)
+       tbm_end_shared_iterate(node->shared_tbmiterator);
+   if (node->shared_prefetch_iterator)
+       tbm_end_shared_iterate(node->shared_prefetch_iterator);
    if (node->tbm)
        tbm_free(node->tbm);
    node->tbm = NULL;
    node->tbmiterator = NULL;
    node->tbmres = NULL;
    node->prefetch_iterator = NULL;
+   node->initialized = false;
+   node->shared_tbmiterator = NULL;
+   node->shared_prefetch_iterator = NULL;
+
+   /* Reset parallel bitmap state, if present */
+   if (node->pstate)
+   {
+       dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+
+       node->pstate->state = BM_INITIAL;
+
+       if (DsaPointerIsValid(node->pstate->tbmiterator))
+           tbm_free_shared_area(dsa, node->pstate->tbmiterator);
+
+       if (DsaPointerIsValid(node->pstate->prefetch_iterator))
+           tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
+
+       node->pstate->tbmiterator = InvalidDsaPointer;
+       node->pstate->prefetch_iterator = InvalidDsaPointer;
+   }
 
    ExecScanReScan(&node->ss);
 
@@ -546,6 +767,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
        tbm_end_iterate(node->prefetch_iterator);
    if (node->tbm)
        tbm_free(node->tbm);
+   if (node->shared_tbmiterator)
+       tbm_end_shared_iterate(node->shared_tbmiterator);
+   if (node->shared_prefetch_iterator)
+       tbm_end_shared_iterate(node->shared_prefetch_iterator);
 
    /*
     * close heap scan
@@ -597,6 +822,10 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
    scanstate->prefetch_target = 0;
    /* may be updated below */
    scanstate->prefetch_maximum = target_prefetch_pages;
+   scanstate->pscan_len = 0;
+   scanstate->initialized = false;
+   scanstate->shared_tbmiterator = NULL;
+   scanstate->pstate = NULL;
 
    /*
     * Miscellaneous initialization
@@ -681,3 +910,108 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
     */
    return scanstate;
 }
+
+/*----------------
+ *     BitmapShouldInitializeSharedState
+ *
+ *     The first process to come here and see the state to the BM_INITIAL
+ *     will become the leader for the parallel bitmap scan and will be
+ *     responsible for populating the TIDBitmap.  The other processes will
+ *     be blocked by the condition variable until the leader wakes them up.
+ * ---------------
+ */
+static bool
+BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate)
+{
+   SharedBitmapState state;
+
+   while (1)
+   {
+       SpinLockAcquire(&pstate->mutex);
+       state = pstate->state;
+       if (pstate->state == BM_INITIAL)
+           pstate->state = BM_INPROGRESS;
+       SpinLockRelease(&pstate->mutex);
+
+       /* Exit if bitmap is done, or if we're the leader. */
+       if (state != BM_INPROGRESS)
+           break;
+
+       /* Wait for the leader to wake us up. */
+       ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+   }
+
+   ConditionVariableCancelSleep();
+
+   return (state == BM_INITIAL);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecBitmapHeapEstimate
+ *
+ *     estimates the space required to serialize bitmap scan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+                      ParallelContext *pcxt)
+{
+   EState     *estate = node->ss.ps.state;
+
+   node->pscan_len = add_size(offsetof(ParallelBitmapHeapState,
+                                       phs_snapshot_data),
+                              EstimateSnapshotSpace(estate->es_snapshot));
+
+   shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecBitmapHeapInitializeDSM
+ *
+ *     Set up a parallel bitmap heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+                           ParallelContext *pcxt)
+{
+   ParallelBitmapHeapState *pstate;
+   EState     *estate = node->ss.ps.state;
+
+   pstate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+   pstate->tbmiterator = 0;
+   pstate->prefetch_iterator = 0;
+
+   /* Initialize the mutex */
+   SpinLockInit(&pstate->mutex);
+   pstate->prefetch_pages = 0;
+   pstate->prefetch_target = 0;
+   pstate->state = BM_INITIAL;
+
+   ConditionVariableInit(&pstate->cv);
+   SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data);
+
+   shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
+   node->pstate = pstate;
+}
+
+/* ----------------------------------------------------------------
+ *     ExecBitmapHeapInitializeWorker
+ *
+ *     Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+   ParallelBitmapHeapState *pstate;
+   Snapshot    snapshot;
+
+   pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+   node->pstate = pstate;
+
+   snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
+   heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}
index 94bb289f1bb32d31d83446d9da97476f67330989..ce2f3210a4a01d00d30b48b8430be5dac625346b 100644 (file)
@@ -78,7 +78,9 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
    else
    {
        /* XXX should we use less than work_mem for this? */
-       tbm = tbm_create(work_mem * 1024L, NULL);
+       tbm = tbm_create(work_mem * 1024L,
+                        ((BitmapIndexScan *) node->ss.ps.plan)->isshared ?
+                        node->ss.ps.state->es_query_dsa : NULL);
    }
 
    /*
index 1d280beddc171e00290bd4cb68716059b1379cf7..c0f261407bfb1072914ad18cbe563e130894e34f 100644 (file)
@@ -129,7 +129,9 @@ MultiExecBitmapOr(BitmapOrState *node)
            if (result == NULL) /* first subplan */
            {
                /* XXX should we use less than work_mem for this? */
-               result = tbm_create(work_mem * 1024L, NULL);
+               result = tbm_create(work_mem * 1024L,
+                                   ((BitmapOr *) node->ps.plan)->isshared ?
+                                   node->ps.state->es_query_dsa : NULL);
            }
 
            ((BitmapIndexScanState *) subnode)->biss_result = result;
index b3eac06c50ee664ec225d43e48b8f6d16792a95b..ac8e50ef1dc98b89e71dbcd9d52d84fa987f3918 100644 (file)
@@ -331,6 +331,7 @@ _copyBitmapOr(const BitmapOr *from)
    /*
     * copy remainder of node
     */
+   COPY_SCALAR_FIELD(isshared);
    COPY_NODE_FIELD(bitmapplans);
 
    return newnode;
@@ -496,6 +497,7 @@ _copyBitmapIndexScan(const BitmapIndexScan *from)
     * copy remainder of node
     */
    COPY_SCALAR_FIELD(indexid);
+   COPY_SCALAR_FIELD(isshared);
    COPY_NODE_FIELD(indexqual);
    COPY_NODE_FIELD(indexqualorig);
 
index d4297d11b1890b4a2769ea33e3b6ded089dfa085..825a7b283a348148e6935604eb95983e1bd57e17 100644 (file)
@@ -441,6 +441,7 @@ _outBitmapOr(StringInfo str, const BitmapOr *node)
 
    _outPlanInfo(str, (const Plan *) node);
 
+   WRITE_BOOL_FIELD(isshared);
    WRITE_NODE_FIELD(bitmapplans);
 }
 
@@ -520,6 +521,7 @@ _outBitmapIndexScan(StringInfo str, const BitmapIndexScan *node)
    _outScanInfo(str, (const Scan *) node);
 
    WRITE_OID_FIELD(indexid);
+   WRITE_BOOL_FIELD(isshared);
    WRITE_NODE_FIELD(indexqual);
    WRITE_NODE_FIELD(indexqualorig);
 }
index b02d9fa24693562f689c5a5c45211194b80ec5dd..8f39d93a123c99abe734da3dbda1b121446d206e 100644 (file)
@@ -1633,6 +1633,7 @@ _readBitmapOr(void)
 
    ReadCommonPlan(&local_node->plan);
 
+   READ_BOOL_FIELD(isshared);
    READ_NODE_FIELD(bitmapplans);
 
    READ_DONE();
@@ -1744,6 +1745,7 @@ _readBitmapIndexScan(void)
    ReadCommonScan(&local_node->scan);
 
    READ_OID_FIELD(indexid);
+   READ_BOOL_FIELD(isshared);
    READ_NODE_FIELD(indexqual);
    READ_NODE_FIELD(indexqualorig);
 
index 932c84c949befb99a0014c46bc651230ea7e1762..fbb2cda9d73bd89579ee0c2fa75c38b16ca1397e 100644 (file)
@@ -2911,6 +2911,30 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel)
    }
 }
 
+/*
+ * create_partial_bitmap_paths
+ *   Build partial bitmap heap path for the relation
+ */
+void
+create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+                           Path *bitmapqual)
+{
+   int         parallel_workers;
+   double      pages_fetched;
+
+   /* Compute heap pages for bitmap heap scan */
+   pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
+                                        NULL, NULL);
+
+   parallel_workers = compute_parallel_worker(rel, pages_fetched, 0);
+
+   if (parallel_workers <= 0)
+       return;
+
+   add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel,
+                   bitmapqual, rel->lateral_relids, 1.0, parallel_workers));
+}
+
 /*
  * Compute the number of parallel workers that should be used to scan a
  * relation.  We compute the parallel workers based on the size of the heap to
index 3eaed5af7a56a38e6400fe13b3efc4c777e2719a..627e3f1b954121337783db5834b7385f9b3c2750 100644 (file)
@@ -860,6 +860,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
    QualCost    qpqual_cost;
    Cost        cpu_per_tuple;
    Cost        cost_per_page;
+   Cost        cpu_run_cost;
    double      tuples_fetched;
    double      pages_fetched;
    double      spc_seq_page_cost,
@@ -921,8 +922,21 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 
    startup_cost += qpqual_cost.startup;
    cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+   cpu_run_cost = cpu_per_tuple * tuples_fetched;
+
+   /* Adjust costing for parallelism, if used. */
+   if (path->parallel_workers > 0)
+   {
+       double      parallel_divisor = get_parallel_divisor(path);
+
+       /* The CPU cost is divided among all the workers. */
+       cpu_run_cost /= parallel_divisor;
 
-   run_cost += cpu_per_tuple * tuples_fetched;
+       path->rows = clamp_row_est(path->rows / parallel_divisor);
+   }
+
+
+   run_cost += cpu_run_cost;
 
    /* tlist eval costs are paid per output row, not per tuple scanned */
    startup_cost += path->pathtarget->cost.startup;
index d8e5b811268be0c7390433200c17c0c443c64137..c2b72d410af0a916f5b782064f1dc53b49813f64 100644 (file)
@@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
 
        bitmapqual = choose_bitmap_and(root, rel, bitindexpaths);
        bpath = create_bitmap_heap_path(root, rel, bitmapqual,
-                                       rel->lateral_relids, 1.0);
+                                       rel->lateral_relids, 1.0, 0);
        add_path(rel, (Path *) bpath);
+
+       /* create a partial bitmap heap path */
+       if (rel->consider_parallel && rel->lateral_relids == NULL)
+           create_partial_bitmap_paths(root, rel, bitmapqual);
    }
 
    /*
@@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
            required_outer = get_bitmap_tree_required_outer(bitmapqual);
            loop_count = get_loop_count(root, rel->relid, required_outer);
            bpath = create_bitmap_heap_path(root, rel, bitmapqual,
-                                           required_outer, loop_count);
+                                           required_outer, loop_count, 0);
            add_path(rel, (Path *) bpath);
        }
    }
@@ -1617,6 +1621,11 @@ bitmap_scan_cost_est(PlannerInfo *root, RelOptInfo *rel, Path *ipath)
    bpath.path.pathkeys = NIL;
    bpath.bitmapqual = ipath;
 
+   /*
+    * Check the cost of temporary path without considering parallelism.
+    * Parallel bitmap heap path will be considered at later stage.
+    */
+   bpath.path.parallel_workers = 0;
    cost_bitmap_heap_scan(&bpath.path, root, rel,
                          bpath.path.param_info,
                          ipath,
@@ -1659,6 +1668,12 @@ bitmap_and_cost_est(PlannerInfo *root, RelOptInfo *rel, List *paths)
    bpath.path.pathkeys = NIL;
    bpath.bitmapqual = (Path *) &apath;
 
+   /*
+    * Check the cost of temporary path without considering parallelism.
+    * Parallel bitmap heap path will be considered at later stage.
+    */
+   bpath.path.parallel_workers = 0;
+
    /* Now we can do cost_bitmap_heap_scan */
    cost_bitmap_heap_scan(&bpath.path, root, rel,
                          bpath.path.param_info,
index f1c7f609c0a706d1225304fbc129c80dad0ba703..8f8663c1e1408be5c10b5734046d988f6cfdd8d1 100644 (file)
@@ -125,6 +125,7 @@ static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
                        List *tlist, List *scan_clauses);
 static Plan *create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
                      List **qual, List **indexqual, List **indexECs);
+static void bitmap_subplan_mark_shared(Plan *plan);
 static TidScan *create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
                    List *tlist, List *scan_clauses);
 static SubqueryScan *create_subqueryscan_plan(PlannerInfo *root,
@@ -2590,6 +2591,9 @@ create_bitmap_scan_plan(PlannerInfo *root,
                                           &bitmapqualorig, &indexquals,
                                           &indexECs);
 
+   if (best_path->path.parallel_aware)
+       bitmap_subplan_mark_shared(bitmapqualplan);
+
    /*
     * The qpqual list must contain all restrictions not automatically handled
     * by the index, other than pseudoconstant clauses which will be handled
@@ -4756,6 +4760,24 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
    plan->plan.parallel_aware = false;
 }
 
+/*
+ * bitmap_subplan_mark_shared
+ *   Set isshared flag in bitmap subplan so that it will be created in
+ *  shared memory.
+ */
+static void
+bitmap_subplan_mark_shared(Plan *plan)
+{
+   if (IsA(plan, BitmapAnd))
+       bitmap_subplan_mark_shared(
+                               linitial(((BitmapAnd *) plan)->bitmapplans));
+   else if (IsA(plan, BitmapOr))
+       ((BitmapOr *) plan)->isshared = true;
+   else if (IsA(plan, BitmapIndexScan))
+       ((BitmapIndexScan *) plan)->isshared = true;
+   else
+       elog(ERROR, "unrecognized node type: %d", nodeTag(plan));
+}
 
 /*****************************************************************************
  *
index 86aee2f8ec37cc1e44b23dfb0ad9a1806613afd7..0d925c6fcbf816690e1cd32604ace7cdda9b946b 100644 (file)
@@ -1068,7 +1068,8 @@ create_bitmap_heap_path(PlannerInfo *root,
                        RelOptInfo *rel,
                        Path *bitmapqual,
                        Relids required_outer,
-                       double loop_count)
+                       double loop_count,
+                       int parallel_degree)
 {
    BitmapHeapPath *pathnode = makeNode(BitmapHeapPath);
 
@@ -1077,9 +1078,9 @@ create_bitmap_heap_path(PlannerInfo *root,
    pathnode->path.pathtarget = rel->reltarget;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
-   pathnode->path.parallel_aware = false;
+   pathnode->path.parallel_aware = parallel_degree > 0 ? true : false;
    pathnode->path.parallel_safe = rel->consider_parallel;
-   pathnode->path.parallel_workers = 0;
+   pathnode->path.parallel_workers = parallel_degree;
    pathnode->path.pathkeys = NIL;      /* always unordered */
 
    pathnode->bitmapqual = bitmapqual;
@@ -3281,7 +3282,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
                                                        rel,
                                                        bpath->bitmapqual,
                                                        required_outer,
-                                                       loop_count);
+                                                       loop_count, 0);
            }
        case T_SubqueryScan:
            {
index 2fb9a8bf580639d50ca1ea224cae434d9678bfd1..7cacb1e9b24989e91369abc6c64517d166276e0d 100644 (file)
@@ -3395,6 +3395,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
        case WAIT_EVENT_PARALLEL_FINISH:
            event_name = "ParallelFinish";
            break;
+       case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
+           event_name = "ParallelBitmapScan";
+           break;
        case WAIT_EVENT_SAFE_SNAPSHOT:
            event_name = "SafeSnapshot";
            break;
index a864f7860d8b6ac16f91d77a341b4931048f0f65..7e85510d2fd4c305a26f500db689adca9b341742 100644 (file)
@@ -179,6 +179,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
                   HeapTuple tup);
 
 extern void heap_sync(Relation relation);
+extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
 /* in heap/pruneheap.c */
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
index d7659b94e61591d54c20b5636210d8f0ac3b70df..465c58e6ee54f6720aeea00f2700324273cf4006 100644 (file)
 #define NODEBITMAPHEAPSCAN_H
 
 #include "nodes/execnodes.h"
+#include "access/parallel.h"
 
 extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node);
 extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node);
 extern void ExecReScanBitmapHeapScan(BitmapHeapScanState *node);
+extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+                      ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+                           ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+                              shm_toc *toc);
 
 #endif   /* NODEBITMAPHEAPSCAN_H */
index 2fde67a9c8d9fd605b7b237af9e8abe1a63e365a..6a0d590ef2974c55e937ca694fda94e01cec40a7 100644 (file)
@@ -26,6 +26,8 @@
 #include "utils/sortsupport.h"
 #include "utils/tuplestore.h"
 #include "utils/tuplesort.h"
+#include "nodes/tidbitmap.h"
+#include "storage/condition_variable.h"
 
 
 /* ----------------
@@ -1464,6 +1466,51 @@ typedef struct BitmapIndexScanState
    IndexScanDesc biss_ScanDesc;
 } BitmapIndexScanState;
 
+/* ----------------
+ *  SharedBitmapState information
+ *
+ *     BM_INITIAL      TIDBitmap creation is not yet started, so first worker
+ *                     to see this state will set the state to BM_INPROGRESS
+ *                     and that process will be responsible for creating
+ *                     TIDBitmap.
+ *     BM_INPROGRESS   TIDBitmap creation is in progress; workers need to
+ *                     sleep until it's finished.
+ *     BM_FINISHED     TIDBitmap creation is done, so now all workers can
+ *                     proceed to iterate over TIDBitmap.
+ * ----------------
+ */
+typedef enum
+{
+   BM_INITIAL,
+   BM_INPROGRESS,
+   BM_FINISHED
+} SharedBitmapState;
+
+/* ----------------
+ *  ParallelBitmapHeapState information
+ *     tbmiterator             iterator for scanning current pages
+ *     prefetch_iterator       iterator for prefetching ahead of current page
+ *     mutex                   mutual exclusion for the prefetching variable
+ *                             and state
+ *     prefetch_pages          # pages prefetch iterator is ahead of current
+ *     prefetch_target         current target prefetch distance
+ *     state                   current state of the TIDBitmap
+ *     cv                      conditional wait variable
+ *     phs_snapshot_data       snapshot data shared to workers
+ * ----------------
+ */
+typedef struct ParallelBitmapHeapState
+{
+   dsa_pointer tbmiterator;
+   dsa_pointer prefetch_iterator;
+   slock_t     mutex;
+   int         prefetch_pages;
+   int         prefetch_target;
+   SharedBitmapState state;
+   ConditionVariable cv;
+   char        phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelBitmapHeapState;
+
 /* ----------------
  *  BitmapHeapScanState information
  *
@@ -1477,6 +1524,11 @@ typedef struct BitmapIndexScanState
  *     prefetch_pages     # pages prefetch iterator is ahead of current
  *     prefetch_target    current target prefetch distance
  *     prefetch_maximum   maximum value for prefetch_target
+ *     pscan_len          size of the shared memory for parallel bitmap
+ *     initialized        is node is ready to iterate
+ *     shared_tbmiterator     shared iterator
+ *     shared_prefetch_iterator shared iterator for prefetching
+ *     pstate             shared state for parallel bitmap scan
  * ----------------
  */
 typedef struct BitmapHeapScanState
@@ -1492,6 +1544,11 @@ typedef struct BitmapHeapScanState
    int         prefetch_pages;
    int         prefetch_target;
    int         prefetch_maximum;
+   Size        pscan_len;
+   bool        initialized;
+   TBMSharedIterator *shared_tbmiterator;
+   TBMSharedIterator *shared_prefetch_iterator;
+   ParallelBitmapHeapState *pstate;
 } BitmapHeapScanState;
 
 /* ----------------
index e30ed6aa29b54ce5caae32d26aec7eaeddbc1b84..7fbb0c2c77e550321fc25fef293fd1244fd80256 100644 (file)
@@ -292,6 +292,7 @@ typedef struct BitmapAnd
 typedef struct BitmapOr
 {
    Plan        plan;
+   bool        isshared;
    List       *bitmapplans;
 } BitmapOr;
 
@@ -420,6 +421,7 @@ typedef struct BitmapIndexScan
 {
    Scan        scan;
    Oid         indexid;        /* OID of index to scan */
+   bool        isshared;       /* Create shared bitmap if set */
    List       *indexqual;      /* list of index quals (OpExprs) */
    List       *indexqualorig;  /* the same in original form */
 } BitmapIndexScan;
index befe5781416f1a71eea32b5d8befca6aca3f4737..f0fe8307224218bbf9042ea7746b3550e1c556c5 100644 (file)
@@ -53,7 +53,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
                        RelOptInfo *rel,
                        Path *bitmapqual,
                        Relids required_outer,
-                       double loop_count);
+                       double loop_count,
+                       int parallel_degree);
 extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root,
                       RelOptInfo *rel,
                       List *bitmapquals);
index bc0dcf4468f2eb1d780fb35d5a826e6bbda2ecd3..247fd118793229c5eb53e4e5a150be49a963e895 100644 (file)
@@ -56,6 +56,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
 extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 extern int compute_parallel_worker(RelOptInfo *rel, BlockNumber heap_pages,
                        BlockNumber index_pages);
+extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+                                       Path *bitmapqual);
 
 #ifdef OPTIMIZER_DEBUG
 extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel);
index 0062fb8af24d0af80e4acd089f83b32013b81168..60c78d118f9ca55f80e66c9cd4de484b4b6da353 100644 (file)
@@ -787,6 +787,7 @@ typedef enum
    WAIT_EVENT_MQ_RECEIVE,
    WAIT_EVENT_MQ_SEND,
    WAIT_EVENT_PARALLEL_FINISH,
+   WAIT_EVENT_PARALLEL_BITMAP_SCAN,
    WAIT_EVENT_SAFE_SNAPSHOT,
    WAIT_EVENT_SYNC_REP
 } WaitEventIPC;
index 75558d05e0816244de30a42100a8cfbfd8308593..290b735b6b59527156b546e4f15b694fef0ee19c 100644 (file)
@@ -169,6 +169,25 @@ select  count(*) from tenk1 where thousand > 95;
 
 reset enable_seqscan;
 reset enable_bitmapscan;
+-- test parallel bitmap heap scan.
+set enable_seqscan to off;
+set enable_indexscan to off;
+explain (costs off)
+   select  count((unique1)) from tenk1 where hundred > 1;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Bitmap Heap Scan on tenk1
+                     Recheck Cond: (hundred > 1)
+                     ->  Bitmap Index Scan on tenk1_hundred
+                           Index Cond: (hundred > 1)
+(8 rows)
+
+reset enable_seqscan;
+reset enable_indexscan;
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;
index ebdae7e9391c5bda7e85b8d9b0324042d2b0cf5f..80412b990d24c8cd1dd683e1a2a3d72a35989dd5 100644 (file)
@@ -64,6 +64,16 @@ select  count(*) from tenk1 where thousand > 95;
 reset enable_seqscan;
 reset enable_bitmapscan;
 
+-- test parallel bitmap heap scan.
+set enable_seqscan to off;
+set enable_indexscan to off;
+
+explain (costs off)
+   select  count((unique1)) from tenk1 where hundred > 1;
+
+reset enable_seqscan;
+reset enable_indexscan;
+
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;