Skip to content

Commit 76666c3

Browse files
committed
some comments
1 parent 1801ced commit 76666c3

File tree

7 files changed

+76
-21
lines changed

7 files changed

+76
-21
lines changed

contrib/test_decoding/test_decoding.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,32 +279,48 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
279279
OutputPluginWrite(ctx, true);
280280
}
281281

282+
283+
/* Filter out unnecessary two-phase transactions */
282284
static bool
283285
pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
284286
char *gid)
285287
{
286288
TestDecodingData *data = ctx->output_plugin_private;
287289

290+
/* treat all transaction as one-phase */
288291
if (!data->twophase_decoding)
289292
return true;
290293

294+
/*
295+
* Two-phase transactions that accessed catalog require special treatment.
296+
*
297+
* Right now we don't have a save way to decode catalog changes made in
298+
* prepared transaction that was already aborted by the time of decoding.
299+
*
300+
* That kind of problem arises only when we are trying to retrospectively
301+
* decode aborted transactions. If one wants to code distributed commit
302+
* based on prepare decoding then commits/aborts will happend strictly after
303+
* decoding will be completed, so it is safe to skip any checks/locks here.
304+
*/
291305
if (txn->has_catalog_changes)
292306
{
293307
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
294308

295309
if (TransactionIdIsInProgress(txn->xid))
296310
{
297311
/*
298-
* XXX
312+
* For the sake of simplicity we just ignore in-progess transaction
313+
* in this extension, as they may abort during deconing.
299314
*/
300315
LWLockRelease(TwoPhaseStateLock);
301316
return true;
302317
}
303318
else if (TransactionIdDidAbort(txn->xid))
304319
{
305320
/*
306-
* Here we know that it is already aborted and should humble
307-
* ourselves.
321+
* Here we know that it is already aborted and there is no
322+
* mush sence in doing something with this transaction.
323+
* Consequent ABORT PREPARED will be suppressed.
308324
*/
309325
LWLockRelease(TwoPhaseStateLock);
310326
return true;

src/backend/replication/logical/decode.c

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
295295
XLogRecGetData(buf->record), &parsed);
296296

297297
/* does output plugin wants this particular transaction? */
298-
if (ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
298+
if (ctx->callbacks.filter_prepare_cb &&
299+
ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
299300
parsed.twophase_gid))
300301
{
301302
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
@@ -641,22 +642,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
641642
}
642643
}
643644

645+
646+
/*
647+
* Decode PREPARE record. Same logic as in COMMIT, but diffent calls
648+
* to SnapshotBuilder as we need to mark this transaction as commited
649+
* instead of running to properly decode it. When prepared transation
650+
* is decoded we mark it in snapshot as running again.
651+
*/
644652
static void
645653
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
646654
xl_xact_parsed_prepare *parsed)
647655
{
648-
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
649-
TimestampTz commit_time = 0;
656+
XLogRecPtr origin_lsn = parsed->origin_lsn;
657+
TimestampTz commit_time = parsed->origin_timestamp;
650658
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
651659
int i;
652660
TransactionId xid = parsed->twophase_xid;
653661

654-
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
655-
{
656-
origin_lsn = parsed->origin_lsn;
657-
commit_time = parsed->origin_timestamp;
658-
}
659-
660662
/*
661663
* Process invalidation messages, even if we're not interested in the
662664
* transaction's contents, since the various caches need to always be

src/backend/replication/logical/logical.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid
815815
error_context_stack = &errcallback;
816816

817817
/* set output state */
818-
ctx->accept_writes = true; // ->false
818+
ctx->accept_writes = false;
819819

820820
/* do the actual work: call callback */
821821
ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);

src/backend/replication/logical/reorderbuffer.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,17 +1667,24 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
16671667
PG_END_TRY();
16681668
}
16691669

1670+
/*
1671+
* Ask output plugin whether we want to skip this PREPARE and send
1672+
* this transaction as one-phase later on commit.
1673+
*/
16701674
bool
16711675
ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid)
16721676
{
16731677
ReorderBufferTXN *txn;
16741678

16751679
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
1676-
Assert(txn != NULL);
1677-
// ctx->callbacks.filter_prepare_cb
1680+
16781681
return rb->filter_prepare(rb, txn, gid);
16791682
}
16801683

