Skip to content

Commit aaf9e95

Browse files
author
Amit Kapila
committed
Fix xmin advancement during fast_forward decoding.
During logical decoding, we advance catalog_xmin of logical too early in fast_forward mode, resulting in required catalog data being removed by vacuum. This mode is normally used to advance the slot without processing the changes, but we still can't let the slot's xmin to advance to an incorrect value. Commit f49a80c fixed a similar issue where the logical slot's catalog_xmin was getting advanced prematurely during non-fast-forward mode. During xl_running_xacts processing, instead of directly advancing the slot's xmin to the oldest running xid in the record, it allowed the xmin to be held back for snapshots that can be used for not-yet-replayed transactions, as those might consider older txns as running too. However, it missed the fact that the same problem can happen during fast_forward mode decoding, as we won't build a base snapshot in that mode, and the future call to get_changes from the same slot can miss seeing the required catalog changes leading to incorrect reslts. This commit allows building the base snapshot even in fast_forward mode to prevent the early advancement of xmin. Reported-by: Amit Kapila <amit.kapila16@gmail.com> Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Backpatch-through: 13 Discussion: https://postgr.es/m/CAA4eK1LqWncUOqKijiafe+Ypt1gQAQRjctKLMY953J79xDBgAg@mail.gmail.com Discussion: https://postgr.es/m/OS0PR01MB57163087F86621D44D9A72BF94BB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent b225c5e commit aaf9e95

File tree

3 files changed

+71
-12
lines changed

3 files changed

+71
-12
lines changed

contrib/test_decoding/expected/oldest_xmin.out

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,44 @@ COMMIT
3838
stop
3939
(1 row)
4040

41+
42+
starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes
43+
step s0_begin: BEGIN;
44+
step s0_getxid: SELECT pg_current_xact_id() IS NULL;
45+
?column?
46+
--------
47+
f
48+
(1 row)
49+
50+
step s1_begin: BEGIN;
51+
step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
52+
step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
53+
step s0_commit: COMMIT;
54+
step s0_checkpoint: CHECKPOINT;
55+
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
56+
slot_name
57+
--------------
58+
isolation_slot
59+
(1 row)
60+
61+
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
62+
slot_name
63+
--------------
64+
isolation_slot
65+
(1 row)
66+
67+
step s1_commit: COMMIT;
68+
step s0_vacuum: VACUUM pg_attribute;
69+
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
70+
data
71+
------------------------------------------------------
72+
BEGIN
73+
table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
74+
COMMIT
75+
(3 rows)
76+
77+
?column?
78+
--------
79+
stop
80+
(1 row)
81+

contrib/test_decoding/specs/oldest_xmin.spec

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; }
2525
step "s0_checkpoint" { CHECKPOINT; }
2626
step "s0_vacuum" { VACUUM pg_attribute; }
2727
step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
28+
step "s0_advance_slot" { SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); }
2829

2930
session "s1"
3031
setup { SET synchronous_commit=on; }
@@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; }
4041
# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
4142
# forbid modifying catalog after someone read it (and didn't commit yet).
4243
permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
44+
45+
# Perform the same testing process as described above, but use advance_slot to
46+
# forces xmin advancement during fast forward decoding.
47+
permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_advance_slot" "s0_advance_slot" "s1_commit" "s0_vacuum" "s0_get_changes"

src/backend/replication/logical/decode.c

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -412,19 +412,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
412412

413413
/*
414414
* If we don't have snapshot or we are just fast-forwarding, there is no
415-
* point in decoding changes.
415+
* point in decoding data changes. However, it's crucial to build the base
416+
* snapshot during fast-forward mode (as is done in
417+
* SnapBuildProcessChange()) because we require the snapshot's xmin when
418+
* determining the candidate catalog_xmin for the replication slot. See
419+
* SnapBuildProcessRunningXacts().
416420
*/
417-
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
418-
ctx->fast_forward)
421+
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
419422
return;
420423

421424
switch (info)
422425
{
423426
case XLOG_HEAP2_MULTI_INSERT:
424-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
427+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
428+
!ctx->fast_forward)
425429
DecodeMultiInsert(ctx, buf);
426430
break;
427431
case XLOG_HEAP2_NEW_CID:
432+
if (!ctx->fast_forward)
428433
{
429434
xl_heap_new_cid *xlrec;
430435

@@ -471,16 +476,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
471476

472477
/*
473478
* If we don't have snapshot or we are just fast-forwarding, there is no
474-
* point in decoding data changes.
479+
* point in decoding data changes. However, it's crucial to build the base
480+
* snapshot during fast-forward mode (as is done in
481+
* SnapBuildProcessChange()) because we require the snapshot's xmin when
482+
* determining the candidate catalog_xmin for the replication slot. See
483+
* SnapBuildProcessRunningXacts().
475484
*/
476-
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
477-
ctx->fast_forward)
485+
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
478486
return;
479487

480488
switch (info)
481489
{
482490
case XLOG_HEAP_INSERT:
483-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
491+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
492+
!ctx->fast_forward)
484493
DecodeInsert(ctx, buf);
485494
break;
486495

@@ -491,17 +500,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
491500
*/
492501
case XLOG_HEAP_HOT_UPDATE:
493502
case XLOG_HEAP_UPDATE:
494-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
503+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
504+
!ctx->fast_forward)
495505
DecodeUpdate(ctx, buf);
496506
break;
497507

498508
case XLOG_HEAP_DELETE:
499-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
509+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
510+
!ctx->fast_forward)
500511
DecodeDelete(ctx, buf);
501512
break;
502513

503514
case XLOG_HEAP_TRUNCATE:
504-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
515+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
516+
!ctx->fast_forward)
505517
DecodeTruncate(ctx, buf);
506518
break;
507519

@@ -525,7 +537,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
525537
break;
526538

527539
case XLOG_HEAP_CONFIRM:
528-
if (SnapBuildProcessChange(builder, xid, buf->origptr))
540+
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
541+
!ctx->fast_forward)
529542
DecodeSpecConfirm(ctx, buf);
530543
break;
531544

0 commit comments

Comments
 (0)