Skip to content

Commit de829dd

Browse files
committed
Add condition variable for walreceiver shutdown.
Use this new CV to wait for walreceiver shutdown without a sleep/poll loop, while also benefiting from standard postmaster death handling. Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com
1 parent 600f2f5 commit de829dd

File tree

6 files changed

+41
-13
lines changed

6 files changed

+41
-13
lines changed

doc/src/sgml/monitoring.sgml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
17661766
<entry>Waiting for confirmation from a remote server during synchronous
17671767
replication.</entry>
17681768
</row>
1769+
<row>
1770+
<entry><literal>WalrcvExit</literal></entry>
1771+
<entry>Waiting for the walreceiver to exit.</entry>
1772+
</row>
17691773
<row>
17701774
<entry><literal>XactGroupUpdate</literal></entry>
17711775
<entry>Waiting for the group leader to update transaction status at

src/backend/postmaster/pgstat.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
41244124
case WAIT_EVENT_SYNC_REP:
41254125
event_name = "SyncRep";
41264126
break;
4127+
case WAIT_EVENT_WALRCV_EXIT:
4128+
event_name = "WalrcvExit";
4129+
break;
41274130
case WAIT_EVENT_XACT_GROUP_UPDATE:
41284131
event_name = "XactGroupUpdate";
41294132
break;

src/backend/replication/walreceiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ WalReceiverMain(void)
207207

208208
case WALRCV_STOPPED:
209209
SpinLockRelease(&walrcv->mutex);
210+
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
210211
proc_exit(1);
211212
break;
212213

@@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg)
784785
walrcv->latch = NULL;
785786
SpinLockRelease(&walrcv->mutex);
786787

788+
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
789+
787790
/* Terminate the connection gracefully. */
788791
if (wrconn != NULL)
789792
walrcv_disconnect(wrconn);

src/backend/replication/walreceiverfuncs.c

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <signal.h>
2424

2525
#include "access/xlog_internal.h"
26+
#include "pgstat.h"
2627
#include "postmaster/startup.h"
2728
#include "replication/walreceiver.h"
2829
#include "storage/pmsignal.h"
@@ -62,6 +63,7 @@ WalRcvShmemInit(void)
6263
/* First time through, so initialize */
6364
MemSet(WalRcv, 0, WalRcvShmemSize());
6465
WalRcv->walRcvState = WALRCV_STOPPED;
66+
ConditionVariableInit(&WalRcv->walRcvStoppedCV);
6567
SpinLockInit(&WalRcv->mutex);
6668
pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
6769
WalRcv->latch = NULL;
@@ -95,12 +97,18 @@ WalRcvRunning(void)
9597

9698
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
9799
{
98-
SpinLockAcquire(&walrcv->mutex);
100+
bool stopped = false;
99101

102+
SpinLockAcquire(&walrcv->mutex);
100103
if (walrcv->walRcvState == WALRCV_STARTING)
104+
{
101105
state = walrcv->walRcvState = WALRCV_STOPPED;
102-
106+
stopped = true;
107+
}
103108
SpinLockRelease(&walrcv->mutex);
109+
110+
if (stopped)
111+
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
104112
}
105113
}
106114

@@ -140,12 +148,18 @@ WalRcvStreaming(void)
140148

141149
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142150
{
143-
SpinLockAcquire(&walrcv->mutex);
151+
bool stopped = false;
144152

153+
SpinLockAcquire(&walrcv->mutex);
145154
if (walrcv->walRcvState == WALRCV_STARTING)
155+
{
146156
state = walrcv->walRcvState = WALRCV_STOPPED;
147-
157+
stopped = true;
158+
}
148159
SpinLockRelease(&walrcv->mutex);
160+
161+
if (stopped)
162+
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
149163
}
150164
}
151165

@@ -165,6 +179,7 @@ ShutdownWalRcv(void)
165179
{
166180
WalRcvData *walrcv = WalRcv;
167181
pid_t walrcvpid = 0;
182+
bool stopped = false;
168183

169184
/*
170185
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -178,6 +193,7 @@ ShutdownWalRcv(void)
178193
break;
179194
case WALRCV_STARTING:
180195
walrcv->walRcvState = WALRCV_STOPPED;
196+
stopped = true;
181197
break;
182198

183199
case WALRCV_STREAMING:
@@ -191,6 +207,10 @@ ShutdownWalRcv(void)
191207
}
192208
SpinLockRelease(&walrcv->mutex);
193209

210+
/* Unnecessary but consistent. */
211+
if (stopped)
212+
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213+
194214
/*
195215
* Signal walreceiver process if it was still running.
196216
*/
@@ -201,16 +221,11 @@ ShutdownWalRcv(void)
201221
* Wait for walreceiver to acknowledge its death by setting state to
202222
* WALRCV_STOPPED.
203223
*/
224+
ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
204225
while (WalRcvRunning())
205-
{
206-
/*
207-
* This possibly-long loop needs to handle interrupts of startup
208-
* process.
209-
*/
210-
HandleStartupProcInterrupts();
211-
212-
pg_usleep(100000); /* 100ms */
213-
}
226+
ConditionVariableSleep(&walrcv->walRcvStoppedCV,
227+
WAIT_EVENT_WALRCV_EXIT);
228+
ConditionVariableCancelSleep();
214229
}
215230

216231
/*

src/include/pgstat.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,7 @@ typedef enum
10091009
WAIT_EVENT_REPLICATION_SLOT_DROP,
10101010
WAIT_EVENT_SAFE_SNAPSHOT,
10111011
WAIT_EVENT_SYNC_REP,
1012+
WAIT_EVENT_WALRCV_EXIT,
10121013
WAIT_EVENT_XACT_GROUP_UPDATE
10131014
} WaitEventIPC;
10141015

src/include/replication/walreceiver.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "port/atomics.h"
2020
#include "replication/logicalproto.h"
2121
#include "replication/walsender.h"
22+
#include "storage/condition_variable.h"
2223
#include "storage/latch.h"
2324
#include "storage/spin.h"
2425
#include "utils/tuplestore.h"
@@ -62,6 +63,7 @@ typedef struct
6263
*/
6364
pid_t pid;
6465
WalRcvState walRcvState;
66+
ConditionVariable walRcvStoppedCV;
6567
pg_time_t startTime;
6668

6769
/*

0 commit comments

Comments
 (0)