Thunk. parallel_count
authorRobert Haas <[email protected]>
Thu, 15 Oct 2015 21:45:22 +0000 (17:45 -0400)
committerRobert Haas <[email protected]>
Thu, 15 Oct 2015 21:45:22 +0000 (17:45 -0400)
contrib/parallel_dummy/Makefile [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy--1.0.sql [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.c [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.control [new file with mode: 0644]
src/backend/access/heap/heapam.c
src/include/access/heapam.h
src/include/access/relscan.h

diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644 (file)
index 0000000..de00f50
--- /dev/null
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644 (file)
index 0000000..d49bd0f
--- /dev/null
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+                                                         nworkers pg_catalog.int4)
+    RETURNS pg_catalog.int8 STRICT
+       AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644 (file)
index 0000000..84c863c
--- /dev/null
@@ -0,0 +1,137 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *             Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define                TOC_SCAN_KEY                    1
+#define                TOC_RESULT_KEY                  2
+
+void           _PG_init(void);
+void           count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelHeapScanDesc pscan,
+                                                int64 *resultp);
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+       Oid                     relid = PG_GETARG_OID(0);
+       int32           nworkers = PG_GETARG_INT32(1);
+       int64      *resultp;
+       int64           result;
+       ParallelContext *pcxt;
+       ParallelHeapScanDesc pscan;
+       Relation        rel;
+       Size            sz;
+
+       if (nworkers < 0)
+               ereport(ERROR,
+                               (errmsg("number of parallel workers must be non-negative")));
+
+       rel = relation_open(relid, AccessShareLock);
+
+       EnterParallelMode();
+
+       pcxt = CreateParallelContextForExternalFunction("parallel_dummy",
+                                                                                        "count_worker_main",
+                                                                                        nworkers);
+       sz = heap_parallelscan_estimate(GetActiveSnapshot());
+       shm_toc_estimate_chunk(&pcxt->estimator, sz);
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64));
+       shm_toc_estimate_keys(&pcxt->estimator, 2);
+       InitializeParallelDSM(pcxt);
+       pscan = shm_toc_allocate(pcxt->toc, sz);
+       heap_parallelscan_initialize(pscan, rel, GetActiveSnapshot());
+       shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, pscan);
+       resultp = shm_toc_allocate(pcxt->toc, sizeof(int64));
+       shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp);
+
+       LaunchParallelWorkers(pcxt);
+
+       /* here's where we do the "real work" ... */
+       count_helper(rel, pscan, resultp);
+
+       WaitForParallelWorkersToFinish(pcxt);
+
+       result = *resultp;
+
+       DestroyParallelContext(pcxt);
+
+       relation_close(rel, AccessShareLock);
+
+       ExitParallelMode();
+
+       PG_RETURN_INT64(result);
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+       ParallelHeapScanDesc    pscan;
+       Relation        rel;
+       int64      *resultp;
+
+       pscan = shm_toc_lookup(toc, TOC_SCAN_KEY);
+       resultp = shm_toc_lookup(toc, TOC_RESULT_KEY);
+       Assert(pscan != NULL && resultp != NULL);
+
+       rel = relation_open(pscan->phs_relid, AccessShareLock);
+       count_helper(rel, pscan, resultp);
+       relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *resultp)
+{
+       int64           mytuples = 0;
+       HeapScanDesc    scan;
+       BlockNumber     firstblock = InvalidBlockNumber;
+
+       scan = heap_beginscan_parallel(rel, pscan);
+
+       for (;;)
+       {
+               HeapTuple       tup = heap_getnext(scan, ForwardScanDirection);
+
+               if (!HeapTupleIsValid(tup))
+                       break;
+               if (firstblock == InvalidBlockNumber)
+                       firstblock = scan->rs_cblock;
+
+               ++mytuples;
+       }
+
+       heap_endscan(scan);
+
+       SpinLockAcquire(&pscan->phs_mutex); /* dirty hack */
+       *resultp += mytuples;
+       SpinLockRelease(&pscan->phs_mutex);
+
+       elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples starting at block %u",
+               MyProcPid, mytuples, firstblock);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644 (file)
index 0000000..90bae3f
--- /dev/null
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true
index bcf987124fded27ba94c4b174ad002223fdba6c6..164b6b55eb835397acef7d5d4cdab2433f604706 100644 (file)
@@ -63,6 +63,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
@@ -80,12 +81,14 @@ bool                synchronize_seqscans = true;
 static HeapScanDesc heap_beginscan_internal(Relation relation,
                                                Snapshot snapshot,
                                                int nkeys, ScanKey key,
+                                               ParallelHeapScanDesc parallel_scan,
                                                bool allow_strat,
                                                bool allow_sync,
                                                bool allow_pagemode,
                                                bool is_bitmapscan,
                                                bool is_samplescan,
                                                bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                                        TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
         * results for a non-MVCC snapshot, the caller must hold some higher-level
         * lock that ensures the interesting tuple(s) won't change.)
         */
-       scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+       if (scan->rs_parallel != NULL)
+               scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+       else
+               scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
        /*
         * If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
         * behaviors, independently of the size of the table; also there is a GUC
         * variable that can disable synchronized scanning.)
         *
-        * During a rescan, don't make a new strategy object if we don't have to.
+        * Note that heap_parallelscan_initialize has a very similar test; if you
+        * change this, consider changing that one, too.
         */
        if (!RelationUsesLocalBuffers(scan->rs_rd) &&
                scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 
        if (allow_strat)
        {
+               /* During a rescan, keep the previous strategy object. */
                if (scan->rs_strategy == NULL)
                        scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
        }
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
                scan->rs_strategy = NULL;
        }
 
-       if (keep_startblock)
+       if (scan->rs_parallel != NULL)
+       {
+               /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+               scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+       }
+       else if (keep_startblock)
        {
                /*
                 * When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
                                tuple->t_data = NULL;
                                return;
                        }
-                       page = scan->rs_startblock; /* first page */
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+
+                               /* Other processes might have already finished the scan. */
+                               if (page == InvalidBlockNumber)
+                               {
+                                       Assert(!BufferIsValid(scan->rs_cbuf));
+                                       tuple->t_data = NULL;
+                                       return;
+                               }
+                       }
+                       else
+                               page = scan->rs_startblock;             /* first page */
                        heapgetpage(scan, page);
                        lineoff = FirstOffsetNumber;            /* first offnum */
                        scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
        }
        else if (backward)
        {
+               /* backward parallel scan not supported */
+               Assert(scan->rs_parallel == NULL);
+
                if (!scan->rs_inited)
                {
                        /*
@@ -671,11 +700,20 @@ heapgettup(HeapScanDesc scan,
                }
                else
                {
-                       page++;
-                       if (page >= scan->rs_nblocks)
-                               page = 0;
-                       finished = (page == scan->rs_startblock) ||
-                               (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+                               finished = (page == InvalidBlockNumber);
+                       }
+                       else
+                       {
+                               page++;
+                               if (page >= scan->rs_nblocks)
+                                       page = 0;
+
+                               finished = (page == scan->rs_startblock) ||
+                                       (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+                       }
 
                        /*
                         * Report our new scan position for synchronization purposes. We
@@ -773,7 +811,20 @@ heapgettup_pagemode(HeapScanDesc scan,
                                tuple->t_data = NULL;
                                return;
                        }
-                       page = scan->rs_startblock; /* first page */
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+
+                               /* Other processes might have already finished the scan. */
+                               if (page == InvalidBlockNumber)
+                               {
+                                       Assert(!BufferIsValid(scan->rs_cbuf));
+                                       tuple->t_data = NULL;
+                                       return;
+                               }
+                       }
+                       else
+                               page = scan->rs_startblock;             /* first page */
                        heapgetpage(scan, page);
                        lineindex = 0;
                        scan->rs_inited = true;
@@ -793,6 +844,9 @@ heapgettup_pagemode(HeapScanDesc scan,
        }
        else if (backward)
        {
+               /* backward parallel scan not supported */
+               Assert(scan->rs_parallel == NULL);
+
                if (!scan->rs_inited)
                {
                        /*
@@ -934,11 +988,20 @@ heapgettup_pagemode(HeapScanDesc scan,
                }
                else
                {
-                       page++;
-                       if (page >= scan->rs_nblocks)
-                               page = 0;
-                       finished = (page == scan->rs_startblock) ||
-                               (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+                       if (scan->rs_parallel != NULL)
+                       {
+                               page = heap_parallelscan_nextpage(scan);
+                               finished = (page == InvalidBlockNumber);
+                       }
+                       else
+                       {
+                               page++;
+                               if (page >= scan->rs_nblocks)
+                                       page = 0;
+
+                               finished = (page == scan->rs_startblock) ||
+                                       (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+                       }
 
                        /*
                         * Report our new scan position for synchronization purposes. We
@@ -1341,7 +1404,7 @@ HeapScanDesc
 heap_beginscan(Relation relation, Snapshot snapshot,
                           int nkeys, ScanKey key)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   true, true, true, false, false, false);
 }
 
@@ -1351,7 +1414,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
        Oid                     relid = RelationGetRelid(relation);
        Snapshot        snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
 
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   true, true, true, false, false, true);
 }
 
@@ -1360,7 +1423,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
                                         int nkeys, ScanKey key,
                                         bool allow_strat, bool allow_sync)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   allow_strat, allow_sync, true,
                                                                   false, false, false);
 }
@@ -1369,7 +1432,7 @@ HeapScanDesc
 heap_beginscan_bm(Relation relation, Snapshot snapshot,
                                  int nkeys, ScanKey key)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   false, false, true, true, false, false);
 }
 
@@ -1378,7 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
                                                int nkeys, ScanKey key,
                                          bool allow_strat, bool allow_sync, bool allow_pagemode)
 {
-       return heap_beginscan_internal(relation, snapshot, nkeys, key,
+       return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                                                   allow_strat, allow_sync, allow_pagemode,
                                                                   false, true, false);
 }
@@ -1386,6 +1449,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
 static HeapScanDesc
 heap_beginscan_internal(Relation relation, Snapshot snapshot,
                                                int nkeys, ScanKey key,
+                                               ParallelHeapScanDesc parallel_scan,
                                                bool allow_strat,
                                                bool allow_sync,
                                                bool allow_pagemode,
@@ -1418,6 +1482,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
        scan->rs_allow_strat = allow_strat;
        scan->rs_allow_sync = allow_sync;
        scan->rs_temp_snap = temp_snap;
+       scan->rs_parallel = parallel_scan;
 
        /*
         * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1538,25 @@ heap_rescan(HeapScanDesc scan,
         * reinitialize scan descriptor
         */
        initscan(scan, key, true);
+
+       /*
+        * reset parallel scan, if present
+        */
+       if (scan->rs_parallel != NULL)
+       {
+               ParallelHeapScanDesc parallel_scan;
+
+               /*
+                * Caller is responsible for making sure that all workers have
+                * finished the scan before calling this, so it really shouldn't be
+                * necessary to acquire the mutex at all.  We acquire it anyway, just
+                * to be tidy.
+                */
+               parallel_scan = scan->rs_parallel;
+               SpinLockAcquire(&parallel_scan->phs_mutex);
+               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+               SpinLockRelease(&parallel_scan->phs_mutex);
+       }
 }
 
 /* ----------------
@@ -1531,6 +1615,133 @@ heap_endscan(HeapScanDesc scan)
        pfree(scan);
 }
 
+/* ----------------
+ *             heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ *             Sadly, this doesn't reduce to a constant, because the size required
+ *             to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+       return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+                                       EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ *             heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ *             Must allow as many bytes of shared memory as returned by
+ *             heap_parallelscan_estimate.  Call this just once in the leader
+ *             process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+                                                        Snapshot snapshot)
+{
+       target->phs_relid = RelationGetRelid(relation);
+       target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+       /* compare phs_syncscan initialization to similar logic in initscan */
+       target->phs_syncscan = synchronize_seqscans &&
+               !RelationUsesLocalBuffers(relation) &&
+               target->phs_nblocks > NBuffers / 4;
+       SpinLockInit(&target->phs_mutex);
+       target->phs_cblock = InvalidBlockNumber;
+       target->phs_startblock = InvalidBlockNumber;
+       SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ *             heap_beginscan_parallel - join a parallel scan
+ *
+ *             Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+       Snapshot        snapshot;
+
+       Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+       snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+       RegisterSnapshot(snapshot);
+
+       return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+                                                                  true, true, true, false, false, true);
+}
+
+/* ----------------
+ *             heap_parallelscan_nextpage - get the next page to scan
+ *
+ *             Get the next page to scan.  Even if there are no pages left to scan,
+ *             another backend could have grabbed a page to scan and not yet finished
+ *             looking at it, so it doesn't follow that the scan is done when the
+ *             first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+       BlockNumber page = InvalidBlockNumber;
+       BlockNumber sync_startpage = InvalidBlockNumber;
+       ParallelHeapScanDesc parallel_scan;
+
+       Assert(scan->rs_parallel);
+       parallel_scan = scan->rs_parallel;
+
+retry:
+       /* Grab the spinlock. */
+       SpinLockAcquire(&parallel_scan->phs_mutex);
+
+       /*
+        * If the scan's startblock has not yet been initialized, we must do so
+        * now.  If this is not a synchronized scan, we just start at block 0, but
+        * if it is a synchronized scan, we must get the starting position from
+        * the synchronized scan machinery.  We can't hold the spinlock while
+        * doing that, though, so release the spinlock, get the information we
+        * need, and retry.  If nobody else has initialized the scan in the
+        * meantime, we'll fill in the value we fetched on the second time
+        * through.
+        */
+       if (parallel_scan->phs_startblock == InvalidBlockNumber)
+       {
+               if (!parallel_scan->phs_syncscan)
+                       parallel_scan->phs_startblock = 0;
+               else if (sync_startpage != InvalidBlockNumber)
+                       parallel_scan->phs_startblock = sync_startpage;
+               else
+               {
+                       SpinLockRelease(&parallel_scan->phs_mutex);
+                       sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+                       goto retry;
+               }
+               parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+       }
+
+       /*
+        * The current block number is the next one that needs to be scanned,
+        * unless it's InvalidBlockNumber already, in which case there are no more
+        * blocks to scan.  After remembering the current value, we must advance
+        * it so that the next call to this function returns the next block to be
+        * scanned.
+        */
+       page = parallel_scan->phs_cblock;
+       if (page != InvalidBlockNumber)
+       {
+               parallel_scan->phs_cblock++;
+               if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+                       parallel_scan->phs_cblock = 0;
+               if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+                       parallel_scan->phs_cblock = InvalidBlockNumber;
+       }
+
+       /* Release the lock and return. */
+       SpinLockRelease(&parallel_scan->phs_mutex);
+       return page;
+}
+
 /* ----------------
  *             heap_getnext    - retrieve next tuple in scan
  *
index 75e6b72f9e0204913254548a42322a6fa7708d63..98eeadd23f82ca227ab7b2610130e1148e4f6586 100644 (file)
@@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
 
 #define heap_close(r,l)  relation_close(r,l)
 
-/* struct definition appears in relscan.h */
+/* struct definitions appear in relscan.h */
 typedef struct HeapScanDescData *HeapScanDesc;
+typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
 
 /*
  * HeapScanIsValid
@@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
 extern void heap_endscan(HeapScanDesc scan);
 extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
 
+extern Size heap_parallelscan_estimate(Snapshot snapshot);
+extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
+                                                        Relation relation, Snapshot snapshot);
+extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
                   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
                   Relation stats_relation);
index 6e6231971fdca3ef0a780f179b7cda515b4835a3..356c7e6b048f98d7a03a5d1475cea1f7c4fb9ed8 100644 (file)
 #include "access/itup.h"
 #include "access/tupdesc.h"
 
+/*
+ * Shared state for parallel heap scan.
+ *
+ * Each backend participating in a parallel heap scan has its own
+ * HeapScanDesc in backend-private memory, and those objects all contain
+ * a pointer to this structure.  The information here must be sufficient
+ * to properly initialize each new HeapScanDesc as workers join the scan,
+ * and it must act as a font of block numbers for those workers.
+ */
+typedef struct ParallelHeapScanDescData
+{
+       Oid                     phs_relid;              /* OID of relation to scan */
+       bool            phs_syncscan;   /* report location to syncscan logic? */
+       BlockNumber phs_nblocks;        /* # blocks in relation at start of scan */
+       slock_t         phs_mutex;              /* mutual exclusion for block number fields */
+       BlockNumber phs_startblock; /* starting block number */
+       BlockNumber phs_cblock;         /* current block number */
+       char            phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+}      ParallelHeapScanDescData;
 
 typedef struct HeapScanDescData
 {
@@ -49,6 +68,7 @@ typedef struct HeapScanDescData
        BlockNumber rs_cblock;          /* current block # in scan, if any */
        Buffer          rs_cbuf;                /* current buffer in scan, if any */
        /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
+       ParallelHeapScanDesc rs_parallel;       /* parallel scan information */
 
        /* these fields only used in page-at-a-time mode and for bitmap scans */
        int                     rs_cindex;              /* current tuple's index in vistuples */