@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
246
246
*/
247
247
pstate = hashtable -> parallel_state ;
248
248
build_barrier = & pstate -> build_barrier ;
249
- Assert (BarrierPhase (build_barrier ) >= PHJ_BUILD_ALLOCATE );
249
+ Assert (BarrierPhase (build_barrier ) >= PHJ_BUILD_ALLOCATING );
250
250
switch (BarrierPhase (build_barrier ))
251
251
{
252
- case PHJ_BUILD_ALLOCATE :
252
+ case PHJ_BUILD_ALLOCATING :
253
253
254
254
/*
255
255
* Either I just allocated the initial hash table in
@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
259
259
BarrierArriveAndWait (build_barrier , WAIT_EVENT_HASH_BUILD_ALLOCATE );
260
260
/* Fall through. */
261
261
262
- case PHJ_BUILD_HASH_INNER :
262
+ case PHJ_BUILD_HASHING_INNER :
263
263
264
264
/*
265
265
* It's time to begin hashing, or if we just arrived here then
@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
271
271
* below.
272
272
*/
273
273
if (PHJ_GROW_BATCHES_PHASE (BarrierAttach (& pstate -> grow_batches_barrier )) !=
274
- PHJ_GROW_BATCHES_ELECT )
274
+ PHJ_GROW_BATCHES_ELECTING )
275
275
ExecParallelHashIncreaseNumBatches (hashtable );
276
276
if (PHJ_GROW_BUCKETS_PHASE (BarrierAttach (& pstate -> grow_buckets_barrier )) !=
277
- PHJ_GROW_BUCKETS_ELECT )
277
+ PHJ_GROW_BUCKETS_ELECTING )
278
278
ExecParallelHashIncreaseNumBuckets (hashtable );
279
279
ExecParallelHashEnsureBatchAccessors (hashtable );
280
280
ExecParallelHashTableSetCurrentBatch (hashtable , 0 );
@@ -333,22 +333,15 @@ MultiExecParallelHash(HashState *node)
333
333
hashtable -> nbuckets = pstate -> nbuckets ;
334
334
hashtable -> log2_nbuckets = my_log2 (hashtable -> nbuckets );
335
335
hashtable -> totalTuples = pstate -> total_tuples ;
336
-
337
- /*
338
- * Unless we're completely done and the batch state has been freed, make
339
- * sure we have accessors.
340
- */
341
- if (BarrierPhase (build_barrier ) < PHJ_BUILD_FREE )
342
- ExecParallelHashEnsureBatchAccessors (hashtable );
336
+ ExecParallelHashEnsureBatchAccessors (hashtable );
343
337
344
338
/*
345
339
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
346
- * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
340
+ * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
347
341
* there already).
348
342
*/
349
- Assert (BarrierPhase (build_barrier ) == PHJ_BUILD_HASH_OUTER ||
350
- BarrierPhase (build_barrier ) == PHJ_BUILD_RUN ||
351
- BarrierPhase (build_barrier ) == PHJ_BUILD_FREE );
343
+ Assert (BarrierPhase (build_barrier ) == PHJ_BUILD_HASHING_OUTER ||
344
+ BarrierPhase (build_barrier ) == PHJ_BUILD_DONE );
352
345
}
353
346
354
347
/* ----------------------------------------------------------------
@@ -596,8 +589,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
596
589
* Attach to the build barrier. The corresponding detach operation is
597
590
* in ExecHashTableDetach. Note that we won't attach to the
598
591
* batch_barrier for batch 0 yet. We'll attach later and start it out
599
- * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
600
- * then loaded while hashing (the standard hybrid hash join
592
+ * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
593
+ * and then loaded while hashing (the standard hybrid hash join
601
594
* algorithm), and we'll coordinate that using build_barrier.
602
595
*/
603
596
build_barrier = & pstate -> build_barrier ;
@@ -610,7 +603,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
610
603
* SharedHashJoinBatch objects and the hash table for batch 0. One
611
604
* backend will be elected to do that now if necessary.
612
605
*/
613
- if (BarrierPhase (build_barrier ) == PHJ_BUILD_ELECT &&
606
+ if (BarrierPhase (build_barrier ) == PHJ_BUILD_ELECTING &&
614
607
BarrierArriveAndWait (build_barrier , WAIT_EVENT_HASH_BUILD_ELECT ))
615
608
{
616
609
pstate -> nbatch = nbatch ;
@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
631
624
/*
632
625
* The next Parallel Hash synchronization point is in
633
626
* MultiExecParallelHash(), which will progress it all the way to
634
- * PHJ_BUILD_RUN . The caller must not return control from this
627
+ * PHJ_BUILD_DONE . The caller must not return control from this
635
628
* executor node between now and then.
636
629
*/
637
630
}
@@ -1067,7 +1060,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1067
1060
ParallelHashJoinState * pstate = hashtable -> parallel_state ;
1068
1061
int i ;
1069
1062
1070
- Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASH_INNER );
1063
+ Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASHING_INNER );
1071
1064
1072
1065
/*
1073
1066
* It's unlikely, but we need to be prepared for new participants to show
@@ -1076,7 +1069,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1076
1069
*/
1077
1070
switch (PHJ_GROW_BATCHES_PHASE (BarrierPhase (& pstate -> grow_batches_barrier )))
1078
1071
{
1079
- case PHJ_GROW_BATCHES_ELECT :
1072
+ case PHJ_GROW_BATCHES_ELECTING :
1080
1073
1081
1074
/*
1082
1075
* Elect one participant to prepare to grow the number of batches.
@@ -1194,13 +1187,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1194
1187
}
1195
1188
/* Fall through. */
1196
1189
1197
- case PHJ_GROW_BATCHES_REALLOCATE :
1190
+ case PHJ_GROW_BATCHES_ALLOCATING :
1198
1191
/* Wait for the above to be finished. */
1199
1192
BarrierArriveAndWait (& pstate -> grow_batches_barrier ,
1200
- WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE );
1193
+ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE );
1201
1194
/* Fall through. */
1202
1195
1203
- case PHJ_GROW_BATCHES_REPARTITION :
1196
+ case PHJ_GROW_BATCHES_REPARTITIONING :
1204
1197
/* Make sure that we have the current dimensions and buckets. */
1205
1198
ExecParallelHashEnsureBatchAccessors (hashtable );
1206
1199
ExecParallelHashTableSetCurrentBatch (hashtable , 0 );
@@ -1213,7 +1206,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1213
1206
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION );
1214
1207
/* Fall through. */
1215
1208
1216
- case PHJ_GROW_BATCHES_DECIDE :
1209
+ case PHJ_GROW_BATCHES_DECIDING :
1217
1210
1218
1211
/*
1219
1212
* Elect one participant to clean up and decide whether further
@@ -1268,7 +1261,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1268
1261
}
1269
1262
/* Fall through. */
1270
1263
1271
- case PHJ_GROW_BATCHES_FINISH :
1264
+ case PHJ_GROW_BATCHES_FINISHING :
1272
1265
/* Wait for the above to complete. */
1273
1266
BarrierArriveAndWait (& pstate -> grow_batches_barrier ,
1274
1267
WAIT_EVENT_HASH_GROW_BATCHES_FINISH );
@@ -1508,7 +1501,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
1508
1501
HashMemoryChunk chunk ;
1509
1502
dsa_pointer chunk_s ;
1510
1503
1511
- Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASH_INNER );
1504
+ Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASHING_INNER );
1512
1505
1513
1506
/*
1514
1507
* It's unlikely, but we need to be prepared for new participants to show
@@ -1517,7 +1510,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
1517
1510
*/
1518
1511
switch (PHJ_GROW_BUCKETS_PHASE (BarrierPhase (& pstate -> grow_buckets_barrier )))
1519
1512
{
1520
- case PHJ_GROW_BUCKETS_ELECT :
1513
+ case PHJ_GROW_BUCKETS_ELECTING :
1521
1514
/* Elect one participant to prepare to increase nbuckets. */
1522
1515
if (BarrierArriveAndWait (& pstate -> grow_buckets_barrier ,
1523
1516
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT ))
@@ -1546,13 +1539,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
1546
1539
}
1547
1540
/* Fall through. */
1548
1541
1549
- case PHJ_GROW_BUCKETS_REALLOCATE :
1542
+ case PHJ_GROW_BUCKETS_ALLOCATING :
1550
1543
/* Wait for the above to complete. */
1551
1544
BarrierArriveAndWait (& pstate -> grow_buckets_barrier ,
1552
- WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE );
1545
+ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE );
1553
1546
/* Fall through. */
1554
1547
1555
- case PHJ_GROW_BUCKETS_REINSERT :
1548
+ case PHJ_GROW_BUCKETS_REINSERTING :
1556
1549
/* Reinsert all tuples into the hash table. */
1557
1550
ExecParallelHashEnsureBatchAccessors (hashtable );
1558
1551
ExecParallelHashTableSetCurrentBatch (hashtable , 0 );
@@ -1708,7 +1701,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable,
1708
1701
1709
1702
/* Try to load it into memory. */
1710
1703
Assert (BarrierPhase (& hashtable -> parallel_state -> build_barrier ) ==
1711
- PHJ_BUILD_HASH_INNER );
1704
+ PHJ_BUILD_HASHING_INNER );
1712
1705
hashTuple = ExecParallelHashTupleAlloc (hashtable ,
1713
1706
HJTUPLE_OVERHEAD + tuple -> t_len ,
1714
1707
& shared );
@@ -2862,7 +2855,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
2862
2855
if (pstate -> growth != PHJ_GROWTH_DISABLED )
2863
2856
{
2864
2857
Assert (curbatch == 0 );
2865
- Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASH_INNER );
2858
+ Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_HASHING_INNER );
2866
2859
2867
2860
/*
2868
2861
* Check if our space limit would be exceeded. To avoid choking on
@@ -2982,7 +2975,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
2982
2975
{
2983
2976
/* Batch 0 doesn't need to be loaded. */
2984
2977
BarrierAttach (& shared -> batch_barrier );
2985
- while (BarrierPhase (& shared -> batch_barrier ) < PHJ_BATCH_PROBE )
2978
+ while (BarrierPhase (& shared -> batch_barrier ) < PHJ_BATCH_PROBING )
2986
2979
BarrierArriveAndWait (& shared -> batch_barrier , 0 );
2987
2980
BarrierDetach (& shared -> batch_barrier );
2988
2981
}
@@ -3055,11 +3048,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
3055
3048
}
3056
3049
3057
3050
/*
3058
- * We should never see a state where the batch-tracking array is freed,
3059
- * because we should have given up sooner if we join when the build
3060
- * barrier has reached the PHJ_BUILD_FREE phase.
3051
+ * It's possible for a backend to start up very late so that the whole
3052
+ * join is finished and the shm state for tracking batches has already
3053
+ * been freed by ExecHashTableDetach(). In that case we'll just leave
3054
+ * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
3055
+ * up early.
3061
3056
*/
3062
- Assert (DsaPointerIsValid (pstate -> batches ));
3057
+ if (!DsaPointerIsValid (pstate -> batches ))
3058
+ return ;
3063
3059
3064
3060
/* Use hash join memory context. */
3065
3061
oldcxt = MemoryContextSwitchTo (hashtable -> hashCxt );
@@ -3140,7 +3136,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
3140
3136
* longer attached, but since there is no way it's moving after
3141
3137
* this point it seems safe to make the following assertion.
3142
3138
*/
3143
- Assert (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_FREE );
3139
+ Assert (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_DONE );
3144
3140
3145
3141
/* Free shared chunks and buckets. */
3146
3142
while (DsaPointerIsValid (batch -> chunks ))
@@ -3179,17 +3175,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
3179
3175
void
3180
3176
ExecHashTableDetach (HashJoinTable hashtable )
3181
3177
{
3182
- ParallelHashJoinState * pstate = hashtable -> parallel_state ;
3183
-
3184
- /*
3185
- * If we're involved in a parallel query, we must either have got all the
3186
- * way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3187
- */
3188
- Assert (!pstate ||
3189
- BarrierPhase (& pstate -> build_barrier ) >= PHJ_BUILD_RUN );
3190
-
3191
- if (pstate && BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_RUN )
3178
+ if (hashtable -> parallel_state )
3192
3179
{
3180
+ ParallelHashJoinState * pstate = hashtable -> parallel_state ;
3193
3181
int i ;
3194
3182
3195
3183
/* Make sure any temporary files are closed. */
@@ -3205,22 +3193,17 @@ ExecHashTableDetach(HashJoinTable hashtable)
3205
3193
}
3206
3194
3207
3195
/* If we're last to detach, clean up shared memory. */
3208
- if (BarrierArriveAndDetach (& pstate -> build_barrier ))
3196
+ if (BarrierDetach (& pstate -> build_barrier ))
3209
3197
{
3210
- /*
3211
- * Late joining processes will see this state and give up
3212
- * immediately.
3213
- */
3214
- Assert (BarrierPhase (& pstate -> build_barrier ) == PHJ_BUILD_FREE );
3215
-
3216
3198
if (DsaPointerIsValid (pstate -> batches ))
3217
3199
{
3218
3200
dsa_free (hashtable -> area , pstate -> batches );
3219
3201
pstate -> batches = InvalidDsaPointer ;
3220
3202
}
3221
3203
}
3204
+
3205
+ hashtable -> parallel_state = NULL ;
3222
3206
}
3223
- hashtable -> parallel_state = NULL ;
3224
3207
}
3225
3208
3226
3209
/*
0 commit comments