Skip to content

Commit 77c26d0

Browse files
committed
kinda new callbacks interface
1 parent d5fe0f3 commit 77c26d0

File tree

8 files changed

+261
-12
lines changed

8 files changed

+261
-12
lines changed

contrib/test_decoding/sql/prepared.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ PREPARE TRANSACTION 'test_prepared#3';
3232
-- with ddl exists.
3333

3434
-- separate table because of the lock from the ALTER
35-
-- this will come before the '5' row above, as this commits before it.
3635
INSERT INTO test_prepared2 VALUES (7);
3736

38-
COMMIT PREPARED 'test_prepared#3';
37+
ROLLBACK PREPARED 'test_prepared#3';
3938

4039
-- make sure stuff still works
4140
INSERT INTO test_prepared1 VALUES (8);

contrib/test_decoding/test_decoding.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
239239
appendStringInfoString(ctx->out, "COMMIT");
240240
break;
241241
case XLOG_XACT_PREPARE:
242-
appendStringInfoString(ctx->out, "PREPARE");
242+
appendStringInfo(ctx->out, "PREPARE '%s'", txn->gid);
243243
break;
244244
case XLOG_XACT_COMMIT_PREPARED:
245-
appendStringInfoString(ctx->out, "COMMIT PREPARED");
245+
appendStringInfo(ctx->out, "COMMIT PREPARED '%s'", txn->gid);
246246
break;
247247
case XLOG_XACT_ABORT_PREPARED:
248-
appendStringInfoString(ctx->out, "ABORT PREPARED");
248+
appendStringInfo(ctx->out, "ABORT PREPARED '%s'", txn->gid);
249249
break;
250250
}
251251

src/backend/replication/logical/decode.c

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,31 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
282282
break;
283283
}
284284
case XLOG_XACT_PREPARE:
285-
{
286-
xl_xact_parsed_prepare parsed;
287-
ParsePrepareRecord(XLogRecGetInfo(buf->record),
288-
XLogRecGetData(buf->record), &parsed);
289-
DecodePrepare(ctx, buf, &parsed);
290-
break;
285+
{
286+
xl_xact_parsed_prepare parsed;
287+
288+
/* check that output plugin capable of twophase decoding */
289+
if (!ctx->twophase_hadling)
290+
{
291+
ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
292+
break;
293+
}
294+
295+
/* ok, parse it */
296+
ParsePrepareRecord(XLogRecGetInfo(buf->record),
297+
XLogRecGetData(buf->record), &parsed);
298+
299+
/* does output plugin wants this particular transaction? */
300+
if (ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
301+
parsed.twophase_gid))
302+
{
303+
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
304+
buf->origptr);
305+
break;
306+
}
307+
308+
DecodePrepare(ctx, buf, &parsed);
309+
break;
291310
}
292311
default:
293312
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);

