Skip to content

Commit 7d2f872

Browse files
committed
allow to skip prepared tx
1 parent 77c26d0 commit 7d2f872

File tree

7 files changed

+217
-77
lines changed

7 files changed

+217
-77
lines changed

contrib/test_decoding/expected/prepared.out

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ BEGIN;
2525
INSERT INTO test_prepared1 VALUES (5);
2626
ALTER TABLE test_prepared1 ADD COLUMN data text;
2727
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
28-
LOCK test_prepared1;
2928
PREPARE TRANSACTION 'test_prepared#3';
3029
-- test that we decode correctly while an uncommitted prepared xact
3130
-- with ddl exists.
@@ -45,33 +44,27 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
4544
-------------------------------------------------------------------------
4645
BEGIN
4746
table public.test_prepared1: INSERT: id[integer]:1
48-
PREPARE
49-
COMMIT PREPARED
47+
COMMIT
5048
BEGIN
5149
table public.test_prepared1: INSERT: id[integer]:2
5250
COMMIT
5351
BEGIN
54-
table public.test_prepared1: INSERT: id[integer]:3
55-
PREPARE
56-
ABORT PREPARED
57-
BEGIN
5852
table public.test_prepared1: INSERT: id[integer]:4
5953
COMMIT
6054
BEGIN
55+
table public.test_prepared2: INSERT: id[integer]:7
56+
COMMIT
57+
BEGIN
6158
table public.test_prepared1: INSERT: id[integer]:5
6259
table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
63-
PREPARE
64-
BEGIN
65-
table public.test_prepared2: INSERT: id[integer]:7
6660
COMMIT
67-
COMMIT PREPARED
6861
BEGIN
6962
table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
7063
COMMIT
7164
BEGIN
7265
table public.test_prepared2: INSERT: id[integer]:9
7366
COMMIT
74-
(28 rows)
67+
(22 rows)
7568

7669
SELECT pg_drop_replication_slot('regression_slot');
7770
pg_drop_replication_slot

contrib/test_decoding/sql/prepared.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ BEGIN;
2525
INSERT INTO test_prepared1 VALUES (5);
2626
ALTER TABLE test_prepared1 ADD COLUMN data text;
2727
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
28-
LOCK test_prepared1;
2928
PREPARE TRANSACTION 'test_prepared#3';
3029

3130
-- test that we decode correctly while an uncommitted prepared xact
3231
-- with ddl exists.
3332

3433
-- separate table because of the lock from the ALTER
34+
-- this will come before the '5' row above, as this commits before it.
3535
INSERT INTO test_prepared2 VALUES (7);
3636

37-
ROLLBACK PREPARED 'test_prepared#3';
37+
COMMIT PREPARED 'test_prepared#3';
3838

3939
-- make sure stuff still works
4040
INSERT INTO test_prepared1 VALUES (8);
@@ -47,4 +47,4 @@ DROP TABLE test_prepared2;
4747
-- show results
4848
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
4949

50-
SELECT pg_drop_replication_slot('regression_slot');
50+
SELECT pg_drop_replication_slot('regression_slot');

contrib/test_decoding/test_decoding.c

Lines changed: 123 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct
4646
bool skip_empty_xacts;
4747
bool xact_wrote_changes;
4848
bool only_local;
49+
bool twophase_decoding;
4950
} TestDecodingData;
5051

5152
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -68,6 +69,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6869
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
6970
bool transactional, const char *prefix,
7071
Size sz, const char *message);
72+
static bool pg_filter_prepare(LogicalDecodingContext *ctx,
73+
ReorderBufferTXN *txn,
74+
char *gid);
75+
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
76+
ReorderBufferTXN *txn,
77+
XLogRecPtr prepare_lsn);
78+
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
79+
ReorderBufferTXN *txn,
80+
XLogRecPtr commit_lsn);
81+
static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
82+
ReorderBufferTXN *txn,
83+
XLogRecPtr abort_lsn);
84+
7185

