Skip to content

Commit 7082e61

Browse files
committed
Provide DSM segment to ExecXXXInitializeWorker functions.
Previously, executor nodes running in parallel worker processes didn't have access to the dsm_segment object used for parallel execution. In order to support resource management based on DSM segment lifetime, they need that. So create a ParallelWorkerContext object to hold it and pass it to all InitializeWorker functions. Author: Thomas Munro Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
1 parent 09a7774 commit 7082e61

17 files changed

+55
-33
lines changed

src/backend/executor/execParallel.c

+17-10
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ ExecParallelReportInstrumentation(PlanState *planstate,
11221122
* is allocated and initialized by executor; that is, after ExecutorStart().
11231123
*/
11241124
static bool
1125-
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
1125+
ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
11261126
{
11271127
if (planstate == NULL)
11281128
return false;
@@ -1131,40 +1131,44 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
11311131
{
11321132
case T_SeqScanState:
11331133
if (planstate->plan->parallel_aware)
1134-
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
1134+
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
11351135
break;
11361136
case T_IndexScanState:
11371137
if (planstate->plan->parallel_aware)
1138-
ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
1138+
ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1139+
pwcxt);
11391140
break;
11401141
case T_IndexOnlyScanState:
11411142
if (planstate->plan->parallel_aware)
1142-
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
1143+
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1144+
pwcxt);
11431145
break;
11441146
case T_ForeignScanState:
11451147
if (planstate->plan->parallel_aware)
11461148
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1147-
toc);
1149+
pwcxt);
11481150
break;
11491151
case T_CustomScanState:
11501152
if (planstate->plan->parallel_aware)
11511153
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1152-
toc);
1154+
pwcxt);
11531155
break;
11541156
case T_BitmapHeapScanState:
11551157
if (planstate->plan->parallel_aware)
1156-
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
1158+
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1159+
pwcxt);
11571160
break;
11581161
case T_SortState:
11591162
/* even when not parallel-aware */
1160-
ExecSortInitializeWorker((SortState *) planstate, toc);
1163+
ExecSortInitializeWorker((SortState *) planstate, pwcxt);
11611164
break;
11621165

11631166
default:
11641167
break;
11651168
}
11661169

1167-
return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
1170+
return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1171+
pwcxt);
11681172
}
11691173

11701174
/*
@@ -1194,6 +1198,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
11941198
int instrument_options = 0;
11951199
void *area_space;
11961200
dsa_area *area;
1201+
ParallelWorkerContext pwcxt;
11971202

11981203
/* Get fixed-size state. */
11991204
fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
@@ -1231,7 +1236,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
12311236
RestoreParamExecParams(paramexec_space, queryDesc->estate);
12321237

12331238
}
1234-
ExecParallelInitializeWorker(queryDesc->planstate, toc);
1239+
pwcxt.toc = toc;
1240+
pwcxt.seg = seg;
1241+
ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
12351242

12361243
/* Pass down any tuple bound */
12371244
ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);

src/backend/executor/nodeBitmapHeapscan.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -1102,12 +1102,13 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
11021102
* ----------------------------------------------------------------
11031103
*/
11041104
void
1105-
ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
1105+
ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
1106+
ParallelWorkerContext *pwcxt)
11061107
{
11071108
ParallelBitmapHeapState *pstate;
11081109
Snapshot snapshot;
11091110

1110-
pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
1111+
pstate = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
11111112
node->pstate = pstate;
11121113

11131114
snapshot = RestoreSnapshot(pstate->phs_snapshot_data);

src/backend/executor/nodeCustom.c

+4-3
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
210210
}
211211

212212
void
213-
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
213+
ExecCustomScanInitializeWorker(CustomScanState *node,
214+
ParallelWorkerContext *pwcxt)
214215
{
215216
const CustomExecMethods *methods = node->methods;
216217

@@ -219,8 +220,8 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
219220
int plan_node_id = node->ss.ps.plan->plan_node_id;
220221
void *coordinate;
221222

222-
coordinate = shm_toc_lookup(toc, plan_node_id, false);
223-
methods->InitializeWorkerCustomScan(node, toc, coordinate);
223+
coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
224+
methods->InitializeWorkerCustomScan(node, pwcxt->toc, coordinate);
224225
}
225226
}
226227

src/backend/executor/nodeForeignscan.c

+4-3
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,8 @@ ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
359359
* ----------------------------------------------------------------
360360
*/
361361
void
362-
ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
362+
ExecForeignScanInitializeWorker(ForeignScanState *node,
363+
ParallelWorkerContext *pwcxt)
363364
{
364365
FdwRoutine *fdwroutine = node->fdwroutine;
365366

@@ -368,8 +369,8 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
368369
int plan_node_id = node->ss.ps.plan->plan_node_id;
369370
void *coordinate;
370371

371-
coordinate = shm_toc_lookup(toc, plan_node_id, false);
372-
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
372+
coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
373+
fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
373374
}
374375
}
375376

