Skip to content

Commit 942d916

Browse files
committed
postpone ReorderBuffer cleanup
1 parent 3539109 commit 942d916

File tree

3 files changed

+24
-6
lines changed

3 files changed

+24
-6
lines changed

contrib/test_decoding/expected/prepared.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'in
9898
BEGIN
9999
table public.test_prepared2: INSERT: id[integer]:9
100100
COMMIT
101-
(27 rows)
101+
(28 rows)
102102

103103
SELECT pg_drop_replication_slot('regression_slot');
104104
pg_drop_replication_slot

src/backend/replication/logical/decode.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
570570
* Process invalidation messages, even if we're not interested in the
571571
* transaction's contents, since the various caches need to always be
572572
* consistent.
573+
*
574+
* Also if that transaction was sent to prepare callback then both
575+
* this function were called during prepare.
573576
*/
574-
if (parsed->nmsgs > 0)
577+
if (parsed->nmsgs > 0 &&
578+
!(TransactionIdIsValid(parsed->twophase_xid) &&
579+
ReorderBufferTxnIsPrepared(ctx->reorder, xid)))
575580
{
576581
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
577582
parsed->nmsgs, parsed->msgs);
@@ -634,8 +639,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
634639
* empty. So we can skip use shortcut for coomiting bare xact.
635640
*/
636641
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
637-
commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
638-
} else {
642+
commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
643+
}
644+
else
645+
{
639646
/* replay actions of all transaction + subtransactions in order */
640647
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
641648
commit_time, origin_id, origin_lsn);
@@ -724,7 +731,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
724731
* If that is ROLLBACK PREPARED than send that to callbacks.
725732
*/
726733
if (TransactionIdIsValid(xid) &&
734+
!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
727735
parsed->dbId == ctx->slot->data.database &&
736+
!FilterByOrigin(ctx, origin_id) &&
728737
ReorderBufferTxnIsPrepared(ctx->reorder, xid))
729738
{
730739
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,

src/backend/replication/logical/reorderbuffer.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,8 +1633,12 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
16331633
if (snapshot_now->copied)
16341634
ReorderBufferFreeSnap(rb, snapshot_now);
16351635

1636-
/* remove potential on-disk data, and deallocate */
1637-
ReorderBufferCleanupTXN(rb, txn);
1636+
/*
1637+
* remove potential on-disk data, and deallocate or postpone that
1638+
* till the finish of two-phase tx
1639+
*/
1640+
if (!txn->prepared)
1641+
ReorderBufferCleanupTXN(rb, txn);
16381642
}
16391643
PG_CATCH();
16401644
{
@@ -1735,6 +1739,10 @@ ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
17351739
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
17361740
false);
17371741

1742+
/*
1743+
* If txn == NULL then presumably subscriber confirmed prepare
1744+
* but we are rebooted.
1745+
*/
17381746
return txn == NULL ? true : txn->prepared;
17391747
}
17401748

@@ -1765,6 +1773,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
17651773
else
17661774
rb->abort_prepared(rb, txn, commit_lsn);
17671775

1776+
ReorderBufferCleanupTXN(rb, txn);
17681777
}
17691778

17701779
/*

0 commit comments

Comments
 (0)