Skip to content

Commit d6da71f

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent ca9e9b0 commit d6da71f

File tree

4 files changed

+105
-7
lines changed

4 files changed

+105
-7
lines changed

src/backend/replication/logical/logical.c

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
724724

725725
/* set output state */
726726
ctx->accept_writes = false;
727+
ctx->end_xact = false;
727728

728729
/* do the actual work: call callback */
729730
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
751752

752753
/* set output state */
753754
ctx->accept_writes = false;
755+
ctx->end_xact = false;
754756

755757
/* do the actual work: call callback */
756758
ctx->callbacks.shutdown_cb(ctx);
@@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
786788
ctx->accept_writes = true;
787789
ctx->write_xid = txn->xid;
788790
ctx->write_location = txn->first_lsn;
791+
ctx->end_xact = false;
789792

790793
/* do the actual work: call callback */
791794
ctx->callbacks.begin_cb(ctx, txn);
@@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
817820
ctx->accept_writes = true;
818821
ctx->write_xid = txn->xid;
819822
ctx->write_location = txn->end_lsn; /* points to the end of the record */
823+
ctx->end_xact = true;
820824

821825
/* do the actual work: call callback */
822826
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
857861
ctx->accept_writes = true;
858862
ctx->write_xid = txn->xid;
859863
ctx->write_location = txn->first_lsn;
864+
ctx->end_xact = false;
860865

861866
/*
862867
* If the plugin supports two-phase commits then begin prepare callback is
@@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
901906
ctx->accept_writes = true;
902907
ctx->write_xid = txn->xid;
903908
ctx->write_location = txn->end_lsn; /* points to the end of the record */
909+
ctx->end_xact = true;
904910

905911
/*
906912
* If the plugin supports two-phase commits then prepare callback is
@@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
945951
ctx->accept_writes = true;
946952
ctx->write_xid = txn->xid;
947953
ctx->write_location = txn->end_lsn; /* points to the end of the record */
954+
ctx->end_xact = true;
948955

949956
/*
950957
* If the plugin support two-phase commits then commit prepared callback
@@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
990997
ctx->accept_writes = true;
991998
ctx->write_xid = txn->xid;
992999
ctx->write_location = txn->end_lsn; /* points to the end of the record */
1000+
ctx->end_xact = true;
9931001

9941002
/*
9951003
* If the plugin support two-phase commits then rollback prepared callback
@@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10401048
*/
10411049
ctx->write_location = change->lsn;
10421050

1051+
ctx->end_xact = false;
1052+
10431053
ctx->callbacks.change_cb(ctx, txn, relation, change);
10441054

10451055
/* Pop the error context stack */
@@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10801090
*/
10811091
ctx->write_location = change->lsn;
10821092

1093+
ctx->end_xact = false;
1094+
10831095
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
10841096

10851097
/* Pop the error context stack */
@@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
11071119

11081120
/* set output state */
11091121
ctx->accept_writes = false;
1122+
ctx->end_xact = false;
11101123

11111124
/* do the actual work: call callback */
11121125
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
11371150

11381151
/* set output state */
11391152
ctx->accept_writes = false;
1153+
ctx->end_xact = false;
11401154

11411155
/* do the actual work: call callback */
11421156
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
11741188
ctx->accept_writes = true;
11751189
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
11761190
ctx->write_location = message_lsn;
1191+
ctx->end_xact = false;
11771192

11781193
/* do the actual work: call callback */
11791194
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12171232
*/
12181233
ctx->write_location = first_lsn;
12191234