src/backend/replication/logical/logical.c

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
5858
bool is_init);
5959
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
6060
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
61+
static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
62+
char *gid);
63+
static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
64+
XLogRecPtr prepare_lsn);
65+
static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
66+
XLogRecPtr commit_lsn);
67+
static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
68+
XLogRecPtr abort_lsn);
6169
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
6270
XLogRecPtr commit_lsn);
6371
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -122,6 +130,7 @@ StartupDecodingContext(List *output_plugin_options,
122130
MemoryContext context,
123131
old_context;
124132
LogicalDecodingContext *ctx;
133+
int twophase_callbacks;
125134

126135
/* shorter lines... */
127136
slot = MyReplicationSlot;
@@ -179,8 +188,25 @@ StartupDecodingContext(List *output_plugin_options,
179188
ctx->reorder->begin = begin_cb_wrapper;
180189
ctx->reorder->apply_change = change_cb_wrapper;
181190
ctx->reorder->commit = commit_cb_wrapper;
191+
ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
192+
ctx->reorder->prepare = prepare_cb_wrapper;
193+
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
194+
ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
182195
ctx->reorder->message = message_cb_wrapper;
183196

197+
/* check that plugin implements all necessary callbacks to perform 2PC */
198+
twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
199+
(ctx->callbacks.commit_prepared_cb != NULL) +
200+
(ctx->callbacks.abort_prepared_cb != NULL);
201+
202+
ctx->twophase_hadling = (twophase_callbacks == 3);
203+
204+
if (twophase_callbacks != 3 && twophase_callbacks != 0)
205+
ereport(WARNING,
206+
(errmsg("Output plugin registered only %d twophase callbacks out of 3. "
207+
"Twophase transactions will be decoded as ordinary ones.",
208+
twophase_callbacks)));
209+
184210
ctx->out = makeStringInfo();
185211
ctx->prepare_write = prepare_write;
186212
ctx->write = do_write;
@@ -649,6 +675,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
649675
error_context_stack = errcallback.previous;
650676
}
651677

678+
static void
679+
prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
680+
XLogRecPtr prepare_lsn)
681+
{
682+
LogicalDecodingContext *ctx = cache->private_data;
683+
LogicalErrorCallbackState state;
684+
ErrorContextCallback errcallback;
685+
686+
/* Push callback + info on the error context stack */
687+
state.ctx = ctx;
688+
state.callback_name = "prepare";
689+
state.report_location = txn->final_lsn; /* beginning of commit record */
690+
errcallback.callback = output_plugin_error_callback;
691+
errcallback.arg = (void *) &state;
692+
errcallback.previous = error_context_stack;
693+
error_context_stack = &errcallback;
694+
695+
/* set output state */
696+
ctx->accept_writes = true;
697+
ctx->write_xid = txn->xid;
698+
ctx->write_location = txn->end_lsn; /* points to the end of the record */
699+
700+
/* do the actual work: call callback */
701+
ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
702+
703+
/* Pop the error context stack */
704+
error_context_stack = errcallback.previous;
705+
}
706+
707+
static void
708+
commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
709+
XLogRecPtr commit_lsn)
710+
{
711+
LogicalDecodingContext *ctx = cache->private_data;
712+
LogicalErrorCallbackState state;
713+
ErrorContextCallback errcallback;
714+
715+
/* Push callback + info on the error context stack */
716+
state.ctx = ctx;
717+
state.callback_name = "commit_prepared";
718+
state.report_location = txn->final_lsn; /* beginning of commit record */
719+
errcallback.callback = output_plugin_error_callback;
720+
errcallback.arg = (void *) &state;
721+
errcallback.previous = error_context_stack;
722+
error_context_stack = &errcallback;
723+
724+
/* set output state */
725+
ctx->accept_writes = true;
726+
ctx->write_xid = txn->xid;
727+
ctx->write_location = txn->end_lsn; /* points to the end of the record */
728+
729+
/* do the actual work: call callback */
730+
ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
731+
732+
/* Pop the error context stack */
733+
error_context_stack = errcallback.previous;
734+
}
735+
736+
static void
737+
abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
738+
XLogRecPtr abort_lsn)
739+
{
740+
LogicalDecodingContext *ctx = cache->private_data;
741+
LogicalErrorCallbackState state;
742+
ErrorContextCallback errcallback;
743+
744+
/* Push callback + info on the error context stack */
745+
state.ctx = ctx;
746+
state.callback_name = "abort_prepared";
747+
state.report_location = txn->final_lsn; /* beginning of commit record */
748+
errcallback.callback = output_plugin_error_callback;
749+
errcallback.arg = (void *) &state;
750+
errcallback.previous = error_context_stack;
751+
error_context_stack = &errcallback;
752+
753+
/* set output state */
754+
ctx->accept_writes = true;
755+
ctx->write_xid = txn->xid;
756+
ctx->write_location = txn->end_lsn; /* points to the end of the record */
757+
758+
/* do the actual work: call callback */
759+
ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
760+
761+
/* Pop the error context stack */
762+
error_context_stack = errcallback.previous;
763+
}
764+
652765
static void
653766
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
654767
Relation relation, ReorderBufferChange *change)
@@ -684,6 +797,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
684797
error_context_stack = errcallback.previous;
685798
}
686799

800+
static bool
801+
filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid)
802+
{
803+
LogicalDecodingContext *ctx = cache->private_data;
804+
LogicalErrorCallbackState state;
805+
ErrorContextCallback errcallback;
806+
bool ret;
807+
808+
/* Push callback + info on the error context stack */
809+
state.ctx = ctx;
810+
state.callback_name = "filter_prepare";
811+
state.report_location = InvalidXLogRecPtr;
812+
errcallback.callback = output_plugin_error_callback;
813+
errcallback.arg = (void *) &state;
814+
errcallback.previous = error_context_stack;
815+
error_context_stack = &errcallback;
816+
817+
// /* set output state */
818+
// ctx->accept_writes = false;
819+
820+
/* do the actual work: call callback */
821+
ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);
822+
823+
/* Pop the error context stack */
824+
error_context_stack = errcallback.previous;
825+
return ret;
826+
}
827+
687828
bool
688829
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
689830
{

src/backend/replication/logical/reorderbuffer.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,6 +1669,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16691669
PG_END_TRY();
16701670
}
16711671

