57
57
58
58
#include "multimaster.h"
59
59
#include "ddd.h"
60
- #include "paxos .h"
60
+ #include "raftable .h"
61
61
62
62
typedef struct {
63
63
TransactionId xid ; /* local transaction ID */
@@ -178,6 +178,7 @@ static int MtmWorkers;
178
178
static int MtmVacuumDelay ;
179
179
static int MtmMinRecoveryLag ;
180
180
static int MtmMaxRecoveryLag ;
181
+ static bool MtmUseRaftable ;
181
182
182
183
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
183
184
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1102,7 +1103,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1102
1103
int i , j , n = MtmNodes ;
1103
1104
for (i = 0 ; i < n ; i ++ ) {
1104
1105
if (i + 1 != MtmNodeId ) {
1105
- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1106
+ void * data = RaftableGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1106
1107
if (data == NULL ) {
1107
1108
return false;
1108
1109
}
@@ -1132,7 +1133,7 @@ bool MtmRefreshClusterStatus(bool nowait)
1132
1133
int clique_size ;
1133
1134
int i ;
1134
1135
1135
- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1136
+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix , nowait )) {
1136
1137
/* RAFT is not available */
1137
1138
return false;
1138
1139
}
@@ -1192,7 +1193,7 @@ void MtmCheckQuorum(void)
1192
1193
void MtmOnNodeDisconnect (int nodeId )
1193
1194
{
1194
1195
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1195
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1196
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1196
1197
1197
1198
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1198
1199
MtmSleep (MtmKeepaliveTimeout );
@@ -1211,52 +1212,9 @@ void MtmOnNodeDisconnect(int nodeId)
1211
1212
void MtmOnNodeConnect (int nodeId )
1212
1213
{
1213
1214
BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1214
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1215
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1215
1216
}
1216
1217
1217
- /*
1218
- * Paxos function stubs (until them are miplemented)
1219
- */
1220
- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
1221
- {
1222
- unsigned enclen , declen , len ;
1223
- char * enc , * dec ;
1224
- Assert (ts == NULL ); // not implemented
1225
-
1226
- enc = raftable_get (key );
1227
- if (enc == NULL )
1228
- {
1229
- * size = 0 ;
1230
- return NULL ;
1231
- }
1232
-
1233
- enclen = strlen (enc );
1234
- declen = hex_dec_len (enc , enclen );
1235
- dec = palloc (declen );
1236
- len = hex_decode (enc , enclen , dec );
1237
- pfree (enc );
1238
- Assert (len == declen );
1239
-
1240
- if (size != NULL ) {
1241
- * size = declen ;
1242
- }
1243
- return dec ;
1244
- }
1245
-
1246
- void PaxosSet (char const * key , void const * value , int size , bool nowait )
1247
- {
1248
- unsigned enclen , declen , len ;
1249
- char * enc , * dec ;
1250
-
1251
- enclen = hex_enc_len (value , size );
1252
- enc = palloc (enclen ) + 1 ;
1253
- len = hex_encode (value , size , enc );
1254
- Assert (len == enclen );
1255
- enc [len ] = '\0' ;
1256
-
1257
- raftable_set (key , enc , nowait ? 1 : INT_MAX );
1258
- pfree (enc );
1259
- }
1260
1218
1261
1219
1262
1220
/*
@@ -1483,6 +1441,19 @@ _PG_init(void)
1483
1441
NULL
1484
1442
);
1485
1443
1444
+ DefineCustomBoolVariable (
1445
+ "multimaster.use_raftable" ,
1446
+ "Use raftable plugin for internode communication" ,
1447
+ NULL ,
1448
+ & MtmUseRaftable ,
1449
+ false,
1450
+ PGC_BACKEND ,
1451
+ 0 ,
1452
+ NULL ,
1453
+ NULL ,
1454
+ NULL
1455
+ );
1456
+
1486
1457
DefineCustomIntVariable (
1487
1458
"multimaster.workers" ,
1488
1459
"Number of multimaster executor workers per node" ,
@@ -1773,6 +1744,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1773
1744
break ;
1774
1745
}
1775
1746
}
1747
+ if (isRecoverySession ) {
1748
+ MTM_INFO ("%d: PGLOGICAL startup hook\n" , MyProcPid );
1749
+ sleep (30 );
1750
+ }
1776
1751
MtmLock (LW_EXCLUSIVE );
1777
1752
if (isRecoverySession ) {
1778
1753
elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
@@ -1805,7 +1780,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1805
1780
bool res = Mtm -> status != MTM_RECOVERY
1806
1781
&& (args -> origin_id == InvalidRepOriginId
1807
1782
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1808
- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1783
+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1809
1784
return res ;
1810
1785
}
1811
1786
@@ -2373,16 +2348,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
2373
2348
2374
2349
ByteBufferAlloc (& buf );
2375
2350
EnumerateLocks (MtmSerializeLock , & buf );
2376
- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2351
+ RaftableSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2377
2352
MtmGraphInit (& graph );
2378
2353
MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
2379
2354
ByteBufferFree (& buf );
2380
2355
for (i = 0 ; i < MtmNodes ; i ++ ) {
2381
2356
if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
2382
2357
int size ;
2383
- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2358
+ void * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2384
2359
if (data == NULL ) {
2385
- return true; /* Just temporary hack until no Paxos */
2360
+ return true; /* If using Raftable is disabled */
2386
2361
} else {
2387
2362
MtmGraphAdd (& graph , (GlobalTransactionId * )data , size /sizeof (GlobalTransactionId ));
2388
2363
}
0 commit comments