1684+
1685+
/*
1686+
* Commit non-twophase transaction. See comments to ReorderBufferCommitInternal()
1687+
*/
16811688
void
16821689
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16831690
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -1693,6 +1700,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16931700
commit_time, origin_id, origin_lsn);
16941701
}
16951702

1703+
/*
1704+
* Prepare twophase transaction. It calls ReorderBufferCommitInternal()
1705+
* since all transaction changes should be decoded on PREPARE.
1706+
*/
16961707
void
16971708
ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
16981709
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -1712,6 +1723,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
17121723
commit_time, origin_id, origin_lsn);
17131724
}
17141725

1726+
/*
1727+
* Check whether this transaction was sent as prepared to receiver.
1728+
* Called upon commit/abort prepared.
1729+
*/
17151730
bool
17161731
ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
17171732
{
@@ -1720,7 +1735,6 @@ ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
17201735
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
17211736
false);
17221737

1723-
/* TXN not found mean that it was send already, isn't it? */
17241738
return txn == NULL ? true : txn->prepared;
17251739
}
17261740

src/backend/replication/logical/snapbuild.c

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1101,18 +1101,39 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
11011101
}
11021102
}
11031103

1104+
/*
1105+
* Just a wrapper to clarify DecodePrepare().
1106+
* Right now we can't extract correct historic catalog data that
1107+
* was produced by aborted prepared transaction, so it work of
1108+
* decoding plugin to avoid such situation and here we just construct usual
1109+
* snapshot to able to decode prepare.
1110+
*/
11041111
void
11051112
SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
11061113
int nsubxacts, TransactionId *subxacts)
11071114
{
11081115
SnapBuildCommitTxn(builder, lsn, xid, nsubxacts, subxacts);
11091116
}
11101117

1118+
1119+
/*
1120+
* When decoding of preppare is finished we want should exclude our xid
1121+
* from list of committed xids to have correct snapshot between prepare
1122+
* and commit.
1123+
*
1124+
* However, this is not sctrictly needed. Prepared transaction holds locks
1125+
* between prepare and commit so nodody can produce new version of our
1126+
* catalog tuples. In case of abort we will have this xid in array of
1127+
* commited xids, but it also will not cause a problem since checks of
1128+
* HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC()
1129+
* have higher priority then checks for xip array. Anyway let's be consistent
1130+
* about definitions and delete this xid from xip array.
1131+
*/
11111132
void
11121133
SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
11131134
{
11141135
TransactionId *search = bsearch(&xid, builder->running.xip,
1115-
builder->running.xcnt_space, sizeof(TransactionId), xidComparator);
1136+
builder->running.xcnt, sizeof(TransactionId), xidComparator);
11161137

11171138
if (search == NULL)
11181139
return;
@@ -1121,6 +1142,8 @@ SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
11211142
memmove(search, search + 1,
11221143
((builder->running.xip + builder->running.xcnt - 1) - search) * sizeof(TransactionId));
11231144
builder->running.xcnt--;
1145+
1146+
/* update min/max */
11241147
builder->running.xmin = builder->running.xip[0];
11251148
builder->running.xmax = builder->running.xip[builder->running.xcnt - 1];
11261149
}

src/include/replication/output_plugin.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
7171
* Called before decoding of PREPARE record to decide whether this
7272
* transaction should be decoded with separate calls to prepare
7373
* and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
74-
* and send as usual transaction.
74+
* and sent as usual transaction.
7575
*/
7676
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
7777
ReorderBufferTXN *txn,

src/include/replication/reorderbuffer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ typedef struct ReorderBufferTXN
149149
char gid[GIDSIZE];
150150

151151
/*
152-
* We have ability to treat twophase transaction as ordinary one
153-
* with the help of filter_prepare callback.
154-
* XXX: try to reword that comment
152+
* By using filter_prepare() callback we can force decoding to treat
153+
* two-phase transaction as on ordinary one. This flag is set if we are
154+
* actually called prepape() callback in output plugin.
155155
*/
156156
bool prepared;
157157

0 commit comments

Comments
 (0)