Commit 7f7f25f

Revert "Fix race in Parallel Hash Join batch cleanup."
This reverts commit 378802e.
This reverts commit 3b8981b.

Discussion: https://postgr.es/m/CA%2BhUKGJmcqAE3MZeDCLLXa62cWM0AJbKmp2JrJYaJ86bz36LFA%40mail.gmail.com
1 parent 9fd2952 commit 7f7f25f

5 files changed: +113 −142 lines changed


src/backend/executor/nodeHash.c: +41 −58
@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
      */
     pstate = hashtable->parallel_state;
     build_barrier = &pstate->build_barrier;
-    Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
+    Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
     switch (BarrierPhase(build_barrier))
     {
-        case PHJ_BUILD_ALLOCATE:
+        case PHJ_BUILD_ALLOCATING:

             /*
              * Either I just allocated the initial hash table in
@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
             BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
             /* Fall through. */

-        case PHJ_BUILD_HASH_INNER:
+        case PHJ_BUILD_HASHING_INNER:

             /*
              * It's time to begin hashing, or if we just arrived here then
@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
              * below.
              */
             if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
-                PHJ_GROW_BATCHES_ELECT)
+                PHJ_GROW_BATCHES_ELECTING)
                 ExecParallelHashIncreaseNumBatches(hashtable);
             if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
-                PHJ_GROW_BUCKETS_ELECT)
+                PHJ_GROW_BUCKETS_ELECTING)
                 ExecParallelHashIncreaseNumBuckets(hashtable);
             ExecParallelHashEnsureBatchAccessors(hashtable);
             ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -333,22 +333,15 @@ MultiExecParallelHash(HashState *node)
     hashtable->nbuckets = pstate->nbuckets;
     hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
     hashtable->totalTuples = pstate->total_tuples;
-
-    /*
-     * Unless we're completely done and the batch state has been freed, make
-     * sure we have accessors.
-     */
-    if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
-        ExecParallelHashEnsureBatchAccessors(hashtable);
+    ExecParallelHashEnsureBatchAccessors(hashtable);

     /*
      * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-     * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
+     * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
      * there already).
      */
-    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
-           BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
-           BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
+    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 }

 /* ----------------------------------------------------------------
@@ -596,8 +589,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
          * Attach to the build barrier. The corresponding detach operation is
          * in ExecHashTableDetach. Note that we won't attach to the
          * batch_barrier for batch 0 yet. We'll attach later and start it out
-         * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
-         * then loaded while hashing (the standard hybrid hash join
+         * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
+         * and then loaded while hashing (the standard hybrid hash join
          * algorithm), and we'll coordinate that using build_barrier.
          */
         build_barrier = &pstate->build_barrier;
@@ -610,7 +603,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
          * SharedHashJoinBatch objects and the hash table for batch 0. One
          * backend will be elected to do that now if necessary.
          */
-        if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
+        if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
             BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
         {
             pstate->nbatch = nbatch;
@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
         /*
          * The next Parallel Hash synchronization point is in
          * MultiExecParallelHash(), which will progress it all the way to
-         * PHJ_BUILD_RUN. The caller must not return control from this
+         * PHJ_BUILD_DONE. The caller must not return control from this
          * executor node between now and then.
          */
     }
@@ -1067,7 +1060,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
     ParallelHashJoinState *pstate = hashtable->parallel_state;
     int i;

