Skip to content

Commit 465dd92

Browse files
committed
logical decoding: Tell reorderbuffer about all xids.
Logical decoding's reorderbuffer keeps transactions in an LSN ordered list for efficiency. To make that efficiently possible, upper-level xids are forced to be logged before nested subtransaction xids. That only works though if these records are all looked at: Unfortunately we didn't do so for e.g. row level locks, which are otherwise uninteresting for logical decoding. This could lead to errors like: "ERROR: subxact logged without previous toplevel record". It's not sufficient to just look at row locking records, the xid could appear first due to a lot of other types of records (which will trigger the transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny). So invent infrastructure to tell reorderbuffer about xids seen, when they'd otherwise not pass through reorderbuffer.c. Reported-By: Jarred Ward Bug: #13844 Discussion: 20160105033249.1087.66040@wrigleys.postgresql.org Backpatch: 9.4, where logical decoding was added
1 parent a9613ee commit 465dd92

File tree

7 files changed

+108
-16
lines changed

7 files changed

+108
-16
lines changed

contrib/test_decoding/Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ submake-isolation:
3737
submake-test_decoding:
3838
$(MAKE) -C $(top_builddir)/contrib/test_decoding
3939

40-
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
40+
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
41+
decoding_into_rel binary prepared
4142

4243
regresscheck: all | submake-regress submake-test_decoding
4344
$(MKDIR_P) regression_output
+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
4+
?column?
5+
----------
6+
init
7+
(1 row)
8+
9+
-- bug #13844, xids in non-decoded records need to be inspected
10+
CREATE TABLE xact_test(data text);
11+
INSERT INTO xact_test VALUES ('before-test');
12+
BEGIN;
13+
-- perform operation in xact that creates and logs xid, but isn't decoded
14+
SELECT * FROM xact_test FOR UPDATE;
15+
data
16+
-------------
17+
before-test
18+
(1 row)
19+
20+
SAVEPOINT foo;
21+
-- and now actually insert in subxact, xid is expected to be known
22+
INSERT INTO xact_test VALUES ('after-assignment');
23+
COMMIT;
24+
-- and now show those changes
25+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
26+
data
27+
---------------------------------------------------------------
28+
BEGIN
29+
table public.xact_test: INSERT: data[text]:'before-test'
30+
COMMIT
31+
BEGIN
32+
table public.xact_test: INSERT: data[text]:'after-assignment'
33+
COMMIT
34+
(6 rows)
35+
36+
DROP TABLE xact_test;
37+
SELECT pg_drop_replication_slot('regression_slot');
38+
pg_drop_replication_slot
39+
--------------------------
40+
41+
(1 row)
42+

contrib/test_decoding/sql/xact.sql

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
4+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
5+
6+
-- bug #13844, xids in non-decoded records need to be inspected
7+
CREATE TABLE xact_test(data text);
8+
INSERT INTO xact_test VALUES ('before-test');
9+
10+
BEGIN;
11+
-- perform operation in xact that creates and logs xid, but isn't decoded
12+
SELECT * FROM xact_test FOR UPDATE;
13+
SAVEPOINT foo;
14+
-- and now actually insert in subxact, xid is expected to be known
15+
INSERT INTO xact_test VALUES ('after-assignment');
16+
COMMIT;
17+
-- and now show those changes
18+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
19+
20+
DROP TABLE xact_test;
21+
22+
SELECT pg_drop_replication_slot('regression_slot');

src/backend/replication/logical/decode.c

+27-1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
7777
* Take every XLogReadRecord()ed record and perform the actions required to
7878
* decode it using the output plugin already setup in the logical decoding
7979
* context.
80+
*
81+
* NB: Note that every record's xid needs to be processed by reorderbuffer
82+
* (xids contained in the content of records are not relevant for this rule).
83+
* That means that for records which'd otherwise not go through the
84+
* reorderbuffer, ReorderBufferProcessXid() has to be called. We don't want to
85+
* call ReorderBufferProcessXid for each record type by default, because
86+
* e.g. empty xacts can be handled more efficiently if there's no previous
87+
* state for them.
8088
*/
8189
void
8290
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
@@ -132,6 +140,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
132140
case RM_GIST_ID:
133141
case RM_SEQ_ID:
134142
case RM_SPGIST_ID:
143+
/* just deal with xid, and done */
144+
ReorderBufferProcessXid(ctx->reorder, record->xl_xid,
145+
buf.origptr);
135146
break;
136147
case RM_NEXT_ID:
137148
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
@@ -147,6 +158,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
147158
SnapBuild *builder = ctx->snapshot_builder;
148159
uint8 info = buf->record.xl_info & ~XLR_INFO_MASK;
149160

