69
69
#define SUBOPT_DISABLE_ON_ERR 0x00000400
70
70
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
71
71
#define SUBOPT_RUN_AS_OWNER 0x00001000
72
- #define SUBOPT_LSN 0x00002000
73
- #define SUBOPT_ORIGIN 0x00004000
72
+ #define SUBOPT_FAILOVER 0x00002000
73
+ #define SUBOPT_LSN 0x00004000
74
+ #define SUBOPT_ORIGIN 0x00008000
75
+
74
76
75
77
/* check if the 'val' has 'bits' set */
76
78
#define IsSet (val , bits ) (((val) & (bits)) == (bits))
@@ -95,6 +97,7 @@ typedef struct SubOpts
95
97
bool disableonerr ;
96
98
bool passwordrequired ;
97
99
bool runasowner ;
100
+ bool failover ;
98
101
char * origin ;
99
102
XLogRecPtr lsn ;
100
103
} SubOpts ;
@@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
155
158
opts -> passwordrequired = true;
156
159
if (IsSet (supported_opts , SUBOPT_RUN_AS_OWNER ))
157
160
opts -> runasowner = false;
161
+ if (IsSet (supported_opts , SUBOPT_FAILOVER ))
162
+ opts -> failover = false;
158
163
if (IsSet (supported_opts , SUBOPT_ORIGIN ))
159
164
opts -> origin = pstrdup (LOGICALREP_ORIGIN_ANY );
160
165
@@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
303
308
opts -> specified_opts |= SUBOPT_RUN_AS_OWNER ;
304
309
opts -> runasowner = defGetBoolean (defel );
305
310
}
311
+ else if (IsSet (supported_opts , SUBOPT_FAILOVER ) &&
312
+ strcmp (defel -> defname , "failover" ) == 0 )
313
+ {
314
+ if (IsSet (opts -> specified_opts , SUBOPT_FAILOVER ))
315
+ errorConflictingDefElem (defel , pstate );
316
+
317
+ opts -> specified_opts |= SUBOPT_FAILOVER ;
318
+ opts -> failover = defGetBoolean (defel );
319
+ }
306
320
else if (IsSet (supported_opts , SUBOPT_ORIGIN ) &&
307
321
strcmp (defel -> defname , "origin" ) == 0 )
308
322
{
@@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
388
402
errmsg ("%s and %s are mutually exclusive options" ,
389
403
"connect = false" , "copy_data = true" )));
390
404
405
+ if (opts -> failover &&
406
+ IsSet (opts -> specified_opts , SUBOPT_FAILOVER ))
407
+ ereport (ERROR ,
408
+ (errcode (ERRCODE_SYNTAX_ERROR ),
409
+ errmsg ("%s and %s are mutually exclusive options" ,
410
+ "connect = false" , "failover = true" )));
411
+
391
412
/* Change the defaults of other options. */
392
413
opts -> enabled = false;
393
414
opts -> create_slot = false;
@@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
591
612
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592
613
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593
614
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN );
615
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN );
595
616
parse_subscription_options (pstate , stmt -> options , supported_opts , & opts );
596
617
597
618
/*
@@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
697
718
values [Anum_pg_subscription_subdisableonerr - 1 ] = BoolGetDatum (opts .disableonerr );
698
719
values [Anum_pg_subscription_subpasswordrequired - 1 ] = BoolGetDatum (opts .passwordrequired );
699
720
values [Anum_pg_subscription_subrunasowner - 1 ] = BoolGetDatum (opts .runasowner );
721
+ values [Anum_pg_subscription_subfailover - 1 ] = BoolGetDatum (opts .failover );
700
722
values [Anum_pg_subscription_subconninfo - 1 ] =
701
723
CStringGetTextDatum (conninfo );
702
724
if (opts .slot_name )
@@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
807
829
twophase_enabled = true;
808
830
809
831
walrcv_create_slot (wrconn , opts .slot_name , false, twophase_enabled ,
810
- false , CRS_NOEXPORT_SNAPSHOT , NULL );
832
+ opts . failover , CRS_NOEXPORT_SNAPSHOT , NULL );
811
833
812
834
if (twophase_enabled )
813
835
UpdateTwoPhaseState (subid , LOGICALREP_TWOPHASE_STATE_ENABLED );
@@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
816
838
(errmsg ("created replication slot \"%s\" on publisher" ,
817
839
opts .slot_name )));
818
840
}
841
+
842
+ /*
843
+ * If the slot_name is specified without the create_slot option,
844
+ * it is possible that the user intends to use an existing slot on
845
+ * the publisher, so here we alter the failover property of the
846
+ * slot to match the failover value in subscription.
847
+ *
848
+ * We do not need to change the failover to false if the server
849
+ * does not support failover (e.g. pre-PG17).
850
+ */
851
+ else if (opts .slot_name &&
852
+ (opts .failover || walrcv_server_version (wrconn ) >= 170000 ))
853
+ {
854
+ walrcv_alter_slot (wrconn , opts .slot_name , opts .failover );
855
+ ereport (NOTICE ,
856
+ (errmsg ("changed the failover state of replication slot \"%s\" on publisher to %s" ,
857
+ opts .slot_name , opts .failover ? "true" : "false" )));
858
+ }
819
859
}
820
860
PG_FINALLY ();
821
861
{
@@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1132
1172
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1133
1173
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
1134
1174
SUBOPT_PASSWORD_REQUIRED |
1135
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN );
1175
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1176
+ SUBOPT_ORIGIN );
1136
1177
1137
1178
parse_subscription_options (pstate , stmt -> options ,
1138
1179
supported_opts , & opts );
@@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1211
1252
replaces [Anum_pg_subscription_subrunasowner - 1 ] = true;
1212
1253
}
1213
1254
1255
+ if (IsSet (opts .specified_opts , SUBOPT_FAILOVER ))
1256
+ {
1257
+ if (!sub -> slotname )
1258
+ ereport (ERROR ,
1259
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1260
+ errmsg ("cannot set %s for a subscription that does not have a slot name" ,
1261
+ "failover" )));
1262
+
1263
+ /*
1264
+ * Do not allow changing the failover state if the
1265
+ * subscription is enabled. This is because the failover
1266
+ * state of the slot on the publisher cannot be modified
1267
+ * if the slot is currently acquired by the apply worker.
1268
+ */
1269
+ if (sub -> enabled )
1270
+ ereport (ERROR ,
1271
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1272
+ errmsg ("cannot set %s for enabled subscription" ,
1273
+ "failover" )));
1274
+
1275
+ values [Anum_pg_subscription_subfailover - 1 ] =
1276
+ BoolGetDatum (opts .failover );
1277
+ replaces [Anum_pg_subscription_subfailover - 1 ] = true;
1278
+ }
1279
+
1214
1280
if (IsSet (opts .specified_opts , SUBOPT_ORIGIN ))
1215
1281
{
1216
1282
values [Anum_pg_subscription_suborigin - 1 ] =
@@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1453
1519
heap_freetuple (tup );
1454
1520
}
1455
1521
1522
+ /*
1523
+ * Try to acquire the connection necessary for altering slot.
1524
+ *
1525
+ * This has to be at the end because otherwise if there is an error while
1526
+ * doing the database operations we won't be able to rollback altered
1527
+ * slot.
1528
+ */
1529
+ if (replaces [Anum_pg_subscription_subfailover - 1 ])
1530
+ {
1531
+ bool must_use_password ;
1532
+ char * err ;
1533
+ WalReceiverConn * wrconn ;
1534
+
1535
+ /* Load the library providing us libpq calls. */
1536
+ load_file ("libpqwalreceiver" , false);
1537
+
1538
+ /* Try to connect to the publisher. */
1539
+ must_use_password = sub -> passwordrequired && !sub -> ownersuperuser ;
1540
+ wrconn = walrcv_connect (sub -> conninfo , true, must_use_password ,
1541
+ sub -> name , & err );
1542
+ if (!wrconn )
1543
+ ereport (ERROR ,
1544
+ (errcode (ERRCODE_CONNECTION_FAILURE ),
1545
+ errmsg ("could not connect to the publisher: %s" , err )));
1546
+
1547
+ PG_TRY ();
1548
+ {
1549
+ walrcv_alter_slot (wrconn , sub -> slotname , opts .failover );
1550
+
1551
+ ereport (NOTICE ,
1552
+ (errmsg ("changed the failover state of replication slot \"%s\" on publisher to %s" ,
1553
+ sub -> slotname , opts .failover ? "true" : "false" )));
1554
+ }
1555
+ PG_FINALLY ();
1556
+ {
1557
+ walrcv_disconnect (wrconn );
1558
+ }
1559
+ PG_END_TRY ();
1560
+ }
1561
+
1456
1562
table_close (rel , RowExclusiveLock );
1457
1563
1458
1564
ObjectAddressSet (myself , SubscriptionRelationId , subid );
0 commit comments