14
14
15
15
#include "postgres.h"
16
16
17
+ #include "access/parallel.h"
17
18
#include "access/xact.h"
18
19
#include "access/xlog.h"
19
- #include "access/parallel.h"
20
20
#include "commands/async.h"
21
21
#include "libpq/libpq.h"
22
22
#include "libpq/pqformat.h"
33
33
#include "utils/resowner.h"
34
34
#include "utils/snapmgr.h"
35
35
36
+
36
37
/*
37
38
* We don't want to waste a lot of memory on an error queue which, most of
38
39
* the time, will process only a handful of small messages. However, it is
@@ -90,7 +91,7 @@ typedef struct FixedParallelState
90
91
int ParallelWorkerNumber = -1 ;
91
92
92
93
/* Is there a parallel message pending which we need to receive? */
93
- bool ParallelMessagePending = false;
94
+ volatile bool ParallelMessagePending = false;
94
95
95
96
/* Are we initializing a parallel worker? */
96
97
bool InitializingParallelWorker = false;
@@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState;
102
103
static dlist_head pcxt_list = DLIST_STATIC_INIT (pcxt_list );
103
104
104
105
/* Private functions. */
105
- static void HandleParallelMessage (ParallelContext * , int , StringInfo msg );
106
+ static void HandleParallelMessage (ParallelContext * pcxt , int i , StringInfo msg );
106
107
static void ParallelErrorContext (void * arg );
107
108
static void ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc );
108
109
static void ParallelWorkerMain (Datum main_arg );
109
110
111
+
110
112
/*
111
113
* Establish a new parallel context. This should be done after entering
112
114
* parallel mode, and (unless there is an error) the context should be
@@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name,
178
180
179
181
/*
180
182
* Establish the dynamic shared memory segment for a parallel context and
181
- * copied state and other bookkeeping information that will need by parallel
182
- * workers into it.
183
+ * copy state and other bookkeeping information that will be needed by
184
+ * parallel workers into it.
183
185
*/
184
186
void
185
187
InitializeParallelDSM (ParallelContext * pcxt )
@@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
231
233
PARALLEL_ERROR_QUEUE_SIZE ,
232
234
"parallel error queue size not buffer-aligned" );
233
235
shm_toc_estimate_chunk (& pcxt -> estimator ,
234
- PARALLEL_ERROR_QUEUE_SIZE * pcxt -> nworkers );
236
+ mul_size (PARALLEL_ERROR_QUEUE_SIZE ,
237
+ pcxt -> nworkers ));
235
238
shm_toc_estimate_keys (& pcxt -> estimator , 1 );
236
239
237
240
/* Estimate how much we'll need for extension entrypoint info. */
@@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
257
260
* parallelism than to fail outright.
258
261
*/
259
262
segsize = shm_toc_estimate (& pcxt -> estimator );
260
- if (pcxt -> nworkers != 0 )
263
+ if (pcxt -> nworkers > 0 )
261
264
pcxt -> seg = dsm_create (segsize , DSM_CREATE_NULL_IF_MAXSEGMENTS );
262
265
if (pcxt -> seg != NULL )
263
266
pcxt -> toc = shm_toc_create (PARALLEL_MAGIC ,
@@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
337
340
*/
338
341
error_queue_space =
339
342
shm_toc_allocate (pcxt -> toc ,
340
- PARALLEL_ERROR_QUEUE_SIZE * pcxt -> nworkers );
343
+ mul_size (PARALLEL_ERROR_QUEUE_SIZE ,
344
+ pcxt -> nworkers ));
341
345
for (i = 0 ; i < pcxt -> nworkers ; ++ i )
342
346
{
343
347
char * start ;
@@ -603,17 +607,17 @@ ParallelContextActive(void)
603
607
604
608
/*
605
609
* Handle receipt of an interrupt indicating a parallel worker message.
610
+ *
611
+ * Note: this is called within a signal handler! All we can do is set
612
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
613
+ * HandleParallelMessages().
606
614
*/
607
615
void
608
616
HandleParallelMessageInterrupt (void )
609
617
{
610
- int save_errno = errno ;
611
-
612
618
InterruptPending = true;
613
619
ParallelMessagePending = true;
614
620
SetLatch (MyLatch );
615
-
616
- errno = save_errno ;
617
621
}
618
622
619
623
/*
@@ -664,11 +668,8 @@ HandleParallelMessages(void)
664
668
}
665
669
else
666
670
ereport (ERROR ,
667
- (errcode (ERRCODE_INTERNAL_ERROR ), /* XXX: wrong errcode? */
668
- errmsg ("lost connection to parallel worker" )));
669
-
670
- /* This might make the error queue go away. */
671
- CHECK_FOR_INTERRUPTS ();
671
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
672
+ errmsg ("lost connection to parallel worker" )));
672
673
}
673
674
}
674
675
}
@@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
714
715
errctx .previous = pcxt -> error_context_stack ;
715
716
error_context_stack = & errctx ;
716
717
717
- /* Parse ErrorReponse or NoticeResponse. */
718
+ /* Parse ErrorResponse or NoticeResponse. */
718
719
pq_parse_errornotice (msg , & edata );
719
720
720
721
/* Death of a worker isn't enough justification for suicide. */
@@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
747
748
748
749
default :
749
750
{
750
- elog (ERROR , "unknown message type: %c (%d bytes)" ,
751
+ elog (ERROR , "unrecognized message type received from parallel worker : %c (message length %d bytes)" ,
751
752
msgtype , msg -> len );
752
753
}
753
754
}
@@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg)
847
848
if (toc == NULL )
848
849
ereport (ERROR ,
849
850
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
850
- errmsg ("invalid magic number in dynamic shared memory segment" )));
851
+ errmsg ("invalid magic number in dynamic shared memory segment" )));
851
852
852
853
/* Look up fixed parallel state. */
853
854
fps = shm_toc_lookup (toc , PARALLEL_KEY_FIXED );
0 commit comments