Better parallel EXPLAIN.
authorRobert Haas <[email protected]>
Wed, 2 Dec 2015 19:02:42 +0000 (14:02 -0500)
committerRobert Haas <[email protected]>
Wed, 2 Dec 2015 19:05:36 +0000 (14:05 -0500)
src/backend/commands/explain.c
src/backend/executor/execParallel.c
src/include/executor/instrument.h
src/include/nodes/execnodes.h

index 183d3d9bcb77af298986812f84bc4502214cb563..12dae778a76493f46a4df0ab795789ca315beca8 100644 (file)
@@ -103,6 +103,7 @@ static void show_instrumentation_count(const char *qlabel, int which,
                                                   PlanState *planstate, ExplainState *es);
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
 static const char *explain_get_index_name(Oid indexId);
+static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
 static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
                                                ExplainState *es);
 static void ExplainScanTarget(Scan *plan, ExplainState *es);
@@ -1437,108 +1438,73 @@ ExplainNode(PlanState *planstate, List *ancestors,
 
        /* Show buffer usage */
        if (es->buffers && planstate->instrument)
+               show_buffer_usage(es, &planstate->instrument->bufusage);
+
+       /* Show worker detail */
+       if (es->analyze && es->verbose && planstate->worker_instrument)
        {
-               const BufferUsage *usage = &planstate->instrument->bufusage;
+               WorkerInstrumentation *w = planstate->worker_instrument;
+               bool            opened_group = false;
+               int                     n;
 
-               if (es->format == EXPLAIN_FORMAT_TEXT)
+               for (n = 0; n < w->num_workers; ++n)
                {
-                       bool            has_shared = (usage->shared_blks_hit > 0 ||
-                                                                         usage->shared_blks_read > 0 ||
-                                                                         usage->shared_blks_dirtied > 0 ||
-                                                                         usage->shared_blks_written > 0);
-                       bool            has_local = (usage->local_blks_hit > 0 ||
-                                                                        usage->local_blks_read > 0 ||
-                                                                        usage->local_blks_dirtied > 0 ||
-                                                                        usage->local_blks_written > 0);
-                       bool            has_temp = (usage->temp_blks_read > 0 ||
-                                                                       usage->temp_blks_written > 0);
-                       bool            has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) ||
-                                                                !INSTR_TIME_IS_ZERO(usage->blk_write_time));
+                       Instrumentation *instrument = &w->instrument[n];
+                       double          nloops = instrument->nloops;
+                       double          startup_sec;
+                       double          total_sec;
+                       double          rows;
+
+                       if (nloops <= 0)
+                               continue;
+                       startup_sec = 1000.0 * instrument->startup / nloops;
+                       total_sec = 1000.0 * instrument->total / nloops;
+                       rows = instrument->ntuples / nloops;
 
-                       /* Show only positive counter values. */
-                       if (has_shared || has_local || has_temp)
+                       if (es->format == EXPLAIN_FORMAT_TEXT)
                        {
                                appendStringInfoSpaces(es->str, es->indent * 2);
-                               appendStringInfoString(es->str, "Buffers:");
-
-                               if (has_shared)
-                               {
-                                       appendStringInfoString(es->str, " shared");
-                                       if (usage->shared_blks_hit > 0)
-                                               appendStringInfo(es->str, " hit=%ld",
-                                                                                usage->shared_blks_hit);
-                                       if (usage->shared_blks_read > 0)
-                                               appendStringInfo(es->str, " read=%ld",
-                                                                                usage->shared_blks_read);
-                                       if (usage->shared_blks_dirtied > 0)
-                                               appendStringInfo(es->str, " dirtied=%ld",
-                                                                                usage->shared_blks_dirtied);
-                                       if (usage->shared_blks_written > 0)
-                                               appendStringInfo(es->str, " written=%ld",
-                                                                                usage->shared_blks_written);
-                                       if (has_local || has_temp)
-                                               appendStringInfoChar(es->str, ',');
-                               }
-                               if (has_local)
+                               appendStringInfo(es->str, "Worker %d: ", n);
+                               if (es->timing)
+                                       appendStringInfo(es->str,
+                                                       "actual time=%.3f..%.3f rows=%.0f loops=%.0f\n",
+                                                                startup_sec, total_sec, rows, nloops);
+                               else
+                                       appendStringInfo(es->str,
+                                                                        "actual rows=%.0f loops=%.0f\n",
+                                                                        rows, nloops);
+                               es->indent++;
+                               if (es->buffers)
+                                       show_buffer_usage(es, &instrument->bufusage);
+                               es->indent--;
+                       }
+                       else
+                       {
+                               if (!opened_group)
                                {
-                                       appendStringInfoString(es->str, " local");
-                                       if (usage->local_blks_hit > 0)
-                                               appendStringInfo(es->str, " hit=%ld",
-                                                                                usage->local_blks_hit);
-                                       if (usage->local_blks_read > 0)
-                                               appendStringInfo(es->str, " read=%ld",
-                                                                                usage->local_blks_read);
-                                       if (usage->local_blks_dirtied > 0)
-                                               appendStringInfo(es->str, " dirtied=%ld",
-                                                                                usage->local_blks_dirtied);
-                                       if (usage->local_blks_written > 0)
-                                               appendStringInfo(es->str, " written=%ld",
-                                                                                usage->local_blks_written);
-                                       if (has_temp)
-                                               appendStringInfoChar(es->str, ',');
+                                       ExplainOpenGroup("Workers", "Workers", false, es);
+                                       opened_group = true;
                                }
-                               if (has_temp)
+                               ExplainOpenGroup("Worker", NULL, true, es);
+                               ExplainPropertyInteger("Worker Number", n, es);
+
+                               if (es->timing)
                                {
-                                       appendStringInfoString(es->str, " temp");
-                                       if (usage->temp_blks_read > 0)
-                                               appendStringInfo(es->str, " read=%ld",
-                                                                                usage->temp_blks_read);
-                                       if (usage->temp_blks_written > 0)
-                                               appendStringInfo(es->str, " written=%ld",
-                                                                                usage->temp_blks_written);
+                                       ExplainPropertyFloat("Actual Startup Time", startup_sec, 3, es);
+                                       ExplainPropertyFloat("Actual Total Time", total_sec, 3, es);
                                }
-                               appendStringInfoChar(es->str, '\n');
-                       }
+                               ExplainPropertyFloat("Actual Rows", rows, 0, es);
+                               ExplainPropertyFloat("Actual Loops", nloops, 0, es);
 
-                       /* As above, show only positive counter values. */
-                       if (has_timing)
-                       {
-                               appendStringInfoSpaces(es->str, es->indent * 2);
-                               appendStringInfoString(es->str, "I/O Timings:");
-                               if (!INSTR_TIME_IS_ZERO(usage->blk_read_time))
-                                       appendStringInfo(es->str, " read=%0.3f",
-                                                         INSTR_TIME_GET_MILLISEC(usage->blk_read_time));
-                               if (!INSTR_TIME_IS_ZERO(usage->blk_write_time))
-                                       appendStringInfo(es->str, " write=%0.3f",
-                                                        INSTR_TIME_GET_MILLISEC(usage->blk_write_time));
-                               appendStringInfoChar(es->str, '\n');
+                               if (es->buffers)
+                                       show_buffer_usage(es, &instrument->bufusage);
+
+                               ExplainCloseGroup("Worker", NULL, true, es);
                        }
                }
