Skip to content

Commit 9fcf670

Browse files
committed
Fix signal handling in logical replication workers
The logical replication worker processes now use the normal die() handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code. One problem before was that the apply worker would not exit promptly when a subscription was dropped, which could lead to deadlocks. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parent acbd837 commit 9fcf670

File tree

6 files changed

+50
-21
lines changed

6 files changed

+50
-21
lines changed

src/backend/replication/logical/launcher.c

+8-8
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
8080
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
8181

8282
/* Flags set by signal handlers */
83-
volatile sig_atomic_t got_SIGHUP = false;
84-
volatile sig_atomic_t got_SIGTERM = false;
83+
static volatile sig_atomic_t got_SIGHUP = false;
84+
static volatile sig_atomic_t got_SIGTERM = false;
8585

8686
static bool on_commit_launcher_wakeup = false;
8787

@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
624624
}
625625

626626
/* SIGTERM: set flag to exit at next convenient time */
627-
void
628-
logicalrep_worker_sigterm(SIGNAL_ARGS)
627+
static void
628+
logicalrep_launcher_sigterm(SIGNAL_ARGS)
629629
{
630630
int save_errno = errno;
631631

@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
638638
}
639639

640640
/* SIGHUP: set flag to reload configuration at next convenient time */
641-
void
642-
logicalrep_worker_sighup(SIGNAL_ARGS)
641+
static void
642+
logicalrep_launcher_sighup(SIGNAL_ARGS)
643643
{
644644
int save_errno = errno;
645645

@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
799799
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
800800

801801
/* Establish signal handlers. */
802-
pqsignal(SIGHUP, logicalrep_worker_sighup);
803-
pqsignal(SIGTERM, logicalrep_worker_sigterm);
802+
pqsignal(SIGHUP, logicalrep_launcher_sighup);
803+
pqsignal(SIGTERM, logicalrep_launcher_sigterm);
804804
BackgroundWorkerUnblockSignals();
805805

806806
/* Make it easy to identify our processes. */

src/backend/replication/logical/tablesync.c

+4-6
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
154154
int rc;
155155
char state = origstate;
156156

157-
while (!got_SIGTERM)
157+
for (;;)
158158
{
159159
LogicalRepWorker *worker;
160160

161+
CHECK_FOR_INTERRUPTS();
162+
161163
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
162164
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
163165
relid, false);
@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
525527
bytesread += avail;
526528
}
527529

528-
while (!got_SIGTERM && maxread > 0 && bytesread < minread)
530+
while (maxread > 0 && bytesread < minread)
529531
{
530532
pgsocket fd = PGINVALID_SOCKET;
531533
int rc;
@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
579581
ResetLatch(&MyProc->procLatch);
580582
}
581583

582-
/* Check for exit condition. */
583-
if (got_SIGTERM)
584-
proc_exit(0);
585-
586584
return bytesread;
587585
}
588586

src/backend/replication/logical/worker.c

+31-3
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@
7272
#include "storage/proc.h"
7373
#include "storage/procarray.h"
7474

75+
#include "tcop/tcopprot.h"
76+
7577
#include "utils/builtins.h"
7678
#include "utils/catcache.h"
7779
#include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
118120

119121
static void reread_subscription(void);
120122

123+
/* Flags set by signal handlers */
124+
static volatile sig_atomic_t got_SIGHUP = false;
125+
121126
/*
122127
* Should this worker apply changes for given relation.
123128
*
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10051010
/* mark as idle, before starting to loop */
10061011
pgstat_report_activity(STATE_IDLE, NULL);
10071012

1008-
while (!got_SIGTERM)
1013+
for (;;)
10091014
{
10101015
pgsocket fd = PGINVALID_SOCKET;
10111016
int rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10151020
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
10161021
bool ping_sent = false;
10171022

1023+
CHECK_FOR_INTERRUPTS();
1024+
10181025
MemoryContextSwitchTo(ApplyMessageContext);
10191026

10201027
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
14371444
MySubscriptionValid = false;
14381445
}
14391446

1447+
/* SIGHUP: set flag to reload configuration at next convenient time */
1448+
static void
1449+
logicalrep_worker_sighup(SIGNAL_ARGS)
1450+
{
1451+
int save_errno = errno;
1452+
1453+
got_SIGHUP = true;
1454+
1455+
/* Waken anything waiting on the process latch */
1456+
SetLatch(MyLatch);
1457+
1458+
errno = save_errno;
1459+
}
14401460

14411461
/* Logical Replication Apply worker entry point */
14421462
void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
14541474

14551475
/* Setup signal handling */
14561476
pqsignal(SIGHUP, logicalrep_worker_sighup);
1457-
pqsignal(SIGTERM, logicalrep_worker_sigterm);
1477+
pqsignal(SIGTERM, die);
14581478
BackgroundWorkerUnblockSignals();
14591479

14601480
/* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
16041624
/* Run the main loop. */
16051625
LogicalRepApplyLoop(origin_startpos);
16061626

1607-
/* We should only get here if we received SIGTERM */
16081627
proc_exit(0);
16091628
}
1629+
1630+
/*
1631+
* Is current process a logical replication worker?
1632+
*/
1633+
bool
1634+
IsLogicalWorker(void)
1635+
{
1636+
return MyLogicalRepWorker != NULL;
1637+
}

src/backend/tcop/postgres.c

+5
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "pg_getopt.h"
5656
#include "postmaster/autovacuum.h"
5757
#include "postmaster/postmaster.h"
58+
#include "replication/logicalworker.h"
5859
#include "replication/slot.h"
5960
#include "replication/walsender.h"
6061
#include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
28452846
ereport(FATAL,
28462847
(errcode(ERRCODE_ADMIN_SHUTDOWN),
28472848
errmsg("terminating autovacuum process due to administrator command")));
2849+
else if (IsLogicalWorker())
2850+
ereport(FATAL,
2851+
(errcode(ERRCODE_ADMIN_SHUTDOWN),
2852+
errmsg("terminating logical replication worker due to administrator command")));
28482853
else if (RecoveryConflictPending && RecoveryConflictRetryable)
28492854
{
28502855
pgstat_report_recovery_conflict(RecoveryConflictReason);

src/include/replication/logicalworker.h

+2
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@
1414

1515
extern void ApplyWorkerMain(Datum main_arg);
1616

17+
extern bool IsLogicalWorker(void);
18+
1719
#endif /* LOGICALWORKER_H */

src/include/replication/worker_internal.h

-4
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
6767
extern LogicalRepWorker *MyLogicalRepWorker;
6868

6969
extern bool in_remote_transaction;
70-
extern volatile sig_atomic_t got_SIGHUP;
71-
extern volatile sig_atomic_t got_SIGTERM;
7270

7371
extern void logicalrep_worker_attach(int slot);
7472
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
8179

8280
extern int logicalrep_sync_worker_count(Oid subid);
8381

84-
extern void logicalrep_worker_sighup(SIGNAL_ARGS);
85-
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
8682
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
8783
void process_syncing_tables(XLogRecPtr current_lsn);
8884
void invalidate_syncing_table_states(Datum arg, int cacheid,

0 commit comments

Comments
 (0)