Skip to content

Commit f510c8f

Browse files
committed
Add mm_disable_node function
1 parent 63663e5 commit f510c8f

File tree

2 files changed

+116
-61
lines changed

2 files changed

+116
-61
lines changed

contrib/multimaster/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ CREATE FUNCTION mm_stop_replication() RETURNS void
99
AS 'MODULE_PATHNAME','mm_stop_replication'
1010
LANGUAGE C;
1111

12+
-- Mark the node with the given identifier as disabled.
-- STRICT makes a NULL argument yield NULL without invoking the C code,
-- which reads its argument as a plain int32.
CREATE FUNCTION mm_disable_node(node integer) RETURNS void
AS 'MODULE_PATHNAME','mm_disable_node'
LANGUAGE C STRICT;
15+

contrib/multimaster/multimaster.c

Lines changed: 112 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef struct
5959
TransactionId minXid; /* XID of oldest transaction visible by any active transaction (local or global) */
6060
TransactionId nextXid; /* next XID for local transaction */
6161
size_t nReservedXids; /* number of XIDs reserved for local transactions */
62+
int64 disabledNodeMask;
6263
int nNodes;
6364
pg_atomic_uint32 nReceivers;
6465
bool initialized;
@@ -74,13 +75,16 @@ typedef struct
7475
#define DTM_SHMEM_SIZE (64*1024*1024)
7576
#define DTM_HASH_SIZE 1003
7677

78+
/*
 * Despite the name, this macro TESTS a bit rather than setting it: it
 * evaluates to a non-zero int64 when bit number `bit` is set in `mask`
 * and to 0 otherwise.  `mask` is not modified.
 */
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))
79+
7780
void _PG_init(void);
7881
void _PG_fini(void);
7982

8083
PG_MODULE_MAGIC;
8184

8285
PG_FUNCTION_INFO_V1(mm_start_replication);
8386
PG_FUNCTION_INFO_V1(mm_stop_replication);
87+
PG_FUNCTION_INFO_V1(mm_disable_node);
8488

8589
static Snapshot DtmGetSnapshot(Snapshot snapshot);
8690
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
@@ -108,6 +112,7 @@ static void DtmBackgroundWorker(Datum arg);
108112
static void MMMarkTransAsLocal(TransactionId xid);
109113
static BgwPool* MMPoolConstructor(void);
110114
static bool MMRunUtilityStmt(PGconn* conn, char const* sql);
115+
static void MMBroadcastUtilityStmt(char const* sql, bool ignoreError);
111116

112117
static HTAB* xid_in_doubt;
113118
static HTAB* local_trans;
@@ -737,6 +742,7 @@ static void DtmInitialize()
737742
dtm->nReservedXids = 0;
738743
dtm->minXid = InvalidTransactionId;
739744
dtm->nNodes = MMNodes;
745+
dtm->disabledNodeMask = 0;
740746
pg_atomic_write_u32(&dtm->nReceivers, 0);
741747
dtm->initialized = false;
742748
BgwPoolInit(&dtm->pool, MMExecutor, MMDatabaseName, MMQueueSize);
@@ -1209,6 +1215,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
12091215
PG_RETURN_VOID();
12101216
}
12111217

1218+
Datum
1219+
mm_disable_node(PG_FUNCTION_ARGS)
1220+
{
1221+
int nodeId = PG_GETARG_INT32(0);
1222+
if (!BIT_SET(dtm->disabledNodeMask, nodeId))
1223+
{
1224+
dtm->disabledNodeMask |= ((int64)1 << nodeId);
1225+
dtm->nNodes -= 1;
1226+
if (!IsTransactionBlock())
1227+
{
1228+
MMBroadcastUtilityStmt(psprintf("select mm_disable_node(%d)", nodeId), true);
1229+
}
1230+
}
1231+
PG_RETURN_VOID();
1232+
}
1233+
12121234
/*
12131235
* Execute statement with specified parameters and check its result
12141236
*/
@@ -1224,6 +1246,95 @@ static bool MMRunUtilityStmt(PGconn* conn, char const* sql)
12241246
return ret;
12251247
}
12261248

1249+
/*
 * Execute a statement on every node that is not marked disabled.
 *
 * MMConnStrs is split on ',' into one connection string per node; the
 * position of a string in the list is the bit tested in
 * dtm->disabledNodeMask.  On each connected node the statement is run
 * between BEGIN TRANSACTION and COMMIT TRANSACTION.  If any node fails
 * (and ignoreError is false) ROLLBACK TRANSACTION is sent to all open
 * connections instead of COMMIT.
 *
 * sql          statement to run on the nodes
 * ignoreError  true:  best effort -- nodes that cannot be reached or that
 *                     fail are silently skipped and nothing is reported;
 *              false: the first failure is reported with elog(ERROR) after
 *                     all connections have been closed.
 *
 * Node numbers in error messages are 1-based (list position + 1).
 */