-               else
-               {
-                       ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es);
-                       ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es);
-                       ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es);
-                       ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es);
-                       ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es);
-                       ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es);
-                       ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es);
-                       ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es);
-                       ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es);
-                       ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es);
-                       ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es);
-                       ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es);
-               }
+
+               if (opened_group)
+                       ExplainCloseGroup("Workers", "Workers", false, es);
        }
 
        /* Get ready to display the child plans */
@@ -2276,6 +2242,113 @@ explain_get_index_name(Oid indexId)
        return result;
 }
 
+/*
+ * Show buffer usage details.
+ */
+static void
+show_buffer_usage(ExplainState *es, const BufferUsage *usage)
+{
+       if (es->format == EXPLAIN_FORMAT_TEXT)
+       {
+               bool            has_shared = (usage->shared_blks_hit > 0 ||
+                                                                 usage->shared_blks_read > 0 ||
+                                                                 usage->shared_blks_dirtied > 0 ||
+                                                                 usage->shared_blks_written > 0);
+               bool            has_local = (usage->local_blks_hit > 0 ||
+                                                                usage->local_blks_read > 0 ||
+                                                                usage->local_blks_dirtied > 0 ||
+                                                                usage->local_blks_written > 0);
+               bool            has_temp = (usage->temp_blks_read > 0 ||
+                                                               usage->temp_blks_written > 0);
+               bool            has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) ||
+                                                                !INSTR_TIME_IS_ZERO(usage->blk_write_time));
+
+               /* Show only positive counter values. */
+               if (has_shared || has_local || has_temp)
+               {
+                       appendStringInfoSpaces(es->str, es->indent * 2);
+                       appendStringInfoString(es->str, "Buffers:");
+
+                       if (has_shared)
+                       {
+                               appendStringInfoString(es->str, " shared");
+                               if (usage->shared_blks_hit > 0)
+                                       appendStringInfo(es->str, " hit=%ld",
+                                                                        usage->shared_blks_hit);
+                               if (usage->shared_blks_read > 0)
+                                       appendStringInfo(es->str, " read=%ld",
+                                                                        usage->shared_blks_read);
+                               if (usage->shared_blks_dirtied > 0)
+                                       appendStringInfo(es->str, " dirtied=%ld",
+                                                                        usage->shared_blks_dirtied);
+                               if (usage->shared_blks_written > 0)
+                                       appendStringInfo(es->str, " written=%ld",
+                                                                        usage->shared_blks_written);
+                               if (has_local || has_temp)
+                                       appendStringInfoChar(es->str, ',');
+                       }
+                       if (has_local)
+                       {
+                               appendStringInfoString(es->str, " local");
+                               if (usage->local_blks_hit > 0)
+                                       appendStringInfo(es->str, " hit=%ld",
+                                                                        usage->local_blks_hit);
+                               if (usage->local_blks_read > 0)
+                                       appendStringInfo(es->str, " read=%ld",
+                                                                        usage->local_blks_read);
+                               if (usage->local_blks_dirtied > 0)
+                                       appendStringInfo(es->str, " dirtied=%ld",
+                                                                        usage->local_blks_dirtied);
+                               if (usage->local_blks_written > 0)
+                                       appendStringInfo(es->str, " written=%ld",
+                                                                        usage->local_blks_written);
+                               if (has_temp)
+                                       appendStringInfoChar(es->str, ',');
+                       }
+                       if (has_temp)
+                       {
+                               appendStringInfoString(es->str, " temp");
+                               if (usage->temp_blks_read > 0)
+                                       appendStringInfo(es->str, " read=%ld",
+                                                                        usage->temp_blks_read);
+                               if (usage->temp_blks_written > 0)
+                                       appendStringInfo(es->str, " written=%ld",
+                                                                        usage->temp_blks_written);
+                       }
+                       appendStringInfoChar(es->str, '\n');
+               }
+
+               /* As above, show only positive counter values. */
+               if (has_timing)
+               {
+                       appendStringInfoSpaces(es->str, es->indent * 2);
+                       appendStringInfoString(es->str, "I/O Timings:");
+                       if (!INSTR_TIME_IS_ZERO(usage->blk_read_time))
+                               appendStringInfo(es->str, " read=%0.3f",
+                                                 INSTR_TIME_GET_MILLISEC(usage->blk_read_time));
+                       if (!INSTR_TIME_IS_ZERO(usage->blk_write_time))
+                               appendStringInfo(es->str, " write=%0.3f",
+                                                INSTR_TIME_GET_MILLISEC(usage->blk_write_time));
+                       appendStringInfoChar(es->str, '\n');
+               }
+       }
+       else
+       {
+               ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es);
+               ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es);
+               ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es);
+               ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es);
+               ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es);
+               ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es);
+               ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es);
+               ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es);
+               ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es);
+               ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es);
+               ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es);
+               ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es);
+       }
+}
+
 /*
  * Add some additional details about an IndexScan or IndexOnlyScan
  */
