Skip to content

Commit 8bdb133

Browse files
author
Amit Kapila
committed
Avoid repeated decoding of prepared transactions after a restart.
In commit a271a1b, we allowed decoding at prepare time and the prepare was decoded again if there is a restart after decoding it. It was done that way because we can't distinguish between the cases where we have not decoded the prepare because it was prior to consistent snapshot or we have decoded it earlier but restarted. To distinguish between these two cases, we have introduced an initial_consistent_point at the slot level which is an LSN at which we found a consistent point at the time of slot creation. This is also the point where we have exported a snapshot for the initial copy. So, prepare transaction prior to this point are sent along with commit prepared. This commit bumps SNAPBUILD_VERSION because of change in SnapBuild. It will break existing slots which is fine in a major release. Author: Ajin Cherian, based on idea by Andres Freund Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
1 parent 6230912 commit 8bdb133

File tree

10 files changed

+61
-67
lines changed

10 files changed

+61
-67
lines changed

contrib/test_decoding/expected/twophase.out

+12-26
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
3333

3434
COMMIT PREPARED 'test_prepared#1';
3535
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
36-
data
37-
----------------------------------------------------
38-
BEGIN
39-
table public.test_prepared1: INSERT: id[integer]:1
40-
table public.test_prepared1: INSERT: id[integer]:2
41-
PREPARE TRANSACTION 'test_prepared#1'
36+
data
37+
-----------------------------------
4238
COMMIT PREPARED 'test_prepared#1'
43-
(5 rows)
39+
(1 row)
4440

4541
-- Test that rollback of a prepared xact is decoded.
4642
BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
10399

104100
COMMIT PREPARED 'test_prepared#3';
105101
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
106-
data
107-
-------------------------------------------------------------------------
108-
BEGIN
109-
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
110-
PREPARE TRANSACTION 'test_prepared#3'
102+
data
103+
-----------------------------------
111104
COMMIT PREPARED 'test_prepared#3'
112-
(4 rows)
105+
(1 row)
113106

114107
-- make sure stuff still works
115108
INSERT INTO test_prepared1 VALUES (6);
@@ -158,14 +151,10 @@ RESET statement_timeout;
158151
COMMIT PREPARED 'test_prepared_lock';
159152
-- consume the commit
160153
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
161-
data
162-
---------------------------------------------------------------------------
163-
BEGIN
164-
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
165-
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
166-
PREPARE TRANSACTION 'test_prepared_lock'
154+
data
155+
--------------------------------------
167156
COMMIT PREPARED 'test_prepared_lock'
168-
(5 rows)
157+
(1 row)
169158

170159
-- Test savepoints and sub-xacts. Creating savepoints will create
171160
-- sub-xacts implicitly.
@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
188177
COMMIT PREPARED 'test_prepared_savepoint';
189178
-- consume the commit
190179
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
191-
data
192-
------------------------------------------------------------
193-
BEGIN
194-
table public.test_prepared_savepoint: INSERT: a[integer]:1
195-
PREPARE TRANSACTION 'test_prepared_savepoint'
180+
data
181+
-------------------------------------------
196182
COMMIT PREPARED 'test_prepared_savepoint'
197-
(4 rows)
183+
(1 row)
198184

199185
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
200186
BEGIN;

contrib/test_decoding/expected/twophase_stream.out

+3-25
Original file line numberDiff line numberDiff line change
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
6060
COMMIT PREPARED 'test1';
6161
--should show the COMMIT PREPARED and the other changes in the transaction
6262
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
63-
data
64-
-------------------------------------------------------------
65-
BEGIN
66-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
67-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
68-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
69-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
70-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
71-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
72-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
73-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
74-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
75-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
76-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
77-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
78-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
79-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
80-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
81-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
82-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
83-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
84-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
85-
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
86-
PREPARE TRANSACTION 'test1'
63+
data
64+
-------------------------
8765
COMMIT PREPARED 'test1'
88-
(23 rows)
66+
(1 row)
8967

9068
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
9169
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.

doc/src/sgml/logicaldecoding.sgml

+2-7
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
191191
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
192192
lsn | xid | data
193193
-----------+-----+--------------------------------------------
194-
0/1689DC0 | 529 | BEGIN 529
195-
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
196-
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
197194
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
198195
(4 row)
199196

@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
822819
<parameter>gid</parameter> field, which is part of the
823820
<parameter>txn</parameter> parameter, can be used in this callback to
824821
check if the plugin has already received this <command>PREPARE</command>
825-
in which case it can skip the remaining changes of the transaction.
826-
This can only happen if the user restarts the decoding after receiving
827-
the <command>PREPARE</command> for a transaction but before receiving
828-
the <command>COMMIT PREPARED</command>, say because of some error.
822+
in which case it can either error out or skip the remaining changes of
823+
the transaction.
829824
<programlisting>
830825
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
831826
ReorderBufferTXN *txn);

src/backend/replication/logical/decode.c

+2
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
730730
if (two_phase)
731731
{
732732
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
733+
SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
733734
commit_time, origin_id, origin_lsn,
734735
parsed->twophase_gid, true);
735736
}
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
868869
{
869870
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
870871
abort_time, origin_id, origin_lsn,
872+
InvalidXLogRecPtr,
871873
parsed->twophase_gid, false);
872874
}
873875
else

