@@ -762,15 +762,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
762
762
/* make sure we have enough WAL available */
763
763
flushptr = WalSndWaitForWal (targetPagePtr + reqLen );
764
764
765
- /* more than one block available */
766
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr )
767
- count = XLOG_BLCKSZ ;
768
- /* not enough WAL synced, that can happen during shutdown */
769
- else if (targetPagePtr + reqLen > flushptr )
765
+ /* fail if not (implies we are going to shut down) */
766
+ if (flushptr < targetPagePtr + reqLen )
770
767
return -1 ;
771
- /* part of the page available */
768
+
769
+ if (targetPagePtr + XLOG_BLCKSZ <= flushptr )
770
+ count = XLOG_BLCKSZ ; /* more than one block available */
772
771
else
773
- count = flushptr - targetPagePtr ;
772
+ count = flushptr - targetPagePtr ; /* part of the page available */
774
773
775
774
/* now actually read the data, we know it's there */
776
775
XLogRead (cur_page , targetPagePtr , XLOG_BLCKSZ );
@@ -1149,7 +1148,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1149
1148
}
1150
1149
1151
1150
/*
1152
- * Wait till WAL < loc is flushed to disk so it can be safely read.
1151
+ * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1152
+ *
1153
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1154
+ * if we detect a shutdown request (either from postmaster or client)
1155
+ * we will return early, so caller must always check.
1153
1156
*/
1154
1157
static XLogRecPtr
1155
1158
WalSndWaitForWal (XLogRecPtr loc )
@@ -1214,9 +1217,7 @@ WalSndWaitForWal(XLogRecPtr loc)
1214
1217
RecentFlushPtr = GetXLogReplayRecPtr (NULL );
1215
1218
1216
1219
/*
1217
- * If postmaster asked us to stop, don't wait here anymore. This will
1218
- * cause the xlogreader to return without reading a full record, which
1219
- * is the fastest way to reach the mainloop which then can quit.
1220
+ * If postmaster asked us to stop, don't wait anymore.
1220
1221
*
1221
1222
* It's important to do this check after the recomputation of
1222
1223
* RecentFlushPtr, so we can send all remaining data before shutting
@@ -1247,14 +1248,20 @@ WalSndWaitForWal(XLogRecPtr loc)
1247
1248
WalSndCaughtUp = true;
1248
1249
1249
1250
/*
1250
- * Try to flush pending output to the client. Also wait for the socket
1251
- * becoming writable, if there's still pending output after an attempt
1252
- * to flush. Otherwise we might just sit on output data while waiting
1253
- * for new WAL being generated.
1251
+ * Try to flush any pending output to the client.
1254
1252
*/
1255
1253
if (pq_flush_if_writable () != 0 )
1256
1254
WalSndShutdown ();
1257
1255
1256
+ /*
1257
+ * If we have received CopyDone from the client, sent CopyDone
1258
+ * ourselves, and the output buffer is empty, it's time to exit
1259
+ * streaming, so fail the current WAL fetch request.
1260
+ */
1261
+ if (streamingDoneReceiving && streamingDoneSending &&
1262
+ !pq_is_send_pending ())
1263
+ break ;
1264
+
1258
1265
now = GetCurrentTimestamp ();
1259
1266
1260
1267
/* die if timeout was reached */
@@ -1263,6 +1270,13 @@ WalSndWaitForWal(XLogRecPtr loc)
1263
1270
/* Send keepalive if the time has come */
1264
1271
WalSndKeepaliveIfNecessary (now );
1265
1272
1273
+ /*
1274
+ * Sleep until something happens or we time out. Also wait for the
1275
+ * socket becoming writable, if there's still pending output.
1276
+ * Otherwise we might sit on sendable output data while waiting for
1277
+ * new WAL to be generated. (But if we have nothing to send, we don't
1278
+ * want to wake on socket-writable.)
1279
+ */
1266
1280
sleeptime = WalSndComputeSleeptime (now );
1267
1281
1268
1282
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1271,7 +1285,6 @@ WalSndWaitForWal(XLogRecPtr loc)
1271
1285
if (pq_is_send_pending ())
1272
1286
wakeEvents |= WL_SOCKET_WRITEABLE ;
1273
1287
1274
- /* Sleep until something happens or we time out */
1275
1288
ImmediateInterruptOK = true;
1276
1289
CHECK_FOR_INTERRUPTS ();
1277
1290
WaitLatchOrSocket (& MyWalSnd -> latch , wakeEvents ,
@@ -1861,7 +1874,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
1861
1874
* ourselves, and the output buffer is empty, it's time to exit
1862
1875
* streaming.
1863
1876
*/
1864
- if (!pq_is_send_pending () && streamingDoneSending && streamingDoneReceiving )
1877
+ if (streamingDoneReceiving && streamingDoneSending &&
1878
+ !pq_is_send_pending ())
1865
1879
break ;
1866
1880
1867
1881
/*
0 commit comments