Skip to content

Commit 4868c96

Browse files
author
Amit Kapila
committed
Fix slot synchronization for two_phase enabled slots.
The issue is that the transactions prepared before two-phase decoding is enabled can fail to replicate to the subscriber after being committed on a promoted standby following a failover. This is because the two_phase_at field of a slot, which tracks the LSN from which two-phase decoding starts, is not synchronized to standby servers. Without two_phase_at, the logical decoding might incorrectly identify prepared transaction as already replicated to the subscriber after promotion of standby server, causing them to be skipped. To address the issue on HEAD, the two_phase_at field of the slot is exposed by the pg_replication_slots view and allows the slot synchronization to copy this value to the corresponding synced slot on the standby server. This bug is likely to occur if the user toggles the two_phase option to true after initial slot creation. Given that altering the two_phase option of a replication slot is not allowed in PostgreSQL 17, this bug is less likely to occur. We can't change the view/function definition in backbranch so we can't push the same fix but we are brainstorming an appropriate solution for PG17. Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/TYAPR01MB5724CC7C288535BBCEEE65DA94A72@TYAPR01MB5724.jpnprd01.prod.outlook.com
1 parent a7187c3 commit 4868c96

File tree

8 files changed

+112
-14
lines changed

8 files changed

+112
-14
lines changed

doc/src/sgml/system-views.sgml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2854,6 +2854,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
28542854
</para></entry>
28552855
</row>
28562856

2857+
<row>
2858+
<entry role="catalog_table_entry"><para role="column_definition">
2859+
<structfield>two_phase_at</structfield> <type>pg_lsn</type>
2860+
</para>
2861+
<para>
2862+
The address (<literal>LSN</literal>) from which the decoding of prepared
2863+
transactions is enabled. <literal>NULL</literal> for logical slots
2864+
where <structfield>two_phase</structfield> is false and for physical slots.
2865+
</para></entry>
2866+
</row>
2867+
28572868
<row>
28582869
<entry role="catalog_table_entry"><para role="column_definition">
28592870
<structfield>inactive_since</structfield> <type>timestamptz</type>

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS
10251025
L.wal_status,
10261026
L.safe_wal_size,
10271027
L.two_phase,
1028+
L.two_phase_at,
10281029
L.inactive_since,
10291030
L.conflicting,
10301031
L.invalidation_reason,

src/backend/replication/logical/slotsync.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ typedef struct RemoteSlot
139139
bool failover;
140140
XLogRecPtr restart_lsn;
141141
XLogRecPtr confirmed_lsn;
142+
XLogRecPtr two_phase_at;
142143
TransactionId catalog_xmin;
143144

144145
/* RS_INVAL_NONE if valid, or the reason of invalidation */
@@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
276277
if (remote_dbid != slot->data.database ||
277278
remote_slot->two_phase != slot->data.two_phase ||
278279
remote_slot->failover != slot->data.failover ||
279-
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
280+
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
281+
remote_slot->two_phase_at != slot->data.two_phase_at)
280282
{
281283
NameData plugin_name;
282284

@@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
287289
slot->data.plugin = plugin_name;
288290
slot->data.database = remote_dbid;
289291
slot->data.two_phase = remote_slot->two_phase;
292+
slot->data.two_phase_at = remote_slot->two_phase_at;
290293
slot->data.failover = remote_slot->failover;
291294
SpinLockRelease(&slot->mutex);
292295

@@ -788,17 +791,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
788791
static bool
789792
synchronize_slots(WalReceiverConn *wrconn)
790793
{
791-
#define SLOTSYNC_COLUMN_COUNT 9
794+
#define SLOTSYNC_COLUMN_COUNT 10
792795
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
793-
LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
796+
LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
794797

795798
WalRcvExecResult *res;
796799
TupleTableSlot *tupslot;
797800
List *remote_slot_list = NIL;
798801
bool some_slot_updated = false;
799802
bool started_tx = false;
800803
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
801-
" restart_lsn, catalog_xmin, two_phase, failover,"
804+
" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
802805
" database, invalidation_reason"
803806
" FROM pg_catalog.pg_replication_slots"
804807
" WHERE failover and NOT temporary";
@@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn)
853856
&isnull));
854857
Assert(!isnull);
855858

859+
d = slot_getattr(tupslot, ++col, &isnull);
860+
remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
861+
856862
remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
857863
&isnull));
858864
Assert(!isnull);

