Skip to content

Commit 3b94b3a

Browse files
committed
logical decoding: Fix handling of large old tuples with replica identity full.
When decoding the old version of an UPDATE or DELETE change, if that tuple was bigger than MaxHeapTupleSize, we either Assert'ed out or failed in more subtle ways in non-assert builds. Normally individual tuples aren't bigger than MaxHeapTupleSize, because big datums are toasted. But that's not the case for the old version of a tuple for logical decoding; the replica identity is logged as one piece. With the default replica identity, btree limits that to small tuples, but that's not the case for FULL. Change the tuple buffer infrastructure to separately allocate over-large tuples, instead of always going through the slab cache. This unfortunately requires changing the ReorderBufferTupleBuf definition, because we need to store the allocated size someplace. To avoid requiring output plugins to recompile, don't store HeapTupleHeaderData directly after HeapTupleData, but point to it via t_data; that leaves room for the allocated size. As there's no reason for an output plugin to look at ReorderBufferTupleBuf->t_data.header, remove the field. It was just a minor convenience having it directly accessible. Reported-By: Adam Dratwiński Discussion: CAKg6ypLd7773AOX4DiOGRwQk1TVOQKhNwjYiVjJnpq8Wo+i62Q@mail.gmail.com
1 parent a50f50a commit 3b94b3a

File tree

5 files changed

+236
-72
lines changed

5 files changed

+236
-72
lines changed

contrib/test_decoding/expected/toast.out

+58
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,64 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
285285
COMMIT
286286
(232 rows)
287287

288+
-- test we can decode "old" tuples bigger than the max heap tuple size correctly
289+
DROP TABLE IF EXISTS toasted_several;
290+
NOTICE: table "toasted_several" does not exist, skipping
291+
CREATE TABLE toasted_several (
292+
id serial unique not null,
293+
toasted_key text primary key,
294+
toasted_col1 text,
295+
toasted_col2 text
296+
);
297+
ALTER TABLE toasted_several REPLICA IDENTITY FULL;
298+
ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
299+
ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
300+
ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
301+
INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
302+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
303+
regexp_replace
304+
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
305+
BEGIN
306+
table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
307+
COMMIT
308+
(3 rows)
309+
310+
-- test update of a toasted key without changing it
311+
UPDATE toasted_several SET toasted_col1 = toasted_key;
312+
UPDATE toasted_several SET toasted_col2 = toasted_col1;
313+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
314+
regexp_replace
315+
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
316+
BEGIN
317+
table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
318+
COMMIT
319+
BEGIN
320+
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null
321+
COMMIT
322+
BEGIN
323+
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
324+
COMMIT
325+
(9 rows)
326+
327+
/*
328+
* update with large tuplebuf, in a transaction large enough to force to spool to disk
329+
*/
330+
BEGIN;
331+
INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
332+
UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
333+
DELETE FROM toasted_several WHERE id = 1;
334+
COMMIT;
335+
DROP TABLE toasted_several;
336+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
337+
WHERE data NOT LIKE '%INSERT: %';
338+
regexp_replace
339+
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
340+
BEGIN
341+
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum
342+
table public.toasted_several: DELETE: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
343+
COMMIT
344+
(4 rows)
345+
288346
SELECT pg_drop_replication_slot('regression_slot');
289347
pg_drop_replication_slot
290348
--------------------------

contrib/test_decoding/sql/toast.sql

+37
Original file line numberDiff line numberDiff line change
@@ -260,4 +260,41 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
260260
203 untoasted200
261261
\.
262262
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
263+
264+
-- test we can decode "old" tuples bigger than the max heap tuple size correctly
265+
DROP TABLE IF EXISTS toasted_several;
266+
CREATE TABLE toasted_several (
267+
id serial unique not null,
268+
toasted_key text primary key,
269+
toasted_col1 text,
270+
toasted_col2 text
271+
);
272+
ALTER TABLE toasted_several REPLICA IDENTITY FULL;
273+
ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
274+
ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
275+
ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
276+
277+
INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
278+
279+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
280+
281+
-- test update of a toasted key without changing it
282+
UPDATE toasted_several SET toasted_col1 = toasted_key;
283+
UPDATE toasted_several SET toasted_col2 = toasted_col1;
284+
285+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
286+
287+
/*
288+
* update with large tuplebuf, in a transaction large enough to force to spool to disk
289+
*/
290+
BEGIN;
291+
INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
292+
UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
293+
DELETE FROM toasted_several WHERE id = 1;
294+
COMMIT;
295+
296+
DROP TABLE toasted_several;
297+
298+
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
299+
WHERE data NOT LIKE '%INSERT: %';
263300
SELECT pg_drop_replication_slot('regression_slot');

src/backend/replication/logical/decode.c

+48-34
Original file line numberDiff line numberDiff line change
@@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
625625

626626
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
627627
{
628+
Size tuplelen = r->xl_len - SizeOfHeapInsert;
629+
628630
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
629631

630-
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
632+
change->data.tp.newtuple =
633+
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
631634

632635
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
633-
r->xl_len - SizeOfHeapInsert,
634-
change->data.tp.newtuple);
636+
tuplelen, change->data.tp.newtuple);
635637
}
636638

