Skip to content

Commit 0aa8a01

Browse files
author
Amit Kapila
committed
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
1 parent fa74469 commit 0aa8a01

File tree

7 files changed

+744
-7
lines changed

7 files changed

+744
-7
lines changed

contrib/test_decoding/test_decoding.c

+167
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
7676
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
7777
bool transactional, const char *prefix,
7878
Size sz, const char *message);
79+
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
80+
const char *gid);
81+
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
82+
ReorderBufferTXN *txn);
83+
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
84+
ReorderBufferTXN *txn,
85+
XLogRecPtr prepare_lsn);
86+
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
87+
ReorderBufferTXN *txn,
88+
XLogRecPtr commit_lsn);
89+
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
90+
ReorderBufferTXN *txn,
91+
XLogRecPtr prepare_end_lsn,
92+
TimestampTz prepare_time);
7993
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
8094
ReorderBufferTXN *txn);
8195
static void pg_output_stream_start(LogicalDecodingContext *ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
87101
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
88102
ReorderBufferTXN *txn,
89103
XLogRecPtr abort_lsn);
104+
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
105+
ReorderBufferTXN *txn,
106+
XLogRecPtr prepare_lsn);
90107
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
91108
ReorderBufferTXN *txn,
92109
XLogRecPtr commit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
123140
cb->filter_by_origin_cb = pg_decode_filter;
124141
cb->shutdown_cb = pg_decode_shutdown;
125142
cb->message_cb = pg_decode_message;
143+
cb->filter_prepare_cb = pg_decode_filter_prepare;
144+
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
145+
cb->prepare_cb = pg_decode_prepare_txn;
146+
cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
147+
cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
126148
cb->stream_start_cb = pg_decode_stream_start;
127149
cb->stream_stop_cb = pg_decode_stream_stop;
128150
cb->stream_abort_cb = pg_decode_stream_abort;
151+
cb->stream_prepare_cb = pg_decode_stream_prepare;
129152
cb->stream_commit_cb = pg_decode_stream_commit;
130153
cb->stream_change_cb = pg_decode_stream_change;
131154
cb->stream_message_cb = pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
141164
ListCell *option;
142165
TestDecodingData *data;
143166
bool enable_streaming = false;
167+
bool enable_twophase = false;
144168

145169
data = palloc0(sizeof(TestDecodingData));
146170
data->context = AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
241265
errmsg("could not parse value \"%s\" for parameter \"%s\"",
242266
strVal(elem->arg), elem->defname)));
243267
}
268+
else if (strcmp(elem->defname, "two-phase-commit") == 0)
269+
{
270+
if (elem->arg == NULL)
271+
continue;
272+
else if (!parse_bool(strVal(elem->arg), &enable_twophase))
273+
ereport(ERROR,
274+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
275+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
276+
strVal(elem->arg), elem->defname)));
277+
}
244278
else
245279
{
246280
ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
252286
}
253287

254288
ctx->streaming &= enable_streaming;
289+
ctx->twophase &= enable_twophase;
255290
}
256291

257292
/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320355
OutputPluginWrite(ctx, true);
321356
}
322357