src/backend/replication/slotfuncs.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
235235
Datum
236236
pg_get_replication_slots(PG_FUNCTION_ARGS)
237237
{
238-
#define PG_GET_REPLICATION_SLOTS_COLS 19
238+
#define PG_GET_REPLICATION_SLOTS_COLS 20
239239
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
240240
XLogRecPtr currlsn;
241241
int slotno;
@@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
406406

407407
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
408408

409+
if (slot_contents.data.two_phase &&
410+
!XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
411+
values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
412+
else
413+
nulls[i++] = true;
414+
409415
if (slot_contents.inactive_since > 0)
410416
values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
411417
else

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/* yyyymmddN */
60-
#define CATALOG_VERSION_NO 202504021
60+
#define CATALOG_VERSION_NO 202504031
6161

6262
#endif

src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11424,9 +11424,9 @@
1142411424
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
1142511425
proretset => 't', provolatile => 's', prorettype => 'record',
1142611426
proargtypes => '',
11427-
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
11428-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11429-
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}',
11427+
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}',
11428+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11429+
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}',
1143011430
prosrc => 'pg_get_replication_slots' },
1143111431
{ oid => '3786', descr => 'set up a logical replication slot',
1143211432
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

src/test/recovery/t/040_standby_failover_slots_sync.pl

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
# Disable autovacuum to avoid generating xid during stats update as otherwise
2323
# the new XID could then be replicated to standby at some random point making
2424
# slots at primary lag behind standby during slot sync.
25-
$publisher->append_conf('postgresql.conf', 'autovacuum = off');
25+
$publisher->append_conf(
26+
'postgresql.conf', qq{
27+
autovacuum = off
28+
max_prepared_transactions = 1
29+
});
2630
$publisher->start;
2731

2832
$publisher->safe_psql('postgres',
@@ -33,6 +37,7 @@
3337
# Create a subscriber node, wait for sync to complete
3438
my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
3539
$subscriber1->init;
40+
$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1');
3641
$subscriber1->start;
3742

3843
# Capture the time before the logical failover slot is created on the
@@ -830,13 +835,72 @@
830835
"'sb1_slot'");
831836
$primary->reload;
832837

838+
##################################################
839+
# Test the synchronization of the two_phase setting for a subscription with the
840+
# standby. Additionally, prepare a transaction before enabling the two_phase
841+
# option; subsequent tests will verify if it can be correctly replicated to the
842+
# subscriber after committing it on the promoted standby.
843+
##################################################
844+
845+
$standby1->start;
846+
847+
# Prepare a transaction
848+
$primary->safe_psql(
849+
'postgres', qq[
850+
BEGIN;
851+
INSERT INTO tab_int values(0);
852+
PREPARE TRANSACTION 'test_twophase_slotsync';
853+
]);
854+
855+
$primary->wait_for_replay_catchup($standby1);
856+
$primary->wait_for_catchup('regress_mysub1');
857+
858+
# Disable the subscription to allow changing the two_phase option.
859+
$subscriber1->safe_psql('postgres',
860+
"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
861+
862+
# Wait for the replication slot to become inactive on the publisher
863+
$primary->poll_query_until(
864+
'postgres',
865+
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
866+
1);
867+
868+
# Set two_phase to true and enable the subscription
869+
$subscriber1->safe_psql(
870+
'postgres', qq[
871+
ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true);
872+
ALTER SUBSCRIPTION regress_mysub1 ENABLE;
873+
]);
874+
875+
$primary->wait_for_catchup('regress_mysub1');
876+
877+
my $two_phase_at = $primary->safe_psql('postgres',
878+
"SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
879+
);
880+
881+
# Confirm that two_phase setting of lsub1_slot slot is synced to the standby
882+
ok( $standby1->poll_query_until(
883+
'postgres',
884+
"SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
885+
),
886+
'two_phase setting of slot lsub1_slot synced to standby');
887+
888+
# Confirm that the prepared transaction is not yet replicated to the
889+
# subscriber.
890+
$result = $subscriber1->safe_psql('postgres',
891+
"SELECT count(*) = 0 FROM pg_prepared_xacts;");
892+
is($result, 't',
893+
"the prepared transaction is not replicated to the subscriber");
894+
833895
##################################################
834896
# Promote the standby1 to primary. Confirm that:
835897
# a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
836898
# b) logical replication for regress_mysub1 is resumed successfully after failover
837-
# c) changes can be consumed from the synced slot 'snap_test_slot'
899+
# c) changes from the transaction prepared 'test_twophase_slotsync' can be
900+
# consumed from the synced slot 'snap_test_slot' once committed on the new
901+
# primary.
902+
# d) changes can be consumed from the synced slot 'snap_test_slot'
838903
##################################################
839-
$standby1->start;
840904
$primary->wait_for_replay_catchup($standby1);
841905

842906
# Capture the time before the standby is promoted
@@ -876,14 +940,23 @@
876940
't',
877941
'synced slot retained on the new primary');
878942

943+
# Commit the prepared transaction
944+
$standby1->safe_psql('postgres',
945+
"COMMIT PREPARED 'test_twophase_slotsync';");
946+
$standby1->wait_for_catchup('regress_mysub1');
947+
948+
# Confirm that the prepared transaction is replicated to the subscriber
949+
is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
950+
"11", 'prepared data replicated from the new primary');
951+
879952
# Insert data on the new primary
880953
$standby1->safe_psql('postgres',
881954
"INSERT INTO tab_int SELECT generate_series(11, 20);");
882955
$standby1->wait_for_catchup('regress_mysub1');
883956

884957
# Confirm that data in tab_int replicated on the subscriber
885958
is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
886-
"20", 'data replicated from the new primary');
959+
"21", 'data replicated from the new primary');
887960

888961
# Consume the data from the snap_test_slot. The synced slot should reach a
889962
# consistent point by restoring the snapshot at the restart_lsn serialized

src/test/regress/expected/rules.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1490,12 +1490,13 @@ pg_replication_slots| SELECT l.slot_name,
14901490
l.wal_status,
14911491
l.safe_wal_size,
14921492
l.two_phase,
1493+
l.two_phase_at,
14931494
l.inactive_since,
14941495
l.conflicting,
14951496
l.invalidation_reason,
14961497
l.failover,
14971498
l.synced
1498-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced)
1499+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced)
14991500
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
15001501
pg_roles| SELECT pg_authid.rolname,
15011502
pg_authid.rolsuper,

0 commit comments

Comments
 (0)