static void MMBroadcastUtilityStmt(char const* sql, bool ignoreError)
{
	char* conn_strs = pstrdup(MMConnStrs); /* private copy, split in place below */
	char* conn_str = conn_strs;
	char* conn_str_end = conn_str + strlen(conn_str);
	int i = 0;
	int j;
	int64 disabledNodeMask = dtm->disabledNodeMask; /* snapshot for the whole broadcast */
	int failedNode = -1;
	char const* errorMsg = NULL;
	PGconn **conns = palloc0(sizeof(PGconn*)*MMNodes);

	while (conn_str < conn_str_end)
	{
		char* p = strchr(conn_str, ',');
		if (p == NULL) {
			p = conn_str_end;
		}
		*p = '\0';
		if (i >= MMNodes)
		{
			/*
			 * More connection strings than slots in conns[]: stop before
			 * writing past the end of the array.
			 */
			for (j = 0; j < MMNodes; j++)
			{
				PQfinish(conns[j]); /* PQfinish accepts NULL */
			}
			elog(ERROR, "Connection string list contains more than %d nodes", MMNodes);
		}
		if (!BIT_SET(disabledNodeMask, i))
		{
			conns[i] = PQconnectdb(conn_str);
			if (PQstatus(conns[i]) != CONNECTION_OK)
			{
				if (ignoreError)
				{
					/* Leave the slot NULL so later phases skip this node. */
					PQfinish(conns[i]);
					conns[i] = NULL;
				} else {
					/* Close everything opened so far, then report. */
					for (j = i; j >= 0; j--)
					{
						PQfinish(conns[j]);
					}
					elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, i+1);
				}
			}
		}
		conn_str = p + 1;
		i += 1;
	}
	Assert(i == MMNodes);

	for (i = 0; i < MMNodes; i++)
	{
		if (conns[i])
		{
			if (!MMRunUtilityStmt(conns[i], "BEGIN TRANSACTION") && !ignoreError)
			{
				errorMsg = "Failed to start transaction";
				failedNode = i;
				break;
			}
			if (!MMRunUtilityStmt(conns[i], sql) && !ignoreError)
			{
				errorMsg = "Failed to run command";
				failedNode = i;
				break;
			}
		}
	}
	if (failedNode >= 0 && !ignoreError)
	{
		for (i = 0; i < MMNodes; i++)
		{
			if (conns[i])
			{
				MMRunUtilityStmt(conns[i], "ROLLBACK TRANSACTION");
			}
		}
	} else {
		for (i = 0; i < MMNodes; i++)
		{
			if (conns[i] && !MMRunUtilityStmt(conns[i], "COMMIT TRANSACTION") && !ignoreError)
			{
				/* Keep committing on the remaining nodes; the last failure is reported. */
				errorMsg = "Commit failed";
				failedNode = i;
			}
		}
	}
	for (i = 0; i < MMNodes; i++)
	{
		if (conns[i])
		{
			PQfinish(conns[i]);
		}
	}
	pfree(conns);
	pfree(conn_strs);
	if (!ignoreError && failedNode >= 0)
	{
		/* Literal format string; the message text itself is passed as data. */
		elog(ERROR, "%s at node %d", errorMsg, failedNode+1);
	}
}
12271338

12281339
static void MMProcessUtility(Node *parsetree, const char *queryString,
12291340
ProcessUtilityContext context, ParamListInfo params,
@@ -1267,67 +1378,7 @@ static void MMProcessUtility(Node *parsetree, const char *queryString,
12671378
MMIsDistributedTrans = false;
12681379
}
12691380
} else {
1270-
char* conn_str = pstrdup(MMConnStrs);
1271-
char* conn_str_end = conn_str + strlen(conn_str);
1272-
int i = 0;
1273-
int failedNode = -1;
1274-
char const* errorMsg = NULL;
1275-
PGconn **conns;
1276-
conns = palloc(sizeof(PGconn*)*MMNodes);
1277-
1278-
while (conn_str < conn_str_end) {
1279-
char* p = strchr(conn_str, ',');
1280-
if (p == NULL) {
1281-
p = conn_str_end;
1282-
}
1283-
*p = '\0';
1284-
conns[i] = PQconnectdb(conn_str);
1285-
if (PQstatus(conns[i]) != CONNECTION_OK)
1286-
{
1287-
failedNode = i;
1288-
do {
1289-
PQfinish(conns[i]);
1290-
} while (--i >= 0);
1291-
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
1292-
}
1293-
conn_str = p + 1;
1294-
i += 1;
1295-
}
1296-
Assert(i == MMNodes);
1297-
1298-
for (i = 0; i < MMNodes; i++) {
1299-
if (!MMRunUtilityStmt(conns[i], "BEGIN TRANSACTION"))
1300-
{
1301-
errorMsg = "Failed to start transaction at node %d";
1302-
failedNode = i;
1303-
break;
1304-
}
1305-
if (!MMRunUtilityStmt(conns[i], queryString))
1306-
{
1307-
errorMsg = "Failed to run command at node %d";
1308-
failedNode = i;
1309-
break;
1310-
}
1311-
}
1312-
if (failedNode >= 0)
1313-
{
1314-
for (i = 0; i < MMNodes; i++) {
1315-
MMRunUtilityStmt(conns[i], "ROLLBACK TRANSACTION");
1316-
}
1317-
} else {
1318-
for (i = 0; i < MMNodes; i++) {
1319-
if (!MMRunUtilityStmt(conns[i], "COMMIT TRANSACTION")) {
1320-
errorMsg = "Commit failed at node %d";
1321-
failedNode = i;
1322-
}
1323-
}
1324-
}
1325-
for (i = 0; i < MMNodes; i++) {
1326-
PQfinish(conns[i]);
1327-
}
1328-
if (failedNode >= 0) {
1329-
elog(ERROR, errorMsg, failedNode+1);
1330-
}
1381+
MMBroadcastUtilityStmt(queryString, false);
13311382
}
13321383
}
13331384
static void

0 commit comments

Comments
 (0)