Skip to content

Commit 9346d39

Browse files
committed
In walreceiver, don't try to do ereport() in a signal handler.
This is quite unsafe, even for the case of ereport(FATAL) where we won't return control to the interrupted code, and despite this code's use of a flag to restrict the areas where we'd try to do it. It's possible for example that we interrupt malloc or free while that's holding a lock that's meant to protect against cross-thread interference. Then, any attempt to do malloc or free within ereport() will result in a deadlock, preventing the walreceiver process from exiting in response to SIGTERM. We hypothesize that this explains some hard-to-reproduce failures seen in the buildfarm.

Hence, get rid of the immediate-exit code in WalRcvShutdownHandler, as well as the logic associated with WalRcvImmediateInterruptOK. Instead, we need to take care that potentially-blocking operations in the walreceiver's data transmission logic (libpqwalreceiver.c) will respond reasonably promptly to the process's latch becoming set and then call ProcessWalRcvInterrupts. Much of the needed code for that was already present in libpqwalreceiver.c. I refactored things a bit so that all the uses of PQgetResult use latch-aware waiting, but didn't need to do much more.

These changes should be enough to ensure that libpqwalreceiver.c will respond promptly to SIGTERM whenever it's waiting to receive data. In principle, it could block for a long time while waiting to send data too, and this patch does nothing to guard against that. I think that that hazard is mostly theoretical though: such blocking should occur only if we fill the kernel's data transmission buffers, and we don't generally send enough data to make that happen without waiting for input. If we find out that the hazard isn't just theoretical, we could fix it by using PQsetnonblocking, but that would require more ticklish changes than I care to make now.

Back-patch of commit a1a789e. This problem goes all the way back to the origins of walreceiver; but given the substantial reworking the module received during the v10 cycle, it seems unsafe to assume that our testing on HEAD validates this patch for pre-v10 branches. And we'd need to back-patch some prerequisite patches (at least 597a87c and its followups, maybe other things), increasing the risk of problems. Given the dearth of field reports matching this problem, it's not worth much risk. Hence back-patch to v10 and v11 only.

Patch by me; thanks to Thomas Munro for review.

Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz
1 parent 0b6edb9 commit 9346d39

File tree

3 files changed

+89
-105
lines changed

3 files changed

+89
-105
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 71 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
9898

9999
/* Prototypes for private functions */
100100
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
101+
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
101102
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
102103

103104
/*
@@ -200,7 +201,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
200201
if (rc & WL_LATCH_SET)
201202
{
202203
ResetLatch(MyLatch);
203-
CHECK_FOR_INTERRUPTS();
204+
ProcessWalRcvInterrupts();
204205
}
205206

206207
/* If socket is ready, advance the libpq state machine */
@@ -454,6 +455,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
454455
{
455456
PGresult *res;
456457

458+
/*
459+
* Send copy-end message. As in libpqrcv_PQexec, this could theoretically
460+
* block, but the risk seems small.
461+
*/
457462
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
458463
PQflush(conn->streamConn))
459464
ereport(ERROR,
@@ -470,7 +475,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
470475
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
471476
* also possible in case we aborted the copy in mid-stream.
472477
*/
473-
res = PQgetResult(conn->streamConn);
478+
res = libpqrcv_PQgetResult(conn->streamConn);
474479
if (PQresultStatus(res) == PGRES_TUPLES_OK)
475480
{
476481
/*
@@ -484,7 +489,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
484489
PQclear(res);
485490

486491
/* the result set should be followed by CommandComplete */
487-
res = PQgetResult(conn->streamConn);
492+
res = libpqrcv_PQgetResult(conn->streamConn);
488493
}
489494
else if (PQresultStatus(res) == PGRES_COPY_OUT)
490495
{
@@ -497,7 +502,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
497502
pchomp(PQerrorMessage(conn->streamConn)))));
498503

