Skip to content

Commit 93ac14e

Browse files
committed
Sync 9.5 version of access/transam/parallel.c with HEAD.
This back-patches commit a5fe473 (notably, marking ParallelMessagePending as volatile, which is not particularly optional). I also back-patched some previous cosmetic changes to remove unnecessary diffs between the two branches. I'm unsure how much of this code is actually reachable in 9.5, but to the extent that it is reachable, it needs to be maintained, and minimizing cross-branch diffs will make that easier.
1 parent 89c30d1 commit 93ac14e

File tree

2 files changed

+27
-27
lines changed

2 files changed

+27
-27
lines changed

src/backend/access/transam/parallel.c

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
#include "postgres.h"
1616

17+
#include "access/parallel.h"
1718
#include "access/xact.h"
1819
#include "access/xlog.h"
19-
#include "access/parallel.h"
2020
#include "commands/async.h"
2121
#include "libpq/libpq.h"
2222
#include "libpq/pqformat.h"
@@ -33,6 +33,7 @@
3333
#include "utils/resowner.h"
3434
#include "utils/snapmgr.h"
3535

36+
3637
/*
3738
* We don't want to waste a lot of memory on an error queue which, most of
3839
* the time, will process only a handful of small messages. However, it is
@@ -90,7 +91,7 @@ typedef struct FixedParallelState
9091
int ParallelWorkerNumber = -1;
9192

9293
/* Is there a parallel message pending which we need to receive? */
93-
bool ParallelMessagePending = false;
94+
volatile bool ParallelMessagePending = false;
9495

9596
/* Are we initializing a parallel worker? */
9697
bool InitializingParallelWorker = false;
@@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState;
102103
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
103104

104105
/* Private functions. */
105-
static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
106+
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
106107
static void ParallelErrorContext(void *arg);
107108
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
108109
static void ParallelWorkerMain(Datum main_arg);
109110

111+
110112
/*
111113
* Establish a new parallel context. This should be done after entering
112114
* parallel mode, and (unless there is an error) the context should be
@@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name,
178180

179181
/*
180182
* Establish the dynamic shared memory segment for a parallel context and
181-
* copied state and other bookkeeping information that will need by parallel
182-
* workers into it.
183+
* copy state and other bookkeeping information that will be needed by
184+
* parallel workers into it.
183185
*/
184186
void
185187
InitializeParallelDSM(ParallelContext *pcxt)
@@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
231233
PARALLEL_ERROR_QUEUE_SIZE,
232234
"parallel error queue size not buffer-aligned");
233235
shm_toc_estimate_chunk(&pcxt->estimator,
234-
PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
236+
mul_size(PARALLEL_ERROR_QUEUE_SIZE,
237+
pcxt->nworkers));
235238
shm_toc_estimate_keys(&pcxt->estimator, 1);
236239

237240
/* Estimate how much we'll need for extension entrypoint info. */
@@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
257260
* parallelism than to fail outright.
258261
*/
259262
segsize = shm_toc_estimate(&pcxt->estimator);
260-
if (pcxt->nworkers != 0)
263+
if (pcxt->nworkers > 0)
261264
pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
262265
if (pcxt->seg != NULL)
263266
pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
@@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
337340
*/
338341
error_queue_space =
339342
shm_toc_allocate(pcxt->toc,
340-
PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
343+
mul_size(PARALLEL_ERROR_QUEUE_SIZE,
344+
pcxt->nworkers));
341345
for (i = 0; i < pcxt->nworkers; ++i)
342346
{
343347
char *start;
@@ -603,17 +607,17 @@ ParallelContextActive(void)
603607

604608
/*
605609
* Handle receipt of an interrupt indicating a parallel worker message.
610+
*
611+
* Note: this is called within a signal handler! All we can do is set
612+
* a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
613+
* HandleParallelMessages().
606614
*/
607615
void
608616
HandleParallelMessageInterrupt(void)
609617
{
610-
int save_errno = errno;
611-
612618
InterruptPending = true;
613619
ParallelMessagePending = true;
614620
SetLatch(MyLatch);
615-
616-
errno = save_errno;
617621
}
618622

619623
/*
@@ -664,11 +668,8 @@ HandleParallelMessages(void)
664668
}
665669
else
666670
ereport(ERROR,
667-
(errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
668-
errmsg("lost connection to parallel worker")));
669-
670-
/* This might make the error queue go away. */
671-
CHECK_FOR_INTERRUPTS();
671+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
672+
errmsg("lost connection to parallel worker")));
672673
}
673674
}
674675
}
@@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
714715
errctx.previous = pcxt->error_context_stack;
715716
error_context_stack = &errctx;
716717

717-
/* Parse ErrorReponse or NoticeResponse. */
718+
/* Parse ErrorResponse or NoticeResponse. */
718719
pq_parse_errornotice(msg, &edata);
719720

720721
/* Death of a worker isn't enough justification for suicide. */
@@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
747748

748749
default:
749750
{
750-
elog(ERROR, "unknown message type: %c (%d bytes)",
751+
elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
751752
msgtype, msg->len);
752753
}
753754
}
@@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg)
847848
if (toc == NULL)
848849
ereport(ERROR,
849850
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
850-
errmsg("invalid magic number in dynamic shared memory segment")));
851+
errmsg("invalid magic number in dynamic shared memory segment")));
851852

852853
/* Look up fixed parallel state. */
853854
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);

src/include/access/parallel.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "postmaster/bgworker.h"
2020
#include "storage/shm_mq.h"
2121
#include "storage/shm_toc.h"
22-
#include "utils/elog.h"
2322

2423
typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
2524

@@ -46,24 +45,24 @@ typedef struct ParallelContext
4645
ParallelWorkerInfo *worker;
4746
} ParallelContext;
4847

49-
extern bool ParallelMessagePending;
48+
extern volatile bool ParallelMessagePending;
5049
extern int ParallelWorkerNumber;
5150
extern bool InitializingParallelWorker;
5251

5352
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
5453

5554
extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
5655
extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
57-
extern void InitializeParallelDSM(ParallelContext *);
58-
extern void LaunchParallelWorkers(ParallelContext *);
59-
extern void WaitForParallelWorkersToFinish(ParallelContext *);
60-
extern void DestroyParallelContext(ParallelContext *);
56+
extern void InitializeParallelDSM(ParallelContext *pcxt);
57+
extern void LaunchParallelWorkers(ParallelContext *pcxt);
58+
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
59+
extern void DestroyParallelContext(ParallelContext *pcxt);
6160
extern bool ParallelContextActive(void);
6261

6362
extern void HandleParallelMessageInterrupt(void);
6463
extern void HandleParallelMessages(void);
6564
extern void AtEOXact_Parallel(bool isCommit);
6665
extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
67-
extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
66+
extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
6867

6968
#endif /* PARALLEL_H */

0 commit comments

Comments
 (0)