Skip to content

Commit f64ea6d

Browse files
author
Amit Kapila
committed
Add a xid argument to the filter_prepare callback for output plugins.
Along with gid, this provides a different way to identify the transaction. The users that use xid in some way to prepare the transactions can use it to filter prepare transactions. The later commands COMMIT PREPARED or ROLLBACK PREPARED carries both identifiers, providing an output plugin the choice of what to use. Author: Markus Wanner Reviewed-by: Vignesh C, Amit Kapila Discussion: https://postgr.es/m/ee280000-7355-c4dc-e47b-2436e7be959c@enterprisedb.com
1 parent bc2797e commit f64ea6d

File tree

6 files changed

+38
-21
lines changed

6 files changed

+38
-21
lines changed

contrib/test_decoding/test_decoding.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
7777
bool transactional, const char *prefix,
7878
Size sz, const char *message);
7979
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
80+
TransactionId xid,
8081
const char *gid);
8182
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
8283
ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
440441
* substring, then we filter it out.
441442
*/
442443
static bool
443-
pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
444+
pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
445+
const char *gid)
444446
{
445447
if (strstr(gid, "_nodecode") != NULL)
446448
return true;

doc/src/sgml/logicaldecoding.sgml

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
794794
<command>COMMIT PREPARED</command> time. To signal that
795795
decoding should be skipped, return <literal>true</literal>;
796796
<literal>false</literal> otherwise. When the callback is not
797-
defined, <literal>false</literal> is assumed (i.e. nothing is
798-
filtered).
797+
defined, <literal>false</literal> is assumed (i.e. no filtering, all
798+
transactions using two-phase commit are decoded in two phases as well).
799799
<programlisting>
800800
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
801+
TransactionId xid,
801802
const char *gid);
802803
</programlisting>
803-
The <parameter>ctx</parameter> parameter has the same contents as for the
804-
other callbacks. The <parameter>gid</parameter> is the identifier that later
805-
identifies this transaction for <command>COMMIT PREPARED</command> or
806-
<command>ROLLBACK PREPARED</command>.
804+
The <parameter>ctx</parameter> parameter has the same contents as for
805+
the other callbacks. The parameters <parameter>xid</parameter>
806+
and <parameter>gid</parameter> provide two different ways to identify
807+
the transaction. The later <command>COMMIT PREPARED</command> or
808+
<command>ROLLBACK PREPARED</command> carries both identifiers,
809+
providing an output plugin the choice of what to use.
807810
</para>
808811
<para>
809-
The callback has to provide the same static answer for a given
810-
<parameter>gid</parameter> every time it is called.
812+
The callback may be invoked multiple times per transaction to decode
813+
and must provide the same static answer for a given pair of
814+
<parameter>xid</parameter> and <parameter>gid</parameter> every time
815+
it is called.
811816
</para>
812817
</sect3>
813818

@@ -1219,9 +1224,11 @@ stream_commit_cb(...); &lt;-- commit of the streamed transaction
12191224
</para>
12201225

12211226
<para>
1222-
Optionally the output plugin can specify a name pattern in the
1223-
<function>filter_prepare_cb</function> and transactions with gid containing
1224-
that name pattern will not be decoded as a two-phase commit transaction.
1227+
Optionally the output plugin can define filtering rules via
1228+
<function>filter_prepare_cb</function> to decode only specific transaction
1229+
in two phases. This can be achieved by pattern matching on the
1230+
<parameter>gid</parameter> or via lookups using the
1231+
<parameter>xid</parameter>.
12251232
</para>
12261233

12271234
<para>

src/backend/replication/logical/decode.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
8080
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
8181

8282
/* helper functions for decoding transactions */
83-
static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
83+
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
84+
TransactionId xid, const char *gid);
8485
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
8586
XLogRecordBuffer *buf, Oid dbId,
8687
RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
271272
* doesn't filter the transaction at prepare time.
272273
*/
273274
if (info == XLOG_XACT_COMMIT_PREPARED)
274-
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
275+
two_phase = !(FilterPrepare(ctx, xid,
276+
parsed.twophase_gid));
275277

276278
DecodeCommit(ctx, buf, &parsed, xid, two_phase);
277279
break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
298300
* doesn't filter the transaction at prepare time.
299301
*/
300302
if (info == XLOG_XACT_ABORT_PREPARED)
301-
two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
303+
two_phase = !(FilterPrepare(ctx, xid,
304+
parsed.twophase_gid));
302305

303306
DecodeAbort(ctx, buf, &parsed, xid, two_phase);
304307
break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
355358
* manner iff output plugin supports two-phase commits and
356359
* doesn't filter the transaction at prepare time.
357360
*/
358-
if (FilterPrepare(ctx, parsed.twophase_gid))
361+
if (FilterPrepare(ctx, parsed.twophase_xid,
362+
parsed.twophase_gid))
359363
{
360364
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
361365
buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
581585
* this transaction as a regular commit later.
582586
*/
583587
static inline bool
584-
FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
588+
FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
589+
const char *gid)
585590
{
586591
/*
587592
* Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
599604
if (ctx->callbacks.filter_prepare_cb == NULL)
600605
return false;
601606

602-
return filter_prepare_cb_wrapper(ctx, gid);
607+
return filter_prepare_cb_wrapper(ctx, xid, gid);
603608
}
604609

605610
static inline bool

src/backend/replication/logical/logical.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10831083
}
10841084

10851085
bool
1086-
filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
1086+
filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1087+
const char *gid)
10871088
{
10881089
LogicalErrorCallbackState state;
10891090
ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
11041105
ctx->accept_writes = false;
11051106

11061107
/* do the actual work: call callback */
1107-
ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
1108+
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
11081109

11091110
/* Pop the error context stack */
11101111
error_context_stack = errcallback.previous;

src/include/replication/logical.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
125125
XLogRecPtr restart_lsn);
126126
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
127127

128-
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
128+
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
129+
TransactionId xid, const char *gid);
129130
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
130131
extern void ResetLogicalStreamingState(void);
131132
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);

src/include/replication/output_plugin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
106106
* and sent as usual transaction.
107107
*/
108108
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
109+
TransactionId xid,
109110
const char *gid);
110111

111112
/*

0 commit comments

Comments
 (0)