7286
void
7387
_PG_init(void)
@@ -85,9 +99,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8599
cb->begin_cb = pg_decode_begin_txn;
86100
cb->change_cb = pg_decode_change;
87101
cb->commit_cb = pg_decode_commit_txn;
102+
88103
cb->filter_by_origin_cb = pg_decode_filter;
89104
cb->shutdown_cb = pg_decode_shutdown;
90105
cb->message_cb = pg_decode_message;
106+
107+
cb->filter_prepare_cb = pg_filter_prepare;
108+
cb->prepare_cb = pg_decode_prepare_txn;
109+
cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
110+
cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
91111
}
92112

93113

@@ -107,6 +127,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
107127
data->include_timestamp = false;
108128
data->skip_empty_xacts = false;
109129
data->only_local = false;
130+
data->twophase_decoding = false;
110131

111132
ctx->output_plugin_private = data;
112133

@@ -176,6 +197,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
176197
errmsg("could not parse value \"%s\" for parameter \"%s\"",
177198
strVal(elem->arg), elem->defname)));
178199
}
200+
else if (strcmp(elem->defname, "twophase-decoding") == 0)
201+
{
202+
203+
if (elem->arg == NULL)
204+
data->twophase_decoding = true;
205+
else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
206+
ereport(ERROR,
207+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
208+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
209+
strVal(elem->arg), elem->defname)));
210+
}
179211
else
180212
{
181213
ereport(ERROR,
@@ -233,21 +265,97 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
233265

234266
OutputPluginPrepareWrite(ctx, true);
235267

236-
switch(txn->xact_action)
237-
{
238-
case XLOG_XACT_COMMIT:
239-
appendStringInfoString(ctx->out, "COMMIT");
240-
break;
241-
case XLOG_XACT_PREPARE:
242-
appendStringInfo(ctx->out, "PREPARE '%s'", txn->gid);
243-
break;
244-
case XLOG_XACT_COMMIT_PREPARED:
245-
appendStringInfo(ctx->out, "COMMIT PREPARED '%s'", txn->gid);
246-
break;
247-
case XLOG_XACT_ABORT_PREPARED:
248-
appendStringInfo(ctx->out, "ABORT PREPARED '%s'", txn->gid);
249-
break;
250-
}
268+
appendStringInfoString(ctx->out, "COMMIT");
269+
270+
if (data->include_xids)
271+
appendStringInfo(ctx->out, " %u", txn->xid);
272+
273+
if (data->include_timestamp)
274+
appendStringInfo(ctx->out, " (at %s)",
275+
timestamptz_to_str(txn->commit_time));
276+
277+
OutputPluginWrite(ctx, true);
278+
}
279+
280+
static bool
281+
pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
282+
char *gid)
283+
{
284+
TestDecodingData *data = ctx->output_plugin_private;
285+
286+
// has_catalog_changes?
287+
// LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288+
289+
// OutputPluginPrepareWrite(ctx, true);
290+
291+
// appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
292+
293+
// OutputPluginWrite(ctx, true);
294+
return true;
295+
}
296+
297+
298+
/* PREPARE callback */
299+
static void
300+
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
301+
XLogRecPtr prepare_lsn)
302+
{
303+
TestDecodingData *data = ctx->output_plugin_private;
304+
305+
if (data->skip_empty_xacts && !data->xact_wrote_changes)
306+
return;
307+
308+
OutputPluginPrepareWrite(ctx, true);
309+
310+
appendStringInfo(ctx->out, "PREPARE! '%s'", txn->gid);
311+
312+
if (data->include_xids)
313+
appendStringInfo(ctx->out, " %u", txn->xid);
314+
315+
if (data->include_timestamp)
316+
appendStringInfo(ctx->out, " (at %s)",
317+
timestamptz_to_str(txn->commit_time));
318+
319+
OutputPluginWrite(ctx, true);
320+
}
321+
322+
/* COMMIT PREPARED callback */
323+
static void
324+
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
325+
XLogRecPtr commit_lsn)
326+
{
327+
TestDecodingData *data = ctx->output_plugin_private;
328+
329+
if (data->skip_empty_xacts && !data->xact_wrote_changes)
330+
return;
331+
332+
OutputPluginPrepareWrite(ctx, true);
333+
334+
appendStringInfo(ctx->out, "COMMIT PREPARED '%s'", txn->gid);
335+
336+
if (data->include_xids)
337+
appendStringInfo(ctx->out, " %u", txn->xid);
338+
339+
if (data->include_timestamp)
340+
appendStringInfo(ctx->out, " (at %s)",
341+
timestamptz_to_str(txn->commit_time));
342+
343+
OutputPluginWrite(ctx, true);
344+
}
345+
346+
/* ABORT PREPARED callback */
347+
static void
348+
pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
349+
XLogRecPtr abort_lsn)
350+
{
351+
TestDecodingData *data = ctx->output_plugin_private;
352+
353+
if (data->skip_empty_xacts && !data->xact_wrote_changes)
354+
return;
355+
356+
OutputPluginPrepareWrite(ctx, true);
357+
358+
appendStringInfo(ctx->out, "ABORT PREPARED '%s'", txn->gid);
251359

252360
if (data->include_xids)
253361
appendStringInfo(ctx->out, " %u", txn->xid);

src/backend/replication/logical/decode.c

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,6 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
224224
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
225225
return;
226226

227-
reorder->xact_action = info;
228-
229227
switch (info)
230228
{
231229
case XLOG_XACT_COMMIT:
@@ -627,14 +625,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
627625
buf->origptr, buf->endptr);
628626
}
629627

