44
44
#include "storage/lmgr.h"
45
45
46
46
#include "utils/builtins.h"
47
+ #include "utils/guc.h"
47
48
#include "utils/lsyscache.h"
48
49
#include "utils/memutils.h"
49
50
#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
60
61
static void
61
62
parse_subscription_options (List * options , bool * connect , bool * enabled_given ,
62
63
bool * enabled , bool * create_slot , char * * slot_name ,
63
- bool * copy_data )
64
+ bool * copy_data , char * * synchronous_commit )
64
65
{
65
66
ListCell * lc ;
66
67
bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
80
81
* slot_name = NULL ;
81
82
if (copy_data )
82
83
* copy_data = true;
84
+ if (synchronous_commit )
85
+ * synchronous_commit = NULL ;
83
86
84
87
/* Parse options */
85
88
foreach (lc , options )
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
165
168
copy_data_given = true;
166
169
* copy_data = !defGetBoolean (defel );
167
170
}
171
+ else if (strcmp (defel -> defname , "synchronous_commit" ) == 0 &&
172
+ synchronous_commit )
173
+ {
174
+ if (* synchronous_commit )
175
+ ereport (ERROR ,
176
+ (errcode (ERRCODE_SYNTAX_ERROR ),
177
+ errmsg ("conflicting or redundant options" )));
178
+
179
+ * synchronous_commit = defGetString (defel );
180
+
181
+ /* Test if the given value is valid for synchronous_commit GUC. */
182
+ (void ) set_config_option ("synchronous_commit" , * synchronous_commit ,
183
+ PGC_BACKEND , PGC_S_TEST , GUC_ACTION_SET ,
184
+ false, 0 , false);
185
+ }
168
186
else
169
187
elog (ERROR , "unrecognized option: %s" , defel -> defname );
170
188
}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
269
287
bool enabled_given ;
270
288
bool enabled ;
271
289
bool copy_data ;
290
+ char * synchronous_commit ;
272
291
char * conninfo ;
273
292
char * slotname ;
274
293
char originname [NAMEDATALEN ];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
280
299
* Connection and publication should not be specified here.
281
300
*/
282
301
parse_subscription_options (stmt -> options , & connect , & enabled_given ,
283
- & enabled , & create_slot , & slotname , & copy_data );
302
+ & enabled , & create_slot , & slotname , & copy_data ,
303
+ & synchronous_commit );
284
304
285
305
/*
286
306
* Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
311
331
312
332
if (slotname == NULL )
313
333
slotname = stmt -> subname ;
334
+ /* The default for synchronous_commit of subscriptions is off. */
335
+ if (synchronous_commit == NULL )
336
+ synchronous_commit = "off" ;
314
337
315
338
conninfo = stmt -> conninfo ;
316
339
publications = stmt -> publication ;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
334
357
CStringGetTextDatum (conninfo );
335
358
values [Anum_pg_subscription_subslotname - 1 ] =
336
359
DirectFunctionCall1 (namein , CStringGetDatum (slotname ));
360
+ values [Anum_pg_subscription_subsynccommit - 1 ] =
361
+ CStringGetTextDatum (synchronous_commit );
337
362
values [Anum_pg_subscription_subpublications - 1 ] =
338
363
publicationListToArray (publications );
339
364
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
582
607
case ALTER_SUBSCRIPTION_OPTIONS :
583
608
{
584
609
char * slot_name ;
610
+ char * synchronous_commit ;
585
611
586
612
parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
587
- NULL , & slot_name , NULL );
613
+ NULL , & slot_name , NULL ,
614
+ & synchronous_commit );
588
615
589
- values [Anum_pg_subscription_subslotname - 1 ] =
590
- DirectFunctionCall1 (namein , CStringGetDatum (slot_name ));
591
- replaces [Anum_pg_subscription_subslotname - 1 ] = true;
616
+ if (slot_name )
617
+ {
618
+ values [Anum_pg_subscription_subslotname - 1 ] =
619
+ DirectFunctionCall1 (namein , CStringGetDatum (slot_name ));
620
+ replaces [Anum_pg_subscription_subslotname - 1 ] = true;
621
+ }
622
+ if (synchronous_commit )
623
+ {
624
+ values [Anum_pg_subscription_subsynccommit - 1 ] =
625
+ CStringGetTextDatum (synchronous_commit );
626
+ replaces [Anum_pg_subscription_subsynccommit - 1 ] = true;
627
+ }
592
628
593
629
update_tuple = true;
594
630
break ;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
601
637
602
638
parse_subscription_options (stmt -> options , NULL ,
603
639
& enabled_given , & enabled , NULL ,
604
- NULL , NULL );
640
+ NULL , NULL , NULL );
605
641
Assert (enabled_given );
606
642
607
643
values [Anum_pg_subscription_subenabled - 1 ] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
626
662
Subscription * sub = GetSubscription (subid , false);
627
663
628
664
parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
629
- NULL , NULL , & copy_data );
665
+ NULL , NULL , & copy_data , NULL );
630
666
631
667
values [Anum_pg_subscription_subpublications - 1 ] =
632
668
publicationListToArray (stmt -> publication );
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
652
688
Subscription * sub = GetSubscription (subid , false);
653
689
654
690
parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
655
- NULL , NULL , & copy_data );
691
+ NULL , NULL , & copy_data , NULL );
656
692
657
693
AlterSubscription_refresh (sub , copy_data );
658
694
0 commit comments