Skip to content

Commit 5c44b4e

Browse files
committed
2 parents 98f9a57 + f3c0e4e commit 5c44b4e

File tree

7 files changed

+167
-54
lines changed

7 files changed

+167
-54
lines changed

contrib/mmts/arbiter.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -243,30 +243,34 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
243243
if (!MtmResolveHostByName(host, addrs, &n_addrs)) {
244244
elog(ERROR, "Arbiter failed to resolve host '%s' by name", host);
245245
}
246-
Retry:
247-
sd = socket(AF_INET, SOCK_STREAM, 0);
248-
if (sd < 0) {
249-
elog(ERROR, "Arbiter failed to create socket: %d", errno);
250-
}
246+
247+
Retry:
248+
251249
while (1) {
252250
int rc = -1;
251+
252+
sd = socket(AF_INET, SOCK_STREAM, 0);
253+
if (sd < 0) {
254+
elog(ERROR, "Arbiter failed to create socket: %d", errno);
255+
}
253256
for (i = 0; i < n_addrs; ++i) {
254257
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
255258
do {
256259
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
257260
} while (rc < 0 && errno == EINTR);
258-
261+
259262
if (rc >= 0 || errno == EINPROGRESS) {
260263
break;
261264
}
262265
}
263266
if (rc < 0) {
264267
if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) || max_attempts == 0) {
265-
elog(WARNING, "Arbiter failed to connect to %s:%d: %d", host, port, errno);
268+
elog(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
266269
return -1;
267270
} else {
268271
max_attempts -= 1;
269-
MtmSleep(MtmConnectTimeout);
272+
elog(WARNING, "Arbiter trying to connect to %s:%d: error=%d", host, port, errno);
273+
MtmSleep(5*MtmConnectTimeout);
270274
}
271275
continue;
272276
} else {
@@ -282,7 +286,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
282286
req.hdr.sxid = ShmemVariableCache->nextXid;
283287
req.hdr.csn = MtmGetCurrentTime();
284288
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
285-
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].connStr);
289+
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
286290
if (!MtmWriteSocket(sd, &req, sizeof req)) {
287291
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
288292
close(sd);
@@ -321,7 +325,7 @@ static void MtmOpenConnections()
321325

322326
for (i = 0; i < nNodes; i++) {
323327
if (i+1 != MtmNodeId) {
324-
sockets[i] = MtmConnectSocket(Mtm->nodes[i].hostName, MtmArbiterPort + i + 1, MtmConnectAttempts);
328+
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, MtmArbiterPort + i + 1, MtmConnectAttempts);
325329
if (sockets[i] < 0) {
326330
MtmOnNodeDisconnect(i+1);
327331
}
@@ -345,7 +349,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
345349
if (sockets[node] >= 0) {
346350
close(sockets[node]);
347351
}
348-
sockets[node] = MtmConnectSocket(Mtm->nodes[node].hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
352+
sockets[node] = MtmConnectSocket(Mtm->nodes[node].con.hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
349353
if (sockets[node] < 0) {
350354
MtmOnNodeDisconnect(node+1);
351355
return false;
@@ -385,7 +389,7 @@ static void MtmAcceptOneConnection()
385389
resp.dxid = HANDSHAKE_MAGIC;
386390
resp.sxid = ShmemVariableCache->nextXid;
387391
resp.csn = MtmGetCurrentTime();
388-
MtmUpdateNodeConnStr(req.hdr.node, req.connStr);
392+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
389393
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
390394
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);
391395
close(fd);

contrib/mmts/multimaster.c

Lines changed: 125 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,12 @@ int MtmConnectAttempts;
166166
int MtmConnectTimeout;
167167
int MtmKeepaliveTimeout;
168168
int MtmReconnectAttempts;
169+
MtmConnectionInfo* MtmConnections;
169170

170171
static char* MtmConnStrs;
171172
static int MtmQueueSize;
172173
static int MtmWorkers;
174+
static int MtmVacuumDelay;
173175
static int MtmMinRecoveryLag;
174176
static int MtmMaxRecoveryLag;
175177

@@ -402,26 +404,90 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
402404
* We collect the oldest CSNs from all nodes and choose the minimum among them.
403405
* If no such XID can be located, then return previously observed oldest XID
404406
*/
407+
#if 0
405408
static TransactionId
406409
MtmAdjustOldestXid(TransactionId xid)
407410
{
408411
if (TransactionIdIsValid(xid)) {
409412
MtmTransState *ts, *prev = NULL;
410-
413+
csn_t oldestSnapshot = 0;
414+
int i;
415+
411416
MtmLock(LW_EXCLUSIVE);
412-
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
413-
if (ts != NULL && ts->status == TRANSACTION_STATUS_COMMITTED) { /* committed transactions have same CSNs at all nodes */
414-
csn_t oldestSnapshot;
415-
int i;
417+
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
418+
if (TransactionIdPrecedes(ts->xid, xid)
419+
&& ts->status == TRANSACTION_STATUS_COMMITTED
420+
&& ts->csn > oldestSnapshot)
421+
{
422+
oldestSnapshot = ts->csn;
423+
}
424+
}
425+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
426+
for (i = 0; i < MtmNodes; i++) {
427+
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
428+
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
429+
{
430+
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
431+
}
432+
}
433+
oldestSnapshot -= MtmVacuumDelay*USEC;
434+
for (ts = Mtm->transListHead;
435+
ts != NULL
436+
&& ts->csn < oldestSnapshot
437+
&& TransactionIdPrecedes(ts->xid, xid)
438+
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
439+
ts->status == TRANSACTION_STATUS_ABORTED);
440+
ts = ts->next)
441+
{
442+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
443+
prev = ts;
444+
}
445+
}
446+
if (prev != NULL) {
447+
for (ts = Mtm->transListHead; ts != prev; ts = ts->next) {
448+
/* Remove information about too old transactions */
449+
Assert(ts->status != TRANSACTION_STATUS_UNKNOWN);
450+
hash_search(MtmXid2State, &ts->xid, HASH_REMOVE, NULL);
451+
}
452+
Mtm->transListHead = prev;
453+
Mtm->oldestXid = xid = prev->xid;
454+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
455+
xid = Mtm->oldestXid;
456+
}
457+
MtmUnlock();
458+
}
459+
return xid;
460+
}
461+
#else
462+
static TransactionId
463+
MtmAdjustOldestXid(TransactionId xid)
464+
{
465+
if (TransactionIdIsValid(xid)) {
466+
MtmTransState *ts, *prev = NULL;
467+
int i;
416468

417-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot = ts->csn;
469+
MtmLock(LW_EXCLUSIVE);
470+
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
471+
if (ts != NULL && ts->status == TRANSACTION_STATUS_COMMITTED) {
472+
csn_t oldestSnapshot = ts->csn;
473+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
418474
for (i = 0; i < MtmNodes; i++) {
419-
if (Mtm->nodes[i].oldestSnapshot < oldestSnapshot) {
475+
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
476+
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
477+
{
420478
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
421479
}
422480
}
423-
for (ts = Mtm->transListHead; ts != NULL && ts->csn < oldestSnapshot; prev = ts, ts = ts->next) {
424-
Assert(ts->status == TRANSACTION_STATUS_COMMITTED || ts->status == TRANSACTION_STATUS_ABORTED || ts->status == TRANSACTION_STATUS_IN_PROGRESS);
481+
oldestSnapshot -= MtmVacuumDelay*USEC;
482+
483+
for (ts = Mtm->transListHead;
484+
ts != NULL
485+
&& ts->csn < oldestSnapshot
486+
&& TransactionIdPrecedes(ts->xid, xid)
487+
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
488+
ts->status == TRANSACTION_STATUS_ABORTED);
489+
prev = ts, ts = ts->next)
490+
{
425491
if (prev != NULL) {
426492
/* Remove information about too old transactions */
427493
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
@@ -431,14 +497,14 @@ MtmAdjustOldestXid(TransactionId xid)
431497
if (prev != NULL) {
432498
Mtm->transListHead = prev;
433499
Mtm->oldestXid = xid = prev->xid;
434-
} else {
500+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
435501
xid = Mtm->oldestXid;
436502
}
437503
MtmUnlock();
438504
}
439505
return xid;
440506
}
441-
507+
#endif
442508
/*
443509
* -------------------------------------------
444510
* Transaction list manipulation
@@ -989,7 +1055,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
9891055
for (i = 0; i < n; i++) {
9901056
if (i+1 != MtmNodeId) {
9911057
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
992-
matrix[i] = *(nodemask_t*)data;
1058+
matrix[i] = data ? *(nodemask_t*)data : 0;
9931059
} else {
9941060
matrix[i] = Mtm->connectivityMask;
9951061
}
@@ -1153,6 +1219,7 @@ static void MtmInitialize()
11531219
for (i = 0; i < MtmNodes; i++) {
11541220
Mtm->nodes[i].oldestSnapshot = 0;
11551221
Mtm->nodes[i].transDelay = 0;
1222+
Mtm->nodes[i].con = MtmConnections[i];
11561223
}
11571224
PGSemaphoreCreate(&Mtm->votingSemaphore);
11581225
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1178,17 +1245,17 @@ MtmShmemStartup(void)
11781245
MtmInitialize();
11791246
}
11801247

1181-
void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
1248+
void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
11821249
{
11831250
char const* host;
11841251
char const* end;
11851252
int hostLen;
11861253

11871254
if (strlen(connStr) >= MULTIMASTER_MAX_CONN_STR_SIZE) {
1188-
elog(ERROR, "Too long (%d) connection string '%s' for node %d, limit is %d",
1189-
(int)strlen(connStr), connStr, nodeId, MULTIMASTER_MAX_CONN_STR_SIZE-1);
1255+
elog(ERROR, "Too long (%d) connection string '%s': limit is %d",
1256+
(int)strlen(connStr), connStr, MULTIMASTER_MAX_CONN_STR_SIZE-1);
11901257
}
1191-
strcpy(Mtm->nodes[nodeId-1].connStr, connStr);
1258+
strcpy(conn->connStr, connStr);
11921259

11931260
host = strstr(connStr, "host=");
11941261
if (host == NULL) {
@@ -1198,30 +1265,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
11981265
for (end = host; *end != ' ' && *end != '\0'; end++);
11991266
hostLen = end - host;
12001267
if (hostLen >= MULTIMASTER_MAX_HOST_NAME_SIZE) {
1201-
elog(ERROR, "Too long (%d) host name '%.*s' for node %d, limit is %d",
1202-
hostLen, hostLen, host, nodeId, MULTIMASTER_MAX_HOST_NAME_SIZE-1);
1268+
elog(ERROR, "Too long (%d) host name '%.*s': limit is %d",
1269+
hostLen, hostLen, host, MULTIMASTER_MAX_HOST_NAME_SIZE-1);
12031270
}
1204-
memcpy(Mtm->nodes[nodeId-1].hostName, host, hostLen);
1205-
Mtm->nodes[nodeId-1].hostName[hostLen] = '\0';
1271+
memcpy(conn->hostName, host, hostLen);
1272+
conn->hostName[hostLen] = '\0';
12061273
}
12071274

12081275
static void MtmSplitConnStrs(void)
12091276
{
12101277
int i;
1211-
char* copy = strdup(MtmConnStrs);
1278+
char* copy = pstrdup(MtmConnStrs);
12121279
char* connStr = copy;
12131280
char* connStrEnd = connStr + strlen(connStr);
12141281

1282+
for (i = 0; connStr < connStrEnd; i++) {
1283+
char* p = strchr(connStr, ',');
1284+
if (p == NULL) {
1285+
p = connStrEnd;
1286+
}
1287+
connStr = p + 1;
1288+
}
1289+
if (i > MAX_NODES) {
1290+
elog(ERROR, "Multimaster with more than %d nodes is not currently supported", MAX_NODES);
1291+
}
1292+
if (i < 2) {
1293+
elog(ERROR, "Multimaster should have at least two nodes");
1294+
}
1295+
MtmNodes = i;
1296+
MtmConnections = (MtmConnectionInfo*)palloc(i*sizeof(MtmConnectionInfo));
1297+
connStr = copy;
1298+
12151299
for (i = 0; connStr < connStrEnd; i++) {
12161300
char* p = strchr(connStr, ',');
12171301
if (p == NULL) {
12181302
p = connStrEnd;
12191303
}
1220-
if (i == MAX_NODES) {
1221-
elog(ERROR, "Multimaster with more than %d nodes is not currently supported", MAX_NODES);
1222-
}
12231304
*p = '\0';
1224-
MtmUpdateNodeConnStr(i+1, connStr);
1305+
1306+
MtmUpdateNodeConnectionInfo(&MtmConnections[i], connStr);
1307+
12251308
if (i+1 == MtmNodeId) {
12261309
char* dbName = strstr(connStr, "dbname=");
12271310
char* end;
@@ -1232,20 +1315,13 @@ static void MtmSplitConnStrs(void)
12321315
dbName += 7;
12331316
for (end = dbName; *end != ' ' && *end != '\0'; end++);
12341317
len = end - dbName;
1235-
MtmDatabaseName = (char*)malloc(len + 1);
1318+
MtmDatabaseName = (char*)palloc(len + 1);
12361319
memcpy(MtmDatabaseName, dbName, len);
12371320
MtmDatabaseName[len] = '\0';
12381321
}
12391322
connStr = p + 1;
12401323
}
1241-
free(copy);
1242-
if (i < 2) {
1243-
elog(ERROR, "Multimaster should have at least two nodes");
1244-
}
1245-
MtmNodes = i;
1246-
if (MtmNodeId > MtmNodes) {
1247-
elog(ERROR, "Invalid node id %d for specified nubmer of nodes %d", MtmNodeId, MtmNodes);
1248-
}
1324+
pfree(copy);
12491325
}
12501326

12511327
void
@@ -1309,6 +1385,21 @@ _PG_init(void)
13091385
NULL
13101386
);
13111387

1388+
DefineCustomIntVariable(
1389+
"multimaster.vacuum_delay",
1390+
"Minimal age of records which can be vacuumed (seconds)",
1391+
NULL,
1392+
&MtmVacuumDelay,
1393+
1,
1394+
1,
1395+
INT_MAX,
1396+
PGC_BACKEND,
1397+
0,
1398+
NULL,
1399+
NULL,
1400+
NULL
1401+
);
1402+
13121403
DefineCustomIntVariable(
13131404
"multimaster.queue_size",
13141405
"Multimaster queue size",

contrib/mmts/multimaster.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ typedef enum
8181

8282
typedef struct
8383
{
84+
char hostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
85+
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
86+
} MtmConnectionInfo;
87+
88+
89+
typedef struct
90+
{
91+
MtmConnectionInfo con;
8492
time_t transDelay;
8593
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
86-
char hostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
87-
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
8894
} MtmNodeInfo;
8995

9096
typedef struct MtmTransState
@@ -152,6 +158,8 @@ extern int MtmReconnectAttempts;
152158
extern int MtmKeepaliveTimeout;
153159
extern HTAB* MtmXid2State;
154160

161+
extern MtmConnectionInfo* MtmConnections;
162+
155163
extern void MtmArbiterInitialize(void);
156164
extern void MtmStartReceivers(void);
157165
extern csn_t MtmTransactionSnapshot(TransactionId xid);
@@ -183,6 +191,6 @@ extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
183191
extern bool MtmIsRecoveredNode(int nodeId);
184192
extern void MtmRefreshClusterStatus(bool nowait);
185193
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
186-
extern void MtmUpdateNodeConnStr(int nodeId, char const* connStr);
194+
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
187195

188196
#endif

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,8 @@ void MtmStartReceivers(void)
578578

579579
for (i = 0; i < MtmNodes; i++) {
580580
if (i+1 != MtmNodeId) {
581-
ReceiverArgs* ctx = (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
582-
ctx->receiver_conn_string = psprintf("replication=database %s", Mtm->nodes[i].connStr);
581+
ReceiverArgs* ctx = (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
582+
ctx->receiver_conn_string = psprintf("replication=database %s", MtmConnections[i].connStr);
583583
sprintf(ctx->receiver_slot, MULTIMASTER_SLOT_PATTERN, MtmNodeId);
584584
ctx->local_node = MtmNodeId;
585585
ctx->remote_node = i+1;

0 commit comments

Comments
 (0)