Skip to content

Commit b83be88

Browse files
committed
Rename paxos to taftable and make last one optionally included
1 parent a088277 commit b83be88

File tree

3 files changed

+40
-61
lines changed

3 files changed

+40
-61
lines changed

contrib/mmts/multimaster.c

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757

5858
#include "multimaster.h"
5959
#include "ddd.h"
60-
#include "paxos.h"
60+
#include "raftable.h"
6161

6262
typedef struct {
6363
TransactionId xid; /* local transaction ID */
@@ -179,6 +179,7 @@ static int MtmWorkers;
179179
static int MtmVacuumDelay;
180180
static int MtmMinRecoveryLag;
181181
static int MtmMaxRecoveryLag;
182+
static bool MtmUseRaftable;
182183

183184
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
184185
static ProcessUtility_hook_type PreviousProcessUtilityHook;
@@ -1103,7 +1104,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
11031104
int i, j, n = MtmNodes;
11041105
for (i = 0; i < n; i++) {
11051106
if (i+1 != MtmNodeId) {
1106-
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
1107+
void* data = RaftableGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
11071108
if (data == NULL) {
11081109
return false;
11091110
}
@@ -1133,7 +1134,7 @@ bool MtmRefreshClusterStatus(bool nowait)
11331134
int clique_size;
11341135
int i;
11351136

1136-
if (!MtmBuildConnectivityMatrix(matrix, nowait)) {
1137+
if (!MtmUseRaftable || !MtmBuildConnectivityMatrix(matrix, nowait)) {
11371138
/* RAFT is not available */
11381139
return false;
11391140
}
@@ -1193,7 +1194,7 @@ void MtmCheckQuorum(void)
11931194
void MtmOnNodeDisconnect(int nodeId)
11941195
{
11951196
BIT_SET(Mtm->connectivityMask, nodeId-1);
1196-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
1197+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
11971198

11981199
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11991200
MtmSleep(MtmKeepaliveTimeout);
@@ -1212,52 +1213,9 @@ void MtmOnNodeDisconnect(int nodeId)
12121213
void MtmOnNodeConnect(int nodeId)
12131214
{
12141215
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
1215-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
1216+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
12161217
}
12171218

1218-
/*
1219-
* Paxos function stubs (until them are miplemented)
1220-
*/
1221-
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait)
1222-
{
1223-
unsigned enclen, declen, len;
1224-
char *enc, *dec;
1225-
Assert(ts == NULL); // not implemented
1226-
1227-
enc = raftable_get(key);
1228-
if (enc == NULL)
1229-
{
1230-
*size = 0;
1231-
return NULL;
1232-
}
1233-
1234-
enclen = strlen(enc);
1235-
declen = hex_dec_len(enc, enclen);
1236-
dec = palloc(declen);
1237-
len = hex_decode(enc, enclen, dec);
1238-
pfree(enc);
1239-
Assert(len == declen);
1240-
1241-
if (size != NULL) {
1242-
*size = declen;
1243-
}
1244-
return dec;
1245-
}
1246-
1247-
void PaxosSet(char const* key, void const* value, int size, bool nowait)
1248-
{
1249-
unsigned enclen, declen, len;
1250-
char *enc, *dec;
1251-
1252-
enclen = hex_enc_len(value, size);
1253-
enc = palloc(enclen) + 1;
1254-
len = hex_encode(value, size, enc);
1255-
Assert(len == enclen);
1256-
enc[len] = '\0';
1257-
1258-
raftable_set(key, enc, nowait ? 1 : INT_MAX);
1259-
pfree(enc);
1260-
}
12611219

12621220

12631221
/*
@@ -1484,6 +1442,19 @@ _PG_init(void)
14841442
NULL
14851443
);
14861444

1445+
DefineCustomBoolVariable(
1446+
"multimaster.use_raftable",
1447+
"Use raftable plugin for internode communication",
1448+
NULL,
1449+
&MtmUseRaftable,
1450+
false,
1451+
PGC_BACKEND,
1452+
0,
1453+
NULL,
1454+
NULL,
1455+
NULL
1456+
);
1457+
14871458
DefineCustomIntVariable(
14881459
"multimaster.workers",
14891460
"Number of multimaster executor workers per node",
@@ -1774,6 +1745,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17741745
break;
17751746
}
17761747
}
1748+
if (isRecoverySession) {
1749+
MTM_INFO("%d: PGLOGICAL startup hook\n", MyProcPid);
1750+
sleep(30);
1751+
}
17771752
MtmLock(LW_EXCLUSIVE);
17781753
if (isRecoverySession) {
17791754
elog(WARNING, "Node %d start recovery of node %d", MtmNodeId, MtmReplicationNodeId);
@@ -1806,7 +1781,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18061781
bool res = Mtm->status != MTM_RECOVERY
18071782
&& (args->origin_id == InvalidRepOriginId
18081783
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1809-
MTM_INFO("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
1784+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
18101785
return res;
18111786
}
18121787

@@ -2375,16 +2350,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23752350

23762351
ByteBufferAlloc(&buf);
23772352
EnumerateLocks(MtmSerializeLock, &buf);
2378-
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, true);
2353+
RaftableSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, true);
23792354
MtmGraphInit(&graph);
23802355
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
23812356
ByteBufferFree(&buf);
23822357
for (i = 0; i < MtmNodes; i++) {
23832358
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
23842359
int size;
2385-
void* data = PaxosGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
2360+
void* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
23862361
if (data == NULL) {
2387-
return true; /* Just temporary hack until no Paxos */
2362+
return true; /* If using Raftable is disabled */
23882363
} else {
23892364
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactionId));
23902365
}
Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
#ifndef __PAXOS_H__
2-
#define __PAXOS_H__
1+
#ifndef __RAFTABLE_H__
2+
#define __RAFTABLE_H__
33

4-
typedef struct PaxosTimestamp {
4+
typedef struct RaftableTimestamp {
55
time_t time; /* local time at master */
66
uint32 master; /* master node for this operation */
7-
uint32 psn; /* PAXOS serial number */
8-
} PaxosTimestamp;
7+
uint32 psn; /* RAFTABLE serial number */
8+
} RaftableTimestamp;
99

1010
/*
1111
* Get key value.
@@ -15,18 +15,23 @@ typedef struct PaxosTimestamp {
1515
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
1616
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
1717
*/
18-
extern void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait);
18+
extern void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait);
1919

2020
/*
2121
* Set new value for the specified key. IF value is NULL, then key should be deleted.
2222
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
2323
*/
24-
extern void PaxosSet(char const* key, void const* value, int size, bool nowait);
24+
extern void RaftableSet(char const* key, void const* value, int size, bool nowait);
2525

2626
/*
2727
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.
2828
* Otherwise do nothing and return false.
2929
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
3030
*/
31-
extern bool PaxosCAS(char const* key, char const* value, bool nowait);
31+
extern bool RaftableCAS(char const* key, char const* value, bool nowait);
32+
33+
typedef void* (*raftable_get_t)(char const* key, int* size, int timeout);
34+
typedef void (*raftable_set_t)(char const* key, void const* value, int size, int timeout);
35+
36+
3237
#endif

src/backend/replication/logical/reorderbuffer.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ static const Size max_cached_changes = 4096 * 2;
160160
static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161161
static const Size max_cached_transactions = 512;
162162

163-
164163
/* ---------------------------------------
165164
* primary reorderbuffer support routines
166165
* ---------------------------------------

0 commit comments

Comments
 (0)