Skip to content

Commit f6d500c

Browse files
committed
Merge branch 'raftable-dynamic'
2 parents 3884219 + 4646502 commit f6d500c

28 files changed

+2316
-1270
lines changed

contrib/mmts/multimaster.c

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,6 +1712,26 @@ static void MtmLoadLocalTables(void)
17121712
}
17131713
}
17141714

1715+
static void MtmRaftableInitialize()
1716+
{
1717+
int i;
1718+
1719+
for (i = 0; i < MtmNodes; i++)
1720+
{
1721+
char const* raftport = strstr(MtmConnections[i].connStr, "raftport=");
1722+
int port;
1723+
if (raftport != NULL) {
1724+
if (sscanf(raftport+9, "%d", &port) != 1) {
1725+
elog(ERROR, "Invalid raftable port: %s", raftport+9);
1726+
}
1727+
} else {
1728+
port = MtmRaftablePort + i;
1729+
}
1730+
raftable_peer(i, MtmConnections[i].hostName, port);
1731+
}
1732+
raftable_start(MtmNodeId - 1);
1733+
}
1734+
17151735

17161736
static void MtmInitialize()
17171737
{
@@ -1867,33 +1887,6 @@ static void MtmSplitConnStrs(void)
18671887
pfree(copy);
18681888
}
18691889

1870-
static void MtmRaftableInitialize()
1871-
{
1872-
int i;
1873-
WorkerConfig wcfg;
1874-
1875-
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
1876-
{
1877-
wcfg.peers[i].up = false;
1878-
}
1879-
1880-
for (i = 0; i < MtmNodes; i++)
1881-
{
1882-
char const* raftport = strstr(MtmConnections[i].connStr, "raftport=");
1883-
if (raftport != NULL) {
1884-
if (sscanf(raftport+9, "%d", &wcfg.peers[i].port) != 1) {
1885-
elog(ERROR, "Invalid raftable port: %s", raftport+9);
1886-
}
1887-
} else {
1888-
wcfg.peers[i].port = MtmRaftablePort + i;
1889-
}
1890-
wcfg.peers[i].up = true;
1891-
strncpy(wcfg.peers[i].host, MtmConnections[i].hostName, sizeof(wcfg.peers[i].host));
1892-
}
1893-
wcfg.id = MtmNodeId-1;
1894-
worker_register(&wcfg);
1895-
}
1896-
18971890
void
18981891
_PG_init(void)
18991892
{
@@ -2253,7 +2246,7 @@ _PG_init(void)
22532246

22542247
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
22552248

2256-
//MtmRaftableInitialize();
2249+
MtmRaftableInitialize();
22572250
MtmArbiterInitialize();
22582251

22592252
/*

contrib/mmts/raftable.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,36 @@
99
*/
1010
void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool nowait)
1111
{
12+
void *value;
13+
size_t vallen;
1214
if (!MtmUseRaftable) {
1315
return NULL;
1416
}
15-
return raftable_get(key, size);
17+
value = raftable_get(key, &vallen, MtmHeartbeatSendTimeout);
18+
if (size != NULL) {
19+
*size = vallen;
20+
}
21+
return value;
1622
}
1723

1824

1925
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
2026
{
2127
if (MtmUseRaftable) {
28+
int tries = 10;
2229
timestamp_t start, stop;
2330
start = MtmGetSystemTime();
2431
if (nowait) {
2532
raftable_set(key, value, size, 0);
2633
} else {
27-
while (!raftable_set(key, value, size, MtmHeartbeatSendTimeout)) {
34+
while (!raftable_set(key, value, size, MtmHeartbeatSendTimeout))
35+
{
2836
MtmCheckHeartbeat();
37+
if (tries-- <= 0)
38+
{
39+
MTM_LOG1("RaftableSet nowait=%d, all attempts failed", nowait);
40+
break;
41+
}
2942
}
3043
}
3144
stop = MtmGetSystemTime();

contrib/raftable/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = raftable
2-
OBJS = raftable.o worker.o state.o blockmem.o timeout.o raft/obj/raft.o raft/obj/util.o
2+
OBJS = raftable.o worker.o client.o state.o timeout.o raft/obj/raft.o raft/obj/util.o
33
EXTENSION = raftable
44
DATA = raftable--1.0.sql
55

contrib/raftable/blockmem.c

Lines changed: 0 additions & 166 deletions
This file was deleted.

contrib/raftable/blockmem.h

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)