@@ -166,10 +166,12 @@ int MtmConnectAttempts;
166
166
int MtmConnectTimeout ;
167
167
int MtmKeepaliveTimeout ;
168
168
int MtmReconnectAttempts ;
169
+ MtmConnectionInfo * MtmConnections ;
169
170
170
171
static char * MtmConnStrs ;
171
172
static int MtmQueueSize ;
172
173
static int MtmWorkers ;
174
+ static int MtmVacuumDelay ;
173
175
static int MtmMinRecoveryLag ;
174
176
static int MtmMaxRecoveryLag ;
175
177
@@ -402,26 +404,90 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
402
404
* We collest oldest CSNs from all nodes and choose minimum from them.
403
405
* If no such XID can be located, then return previously observed oldest XID
404
406
*/
407
+ #if 0
405
408
static TransactionId
406
409
MtmAdjustOldestXid (TransactionId xid )
407
410
{
408
411
if (TransactionIdIsValid (xid )) {
409
412
MtmTransState * ts , * prev = NULL ;
410
-
413
+ csn_t oldestSnapshot = 0 ;
414
+ int i ;
415
+
411
416
MtmLock (LW_EXCLUSIVE );
412
- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
413
- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) { /* committed transactions have same CSNs at all nodes */
414
- csn_t oldestSnapshot ;
415
- int i ;
417
+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
418
+ if (TransactionIdPrecedes (ts -> xid , xid )
419
+ && ts -> status == TRANSACTION_STATUS_COMMITTED
420
+ && ts -> csn > oldestSnapshot )
421
+ {
422
+ oldestSnapshot = ts -> csn ;
423
+ }
424
+ }
425
+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
426
+ for (i = 0 ; i < MtmNodes ; i ++ ) {
427
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
428
+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
429
+ {
430
+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
431
+ }
432
+ }
433
+ oldestSnapshot -= MtmVacuumDelay * USEC ;
434
+ for (ts = Mtm -> transListHead ;
435
+ ts != NULL
436
+ && ts -> csn < oldestSnapshot
437
+ && TransactionIdPrecedes (ts -> xid , xid )
438
+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
439
+ ts -> status == TRANSACTION_STATUS_ABORTED );
440
+ ts = ts -> next )
441
+ {
442
+ if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
443
+ prev = ts ;
444
+ }
445
+ }
446
+ if (prev != NULL ) {
447
+ for (ts = Mtm -> transListHead ; ts != prev ; ts = ts -> next ) {
448
+ /* Remove information about too old transactions */
449
+ Assert (ts -> status != TRANSACTION_STATUS_UNKNOWN );
450
+ hash_search (MtmXid2State , & ts -> xid , HASH_REMOVE , NULL );
451
+ }
452
+ Mtm -> transListHead = prev ;
453
+ Mtm -> oldestXid = xid = prev -> xid ;
454
+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
455
+ xid = Mtm -> oldestXid ;
456
+ }
457
+ MtmUnlock ();
458
+ }
459
+ return xid ;
460
+ }
461
+ #else
462
+ static TransactionId
463
+ MtmAdjustOldestXid (TransactionId xid )
464
+ {
465
+ if (TransactionIdIsValid (xid )) {
466
+ MtmTransState * ts , * prev = NULL ;
467
+ int i ;
416
468
417
- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot = ts -> csn ;
469
+ MtmLock (LW_EXCLUSIVE );
470
+ ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
471
+ if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
472
+ csn_t oldestSnapshot = ts -> csn ;
473
+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
418
474
for (i = 0 ; i < MtmNodes ; i ++ ) {
419
- if (Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot ) {
475
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
476
+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
477
+ {
420
478
oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
421
479
}
422
480
}
423
- for (ts = Mtm -> transListHead ; ts != NULL && ts -> csn < oldestSnapshot ; prev = ts , ts = ts -> next ) {
424
- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED || ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
481
+ oldestSnapshot -= MtmVacuumDelay * USEC ;
482
+
483
+ for (ts = Mtm -> transListHead ;
484
+ ts != NULL
485
+ && ts -> csn < oldestSnapshot
486
+ && TransactionIdPrecedes (ts -> xid , xid )
487
+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
488
+ ts -> status == TRANSACTION_STATUS_ABORTED );
489
+ prev = ts , ts = ts -> next )
490
+ {
425
491
if (prev != NULL ) {
426
492
/* Remove information about too old transactions */
427
493
hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
@@ -431,14 +497,14 @@ MtmAdjustOldestXid(TransactionId xid)
431
497
if (prev != NULL ) {
432
498
Mtm -> transListHead = prev ;
433
499
Mtm -> oldestXid = xid = prev -> xid ;
434
- } else {
500
+ } else if ( TransactionIdPrecedes ( Mtm -> oldestXid , xid )) {
435
501
xid = Mtm -> oldestXid ;
436
502
}
437
503
MtmUnlock ();
438
504
}
439
505
return xid ;
440
506
}
441
-
507
+ #endif
442
508
/*
443
509
* -------------------------------------------
444
510
* Transaction list manipulation
@@ -989,7 +1055,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
989
1055
for (i = 0 ; i < n ; i ++ ) {
990
1056
if (i + 1 != MtmNodeId ) {
991
1057
void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
992
- matrix [i ] = * (nodemask_t * )data ;
1058
+ matrix [i ] = data ? * (nodemask_t * )data : 0 ;
993
1059
} else {
994
1060
matrix [i ] = Mtm -> connectivityMask ;
995
1061
}
@@ -1153,6 +1219,7 @@ static void MtmInitialize()
1153
1219
for (i = 0 ; i < MtmNodes ; i ++ ) {
1154
1220
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1155
1221
Mtm -> nodes [i ].transDelay = 0 ;
1222
+ Mtm -> nodes [i ].con = MtmConnections [i ];
1156
1223
}
1157
1224
PGSemaphoreCreate (& Mtm -> votingSemaphore );
1158
1225
PGSemaphoreReset (& Mtm -> votingSemaphore );
@@ -1178,17 +1245,17 @@ MtmShmemStartup(void)
1178
1245
MtmInitialize ();
1179
1246
}
1180
1247
1181
- void MtmUpdateNodeConnStr ( int nodeId , char const * connStr )
1248
+ void MtmUpdateNodeConnectionInfo ( MtmConnectionInfo * conn , char const * connStr )
1182
1249
{
1183
1250
char const * host ;
1184
1251
char const * end ;
1185
1252
int hostLen ;
1186
1253
1187
1254
if (strlen (connStr ) >= MULTIMASTER_MAX_CONN_STR_SIZE ) {
1188
- elog (ERROR , "Too long (%d) connection string '%s' for node %d, limit is %d" ,
1189
- (int )strlen (connStr ), connStr , nodeId , MULTIMASTER_MAX_CONN_STR_SIZE - 1 );
1255
+ elog (ERROR , "Too long (%d) connection string '%s': limit is %d" ,
1256
+ (int )strlen (connStr ), connStr , MULTIMASTER_MAX_CONN_STR_SIZE - 1 );
1190
1257
}
1191
- strcpy (Mtm -> nodes [ nodeId - 1 ]. connStr , connStr );
1258
+ strcpy (conn -> connStr , connStr );
1192
1259
1193
1260
host = strstr (connStr , "host=" );
1194
1261
if (host == NULL ) {
@@ -1198,30 +1265,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
1198
1265
for (end = host ; * end != ' ' && * end != '\0' ; end ++ );
1199
1266
hostLen = end - host ;
1200
1267
if (hostLen >= MULTIMASTER_MAX_HOST_NAME_SIZE ) {
1201
- elog (ERROR , "Too long (%d) host name '%.*s' for node %d, limit is %d" ,
1202
- hostLen , hostLen , host , nodeId , MULTIMASTER_MAX_HOST_NAME_SIZE - 1 );
1268
+ elog (ERROR , "Too long (%d) host name '%.*s': limit is %d" ,
1269
+ hostLen , hostLen , host , MULTIMASTER_MAX_HOST_NAME_SIZE - 1 );
1203
1270
}
1204
- memcpy (Mtm -> nodes [ nodeId - 1 ]. hostName , host , hostLen );
1205
- Mtm -> nodes [ nodeId - 1 ]. hostName [hostLen ] = '\0' ;
1271
+ memcpy (conn -> hostName , host , hostLen );
1272
+ conn -> hostName [hostLen ] = '\0' ;
1206
1273
}
1207
1274
1208
1275
static void MtmSplitConnStrs (void )
1209
1276
{
1210
1277
int i ;
1211
- char * copy = strdup (MtmConnStrs );
1278
+ char * copy = pstrdup (MtmConnStrs );
1212
1279
char * connStr = copy ;
1213
1280
char * connStrEnd = connStr + strlen (connStr );
1214
1281
1282
+ for (i = 0 ; connStr < connStrEnd ; i ++ ) {
1283
+ char * p = strchr (connStr , ',' );
1284
+ if (p == NULL ) {
1285
+ p = connStrEnd ;
1286
+ }
1287
+ connStr = p + 1 ;
1288
+ }
1289
+ if (i > MAX_NODES ) {
1290
+ elog (ERROR , "Multimaster with more than %d nodes is not currently supported" , MAX_NODES );
1291
+ }
1292
+ if (i < 2 ) {
1293
+ elog (ERROR , "Multimaster should have at least two nodes" );
1294
+ }
1295
+ MtmNodes = i ;
1296
+ MtmConnections = (MtmConnectionInfo * )palloc (i * sizeof (MtmConnectionInfo ));
1297
+ connStr = copy ;
1298
+
1215
1299
for (i = 0 ; connStr < connStrEnd ; i ++ ) {
1216
1300
char * p = strchr (connStr , ',' );
1217
1301
if (p == NULL ) {
1218
1302
p = connStrEnd ;
1219
1303
}
1220
- if (i == MAX_NODES ) {
1221
- elog (ERROR , "Multimaster with more than %d nodes is not currently supported" , MAX_NODES );
1222
- }
1223
1304
* p = '\0' ;
1224
- MtmUpdateNodeConnStr (i + 1 , connStr );
1305
+
1306
+ MtmUpdateNodeConnectionInfo (& MtmConnections [i ], connStr );
1307
+
1225
1308
if (i + 1 == MtmNodeId ) {
1226
1309
char * dbName = strstr (connStr , "dbname=" );
1227
1310
char * end ;
@@ -1232,20 +1315,13 @@ static void MtmSplitConnStrs(void)
1232
1315
dbName += 7 ;
1233
1316
for (end = dbName ; * end != ' ' && * end != '\0' ; end ++ );
1234
1317
len = end - dbName ;
1235
- MtmDatabaseName = (char * )malloc (len + 1 );
1318
+ MtmDatabaseName = (char * )palloc (len + 1 );
1236
1319
memcpy (MtmDatabaseName , dbName , len );
1237
1320
MtmDatabaseName [len ] = '\0' ;
1238
1321
}
1239
1322
connStr = p + 1 ;
1240
1323
}
1241
- free (copy );
1242
- if (i < 2 ) {
1243
- elog (ERROR , "Multimaster should have at least two nodes" );
1244
- }
1245
- MtmNodes = i ;
1246
- if (MtmNodeId > MtmNodes ) {
1247
- elog (ERROR , "Invalid node id %d for specified nubmer of nodes %d" , MtmNodeId , MtmNodes );
1248
- }
1324
+ pfree (copy );
1249
1325
}
1250
1326
1251
1327
void
@@ -1309,6 +1385,21 @@ _PG_init(void)
1309
1385
NULL
1310
1386
);
1311
1387
1388
+ DefineCustomIntVariable (
1389
+ "multimaster.vacuum_delay" ,
1390
+ "Minimal age of records which can be vacuumed (seconds)" ,
1391
+ NULL ,
1392
+ & MtmVacuumDelay ,
1393
+ 1 ,
1394
+ 1 ,
1395
+ INT_MAX ,
1396
+ PGC_BACKEND ,
1397
+ 0 ,
1398
+ NULL ,
1399
+ NULL ,
1400
+ NULL
1401
+ );
1402
+
1312
1403
DefineCustomIntVariable (
1313
1404
"multimaster.queue_size" ,
1314
1405
"Multimaster queue size" ,
0 commit comments