499504
/* CommandComplete should follow */
500-
res = PQgetResult(conn->streamConn);
505+
res = libpqrcv_PQgetResult(conn->streamConn);
501506
}
502507

503508
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -507,7 +512,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
507512
PQclear(res);
508513

509514
/* Verify that there are no more results */
510-
res = PQgetResult(conn->streamConn);
515+
res = libpqrcv_PQgetResult(conn->streamConn);
511516
if (res != NULL)
512517
ereport(ERROR,
513518
(errmsg("unexpected result after CommandComplete: %s",
@@ -570,12 +575,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
570575
* The function is modeled on PQexec() in libpq, but only implements
571576
* those parts that are in use in the walreceiver api.
572577
*
573-
* Queries are always executed on the connection in streamConn.
578+
* May return NULL, rather than an error result, on failure.
574579
*/
575580
static PGresult *
576581
libpqrcv_PQexec(PGconn *streamConn, const char *query)
577582
{
578-
PGresult *result = NULL;
579583
PGresult *lastResult = NULL;
580584

581585
/*
@@ -586,64 +590,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
586590
*/
587591

588592
/*
589-
* Submit a query. Since we don't use non-blocking mode, this also can
590-
* block. But its risk is relatively small, so we ignore that for now.
593+
* Submit the query. Since we don't use non-blocking mode, this could
594+
* theoretically block. In practice, since we don't send very long query
595+
* strings, the risk seems negligible.
591596
*/
592597
if (!PQsendQuery(streamConn, query))
593598
return NULL;
594599

595600
for (;;)
596601
{
597-
/*
598-
* Receive data until PQgetResult is ready to get the result without
599-
* blocking.
600-
*/
601-
while (PQisBusy(streamConn))
602-
{
603-
int rc;
604-
605-
/*
606-
* We don't need to break down the sleep into smaller increments,
607-
* since we'll get interrupted by signals and can either handle
608-
* interrupts here or elog(FATAL) within SIGTERM signal handler if
609-
* the signal arrives in the middle of establishment of
610-
* replication connection.
611-
*/
612-
rc = WaitLatchOrSocket(MyLatch,
613-
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
614-
WL_LATCH_SET,
615-
PQsocket(streamConn),
616-
0,
617-
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
618-
619-
/* Emergency bailout? */
620-
if (rc & WL_POSTMASTER_DEATH)
621-
exit(1);
622-
623-
/* Interrupted? */
624-
if (rc & WL_LATCH_SET)
625-
{
626-
ResetLatch(MyLatch);
627-
CHECK_FOR_INTERRUPTS();
628-
}
602+
/* Wait for, and collect, the next PGresult. */
603+
PGresult *result;
629604

630-
/* Consume whatever data is available from the socket */
631-
if (PQconsumeInput(streamConn) == 0)
632-
{
633-
/* trouble; drop whatever we had and return NULL */
634-
PQclear(lastResult);
635-
return NULL;
636-
}
637-
}
605+
result = libpqrcv_PQgetResult(streamConn);
606+
if (result == NULL)
607+
break; /* query is complete, or failure */
638608

639609
/*
640610
* Emulate PQexec()'s behavior of returning the last result when there
641611
* are many. We are fine with returning just last error message.
642612
*/
643-
result = PQgetResult(streamConn);
644-
if (result == NULL)
645-
break; /* query is complete */
646-
647613
PQclear(lastResult);
648614
lastResult = result;
649615

@@ -657,6 +623,55 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
657623
return lastResult;
658624
}
659625

626+
/*
627+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
628+
*/
629+
static PGresult *
630+
libpqrcv_PQgetResult(PGconn *streamConn)
631+
{
632+
/*
633+
* Collect data until PQgetResult is ready to get the result without
634+
* blocking.
635+
*/
636+
while (PQisBusy(streamConn))
637+
{
638+
int rc;
639+
640+
/*
641+
* We don't need to break down the sleep into smaller increments,
642+
* since we'll get interrupted by signals and can handle any
643+
* interrupts here.
644+
*/
645+
rc = WaitLatchOrSocket(MyLatch,
646+
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
647+
WL_LATCH_SET,
648+
PQsocket(streamConn),
649+
0,
650+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
651+
652+
/* Emergency bailout? */
653+
if (rc & WL_POSTMASTER_DEATH)
654+
exit(1);
655+
656+
/* Interrupted? */
657+
if (rc & WL_LATCH_SET)
658+
{
659+
ResetLatch(MyLatch);
660+
ProcessWalRcvInterrupts();
661+
}
662+
663+
/* Consume whatever data is available from the socket */
664+
if (PQconsumeInput(streamConn) == 0)
665+
{
666+
/* trouble; return NULL */
667+
return NULL;
668+
}
669+
}
670+
671+
/* Now we can collect and return the next PGresult */
672+
return PQgetResult(streamConn);
673+
}
674+
660675
/*
661676
* Disconnect connection to primary, if any.
662677
*/
@@ -718,13 +733,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
718733
{
719734
PGresult *res;
720735

721-
res = PQgetResult(conn->streamConn);
736+
res = libpqrcv_PQgetResult(conn->streamConn);
722737
if (PQresultStatus(res) == PGRES_COMMAND_OK)
723738
{
724739
PQclear(res);
725740

726741
/* Verify that there are no more results. */
727-
res = PQgetResult(conn->streamConn);
742+
res = libpqrcv_PQgetResult(conn->streamConn);
728743
if (res != NULL)
729744
{
730745
PQclear(res);
@@ -888,7 +903,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
888903
{
889904
char *cstrs[MaxTupleAttributeNumber];
890905

891-
CHECK_FOR_INTERRUPTS();
906+
ProcessWalRcvInterrupts();
892907

893908
/* Do the allocations in temporary context. */
894909
oldcontext = MemoryContextSwitchTo(rowcontext);

src/backend/replication/walreceiver.c

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -112,28 +112,7 @@ static struct
112112
static StringInfoData reply_message;
113113
static StringInfoData incoming_message;
114114

115-
/*
116-
* About SIGTERM handling:
117-
*
118-
* We can't just exit(1) within SIGTERM signal handler, because the signal
119-
* might arrive in the middle of some critical operation, like while we're
120-
* holding a spinlock. We also can't just set a flag in signal handler and
121-
* check it in the main loop, because we perform some blocking operations
122-
* like libpqrcv_PQexec(), which can take a long time to finish.
123-
*
124-
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
125-
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
126-
* sets got_SIGTERM flag, which is checked in the main loop when convenient.
127-
*
128-
* This is very much like what regular backends do with ImmediateInterruptOK,
129-
* ProcessInterrupts() etc.
130-
*/
131-
static volatile bool WalRcvImmediateInterruptOK = false;
132-
133115
/* Prototypes for private functions */
134-
static void ProcessWalRcvInterrupts(void);
135-
static void EnableWalRcvImmediateExit(void);
136-
static void DisableWalRcvImmediateExit(void);
137116
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
138117
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
139118
static void WalRcvDie(int code, Datum arg);
@@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
151130
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
152131

153132

154-
static void
133+
/*
134+
* Process any interrupts the walreceiver process may have received.
135+
* This should be called any time the process's latch has become set.
136+
*
137+
* Currently, only SIGTERM is of interest. We can't just exit(1) within the
138+
* SIGTERM signal handler, because the signal might arrive in the middle of
139+
* some critical operation, like while we're holding a spinlock. Instead, the
140+
* signal handler sets a flag variable as well as setting the process's latch.
141+
* We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142+
* latch has become set. Operations that could block for a long time, such as
143+
* reading from a remote server, must pay attention to the latch too; see
144+
* libpqrcv_PQgetResult for example.
145+
*/
146+
void
155147
ProcessWalRcvInterrupts(void)
156148
{
157149
/*
@@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void)
163155

164156
if (got_SIGTERM)
165157
{
166-
WalRcvImmediateInterruptOK = false;
167158
ereport(FATAL,
168159
(errcode(ERRCODE_ADMIN_SHUTDOWN),
169160
errmsg("terminating walreceiver process due to administrator command")));
170161
}
171162
}
172163

173-
static void
174-
EnableWalRcvImmediateExit(void)
175-
{
176-
WalRcvImmediateInterruptOK = true;
177-
ProcessWalRcvInterrupts();
178-
}
179-
180-
static void
181-
DisableWalRcvImmediateExit(void)
182-
{
183-
WalRcvImmediateInterruptOK = false;
184-
ProcessWalRcvInterrupts();
185-
}
186164

187165
/* Main entry point for walreceiver process */
188166
void
@@ -302,12 +280,10 @@ WalReceiverMain(void)
302280
PG_SETMASK(&UnBlockSig);
303281

304282
/* Establish the connection to the primary for XLOG streaming */
305-
EnableWalRcvImmediateExit();
306283
wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
307284
if (!wrconn)
308285
ereport(ERROR,
309286
(errmsg("could not connect to the primary server: %s", err)));
310-
DisableWalRcvImmediateExit();
311287

312288
/*
313289
* Save user-visible connection string. This clobbers the original
@@ -347,7 +323,6 @@ WalReceiverMain(void)
347323
* Check that we're connected to a valid server using the
348324
* IDENTIFY_SYSTEM replication command.
349325
*/
350-
EnableWalRcvImmediateExit();
351326
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
352327
&server_version);
353328

@@ -360,7 +335,6 @@ WalReceiverMain(void)
360335
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
361336
primary_sysid, standby_sysid)));
362337
}
363-
DisableWalRcvImmediateExit();
364338

365339
/*
366340
* Confirm that the current timeline of the primary is the same or
@@ -521,6 +495,8 @@ WalReceiverMain(void)
521495
if (rc & WL_LATCH_SET)
522496
{
523497
ResetLatch(walrcv->latch);
498+
ProcessWalRcvInterrupts();
499+
524500
if (walrcv->force_reply)
525501
{
526502
/*
@@ -598,9 +574,7 @@ WalReceiverMain(void)
598574
* The backend finished streaming. Exit streaming COPY-mode from
599575
* our side, too.
600576
*/
601-
EnableWalRcvImmediateExit();
602577
walrcv_endstreaming(wrconn, &primaryTLI);
603-
DisableWalRcvImmediateExit();
604578

605579
/*
606580
* If the server had switched to a new timeline that we didn't
@@ -754,9 +728,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
754728
(errmsg("fetching timeline history file for timeline %u from primary server",
755729
tli)));
756730

757-
EnableWalRcvImmediateExit();
758731
walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
759-
DisableWalRcvImmediateExit();
760732

761733
/*
762734
* Check that the filename on the master matches what we
@@ -833,7 +805,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
833805
errno = save_errno;
834806
}
835807

836-
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
808+
/* SIGTERM: set flag for ProcessWalRcvInterrupts */
837809
static void
838810
WalRcvShutdownHandler(SIGNAL_ARGS)
839811
{
@@ -844,10 +816,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
844816
if (WalRcv->latch)
845817
SetLatch(WalRcv->latch);
846818

847-
/* Don't joggle the elbow of proc_exit */
848-
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
849-
ProcessWalRcvInterrupts();
850-
851819
errno = save_errno;
852820
}
853821

src/include/replication/walreceiver.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ walrcv_clear_result(WalRcvExecResult *walres)
299299

300300
/* prototypes for functions in walreceiver.c */
301301
extern void WalReceiverMain(void) pg_attribute_noreturn();
302+
extern void ProcessWalRcvInterrupts(void);
302303

303304
/* prototypes for functions in walreceiverfuncs.c */
304305
extern Size WalRcvShmemSize(void);

0 commit comments

Comments
 (0)