--- /dev/null
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ * Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define TOC_SCAN_KEY 1
+#define TOC_RESULT_KEY 2
+
+void _PG_init(void);
+void count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelHeapScanDesc pscan,
+ int64 *resultp);
+
+/*
+ * parallel_count(relid, nworkers)
+ *
+ * Count the tuples in the given relation using the leader plus up to
+ * nworkers background workers, all cooperating on one parallel heap scan.
+ * Returns the total tuple count as int64.
+ */
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nworkers = PG_GETARG_INT32(1);
+ int64 *resultp;
+ int64 result;
+ ParallelContext *pcxt;
+ ParallelHeapScanDesc pscan;
+ Relation rel;
+ Size sz;
+
+ if (nworkers < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("number of parallel workers must be non-negative")));
+
+ rel = relation_open(relid, AccessShareLock);
+
+ EnterParallelMode();
+
+ pcxt = CreateParallelContextForExternalFunction("parallel_dummy",
+ "count_worker_main",
+ nworkers);
+ /* Reserve shared memory: parallel scan descriptor + shared counter. */
+ sz = heap_parallelscan_estimate(GetActiveSnapshot());
+ shm_toc_estimate_chunk(&pcxt->estimator, sz);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64));
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+ InitializeParallelDSM(pcxt);
+ pscan = shm_toc_allocate(pcxt->toc, sz);
+ heap_parallelscan_initialize(pscan, rel, GetActiveSnapshot());
+ shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, pscan);
+ resultp = shm_toc_allocate(pcxt->toc, sizeof(int64));
+ *resultp = 0; /* don't rely on the DSM segment being zero-filled */
+ shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp);
+
+ LaunchParallelWorkers(pcxt);
+
+ /* here's where we do the "real work" ... */
+ count_helper(rel, pscan, resultp);
+
+ WaitForParallelWorkersToFinish(pcxt);
+
+ /* All workers have exited; safe to read shared total unlocked. */
+ result = *resultp;
+
+ DestroyParallelContext(pcxt);
+
+ relation_close(rel, AccessShareLock);
+
+ ExitParallelMode();
+
+ PG_RETURN_INT64(result);
+}
+
+/*
+ * Entry point for each background worker: look up the shared scan state
+ * and result slot in the DSM table-of-contents, open the relation named
+ * by the scan descriptor, and do this worker's share of the counting.
+ */
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ ParallelHeapScanDesc pscan;
+ Relation rel;
+ int64 *resultp;
+
+ /* Both keys were inserted by parallel_count before workers launched. */
+ pscan = shm_toc_lookup(toc, TOC_SCAN_KEY);
+ resultp = shm_toc_lookup(toc, TOC_RESULT_KEY);
+ Assert(pscan != NULL && resultp != NULL);
+
+ /* Same lock level as the leader takes in parallel_count. */
+ rel = relation_open(pscan->phs_relid, AccessShareLock);
+ count_helper(rel, pscan, resultp);
+ relation_close(rel, AccessShareLock);
+}
+
+/*
+ * Scan our share of the relation via the shared parallel scan descriptor,
+ * counting visible tuples, and fold the count into the shared total.
+ * Runs in the leader and in every worker.
+ */
+static void
+count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *resultp)
+{
+ int64 mytuples = 0;
+ HeapScanDesc scan;
+ BlockNumber firstblock = InvalidBlockNumber;
+
+ scan = heap_beginscan_parallel(rel, pscan);
+
+ for (;;)
+ {
+ HeapTuple tup = heap_getnext(scan, ForwardScanDirection);
+
+ if (!HeapTupleIsValid(tup))
+ break;
+ /* Remember the first block this backend saw, for the NOTICE below. */
+ if (firstblock == InvalidBlockNumber)
+ firstblock = scan->rs_cblock;
+
+ ++mytuples;
+ }
+
+ heap_endscan(scan);
+
+ /* Borrow the scan's mutex to serialize updates to the shared total. */
+ SpinLockAcquire(&pscan->phs_mutex); /* dirty hack */
+ *resultp += mytuples;
+ SpinLockRelease(&pscan->phs_mutex);
+
+ elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples starting at block %u",
+ MyProcPid, mytuples, firstblock);
+}
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/inval.h"
static HeapScanDesc heap_beginscan_internal(Relation relation,
Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ if (scan->rs_parallel != NULL)
+ scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ else
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
* behaviors, independently of the size of the table; also there is a GUC
* variable that can disable synchronized scanning.)
*
- * During a rescan, don't make a new strategy object if we don't have to.
+ * Note that heap_parallelscan_initialize has a very similar test; if you
+ * change this, consider changing that one, too.
*/
if (!RelationUsesLocalBuffers(scan->rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
if (allow_strat)
{
+ /* During a rescan, keep the previous strategy object. */
if (scan->rs_strategy == NULL)
scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
}
scan->rs_strategy = NULL;
}
- if (keep_startblock)
+ if (scan->rs_parallel != NULL)
+ {
+ /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+ scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ }
+ else if (keep_startblock)
{
/*
* When rescanning, we want to keep the previous startblock setting,
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
}
else
{
- page++;
- if (page >= scan->rs_nblocks)
- page = 0;
- finished = (page == scan->rs_startblock) ||
- (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
+ else
+ {
+ page++;
+ if (page >= scan->rs_nblocks)
+ page = 0;
+
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+ }
/*
* Report our new scan position for synchronization purposes. We
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineindex = 0;
scan->rs_inited = true;
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
}
else
{
- page++;
- if (page >= scan->rs_nblocks)
- page = 0;
- finished = (page == scan->rs_startblock) ||
- (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
+ else
+ {
+ page++;
+ if (page >= scan->rs_nblocks)
+ page = 0;
+
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+ }
/*
* Report our new scan position for synchronization purposes. We
heap_beginscan(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, false);
}
Oid relid = RelationGetRelid(relation);
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, true);
}
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, true,
false, false, false);
}
heap_beginscan_bm(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
false, false, true, true, false, false);
}
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync, bool allow_pagemode)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, allow_pagemode,
false, true, false);
}
static HeapScanDesc
heap_beginscan_internal(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
scan->rs_temp_snap = temp_snap;
+ scan->rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
* reinitialize scan descriptor
*/
initscan(scan, key, true);
+
+ /*
+ * reset parallel scan, if present
+ */
+ if (scan->rs_parallel != NULL)
+ {
+ ParallelHeapScanDesc parallel_scan;
+
+ /*
+ * Caller is responsible for making sure that all workers have
+ * finished the scan before calling this, so it really shouldn't be
+ * necessary to acquire the mutex at all. We acquire it anyway, just
+ * to be tidy.
+ */
+ parallel_scan = scan->rs_parallel;
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ }
}
/* ----------------
pfree(scan);
}
+/* ----------------
+ * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ * Sadly, this doesn't reduce to a constant, because the size required
+ * to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+ /* fixed-size header up to the flexible snapshot area, plus snapshot */
+ return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ * Must allow as many bytes of shared memory as returned by
+ * heap_parallelscan_estimate. Call this just once in the leader
+ * process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+ Snapshot snapshot)
+{
+ target->phs_relid = RelationGetRelid(relation);
+ target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+ /* compare phs_syncscan initialization to similar logic in initscan */
+ target->phs_syncscan = synchronize_seqscans &&
+ !RelationUsesLocalBuffers(relation) &&
+ target->phs_nblocks > NBuffers / 4;
+ SpinLockInit(&target->phs_mutex);
+ /* InvalidBlockNumber means "start position not yet chosen"; the first
+ * call to heap_parallelscan_nextpage fills these in. */
+ target->phs_cblock = InvalidBlockNumber;
+ target->phs_startblock = InvalidBlockNumber;
+ SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ * heap_beginscan_parallel - join a parallel scan
+ *
+ * Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+ Snapshot snapshot;
+
+ Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+ snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+ RegisterSnapshot(snapshot);
+
+ /* temp_snap = true, so heap_endscan unregisters the snapshot for us */
+ return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+ true, true, true, false, false, true);
+}
+
+/* ----------------
+ * heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+ BlockNumber page = InvalidBlockNumber;
+ BlockNumber sync_startpage = InvalidBlockNumber;
+ ParallelHeapScanDesc parallel_scan;
+
+ Assert(scan->rs_parallel);
+ parallel_scan = scan->rs_parallel;
+
+retry:
+ /* Grab the spinlock. */
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+
+ /*
+ * If the scan's startblock has not yet been initialized, we must do so
+ * now. If this is not a synchronized scan, we just start at block 0, but
+ * if it is a synchronized scan, we must get the starting position from
+ * the synchronized scan machinery. We can't hold the spinlock while
+ * doing that, though, so release the spinlock, get the information we
+ * need, and retry. If nobody else has initialized the scan in the
+ * meantime, we'll fill in the value we fetched on the second time
+ * through.
+ */
+ if (parallel_scan->phs_startblock == InvalidBlockNumber)
+ {
+ if (!parallel_scan->phs_syncscan)
+ parallel_scan->phs_startblock = 0;
+ else if (sync_startpage != InvalidBlockNumber)
+ parallel_scan->phs_startblock = sync_startpage;
+ else
+ {
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ goto retry;
+ }
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ }
+
+ /*
+ * The current block number is the next one that needs to be scanned,
+ * unless it's InvalidBlockNumber already, in which case there are no more
+ * blocks to scan. After remembering the current value, we must advance
+ * it so that the next call to this function returns the next block to be
+ * scanned.
+ */
+ page = parallel_scan->phs_cblock;
+ if (page != InvalidBlockNumber)
+ {
+ parallel_scan->phs_cblock++;
+ /* wrap around at the end of the relation ... */
+ if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+ parallel_scan->phs_cblock = 0;
+ /* ... and stop once we reach the block where the scan began */
+ if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+ parallel_scan->phs_cblock = InvalidBlockNumber;
+ }
+
+ /* Release the lock and return. */
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ return page;
+}
+
/* ----------------
* heap_getnext - retrieve next tuple in scan
*