src/backend/executor/nodeIndexonlyscan.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -678,11 +678,12 @@ ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
678678
* ----------------------------------------------------------------
679679
*/
680680
void
681-
ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc)
681+
ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
682+
ParallelWorkerContext *pwcxt)
682683
{
683684
ParallelIndexScanDesc piscan;
684685

685-
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
686+
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
686687
node->ioss_ScanDesc =
687688
index_beginscan_parallel(node->ss.ss_currentRelation,
688689
node->ioss_RelationDesc,

src/backend/executor/nodeIndexscan.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -1716,11 +1716,12 @@ ExecIndexScanReInitializeDSM(IndexScanState *node,
17161716
* ----------------------------------------------------------------
17171717
*/
17181718
void
1719-
ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc)
1719+
ExecIndexScanInitializeWorker(IndexScanState *node,
1720+
ParallelWorkerContext *pwcxt)
17201721
{
17211722
ParallelIndexScanDesc piscan;
17221723

1723-
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
1724+
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
17241725
node->iss_ScanDesc =
17251726
index_beginscan_parallel(node->ss.ss_currentRelation,
17261727
node->iss_RelationDesc,

src/backend/executor/nodeSeqscan.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,12 @@ ExecSeqScanReInitializeDSM(SeqScanState *node,
348348
* ----------------------------------------------------------------
349349
*/
350350
void
351-
ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
351+
ExecSeqScanInitializeWorker(SeqScanState *node,
352+
ParallelWorkerContext *pwcxt)
352353
{
353354
ParallelHeapScanDesc pscan;
354355

355-
pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
356+
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
356357
node->ss.ss_currentScanDesc =
357358
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
358359
}

src/backend/executor/nodeSort.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,10 @@ ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
420420
* ----------------------------------------------------------------
421421
*/
422422
void
423-
ExecSortInitializeWorker(SortState *node, shm_toc *toc)
423+
ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)
424424
{
425425
node->shared_info =
426-
shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true);
426+
shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
427427
node->am_worker = true;
428428
}
429429

src/include/access/parallel.h

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ typedef struct ParallelContext
4545
ParallelWorkerInfo *worker;
4646
} ParallelContext;
4747

48+
typedef struct ParallelWorkerContext
49+
{
50+
dsm_segment *seg;
51+
shm_toc *toc;
52+
} ParallelWorkerContext;
53+
4854
extern volatile bool ParallelMessagePending;
4955
extern int ParallelWorkerNumber;
5056
extern bool InitializingParallelWorker;

src/include/executor/nodeBitmapHeapscan.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
2727
extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
2828
ParallelContext *pcxt);
2929
extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
30-
shm_toc *toc);
30+
ParallelWorkerContext *pwcxt);
3131

3232
#endif /* NODEBITMAPHEAPSCAN_H */

src/include/executor/nodeCustom.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node,
3737
extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
3838
ParallelContext *pcxt);
3939
extern void ExecCustomScanInitializeWorker(CustomScanState *node,
40-
shm_toc *toc);
40+
ParallelWorkerContext *pwcxt);
4141
extern void ExecShutdownCustomScan(CustomScanState *node);
4242

4343
#endif /* NODECUSTOM_H */

src/include/executor/nodeForeignscan.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
2828
extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
2929
ParallelContext *pcxt);
3030
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
31-
shm_toc *toc);
31+
ParallelWorkerContext *pwcxt);
3232
extern void ExecShutdownForeignScan(ForeignScanState *node);
3333

3434
#endif /* NODEFOREIGNSCAN_H */

src/include/executor/nodeIndexonlyscan.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
3131
extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
3232
ParallelContext *pcxt);
3333
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
34-
shm_toc *toc);
34+
ParallelWorkerContext *pwcxt);
3535

3636
#endif /* NODEINDEXONLYSCAN_H */

src/include/executor/nodeIndexscan.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ extern void ExecReScanIndexScan(IndexScanState *node);
2525
extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
2626
extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
2727
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
28-
extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
28+
extern void ExecIndexScanInitializeWorker(IndexScanState *node,
29+
ParallelWorkerContext *pwcxt);
2930

3031
/*
3132
* These routines are exported to share code with nodeIndexonlyscan.c and

src/include/executor/nodeSeqscan.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
2525
extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
2626
extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
2727
extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
28-
extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
28+
extern void ExecSeqScanInitializeWorker(SeqScanState *node,
29+
ParallelWorkerContext *pwcxt);
2930

3031
#endif /* NODESEQSCAN_H */

src/include/executor/nodeSort.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ extern void ExecReScanSort(SortState *node);
2727
extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
2828
extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
2929
extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
30-
extern void ExecSortInitializeWorker(SortState *node, shm_toc *toc);
30+
extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
3131
extern void ExecSortRetrieveInstrumentation(SortState *node);
3232

3333
#endif /* NODESORT_H */

src/tools/pgindent/typedefs.list

+1
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,7 @@ ParallelHeapScanDesc
15341534
ParallelIndexScanDesc
15351535
ParallelSlot
15361536
ParallelState
1537+
ParallelWorkerContext
15371538
ParallelWorkerInfo
15381539
Param
15391540
ParamExecData

0 commit comments

Comments
 (0)