Skip to content

Commit 10c0558

Browse files
committed
Fix several mistakes around parallel workers and client_encoding.
Previously, workers sent data to the leader using the client encoding. That mostly worked, but the leader the converted the data back to the server encoding. Since not all encoding conversions are reversible, that could provoke failures. Fix by using the database encoding for all communication between worker and leader. Also, while temporary changes to GUC settings, as from the SET clause of a function, are in general OK for parallel query, changing client_encoding this way inside of a parallel worker is not OK. Previously, that would have confused the leader; with these changes, it would not confuse the leader, but it wouldn't do anything either. So refuse such changes in parallel workers. Also, the previous code naively assumed that when it received a NotifyResonse from the worker, it could pass that directly back to the user. But now that worker-to-leader communication always uses the database encoding, that's clearly no longer correct - though, actually, the old way was always broken for V2 clients. So disassemble and reconstitute the message instead. Issues reported by Peter Eisentraut. Patch by me, reviewed by Peter Eisentraut.
1 parent f8c5855 commit 10c0558

File tree

7 files changed

+78
-6
lines changed

7 files changed

+78
-6
lines changed

src/backend/access/transam/parallel.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
810810
case 'A': /* NotifyResponse */
811811
{
812812
/* Propagate NotifyResponse. */
813-
pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
813+
int32 pid;
814+
const char *channel;
815+
const char *payload;
816+
817+
pid = pq_getmsgint(msg, 4);
818+
channel = pq_getmsgrawstring(msg);
819+
payload = pq_getmsgrawstring(msg);
820+
pq_endmessage(msg);
821+
822+
NotifyMyFrontEnd(channel, payload, pid);
823+
814824
break;
815825
}
816826

@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg)
988998
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
989999
fps->authenticated_user_id);
9901000

1001+
/*
1002+
* Set the client encoding to the database encoding, since that is what
1003+
* the leader will expect.
1004+
*/
1005+
SetClientEncoding(GetDatabaseEncoding());
1006+
9911007
/* Restore GUC values from launching backend. */
9921008
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
9931009
Assert(gucspace != NULL);

src/backend/commands/async.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
390390
char *page_buffer);
391391
static void asyncQueueAdvanceTail(void);
392392
static void ProcessIncomingNotify(void);
393-
static void NotifyMyFrontEnd(const char *channel,
394-
const char *payload,
395-
int32 srcPid);
396393
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
397394
static void ClearPendingActionsAndNotifies(void);
398395

@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
20762073
/*
20772074
* Send NOTIFY message to my front end.
20782075
*/
2079-
static void
2076+
void
20802077
NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
20812078
{
20822079
if (whereToSendOutput == DestRemote)

src/backend/commands/variable.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra)
755755
{
756756
int encoding = *((int *) extra);
757757

758+
/*
759+
* Parallel workers send data to the leader, not the client. They always
760+
* send data using the database encoding.
761+
*/
762+
if (IsParallelWorker())
763+
{
764+
/*
765+
* During parallel worker startup, we want to accept the leader's
766+
* client_encoding setting so that anyone who looks at the value in
767+
* the worker sees the same value that they would see in the leader.
768+
*/
769+
if (InitializingParallelWorker)
770+
return;
771+
772+
/*
773+
* A change other than during startup, for example due to a SET clause
774+
* attached to a function definition, should be rejected, as there is
775+
* nothing we can do inside the worker to make it take effect.
776+
*/
777+
ereport(ERROR,
778+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
779+
errmsg("cannot change client_encoding in a parallel worker")));
780+
}
781+
758782
/* We do not expect an error if PrepareClientEncoding succeeded */
759783
if (SetClientEncoding(encoding) < 0)
760784
elog(LOG, "SetClientEncoding(%d) failed", encoding);

src/backend/libpq/pqformat.c

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
* pq_copymsgbytes - copy raw data from a message buffer
6666
* pq_getmsgtext - get a counted text string (with conversion)
6767
* pq_getmsgstring - get a null-terminated text string (with conversion)
68+
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
6869
* pq_getmsgend - verify message fully consumed
6970
*/
7071

@@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg)
639640
return pg_client_to_server(str, slen);
640641
}
641642

643+
/* --------------------------------
644+
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
645+
*
646+
* Returns a pointer directly into the message buffer.
647+
* --------------------------------
648+
*/
649+
const char *
650+
pq_getmsgrawstring(StringInfo msg)
651+
{
652+
char *str;
653+
int slen;
654+
655+
str = &msg->data[msg->cursor];
656+
657+
/*
658+
* It's safe to use strlen() here because a StringInfo is guaranteed to
659+
* have a trailing null byte. But check we found a null inside the
660+
* message.
661+
*/
662+
slen = strlen(str);
663+
if (msg->cursor + slen >= msg->len)
664+
ereport(ERROR,
665+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
666+
errmsg("invalid string in message")));
667+
msg->cursor += slen + 1;
668+
669+
return str;
670+
}
671+
642672
/* --------------------------------
643673
* pq_getmsgend - verify message fully consumed
644674
* --------------------------------

src/backend/libpq/pqmq.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
232232
pq_getmsgend(msg);
233233
break;
234234
}
235-
value = pq_getmsgstring(msg);
235+
value = pq_getmsgrawstring(msg);
236236

237237
switch (code)
238238
{

src/include/commands/async.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
2828
extern Size AsyncShmemSize(void);
2929
extern void AsyncShmemInit(void);
3030

31+
extern void NotifyMyFrontEnd(const char *channel,
32+
const char *payload,
33+
int32 srcPid);
34+
3135
/* notify-related SQL statements */
3236
extern void Async_Notify(const char *channel, const char *payload);
3337
extern void Async_Listen(const char *channel);

src/include/libpq/pqformat.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
4444
extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
4545
extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
4646
extern const char *pq_getmsgstring(StringInfo msg);
47+
extern const char *pq_getmsgrawstring(StringInfo msg);
4748
extern void pq_getmsgend(StringInfo msg);
4849

4950
#endif /* PQFORMAT_H */

0 commit comments

Comments
 (0)