Skip to content

Commit 26981d2

Browse files
committed
Don't send protocol messages to a shm_mq that no longer exists.
Commit 2bd9e41 introduced a mechanism for relaying protocol messages from a background worker to another backend via a shm_mq. However, there was no provision for shutting down the communication channel. Therefore, a protocol message sent late in the shutdown sequence, such as a DEBUG message resulting from cranking up log_min_messages, could crash the server. To fix, install an on_dsm_detach callback that disables sending messages to the shm_mq when the associated DSM is detached.
1 parent a93b378 commit 26981d2

File tree

5 files changed

+40
-4
lines changed

5 files changed

+40
-4
lines changed

src/backend/access/transam/parallel.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ ParallelWorkerMain(Datum main_arg)
867867
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
868868
shm_mq_set_sender(mq, MyProc);
869869
mqh = shm_mq_attach(mq, seg, NULL);
870-
pq_redirect_to_shm_mq(mq, mqh);
870+
pq_redirect_to_shm_mq(seg, mqh);
871871
pq_set_parallel_master(fps->parallel_master_pid,
872872
fps->parallel_master_backend_id);
873873

src/backend/libpq/pqmq.c

+26-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ static bool pq_mq_busy = false;
2626
static pid_t pq_mq_parallel_master_pid = 0;
2727
static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
2828

29+
static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
2930
static void mq_comm_reset(void);
3031
static int mq_flush(void);
3132
static int mq_flush_if_writable(void);
@@ -51,13 +52,26 @@ static PQcommMethods PqCommMqMethods = {
5152
* message queue.
5253
*/
5354
void
54-
pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
55+
pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
5556
{
5657
PqCommMethods = &PqCommMqMethods;
57-
pq_mq = mq;
58+
pq_mq = shm_mq_get_queue(mqh);
5859
pq_mq_handle = mqh;
5960
whereToSendOutput = DestRemote;
6061
FrontendProtocol = PG_PROTOCOL_LATEST;
62+
on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
63+
}
64+
65+
/*
66+
* When the DSM that contains our shm_mq goes away, we need to stop sending
67+
* messages to it.
68+
*/
69+
static void
70+
pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
71+
{
72+
pq_mq = NULL;
73+
pq_mq_handle = NULL;
74+
whereToSendOutput = DestNone;
6175
}
6276

6377
/*
@@ -123,9 +137,19 @@ mq_putmessage(char msgtype, const char *s, size_t len)
123137
if (pq_mq != NULL)
124138
shm_mq_detach(pq_mq);
125139
pq_mq = NULL;
140+
pq_mq_handle = NULL;
126141
return EOF;
127142
}
128143

144+
/*
145+
* If the message queue is already gone, just ignore the message. This
146+
* doesn't necessarily indicate a problem; for example, DEBUG messages
147+
* can be generated late in the shutdown sequence, after all DSMs have
148+
* already been detached.
149+
*/
150+
if (pq_mq == NULL)
151+
return 0;
152+
129153
pq_mq_busy = true;
130154

131155
iov[0].data = &msgtype;

src/backend/storage/ipc/shm_mq.c

+9
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,15 @@ shm_mq_detach(shm_mq *mq)
745745
SetLatch(&victim->procLatch);
746746
}
747747

748+
/*
749+
* Get the shm_mq from handle.
750+
*/
751+
shm_mq *
752+
shm_mq_get_queue(shm_mq_handle *mqh)
753+
{
754+
return mqh->mqh_queue;
755+
}
756+
748757
/*
749758
* Write bytes into a shared message queue.
750759
*/

src/include/libpq/pqmq.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#include "lib/stringinfo.h"
1717
#include "storage/shm_mq.h"
1818

19-
extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
19+
extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
2020
extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
2121

2222
extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);

src/include/storage/shm_mq.h

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
6565
/* Break connection. */
6666
extern void shm_mq_detach(shm_mq *);
6767

68+
/* Get the shm_mq from handle. */
69+
extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
70+
6871
/* Send or receive messages. */
6972
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
7073
Size nbytes, const void *data, bool nowait);

0 commit comments

Comments
 (0)