src/backend/replication/logical/logical.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
207207
ctx->reorder = ReorderBufferAllocate();
208208
ctx->snapshot_builder =
209209
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
210-
need_full_snapshot);
210+
need_full_snapshot, slot->data.initial_consistent_point);
211211

212212
ctx->reorder->private_data = ctx;
213213

@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
590590

591591
SpinLockAcquire(&slot->mutex);
592592
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
593+
slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
593594
SpinLockRelease(&slot->mutex);
594595
}
595596

src/backend/replication/logical/reorderbuffer.c

+5-5
Original file line numberDiff line numberDiff line change
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
26722672
void
26732673
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
26742674
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2675+
XLogRecPtr initial_consistent_point,
26752676
TimestampTz commit_time, RepOriginId origin_id,
26762677
XLogRecPtr origin_lsn, char *gid, bool is_commit)
26772678
{
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
26982699
/*
26992700
* It is possible that this transaction is not decoded at prepare time
27002701
* either because by that time we didn't have a consistent snapshot or it
2701-
* was decoded earlier but we have restarted. We can't distinguish between
2702-
* those two cases so we send the prepare in both the cases and let
2703-
* downstream decide whether to process or skip it. We don't need to
2704-
* decode the xact for aborts if it is not done already.
2702+
* was decoded earlier but we have restarted. We only need to send the
2703+
* prepare if it was not decoded earlier. We don't need to decode the xact
2704+
* for aborts if it is not done already.
27052705
*/
2706-
if (!rbtxn_prepared(txn) && is_commit)
2706+
if ((txn->final_lsn < initial_consistent_point) && is_commit)
27072707
{
27082708
txn->txn_flags |= RBTXN_PREPARE;
27092709

src/backend/replication/logical/snapbuild.c

+24-2
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ struct SnapBuild
164164
*/
165165
XLogRecPtr start_decoding_at;
166166

167+
/*
168+
* LSN at which we found a consistent point at the time of slot creation.
169+
* This is also the point where we have exported a snapshot for the
170+
* initial copy.
171+
*
172+
* The prepared transactions that are not covered by initial snapshot
173+
* needs to be sent later along with commit prepared and they must be
174+
* before this point.
175+
*/
176+
XLogRecPtr initial_consistent_point;
177+
167178
/*
168179
* Don't start decoding WAL until the "xl_running_xacts" information
169180
* indicates there are no running xids with an xid smaller than this.
@@ -269,7 +280,8 @@ SnapBuild *
269280
AllocateSnapshotBuilder(ReorderBuffer *reorder,
270281
TransactionId xmin_horizon,
271282
XLogRecPtr start_lsn,
272-
bool need_full_snapshot)
283+
bool need_full_snapshot,
284+
XLogRecPtr initial_consistent_point)
273285
{
274286
MemoryContext context;
275287
MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
297309
builder->initial_xmin_horizon = xmin_horizon;
298310
builder->start_decoding_at = start_lsn;
299311
builder->building_full_snapshot = need_full_snapshot;
312+
builder->initial_consistent_point = initial_consistent_point;
300313

301314
MemoryContextSwitchTo(oldcontext);
302315

@@ -356,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder)
356369
return builder->state;
357370
}
358371

372+
/*
373+
* Return the LSN at which the snapshot was exported
374+
*/
375+
XLogRecPtr
376+
SnapBuildInitialConsistentPoint(SnapBuild *builder)
377+
{
378+
return builder->initial_consistent_point;
379+
}
380+
359381
/*
360382
* Should the contents of transaction ending at 'ptr' be decoded?
361383
*/
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
14221444
offsetof(SnapBuildOnDisk, version)
14231445

14241446
#define SNAPBUILD_MAGIC 0x51A1E001
1425-
#define SNAPBUILD_VERSION 3
1447+
#define SNAPBUILD_VERSION 4
14261448

14271449
/*
14281450
* Store/Load a snapshot from disk, depending on the snapshot builder's state.

src/include/replication/reorderbuffer.h

+1
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
643643
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
644644
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
645645
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
646+
XLogRecPtr initial_consistent_point,
646647
TimestampTz commit_time,
647648
RepOriginId origin_id, XLogRecPtr origin_lsn,
648649
char *gid, bool is_commit);

src/include/replication/slot.h

+7
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
9191
*/
9292
XLogRecPtr confirmed_flush;
9393

94+
/*
95+
* LSN at which we found a consistent point at the time of slot creation.
96+
* This is also the point where we have exported a snapshot for the
97+
* initial copy.
98+
*/
99+
XLogRecPtr initial_consistent_point;
100+
94101
/* plugin name */
95102
NameData plugin;
96103
} ReplicationSlotPersistentData;

src/include/replication/snapbuild.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
6161

6262
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
6363
TransactionId xmin_horizon, XLogRecPtr start_lsn,
64-
bool need_full_snapshot);
64+
bool need_full_snapshot,
65+
XLogRecPtr initial_consistent_point);
6566
extern void FreeSnapshotBuilder(SnapBuild *cache);
6667

6768
extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
7576
TransactionId xid);
7677

7778
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
79+
extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
7880

7981
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
8082
TransactionId xid, int nsubxacts,

0 commit comments

Comments
 (0)