630-
if (TransactionIdIsValid(parsed->twophase_xid)) {
628+
if (TransactionIdIsValid(parsed->twophase_xid) &&
629+
ReorderBufferTxnIsPrepared(ctx->reorder, xid))
630+
{
631631
/*
632632
* We are processing COMMIT PREPARED and know that reorder buffer is
633633
* empty. So we can skip use shortcut for coomiting bare xact.
634634
*/
635-
strcpy(ctx->reorder->gid, parsed->twophase_gid);
636-
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
637-
commit_time, origin_id, origin_lsn);
635+
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
636+
commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
638637
} else {
639638
/* replay actions of all transaction + subtransactions in order */
640639
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
@@ -651,7 +650,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
651650
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
652651
int i;
653652
TransactionId xid = parsed->twophase_xid;
654-
strcpy(ctx->reorder->gid, parsed->twophase_gid);
655653

656654
/*
657655
* Process invalidation messages, even if we're not interested in the
@@ -689,8 +687,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
689687
}
690688

691689
/* replay actions of all transaction + subtransactions in order */
692-
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
693-
commit_time, origin_id, origin_lsn);
690+
ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
691+
commit_time, origin_id, origin_lsn, parsed->twophase_gid);
694692
}
695693

696694
/*
@@ -709,13 +707,13 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
709707
/*
710708
* If that is ROLLBACK PREPARED than send that to callbacks.
711709
*/
712-
if (TransactionIdIsValid(parsed->twophase_xid)
713-
&& (parsed->dbId == ctx->slot->data.database)) {
714-
715-
strcpy(ctx->reorder->gid, parsed->twophase_gid);
716-
717-
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
718-
commit_time, origin_id, origin_lsn);
710+
if (TransactionIdIsValid(xid) &&
711+
parsed->dbId == ctx->slot->data.database &&
712+
ReorderBufferTxnIsPrepared(ctx->reorder, xid))
713+
{
714+
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
715+
commit_time, origin_id, origin_lsn,
716+
parsed->twophase_gid, false);
719717
return;
720718
}
721719

src/backend/replication/logical/logical.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,8 @@ filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid
814814
errcallback.previous = error_context_stack;
815815
error_context_stack = &errcallback;
816816

817-
// /* set output state */
818-
// ctx->accept_writes = false;
817+
/* set output state */
818+
ctx->accept_writes = true; // ->false
819819

820820
/* do the actual work: call callback */
821821
ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);

0 commit comments

Comments
 (0)