Skip to content

Commit f5c7e0c

Browse files
committed
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not to impose too high a per message overhead. The fast path skips checks for interrupts and timeouts. However, the existing coding failed to consider the fact that a transaction with a large number of changes may take a very long time to be processed and sent to the client. This causes the walsender to ignore interrupts for potentially a long time and more importantly it will result in the walsender being killed due to timeout at the end of such a transaction. This commit changes the fast path to also check for interrupts and only allows calling the fast path when the last keepalive check happened less than half the walsender timeout ago. Otherwise the slower code path will be taken. Backpatched to 9.4 Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, Craig Ringer and Robert Haas. Discussion: https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
1 parent 239b01e commit f5c7e0c

File tree

1 file changed

+37
-30
lines changed

1 file changed

+37
-30
lines changed

src/backend/replication/walsender.c

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,9 @@ static void
10661066
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10671067
bool last_write)
10681068
{
1069+
TimestampTz now;
1070+
int64 now_int;
1071+
10691072
/* output previously gathered data in a CopyData packet */
10701073
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
10711074

@@ -1075,53 +1078,35 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
10751078
* several releases by streaming physical replication.
10761079
*/
10771080
resetStringInfo(&tmpbuf);
1078-
pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
1081+
now_int = GetCurrentIntegerTimestamp();
1082+
now = IntegerTimestampToTimestampTz(now_int);
1083+
pq_sendint64(&tmpbuf, now_int);
10791084
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
10801085
tmpbuf.data, sizeof(int64));
10811086

1082-
/* fast path */
1087+
CHECK_FOR_INTERRUPTS();
1088+
10831089
/* Try to flush pending output to the client */
10841090
if (pq_flush_if_writable() != 0)
10851091
WalSndShutdown();
10861092

1087-
if (!pq_is_send_pending())
1093+
/* Try taking fast path unless we get too close to walsender timeout. */
1094+
if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1095+
wal_sender_timeout / 2) &&
1096+
!pq_is_send_pending())
1097+
{
10881098
return;
1099+
}
10891100

1101+
/* If we have pending write here, go to slow path */
10901102
for (;;)
10911103
{
10921104
int wakeEvents;
10931105
long sleeptime;
1094-
TimestampTz now;
1095-
1096-
/*
1097-
* Emergency bailout if postmaster has died. This is to avoid the
1098-
* necessity for manual cleanup of all postmaster children.
1099-
*/
1100-
if (!PostmasterIsAlive())
1101-
exit(1);
1102-
1103-
/* Process any requests or signals received recently */
1104-
if (ConfigReloadPending)
1105-
{
1106-
ConfigReloadPending = false;
1107-
ProcessConfigFile(PGC_SIGHUP);
1108-
SyncRepInitConfig();
1109-
}
11101106

11111107
/* Check for input from the client */
11121108
ProcessRepliesIfAny();
11131109

1114-
/* Clear any already-pending wakeups */
1115-
ResetLatch(&MyWalSnd->latch);
1116-
1117-
/* Try to flush pending output to the client */
1118-
if (pq_flush_if_writable() != 0)
1119-
WalSndShutdown();
1120-
1121-
/* If we finished clearing the buffered data, we're done here. */
1122-
if (!pq_is_send_pending())
1123-
break;
1124-
11251110
now = GetCurrentTimestamp();
11261111

11271112
/* die if timeout was reached */
@@ -1130,6 +1115,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11301115
/* Send keepalive if the time has come */
11311116
WalSndKeepaliveIfNecessary(now);
11321117

1118+
if (!pq_is_send_pending())
1119+
break;
1120+
11331121
sleeptime = WalSndComputeSleeptime(now);
11341122

11351123
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1141,6 +1129,25 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
11411129
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
11421130
MyProcPort->sock, sleeptime);
11431131
ImmediateInterruptOK = false;
1132+
1133+
/*
1134+
* Emergency bailout if postmaster has died. This is to avoid the
1135+
* necessity for manual cleanup of all postmaster children.
1136+
*/
1137+
if (!PostmasterIsAlive())
1138+
exit(1);
1139+
1140+
/* Process any requests or signals received recently */
1141+
if (ConfigReloadPending)
1142+
{
1143+
ConfigReloadPending = false;
1144+
ProcessConfigFile(PGC_SIGHUP);
1145+
SyncRepInitConfig();
1146+
}
1147+
1148+
/* Try to flush pending output to the client */
1149+
if (pq_flush_if_writable() != 0)
1150+
WalSndShutdown();
11441151
}
11451152

11461153
/* reactivate latch so WalSndLoop knows to continue */

0 commit comments

Comments
 (0)