637639
change->data.tp.clear_toast_afterwards = true;
@@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
650652
{
651653
XLogRecord *r = &buf->record;
652654
xl_heap_update *xlrec;
653-
xl_heap_header_len xlhdr;
654655
ReorderBufferChange *change;
655656
char *data;
656657

@@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
669670

670671
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
671672
{
673+
Size tuplelen;
674+
xl_heap_header_len xlhdr;
675+
672676
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
673677

674678
memcpy(&xlhdr, data, sizeof(xlhdr));
675679
data += offsetof(xl_heap_header_len, header);
676680

677-
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
681+
tuplelen = xlhdr.t_len + SizeOfHeapHeader;
678682

679-
DecodeXLogTuple(data,
680-
xlhdr.t_len + SizeOfHeapHeader,
681-
change->data.tp.newtuple);
683+
change->data.tp.newtuple =
684+
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
685+
686+
DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple);
682687
/* skip over the rest of the tuple header */
683688
data += SizeOfHeapHeader;
684689
/* skip over the tuple data */
@@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
687692

688693
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
689694
{
695+
Size tuplelen;
696+
xl_heap_header_len xlhdr;
697+
690698
memcpy(&xlhdr, data, sizeof(xlhdr));
691699
data += offsetof(xl_heap_header_len, header);
692700

693-
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
701+
tuplelen = xlhdr.t_len + SizeOfHeapHeader;
694702

695-
DecodeXLogTuple(data,
696-
xlhdr.t_len + SizeOfHeapHeader,
697-
change->data.tp.oldtuple);
703+
change->data.tp.oldtuple =
704+
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
705+
706+
DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple);
698707
#ifdef NOT_USED
699708
data += SizeOfHeapHeader;
700709
data += xlhdr.t_len;
@@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
732741
/* old primary key stored */
733742
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
734743
{
744+
Size len = r->xl_len - SizeOfHeapDelete;
745+
735746
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
736747

737-
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
748+
change->data.tp.oldtuple =
749+
ReorderBufferGetTupleBuf(ctx->reorder, len);
738750

739751
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
740-
r->xl_len - SizeOfHeapDelete,
741-
change->data.tp.oldtuple);
752+
len, change->data.tp.oldtuple);
742753
}
743754

744755
change->data.tp.clear_toast_afterwards = true;
@@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
795806
*/
796807
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
797808
{
798-
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
809+
HeapTupleHeader header;
810+
811+
xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
812+
data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
813+
datalen = xlhdr->datalen;
814+
815+
change->data.tp.newtuple =
816+
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
799817

800818
tuple = change->data.tp.newtuple;
819+
header = tuple->tuple.t_data;
801820

802821
/* not a disk based tuple */
803822
ItemPointerSetInvalid(&tuple->tuple.t_self);
804823

805-
xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
806-
data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
807-
datalen = xlhdr->datalen;
808-
809824
/*
810825
* We can only figure this out after reassembling the
811826
* transactions.
812827
*/
813828
tuple->tuple.t_tableOid = InvalidOid;
814-
tuple->tuple.t_data = &tuple->header;
829+
815830
tuple->tuple.t_len = datalen
816831
+ offsetof(HeapTupleHeaderData, t_bits);
817832

818-
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
833+
memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
819834

820-
memcpy((char *) &tuple->header
821-
+ offsetof(HeapTupleHeaderData, t_bits),
835+
memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits),
822836
(char *) data,
823837
datalen);
824838
data += datalen;
825839

826-
tuple->header.t_infomask = xlhdr->t_infomask;
827-
tuple->header.t_infomask2 = xlhdr->t_infomask2;
828-
tuple->header.t_hoff = xlhdr->t_hoff;
840+
header->t_infomask = xlhdr->t_infomask;
841+
header->t_infomask2 = xlhdr->t_infomask2;
842+
header->t_hoff = xlhdr->t_hoff;
829843
}
830844

831845
/*
@@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
856870
{
857871
xl_heap_header xlhdr;
858872
int datalen = len - SizeOfHeapHeader;
873+
HeapTupleHeader header;
859874

860875
Assert(datalen >= 0);
861-
Assert(datalen <= MaxHeapTupleSize);
862876

863877
tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
878+
header = tuple->tuple.t_data;
864879

865880
/* not a disk based tuple */
866881
ItemPointerSetInvalid(&tuple->tuple.t_self);
867882

868883
/* we can only figure this out after reassembling the transactions */
869884
tuple->tuple.t_tableOid = InvalidOid;
870-
tuple->tuple.t_data = &tuple->header;
871885

872886
/* data is not stored aligned, copy to aligned storage */
873887
memcpy((char *) &xlhdr,
874888
data,
875889
SizeOfHeapHeader);
876890

877-
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
891+
memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
878892

879-
memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
893+
memcpy(((char *) tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits),
880894
data + SizeOfHeapHeader,
881895
datalen);
882896

883-
tuple->header.t_infomask = xlhdr.t_infomask;
884-
tuple->header.t_infomask2 = xlhdr.t_infomask2;
885-
tuple->header.t_hoff = xlhdr.t_hoff;
897+
header->t_infomask = xlhdr.t_infomask;
898+
header->t_infomask2 = xlhdr.t_infomask2;
899+
header->t_hoff = xlhdr.t_hoff;
886900
}

0 commit comments

Comments
 (0)