57
57
58
58
#include "multimaster.h"
59
59
#include "ddd.h"
60
- #include "paxos .h"
60
+ #include "raftable .h"
61
61
62
62
typedef struct {
63
63
TransactionId xid ; /* local transaction ID */
@@ -179,6 +179,7 @@ static int MtmWorkers;
179
179
static int MtmVacuumDelay ;
180
180
static int MtmMinRecoveryLag ;
181
181
static int MtmMaxRecoveryLag ;
182
+ static bool MtmUseRaftable ;
182
183
183
184
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
184
185
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1103,7 +1104,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1103
1104
int i , j , n = MtmNodes ;
1104
1105
for (i = 0 ; i < n ; i ++ ) {
1105
1106
if (i + 1 != MtmNodeId ) {
1106
- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1107
+ void * data = RaftableGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1107
1108
if (data == NULL ) {
1108
1109
return false;
1109
1110
}
@@ -1133,7 +1134,7 @@ bool MtmRefreshClusterStatus(bool nowait)
1133
1134
int clique_size ;
1134
1135
int i ;
1135
1136
1136
- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1137
+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix , nowait )) {
1137
1138
/* RAFT is not available */
1138
1139
return false;
1139
1140
}
@@ -1193,7 +1194,7 @@ void MtmCheckQuorum(void)
1193
1194
void MtmOnNodeDisconnect (int nodeId )
1194
1195
{
1195
1196
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1196
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1197
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1197
1198
1198
1199
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1199
1200
MtmSleep (MtmKeepaliveTimeout );
@@ -1212,52 +1213,9 @@ void MtmOnNodeDisconnect(int nodeId)
1212
1213
void MtmOnNodeConnect (int nodeId )
1213
1214
{
1214
1215
BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1215
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1216
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1216
1217
}
1217
1218
1218
- /*
1219
- * Paxos function stubs (until them are miplemented)
1220
- */
1221
- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
1222
- {
1223
- unsigned enclen , declen , len ;
1224
- char * enc , * dec ;
1225
- Assert (ts == NULL ); // not implemented
1226
-
1227
- enc = raftable_get (key );
1228
- if (enc == NULL )
1229
- {
1230
- * size = 0 ;
1231
- return NULL ;
1232
- }
1233
-
1234
- enclen = strlen (enc );
1235
- declen = hex_dec_len (enc , enclen );
1236
- dec = palloc (declen );
1237
- len = hex_decode (enc , enclen , dec );
1238
- pfree (enc );
1239
- Assert (len == declen );
1240
-
1241
- if (size != NULL ) {
1242
- * size = declen ;
1243
- }
1244
- return dec ;
1245
- }
1246
-
1247
- void PaxosSet (char const * key , void const * value , int size , bool nowait )
1248
- {
1249
- unsigned enclen , declen , len ;
1250
- char * enc , * dec ;
1251
-
1252
- enclen = hex_enc_len (value , size );
1253
- enc = palloc (enclen ) + 1 ;
1254
- len = hex_encode (value , size , enc );
1255
- Assert (len == enclen );
1256
- enc [len ] = '\0' ;
1257
-
1258
- raftable_set (key , enc , nowait ? 1 : INT_MAX );
1259
- pfree (enc );
1260
- }
1261
1219
1262
1220
1263
1221
/*
@@ -1484,6 +1442,19 @@ _PG_init(void)
1484
1442
NULL
1485
1443
);
1486
1444
1445
+ DefineCustomBoolVariable (
1446
+ "multimaster.use_raftable" ,
1447
+ "Use raftable plugin for internode communication" ,
1448
+ NULL ,
1449
+ & MtmUseRaftable ,
1450
+ false,
1451
+ PGC_BACKEND ,
1452
+ 0 ,
1453
+ NULL ,
1454
+ NULL ,
1455
+ NULL
1456
+ );
1457
+
1487
1458
DefineCustomIntVariable (
1488
1459
"multimaster.workers" ,
1489
1460
"Number of multimaster executor workers per node" ,
@@ -1774,6 +1745,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1774
1745
break ;
1775
1746
}
1776
1747
}
1748
+ if (isRecoverySession ) {
1749
+ MTM_INFO ("%d: PGLOGICAL startup hook\n" , MyProcPid );
1750
+ sleep (30 );
1751
+ }
1777
1752
MtmLock (LW_EXCLUSIVE );
1778
1753
if (isRecoverySession ) {
1779
1754
elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
@@ -1806,7 +1781,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1806
1781
bool res = Mtm -> status != MTM_RECOVERY
1807
1782
&& (args -> origin_id == InvalidRepOriginId
1808
1783
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1809
- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1784
+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1810
1785
return res ;
1811
1786
}
1812
1787
@@ -2375,16 +2350,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
2375
2350
2376
2351
ByteBufferAlloc (& buf );
2377
2352
EnumerateLocks (MtmSerializeLock , & buf );
2378
- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2353
+ RaftableSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2379
2354
MtmGraphInit (& graph );
2380
2355
MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
2381
2356
ByteBufferFree (& buf );
2382
2357
for (i = 0 ; i < MtmNodes ; i ++ ) {
2383
2358
if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
2384
2359
int size ;
2385
- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2360
+ void * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2386
2361
if (data == NULL ) {
2387
- return true; /* Just temporary hack until no Paxos */
2362
+ return true; /* If using Raftable is disabled */
2388
2363
} else {
2389
2364
MtmGraphAdd (& graph , (GlobalTransactionId * )data , size /sizeof (GlobalTransactionId ));
2390
2365
}
0 commit comments