diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 0a669d9..1e7d369 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1023,7 +1023,10 @@ ExplainNode(PlanState *planstate, List *ancestors, pname = sname = "Limit"; break; case T_Hash: - pname = sname = "Hash"; + if (((Hash *) plan)->shared_table) + pname = sname = "Shared Hash"; + else + pname = sname = "Hash"; break; default: pname = sname = "???"; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 72bacd5..c8c39f7 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -27,6 +27,7 @@ #include "executor/executor.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeHashjoin.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -203,6 +204,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecCustomScanEstimate((CustomScanState *) planstate, e->pcxt); break; + case T_HashJoinState: + ExecHashJoinEstimate((HashJoinState *) planstate, + e->pcxt); + break; default: break; } @@ -255,6 +260,10 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecCustomScanInitializeDSM((CustomScanState *) planstate, d->pcxt); break; + case T_HashJoinState: + ExecHashJoinInitializeDSM((HashJoinState *) planstate, + d->pcxt); + break; default: break; } @@ -724,6 +733,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ExecCustomScanInitializeWorker((CustomScanState *) planstate, toc); break; + case T_HashJoinState: + ExecHashJoinInitializeWorker((HashJoinState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6375d9b..0b8d27b 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -25,6 +25,7 @@ #include #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" @@ -32,12 +33,13 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "port/atomics.h" #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/syscache.h" - static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, @@ -47,8 +49,30 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, uint32 hashvalue, int bucketNumber); static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); +static void ExecHashRebucket(HashJoinTable hashtable); +static void ExecHashTableComputeOptimalBuckets(HashJoinTable hashtable); + +static void add_tuple_count(HashJoinTable hashtable, int count, + bool secondary); +static HashJoinTuple next_tuple_in_bucket(HashJoinTable table, + HashJoinTuple tuple); +static HashJoinTuple first_tuple_in_skew_bucket(HashJoinTable table, + int skew_bucket_no); +static HashJoinTuple first_tuple_in_bucket(HashJoinTable table, + int bucket_no); +static void insert_tuple_into_bucket(HashJoinTable table, int bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_pointer); +static void insert_tuple_into_skew_bucket(HashJoinTable table, + int bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_pointer); static void *dense_alloc(HashJoinTable
hashtable, Size size); +static void *dense_alloc_shared(HashJoinTable hashtable, Size size, + dsa_pointer *chunk_shared, + bool secondary); + /* ---------------------------------------------------------------- * ExecHash @@ -64,6 +88,100 @@ ExecHash(HashState *node) } /* ---------------------------------------------------------------- + * ExecHashCheckForEarlyExit + * + * return true if this process needs to abandon work on the + * hash join to avoid a deadlock + * ---------------------------------------------------------------- + */ +bool +ExecHashCheckForEarlyExit(HashJoinTable hashtable) +{ + /* + * The golden rule of leader deadlock avoidance: since leader processes + * have two separate roles, namely reading from worker queues AND executing + * the same plan as workers, we must never allow a leader to wait for + * workers if there is any possibility those workers have emitted tuples. + * Otherwise we could get into a situation where a worker fills up its + * output tuple queue and begins waiting for the leader to read, while + * the leader is busy waiting for the worker. + * + * Parallel hash joins with shared tables are inherently susceptible to + * such deadlocks because there are points at which all participants must + * wait (you can't start checking for unmatched tuples in the hash table + * until probing has completed in all workers, etc). + * + * So we follow these rules: + * + * 1. If there are workers participating, the leader MUST NOT + * participate in any further work after probing the first batch, so + * that it never has to wait for workers that might have emitted + * tuples. + * + * 2. If there are no workers participating, the leader MUST run all the + * batches to completion, because that's the only way for the join + * to complete. There is no deadlock risk if there are no workers. + * + * 3. Workers MUST NOT participate if the hashing phase has finished by + * the time they have joined, so that the leader can reliably determine + * whether there are any workers running when it comes to the point + * where it must choose between 1 and 2. + * + * In other words, if the leader makes it all the way through hashing and + * probing before any workers show up, then the leader will run the whole + * hash join on its own. If workers do show up any time before hashing is + * finished, the leader will stop executing the join after helping probe + * the first batch. In the unlikely event of the first worker showing up + * after the leader has finished hashing, it will exit because it's too + * late: the leader has already decided to do all the work alone. + */ + + if (!IsParallelWorker()) + { + /* Running in the leader process. */ + if (BarrierPhase(&hashtable->shared->barrier) == PHJ_PHASE_PROBING && + hashtable->shared->at_least_one_worker) + { + /* Abandon ship due to rule 1. There are workers running. */ + hashtable->detached_early = true; + } + else + { + /* + * Continue processing due to rule 2. There are no workers, and + * any workers that show up later will abandon ship. + */ + } + } + else + { + /* Running in a worker process. */ + if (hashtable->attached_at_phase < PHJ_PHASE_PROBING) + { + /* + * Advertise that there are workers, so that the leader can + * choose between rules 1 and 2. It's OK that several workers can + * write to this variable without immediate memory + * synchronization, because the leader will only read it in a later + * phase (see above). + */ + hashtable->shared->at_least_one_worker = true; + } + else + { + /* Abandon ship due to rule 3.
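It's too late to help: the leader may already have decided, under rule 2, to run the rest of the join alone, and must never be left waiting for us.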
*/ + hashtable->detached_early = true; + } + } + + /* If we decided to exit early, detach now. */ + if (hashtable->detached_early) + BarrierDetach(&hashtable->shared->barrier); + + return hashtable->detached_early; +} + +/* ---------------------------------------------------------------- * MultiExecHash * * build hash table for hashjoin, doing partitioning if more @@ -79,6 +197,7 @@ MultiExecHash(HashState *node) TupleTableSlot *slot; ExprContext *econtext; uint32 hashvalue; + Barrier *barrier = NULL; /* must provide our own instrumentation support */ if (node->ps.instrument) @@ -90,6 +209,55 @@ MultiExecHash(HashState *node) outerNode = outerPlanState(node); hashtable = node->hashtable; + if (HashJoinTableIsShared(hashtable)) + { + /* + * Synchronize parallel hash table builds. At this stage we know that + * the shared hash table has been created, but we don't know if our + * peers are still in MultiExecHash and if so how far through. We use + * the phase to synchronize with them. + */ + barrier = &hashtable->shared->barrier; + + switch (BarrierPhase(barrier)) + { + case PHJ_PHASE_INIT: + /* ExecHashTableCreate already handled this phase. */ + Assert(false); + case PHJ_PHASE_CREATING: + /* Wait for serial phase, and then either hash or wait. */ + if (BarrierWait(barrier, WAIT_EVENT_HASH_CREATING)) + goto hash; + else if (node->ps.plan->parallel_aware) + goto hash; + else + goto post_hash; + case PHJ_PHASE_HASHING: + /* Hashing is already underway. Can we join in? */ + if (node->ps.plan->parallel_aware) + goto hash; + else + goto post_hash; + case PHJ_PHASE_RESIZING: + /* Can't help with serial phase. */ + goto post_resize; + case PHJ_PHASE_REBUCKETING: + /* Rebucketing is in progress. Let's help do that. */ + goto rebucket; + default: + /* The hash table building work is already finished. */ + goto finish; + } + } + + hash: + if (HashJoinTableIsShared(hashtable)) + { + /* Make sure our local hashtable is up-to-date so we can hash. */ + Assert(BarrierPhase(barrier) == PHJ_PHASE_HASHING); + ExecHashUpdate(hashtable); + } + /* * set expression context */ @@ -123,22 +291,98 @@ MultiExecHash(HashState *node) else { /* Not subject to skew optimization, so insert normally */ - ExecHashTableInsert(hashtable, slot, hashvalue); + ExecHashTableInsert(hashtable, slot, hashvalue, false); } - hashtable->totalTuples += 1; + /* + * Shared tuple counters are managed by dense_alloc_shared. For + * private hash tables we maintain the counter here. + */ + if (!HashJoinTableIsShared(hashtable)) + hashtable->totalTuples += 1; } } + if (HashJoinTableIsShared(hashtable)) + { + /* + * Update shared tuple count for the current chunk. Other chunks are + * accounted for already, when new chunks are allocated. + */ + if (hashtable->primary_chunk != NULL) + add_tuple_count(hashtable, hashtable->primary_chunk->ntuples, + false); + } + + post_hash: + if (HashJoinTableIsShared(hashtable)) + { + bool elected_to_resize; + + /* + * Wait for all backends to finish hashing. If only one worker is + * running the hashing phase because of a non-partial inner plan, the + * other workers will pile up here waiting. If multiple worker are + * hashing, they should finish close to each other in time. + */ + Assert(BarrierPhase(barrier) == PHJ_PHASE_HASHING); + elected_to_resize = BarrierWait(barrier, WAIT_EVENT_HASH_HASHING); + /* + * Resizing is a serial phase. All but one should skip ahead to + * rebucketing, but all workers should update their copy of the shared + * tuple count with the final total first. 
+ */ + hashtable->totalTuples = + pg_atomic_read_u64(&hashtable->shared->total_primary_tuples); + if (!elected_to_resize) + goto post_resize; + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); + } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ - if (hashtable->nbuckets != hashtable->nbuckets_optimal) - ExecHashIncreaseNumBuckets(hashtable); + ExecHashIncreaseNumBuckets(hashtable); + + post_resize: + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); + BarrierWait(&hashtable->shared->barrier, + WAIT_EVENT_HASH_RESIZING); + Assert(BarrierPhase(barrier) == PHJ_PHASE_REBUCKETING); + } + + rebucket: + /* If the table was resized, insert tuples into the new buckets. */ + ExecHashUpdate(hashtable); + ExecHashRebucket(hashtable); /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ - hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinBucketHead); if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(barrier) == PHJ_PHASE_REBUCKETING); + BarrierWait(barrier, WAIT_EVENT_HASH_REBUCKETING); + Assert(BarrierPhase(barrier) == PHJ_PHASE_PROBING); + } + + finish: + if (HashJoinTableIsShared(hashtable)) + { + /* + * All hashing work has finished. The other workers may be probing or + * processing unmatched tuples for the initial batch, or dealing with + * later batches. The next synchronization point is in ExecHashJoin's + * HJ_BUILD_HASHTABLE case, which will figure that out and synchronize + * its local state machine with the parallel processing group's phase. + */ + Assert(BarrierPhase(barrier) >= PHJ_PHASE_PROBING); + ExecHashUpdate(hashtable); + } + /* must provide our own instrumentation support */ + /* TODO: report only the tuples that WE hashed here? */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); @@ -243,8 +487,9 @@ ExecEndHash(HashState *node) * ---------------------------------------------------------------- */ HashJoinTable -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) { + Hash *node; HashJoinTable hashtable; Plan *outerNode; int nbuckets; @@ -261,6 +506,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) * "outer" subtree of this node, but the inner relation of the hashjoin). * Compute the appropriate size of the hash table. 
*/ + node = (Hash *) state->ps.plan; outerNode = outerPlan(node); ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, @@ -305,7 +551,14 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->spaceUsedSkew = 0; hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; - hashtable->chunks = NULL; + hashtable->primary_chunk = NULL; + hashtable->secondary_chunk = NULL; + hashtable->chunks_to_rebucket = NULL; + hashtable->primary_chunk_shared = InvalidDsaPointer; + hashtable->secondary_chunk_shared = InvalidDsaPointer; + hashtable->area = state->ps.state->es_query_area; + hashtable->shared = state->shared_table_data; + hashtable->detached_early = false; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -368,23 +621,101 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) PrepareTempTablespaces(); } - /* - * Prepare context for the first-scan space allocations; allocate the - * hashbucket array therein, and set each bucket "empty". - */ - MemoryContextSwitchTo(hashtable->batchCxt); + MemoryContextSwitchTo(oldcxt); - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier; - /* - * Set up for skew optimization, if possible and there's a need for more - * than one batch. (In a one-batch join, there's no point in it.) - */ - if (nbatch > 1) - ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + /* + * Attach to the barrier. The corresponding detach operation is in + * ExecHashTableDestroy. + */ + barrier = &hashtable->shared->barrier; + hashtable->attached_at_phase = BarrierAttach(barrier); - MemoryContextSwitchTo(oldcxt); + /* + * So far we have no idea whether there are any other participants, and + * if so, what phase they are working on. The only thing we care about + * at this point is whether someone has already created the shared + * hash table yet. If not, one backend will be elected to do that + * now. + */ + if (BarrierPhase(barrier) == PHJ_PHASE_INIT) + { + if (BarrierWait(barrier, WAIT_EVENT_HASH_INIT)) + { + /* Serial phase: create the hash tables */ + Size bytes; + HashJoinBucketHead *buckets; + int i; + SharedHashJoinTable shared; + dsa_area *area; + + shared = hashtable->shared; + area = hashtable->area; + bytes = nbuckets * sizeof(HashJoinBucketHead); + + /* Allocate the primary and secondary hash tables. */ + shared->primary_buckets = dsa_allocate(area, bytes); + shared->secondary_buckets = dsa_allocate(area, bytes); + if (!DsaPointerIsValid(shared->primary_buckets) || + !DsaPointerIsValid(shared->secondary_buckets)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("out of memory"))); + + /* Set up primary table's buckets. */ + buckets = dsa_get_address(area, shared->primary_buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i].shared, + InvalidDsaPointer); + /* Set up secondary table's buckets. */ + buckets = dsa_get_address(area, shared->secondary_buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i].shared, + InvalidDsaPointer); + + /* Initialize the rest of parallel_state. 
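That is the shared bucket count, the cursor for the parallel unmatched-tuple scan, and the batch reader, which is rewound to batch 0.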
*/ + hashtable->shared->nbuckets = nbuckets; + pg_atomic_write_u32(&hashtable->shared->next_unmatched_bucket, + 0); + ExecHashJoinRewindBatches(hashtable, 0); + + /* TODO: ExecHashBuildSkewHash */ + + /* + * The backend-local pointers in hashtable will be set up by + * ExecHashUpdate, at each point where they might have + * changed. + */ + } + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_CREATING); + /* The next synchronization point is in MultiExecHash. */ + } + } + else + { + /* + * Prepare context for the first-scan space allocations; allocate the + * hashbucket array therein, and set each bucket "empty". + */ + MemoryContextSwitchTo(hashtable->batchCxt); + + hashtable->buckets = (HashJoinBucketHead *) + palloc0(nbuckets * sizeof(HashJoinBucketHead)); + + MemoryContextSwitchTo(oldcxt); + + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + } return hashtable; } @@ -481,8 +812,8 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * Note that both nbuckets and nbatch must be powers of 2 to make * ExecHashGetBucketAndBatch fast. */ - max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple); - max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); + max_pointers = (work_mem * 1024L) / sizeof(HashJoinBucketHead); + max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinBucketHead)); /* If max_pointers isn't a power of 2, must round it down to one */ mppow2 = 1L << my_log2(max_pointers); if (max_pointers != mppow2) @@ -504,7 +835,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * If there's not enough space to store the projected number of tuples and * the required bucket headers, we will need multiple batches. */ - bucket_bytes = sizeof(HashJoinTuple) * nbuckets; + bucket_bytes = sizeof(HashJoinBucketHead) * nbuckets; if (inner_rel_bytes + bucket_bytes > hash_table_bytes) { /* We'll need multiple batches */ @@ -519,12 +850,12 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * NTUP_PER_BUCKET tuples, whose projected size already includes * overhead for the hash code, pointer to the next tuple, etc. */ - bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)); + bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinBucketHead)); lbuckets = 1L << my_log2(hash_table_bytes / bucket_size); lbuckets = Min(lbuckets, max_pointers); nbuckets = (int) lbuckets; nbuckets = 1 << my_log2(nbuckets); - bucket_bytes = nbuckets * sizeof(HashJoinTuple); + bucket_bytes = nbuckets * sizeof(HashJoinBucketHead); /* * Buckets are simple pointers to hashjoin tuples, while tupsize @@ -564,6 +895,38 @@ ExecHashTableDestroy(HashJoinTable hashtable) { int i; + /* Detached, if we haven't already. */ + if (HashJoinTableIsShared(hashtable) && !hashtable->detached_early) + { + Barrier *barrier = &hashtable->shared->barrier; + + /* + * TODO: Can we just detach if there is only one batch, but wait here + * if there is more than one (to make sure batch files created by this + * participant are not deleted)? When detaching, the last one to + * detach should do the cleanup work, and/or leave things in the right + * state for rescanning. 
+ */ + + if (BarrierWait(barrier, WAIT_EVENT_HASH_DESTROY)) + { + /* Serial: free the tables */ + if (DsaPointerIsValid(hashtable->shared->primary_buckets)) + { + dsa_free(hashtable->area, + hashtable->shared->primary_buckets); + hashtable->shared->primary_buckets = InvalidDsaPointer; + } + if (DsaPointerIsValid(hashtable->shared->secondary_buckets)) + { + dsa_free(hashtable->area, + hashtable->shared->secondary_buckets); + hashtable->shared->secondary_buckets = InvalidDsaPointer; + } + } + BarrierDetach(&hashtable->shared->barrier); + } + /* * Make sure all the temp files are closed. We skip batch 0, since it * can't have any temp files (and the arrays might not even exist if @@ -600,6 +963,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) long nfreed; HashMemoryChunk oldchunks; + /* + * TODO: Implement for shared tables. It's OK for different workers to + * have different ideas of nbatch for short times, as long as they agree + * at key points in time (ie when deciding if we're finished). Working on + * this... + */ + if (HashJoinTableIsShared(hashtable)) + return; + /* do nothing if we've decided to shut off growth */ if (!hashtable->growEnabled) return; @@ -661,7 +1033,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; hashtable->buckets = repalloc(hashtable->buckets, - sizeof(HashJoinTuple) * hashtable->nbuckets); + sizeof(HashJoinBucketHead) * hashtable->nbuckets); } /* @@ -669,14 +1041,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) * buckets now and not have to keep track which tuples in the buckets have * already been processed. We will free the old chunks as we go. */ - memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); - oldchunks = hashtable->chunks; - hashtable->chunks = NULL; + memset(hashtable->buckets, 0, sizeof(HashJoinBucketHead) * hashtable->nbuckets); + oldchunks = hashtable->primary_chunk; + hashtable->primary_chunk = NULL; /* so, let's scan through the old chunks, and all tuples in each chunk */ while (oldchunks != NULL) { - HashMemoryChunk nextchunk = oldchunks->next; + HashMemoryChunk nextchunk = oldchunks->next.private; /* position within the buffer (up to oldchunks->used) */ size_t idx = 0; @@ -699,20 +1071,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) /* keep tuple in memory - copy it into the new chunk */ HashJoinTuple copyTuple; - copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); + copyTuple = (HashJoinTuple) + dense_alloc(hashtable, hashTupleSize); memcpy(copyTuple, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + insert_tuple_into_bucket(hashtable, bucketno, copyTuple, + InvalidDsaPointer); } else { /* dump it out */ Assert(batchno > curbatch); - ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), + ExecHashJoinSaveTuple(hashtable, + HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno]); + batchno, + true); hashtable->spaceUsed -= hashTupleSize; nfreed++; @@ -758,8 +1133,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable) { - HashMemoryChunk chunk; - /* do nothing if not an increase (it's called increase for a reason) */ if (hashtable->nbuckets >= hashtable->nbuckets_optimal) return; @@ -780,16 +1153,156 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * Just reallocate the proper number of 
buckets - we don't need to walk * through them - we can walk the dense-allocated chunks (just like in * ExecHashIncreaseNumBatches, but without all the copying into new - * chunks) + * chunks): see ExecHashRebucket, which must be called next. */ - hashtable->buckets = - (HashJoinTuple *) repalloc(hashtable->buckets, - hashtable->nbuckets * sizeof(HashJoinTuple)); + if (HashJoinTableIsShared(hashtable)) + { + Size bytes; + int i; + + /* Serial phase: only one backend reallocates. */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_RESIZING); + + /* Free the old arrays. */ + dsa_free(hashtable->area, + hashtable->shared->primary_buckets); + dsa_free(hashtable->area, + hashtable->shared->secondary_buckets); + /* Allocate replacements. */ + bytes = hashtable->nbuckets * sizeof(HashJoinBucketHead); + hashtable->shared->primary_buckets = + dsa_allocate(hashtable->area, bytes); + hashtable->shared->secondary_buckets = + dsa_allocate(hashtable->area, bytes); + if (!DsaPointerIsValid(hashtable->shared->primary_buckets) || + !DsaPointerIsValid(hashtable->shared->secondary_buckets)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("out of memory"))); + /* Initialize empty buckets. */ + hashtable->buckets = + dsa_get_address(hashtable->area, + hashtable->shared->primary_buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&hashtable->buckets[i].shared, + InvalidDsaPointer); + hashtable->next_buckets = + dsa_get_address(hashtable->area, + hashtable->shared->secondary_buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&hashtable->next_buckets[i].shared, + InvalidDsaPointer); + hashtable->shared->nbuckets = hashtable->nbuckets; + /* Move all primary chunks to the rebucket list. */ + dsa_pointer_atomic_write(&hashtable->shared->chunks_to_rebucket, + dsa_pointer_atomic_read(&hashtable->shared->head_primary_chunk)); + dsa_pointer_atomic_write(&hashtable->shared->head_primary_chunk, + InvalidDsaPointer); + } + else + { + hashtable->buckets = + (HashJoinBucketHead *) repalloc(hashtable->buckets, + hashtable->nbuckets * sizeof(HashJoinBucketHead)); + + memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinBucketHead)); + /* Move all chunks to the rebucket list. */ + hashtable->chunks_to_rebucket = hashtable->primary_chunk; + hashtable->primary_chunk = NULL; + } +} + +/* + * Pop a memory chunk from a given list atomically. Returns a backend-local + * pointer to the chunk, or NULL if the list is empty. Also sets *chunk_out + * to the dsa_pointer to the chunk. + */ +static HashMemoryChunk +ExecHashPopChunk(HashJoinTable hashtable, + dsa_pointer *chunk_out, + dsa_pointer_atomic *head) +{ + HashMemoryChunk chunk = NULL; + + /* + * We could see a stale empty list and exist early without a barrier, so + * explicitly include one before we read the head of the list for the + * first time. + */ + pg_read_barrier(); - memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + for (;;) + { + *chunk_out = dsa_pointer_atomic_read(head); + if (!DsaPointerIsValid(*chunk_out)) + { + chunk = NULL; + break; + } + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, *chunk_out); + if (dsa_pointer_atomic_compare_exchange(head, + chunk_out, + chunk->next.shared)) + break; + } + + return chunk; +} + +/* + * Push a shared memory chunk onto a given list atomically. 
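+ * The list is treated as a lock-free stack: we point the chunk's 'next' at + * the current head and then try to swing the head to the new chunk with + * compare-and-exchange, retrying if another backend updates the head first.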
+ */ +static void +ExecHashPushChunk(HashJoinTable hashtable, + HashMemoryChunk chunk, + dsa_pointer chunk_shared, + dsa_pointer_atomic *head) +{ + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + + for (;;) + { + chunk->next.shared = dsa_pointer_atomic_read(head); + if (dsa_pointer_atomic_compare_exchange(head, + &chunk->next.shared, + chunk_shared)) + break; + } +} + +/* + * ExecHashRebucket + * insert the tuples from all chunks into the correct bucket + */ +static void +ExecHashRebucket(HashJoinTable hashtable) +{ + HashMemoryChunk chunk; + dsa_pointer chunk_shared; + + if (HashJoinTableIsShared(hashtable)) + { + /* + * This is a parallel phase. Workers will atomically pop one chunk at + * a time and rebucket all of its tuples. + */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_REBUCKETING); + } - /* scan through all tuples in all chunks to rebuild the hash table */ - for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + /* + * Scan through all tuples in all chunks in the rebucket list to rebuild + * the hash table. + */ + if (HashJoinTableIsShared(hashtable)) + chunk = + ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_rebucket); + else + chunk = hashtable->chunks_to_rebucket; + while (chunk != NULL) { /* process all tuples stored in this chunk */ size_t idx = 0; @@ -797,6 +1310,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) while (idx < chunk->used) { HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + dsa_pointer hashTuple_shared = chunk_shared + + offsetof(HashMemoryChunkData, data) + idx; int bucketno; int batchno; @@ -804,16 +1319,52 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) &bucketno, &batchno); /* add the tuple to the proper bucket */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + insert_tuple_into_bucket(hashtable, bucketno, hashTuple, + hashTuple_shared); /* advance index past the tuple */ idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len); } + + /* Push chunk onto regular list and move to next chunk. */ + if (HashJoinTableIsShared(hashtable)) + { + ExecHashPushChunk(hashtable, chunk, chunk_shared, + &hashtable->shared->head_primary_chunk); + chunk = + ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_rebucket); + } + else + { + HashMemoryChunk next = chunk->next.private; + + chunk->next.private = hashtable->primary_chunk; + hashtable->primary_chunk = chunk; + chunk = next; + } } } +static void +ExecHashTableComputeOptimalBuckets(HashJoinTable hashtable) +{ + double ntuples = (hashtable->totalTuples - hashtable->skewTuples); + + /* + * Guard against integer overflow and alloc size overflow. The + * MaxAllocSize limitation doesn't really apply for shared hash tables, + * since DSA has no such limit, but for now let's apply the same limit. 
+ */ + while (ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET) && + hashtable->nbuckets_optimal <= INT_MAX / 2 && + hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinBucketHead)) + { + hashtable->nbuckets_optimal *= 2; + hashtable->log2_nbuckets_optimal += 1; + } +} /* * ExecHashTableInsert @@ -829,7 +1380,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, - uint32 hashvalue) + uint32 hashvalue, + bool secondary) { MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); int bucketno; @@ -848,11 +1400,17 @@ ExecHashTableInsert(HashJoinTable hashtable, */ HashJoinTuple hashTuple; int hashTupleSize; - double ntuples = (hashtable->totalTuples - hashtable->skewTuples); + dsa_pointer hashTuple_shared = InvalidDsaPointer; /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; - hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); + if (HashJoinTableIsShared(hashtable)) + hashTuple = (HashJoinTuple) + dense_alloc_shared(hashtable, hashTupleSize, + &hashTuple_shared, secondary); + else + hashTuple = (HashJoinTuple) + dense_alloc(hashtable, hashTupleSize); hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); @@ -866,32 +1424,23 @@ ExecHashTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + insert_tuple_into_bucket(hashtable, bucketno, hashTuple, + hashTuple_shared); /* * Increase the (optimal) number of buckets if we just exceeded the * NTUP_PER_BUCKET threshold, but only when there's still a single * batch. */ - if (hashtable->nbatch == 1 && - ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)) - { - /* Guard against integer overflow and alloc size overflow */ - if (hashtable->nbuckets_optimal <= INT_MAX / 2 && - hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple)) - { - hashtable->nbuckets_optimal *= 2; - hashtable->log2_nbuckets_optimal += 1; - } - } + if (hashtable->nbatch == 1) + ExecHashTableComputeOptimalBuckets(hashtable); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; if (hashtable->spaceUsed + - hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + hashtable->nbuckets_optimal * sizeof(HashJoinBucketHead) > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } @@ -901,9 +1450,11 @@ ExecHashTableInsert(HashJoinTable hashtable, * put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(tuple, + ExecHashJoinSaveTuple(hashtable, + tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchno, + true); } } @@ -1047,6 +1598,138 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable, } /* + * Update the local hashtable with the current pointers and sizes from + * hashtable->parallel_state. + */ +void +ExecHashUpdate(HashJoinTable hashtable) +{ + Barrier *barrier; + + if (!HashJoinTableIsShared(hashtable)) + return; + + barrier = &hashtable->shared->barrier; + + /* + * This should only be called in a phase when the hash table is not being + * mutated (ie resized, swapped etc). 
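+ * Otherwise the bucket array pointers and bucket count copied below could + * change underneath us while we read them.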
+ */ + Assert(!PHJ_PHASE_MUTATING_TABLE( + BarrierPhase(&hashtable->shared->barrier))); + + /* The primary hash table. */ + hashtable->buckets = (HashJoinBucketHead *) + dsa_get_address(hashtable->area, + hashtable->shared->primary_buckets); + hashtable->nbuckets = hashtable->shared->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + /* The secondary hash table, if there is one (NULL for initial batch). */ + hashtable->next_buckets = (HashJoinBucketHead *) + dsa_get_address(hashtable->area, + hashtable->shared->secondary_buckets); + + hashtable->curbatch = PHJ_PHASE_TO_BATCHNO(BarrierPhase(barrier)); +} + +/* + * Get the next tuple in the same bucket as 'tuple'. + */ +static HashJoinTuple +next_tuple_in_bucket(HashJoinTable table, HashJoinTuple tuple) +{ + if (HashJoinTableIsShared(table)) + return (HashJoinTuple) + dsa_get_address(table->area, tuple->next.shared); + else + return tuple->next.private; +} + +/* + * Get the first tuple in a given skew bucket identified by number. + */ +static HashJoinTuple +first_tuple_in_skew_bucket(HashJoinTable table, int skew_bucket_no) +{ + if (HashJoinTableIsShared(table)) + return (HashJoinTuple) + dsa_get_address(table->area, + table->skewBucket[skew_bucket_no]->tuples.shared); + else + return table->skewBucket[skew_bucket_no]->tuples.private; +} + +/* + * Get the first tuple in a given bucket identified by number. + */ +static HashJoinTuple +first_tuple_in_bucket(HashJoinTable table, int bucket_no) +{ + if (HashJoinTableIsShared(table)) + { + dsa_pointer p = + dsa_pointer_atomic_read(&table->buckets[bucket_no].shared); + return (HashJoinTuple) dsa_get_address(table->area, p); + } + else + return table->buckets[bucket_no].private; +} + +/* + * Insert a tuple at the front of a given bucket identified by number. For + * shared hash joins, tuple_shared must be provided, pointing to the tuple in + * the dsa_area backing the table. For private hash joins, it should be + * InvalidDsaPointer. + */ +static void +insert_tuple_into_bucket(HashJoinTable table, int bucket_no, + HashJoinTuple tuple, dsa_pointer tuple_shared) +{ + if (HashJoinTableIsShared(table)) + { + Assert(tuple == dsa_get_address(table->area, tuple_shared)); + for (;;) + { + tuple->next.shared = + dsa_pointer_atomic_read(&table->buckets[bucket_no].shared); + if (dsa_pointer_atomic_compare_exchange(&table->buckets[bucket_no].shared, + &tuple->next.shared, + tuple_shared)) + break; + } + } + else + { + tuple->next.private = table->buckets[bucket_no].private; + table->buckets[bucket_no].private = tuple; + } +} + +/* + * Insert a tuple at the front of a given skew bucket identified by number. + * For shared hash joins, tuple_shared must be provided, pointing to the tuple + * in the dsa_area backing the table. For private hash joins, it should be + * InvalidDsaPointer. + */ +static void +insert_tuple_into_skew_bucket(HashJoinTable table, int skew_bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_shared) +{ + if (HashJoinTableIsShared(table)) + { + tuple->next.shared = + table->skewBucket[skew_bucket_no]->tuples.shared; + table->skewBucket[skew_bucket_no]->tuples.shared = tuple_shared; + } + else + { + tuple->next.private = table->skewBucket[skew_bucket_no]->tuples.private; + table->skewBucket[skew_bucket_no]->tuples.private = tuple; + } +} + +/* * ExecScanHashBucket * scan a hash bucket for matches to the current outer tuple * @@ -1073,11 +1756,12 @@ ExecScanHashBucket(HashJoinState *hjstate, * otherwise scan the standard hashtable bucket. 
*/ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) - hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; + hashTuple = first_tuple_in_skew_bucket(hashtable, + hjstate->hj_CurSkewBucketNo); else - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = first_tuple_in_bucket(hashtable, hjstate->hj_CurBucketNo); while (hashTuple != NULL) { @@ -1101,7 +1785,7 @@ ExecScanHashBucket(HashJoinState *hjstate, } } - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); } /* @@ -1144,6 +1828,21 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) HashJoinTable hashtable = hjstate->hj_HashTable; HashJoinTuple hashTuple = hjstate->hj_CurTuple; + if (HashJoinTableIsShared(hashtable)) + { + int phase PG_USED_FOR_ASSERTS_ONLY; + + /* + * TODO: This walks the buckets in parallel mode, like the existing + * code, but it might make more sense to hand out chunks to workers + * instead of buckets. + */ + + phase = BarrierPhase(&hashtable->shared->barrier); + Assert(PHJ_PHASE_TO_SUBPHASE(phase) == PHJ_SUBPHASE_UNMATCHED); + Assert(PHJ_PHASE_TO_BATCHNO(phase) == hashtable->curbatch); + } + for (;;) { /* @@ -1152,21 +1851,35 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) * bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; - else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); + else if (HashJoinTableIsShared(hashtable)) { - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; - hjstate->hj_CurBucketNo++; + int bucketno = + (int) pg_atomic_fetch_add_u32( + &hashtable->shared->next_unmatched_bucket, 1); + + if (bucketno >= hashtable->nbuckets) + break; /* finished all buckets */ + + hashTuple = first_tuple_in_bucket(hashtable, bucketno); + + /* TODO: parallel skew bucket support */ } - else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) + else { - int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; + if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = first_tuple_in_bucket(hashtable, + hjstate->hj_CurBucketNo++); + else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) + { + int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; - hashTuple = hashtable->skewBucket[j]->tuples; - hjstate->hj_CurSkewBucketNo++; + hashTuple = first_tuple_in_skew_bucket(hashtable, j); + hjstate->hj_CurSkewBucketNo++; + } + else + break; /* finished all buckets */ } - else - break; /* finished all buckets */ while (hashTuple != NULL) { @@ -1191,7 +1904,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return true; } - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); } } @@ -1212,6 +1925,65 @@ ExecHashTableReset(HashJoinTable hashtable) MemoryContext oldcxt; int nbuckets = hashtable->nbuckets; + if (HashJoinTableIsShared(hashtable)) + { + /* Wait for all workers to finish accessing the primary hash table. */ + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_UNMATCHED); + if (BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_UNMATCHED)) + { + /* Serial phase: promote the secondary table to primary. 
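Only the backend for which BarrierWait returned true runs this block; the others wait at the second BarrierWait below until promotion is complete.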
*/ + dsa_pointer tmp; + int i; + + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PROMOTING); + + /* Clear the old primary table. */ + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_write(&hashtable->buckets[i].shared, + InvalidDsaPointer); + + /* Swap the two tables. */ + tmp = hashtable->shared->primary_buckets; + hashtable->shared->primary_buckets = + hashtable->shared->secondary_buckets; + hashtable->shared->secondary_buckets = tmp; + + /* Swap the chunk lists. */ + tmp = dsa_pointer_atomic_read(&hashtable->shared->head_primary_chunk); + dsa_pointer_atomic_write(&hashtable->shared->head_primary_chunk, + dsa_pointer_atomic_read(&hashtable->shared->head_secondary_chunk)); + dsa_pointer_atomic_write(&hashtable->shared->head_secondary_chunk, + tmp); + + /* Free the secondary chunks. */ + /* TODO: Or put them on a freelist in one cheap operation instead? */ + tmp = dsa_pointer_atomic_read(&hashtable->shared->head_secondary_chunk); + while (DsaPointerIsValid(tmp)) + { + HashMemoryChunk chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, tmp); + dsa_pointer next = chunk->next.shared; + + dsa_free(hashtable->area, tmp); + tmp = next; + } + dsa_pointer_atomic_write(&hashtable->shared->head_secondary_chunk, + InvalidDsaPointer); + + /* Reset the unmatched cursor. */ + pg_atomic_write_u32(&hashtable->shared->next_unmatched_bucket, + 0); + } + /* Wait again, so that all workers now have the new table. */ + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_LOADING); + ExecHashUpdate(hashtable); + return; + } + /* * Release all the hash buckets and tuples acquired in the prior pass, and * reinitialize the context for a new pass. @@ -1220,15 +1992,15 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + hashtable->buckets = (HashJoinBucketHead *) + palloc0(nbuckets * sizeof(HashJoinBucketHead)); hashtable->spaceUsed = 0; MemoryContextSwitchTo(oldcxt); /* Forget the chunks (the memory was freed by the context reset above). */ - hashtable->chunks = NULL; + hashtable->primary_chunk = NULL; } /* @@ -1241,10 +2013,14 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) HashJoinTuple tuple; int i; + /* TODO: share parallel reset work! coordinate! */ + /* Reset all flags in the main table ... */ for (i = 0; i < hashtable->nbuckets; i++) { - for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next) + for (tuple = first_tuple_in_bucket(hashtable, i); + tuple != NULL; + tuple = next_tuple_in_bucket(hashtable, tuple)) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } @@ -1252,9 +2028,10 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) for (i = 0; i < hashtable->nSkewBuckets; i++) { int j = hashtable->skewBucketNums[i]; - HashSkewBucket *skewBucket = hashtable->skewBucket[j]; - for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next) + for (tuple = first_tuple_in_skew_bucket(hashtable, j); + tuple != NULL; + tuple = next_tuple_in_bucket(hashtable, tuple)) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } } @@ -1414,11 +2191,11 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) continue; /* Okay, create a new skew bucket for this hashvalue. 
*/ - hashtable->skewBucket[bucket] = (HashSkewBucket *) + hashtable->skewBucket[bucket] = (HashSkewBucket *) /* TODO */ MemoryContextAlloc(hashtable->batchCxt, sizeof(HashSkewBucket)); hashtable->skewBucket[bucket]->hashvalue = hashvalue; - hashtable->skewBucket[bucket]->tuples = NULL; + hashtable->skewBucket[bucket]->tuples.private = NULL; hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; hashtable->nSkewBuckets++; hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; @@ -1496,18 +2273,29 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); HashJoinTuple hashTuple; int hashTupleSize; + dsa_pointer tuple_pointer; /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; - hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, - hashTupleSize); + if (HashJoinTableIsShared(hashtable)) + { + tuple_pointer = dsa_allocate(hashtable->area, hashTupleSize); + hashTuple = (HashJoinTuple) dsa_get_address(hashtable->area, + tuple_pointer); + } + else + { + tuple_pointer = InvalidDsaPointer; + hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, + hashTupleSize); + } hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the skew bucket's list */ - hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; - hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + insert_tuple_into_skew_bucket(hashtable, bucketNumber, hashTuple, + tuple_pointer); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; @@ -1538,6 +2326,9 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) int batchno; HashJoinTuple hashTuple; + /* TODO: skew buckets not yet supported for parallel mode */ + Assert(!HashJoinTableIsShared(hashtable)); + /* Locate the bucket to remove */ bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; bucket = hashtable->skewBucket[bucketToRemove]; @@ -1552,10 +2343,10 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); /* Process all tuples in the bucket */ - hashTuple = bucket->tuples; + hashTuple = first_tuple_in_skew_bucket(hashtable, bucketToRemove); while (hashTuple != NULL) { - HashJoinTuple nextHashTuple = hashTuple->next; + HashJoinTuple nextHashTuple = next_tuple_in_bucket(hashtable, hashTuple); MinimalTuple tuple; Size tupleSize; @@ -1581,8 +2372,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, tupleSize); pfree(hashTuple); - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + insert_tuple_into_bucket(hashtable, bucketno, copyTuple, + InvalidDsaPointer); /* We have reduced skew space, but overall space doesn't change */ hashtable->spaceUsedSkew -= tupleSize; @@ -1591,8 +2382,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) { /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + ExecHashJoinSaveTuple(hashtable, tuple, hashvalue, + batchno, true); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; @@ -1636,6 +2427,141 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } /* + * Add to the primary or secondary tuple counter. 
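+ * Counters are updated with atomic fetch-add; for the primary counter we + * also refresh this backend's local totalTuples from the result.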
*/ +static void +add_tuple_count(HashJoinTable hashtable, int count, bool secondary) +{ + if (secondary) + pg_atomic_fetch_add_u64(&hashtable->shared->total_secondary_tuples, + count); + else + { + uint64 total = + pg_atomic_fetch_add_u64(&hashtable->shared->total_primary_tuples, + count); + /* Also update this backend's counter. */ + hashtable->totalTuples = total + count; + } +} + +/* + * Allocate 'size' bytes from the currently active shared HashMemoryChunk. + * This is essentially the same as the private memory version, but allocates + * from separate chunks for the secondary table and periodically updates the + * shared tuple counter. + */ +static void * +dense_alloc_shared(HashJoinTable hashtable, + Size size, + dsa_pointer *shared, + bool secondary) +{ + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + char *ptr; + + /* just in case the size is not already aligned properly */ + size = MAXALIGN(size); + + /* + * If tuple size is larger than 1/4 of chunk size, allocate a separate + * chunk. + */ + if (size > HASH_CHUNK_THRESHOLD) + { + /* allocate new chunk */ + chunk_shared = + dsa_allocate(hashtable->area, + offsetof(HashMemoryChunkData, data) + size); + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + offsetof(HashMemoryChunkData, data); + chunk->maxlen = size; + chunk->used = size; + chunk->ntuples = 1; + + /* + * Push onto the appropriate chunk list, but don't make it the current + * chunk because it hasn't got any more useful space in it. The + * current chunk may still have space, so keep that one current. + */ + ExecHashPushChunk(hashtable, chunk, chunk_shared, + secondary ? + &hashtable->shared->head_secondary_chunk : + &hashtable->shared->head_primary_chunk); + + /* Count these huge tuples immediately. */ + add_tuple_count(hashtable, 1, secondary); + return chunk->data; + } + + /* + * See if we have enough space for it in the current chunk (if any). If + * not, allocate a fresh chunk. + */ + chunk = secondary ? hashtable->secondary_chunk : hashtable->primary_chunk; + if (chunk == NULL || (chunk->maxlen - chunk->used) < size) + { + /* + * Add the tuple count for the outgoing chunk to the shared counter. + * Doing this only when we need to allocate a new chunk should + * reduce contention on the shared counter. + */ + if (chunk != NULL) + add_tuple_count(hashtable, chunk->ntuples, secondary); + + /* + * Allocate new chunk and make it the current chunk for this backend + * to allocate from. + */ + chunk_shared = + dsa_allocate(hashtable->area, + offsetof(HashMemoryChunkData, data) + + HASH_CHUNK_SIZE); + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + offsetof(HashMemoryChunkData, data); + if (secondary) + { + hashtable->secondary_chunk = chunk; + hashtable->secondary_chunk_shared = chunk_shared; + ExecHashPushChunk(hashtable, chunk, chunk_shared, + &hashtable->shared->head_secondary_chunk); + } + else + { + hashtable->primary_chunk = chunk; + hashtable->primary_chunk_shared = chunk_shared; + ExecHashPushChunk(hashtable, chunk, chunk_shared, + &hashtable->shared->head_primary_chunk); + } + chunk->maxlen = HASH_CHUNK_SIZE; + chunk->used = size; + chunk->ntuples = 1; + + /* + * The shared tuple counter will be updated when this chunk is + * eventually full. See above. + */ + + return chunk->data; + } + + /* There is enough space in the current chunk, let's add the tuple */ + chunk_shared = + secondary ?
hashtable->secondary_chunk_shared : + hashtable->primary_chunk_shared; + ptr = chunk->data + chunk->used; + *shared = chunk_shared + offsetof(HashMemoryChunkData, data) + chunk->used; + chunk->used += size; + chunk->ntuples += 1; + + /* return pointer to the start of the tuple memory */ + return ptr; +} + +/* * Allocate 'size' bytes from the currently active HashMemoryChunk */ static void * @@ -1653,9 +2579,11 @@ dense_alloc(HashJoinTable hashtable, Size size) */ if (size > HASH_CHUNK_THRESHOLD) { + /* allocate new chunk and put it at the beginning of the list */ - newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, - offsetof(HashMemoryChunkData, data) + size); + newChunk = (HashMemoryChunk) + MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashMemoryChunkData, data) + size); newChunk->maxlen = size; newChunk->used = 0; newChunk->ntuples = 0; @@ -1664,15 +2592,15 @@ dense_alloc(HashJoinTable hashtable, Size size) * Add this chunk to the list after the first existing chunk, so that * we don't lose the remaining space in the "current" chunk. */ - if (hashtable->chunks != NULL) + if (hashtable->primary_chunk != NULL) { - newChunk->next = hashtable->chunks->next; - hashtable->chunks->next = newChunk; + newChunk->next.private = hashtable->primary_chunk->next.private; + hashtable->primary_chunk->next.private = newChunk; } else { - newChunk->next = hashtable->chunks; - hashtable->chunks = newChunk; + newChunk->next.private = NULL; + hashtable->primary_chunk = newChunk; } newChunk->used += size; @@ -1685,27 +2613,27 @@ dense_alloc(HashJoinTable hashtable, Size size) * See if we have enough space for it in the current chunk (if any). If * not, allocate a fresh chunk. */ - if ((hashtable->chunks == NULL) || - (hashtable->chunks->maxlen - hashtable->chunks->used) < size) + if ((hashtable->primary_chunk == NULL) || + (hashtable->primary_chunk->maxlen - hashtable->primary_chunk->used) < size) { /* allocate new chunk and put it at the beginning of the list */ - newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, - offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE); - + newChunk = (HashMemoryChunk) + MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashMemoryChunkData, data) + + HASH_CHUNK_SIZE); + newChunk->next.private = hashtable->primary_chunk; + hashtable->primary_chunk = newChunk; newChunk->maxlen = HASH_CHUNK_SIZE; newChunk->used = size; newChunk->ntuples = 1; - newChunk->next = hashtable->chunks; - hashtable->chunks = newChunk; - return newChunk->data; } /* There is enough space in the current chunk, let's add the tuple */ - ptr = hashtable->chunks->data + hashtable->chunks->used; - hashtable->chunks->used += size; - hashtable->chunks->ntuples += 1; + ptr = hashtable->primary_chunk->data + hashtable->primary_chunk->used; + hashtable->primary_chunk->used += size; + hashtable->primary_chunk->ntuples += 1; /* return pointer to the start of the tuple memory */ return ptr; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 369e666..b8f90a6 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -21,8 +21,11 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/barrier.h" #include "utils/memutils.h" +#include /* TODO: remove */ /* * States of the ExecHashJoin state machine @@ -42,11 +45,13 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 
*hashvalue); -static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - BufFile *file, +static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinTable hashtable, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static void ExecHashJoinLoadBatch(HashJoinState *hjstate); +static void ExecHashJoinExportBatches(HashJoinTable hashtable); +static void ExecHashJoinPreloadNextBatch(HashJoinTable hashtable); /* ---------------------------------------------------------------- @@ -147,6 +152,14 @@ ExecHashJoin(HashJoinState *node) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (hashNode->shared_table_data != NULL) + { + /* + * TODO: The empty-outer optimization is not implemented + * for shared hash tables yet. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -166,7 +179,7 @@ ExecHashJoin(HashJoinState *node) /* * create the hash table */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; @@ -177,12 +190,29 @@ ExecHashJoin(HashJoinState *node) hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(&hashtable->shared->barrier) >= + PHJ_PHASE_HASHING); + + /* Allow other backends to access batches we generated. */ + ExecHashJoinExportBatches(hashtable); + + /* + * Check if we are a worker that attached too late to + * avoid deadlock risk with the leader. + */ + if (ExecHashCheckForEarlyExit(hashtable)) + return NULL; + } + /* * If the inner relation is completely empty, and we're not * doing a left outer join, we can quit without scanning the * outer relation. */ - if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) + if (!HashJoinTableIsShared(hashtable) && /* TODO:TM */ + hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) return NULL; /* @@ -198,12 +228,73 @@ ExecHashJoin(HashJoinState *node) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier = &hashtable->shared->barrier; + int phase = BarrierPhase(barrier); + + /* + * Map the current phase to the appropriate initial state + * for this worker, so we can get started. + */ + Assert(BarrierPhase(barrier) >= PHJ_PHASE_PROBING); + hashtable->curbatch = PHJ_PHASE_TO_BATCHNO(phase); + switch (PHJ_PHASE_TO_SUBPHASE(phase)) + { + case PHJ_SUBPHASE_PROMOTING: + /* Wait for serial phase to finish. */ + BarrierWait(barrier, WAIT_EVENT_HASHJOIN_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_LOADING); + /* fall through */ + case PHJ_SUBPHASE_LOADING: + /* Help load the current batch. */ + ExecHashUpdate(hashtable); + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, + true); + ExecHashJoinLoadBatch(node); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_PROBING); + /* fall through */ + case PHJ_SUBPHASE_PREPARING: + /* Wait for serial phase to finish. */ + BarrierWait(barrier, WAIT_EVENT_HASHJOIN_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_PROBING); + /* fall through */ + case PHJ_SUBPHASE_PROBING: + /* Help probe the current batch. 
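We refresh our local view of the shared table, open the current batch for reading and then drop into the regular HJ_NEED_NEW_OUTER probe loop.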
*/ + ExecHashUpdate(hashtable); + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, + false); + node->hj_JoinState = HJ_NEED_NEW_OUTER; + break; + case PHJ_SUBPHASE_UNMATCHED: + /* Help scan for unmatched inner tuples. */ + ExecHashUpdate(hashtable); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + break; + } + continue; + } + else + { + node->hj_JoinState = HJ_NEED_NEW_OUTER; + ExecHashJoinOpenBatch(hashtable, 0, false); + } /* FALL THRU */ case HJ_NEED_NEW_OUTER: + if (HashJoinTableIsShared(hashtable)) + { + Assert(PHJ_PHASE_TO_BATCHNO(BarrierPhase(&hashtable->shared->barrier)) == + hashtable->curbatch); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PROBING); + } + /* * We don't have an outer tuple, try to get the next one */ @@ -213,6 +304,47 @@ ExecHashJoin(HashJoinState *node) if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ + + /* + * Switch to reading tuples from the next inner batch. We + * do this here because in the shared hash table case we + * want to do this before ExecHashJoinPreloadNextBatch. + */ + if (hashtable->curbatch + 1 < hashtable->nbatch) + ExecHashJoinOpenBatch(hashtable, + hashtable->curbatch + 1, + true); + + if (HashJoinTableIsShared(hashtable)) + { + /* Allow other backends to access our batches. */ + ExecHashJoinExportBatches(hashtable); + /* + * Check if we are a leader that can't go further than + * probing the first batch without deadlock risk, + * because there are workers running. + */ + if (ExecHashCheckForEarlyExit(hashtable)) + return NULL; + + /* + * We may be able to load some amount of the next + * batch into spare work_mem, before we start waiting + * for other workers to finish probing the current + * batch. + */ + ExecHashJoinPreloadNextBatch(hashtable); + + /* + * You can't start searching for unmatched tuples + * until all workers have finished probing, so we + * synchronize here. + */ + BarrierWait(&hashtable->shared->barrier, + WAIT_EVENT_HASHJOIN_PROBING); + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_UNMATCHED_BATCH(hashtable->curbatch)); + } if (HJ_FILL_INNER(node)) { /* set up to scan for unmatched inner tuples */ @@ -250,9 +382,9 @@ ExecHashJoin(HashJoinState *node) * Save it in the corresponding outer-batch file. */ Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), - hashvalue, - &hashtable->outerBatchFile[batchno]); + ExecHashJoinSaveTuple(hashtable, + ExecFetchSlotMinimalTuple(outerTupleSlot), + hashvalue, batchno, false); /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -296,6 +428,13 @@ ExecHashJoin(HashJoinState *node) if (joinqual == NIL || ExecQual(joinqual, econtext, false)) { node->hj_MatchedOuter = true; + /* + * Note: it is OK to do this in a shared hash table + * without any kind of memory synchronization, because the + * only transition is 0->1, so ordering doesn't matter if + * several backends do it, and there will be a memory + * barrier before anyone reads it. + */ HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); /* In an antijoin, we never return a matched tuple */ @@ -632,6 +771,29 @@ ExecEndHashJoin(HashJoinState *node) } /* + * For shared hash joins, load as much of the next batch as we can as part of + * the probing phase for the current batch. This overlapping means that we do + * something useful before we start waiting for other workers. 
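The batch and sub-phase bookkeeping above is driven by a single monotonically increasing barrier phase number, decoded with the PHJ_PHASE_TO_BATCHNO and PHJ_PHASE_TO_SUBPHASE macros defined later in hashjoin.h. The following is a minimal standalone sketch of just that arithmetic (illustration only, with the relevant constants copied from the patch), showing how a phase number maps back to a batch number and sub-phase:

#include <stdio.h>

#define PHJ_PHASE_PROBING      5
#define PHJ_PHASE_UNMATCHED    6
#define PHJ_SUBPHASE_UNMATCHED 4

#define PHJ_PHASE_TO_SUBPHASE(p) \
	(((int) (p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) % 5)
#define PHJ_PHASE_TO_BATCHNO(p) \
	(((int) (p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) / 5)

int
main(void)
{
	static const char *subphases[] = {
		"promoting", "loading", "preparing", "probing", "unmatched"
	};
	int			phase;

	/* Phase 5 is probing batch 0, phase 6 is the unmatched scan of batch 0. */
	for (phase = PHJ_PHASE_PROBING; phase <= 16; phase++)
		printf("phase %2d -> batch %d, sub-phase %s\n",
			   phase,
			   PHJ_PHASE_TO_BATCHNO(phase),
			   subphases[PHJ_PHASE_TO_SUBPHASE(phase)]);
	return 0;
}

Running this shows that every batch after batch 0 contributes five consecutive phases beginning with its promoting step, which is why the switch above can dispatch a late-attaching participant purely from the current barrier phase.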
+ */ +static void +ExecHashJoinPreloadNextBatch(HashJoinTable hashtable) +{ + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier PG_USED_FOR_ASSERTS_ONLY = &hashtable->shared->barrier; + int curbatch = hashtable->curbatch; + int next_batch = curbatch + 1; + + Assert(BarrierPhase(barrier) == PHJ_PHASE_PROBING_BATCH(curbatch)); + + if (next_batch < hashtable->nbatch) + { + /* TODO: Load into secondary hash table while memory is free! */ + } + } +} + +/* * ExecHashJoinOuterGetTuple * * get the next outer tuple for hashjoin: either by @@ -702,8 +864,7 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, if (file == NULL) return NULL; - slot = ExecHashJoinGetSavedTuple(hjstate, - file, + slot = ExecHashJoinGetSavedTuple(hashtable, hashvalue, hjstate->hj_OuterTupleSlot); if (!TupIsNull(slot)) @@ -726,13 +887,14 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) HashJoinTable hashtable = hjstate->hj_HashTable; int nbatch; int curbatch; - BufFile *innerFile; - TupleTableSlot *slot; - uint32 hashvalue; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; + if (HashJoinTableIsShared(hashtable)) + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_UNMATCHED_BATCH(curbatch)); + if (curbatch > 0) { /* @@ -776,7 +938,8 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * need to be reassigned. */ curbatch++; - while (curbatch < nbatch && + while (!HashJoinTableIsShared(hashtable) && + curbatch < nbatch && (hashtable->outerBatchFile[curbatch] == NULL || hashtable->innerBatchFile[curbatch] == NULL)) { @@ -792,7 +955,6 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) if (hashtable->outerBatchFile[curbatch] && nbatch != hashtable->nbatch_outstart) break; /* must process due to rule 3 */ - /* We can ignore this batch. */ /* Release associated temp files right away. */ if (hashtable->innerBatchFile[curbatch]) BufFileClose(hashtable->innerBatchFile[curbatch]); @@ -812,48 +974,175 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * Reload the hash table with the new inner batch (which could be empty) */ ExecHashTableReset(hashtable); + ExecHashJoinLoadBatch(hjstate); - innerFile = hashtable->innerBatchFile[curbatch]; + return true; +} + +static void +ExecHashJoinLoadBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + uint32 hashvalue; - if (innerFile != NULL) + if (HashJoinTableIsShared(hashtable)) + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_LOADING); + + /* + * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for + * reading from. If there is a shared hash table, we may have already + * partially loaded the hash table in ExecHashJoinPreloadNextBatch. + */ + Assert(hashtable->batch_reader.batchno == curbatch); + Assert(hashtable->batch_reader.inner); + + for (;;) { - if (BufFileSeek(innerFile, 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); + slot = ExecHashJoinGetSavedTuple(hashtable, + &hashvalue, + hjstate->hj_HashTupleSlot); - while ((slot = ExecHashJoinGetSavedTuple(hjstate, - innerFile, - &hashvalue, - hjstate->hj_HashTupleSlot))) - { - /* - * NOTE: some tuples may be sent to future batches. Also, it is - * possible for hashtable->nbatch to be increased here!
- */ - ExecHashTableInsert(hashtable, slot, hashvalue); - } + if (slot == NULL) + break; /* - * after we build the hash table, the inner batch file is no longer - * needed + * NOTE: some tuples may be sent to future batches. Also, it is + * possible for hashtable->nbatch to be increased here! */ - BufFileClose(innerFile); - hashtable->innerBatchFile[curbatch] = NULL; + ExecHashTableInsert(hashtable, slot, hashvalue, false); } /* - * Rewind outer batch file (if present), so that we can start reading it. + * Now that we have finished loading this batch into the hash table, we + * can set our outer batch read head to the start of the current batch, + * and our inner batch read head to the start of the NEXT batch (as + * expected by ExecHashJoinPreloadNextBatch). */ - if (hashtable->outerBatchFile[curbatch] != NULL) + if (HashJoinTableIsShared(hashtable)) + { + /* + * Wait until all workers have finished loading their portion of the + * hash table. + */ + if (BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASHJOIN_LOADING)) + { + /* Serial phase: prepare to read this outer and next inner batch */ + ExecHashJoinRewindBatches(hashtable, hashtable->curbatch); + } + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PREPARING_BATCH(hashtable->curbatch)); + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASHJOIN_PREPARING); + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); + } + else + ExecHashJoinRewindBatches(hashtable, hashtable->curbatch); + + /* + * The inner batch file is no longer needed by any participant, because + * the hash table has been fully reloaded. + */ + ExecHashJoinCloseBatch(hashtable, hashtable->curbatch, true); + + /* Prepare to read from the current outer batch. */ + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, false); +} + +/* + * Export a BufFile, copy the descriptor to DSA memory and return the + * dsa_pointer. + */ +static dsa_pointer +make_batch_descriptor(dsa_area *area, BufFile *file) +{ + dsa_pointer pointer; + BufFileDescriptor *source; + BufFileDescriptor *target; + size_t size; + + source = BufFileExport(file); + size = BufFileDescriptorSize(source); + pointer = dsa_allocate(area, size); + if (!DsaPointerIsValid(pointer)) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed on dsa_allocate of size %zu.", size))); + target = dsa_get_address(area, pointer); + memcpy(target, source, size); + pfree(source); + + return pointer; +} + +/* + * Publish a batch descriptor for a future batch so that other participants + * can import it and read it. If 'descriptor' is InvalidDsaPointer, then + * forget the published descriptor so that it will be reexported later. + */ +static void +set_batch_descriptor(HashJoinTable hashtable, int batchno, bool inner, + dsa_pointer descriptor) +{ + HashJoinParticipantState *participant; + dsa_pointer *level1; + dsa_pointer *level2; + int rank; + int index; + + participant = &hashtable->shared->participants[HashJoinParticipantNumber()]; + rank = fls(batchno); + index = batchno % (1 << (rank - 1)); + level1 = inner ? 
participant->inner_batch_descriptors + : participant->outer_batch_descriptors; + if (level1[rank] == InvalidDsaPointer) { - if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) + size_t size = sizeof(dsa_pointer) * (1 << rank); + + level1[rank] = dsa_allocate(hashtable->area, size); + if (level1[rank] == InvalidDsaPointer) ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed on dsa_allocate of size %zu.", size))); + level2 = dsa_get_address(hashtable->area, level1[rank]); + memset(level2, 0, size); } + level2 = dsa_get_address(hashtable->area, level1[rank]); + if (level2[index] != InvalidDsaPointer) + dsa_free(hashtable->area, level2[index]); + level2[index] = descriptor; +} - return true; +/* + * Get a batch descriptor published by a given participant, if there is one. + */ +static BufFileDescriptor * +get_batch_descriptor(HashJoinTable hashtable, int participant_number, + int batchno, bool inner) +{ + HashJoinParticipantState *participant; + dsa_pointer *level1; + dsa_pointer *level2; + int rank; + int index; + + participant = &hashtable->shared->participants[participant_number]; + rank = fls(batchno); + index = batchno % (1 << (rank - 1)); + level1 = inner ? participant->inner_batch_descriptors + : participant->outer_batch_descriptors; + if (level1[rank] == InvalidDsaPointer) + return NULL; + level2 = dsa_get_address(hashtable->area, level1[rank]); + if (level2[index] == InvalidDsaPointer) + return NULL; + + return (BufFileDescriptor *) + dsa_get_address(hashtable->area, level2[index]); } /* @@ -868,17 +1157,40 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * will get messed up. */ void -ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr) +ExecHashJoinSaveTuple(HashJoinTable hashtable, + MinimalTuple tuple, uint32 hashvalue, + int batchno, + bool inner) { - BufFile *file = *fileptr; + BufFile *file; size_t written; + if (inner) + file = hashtable->innerBatchFile[batchno]; + else + file = hashtable->outerBatchFile[batchno]; if (file == NULL) { /* First write to this batch file, so open it. */ file = BufFileCreateTemp(false); - *fileptr = file; + if (inner) + hashtable->innerBatchFile[batchno] = file; + else + hashtable->outerBatchFile[batchno] = file; + } + + if (HashJoinTableIsShared(hashtable)) + { + /* This batch needs to be re-exported, if it was already exported. */ + /* + * TODO: This is far too expensive: need a bitmap? or maybe just + * export every batch when it's the next one to be processed, + * regardless of whether we've written anything to it (the point being + * that the list of files backing a BufFile can change when you write + * to it)? If we do that then we still need to export ALL before + * exiting early. + */ + set_batch_descriptor(hashtable, batchno, inner, InvalidDsaPointer); } written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32)); @@ -895,54 +1207,337 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, } /* + * Export unexported future batches created by this participant, so that other + * participants can read from them after they have finished reading their own. + */ +static void +ExecHashJoinExportBatches(HashJoinTable hashtable) +{ + int i; + + /* Find this participant's HashJoinParticipantState object. */ + Assert(HashJoinParticipantNumber() < hashtable->shared->planned_participants); + + /* Export future batches and copy their descriptors into DSA memory. 
*/ + for (i = hashtable->curbatch + 1; i < hashtable->nbatch; ++i) + { + if (hashtable->innerBatchFile[i] != NULL && + get_batch_descriptor(hashtable, HashJoinParticipantNumber(), i, true) == InvalidDsaPointer) + set_batch_descriptor(hashtable, i, true, + make_batch_descriptor(hashtable->area, hashtable->innerBatchFile[i])); + if (hashtable->outerBatchFile[i] != NULL && + get_batch_descriptor(hashtable, HashJoinParticipantNumber(), i, false) == InvalidDsaPointer) + set_batch_descriptor(hashtable, i, false, + make_batch_descriptor(hashtable->area, hashtable->outerBatchFile[i])); + } +} + +/* + * Select the batch file that ExecHashJoinGetSavedTuple will read from. + */ +void +ExecHashJoinOpenBatch(HashJoinTable hashtable, int batchno, bool inner) +{ + HashJoinBatchReader *batch_reader = &hashtable->batch_reader; + + if (batchno == 0) + batch_reader->file = NULL; + else + batch_reader->file = inner + ? hashtable->innerBatchFile[batchno] + : hashtable->outerBatchFile[batchno]; + + if (HashJoinTableIsShared(hashtable)) + { + HashJoinParticipantState *participant; + + participant = + &hashtable->shared->participants[HashJoinParticipantNumber()]; + batch_reader->shared = inner + ? &participant->inner_batch_reader + : &participant->outer_batch_reader; + /* We will seek to the shared position at next read. */ + batch_reader->head.fileno = -1; + batch_reader->head.offset = -1; + } + else + { + batch_reader->shared = NULL; + /* Seek to start of batch now, if there is one. */ + if (batch_reader->file != NULL) + BufFileSeek(batch_reader->file, 0, 0, SEEK_SET); + } + + batch_reader->participant_number = HashJoinParticipantNumber(); + batch_reader->batchno = batchno; + batch_reader->inner = inner; + +} + +/* + * Close a batch, once it is not needed by any participant. This causes batch + * files created by this participant to be deleted. + */ +void +ExecHashJoinCloseBatch(HashJoinTable hashtable, int batchno, bool inner) +{ + HashJoinParticipantState *participant; + HashJoinBatchReader *batch_reader; + BufFile *file; + + /* + * We only need to close the batch owned by THIS participant. That causes + * it to be deleted. Batches opened in this backend but created by other + * participants are closed by ExecHashJoinGetSavedTuple when it reaches + * the end of the file, allowing them to be closed sooner. + */ + batch_reader = &hashtable->batch_reader; + participant = &hashtable->shared->participants[HashJoinParticipantNumber()]; + if (inner) + { + file = hashtable->innerBatchFile[batchno]; + hashtable->innerBatchFile[batchno] = NULL; + } + else + { + file = hashtable->outerBatchFile[batchno]; + hashtable->outerBatchFile[batchno] = NULL; + } + if (file == NULL) + return; + + Assert(batch_reader->file == NULL || file == batch_reader->file); + BufFileClose(file); + batch_reader->file = NULL; +} + +/* + * Rewind batch readers. The outer batch reader is rewound to the start of + * batchno. The inner batch reader is rewound to the start of batchno + 1, in + * anticipation of preloading the next batch. 
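set_batch_descriptor() and get_batch_descriptor() above locate a batch's descriptor with rank = fls(batchno) and index = batchno % (1 << (rank - 1)), so each new power-of-two range of batches gets its own freshly allocated slot array and previously published descriptors never move. A minimal standalone sketch of that index arithmetic (illustration only; a local fls() substitute is used here, whereas the patch assumes the platform fls()):

#include <stdio.h>

/* Stand-in for fls(): 1-based position of the most significant set bit. */
static int
my_fls(int x)
{
	int			r = 0;

	while (x != 0)
	{
		x >>= 1;
		r++;
	}
	return r;
}

int
main(void)
{
	int			batchno;

	/* Batch 0 is held in memory, so descriptors start at batch 1. */
	for (batchno = 1; batchno <= 10; batchno++)
	{
		int			rank = my_fls(batchno);
		int			index = batchno % (1 << (rank - 1));

		printf("batch %2d -> descriptors[%d][%d] (level holds %d slots)\n",
			   batchno, rank, index, 1 << (rank - 1));
	}
	return 0;
}

Level r holds 1 << (r - 1) slots, which is why a participant can allocate a new level for future batches while other participants are still reading descriptors published for the current one.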
+ */ +void +ExecHashJoinRewindBatches(HashJoinTable hashtable, int batchno) +{ + HashJoinBatchReader *batch_reader; + int i; + + batch_reader = &hashtable->batch_reader; + + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(&hashtable->shared->barrier) == PHJ_PHASE_CREATING || + (PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PREPARING && + PHJ_PHASE_TO_BATCHNO(BarrierPhase(&hashtable->shared->barrier)) == + batchno)); + + /* + * Position the shared read heads for each participant's batch. + * Readers will seek their BufFile as required to synchronize. + */ + for (i = 0; i < hashtable->shared->planned_participants; ++i) + { + HashJoinSharedBatchReader *reader; + + reader = &hashtable->shared->participants[i].outer_batch_reader; + reader->batchno = batchno; /* for probing this batch */ + reader->head.fileno = 0; + reader->head.offset = 0; + + reader = &hashtable->shared->participants[i].inner_batch_reader; + reader->batchno = batchno + 1; /* for preloading the next batch */ + reader->head.fileno = 0; + reader->head.offset = 0; + } + } +} + +/* * ExecHashJoinGetSavedTuple - * read the next tuple from a batch file. Return NULL if no more. + * read the next tuple from the batch selected with + * ExecHashJoinOpenBatch, including the batch files of + * other participants if the hash table is shared. Return NULL if no + * more. * * On success, *hashvalue is set to the tuple's hash value, and the tuple * itself is stored in the given slot. */ static TupleTableSlot * -ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - BufFile *file, +ExecHashJoinGetSavedTuple(HashJoinTable hashtable, uint32 *hashvalue, TupleTableSlot *tupleSlot) { - uint32 header[2]; - size_t nread; - MinimalTuple tuple; + TupleTableSlot *result = NULL; + HashJoinBatchReader *batch_reader = &hashtable->batch_reader; + BufFileDescriptor *descriptor; - /* - * Since both the hash value and the MinimalTuple length word are uint32, - * we can read them both in one BufFileRead() call without any type - * cheating. - */ - nread = BufFileRead(file, (void *) header, sizeof(header)); - if (nread == 0) /* end of file */ + for (;;) { - ExecClearTuple(tupleSlot); - return NULL; - } - if (nread != sizeof(header)) - ereport(ERROR, - (errcode_for_file_access(), + uint32 header[2]; + size_t nread; + MinimalTuple tuple; + bool can_close = false; + + if (batch_reader->file == NULL) + { + /* + * No file found for the current participant. Try stealing tuples + * from the next participant. + */ + goto next_participant; + } + + if (HashJoinTableIsShared(hashtable)) + { + LWLockAcquire(&batch_reader->shared->lock, LW_EXCLUSIVE); + Assert(batch_reader->shared->batchno == batch_reader->batchno); + if (batch_reader->shared->error) + { + /* Don't try to read if reading failed in some other backend. */ + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from hash-join temporary file"))); + } + + /* Set the shared error flag, which we'll clear if we succeed. */ + batch_reader->shared->error = true; + + /* + * If another worker has moved the shared read head since we last read, + * we'll need to seek to the new shared position. + */ + if (batch_reader->head.fileno != batch_reader->shared->head.fileno || + batch_reader->head.offset != batch_reader->shared->head.offset) + { + BufFileSeek(batch_reader->file, + batch_reader->shared->head.fileno, + batch_reader->shared->head.offset, + SEEK_SET); + batch_reader->head = batch_reader->shared->head; + } + } + + /* Try to read the size and hash. 
*/ + nread = BufFileRead(batch_reader->file, (void *) header, sizeof(header)); + if (nread > 0) + { + if (nread != sizeof(header)) + { + ereport(ERROR, + (errcode_for_file_access(), errmsg("could not read from hash-join temporary file: %m"))); - *hashvalue = header[0]; - tuple = (MinimalTuple) palloc(header[1]); - tuple->t_len = header[1]; - nread = BufFileRead(file, - (void *) ((char *) tuple + sizeof(uint32)), - header[1] - sizeof(uint32)); - if (nread != header[1] - sizeof(uint32)) - ereport(ERROR, - (errcode_for_file_access(), + } + *hashvalue = header[0]; + tuple = (MinimalTuple) palloc(header[1]); + tuple->t_len = header[1]; + nread = BufFileRead(batch_reader->file, + (void *) ((char *) tuple + sizeof(uint32)), + header[1] - sizeof(uint32)); + if (nread != header[1] - sizeof(uint32)) + { + ereport(ERROR, + (errcode_for_file_access(), errmsg("could not read from hash-join temporary file: %m"))); - return ExecStoreMinimalTuple(tuple, tupleSlot, true); -} + } + + result = ExecStoreMinimalTuple(tuple, tupleSlot, true); + + } + + if (HashJoinTableIsShared(hashtable)) + { + if (nread == 0 && + batch_reader->participant_number != + HashJoinParticipantNumber()) + { + /* + * We've reached the end of another participant's batch file, + * so close it now. We'll deal with closing THIS + * participant's batch file later, because we don't want the + * files to be deleted just yet. + */ + can_close = true; + } + /* Commit new head position to shared memory and clear error. */ + BufFileTell(batch_reader->file, + &batch_reader->head.fileno, + &batch_reader->head.offset); + batch_reader->shared->head = batch_reader->head; + batch_reader->shared->error = false; + LWLockRelease(&batch_reader->shared->lock); + } + + if (can_close) + { + BufFileClose(batch_reader->file); + batch_reader->file = NULL; + } + + if (result != NULL) + return result; + +next_participant: + if (!HashJoinTableIsShared(hashtable)) + { + /* Private hash table, end of batch. */ + ExecClearTuple(tupleSlot); + return NULL; + } + + /* Try the next participant's batch file. */ + batch_reader->participant_number = + (batch_reader->participant_number + 1) % + hashtable->shared->planned_participants; + if (batch_reader->participant_number == HashJoinParticipantNumber()) + { + /* + * We've made it all the way back to the file we started with, + * which is the one that this backend wrote. So there are no more + * tuples to be had in any participant's batch file. + */ + ExecClearTuple(tupleSlot); + return NULL; + } + /* Import the BufFile from that participant, if it exported one. */ + descriptor = get_batch_descriptor(hashtable, + batch_reader->participant_number, + batch_reader->batchno, + batch_reader->inner); + if (descriptor == NULL) + batch_reader->file = NULL; + else + { + HashJoinParticipantState *participant; + + batch_reader->file = BufFileImport(descriptor); + participant = + &hashtable->shared->participants[batch_reader->participant_number]; + if (batch_reader->inner) + batch_reader->shared = &participant->inner_batch_reader; + else + batch_reader->shared = &participant->outer_batch_reader; + batch_reader->head.fileno = batch_reader->head.offset = -1; + } + } +} void ExecReScanHashJoin(HashJoinState *node) { + HashState *hashNode = (HashState *) innerPlanState(node); + + /* We can't use HashJoinTableIsShared if the table is NULL. */ + if (hashNode->shared_table_data != NULL) + { + elog(ERROR, "TODO: shared ExecReScanHashJoin not implemented"); + + /* Coordinate a rewind to the shared hash table creation phase.
*/ + BarrierWaitSet(&hashNode->shared_table_data->barrier, PHJ_PHASE_INIT, + WAIT_EVENT_HASHJOIN_REWINDING); + } + /* * In a multi-batch join, we currently have to do rescans the hard way, * primarily because batch temp files may have already been released. But @@ -977,6 +1572,14 @@ ExecReScanHashJoin(HashJoinState *node) /* ExecHashJoin can skip the BUILD_HASHTABLE step */ node->hj_JoinState = HJ_NEED_NEW_OUTER; + + if (HashJoinTableIsShared(node->hj_HashTable)) + { + /* Coordinate a rewind to the shared probing phase. */ + BarrierWaitSet(&hashNode->shared_table_data->barrier, + PHJ_PHASE_PROBING, + WAIT_EVENT_HASHJOIN_REWINDING2); + } } else { @@ -985,6 +1588,14 @@ ExecReScanHashJoin(HashJoinState *node) node->hj_HashTable = NULL; node->hj_JoinState = HJ_BUILD_HASHTABLE; + if (hashNode->shared_table_data != NULL) + { + /* + * Coordinate a rewind to the shared hash table creation phase. + * Note that hj_HashTable has just been set to NULL, so we test + * the hash node's shared state rather than using + * HashJoinTableIsShared(). + */ + BarrierWaitSet(&hashNode->shared_table_data->barrier, + PHJ_PHASE_INIT, + WAIT_EVENT_HASHJOIN_REWINDING3); + } + /* * if chgParam of subnode is not null then plan will be re-scanned * by first ExecProcNode. @@ -1011,3 +1622,110 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + size_t size; + + size = offsetof(SharedHashJoinTableData, participants) + + sizeof(HashJoinParticipantState) * (pcxt->nworkers + 1); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +static void +configure_reader_locks(HashJoinParticipantState *participants, int count) +{ + int i; + + static LWLockTranche inner_tranche; + static LWLockTranche outer_tranche; + + inner_tranche.name = "Hash Join/inner batch"; + inner_tranche.array_base = + (char *) &participants[0].inner_batch_reader.lock; + inner_tranche.array_stride = sizeof(HashJoinParticipantState); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER, + &inner_tranche); + + outer_tranche.name = "Hash Join/outer batch"; + outer_tranche.array_base = + (char *) &participants[0].outer_batch_reader.lock; + outer_tranche.array_stride = sizeof(HashJoinParticipantState); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER, + &outer_tranche); + + for (i = 0; i < count; ++i) + { + LWLockInitialize(&participants[i].inner_batch_reader.lock, + LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER); + LWLockInitialize(&participants[i].outer_batch_reader.lock, + LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER); + } +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + HashState *hashNode; + SharedHashJoinTable shared; + size_t size; + int planned_participants; + + /* + * Set up the state needed to coordinate access to the shared hash table, + * using the plan node ID as the toc key.
+ */ + planned_participants = pcxt->nworkers + 1; /* possible workers + leader */ + size = offsetof(SharedHashJoinTableData, participants) + + sizeof(HashJoinParticipantState) * planned_participants; + shared = shm_toc_allocate(pcxt->toc, size); + BarrierInit(&shared->barrier, 0); + shared->primary_buckets = InvalidDsaPointer; + shared->secondary_buckets = InvalidDsaPointer; + pg_atomic_init_u32(&shared->next_unmatched_bucket, 0); + pg_atomic_init_u64(&shared->total_primary_tuples, 0); + pg_atomic_init_u64(&shared->total_secondary_tuples, 0); + dsa_pointer_atomic_init(&shared->head_primary_chunk, InvalidDsaPointer); + dsa_pointer_atomic_init(&shared->head_secondary_chunk, InvalidDsaPointer); + dsa_pointer_atomic_init(&shared->chunks_to_rebucket, InvalidDsaPointer); + shared->planned_participants = planned_participants; + shm_toc_insert(pcxt->toc, state->js.ps.plan->plan_node_id, shared); + configure_reader_locks(shared->participants, planned_participants); + + /* + * Pass the SharedHashJoinTable to the hash node. If the Gather node + * running in the leader backend decides to execute the hash join, it + * hasn't called ExecHashJoinInitializeWorker so it doesn't have + * state->shared_table_data set up. So we must do it here. + */ + hashNode = (HashState *) innerPlanState(state); + hashNode->shared_table_data = shared; +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, shm_toc *toc) +{ + HashState *hashNode; + + state->hj_sharedHashJoinTable = + shm_toc_lookup(toc, state->js.ps.plan->plan_node_id); + + /* + * Inject SharedHashJoinTable into the hash node. It could instead have + * its own ExecHashInitializeWorker function, but we only want to set its + * 'parallel_aware' flag if we want to tell it to actually build the hash + * table in parallel. Since its parallel_aware flag also controls whether + * its 'InitializeWorker' function gets called, and it also needs access + * to this object for serial shared hash mode, we'll pass it on here + * instead of depending on that. 
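The shared participants[] array is sized for pcxt->nworkers + 1 slots because the leader also takes part; the convention (see the HashJoinParticipantNumber() macro in hashjoin.h) is that the leader is participant 0 and parallel worker N is participant N + 1. A tiny standalone sketch of that numbering, illustration only:

#include <stdio.h>
#include <stdbool.h>

/* Mirror of the participant-numbering rule: leader = 0, worker N = N + 1. */
static int
participant_number(bool is_parallel_worker, int parallel_worker_number)
{
	return is_parallel_worker ? parallel_worker_number + 1 : 0;
}

int
main(void)
{
	int			nworkers = 3;
	int			planned_participants = nworkers + 1;	/* workers + leader */
	int			i;

	printf("leader   -> slot %d of %d\n",
		   participant_number(false, -1), planned_participants);
	for (i = 0; i < nworkers; i++)
		printf("worker %d -> slot %d of %d\n",
			   i, participant_number(true, i), planned_participants);
	return 0;
}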
*/ + hashNode = (HashState *) innerPlanState(state); + hashNode->shared_table_data = state->hj_sharedHashJoinTable; + Assert(hashNode->shared_table_data != NULL); + + Assert(HashJoinParticipantNumber() < + hashNode->shared_table_data->planned_participants); + + configure_reader_locks(hashNode->shared_table_data->participants, 0); +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 00bf3a5..361eb5d 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -31,6 +31,8 @@ #include "executor/nodeSeqscan.h" #include "utils/rel.h" +#include + static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); static TupleTableSlot *SeqNext(SeqScanState *node); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ae86954..ca215dd 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1993,6 +1993,7 @@ _outHashPath(StringInfo str, const HashPath *node) WRITE_NODE_FIELD(path_hashclauses); WRITE_INT_FIELD(num_batches); + WRITE_ENUM_FIELD(table_type, HashPathTableType); } static void diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 2a49639..79c7650 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -104,6 +104,7 @@ double seq_page_cost = DEFAULT_SEQ_PAGE_COST; double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; +double cpu_shared_tuple_cost = DEFAULT_CPU_SHARED_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST; @@ -2694,7 +2695,8 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, List *hashclauses, Path *outer_path, Path *inner_path, SpecialJoinInfo *sjinfo, - SemiAntiJoinFactors *semifactors) + SemiAntiJoinFactors *semifactors, + HashPathTableType table_type) { Cost startup_cost = 0; Cost run_cost = 0; @@ -2725,6 +2727,26 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows; /* + * If this is a shared hash table, there is an extra charge for inserting + * each tuple into the shared hash table, to cover the overhead of memory + * synchronization that makes the hash table slightly slower to build than + * a private hash table. There is no extra charge for probing the hash + * table for each outer path row, on the basis that read-only access to the + * hash table shouldn't generate any extra memory synchronization. + * + * TODO: Really what we want is some guess at the amount of cache sync + * overhead generated by inserting into cachelines that have been + * invalidated by someone else inserting into a bucket in the same + * cacheline. Not sure if it's better to introduce a + * cpu_cacheline_sync_cost (or _miss_cost?) and then here estimate the + * number of collisions we expect based on num buckets, cacheline size, + * num workers. But that might be too detailed/low level/variable + * heavy/bogus. + */ + if (table_type != HASHPATH_TABLE_PRIVATE) + startup_cost += cpu_shared_tuple_cost * inner_path_rows; + + /* * Get hash table size that executor would use for inner relation.
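The surcharge added in initial_cost_hashjoin() above is simply linear in the estimated inner row count, and it is zero unless cpu_shared_tuple_cost is raised from its 0.0 default. A standalone sketch of the arithmetic with made-up numbers (a tuned cost of 0.001 and one million inner rows; illustration only, not values taken from the patch):

#include <stdio.h>

typedef double Cost;

int
main(void)
{
	Cost		cpu_shared_tuple_cost = 0.001;	/* assumed non-default setting */
	double		inner_path_rows = 1000000.0;
	Cost		startup_cost = 0.0;

	/* Mirrors the shared-table branch: charge once per inner tuple inserted. */
	startup_cost += cpu_shared_tuple_cost * inner_path_rows;
	printf("extra startup cost for a shared table: %.1f\n", startup_cost);
	return 0;
}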
* * XXX for the moment, always assume that skew optimization will be diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index cc7384f..87c4cef 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -483,7 +483,8 @@ try_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + HashPathTableType table_type) { Relids required_outer; JoinCostWorkspace workspace; @@ -508,7 +509,7 @@ try_hashjoin_path(PlannerInfo *root, */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, outer_path, inner_path, - extra->sjinfo, &extra->semifactors); + extra->sjinfo, &extra->semifactors, table_type); if (add_path_precheck(joinrel, workspace.startup_cost, workspace.total_cost, @@ -525,7 +526,8 @@ try_hashjoin_path(PlannerInfo *root, inner_path, extra->restrictlist, required_outer, - hashclauses)); + hashclauses, + table_type)); } else { @@ -546,7 +548,8 @@ try_partial_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + HashPathTableType table_type) { JoinCostWorkspace workspace; @@ -571,7 +574,8 @@ try_partial_hashjoin_path(PlannerInfo *root, */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, outer_path, inner_path, - extra->sjinfo, &extra->semifactors); + extra->sjinfo, &extra->semifactors, + table_type); if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) return; @@ -587,7 +591,8 @@ try_partial_hashjoin_path(PlannerInfo *root, inner_path, extra->restrictlist, NULL, - hashclauses)); + hashclauses, + table_type)); } /* @@ -1356,7 +1361,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); /* no possibility of cheap startup here */ } else if (jointype == JOIN_UNIQUE_INNER) @@ -1372,7 +1378,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); if (cheapest_startup_outer != NULL && cheapest_startup_outer != cheapest_total_outer) try_hashjoin_path(root, @@ -1381,7 +1388,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); } else { @@ -1402,7 +1410,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); foreach(lc1, outerrel->cheapest_parameterized_paths) { @@ -1436,7 +1445,8 @@ hash_inner_and_outer(PlannerInfo *root, innerpath, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); } } } @@ -1445,23 +1455,32 @@ hash_inner_and_outer(PlannerInfo *root, * If the joinrel is parallel-safe, we may be able to consider a * partial hash join. However, we can't handle JOIN_UNIQUE_OUTER, * because the outer path will be partial, and therefore we won't be - * able to properly guarantee uniqueness. Similarly, we can't handle - * JOIN_FULL and JOIN_RIGHT, because they can produce false null - * extended rows. Also, the resulting path must not be parameterized. + * able to properly guarantee uniqueness. Also, the resulting path + * must not be parameterized. 
*/ if (joinrel->consider_parallel && jointype != JOIN_UNIQUE_OUTER && - jointype != JOIN_FULL && - jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { Path *cheapest_partial_outer; + Path *cheapest_partial_inner = NULL; Path *cheapest_safe_inner = NULL; cheapest_partial_outer = (Path *) linitial(outerrel->partial_pathlist); + /* Can we use a partial inner plan too? */ + if (innerrel->partial_pathlist != NIL) + cheapest_partial_inner = + (Path *) linitial(innerrel->partial_pathlist); + if (cheapest_partial_inner != NULL) + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_partial_inner, + hashclauses, jointype, extra, + HASHPATH_TABLE_SHARED_PARALLEL); + /* * Normally, given that the joinrel is parallel-safe, the cheapest * total inner path will also be parallel-safe, but if not, we'll @@ -1488,10 +1507,20 @@ hash_inner_and_outer(PlannerInfo *root, } if (cheapest_safe_inner != NULL) + { + /* Try a shared table with only one worker building the table. */ try_partial_hashjoin_path(root, joinrel, cheapest_partial_outer, cheapest_safe_inner, - hashclauses, jointype, extra); + hashclauses, jointype, extra, + HASHPATH_TABLE_SHARED_SERIAL); + /* Also private hash tables, built by each worker. */ + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_safe_inner, + hashclauses, jointype, extra, + HASHPATH_TABLE_PRIVATE); + } } } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index ad49674..4954c4c 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3938,6 +3938,23 @@ create_hashjoin_plan(PlannerInfo *root, copy_plan_costsize(&hash_plan->plan, inner_plan); hash_plan->plan.startup_cost = hash_plan->plan.total_cost; + /* + * Set the table as sharable if appropriate, with parallel or serial + * building. 
+ */ + switch (best_path->table_type) + { + case HASHPATH_TABLE_SHARED_PARALLEL: + hash_plan->shared_table = true; + hash_plan->plan.parallel_aware = true; + break; + case HASHPATH_TABLE_SHARED_SERIAL: + hash_plan->shared_table = true; + break; + case HASHPATH_TABLE_PRIVATE: + break; + } + join_plan = make_hashjoin(tlist, joinclauses, otherclauses, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index abb7507..68cabe6 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root, * 'required_outer' is the set of required outer rels * 'hashclauses' are the RestrictInfo nodes to use as hash clauses * (this should be a subset of the restrict_clauses list) + * 'table_type' for level of hash table sharing */ HashPath * create_hashjoin_path(PlannerInfo *root, @@ -2108,7 +2109,8 @@ create_hashjoin_path(PlannerInfo *root, Path *inner_path, List *restrict_clauses, Relids required_outer, - List *hashclauses) + List *hashclauses, + HashPathTableType table_type) { HashPath *pathnode = makeNode(HashPath); @@ -2123,9 +2125,13 @@ create_hashjoin_path(PlannerInfo *root, sjinfo, required_outer, &restrict_clauses); - pathnode->jpath.path.parallel_aware = false; + pathnode->jpath.path.parallel_aware = + joinrel->consider_parallel && + (table_type == HASHPATH_TABLE_SHARED_SERIAL || + table_type == HASHPATH_TABLE_SHARED_PARALLEL); pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->table_type = table_type; /* This is a foolish way to estimate parallel_workers, but for now... */ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index a392197..c1e8819 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3393,6 +3393,51 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_HASH_CREATING: + event_name = "Hash/Creating"; + break; + case WAIT_EVENT_HASH_HASHING: + event_name = "Hash/Hashing"; + break; + case WAIT_EVENT_HASH_RESIZING: + event_name = "Hash/Resizing"; + break; + case WAIT_EVENT_HASH_REBUCKETING: + event_name = "Hash/Rebucketing"; + break; + case WAIT_EVENT_HASH_INIT: + event_name = "Hash/Init"; + break; + case WAIT_EVENT_HASH_DESTROY: + event_name = "Hash/Destroy"; + break; + case WAIT_EVENT_HASH_UNMATCHED: + event_name = "Hash/Unmatched"; + break; + case WAIT_EVENT_HASH_PROMOTING: + event_name = "Hash/Promoting"; + break; + case WAIT_EVENT_HASHJOIN_PROMOTING: + event_name = "HashJoin/Promoting"; + break; + case WAIT_EVENT_HASHJOIN_PREPARING: + event_name = "HashJoin/Preparing"; + break; + case WAIT_EVENT_HASHJOIN_PROBING: + event_name = "HashJoin/Probing"; + break; + case WAIT_EVENT_HASHJOIN_LOADING: + event_name = "HashJoin/Loading"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING: + event_name = "HashJoin/Rewinding"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING2: + event_name = "HashJoin/Rewinding2"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING3: + event_name = "HashJoin/Rewinding3"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 042be79..0fc8404 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -42,6 +42,8 @@ #include "storage/buf_internals.h"
#include "utils/resowner.h" +extern int ParallelWorkerNumber; + /* * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE. * The reason is that we'd like large temporary BufFiles to be spread across @@ -89,6 +91,24 @@ struct BufFile char buffer[BLCKSZ]; }; +/* + * Serialized representation of a single file managed by a BufFile. + */ +typedef struct BufFileFileDescriptor +{ + char path[MAXPGPATH]; +} BufFileFileDescriptor; + +/* + * Serialized representation of a BufFile, to be created by BufFileExport and + * consumed by BufFileImport. + */ +struct BufFileDescriptor +{ + size_t num_files; + BufFileFileDescriptor files[FLEXIBLE_ARRAY_MEMBER]; +}; + static BufFile *makeBufFile(File firstfile); static void extendBufFile(BufFile *file); static void BufFileLoadBuffer(BufFile *file); @@ -178,6 +198,77 @@ BufFileCreateTemp(bool interXact) return file; } +/* + * Export a BufFile description in a serialized form so that another backend + * can attach to it and read from it. The format is opaque, but it may be + * bitwise copied, and its size may be obtained with BufFileDescriptorSize(). + */ +BufFileDescriptor * +BufFileExport(BufFile *file) +{ + BufFileDescriptor *descriptor; + int i; + + /* Flush output from local buffers. */ + BufFileFlush(file); + + /* Create and fill in a descriptor. */ + descriptor = palloc0(offsetof(BufFileDescriptor, files) + + sizeof(BufFileFileDescriptor) * file->numFiles); + descriptor->num_files = file->numFiles; + for (i = 0; i < descriptor->num_files; ++i) + strcpy(descriptor->files[i].path, FilePathName(file->files[i])); + + return descriptor; +} + +/* + * Return the size in bytes of a BufFileDescriptor, so that it can be copied. + */ +size_t +BufFileDescriptorSize(const BufFileDescriptor *descriptor) +{ + return offsetof(BufFileDescriptor, files) + + sizeof(BufFileFileDescriptor) * descriptor->num_files; +} + +/* + * Open a BufFile that was created by another backend and then exported. The + * file must be read-only in all backends, and is still owned by the backend + * that created it. This provides a way for cooperating backends to share + * immutable temporary data such as hash join batches. + */ +BufFile * +BufFileImport(BufFileDescriptor *descriptor) +{ + BufFile *file = (BufFile *) palloc(sizeof(BufFile)); + int i; + + file->numFiles = descriptor->num_files; + file->files = (File *) palloc0(sizeof(File) * descriptor->num_files); + file->offsets = (off_t *) palloc0(sizeof(off_t) * descriptor->num_files); + file->isTemp = false; + file->isInterXact = true; /* prevent cleanup by this backend */ + file->dirty = false; + file->resowner = CurrentResourceOwner; + file->curFile = 0; + file->curOffset = 0L; + file->pos = 0; + file->nbytes = 0; + + for (i = 0; i < descriptor->num_files; ++i) + { + file->files[i] = + PathNameOpenFile(descriptor->files[i].path, + O_RDONLY | PG_BINARY, 0600); + if (file->files[i] <= 0) + elog(ERROR, "failed to import \"%s\": %m", + descriptor->files[i].path); + } + + return file; +} + #ifdef NOT_USED /* * Create a BufFile and attach it to an already-opened virtual File. diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c index 8b83c1d..5a45103 100644 --- a/src/backend/storage/ipc/barrier.c +++ b/src/backend/storage/ipc/barrier.c @@ -16,6 +16,7 @@ #include "storage/barrier.h" + /* * Initialize this barrier, setting a static number of participants that we * will wait for at each computation phase. 
To use a dynamic number of diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2d3cf9e..6c79733 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -788,7 +788,6 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) raw_wait_event = UINT32_ACCESS_ONCE(proc->wait_event_info); wait_event_type = pgstat_get_wait_event_type(raw_wait_event); wait_event = pgstat_get_wait_event(raw_wait_event); - } else { diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 65660c1..9b49918 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2857,6 +2857,16 @@ static struct config_real ConfigureNamesReal[] = NULL, NULL, NULL }, { + {"cpu_shared_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "sharing each tuple with other parallel workers."), + NULL + }, + &cpu_shared_tuple_cost, + DEFAULT_CPU_TUPLE_COST, -DBL_MAX, DBL_MAX, + NULL, NULL, NULL + }, + { {"cpu_index_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, gettext_noop("Sets the planner's estimate of the cost of " "processing each index entry during an index scan."), diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 6d0e12b..1bbf376 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -15,7 +15,13 @@ #define HASHJOIN_H #include "nodes/execnodes.h" +#include "port/atomics.h" +#include "storage/barrier.h" #include "storage/buffile.h" +#include "storage/dsa.h" +#include "storage/fd.h" +#include "storage/lwlock.h" +#include "storage/spin.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -63,7 +69,12 @@ typedef struct HashJoinTupleData { - struct HashJoinTupleData *next; /* link to next tuple in same bucket */ + /* link to next tuple in same bucket */ + union + { + dsa_pointer shared; + struct HashJoinTupleData *private; + } next; uint32 hashvalue; /* tuple's hash code */ /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */ } HashJoinTupleData; @@ -94,7 +105,12 @@ typedef struct HashJoinTupleData typedef struct HashSkewBucket { uint32 hashvalue; /* common hash value */ - HashJoinTuple tuples; /* linked list of inner-relation tuples */ + /* linked list of inner-relation tuples */ + union + { + dsa_pointer shared; + HashJoinTuple private; + } tuples; } HashSkewBucket; #define SKEW_BUCKET_OVERHEAD MAXALIGN(sizeof(HashSkewBucket)) @@ -103,8 +119,9 @@ typedef struct HashSkewBucket #define SKEW_MIN_OUTER_FRACTION 0.01 /* - * To reduce palloc overhead, the HashJoinTuples for the current batch are - * packed in 32kB buffers instead of pallocing each tuple individually. + * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current + * batch are packed in 32kB buffers instead of pallocing each tuple + * individually. 
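Each of the next-pointer unions introduced here can hold either a dsa_pointer (shared table) or a raw process-local pointer (private table), so chain-walking code has to check HashJoinTableIsShared() and translate through dsa_get_address() in the shared case, using the dsa_area and macro added to HashJoinTableData further down this header. A minimal sketch of that pattern, assuming it is compiled against the patched headers; the function name is invented and this is not one of the patch's own helpers:

#include "postgres.h"

#include "executor/hashjoin.h"

/*
 * Illustrative sketch only: count the tuples reachable from 'first' by
 * following the next links, in whichever representation this table uses.
 */
static int
sketch_bucket_chain_length(HashJoinTable hashtable, HashJoinTuple first)
{
	HashJoinTuple tuple = first;
	int			count = 0;

	while (tuple != NULL)
	{
		count++;
		if (HashJoinTableIsShared(hashtable))
		{
			/* next.shared is a dsa_pointer; decode it via the table's area. */
			if (!DsaPointerIsValid(tuple->next.shared))
				break;
			tuple = (HashJoinTuple) dsa_get_address(hashtable->area,
													tuple->next.shared);
		}
		else
			tuple = tuple->next.private;
	}
	return count;
}

The same shared-versus-private split shows up in the bucket heads and chunk lists declared below, which is why the patch carries a dsa_pointer alongside every private pointer it replaces.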
*/ typedef struct HashMemoryChunkData { @@ -112,17 +129,120 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holding the tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list) */ + union + { + dsa_pointer shared; + struct HashMemoryChunkData *private; + } next; char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */ } HashMemoryChunkData; typedef struct HashMemoryChunkData *HashMemoryChunk; + + #define HASH_CHUNK_SIZE (32 * 1024L) #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4) +/* + * Read head position in a shared batch file. + */ +typedef struct HashJoinBatchPosition +{ + int fileno; + off_t offset; +} HashJoinBatchPosition; + +/* + * The state exposed in shared memory for each participant to coordinate + * reading of batch files that it wrote. + */ +typedef struct HashJoinSharedBatchReader +{ + int batchno; /* the batch number we are currently reading */ + + LWLock lock; /* protects access to the members below */ + bool error; /* has an IO error occurred? */ + HashJoinBatchPosition head; /* shared read head for current batch */ +} HashJoinSharedBatchReader; + +/* + * The state exposed in shared memory by each participant allowing its batch + * files to be read by other participants. + */ +typedef struct HashJoinParticipantState +{ + /* + * Arrays of pointers to arrays of pointers to BufFileDesciptor objects + * exported by this participant. The descriptor for batch i is in slot + * i % (1 << fls(i - 1)) of the array at index fls(i). + * + * This arrangement means that we can modify future batches without + * moving/reallocating the current batch. The current batch is therefore + * immutable and accessible by other backends which need to read it. + */ + dsa_pointer inner_batch_descriptors[32]; /* number of bits in batchno */ + dsa_pointer outer_batch_descriptors[32]; + + /* + * The shared state used to coordinate reading from the current batch. We + * need separate objects for the outer and inner side, because in the + * probing phase some participants can be reading from the outer batch, + * while others can be reading from the inner side to preload the next + * batch. + */ + HashJoinSharedBatchReader inner_batch_reader; + HashJoinSharedBatchReader outer_batch_reader; +} HashJoinParticipantState; + +/* + * The state used by each backend to manage reading from batch files written + * by all participants. + */ +typedef struct HashJoinBatchReader +{ + int participant_number; /* read which participant's batch? */ + int batchno; /* which batch are we reading? */ + bool inner; /* inner or outer? */ + HashJoinSharedBatchReader *shared; /* holder of the shared read head */ + BufFile *file; /* the file opened in this backend */ + HashJoinBatchPosition head; /* local read head position */ +} HashJoinBatchReader; + +/* + * State for a shared hash join table. Each backend participating in a hash + * join with a shared hash table also has a HashJoinTableData object in + * backend-private memory, which points to this shared state in the DSM + * segment. + */ +typedef struct SharedHashJoinTableData +{ + Barrier barrier; /* for synchronizing workers */ + dsa_pointer primary_buckets; /* primary hash table */ + dsa_pointer secondary_buckets; /* hash table for preloading next batch */ + bool at_least_one_worker; /* did at least one worker join in time? 
*/ + int nbuckets; + int nbuckets_optimal; + pg_atomic_uint32 next_unmatched_bucket; + pg_atomic_uint64 total_primary_tuples; + pg_atomic_uint64 total_secondary_tuples; + dsa_pointer_atomic head_primary_chunk; + dsa_pointer_atomic head_secondary_chunk; + dsa_pointer_atomic chunks_to_rebucket; + int planned_participants; /* number of planned workers + leader */ + + /* state exposed by each participant for sharing batches */ + HashJoinParticipantState participants[FLEXIBLE_ARRAY_MEMBER]; +} SharedHashJoinTableData; + +typedef union HashJoinBucketHead +{ + dsa_pointer_atomic shared; + HashJoinTuple private; +} HashJoinBucketHead; + typedef struct HashJoinTableData { int nbuckets; /* # buckets in the in-memory hash table */ @@ -134,9 +254,11 @@ typedef struct HashJoinTableData int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */ /* buckets[i] is head of list of tuples in i'th in-memory bucket */ - struct HashJoinTupleData **buckets; + HashJoinBucketHead *buckets; /* buckets array is per-batch storage, as are all the tuples */ + HashJoinBucketHead *next_buckets; /* for preloading next batch */ + bool keepNulls; /* true to store unmatchable NULL tuples */ bool skewEnabled; /* are we using skew optimization? */ @@ -185,7 +307,73 @@ typedef struct HashJoinTableData MemoryContext batchCxt; /* context for this-batch-only storage */ /* used for dense allocation of tuples (into linked chunks) */ - HashMemoryChunk chunks; /* one list for the whole batch */ + HashMemoryChunk primary_chunk; /* current chunk for this batch */ + HashMemoryChunk secondary_chunk; /* current chunk for next batch */ + HashMemoryChunk chunks_to_rebucket; /* after resizing table */ + dsa_pointer primary_chunk_shared; /* DSA pointer to primary_chunk */ + dsa_pointer secondary_chunk_shared; /* DSA pointer to secondary_chunk */ + + /* State for coordinating shared tables for parallel hash joins. */ + dsa_area *area; + SharedHashJoinTableData *shared; /* the shared state */ + int attached_at_phase; /* the phase this participant joined */ + bool detached_early; /* did we decide to detach early? */ + HashJoinBatchReader batch_reader; /* state for reading batches in */ } HashJoinTableData; +/* Check if a HashJoinTable is shared by parallel workers. */ +#define HashJoinTableIsShared(table) ((table)->shared != NULL) + +/* The phases of parallel hash computation. */ +#define PHJ_PHASE_INIT 0 +#define PHJ_PHASE_CREATING 1 +#define PHJ_PHASE_HASHING 2 +#define PHJ_PHASE_RESIZING 3 +#define PHJ_PHASE_REBUCKETING 4 +#define PHJ_PHASE_PROBING 5 /* PHJ_PHASE_PROBING_BATCH(0) */ +#define PHJ_PHASE_UNMATCHED 6 /* PHJ_PHASE_UNMATCHED_BATCH(0) */ + +/* The subphases for batches. */ +#define PHJ_SUBPHASE_PROMOTING 0 +#define PHJ_SUBPHASE_LOADING 1 +#define PHJ_SUBPHASE_PREPARING 2 +#define PHJ_SUBPHASE_PROBING 3 +#define PHJ_SUBPHASE_UNMATCHED 4 + +/* The phases of parallel processing for batch(n). */ +#define PHJ_PHASE_PROMOTING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 4) +#define PHJ_PHASE_LOADING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 3) +#define PHJ_PHASE_PREPARING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 2) +#define PHJ_PHASE_PROBING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 1) +#define PHJ_PHASE_UNMATCHED_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 0) + +/* Phase number -> sub-phase within a batch. */ +#define PHJ_PHASE_TO_SUBPHASE(p) \ + (((int)(p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) % 5) + +/* Phase number -> batch number. 
*/ +#define PHJ_PHASE_TO_BATCHNO(p) \ + (((int)(p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) / 5) + +/* + * Is a given phase one in which a new hash table array is being assigned by + * one elected backend? That includes initial creation, reallocation during + * resize, and promotion of secondary hash table to primary. Workers that + * show up and attach at an arbitrary time must wait such phases out before + * doing anything with the hash table. + */ +#define PHJ_PHASE_MUTATING_TABLE(p) \ + ((p) == PHJ_PHASE_CREATING || \ + (p) == PHJ_PHASE_RESIZING || \ + (PHJ_PHASE_TO_BATCHNO(p) > 0 && \ + PHJ_PHASE_TO_SUBPHASE(p) == PHJ_SUBPHASE_PROMOTING)) + +/* + * Return the 'participant number' for a process participating in a parallel + * hash join. We give a number < hashtable->shared->planned_participants + * to each potential participant, including the leader. + */ +#define HashJoinParticipantNumber() \ + (IsParallelWorker() ? ParallelWorkerNumber + 1 : 0) + #endif /* HASHJOIN_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 8cf6d15..b1e80f3 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -22,12 +22,12 @@ extern Node *MultiExecHash(HashState *node); extern void ExecEndHash(HashState *node); extern void ExecReScanHash(HashState *node); -extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators, +extern HashJoinTable ExecHashTableCreate(HashState *node, List *hashOperators, bool keepNulls); extern void ExecHashTableDestroy(HashJoinTable hashtable); extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, - uint32 hashvalue); + uint32 hashvalue, bool secondary); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, @@ -49,5 +49,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int *numbatches, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); +extern void ExecHashUpdate(HashJoinTable hashtable); +extern bool ExecHashCheckForEarlyExit(HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index f24127a..d123e7e 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -14,15 +14,27 @@ #ifndef NODEHASHJOIN_H #define NODEHASHJOIN_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "storage/buffile.h" +#include "storage/shm_toc.h" extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern TupleTableSlot *ExecHashJoin(HashJoinState *node); extern void ExecEndHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); -extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr); +extern void ExecHashJoinSaveTuple(HashJoinTable hashtable, + MinimalTuple tuple, uint32 hashvalue, + int batchno, bool inner); +extern void ExecHashJoinRewindBatches(HashJoinTable hashtable, int batchno); +extern void ExecHashJoinOpenBatch(HashJoinTable hashtable, + int batchno, bool inner); +extern void ExecHashJoinCloseBatch(HashJoinTable hashtable, + int batchno, bool inner); + +extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeWorker(HashJoinState *state, shm_toc *toc); #endif /* NODEHASHJOIN_H */ diff --git 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 2fadf76..9ae55be 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1738,6 +1738,7 @@ typedef struct MergeJoinState
 /* these structs are defined in executor/hashjoin.h: */
 typedef struct HashJoinTupleData *HashJoinTuple;
 typedef struct HashJoinTableData *HashJoinTable;
+typedef struct SharedHashJoinTableData *SharedHashJoinTable;
 
 typedef struct HashJoinState
 {
@@ -1759,6 +1760,7 @@ typedef struct HashJoinState
 	int			hj_JoinState;
 	bool		hj_MatchedOuter;
 	bool		hj_OuterNotEmpty;
+	SharedHashJoinTable hj_sharedHashJoinTable;
 } HashJoinState;
 
@@ -1982,6 +1984,9 @@ typedef struct HashState
 	HashJoinTable hashtable;	/* hash table for the hashjoin */
 	List	   *hashkeys;		/* list of ExprState nodes */
 	/* hashkeys is same as parent's hj_InnerHashKeys */
+
+	/* The following are the same as the parent's. */
+	SharedHashJoinTable shared_table_data;
 } HashState;
 
 /* ----------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e2fbc7d..e8f90d9 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -782,6 +782,7 @@ typedef struct Hash
 	bool		skewInherit;	/* is outer join rel an inheritance tree? */
 	Oid			skewColType;	/* datatype of the outer key column */
 	int32		skewColTypmod;	/* typmod of the outer key column */
+	bool		shared_table;	/* table shared by multiple workers? */
 	/* all other info is in the parent HashJoin node */
 } Hash;
 
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 3a1255a..8b06551 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1258,6 +1258,16 @@ typedef struct MergePath
 	bool		materialize_inner;		/* add Materialize to inner? */
 } MergePath;
 
+typedef enum
+{
+	/* Every worker builds its own private copy of the hash table. */
+	HASHPATH_TABLE_PRIVATE,
+	/* One worker builds a shared hash table, and all workers probe it. */
+	HASHPATH_TABLE_SHARED_SERIAL,
+	/* All workers build a shared hash table, and then probe it. */
+	HASHPATH_TABLE_SHARED_PARALLEL
+} HashPathTableType;
+
 /*
  * A hashjoin path has these fields.
  *
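Editor's note: for readers skimming the planner changes, this tiny helper is purely illustrative (it does not appear in the patch) and simply restates the meaning of the three strategies documented on the enum above.

/* Illustrative only: human-readable names for the three strategies. */
static const char *
hash_table_type_name(HashPathTableType type)
{
	switch (type)
	{
		case HASHPATH_TABLE_PRIVATE:
			return "private hash table per participant";
		case HASHPATH_TABLE_SHARED_SERIAL:
			return "shared hash table, built by one participant";
		case HASHPATH_TABLE_SHARED_PARALLEL:
			return "shared hash table, built by all participants";
	}
	return "unrecognized";		/* keep compilers quiet */
}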
@@ -1272,6 +1282,7 @@ typedef struct HashPath
 	JoinPath	jpath;
 	List	   *path_hashclauses;	/* join clauses used for hashing */
 	int			num_batches;	/* number of batches expected */
+	HashPathTableType table_type;	/* level of sharedness */
 } HashPath;
 
 /*
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 2a4df2f..7bb0d1d 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -24,6 +24,7 @@
 #define DEFAULT_SEQ_PAGE_COST  1.0
 #define DEFAULT_RANDOM_PAGE_COST  4.0
 #define DEFAULT_CPU_TUPLE_COST	0.01
+#define DEFAULT_CPU_SHARED_TUPLE_COST 0.0
 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005
 #define DEFAULT_CPU_OPERATOR_COST  0.0025
 #define DEFAULT_PARALLEL_TUPLE_COST 0.1
@@ -48,6 +49,7 @@ typedef enum
 extern PGDLLIMPORT double seq_page_cost;
 extern PGDLLIMPORT double random_page_cost;
 extern PGDLLIMPORT double cpu_tuple_cost;
+extern PGDLLIMPORT double cpu_shared_tuple_cost;
 extern PGDLLIMPORT double cpu_index_tuple_cost;
 extern PGDLLIMPORT double cpu_operator_cost;
 extern PGDLLIMPORT double parallel_tuple_cost;
@@ -144,7 +146,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root,
					  List *hashclauses,
					  Path *outer_path, Path *inner_path,
					  SpecialJoinInfo *sjinfo,
-					  SemiAntiJoinFactors *semifactors);
+					  SemiAntiJoinFactors *semifactors,
+					  HashPathTableType table_type);
 extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
					JoinCostWorkspace *workspace,
					SpecialJoinInfo *sjinfo,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 71d9154..5f4ca87 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -134,7 +134,8 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root,
					 Path *inner_path,
					 List *restrict_clauses,
					 Relids required_outer,
-					 List *hashclauses);
+					 List *hashclauses,
+					 HashPathTableType table_type);
 
 extern ProjectionPath *create_projection_path(PlannerInfo *root,
					   RelOptInfo *rel,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7a..0157d52 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -785,7 +785,22 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_SAFE_SNAPSHOT,
-	WAIT_EVENT_SYNC_REP
+	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_HASH_CREATING,
+	WAIT_EVENT_HASH_HASHING,
+	WAIT_EVENT_HASH_RESIZING,
+	WAIT_EVENT_HASH_REBUCKETING,
+	WAIT_EVENT_HASH_INIT,
+	WAIT_EVENT_HASH_DESTROY,
+	WAIT_EVENT_HASH_UNMATCHED,
+	WAIT_EVENT_HASH_PROMOTING,
+	WAIT_EVENT_HASHJOIN_PROMOTING,
+	WAIT_EVENT_HASHJOIN_PROBING,
+	WAIT_EVENT_HASHJOIN_LOADING,
+	WAIT_EVENT_HASHJOIN_PREPARING,
+	WAIT_EVENT_HASHJOIN_REWINDING,
+	WAIT_EVENT_HASHJOIN_REWINDING2,		/* TODO: rename me */
+	WAIT_EVENT_HASHJOIN_REWINDING3		/* TODO: rename me */
 } WaitEventIPC;
 
 /* ----------
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 809e596..044262d 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -30,12 +30,17 @@
 
 typedef struct BufFile BufFile;
 
+typedef struct BufFileDescriptor BufFileDescriptor;
+
 /*
  * prototypes for functions in buffile.c
  */
 
 extern BufFile *BufFileCreateTemp(bool interXact);
 extern void BufFileClose(BufFile *file);
+extern BufFileDescriptor *BufFileExport(BufFile *file);
+extern BufFile *BufFileImport(BufFileDescriptor *descriptor);
+extern size_t BufFileDescriptorSize(const BufFileDescriptor *descriptor);
 extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
 extern int	BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
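Editor's note: the new BufFile export/import interface is what lets one participant's batch files be reopened by another backend. Below is a rough usage sketch, not taken from the patch; how the descriptor bytes travel between backends (here, a caller-supplied chunk of shared memory) and whether the descriptor is palloc'd are assumptions.

#include "postgres.h"
#include "storage/buffile.h"

#include <string.h>

/*
 * Hypothetical sketch: copy a BufFile's descriptor into caller-provided
 * shared memory so another backend can reopen the same file.  'dest' must
 * have room for BufFileDescriptorSize(desc) bytes; sizing and locating that
 * space (DSM, shm_toc, ...) is left out here.
 */
static void
export_buffile_to(BufFile *file, void *dest)
{
	BufFileDescriptor *desc = BufFileExport(file);

	memcpy(dest, desc, BufFileDescriptorSize(desc));
	pfree(desc);				/* assumption: descriptor is palloc'd */
}

/* In another backend: reconstruct a BufFile from the shared descriptor. */
static BufFile *
import_buffile_from(void *src)
{
	return BufFileImport((BufFileDescriptor *) src);
}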
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 951e421..7af6e04 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -236,6 +236,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_LOCK_MANAGER,
 	LWTRANCHE_PREDICATE_LOCK_MANAGER,
 	LWTRANCHE_PARALLEL_EXEC_AREA,
+	LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER,
+	LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
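Editor's note: the two new built-in tranche IDs would typically be passed to LWLockInitialize() when the per-side batch reader locks are set up in the shared hash join state. The sketch below is illustrative only; the field names 'inner_batch_reader' and 'outer_batch_reader' are invented and do not correspond to identifiers in the patch.

#include "postgres.h"
#include "storage/lwlock.h"

/* Illustrative only: give each batch reader's lock its own tranche. */
static void
init_batch_reader_locks(SharedHashJoinTableData *shared)
{
	LWLockInitialize(&shared->inner_batch_reader.lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER);
	LWLockInitialize(&shared->outer_batch_reader.lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER);
}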