58
58
#include "raftable.h"
59
59
#include "multimaster.h"
60
60
#include "ddd.h"
61
- #include "paxos .h"
61
+ #include "raftable .h"
62
62
63
63
typedef struct {
64
64
TransactionId xid ; /* local transaction ID */
@@ -172,6 +172,7 @@ int MtmConnectAttempts;
172
172
int MtmConnectTimeout ;
173
173
int MtmKeepaliveTimeout ;
174
174
int MtmReconnectAttempts ;
175
+ bool MtmUseRaftable ;
175
176
MtmConnectionInfo * MtmConnections ;
176
177
177
178
static char * MtmConnStrs ;
@@ -1104,7 +1105,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1104
1105
int i , j , n = MtmNodes ;
1105
1106
for (i = 0 ; i < n ; i ++ ) {
1106
1107
if (i + 1 != MtmNodeId ) {
1107
- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1108
+ void * data = RaftableGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1108
1109
if (data == NULL ) {
1109
1110
return false;
1110
1111
}
@@ -1134,7 +1135,7 @@ bool MtmRefreshClusterStatus(bool nowait)
1134
1135
int clique_size ;
1135
1136
int i ;
1136
1137
1137
- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1138
+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix , nowait )) {
1138
1139
/* RAFT is not available */
1139
1140
return false;
1140
1141
}
@@ -1194,7 +1195,7 @@ void MtmCheckQuorum(void)
1194
1195
void MtmOnNodeDisconnect (int nodeId )
1195
1196
{
1196
1197
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1197
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1198
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1198
1199
1199
1200
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1200
1201
MtmSleep (MtmKeepaliveTimeout );
@@ -1213,52 +1214,9 @@ void MtmOnNodeDisconnect(int nodeId)
1213
1214
void MtmOnNodeConnect (int nodeId )
1214
1215
{
1215
1216
BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1216
- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1217
+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1217
1218
}
1218
1219
1219
- /*
1220
- * Paxos function stubs (until them are miplemented)
1221
- */
1222
- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
1223
- {
1224
- unsigned enclen , declen , len ;
1225
- char * enc , * dec ;
1226
- Assert (ts == NULL ); // not implemented
1227
-
1228
- enc = raftable_get (key );
1229
- if (enc == NULL )
1230
- {
1231
- * size = 0 ;
1232
- return NULL ;
1233
- }
1234
-
1235
- enclen = strlen (enc );
1236
- declen = hex_dec_len (enc , enclen );
1237
- dec = palloc (declen );
1238
- len = hex_decode (enc , enclen , dec );
1239
- pfree (enc );
1240
- Assert (len == declen );
1241
-
1242
- if (size != NULL ) {
1243
- * size = declen ;
1244
- }
1245
- return dec ;
1246
- }
1247
-
1248
- void PaxosSet (char const * key , void const * value , int size , bool nowait )
1249
- {
1250
- unsigned enclen , declen , len ;
1251
- char * enc , * dec ;
1252
-
1253
- enclen = hex_enc_len (value , size );
1254
- enc = palloc (enclen ) + 1 ;
1255
- len = hex_encode (value , size , enc );
1256
- Assert (len == enclen );
1257
- enc [len ] = '\0' ;
1258
-
1259
- raftable_set (key , enc , nowait ? 1 : INT_MAX );
1260
- pfree (enc );
1261
- }
1262
1220
1263
1221
1264
1222
/*
@@ -1485,6 +1443,19 @@ _PG_init(void)
1485
1443
NULL
1486
1444
);
1487
1445
1446
+ DefineCustomBoolVariable (
1447
+ "multimaster.use_raftable" ,
1448
+ "Use raftable plugin for internode communication" ,
1449
+ NULL ,
1450
+ & MtmUseRaftable ,
1451
+ false,
1452
+ PGC_BACKEND ,
1453
+ 0 ,
1454
+ NULL ,
1455
+ NULL ,
1456
+ NULL
1457
+ );
1458
+
1488
1459
DefineCustomIntVariable (
1489
1460
"multimaster.workers" ,
1490
1461
"Number of multimaster executor workers per node" ,
@@ -1775,6 +1746,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1775
1746
break ;
1776
1747
}
1777
1748
}
1749
+ if (isRecoverySession ) {
1750
+ MTM_INFO ("%d: PGLOGICAL startup hook\n" , MyProcPid );
1751
+ sleep (30 );
1752
+ }
1778
1753
MtmLock (LW_EXCLUSIVE );
1779
1754
if (isRecoverySession ) {
1780
1755
elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
@@ -1807,7 +1782,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1807
1782
bool res = Mtm -> status != MTM_RECOVERY
1808
1783
&& (args -> origin_id == InvalidRepOriginId
1809
1784
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1810
- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1785
+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1811
1786
return res ;
1812
1787
}
1813
1788
@@ -2376,16 +2351,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
2376
2351
2377
2352
ByteBufferAlloc (& buf );
2378
2353
EnumerateLocks (MtmSerializeLock , & buf );
2379
- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2354
+ RaftableSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2380
2355
MtmGraphInit (& graph );
2381
2356
MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
2382
2357
ByteBufferFree (& buf );
2383
2358
for (i = 0 ; i < MtmNodes ; i ++ ) {
2384
2359
if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
2385
2360
int size ;
2386
- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2361
+ void * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2387
2362
if (data == NULL ) {
2388
- return true; /* Just temporary hack until no Paxos */
2363
+ return true; /* If using Raftable is disabled */
2389
2364
} else {
2390
2365
MtmGraphAdd (& graph , (GlobalTransactionId * )data , size /sizeof (GlobalTransactionId ));
2391
2366
}
0 commit comments