1672+
bool
1673+
ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid)
1674+
{
1675+
ReorderBufferTXN *txn;
1676+
1677+
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
1678+
Assert(txn != NULL);
1679+
return rb->filter_prepare(rb, txn, gid);
1680+
}
1681+
1682+
void
1683+
ReorderBufferStartPrepare()
1684+
{
1685+
1686+
}
1687+
1688+
void
1689+
ReorderBufferFinishPrepare()
1690+
{
1691+
1692+
}
16721693

16731694
/*
16741695
* Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.

src/include/replication/logical.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ typedef struct LogicalDecodingContext
7373
bool prepared_write;
7474
XLogRecPtr write_location;
7575
TransactionId write_xid;
76+
77+
/*
78+
* Capabilities of decoding plugin used.
79+
*/
80+
bool twophase_hadling;
7681
} LogicalDecodingContext;
7782

7883
extern void CheckLogicalDecodingRequirements(void);
@@ -98,5 +103,4 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
98103
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
99104

100105
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
101-
102106
#endif

src/include/replication/output_plugin.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
6767
ReorderBufferTXN *txn,
6868
XLogRecPtr commit_lsn);
6969

70+
/*
71+
* Called before decoding of PREPARE record to decide whether this
72+
* transaction should be decoded with separate calls to prepare
73+
* and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
74+
* and send as usual transaction.
75+
*/
76+
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
77+
ReorderBufferTXN *txn,
78+
char *gid);
79+
80+
/*
81+
* Called for PREPARE record unless it was filtered by filter_prepare()
82+
* callback.
83+
*/
84+
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
85+
ReorderBufferTXN *txn,
86+
XLogRecPtr prepare_lsn);
87+
88+
/*
89+
* Called for COMMIT PREPARED.
90+
*/
91+
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
92+
ReorderBufferTXN *txn,
93+
XLogRecPtr commit_lsn);
94+
95+
/*
96+
* Called for ROLLBACK PREPARED.
97+
*/
98+
typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
99+
ReorderBufferTXN *txn,
100+
XLogRecPtr abort_lsn);
101+
70102
/*
71103
* Called for the generic logical decoding messages.
72104
*/
@@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks
98130
LogicalDecodeBeginCB begin_cb;
99131
LogicalDecodeChangeCB change_cb;
100132
LogicalDecodeCommitCB commit_cb;
133+
LogicalDecodeFilterPrepareCB filter_prepare_cb;
134+
LogicalDecodePrepareCB prepare_cb;
135+
LogicalDecodeCommitPreparedCB commit_prepared_cb;
136+
LogicalDecodeAbortPreparedCB abort_prepared_cb;
101137
LogicalDecodeMessageCB message_cb;
102138
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
103139
LogicalDecodeShutdownCB shutdown_cb;

src/include/replication/reorderbuffer.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,29 @@ typedef void (*ReorderBufferCommitCB) (
292292
ReorderBufferTXN *txn,
293293
XLogRecPtr commit_lsn);
294294

295+
typedef bool (*ReorderBufferFilterPrepareCB) (
296+
ReorderBuffer *rb,
297+
ReorderBufferTXN *txn,
298+
char *gid);
299+
300+
/* prepare callback signature */
301+
typedef void (*ReorderBufferPrepareCB) (
302+
ReorderBuffer *rb,
303+
ReorderBufferTXN *txn,
304+
XLogRecPtr prepare_lsn);
305+
306+
/* commit prepared callback signature */
307+
typedef void (*ReorderBufferCommitPreparedCB) (
308+
ReorderBuffer *rb,
309+
ReorderBufferTXN *txn,
310+
XLogRecPtr commit_lsn);
311+
312+
/* abort prepared callback signature */
313+
typedef void (*ReorderBufferAbortPreparedCB) (
314+
ReorderBuffer *rb,
315+
ReorderBufferTXN *txn,
316+
XLogRecPtr abort_lsn);
317+
295318
/* message callback signature */
296319
typedef void (*ReorderBufferMessageCB) (
297320
ReorderBuffer *rb,
@@ -331,6 +354,10 @@ struct ReorderBuffer
331354
ReorderBufferBeginCB begin;
332355
ReorderBufferApplyChangeCB apply_change;
333356
ReorderBufferCommitCB commit;
357+
ReorderBufferFilterPrepareCB filter_prepare;
358+
ReorderBufferPrepareCB prepare;
359+
ReorderBufferCommitPreparedCB commit_prepared;
360+
ReorderBufferAbortPreparedCB abort_prepared;
334361
ReorderBufferMessageCB message;
335362

336363
/*
@@ -413,6 +440,8 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
413440
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
414441
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
415442

443+
bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid);
444+
416445
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
417446

418447
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);

0 commit comments

Comments
 (0)