index 6730037710912edc917913be650a820cfe6c2a91..641c1bc15434d51c017b16b369cf35b3143dbb98 100644 (file)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE              65536
 
-/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
-       int plan_node_id;
-       slock_t mutex;
-       Instrumentation instr;
-} SharedPlanStateInstrumentation;
-
 /* DSM structure for accumulating per-PlanState instrumentation. */
 struct SharedExecutorInstrumentation
 {
        int instrument_options;
-       int ps_ninstrument;                     /* # of ps_instrument structures following */
-       SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+       int instrument_offset;          /* offset of first Instrumentation struct */
+       int num_workers;                                                        /* # of workers */
+       int num_plan_nodes;                                                     /* # of plan nodes */
+       int plan_node_id[FLEXIBLE_ARRAY_MEMBER];        /* array of plan node IDs */
+       /* array of num_plan_nodes * num_workers Instrumentation objects follows */
 };
+#define GetInstrumentationArray(sei) \
+       (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
+        (Instrumentation *) (((char *) sei) + sei->instrument_offset))
 
 /* Context object for ExecParallelEstimate. */
 typedef struct ExecParallelEstimateContext
@@ -196,18 +194,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
        if (planstate == NULL)
                return false;
 
-       /* If instrumentation is enabled, initialize array slot for this node. */
+       /* If instrumentation is enabled, initialize slot for this node. */
        if (d->instrumentation != NULL)
