Skip to content

Commit 8705626

Browse files
committed
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not to impose too high a per message overhead. The fast path skips checks for interrupts and timeouts. However, the existing coding failed to consider the fact that a transaction with a large number of changes may take a very long time to be processed and sent to the client. This causes the walsender to ignore interrupts for potentially a long time and more importantly it will result in the walsender being killed due to timeout at the end of such a transaction. This commit changes the fast path to also check for interrupts and only allows calling the fast path when the last keepalive check happened less than half the walsender timeout ago. Otherwise the slower code path will be taken. Backpatched to 9.4 Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, Craig Ringer and Robert Haas. Discussion: https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parent 05f239e commit 8705626

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

src/backend/replication/walsender.c

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,9 @@ static void
10681068
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10691069
bool last_write)
10701070
{
1071+
TimestampTz now;
1072+
int64 now_int;
1073+
10711074
/* output previously gathered data in a CopyData packet */
10721075
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
10731076

@@ -1077,23 +1080,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10771080
* several releases by streaming physical replication.
10781081
*/
10791082
resetStringInfo(&tmpbuf);
1080-
pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
1083+
now_int = GetCurrentIntegerTimestamp();
1084+
now = IntegerTimestampToTimestampTz(now_int);
1085+
pq_sendint64(&tmpbuf, now_int);
10811086
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
10821087
tmpbuf.data, sizeof(int64));
10831088

1084-
/* fast path */
1089+
CHECK_FOR_INTERRUPTS();
1090+
10851091
/* Try to flush pending output to the client */
10861092
if (pq_flush_if_writable() != 0)
10871093
WalSndShutdown();
10881094

1089-
if (!pq_is_send_pending())
1095+
/* Try taking fast path unless we get too close to walsender timeout. */
1096+
if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1097+
wal_sender_timeout / 2) &&
1098+
!pq_is_send_pending())
1099+
{
10901100
return;
1101+
}
10911102

1103+
/* If we have pending write here, go to slow path */
10921104
for (;;)
10931105
{
10941106
int wakeEvents;
10951107
long sleeptime;
1096-
TimestampTz now;
1108+
1109+
/* Check for input from the client */
1110+
ProcessRepliesIfAny();
1111+
1112+
now = GetCurrentTimestamp();
1113+
1114+
/* die if timeout was reached */
1115+
WalSndCheckTimeOut(now);
1116+
1117+
/* Send keepalive if the time has come */
1118+
WalSndKeepaliveIfNecessary(now);
1119+
1120+
if (!pq_is_send_pending())
1121+
break;
1122+
1123+
sleeptime = WalSndComputeSleeptime(now);
1124+
1125+
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1126+
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1127+
1128+
/* Sleep until something happens or we time out */
1129+
WaitLatchOrSocket(MyLatch, wakeEvents,
1130+
MyProcPort->sock, sleeptime);
10971131

10981132
/*
10991133
* Emergency bailout if postmaster has died. This is to avoid the
@@ -1115,33 +1149,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11151149
SyncRepInitConfig();
11161150
}
11171151

1118-
/* Check for input from the client */
1119-
ProcessRepliesIfAny();
1120-
11211152
/* Try to flush pending output to the client */
11221153
if (pq_flush_if_writable() != 0)
11231154
WalSndShutdown();
1124-
1125-
/* If we finished clearing the buffered data, we're done here. */
1126-
if (!pq_is_send_pending())
1127-
break;
1128-
1129-
now = GetCurrentTimestamp();
1130-
1131-
/* die if timeout was reached */
1132-
WalSndCheckTimeOut(now);
1133-
1134-
/* Send keepalive if the time has come */
1135-
WalSndKeepaliveIfNecessary(now);
1136-
1137-
sleeptime = WalSndComputeSleeptime(now);
1138-
1139-
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1140-
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1141-
1142-
/* Sleep until something happens or we time out */
1143-
WaitLatchOrSocket(MyLatch, wakeEvents,
1144-
MyProcPort->sock, sleeptime);
11451155
}
11461156

11471157
/* reactivate latch so WalSndLoop knows to continue */

0 commit comments

Comments
 (0)