Skip to content

Commit 86ef2cd

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 05c333e + dd875d8 commit 86ef2cd

File tree

5 files changed

+43
-64
lines changed

5 files changed

+43
-64
lines changed

contrib/mmts/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
33

44
override CPPFLAGS += -I../raftable
55

@@ -8,7 +8,7 @@ DATA = multimaster--1.0.sql
88

99
.PHONY: all
1010

11-
all: multimaster.o multimaster.so
11+
all: multimaster.so
1212

1313
PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
1414
SHLIB_LINK = $(libpq)

contrib/mmts/multimaster.c

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
#include "raftable.h"
5959
#include "multimaster.h"
6060
#include "ddd.h"
61-
#include "paxos.h"
61+
#include "raftable.h"
6262

6363
typedef struct {
6464
TransactionId xid; /* local transaction ID */
@@ -172,6 +172,7 @@ int MtmConnectAttempts;
172172
int MtmConnectTimeout;
173173
int MtmKeepaliveTimeout;
174174
int MtmReconnectAttempts;
175+
bool MtmUseRaftable;
175176
MtmConnectionInfo* MtmConnections;
176177

177178
static char* MtmConnStrs;
@@ -1104,7 +1105,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
11041105
int i, j, n = MtmNodes;
11051106
for (i = 0; i < n; i++) {
11061107
if (i+1 != MtmNodeId) {
1107-
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
1108+
void* data = RaftableGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
11081109
if (data == NULL) {
11091110
return false;
11101111
}
@@ -1134,7 +1135,7 @@ bool MtmRefreshClusterStatus(bool nowait)
11341135
int clique_size;
11351136
int i;
11361137

1137-
if (!MtmBuildConnectivityMatrix(matrix, nowait)) {
1138+
if (!MtmUseRaftable || !MtmBuildConnectivityMatrix(matrix, nowait)) {
11381139
/* RAFT is not available */
11391140
return false;
11401141
}
@@ -1194,7 +1195,7 @@ void MtmCheckQuorum(void)
11941195
void MtmOnNodeDisconnect(int nodeId)
11951196
{
11961197
BIT_SET(Mtm->connectivityMask, nodeId-1);
1197-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
1198+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
11981199

11991200
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
12001201
MtmSleep(MtmKeepaliveTimeout);
@@ -1213,52 +1214,9 @@ void MtmOnNodeDisconnect(int nodeId)
12131214
void MtmOnNodeConnect(int nodeId)
12141215
{
12151216
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
1216-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
1217+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
12171218
}
12181219

1219-
/*
1220-
* Paxos function stubs (until them are miplemented)
1221-
*/
1222-
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait)
1223-
{
1224-
unsigned enclen, declen, len;
1225-
char *enc, *dec;
1226-
Assert(ts == NULL); // not implemented
1227-
1228-
enc = raftable_get(key);
1229-
if (enc == NULL)
1230-
{
1231-
*size = 0;
1232-
return NULL;
1233-
}
1234-
1235-
enclen = strlen(enc);
1236-
declen = hex_dec_len(enc, enclen);
1237-
dec = palloc(declen);
1238-
len = hex_decode(enc, enclen, dec);
1239-
pfree(enc);
1240-
Assert(len == declen);
1241-
1242-
if (size != NULL) {
1243-
*size = declen;
1244-
}
1245-
return dec;
1246-
}
1247-
1248-
void PaxosSet(char const* key, void const* value, int size, bool nowait)
1249-
{
1250-
unsigned enclen, declen, len;
1251-
char *enc, *dec;
1252-
1253-
enclen = hex_enc_len(value, size);
1254-
enc = palloc(enclen) + 1;
1255-
len = hex_encode(value, size, enc);
1256-
Assert(len == enclen);
1257-
enc[len] = '\0';
1258-
1259-
raftable_set(key, enc, nowait ? 1 : INT_MAX);
1260-
pfree(enc);
1261-
}
12621220

12631221

12641222
/*
@@ -1485,6 +1443,19 @@ _PG_init(void)
14851443
NULL
14861444
);
14871445

1446+
DefineCustomBoolVariable(
1447+
"multimaster.use_raftable",
1448+
"Use raftable plugin for internode communication",
1449+
NULL,
1450+
&MtmUseRaftable,
1451+
false,
1452+
PGC_BACKEND,
1453+
0,
1454+
NULL,
1455+
NULL,
1456+
NULL
1457+
);
1458+
14881459
DefineCustomIntVariable(
14891460
"multimaster.workers",
14901461
"Number of multimaster executor workers per node",
@@ -1775,6 +1746,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17751746
break;
17761747
}
17771748
}
1749+
if (isRecoverySession) {
1750+
MTM_INFO("%d: PGLOGICAL startup hook\n", MyProcPid);
1751+
sleep(30);
1752+
}
17781753
MtmLock(LW_EXCLUSIVE);
17791754
if (isRecoverySession) {
17801755
elog(WARNING, "Node %d start recovery of node %d", MtmNodeId, MtmReplicationNodeId);
@@ -1807,7 +1782,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18071782
bool res = Mtm->status != MTM_RECOVERY
18081783
&& (args->origin_id == InvalidRepOriginId
18091784
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1810-
MTM_INFO("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
1785+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
18111786
return res;
18121787
}
18131788

@@ -2376,16 +2351,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23762351

23772352
ByteBufferAlloc(&buf);
23782353
EnumerateLocks(MtmSerializeLock, &buf);
2379-
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, true);
2354+
RaftableSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, true);
23802355
MtmGraphInit(&graph);
23812356
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
23822357
ByteBufferFree(&buf);
23832358
for (i = 0; i < MtmNodes; i++) {
23842359
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
23852360
int size;
2386-
void* data = PaxosGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
2361+
void* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
23872362
if (data == NULL) {
2388-
return true; /* Just temporary hack until no Paxos */
2363+
return true; /* If using Raftable is disabled */
23892364
} else {
23902365
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactionId));
23912366
}

contrib/mmts/multimaster.control

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,3 @@ default_version = '1.0'
33
module_pathname = '$libdir/multimaster'
44
schema = mtm
55
relocatable = false
6-
requires = raftable
Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
#ifndef __PAXOS_H__
2-
#define __PAXOS_H__
1+
#ifndef __RAFTABLE_H__
2+
#define __RAFTABLE_H__
33

4-
typedef struct PaxosTimestamp {
4+
typedef struct RaftableTimestamp {
55
time_t time; /* local time at master */
66
uint32 master; /* master node for this operation */
7-
uint32 psn; /* PAXOS serial number */
8-
} PaxosTimestamp;
7+
uint32 psn; /* RAFTABLE serial number */
8+
} RaftableTimestamp;
99

1010
/*
1111
* Get key value.
@@ -15,18 +15,24 @@ typedef struct PaxosTimestamp {
1515
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
1616
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
1717
*/
18-
extern void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait);
18+
extern void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait);
1919

2020
/*
2121
* Set new value for the specified key. IF value is NULL, then key should be deleted.
2222
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
2323
*/
24-
extern void PaxosSet(char const* key, void const* value, int size, bool nowait);
24+
extern void RaftableSet(char const* key, void const* value, int size, bool nowait);
2525

2626
/*
2727
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.
2828
* Otherwise do nothing and return false.
2929
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
3030
*/
31-
extern bool PaxosCAS(char const* key, char const* value, bool nowait);
31+
extern bool RaftableCAS(char const* key, char const* value, bool nowait);
32+
33+
typedef void* (*raftable_get_t)(char const* key, int* size, int timeout);
34+
typedef void (*raftable_set_t)(char const* key, void const* value, int size, int timeout);
35+
36+
extern bool MtmUseRaftable;
37+
3238
#endif

src/backend/replication/logical/reorderbuffer.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ static const Size max_cached_changes = 4096 * 2;
160160
static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161161
static const Size max_cached_transactions = 512;
162162

163-
164163
/* ---------------------------------------
165164
* primary reorderbuffer support routines
166165
* ---------------------------------------

0 commit comments

Comments
 (0)