23
23
#include <signal.h>
24
24
25
25
#include "access/xlog_internal.h"
26
+ #include "pgstat.h"
26
27
#include "postmaster/startup.h"
27
28
#include "replication/walreceiver.h"
28
29
#include "storage/pmsignal.h"
@@ -62,6 +63,7 @@ WalRcvShmemInit(void)
62
63
/* First time through, so initialize */
63
64
MemSet (WalRcv , 0 , WalRcvShmemSize ());
64
65
WalRcv -> walRcvState = WALRCV_STOPPED ;
66
+ ConditionVariableInit (& WalRcv -> walRcvStoppedCV );
65
67
SpinLockInit (& WalRcv -> mutex );
66
68
pg_atomic_init_u64 (& WalRcv -> writtenUpto , 0 );
67
69
WalRcv -> latch = NULL ;
@@ -95,12 +97,18 @@ WalRcvRunning(void)
95
97
96
98
if ((now - startTime ) > WALRCV_STARTUP_TIMEOUT )
97
99
{
98
- SpinLockAcquire ( & walrcv -> mutex ) ;
100
+ bool stopped = false ;
99
101
102
+ SpinLockAcquire (& walrcv -> mutex );
100
103
if (walrcv -> walRcvState == WALRCV_STARTING )
104
+ {
101
105
state = walrcv -> walRcvState = WALRCV_STOPPED ;
102
-
106
+ stopped = true;
107
+ }
103
108
SpinLockRelease (& walrcv -> mutex );
109
+
110
+ if (stopped )
111
+ ConditionVariableBroadcast (& walrcv -> walRcvStoppedCV );
104
112
}
105
113
}
106
114
@@ -140,12 +148,18 @@ WalRcvStreaming(void)
140
148
141
149
if ((now - startTime ) > WALRCV_STARTUP_TIMEOUT )
142
150
{
143
- SpinLockAcquire ( & walrcv -> mutex ) ;
151
+ bool stopped = false ;
144
152
153
+ SpinLockAcquire (& walrcv -> mutex );
145
154
if (walrcv -> walRcvState == WALRCV_STARTING )
155
+ {
146
156
state = walrcv -> walRcvState = WALRCV_STOPPED ;
147
-
157
+ stopped = true;
158
+ }
148
159
SpinLockRelease (& walrcv -> mutex );
160
+
161
+ if (stopped )
162
+ ConditionVariableBroadcast (& walrcv -> walRcvStoppedCV );
149
163
}
150
164
}
151
165
@@ -165,6 +179,7 @@ ShutdownWalRcv(void)
165
179
{
166
180
WalRcvData * walrcv = WalRcv ;
167
181
pid_t walrcvpid = 0 ;
182
+ bool stopped = false;
168
183
169
184
/*
170
185
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -178,6 +193,7 @@ ShutdownWalRcv(void)
178
193
break ;
179
194
case WALRCV_STARTING :
180
195
walrcv -> walRcvState = WALRCV_STOPPED ;
196
+ stopped = true;
181
197
break ;
182
198
183
199
case WALRCV_STREAMING :
@@ -191,6 +207,10 @@ ShutdownWalRcv(void)
191
207
}
192
208
SpinLockRelease (& walrcv -> mutex );
193
209
210
+ /* Unnecessary but consistent. */
211
+ if (stopped )
212
+ ConditionVariableBroadcast (& walrcv -> walRcvStoppedCV );
213
+
194
214
/*
195
215
* Signal walreceiver process if it was still running.
196
216
*/
@@ -201,16 +221,11 @@ ShutdownWalRcv(void)
201
221
* Wait for walreceiver to acknowledge its death by setting state to
202
222
* WALRCV_STOPPED.
203
223
*/
224
+ ConditionVariablePrepareToSleep (& walrcv -> walRcvStoppedCV );
204
225
while (WalRcvRunning ())
205
- {
206
- /*
207
- * This possibly-long loop needs to handle interrupts of startup
208
- * process.
209
- */
210
- HandleStartupProcInterrupts ();
211
-
212
- pg_usleep (100000 ); /* 100ms */
213
- }
226
+ ConditionVariableSleep (& walrcv -> walRcvStoppedCV ,
227
+ WAIT_EVENT_WALRCV_EXIT );
228
+ ConditionVariableCancelSleep ();
214
229
}
215
230
216
231
/*
0 commit comments