358+
/* BEGIN PREPARE callback */
359+
static void
360+
pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
361+
{
362+
TestDecodingData *data = ctx->output_plugin_private;
363+
TestDecodingTxnData *txndata =
364+
MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
365+
366+
txndata->xact_wrote_changes = false;
367+
txn->output_plugin_private = txndata;
368+
369+
if (data->skip_empty_xacts)
370+
return;
371+
372+
pg_output_begin(ctx, data, txn, true);
373+
}
374+
375+
/* PREPARE callback */
376+
static void
377+
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
378+
XLogRecPtr prepare_lsn)
379+
{
380+
TestDecodingData *data = ctx->output_plugin_private;
381+
TestDecodingTxnData *txndata = txn->output_plugin_private;
382+
383+
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
384+
return;
385+
386+
OutputPluginPrepareWrite(ctx, true);
387+
388+
appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
389+
quote_literal_cstr(txn->gid));
390+
391+
if (data->include_xids)
392+
appendStringInfo(ctx->out, ", txid %u", txn->xid);
393+
394+
if (data->include_timestamp)
395+
appendStringInfo(ctx->out, " (at %s)",
396+
timestamptz_to_str(txn->commit_time));
397+
398+
OutputPluginWrite(ctx, true);
399+
}
400+
401+
/* COMMIT PREPARED callback */
402+
static void
403+
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
404+
XLogRecPtr commit_lsn)
405+
{
406+
TestDecodingData *data = ctx->output_plugin_private;
407+
408+
OutputPluginPrepareWrite(ctx, true);
409+
410+
appendStringInfo(ctx->out, "COMMIT PREPARED %s",
411+
quote_literal_cstr(txn->gid));
412+
413+
if (data->include_xids)
414+
appendStringInfo(ctx->out, ", txid %u", txn->xid);
415+
416+
if (data->include_timestamp)
417+
appendStringInfo(ctx->out, " (at %s)",
418+
timestamptz_to_str(txn->commit_time));
419+
420+
OutputPluginWrite(ctx, true);
421+
}
422+
423+
/* ROLLBACK PREPARED callback */
424+
static void
425+
pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
426+
ReorderBufferTXN *txn,
427+
XLogRecPtr prepare_end_lsn,
428+
TimestampTz prepare_time)
429+
{
430+
TestDecodingData *data = ctx->output_plugin_private;
431+
432+
OutputPluginPrepareWrite(ctx, true);
433+
434+
appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
435+
quote_literal_cstr(txn->gid));
436+
437+
if (data->include_xids)
438+
appendStringInfo(ctx->out, ", txid %u", txn->xid);
439+
440+
if (data->include_timestamp)
441+
appendStringInfo(ctx->out, " (at %s)",
442+
timestamptz_to_str(txn->commit_time));
443+
444+
OutputPluginWrite(ctx, true);
445+
}
446+
447+
/*
448+
* Filter out two-phase transactions.
449+
*
450+
* Each plugin can implement its own filtering logic. Here we demonstrate a
451+
* simple logic by checking the GID. If the GID contains the "_nodecode"
452+
* substring, then we filter it out.
453+
*/
454+
static bool
455+
pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
456+
{
457+
if (strstr(gid, "_nodecode") != NULL)
458+
return true;
459+
460+
return false;
461+
}
462+
323463
static bool
324464
pg_decode_filter(LogicalDecodingContext *ctx,
325465
RepOriginId origin_id)
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
701841
OutputPluginWrite(ctx, true);
702842
}
703843

844+
static void
845+
pg_decode_stream_prepare(LogicalDecodingContext *ctx,
846+
ReorderBufferTXN *txn,
847+
XLogRecPtr prepare_lsn)
848+
{
849+
TestDecodingData *data = ctx->output_plugin_private;
850+
TestDecodingTxnData *txndata = txn->output_plugin_private;
851+
852+
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
853+
return;
854+
855+
OutputPluginPrepareWrite(ctx, true);
856+
857+
if (data->include_xids)
858+
appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
859+
quote_literal_cstr(txn->gid), txn->xid);
860+
else
861+
appendStringInfo(ctx->out, "preparing streamed transaction %s",
862+
quote_literal_cstr(txn->gid));
863+
864+
if (data->include_timestamp)
865+
appendStringInfo(ctx->out, " (at %s)",
866+
timestamptz_to_str(txn->commit_time));
867+
868+
OutputPluginWrite(ctx, true);
869+
}
870+
704871
static void
705872
pg_decode_stream_commit(LogicalDecodingContext *ctx,
706873
ReorderBufferTXN *txn,

0 commit comments

Comments
 (0)