161+
ReorderBufferProcessXid(ctx->reorder, buf->record.xl_xid,
162+
buf->origptr);
163+
150164
switch (info)
151165
{
152166
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -187,7 +201,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
187201
XLogRecord *r = &buf->record;
188202
uint8 info = r->xl_info & ~XLR_INFO_MASK;
189203

190-
/* no point in doing anything yet, data could not be decoded anyway */
204+
/*
205+
* No point in doing anything yet, data could not be decoded anyway. It's
206+
* ok not to call ReorderBufferProcessXid() in that case, except in the
207+
* assignment case there'll not be any later records with the same xid;
208+
* and in the assignment case we'll not decode those xacts.
209+
*/
191210
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
192211
return;
193212

@@ -302,6 +321,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
302321
* transactions in the changestream allowing for a kind of
303322
* distributed 2PC.
304323
*/
324+
ReorderBufferProcessXid(reorder, r->xl_xid, buf->origptr);
305325
break;
306326
default:
307327
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -318,6 +338,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
318338
XLogRecord *r = &buf->record;
319339
uint8 info = r->xl_info & ~XLR_INFO_MASK;
320340

341+
ReorderBufferProcessXid(ctx->reorder, r->xl_xid, buf->origptr);
342+
321343
switch (info)
322344
{
323345
case XLOG_RUNNING_XACTS:
@@ -355,6 +377,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
355377
TransactionId xid = buf->record.xl_xid;
356378
SnapBuild *builder = ctx->snapshot_builder;
357379

380+
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
381+
358382
/* no point in doing anything yet */
359383
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
360384
return;
@@ -408,6 +432,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
408432
TransactionId xid = buf->record.xl_xid;
409433
SnapBuild *builder = ctx->snapshot_builder;
410434

435+
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
436+
411437
/* no point in doing anything yet */
412438
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
413439
return;

src/backend/replication/logical/reorderbuffer.c

+13-8
Original file line numberDiff line numberDiff line change
@@ -1669,16 +1669,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
16691669

16701670

16711671
/*
1672-
* Check whether a transaction is already known in this module.xs
1672+
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1673+
* least once for every xid in XLogRecord->xl_xid (other places in records
1674+
* may, but do not have to be passed through here).
1675+
*
1676+
* Reorderbuffer keeps some datastructures about transactions in LSN order,
1677+
* for efficiency. To do that it has to know about when transactions are seen
1678+
* first in the WAL. As many types of records are not actually interesting for
1679+
* logical decoding, they do not necessarily pass through here.
16731680
*/
1674-
bool
1675-
ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
1681+
void
1682+
ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
16761683
{
1677-
ReorderBufferTXN *txn;
1678-
1679-
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1680-
false);
1681-
return txn != NULL;
1684+
/* many records won't have an xid assigned, centralize check here */
1685+
if (xid != InvalidTransactionId)
1686+
ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
16821687
}
16831688

16841689
/*

src/backend/replication/logical/snapbuild.c

+1-5
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,6 @@ SnapBuildClearExportedSnapshot()
636636
bool
637637
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
638638
{
639-
bool is_old_tx;
640-
641639
/*
642640
* We can't handle data in transactions if we haven't built a snapshot
643641
* yet, so don't store them.
@@ -658,9 +656,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
658656
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
659657
* be needed to decode the change we're currently processing.
660658
*/
661-
is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
662-
663-
if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
659+
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
664660
{
665661
/* only build a new snapshot if we don't have a prebuilt one */
666662
if (builder->snapshot == NULL)

src/include/replication/reorderbuffer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
352352
CommandId cmin, CommandId cmax, CommandId combocid);
353353
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
354354
Size nmsgs, SharedInvalidationMessage *msgs);
355-
bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
355+
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
356356
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
357357
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
358358
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);

0 commit comments

Comments
 (0)