hjstate->hj_CurTuple = NULL;
}
+/*
+ * Decide if this process is allowed to run the unmatched scan. If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+ /*
+ * It would not be deadlock-free to wait on the batch barrier, because it
+ * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+ * already emitted tuples. Therefore, we'll hold a wait-free election:
+ * only one process can continue to the next phase, and all others detach
+ * from this batch. They can still do any work on other batches, if there
+ * are any.
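+ *
+ * For example, if three processes are attached here in PHJ_BATCH_PROBE,
+ * the first two calls to BarrierArriveAndDetachExceptLast() return false
+ * and detach their callers, while the last call advances the barrier to
+ * PHJ_BATCH_SCAN and returns true, leaving exactly one participant.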
+ */
+ if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+ {
+ /* This process considers the batch to be done. */
+ hashtable->batches[hashtable->curbatch].done = true;
+
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /*
+ * Track the largest batch we've seen, which would normally happen in
+ * ExecHashTableDetachBatch().
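+ * The peak includes the tuple storage (batch->size) plus the bucket
+ * array of nbuckets dsa_pointer_atomic entries.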
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,
+ batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ return false;
+ }
+
+ /* Now we are alone with this batch. */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+ Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+ /*
+ * Has another process decided to give up early and command all processes
+ * to skip the unmatched scan?
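+ * (ExecHashTableDetachBatch() sets skip_unmatched when a process
+ * abandons the probe phase without reaching the end of its outer
+ * batch.)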
+ */
+ if (batch->skip_unmatched)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ return false;
+ }
+
+ /* Now prepare the process-local state, just as for non-parallel join. */
+ ExecPrepHashTableForUnmatched(hjstate);
+
+ return true;
+}
+
/*
* ExecScanHashTableForUnmatched
* scan the hash table for unmatched inner tuples
return false;
}
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+ for (;;)
+ {
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the
+ * current bucket, or NULL if it's time to start scanning a new
+ * bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo++);
+ else
+ break; /* finished all buckets */
+
+ while (hashTuple != NULL)
+ {
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it
+ * parallel to ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
/*
* ExecHashTableReset
*
accessor->shared = shared;
accessor->preallocated = 0;
accessor->done = false;
+ accessor->outer_eof = false;
accessor->inner_tuples =
sts_attach(ParallelHashJoinBatchInner(shared),
ParallelWorkerNumber + 1,
{
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool attached = true;
/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
- /* Detach from the batch we were last working on. */
- if (BarrierArriveAndDetach(&batch->batch_barrier))
+ /* After attaching we always get at least to PHJ_BATCH_PROBE. */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+ BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+ /*
+ * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+ * reached the end of it, it means the plan doesn't want any more
+ * tuples, and it is happy to abandon any tuples buffered in this
+ * process's subplans. For correctness, we can't allow any process to
+ * execute the PHJ_BATCH_SCAN phase, because we will never have the
+ * complete set of match bits. Therefore we skip emitting unmatched
+ * tuples in all backends (if this is a full/right join), as if those
+ * tuples were all due to be emitted by this process and it has
+ * abandoned them too.
+ */
+ if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+ !hashtable->batches[curbatch].outer_eof)
+ {
+ /*
+ * This flag may be written to by multiple backends during
+ * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+ * phase so requires no extra locking.
+ */
+ batch->skip_unmatched = true;
+ }
+
+ /*
+ * Even if we aren't doing a full/right outer join, we'll step through
+ * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+ * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+ */
+ if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+ attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
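+
+ /*
+ * If we were the last in PHJ_BATCH_PROBE, we are now attached in
+ * PHJ_BATCH_SCAN; arriving again detaches us, and whoever detaches
+ * last advances the barrier to PHJ_BATCH_FREE and cleans up below.
+ */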
+ if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
{
/*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
+ * We are no longer attached to the batch barrier, but we're the
+ * process that was chosen to free resources and it's safe to
+ * assert the current phase. The ParallelHashJoinBatch can't go
+ * away underneath us while we are attached to the build barrier,
+ * making this access safe.
*/
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
* PHJ_BATCH_ALLOCATE* -- one allocates buckets
* PHJ_BATCH_LOAD -- all load the hash table from disk
* PHJ_BATCH_PROBE -- all probe
+ * PHJ_BATCH_SCAN* -- one does full/right unmatched scan
* PHJ_BATCH_FREE* -- one frees memory
*
* Batch 0 is a special case, because it starts out in phase
* to a barrier, unless the barrier has reached a phase that means that no
* process will wait on it again. We emit tuples while attached to the build
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach(). The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively. The last to detach
+ * receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
* will see that it's too late to participate or access the relevant shared
* memory objects.
if (HJ_FILL_INNER(node))
{
/* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ /*
+ * In a parallel join, only one process is
+ * currently allowed to handle each batch's
+ * unmatched tuples.
+ */
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll set it unconditionally rather than branch on the
+ * join type; we test the bit first only to avoid writing
+ * to shared memory when it's already set.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
}
/* End of this batch */
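+ /* Let ExecHashTableDetachBatch() distinguish this from an early detach. */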
+ hashtable->batches[curbatch].outer_eof = true;
+
return NULL;
}
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_FREE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one process must advance the phase so
+ * that the final phase is reached.
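+ * (ExecParallelPrepHashTableForUnmatched() and
+ * ExecHashTableDetachBatch() take care of that.)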
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
return true;
+ case PHJ_BATCH_SCAN:
+
+ /*
+ * In principle, we could help scan for unmatched tuples,
+ * since that phase is already underway (the thing we
+ * can't do under current deadlock-avoidance rules is wait
+ * for others to arrive at PHJ_BATCH_SCAN, because
+ * PHJ_BATCH_PROBE emits tuples, but in this case we just
+ * got here without waiting). That is not yet done. For
+ * now, we just detach and go around again. We have to
+ * use ExecHashTableDetachBatch() because there's a small
+ * chance we'll be the last to detach, and then we're
+ * responsible for freeing memory.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ break;
case PHJ_BATCH_FREE:
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
- * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
- * Hash, since in that case we're back to a single hash table with a
- * single set of match bits for each batch, but that will require
- * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * (building the hash table in each backend) because no one
+ * process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
size_t ntuples; /* number of tuples loaded */
size_t old_ntuples; /* number of tuples before repartitioning */
bool space_exhausted;
+ bool skip_unmatched; /* whether to abandon unmatched scan */
/*
* Variable-sized SharedTuplestore objects follow this struct in memory.
size_t estimated_size; /* size of partition on disk */
size_t old_ntuples; /* how many tuples before repartitioning? */
bool at_least_one_chunk; /* has this backend allocated a chunk? */
-
+ bool outer_eof; /* has this process hit end of batch? */
bool done; /* flag to remember that a batch is done */
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
#define PHJ_BATCH_ALLOCATE 1
#define PHJ_BATCH_LOAD 2
#define PHJ_BATCH_PROBE 3
-#define PHJ_BATCH_FREE 4
+#define PHJ_BATCH_SCAN 4
+#define PHJ_BATCH_FREE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECT 0
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
t | f
(1 row)
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- The "bad" case: during execution we need to increase number of
-- batches; in this case we plan for 1 batch, and increase at least a
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
-- non-parallel
savepoint settings;
set local max_parallel_workers_per_gather = 0;
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
$$
select count(*) from simple r join simple s using (id);
$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
rollback to settings;
-- The "bad" case: during execution we need to increase number of
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
-- non-parallel
savepoint settings;
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)