1235+
ctx->end_xact = false;
1236+
12201237
/* in streaming mode, stream_start_cb is required */
12211238
if (ctx->callbacks.stream_start_cb == NULL)
12221239
ereport(ERROR,
@@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12641281
*/
12651282
ctx->write_location = last_lsn;
12661283

1284+
ctx->end_xact = false;
1285+
12671286
/* in streaming mode, stream_stop_cb is required */
12681287
if (ctx->callbacks.stream_stop_cb == NULL)
12691288
ereport(ERROR,
@@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13031322
ctx->accept_writes = true;
13041323
ctx->write_xid = txn->xid;
13051324
ctx->write_location = abort_lsn;
1325+
ctx->end_xact = true;
13061326

13071327
/* in streaming mode, stream_abort_cb is required */
13081328
if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13471367
ctx->accept_writes = true;
13481368
ctx->write_xid = txn->xid;
13491369
ctx->write_location = txn->end_lsn;
1370+
ctx->end_xact = true;
13501371

13511372
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
13521373
if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13871408
ctx->accept_writes = true;
13881409
ctx->write_xid = txn->xid;
13891410
ctx->write_location = txn->end_lsn;
1411+
ctx->end_xact = true;
13901412

13911413
/* in streaming mode, stream_commit_cb is required */
13921414
if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14351457
*/
14361458
ctx->write_location = change->lsn;
14371459

1460+
ctx->end_xact = false;
1461+
14381462
/* in streaming mode, stream_change_cb is required */
14391463
if (ctx->callbacks.stream_change_cb == NULL)
14401464
ereport(ERROR,
@@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14791503
ctx->accept_writes = true;
14801504
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
14811505
ctx->write_location = message_lsn;
1506+
ctx->end_xact = false;
14821507

14831508
/* do the actual work: call callback */
14841509
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
15271552
*/
15281553
ctx->write_location = change->lsn;
15291554

1555+
ctx->end_xact = false;
1556+
15301557
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
15311558

15321559
/* Pop the error context stack */

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
7070
uint32 hashvalue);
7171
static void send_relation_and_attrs(Relation relation, TransactionId xid,
7272
LogicalDecodingContext *ctx);
73+
static void update_replication_progress(LogicalDecodingContext *ctx);
7374

7475
/*
7576
* Entry in the map used to remember which relation schemas we sent.
@@ -381,7 +382,7 @@ static void
381382
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
382383
XLogRecPtr commit_lsn)
383384
{
384-
OutputPluginUpdateProgress(ctx);
385+
update_replication_progress(ctx);
385386

386387
OutputPluginPrepareWrite(ctx, true);
387388
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -535,6 +536,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
535536
TransactionId xid = InvalidTransactionId;
536537
Relation ancestor = NULL;
537538

539+
update_replication_progress(ctx);
540+
538541
if (!is_publishable_relation(relation))
539542
return;
540543

@@ -677,6 +680,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
677680
Oid *relids;
678681
TransactionId xid = InvalidTransactionId;
679682

683+
update_replication_progress(ctx);
684+
680685
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
681686
if (in_streaming)
682687
xid = change->txn->xid;
@@ -735,6 +740,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
735740
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
736741
TransactionId xid = InvalidTransactionId;
737742

743+
update_replication_progress(ctx);
744+
738745
if (!data->messages)
739746
return;
740747

@@ -921,7 +928,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
921928
Assert(!in_streaming);
922929
Assert(rbtxn_is_streamed(txn));
923930

924-
OutputPluginUpdateProgress(ctx);
931+
update_replication_progress(ctx);
925932

926933
OutputPluginPrepareWrite(ctx, true);
927934
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1304,3 +1311,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
13041311
entry->pubactions.pubtruncate = false;
13051312
}
13061313
}
1314+
1315+
/*
1316+
* Try to update progress and send a keepalive message if too many changes were
1317+
* processed.
1318+
*
1319+
* For a large transaction, if we don't send any change to the downstream for a
1320+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
1321+
* This can happen when all or most of the changes are not published.
1322+
*/
1323+
static void
1324+
update_replication_progress(LogicalDecodingContext *ctx)
1325+
{
1326+
static int changes_count = 0;
1327+
1328+
/*
1329+
* We don't want to try sending a keepalive message after processing each
1330+
* change as that can have overhead. Tests revealed that there is no
1331+
* noticeable overhead in doing it after continuously processing 100 or so
1332+
* changes.
1333+
*/
1334+
#define CHANGES_THRESHOLD 100
1335+
1336+
/*
1337+
* If we are at the end of transaction LSN, update progress tracking.
1338+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
1339+
* try to send a keepalive message if required.
1340+
*/
1341+
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
1342+
{
1343+
OutputPluginUpdateProgress(ctx);
1344+
changes_count = 0;
1345+
}
1346+
}

src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ static void ProcessStandbyMessage(void);
240240
static void ProcessStandbyReplyMessage(void);
241241
static void ProcessStandbyHSFeedbackMessage(void);
242242
static void ProcessRepliesIfAny(void);
243+
static void ProcessPendingWrites(void);
243244
static void WalSndKeepalive(bool requestReply);
244245
static void WalSndKeepaliveIfNecessary(void);
245246
static void WalSndCheckTimeOut(void);
@@ -1288,6 +1289,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12881289
}
12891290

12901291
/* If we have pending write here, go to slow path */
1292+
ProcessPendingWrites();
1293+
}
1294+
1295+
/*
1296+
* Wait until there is no pending write. Also process replies from the other
1297+
* side and check timeouts during that.
1298+
*/
1299+
static void
1300+
ProcessPendingWrites(void)
1301+
{
12911302
for (;;)
12921303
{
12931304
long sleeptime;
@@ -1342,18 +1353,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
13421353
{
13431354
static TimestampTz sendTime = 0;
13441355
TimestampTz now = GetCurrentTimestamp();
1356+
bool end_xact = ctx->end_xact;
13451357

13461358
/*
13471359
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
13481360
* avoid flooding the lag tracker when we commit frequently.
1361+
*
1362+
* We don't have a mechanism to get the ack for any LSN other than end
1363+
* xact LSN from the downstream. So, we track lag only for end of
1364+
* transaction LSN.
13491365
*/
13501366
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1351-
if (!TimestampDifferenceExceeds(sendTime, now,
1352-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1353-
return;
1367+
if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1368+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1369+
{
1370+
LagTrackerWrite(lsn, now);
1371+
sendTime = now;
1372+
}
13541373

1355-
LagTrackerWrite(lsn, now);
1356-
sendTime = now;
1374+
/*
1375+
* Try to send a keepalive if required. We don't need to try sending keep
1376+
* alive messages at the transaction end as that will be done at a later
1377+
* point in time. This is required only for large transactions where we
1378+
* don't send any changes to the downstream and the receiver can timeout
1379+
* due to that.
1380+
*/
1381+
if (!end_xact &&
1382+
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1383+
wal_sender_timeout / 2))
1384+
ProcessPendingWrites();
13571385
}
13581386

13591387
/*

src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ typedef struct LogicalDecodingContext
4949
*/
5050
bool fast_forward;
5151

52+
/* Are we processing the end LSN of a transaction? */
53+
bool end_xact;
54+
5255
OutputPluginCallbacks callbacks;
5356
OutputPluginOptions options;
5457

0 commit comments

Comments
 (0)