@@ -287,7 +287,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
287
287
if (!reinitialize )
288
288
tqueuespace =
289
289
shm_toc_allocate (pcxt -> toc ,
290
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt -> nworkers );
290
+ mul_size (PARALLEL_TUPLE_QUEUE_SIZE ,
291
+ pcxt -> nworkers ));
291
292
else
292
293
tqueuespace = shm_toc_lookup (pcxt -> toc , PARALLEL_KEY_TUPLE_QUEUE );
293
294
@@ -296,7 +297,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
296
297
{
297
298
shm_mq * mq ;
298
299
299
- mq = shm_mq_create (tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE ,
300
+ mq = shm_mq_create (tqueuespace +
301
+ ((Size ) i ) * PARALLEL_TUPLE_QUEUE_SIZE ,
300
302
(Size ) PARALLEL_TUPLE_QUEUE_SIZE );
301
303
302
304
shm_mq_set_receiver (mq , MyProc );
@@ -380,12 +382,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
380
382
* looking at pgBufferUsage, so do it unconditionally.
381
383
*/
382
384
shm_toc_estimate_chunk (& pcxt -> estimator ,
383
- sizeof (BufferUsage ) * pcxt -> nworkers );
385
+ mul_size ( sizeof (BufferUsage ), pcxt -> nworkers ) );
384
386
shm_toc_estimate_keys (& pcxt -> estimator , 1 );
385
387
386
388
/* Estimate space for tuple queues. */
387
389
shm_toc_estimate_chunk (& pcxt -> estimator ,
388
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt -> nworkers );
390
+ mul_size ( PARALLEL_TUPLE_QUEUE_SIZE , pcxt -> nworkers ) );
389
391
shm_toc_estimate_keys (& pcxt -> estimator , 1 );
390
392
391
393
/*
@@ -404,7 +406,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
404
406
sizeof (int ) * e .nnodes ;
405
407
instrumentation_len = MAXALIGN (instrumentation_len );
406
408
instrument_offset = instrumentation_len ;
407
- instrumentation_len += sizeof (Instrumentation ) * e .nnodes * nworkers ;
409
+ instrumentation_len +=
410
+ mul_size (sizeof (Instrumentation ),
411
+ mul_size (e .nnodes , nworkers ));
408
412
shm_toc_estimate_chunk (& pcxt -> estimator , instrumentation_len );
409
413
shm_toc_estimate_keys (& pcxt -> estimator , 1 );
410
414
}
@@ -432,7 +436,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
432
436
433
437
/* Allocate space for each worker's BufferUsage; no need to initialize. */
434
438
bufusage_space = shm_toc_allocate (pcxt -> toc ,
435
- sizeof (BufferUsage ) * pcxt -> nworkers );
439
+ mul_size ( sizeof (BufferUsage ), pcxt -> nworkers ) );
436
440
shm_toc_insert (pcxt -> toc , PARALLEL_KEY_BUFFER_USAGE , bufusage_space );
437
441
pei -> buffer_usage = bufusage_space ;
438
442
@@ -511,7 +515,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
511
515
InstrAggNode (planstate -> instrument , & instrument [n ]);
512
516
513
517
/* Also store the per-worker detail. */
514
- ibytes = instrumentation -> num_workers * sizeof (Instrumentation );
518
+ ibytes = mul_size ( instrumentation -> num_workers , sizeof (Instrumentation ) );
515
519
planstate -> worker_instrument =
516
520
palloc (ibytes + offsetof(WorkerInstrumentation , instrument ));
517
521
planstate -> worker_instrument -> num_workers = instrumentation -> num_workers ;
0 commit comments