-       {
-               SharedPlanStateInstrumentation *instrumentation;
-
-               instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
-               Assert(d->nnodes < d->instrumentation->ps_ninstrument);
-               instrumentation->plan_node_id = planstate->plan->plan_node_id;
-               SpinLockInit(&instrumentation->mutex);
-               InstrInit(&instrumentation->instr,
-                                 d->instrumentation->instrument_options);
-       }
+               d->instrumentation->plan_node_id[d->nnodes] =
+                       planstate->plan->plan_node_id;
 
        /* Count this node. */
        d->nnodes++;
@@ -307,6 +297,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        int                     pstmt_len;
        int                     param_len;
        int                     instrumentation_len = 0;
+       int                     instrument_offset;
 
        /* Allocate object for return value. */
        pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -364,8 +355,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        if (estate->es_instrument)
        {
                instrumentation_len =
-                       offsetof(SharedExecutorInstrumentation, ps_instrument)
-                       + sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+                       offsetof(SharedExecutorInstrumentation, plan_node_id)
+                       + sizeof(int) * e.nnodes;
+               instrumentation_len = MAXALIGN(instrumentation_len);
+               instrument_offset = instrumentation_len;
+               instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
                shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
                shm_toc_estimate_keys(&pcxt->estimator, 1);
        }
@@ -407,9 +401,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
         */
        if (estate->es_instrument)
        {
+               Instrumentation *instrument;
+               int             i;
+
                instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
                instrumentation->instrument_options = estate->es_instrument;
-               instrumentation->ps_ninstrument = e.nnodes;
+               instrumentation->instrument_offset = instrument_offset;
+               instrumentation->num_workers = nworkers;
+               instrumentation->num_plan_nodes = e.nnodes;
+               instrument = GetInstrumentationArray(instrumentation);
+               for (i = 0; i < nworkers * e.nnodes; ++i)
+                       InstrInit(&instrument[i], estate->es_instrument);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
                                           instrumentation);
                pei->instrumentation = instrumentation;
