Skip to content

Commit 602a32a

Browse files
author
Amit Kapila
committed
Fix decoding of speculative aborts.
During decoding for speculative inserts, we were relying for cleaning toast hash on confirmation records or next change records. But that could lead to multiple problems (a) memory leak if there is neither a confirmation record nor any other record after toast insertion for a speculative insert in the transaction, (b) error and assertion failures if the next operation is not an insert/update on the same table. The fix is to start queuing spec abort change and clean up toast hash and change record during its processing. Currently, we are queuing the spec aborts for both toast and main table even though we perform cleanup while processing the main table's spec abort record. Later, if we have a way to distinguish between the spec abort record of toast and the main table, we can avoid queuing the change for spec aborts of toast tables. Reported-by: Ashutosh Bapat Author: Dilip Kumar Reviewed-by: Amit Kapila Backpatch-through: 9.6, where it was introduced Discussion: https://postgr.es/m/CAExHW5sPKF-Oovx_qZe4p5oM6Dvof7_P+XgsNAViug15Fm99jA@mail.gmail.com
1 parent 2da5ddf commit 602a32a

File tree

3 files changed

+48
-24
lines changed

3 files changed

+48
-24
lines changed

src/backend/replication/logical/decode.c

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -800,19 +800,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
800800
if (target_node.dbNode != ctx->slot->data.database)
801801
return;
802802

803-
/*
804-
* Super deletions are irrelevant for logical decoding, it's driven by the
805-
* confirmation records.
806-
*/
807-
if (xlrec->flags & XLH_DELETE_IS_SUPER)
808-
return;
809-
810803
/* output plugin doesn't look for this origin, no need to queue */
811804
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
812805
return;
813806

814807
change = ReorderBufferGetChange(ctx->reorder);
815-
change->action = REORDER_BUFFER_CHANGE_DELETE;
808+
809+
if (xlrec->flags & XLH_DELETE_IS_SUPER)
810+
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
811+
else
812+
change->action = REORDER_BUFFER_CHANGE_DELETE;
813+
816814
change->origin_id = XLogRecGetOrigin(r);
817815

818816
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));

src/backend/replication/logical/reorderbuffer.c

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
397397
txn->invalidations = NULL;
398398
}
399399

400+
/* Reset the toast hash */
401+
ReorderBufferToastReset(rb, txn);
402+
400403
pfree(txn);
401404
}
402405

@@ -467,6 +470,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
467470
}
468471
break;
469472
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
473+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
470474
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
471475
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
472476
break;
@@ -1677,8 +1681,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16771681
change_done:
16781682

16791683
/*
1680-
* Either speculative insertion was confirmed, or it was
1681-
* unsuccessful and the record isn't needed anymore.
1684+
* If speculative insertion was confirmed, the record isn't
1685+
* needed anymore.
16821686
*/
16831687
if (specinsert != NULL)
16841688
{
@@ -1720,6 +1724,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
17201724
specinsert = change;
17211725
break;
17221726

1727+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
1728+
1729+
/*
1730+
* Abort for speculative insertion arrived. So cleanup the
1731+
* specinsert tuple and toast hash.
1732+
*
1733+
* Note that we get the spec abort change for each toast
1734+
* entry but we need to perform the cleanup only the first
1735+
* time we get it for the main table.
1736+
*/
1737+
if (specinsert != NULL)
1738+
{
1739+
/*
1740+
* We must clean the toast hash before processing a
1741+
* completely new tuple to avoid confusion about the
1742+
* previous tuple's toast chunks.
1743+
*/
1744+
Assert(change->data.tp.clear_toast_afterwards);
1745+
ReorderBufferToastReset(rb, txn);
1746+
1747+
/* We don't need this record anymore. */
1748+
ReorderBufferReturnChange(rb, specinsert);
1749+
specinsert = NULL;
1750+
}
1751+
break;
1752+
17231753
case REORDER_BUFFER_CHANGE_TRUNCATE:
17241754
{
17251755
int i;
@@ -1827,16 +1857,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
18271857
}
18281858
}
18291859

1830-
/*
1831-
* There's a speculative insertion remaining, just clean in up, it
1832-
* can't have been successful, otherwise we'd gotten a confirmation
1833-
* record.
1834-
*/
1835-
if (specinsert)
1836-
{
1837-
ReorderBufferReturnChange(rb, specinsert);
1838-
specinsert = NULL;
1839-
}
1860+
/* speculative insertion record must be freed by now */
1861+
Assert(!specinsert);
18401862

18411863
/* clean up the iterator */
18421864
ReorderBufferIterTXNFinish(rb, iterstate);
@@ -2640,6 +2662,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
26402662
break;
26412663
}
26422664
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2665+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
26432666
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
26442667
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
26452668
/* ReorderBufferChange contains everything important */
@@ -2747,6 +2770,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
27472770
break;
27482771
}
27492772
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2773+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
27502774
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
27512775
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
27522776
/* ReorderBufferChange contains everything important */
@@ -3032,6 +3056,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
30323056
break;
30333057
}
30343058
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3059+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
30353060
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
30363061
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
30373062
break;

src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ typedef struct ReorderBufferTupleBuf
4646
* changes. Users of the decoding facilities will never see changes with
4747
* *_INTERNAL_* actions.
4848
*
49-
* The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
50-
* "speculative insertions", and their confirmation respectively. They're
51-
* used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't
52-
* have to care about these.
49+
* The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
50+
* changes concern "speculative insertions", their confirmation, and abort
51+
* respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
52+
* logical decoding don't have to care about these.
5353
*/
5454
enum ReorderBufferChangeType
5555
{
@@ -62,6 +62,7 @@ enum ReorderBufferChangeType
6262
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
6363
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
6464
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
65+
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
6566
REORDER_BUFFER_CHANGE_TRUNCATE
6667
};
6768

0 commit comments

Comments
 (0)