62
62
#include "pglogical_output/hooks.h"
63
63
#include "parser/analyze.h"
64
64
#include "parser/parse_relation.h"
65
+ #include "tcop/pquery.h"
65
66
66
67
#include "multimaster.h"
67
68
#include "ddd.h"
@@ -150,7 +151,7 @@ static void MtmShmemStartup(void);
150
151
static BgwPool * MtmPoolConstructor (void );
151
152
static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg );
152
153
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
153
- static bool MtmProcessDDLCommand (char const * queryString );
154
+ static bool MtmProcessDDLCommand (char const * queryString , bool transactional );
154
155
155
156
MtmState * Mtm ;
156
157
@@ -3022,7 +3023,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
3022
3023
Datum
3023
3024
mtm_add_node (PG_FUNCTION_ARGS )
3024
3025
{
3025
- char * connStr = PG_GETARG_CSTRING ( 0 );
3026
+ char * connStr = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) );
3026
3027
3027
3028
if (Mtm -> nAllNodes == MtmMaxNodes ) {
3028
3029
elog (ERROR , "Maximal number of nodes %d is reached" , MtmMaxNodes );
@@ -3729,7 +3730,7 @@ static char * MtmGucSerialize(void)
3729
3730
* -------------------------------------------
3730
3731
*/
3731
3732
3732
- static bool MtmProcessDDLCommand (char const * queryString )
3733
+ static bool MtmProcessDDLCommand (char const * queryString , bool transactional )
3733
3734
{
3734
3735
char * queryWithContext ;
3735
3736
char * gucContext ;
@@ -3748,7 +3749,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
3748
3749
}
3749
3750
3750
3751
MTM_LOG1 ("Sending utility: %s" , queryWithContext );
3751
- LogLogicalMessage ("G" , queryWithContext , strlen (queryWithContext )+ 1 , true);
3752
+ if (transactional )
3753
+ /* DDL */
3754
+ LogLogicalMessage ("D" , queryWithContext , strlen (queryWithContext ) + 1 , true);
3755
+ else
3756
+ /* CONCURRENT DDL */
3757
+ LogLogicalMessage ("C" , queryWithContext , strlen (queryWithContext ) + 1 , false);
3752
3758
3753
3759
MtmTx .containsDML = true;
3754
3760
return false;
@@ -3785,17 +3791,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3785
3791
MTM_LOG3 ("%d: Process utility statement %s" , MyProcPid , queryString );
3786
3792
switch (nodeTag (parsetree ))
3787
3793
{
3788
- case T_IndexStmt :
3789
- {
3790
- IndexStmt * stmt = (IndexStmt * ) parsetree ;
3791
- if (stmt -> concurrent ) {
3792
- stmt -> concurrent = false;
3793
- elog (WARNING , "Disable concurrent option for index creation" );
3794
- }
3795
- break ;
3796
- }
3797
-
3798
- case T_TransactionStmt :
3794
+ case T_TransactionStmt :
3799
3795
{
3800
3796
TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
3801
3797
switch (stmt -> kind )
@@ -3893,6 +3889,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3893
3889
}
3894
3890
break ;
3895
3891
3892
+ case T_IndexStmt :
3893
+ {
3894
+ IndexStmt * indexStmt = (IndexStmt * ) parsetree ;
3895
+ if (indexStmt -> concurrent && !IsTransactionBlock ())
3896
+ {
3897
+ skipCommand = true;
3898
+ MtmProcessDDLCommand (queryString , false);
3899
+ MtmTx .isDistributed = false;
3900
+ }
3901
+ }
3902
+ break ;
3903
+
3904
+ case T_DropStmt :
3905
+ {
3906
+ DropStmt * stmt = (DropStmt * ) parsetree ;
3907
+ if (stmt -> removeType == OBJECT_INDEX && stmt -> concurrent && !IsTransactionBlock ())
3908
+ {
3909
+ skipCommand = true;
3910
+ MtmProcessDDLCommand (queryString , false);
3911
+ MtmTx .isDistributed = false;
3912
+ }
3913
+ }
3914
+ break ;
3915
+
3896
3916
/* Copy need some special care */
3897
3917
case T_CopyStmt :
3898
3918
{
@@ -3926,13 +3946,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3926
3946
if (!skipCommand && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId ()))
3927
3947
MtmUtilityProcessedInXid = InvalidTransactionId ;
3928
3948
3929
- if (context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY )
3930
- {
3931
- if (!skipCommand && !MtmTx .isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId )) {
3932
- MtmUtilityProcessedInXid = GetCurrentTransactionId ();
3933
- MtmProcessDDLCommand (queryString );
3934
- executed = true;
3935
- }
3949
+ if (!skipCommand && !MtmTx .isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId )) {
3950
+ MtmUtilityProcessedInXid = GetCurrentTransactionId ();
3951
+
3952
+ if (context == PROCESS_UTILITY_TOPLEVEL )
3953
+ MtmProcessDDLCommand (queryString , true);
3954
+ else
3955
+ MtmProcessDDLCommand (ActivePortal -> sourceText , true);
3956
+
3957
+ executed = true;
3936
3958
}
3937
3959
3938
3960
if (PreviousProcessUtilityHook != NULL )
@@ -3945,19 +3967,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3945
3967
standard_ProcessUtility (parsetree , queryString , context ,
3946
3968
params , dest , completionTag );
3947
3969
}
3948
-
3949
3970
if (!MtmVolksWagenMode && MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3950
3971
elog (ERROR , "Isolation level %s is not supported by multimaster" , isoLevelStr [XactIsoLevel ]);
3951
3972
}
3952
-
3973
+
3953
3974
if (MyXactAccessedTempRel )
3954
3975
{
3955
3976
MTM_LOG1 ("Xact accessed temp table, stopping replication" );
3956
3977
MtmTx .isDistributed = false; /* Skip */
3957
3978
MtmTx .snapshot = INVALID_CSN ;
3958
3979
}
3959
3980
3960
- if (executed )
3981
+ if (executed && ! skipCommand )
3961
3982
{
3962
3983
MtmFinishDDLCommand ();
3963
3984
}
0 commit comments