Avoid aggregating worker instrumentation multiple times.
author    Robert Haas <[email protected]>
          Wed, 18 Nov 2015 17:35:25 +0000 (12:35 -0500)
committer Robert Haas <[email protected]>
          Wed, 18 Nov 2015 17:35:25 +0000 (12:35 -0500)
Amit Kapila, per design ideas from me.
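
ExecParallelFinish() folds each worker's buffer usage and node instrumentation into
the leader's totals, and nothing previously stopped it from doing that more than once
for the same set of workers. The hunks below add a finished flag to
ParallelExecutorInfo so the aggregation runs exactly once, and replace
ExecParallelReinitializeTupleQueues() with a broader ExecParallelReinitialize() that
clears the flag (along with resetting the DSM and tuple queues) when the plan is
rescanned.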

src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/include/executor/execParallel.h

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index eae13c5647752c79a42eff96406ed50b014edae5..6730037710912edc917913be650a820cfe6c2a91 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -277,13 +277,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 }
 
 /*
- * Re-initialize the response queues for backend workers to return tuples
- * to the main backend and start the workers.
+ * Re-initialize the parallel executor info such that it can be reused by
+ * workers.
  */
-shm_mq_handle **
-ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+void
+ExecParallelReinitialize(ParallelExecutorInfo *pei)
 {
-       return ExecParallelSetupTupleQueues(pcxt, true);
+       ReinitializeParallelDSM(pei->pcxt);
+       pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+       pei->finished = false;
 }
 
 /*
@@ -308,6 +310,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 
        /* Allocate object for return value. */
        pei = palloc0(sizeof(ParallelExecutorInfo));
+       pei->finished = false;
        pei->planstate = planstate;
 
        /* Fix up and serialize plan to be sent to workers. */
@@ -469,6 +472,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 {
        int             i;
 
+       if (pei->finished)
+               return;
+
        /* First, wait for the workers to finish. */
        WaitForParallelWorkersToFinish(pei->pcxt);
 
@@ -480,6 +486,8 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
        if (pei->instrumentation)
                ExecParallelRetrieveInstrumentation(pei->planstate,
                                                                                        pei->instrumentation);
+
+       pei->finished = true;
 }
 
 /*
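
Taken together, the execParallel.c hunks are an idempotence guard: the early return
makes second and later calls to ExecParallelFinish() no-ops, and
ExecParallelReinitialize() re-arms the guard for the next scan. Below is a minimal
standalone sketch of that pattern. The names (ParallelInfo, parallel_finish, the
worker_tuples counters standing in for the real BufferUsage/instrumentation data) are
hypothetical; this is not PostgreSQL source.

    #include <stdbool.h>
    #include <stdio.h>

    #define NWORKERS 2

    typedef struct ParallelInfo
    {
        long    worker_tuples[NWORKERS];    /* per-worker counters, as left in shared memory */
        long    leader_total;               /* leader-side aggregate */
        bool    finished;                   /* set once aggregation has run */
    } ParallelInfo;

    /* Aggregate worker counters into the leader's total exactly once. */
    static void
    parallel_finish(ParallelInfo *pi)
    {
        if (pi->finished)
            return;                         /* already aggregated; avoid double counting */

        for (int i = 0; i < NWORKERS; i++)
            pi->leader_total += pi->worker_tuples[i];

        pi->finished = true;
    }

    /* Reset for reuse (a rescan); clearing the flag rides along automatically. */
    static void
    parallel_reinitialize(ParallelInfo *pi)
    {
        for (int i = 0; i < NWORKERS; i++)
            pi->worker_tuples[i] = 0;       /* stands in for resetting DSM and tuple queues */
        pi->finished = false;
    }

    int
    main(void)
    {
        ParallelInfo pi = {{10, 20}, 0, false};

        parallel_finish(&pi);               /* aggregates: total becomes 30 */
        parallel_finish(&pi);               /* no-op: total stays 30, not 60 */
        printf("after double finish: %ld\n", pi.leader_total);

        parallel_reinitialize(&pi);         /* rescan path: flag cleared, can finish again */
        pi.worker_tuples[0] = 5;
        parallel_finish(&pi);
        printf("after rescan finish: %ld\n", pi.leader_total);
        return 0;
    }

Compiled with any C99 compiler, the two printf calls show 30 and then 35: the
repeated finish does not double count, and a reinitialized scan can finish again.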
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index b368b48d01d839c76b6a871adc44595ddcc9aa96..b6e82d1664f83dedcdf1e1eb8e8e35071ffcff76 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -456,11 +456,7 @@ ExecReScanGather(GatherState *node)
        node->initialized = false;
 
        if (node->pei)
-       {
-               ReinitializeParallelDSM(node->pei->pcxt);
-               node->pei->tqueue =
-                               ExecParallelReinitializeTupleQueues(node->pei->pcxt);
-       }
+               ExecParallelReinitialize(node->pei);
 
        ExecReScan(node->ps.lefttree);
 }
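
With the reset steps consolidated, ExecReScanGather() makes a single call instead of
performing ReinitializeParallelDSM() and the tuple-queue setup itself; any state that
must be cleared before reuse, now including the finished flag, is handled in one
place rather than by every caller.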
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 23c29ebb9027de0c40794d82b951e09eda5d0958..b43af1dd2b3c220110d4d819ca61e207b541a0c4 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,12 +27,13 @@ typedef struct ParallelExecutorInfo
        BufferUsage *buffer_usage;
        SharedExecutorInstrumentation *instrumentation;
        shm_mq_handle **tqueue;
+       bool    finished;
 }      ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                                         EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
+extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
 
 #endif   /* EXECPARALLEL_H */
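
Note the signature change that makes this work: the exported reinitialization routine
now takes the ParallelExecutorInfo rather than a bare ParallelContext, giving it
access to the tuple-queue array and the new finished flag.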