@@ -444,20 +446,31 @@ static bool
 ExecParallelRetrieveInstrumentation(PlanState *planstate,
                                                  SharedExecutorInstrumentation *instrumentation)
 {
+       Instrumentation *instrument;
        int             i;
+       int             n;
+       int             ibytes;
        int             plan_node_id = planstate->plan->plan_node_id;
-       SharedPlanStateInstrumentation *ps_instrument;
 
        /* Find the instumentation for this node. */
-       for (i = 0; i < instrumentation->ps_ninstrument; ++i)
-               if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+       for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+               if (instrumentation->plan_node_id[i] == plan_node_id)
                        break;
-       if (i >= instrumentation->ps_ninstrument)
+       if (i >= instrumentation->num_plan_nodes)
                elog(ERROR, "plan node %d not found", plan_node_id);
 
-       /* No need to acquire the spinlock here; workers have exited already. */
-       ps_instrument = &instrumentation->ps_instrument[i];
-       InstrAggNode(planstate->instrument, &ps_instrument->instr);
+       /* Accumulate the statistics from all workers. */
+       instrument = GetInstrumentationArray(instrumentation);
+       instrument += i * instrumentation->num_workers;
+       for (n = 0; n < instrumentation->num_workers; ++n)
+               InstrAggNode(planstate->instrument, &instrument[n]);
+
+       /* Also store the per-worker detail. */
+       ibytes = instrumentation->num_workers * sizeof(Instrumentation);
+       planstate->worker_instrument =
+               palloc(offsetof(WorkerInstrumentation, instrument) + ibytes);
+       planstate->worker_instrument->num_workers = instrumentation->num_workers;
+       memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
        return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
                                                                 instrumentation);
@@ -568,7 +581,9 @@ ExecParallelReportInstrumentation(PlanState *planstate,
 {
        int             i;
        int             plan_node_id = planstate->plan->plan_node_id;
-       SharedPlanStateInstrumentation *ps_instrument;
+       Instrumentation *instrument;
+
+       InstrEndLoop(planstate->instrument);
 
        /*
         * If we shuffled the plan_node_id values in ps_instrument into sorted
@@ -576,20 +591,21 @@ ExecParallelReportInstrumentation(PlanState *planstate,
         * if we're pushing down sufficiently large plan trees.  For now, do it
         * the slow, dumb way.
         */
-       for (i = 0; i < instrumentation->ps_ninstrument; ++i)
-               if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+       for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+               if (instrumentation->plan_node_id[i] == plan_node_id)
                        break;
-       if (i >= instrumentation->ps_ninstrument)
+       if (i >= instrumentation->num_plan_nodes)
                elog(ERROR, "plan node %d not found", plan_node_id);
 
        /*
-        * There's one SharedPlanStateInstrumentation per plan_node_id, so we
-        * must use a spinlock in case multiple workers report at the same time.
+        * Add our statistics to the per-node, per-worker totals.  It's possible
+        * that this could happen more than once if we relaunched workers.
         */
-       ps_instrument = &instrumentation->ps_instrument[i];
-       SpinLockAcquire(&ps_instrument->mutex);
-       InstrAggNode(&ps_instrument->instr, planstate->instrument);
-       SpinLockRelease(&ps_instrument->mutex);
+       instrument = GetInstrumentationArray(instrumentation);
+       instrument += i * instrumentation->num_workers;
+       Assert(IsParallelWorker());
+       Assert(ParallelWorkerNumber < instrumentation->num_workers);
+       InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
 
        return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
                                                                 instrumentation);
index f28e56ce48c36c1bd00f691527d234006bf970af..52d3c8182825b5c39693937a6a8d9a06efa17721 100644 (file)
@@ -63,6 +63,12 @@ typedef struct Instrumentation
        BufferUsage bufusage;           /* Total buffer usage */
 } Instrumentation;
 
+typedef struct WorkerInstrumentation
+{
+       int                     num_workers;    /* # of structures that follow */
+       Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER];
+} WorkerInstrumentation;
+
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 
 extern Instrumentation *InstrAlloc(int n, int instrument_options);
index eb3591a663f5d316242f9fa669c582bf76c1f339..5ccf4700afd113273b5e9fa90d8be3d4485c3d18 100644 (file)
@@ -1029,6 +1029,7 @@ typedef struct PlanState
                                                                 * top-level plan */
 
        Instrumentation *instrument;    /* Optional runtime stats for this node */
+       WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */
 
        /*
         * Common structural data for all Plan types.  These links to subsidiary