-    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

     /*
      * It's unlikely, but we need to be prepared for new participants to show
@@ -1076,7 +1069,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
      */
     switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
     {
-        case PHJ_GROW_BATCHES_ELECT:
+        case PHJ_GROW_BATCHES_ELECTING:

             /*
              * Elect one participant to prepare to grow the number of batches.
@@ -1194,13 +1187,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
             }
             /* Fall through. */

-        case PHJ_GROW_BATCHES_REALLOCATE:
+        case PHJ_GROW_BATCHES_ALLOCATING:
             /* Wait for the above to be finished. */
             BarrierArriveAndWait(&pstate->grow_batches_barrier,
-                                 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
+                                 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
             /* Fall through. */

-        case PHJ_GROW_BATCHES_REPARTITION:
+        case PHJ_GROW_BATCHES_REPARTITIONING:
             /* Make sure that we have the current dimensions and buckets. */
             ExecParallelHashEnsureBatchAccessors(hashtable);
             ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1213,7 +1206,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
                                  WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
             /* Fall through. */

-        case PHJ_GROW_BATCHES_DECIDE:
+        case PHJ_GROW_BATCHES_DECIDING:

             /*
              * Elect one participant to clean up and decide whether further
@@ -1268,7 +1261,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
             }
             /* Fall through. */

-        case PHJ_GROW_BATCHES_FINISH:
+        case PHJ_GROW_BATCHES_FINISHING:
             /* Wait for the above to complete. */
             BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                  WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
@@ -1508,7 +1501,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
     HashMemoryChunk chunk;
     dsa_pointer chunk_s;

-    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

     /*
      * It's unlikely, but we need to be prepared for new participants to show
@@ -1517,7 +1510,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
      */
     switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
     {
-        case PHJ_GROW_BUCKETS_ELECT:
+        case PHJ_GROW_BUCKETS_ELECTING:
             /* Elect one participant to prepare to increase nbuckets. */
             if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                      WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
@@ -1546,13 +1539,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
             }
             /* Fall through. */

-        case PHJ_GROW_BUCKETS_REALLOCATE:
+        case PHJ_GROW_BUCKETS_ALLOCATING:
             /* Wait for the above to complete. */
             BarrierArriveAndWait(&pstate->grow_buckets_barrier,
-                                 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
+                                 WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
             /* Fall through. */

-        case PHJ_GROW_BUCKETS_REINSERT:
+        case PHJ_GROW_BUCKETS_REINSERTING:
             /* Reinsert all tuples into the hash table. */
             ExecParallelHashEnsureBatchAccessors(hashtable);
             ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1708,7 +1701,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable,

     /* Try to load it into memory. */
     Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
-           PHJ_BUILD_HASH_INNER);
+           PHJ_BUILD_HASHING_INNER);
     hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                            HJTUPLE_OVERHEAD + tuple->t_len,
                                            &shared);
@@ -2862,7 +2855,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
     if (pstate->growth != PHJ_GROWTH_DISABLED)
     {
         Assert(curbatch == 0);
-        Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+        Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

         /*
          * Check if our space limit would be exceeded. To avoid choking on
@@ -2982,7 +2975,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
         {
             /* Batch 0 doesn't need to be loaded. */
             BarrierAttach(&shared->batch_barrier);
-            while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
+            while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
                 BarrierArriveAndWait(&shared->batch_barrier, 0);
             BarrierDetach(&shared->batch_barrier);
         }
@@ -3055,11 +3048,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
     }

     /*
-     * We should never see a state where the batch-tracking array is freed,
-     * because we should have given up sooner if we join when the build
-     * barrier has reached the PHJ_BUILD_FREE phase.
+     * It's possible for a backend to start up very late so that the whole
+     * join is finished and the shm state for tracking batches has already
+     * been freed by ExecHashTableDetach(). In that case we'll just leave
+     * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+     * up early.
      */
-    Assert(DsaPointerIsValid(pstate->batches));
+    if (!DsaPointerIsValid(pstate->batches))
+        return;

     /* Use hash join memory context. */
     oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -3140,7 +3136,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
          * longer attached, but since there is no way it's moving after
          * this point it seems safe to make the following assertion.
          */
-        Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
+        Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);

         /* Free shared chunks and buckets. */
         while (DsaPointerIsValid(batch->chunks))
@@ -3179,17 +3175,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 void
 ExecHashTableDetach(HashJoinTable hashtable)
 {
-    ParallelHashJoinState *pstate = hashtable->parallel_state;
-
-    /*
-     * If we're involved in a parallel query, we must either have got all the
-     * way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
-     */
-    Assert(!pstate ||
-           BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
-
-    if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
+    if (hashtable->parallel_state)
     {
+        ParallelHashJoinState *pstate = hashtable->parallel_state;
         int i;

         /* Make sure any temporary files are closed. */
@@ -3205,22 +3193,17 @@ ExecHashTableDetach(HashJoinTable hashtable)
         }

         /* If we're last to detach, clean up shared memory. */
-        if (BarrierArriveAndDetach(&pstate->build_barrier))
+        if (BarrierDetach(&pstate->build_barrier))
         {
-            /*
-             * Late joining processes will see this state and give up
-             * immediately.
-             */
-            Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
-
             if (DsaPointerIsValid(pstate->batches))
             {
                 dsa_free(hashtable->area, pstate->batches);
                 pstate->batches = InvalidDsaPointer;
             }
         }
+
+        hashtable->parallel_state = NULL;
     }
-    hashtable->parallel_state = NULL;
 }

 /*
