@@ -59,6 +59,7 @@ typedef struct
59
59
TransactionId minXid ; /* XID of oldest transaction visible by any active transaction (local or global) */
60
60
TransactionId nextXid ; /* next XID for local transaction */
61
61
size_t nReservedXids ; /* number of XIDs reserved for local transactions */
62
+ int64 disabledNodeMask ;
62
63
int nNodes ;
63
64
pg_atomic_uint32 nReceivers ;
64
65
bool initialized ;
@@ -74,13 +75,16 @@ typedef struct
74
75
#define DTM_SHMEM_SIZE (64*1024*1024)
75
76
#define DTM_HASH_SIZE 1003
76
77
78
+ #define BIT_SET (mask , bit ) ((mask) & ((int64)1 << (bit)))
79
+
77
80
void _PG_init (void );
78
81
void _PG_fini (void );
79
82
80
83
PG_MODULE_MAGIC ;
81
84
82
85
PG_FUNCTION_INFO_V1 (mm_start_replication );
83
86
PG_FUNCTION_INFO_V1 (mm_stop_replication );
87
+ PG_FUNCTION_INFO_V1 (mm_disable_node );
84
88
85
89
static Snapshot DtmGetSnapshot (Snapshot snapshot );
86
90
static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
@@ -108,6 +112,7 @@ static void DtmBackgroundWorker(Datum arg);
108
112
static void MMMarkTransAsLocal (TransactionId xid );
109
113
static BgwPool * MMPoolConstructor (void );
110
114
static bool MMRunUtilityStmt (PGconn * conn , char const * sql );
115
+ static void MMBroadcastUtilityStmt (char const * sql , bool ignoreError );
111
116
112
117
static HTAB * xid_in_doubt ;
113
118
static HTAB * local_trans ;
@@ -737,6 +742,7 @@ static void DtmInitialize()
737
742
dtm -> nReservedXids = 0 ;
738
743
dtm -> minXid = InvalidTransactionId ;
739
744
dtm -> nNodes = MMNodes ;
745
+ dtm -> disabledNodeMask = 0 ;
740
746
pg_atomic_write_u32 (& dtm -> nReceivers , 0 );
741
747
dtm -> initialized = false;
742
748
BgwPoolInit (& dtm -> pool , MMExecutor , MMDatabaseName , MMQueueSize );
@@ -1209,6 +1215,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
1209
1215
PG_RETURN_VOID ();
1210
1216
}
1211
1217
1218
+ Datum
1219
+ mm_disable_node (PG_FUNCTION_ARGS )
1220
+ {
1221
+ int nodeId = PG_GETARG_INT32 (0 );
1222
+ if (!BIT_SET (dtm -> disabledNodeMask , nodeId ))
1223
+ {
1224
+ dtm -> disabledNodeMask |= ((int64 )1 << nodeId );
1225
+ dtm -> nNodes -= 1 ;
1226
+ if (!IsTransactionBlock ())
1227
+ {
1228
+ MMBroadcastUtilityStmt (psprintf ("select mm_disable_node(%d)" , nodeId ), true);
1229
+ }
1230
+ }
1231
+ PG_RETURN_VOID ();
1232
+ }
1233
+
1212
1234
/*
1213
1235
* Execute statement with specified parameters and check its result
1214
1236
*/
@@ -1224,6 +1246,95 @@ static bool MMRunUtilityStmt(PGconn* conn, char const* sql)
1224
1246
return ret ;
1225
1247
}
1226
1248
1249
+ static void MMBroadcastUtilityStmt (char const * sql , bool ignoreError )
1250
+ {
1251
+ char * conn_str = pstrdup (MMConnStrs );
1252
+ char * conn_str_end = conn_str + strlen (conn_str );
1253
+ int i = 0 ;
1254
+ int64 disabledNodeMask = dtm -> disabledNodeMask ;
1255
+ int failedNode = -1 ;
1256
+ char const * errorMsg = NULL ;
1257
+ PGconn * * conns = palloc0 (sizeof (PGconn * )* MMNodes );
1258
+
1259
+ while (conn_str < conn_str_end )
1260
+ {
1261
+ char * p = strchr (conn_str , ',' );
1262
+ if (p == NULL ) {
1263
+ p = conn_str_end ;
1264
+ }
1265
+ * p = '\0' ;
1266
+ if (!BIT_SET (disabledNodeMask , i ))
1267
+ {
1268
+ conns [i ] = PQconnectdb (conn_str );
1269
+ if (PQstatus (conns [i ]) != CONNECTION_OK )
1270
+ {
1271
+ if (ignoreError )
1272
+ {
1273
+ PQfinish (conns [i ]);
1274
+ conns [i ] = NULL ;
1275
+ } else {
1276
+ failedNode = i ;
1277
+ do {
1278
+ PQfinish (conns [i ]);
1279
+ } while (-- i >= 0 );
1280
+ elog (ERROR , "Failed to establish connection '%s' to node %d" , conn_str , failedNode );
1281
+ }
1282
+ }
1283
+ }
1284
+ conn_str = p + 1 ;
1285
+ i += 1 ;
1286
+ }
1287
+ Assert (i == MMNodes );
1288
+
1289
+ for (i = 0 ; i < MMNodes ; i ++ )
1290
+ {
1291
+ if (conns [i ])
1292
+ {
1293
+ if (!MMRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" ) && !ignoreError )
1294
+ {
1295
+ errorMsg = "Failed to start transaction at node %d" ;
1296
+ failedNode = i ;
1297
+ break ;
1298
+ }
1299
+ if (!MMRunUtilityStmt (conns [i ], sql ) && !ignoreError )
1300
+ {
1301
+ errorMsg = "Failed to run command at node %d" ;
1302
+ failedNode = i ;
1303
+ break ;
1304
+ }
1305
+ }
1306
+ }
1307
+ if (failedNode >= 0 && !ignoreError )
1308
+ {
1309
+ for (i = 0 ; i < MMNodes ; i ++ )
1310
+ {
1311
+ if (conns [i ])
1312
+ {
1313
+ MMRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" );
1314
+ }
1315
+ }
1316
+ } else {
1317
+ for (i = 0 ; i < MMNodes ; i ++ )
1318
+ {
1319
+ if (conns [i ] && !MMRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" ) && !ignoreError )
1320
+ {
1321
+ errorMsg = "Commit failed at node %d" ;
1322
+ failedNode = i ;
1323
+ }
1324
+ }
1325
+ }
1326
+ for (i = 0 ; i < MMNodes ; i ++ )
1327
+ {
1328
+ if (conns [i ])
1329
+ {
1330
+ PQfinish (conns [i ]);
1331
+ }
1332
+ }
1333
+ if (!ignoreError && failedNode >= 0 )
1334
+ {
1335
+ elog (ERROR , errorMsg , failedNode + 1 );
1336
+ }
1337
+ }
1227
1338
1228
1339
static void MMProcessUtility (Node * parsetree , const char * queryString ,
1229
1340
ProcessUtilityContext context , ParamListInfo params ,
@@ -1267,67 +1378,7 @@ static void MMProcessUtility(Node *parsetree, const char *queryString,
1267
1378
MMIsDistributedTrans = false;
1268
1379
}
1269
1380
} else {
1270
- char * conn_str = pstrdup (MMConnStrs );
1271
- char * conn_str_end = conn_str + strlen (conn_str );
1272
- int i = 0 ;
1273
- int failedNode = -1 ;
1274
- char const * errorMsg = NULL ;
1275
- PGconn * * conns ;
1276
- conns = palloc (sizeof (PGconn * )* MMNodes );
1277
-
1278
- while (conn_str < conn_str_end ) {
1279
- char * p = strchr (conn_str , ',' );
1280
- if (p == NULL ) {
1281
- p = conn_str_end ;
1282
- }
1283
- * p = '\0' ;
1284
- conns [i ] = PQconnectdb (conn_str );
1285
- if (PQstatus (conns [i ]) != CONNECTION_OK )
1286
- {
1287
- failedNode = i ;
1288
- do {
1289
- PQfinish (conns [i ]);
1290
- } while (-- i >= 0 );
1291
- elog (ERROR , "Failed to establish connection '%s' to node %d" , conn_str , failedNode );
1292
- }
1293
- conn_str = p + 1 ;
1294
- i += 1 ;
1295
- }
1296
- Assert (i == MMNodes );
1297
-
1298
- for (i = 0 ; i < MMNodes ; i ++ ) {
1299
- if (!MMRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" ))
1300
- {
1301
- errorMsg = "Failed to start transaction at node %d" ;
1302
- failedNode = i ;
1303
- break ;
1304
- }
1305
- if (!MMRunUtilityStmt (conns [i ], queryString ))
1306
- {
1307
- errorMsg = "Failed to run command at node %d" ;
1308
- failedNode = i ;
1309
- break ;
1310
- }
1311
- }
1312
- if (failedNode >= 0 )
1313
- {
1314
- for (i = 0 ; i < MMNodes ; i ++ ) {
1315
- MMRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" );
1316
- }
1317
- } else {
1318
- for (i = 0 ; i < MMNodes ; i ++ ) {
1319
- if (!MMRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" )) {
1320
- errorMsg = "Commit failed at node %d" ;
1321
- failedNode = i ;
1322
- }
1323
- }
1324
- }
1325
- for (i = 0 ; i < MMNodes ; i ++ ) {
1326
- PQfinish (conns [i ]);
1327
- }
1328
- if (failedNode >= 0 ) {
1329
- elog (ERROR , errorMsg , failedNode + 1 );
1330
- }
1381
+ MMBroadcastUtilityStmt (queryString , false);
1331
1382
}
1332
1383
}
1333
1384
static void
0 commit comments