Skip to content

Commit a390ab0

Browse files
committed
Implement Raft protocol without dynamic configuration.
0 parents  commit a390ab0

File tree

6 files changed

+1498
-0
lines changed

6 files changed

+1498
-0
lines changed

Makefile

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#override CC := clang
2+
override CFLAGS += -Wfatal-errors -O0 -g
3+
override CPPFLAGS += -I. -Iinclude -DDEBUG
4+
override SERVER_LDFLAGS += -Llib -lraft -ljansson
5+
override CLIENT_LDFLAGS += -ljansson
6+
7+
AR = ar
8+
ARFLAGS = -cru
9+
10+
.PHONY: all clean bindir objdir libdir
11+
12+
all: lib/libraft.a bin/heart
13+
@echo Done.
14+
15+
lib/libraft.a: obj/raft.o obj/util.o | libdir objdir
16+
$(AR) $(ARFLAGS) lib/libraft.a obj/raft.o obj/util.o
17+
18+
bin/heart: obj/heart.o lib/libraft.a | bindir objdir
19+
$(CC) -o bin/heart $(CFLAGS) $(CPPFLAGS) \
20+
obj/heart.o $(SERVER_LDFLAGS)
21+
22+
obj/%.o: src/%.c | objdir
23+
$(CC) $(CFLAGS) $(CPPFLAGS) -c -o $@ $<
24+
25+
obj/%.o: example/%.c | objdir
26+
$(CC) $(CFLAGS) $(CPPFLAGS) -c -o $@ $<
27+
28+
bindir:
29+
mkdir -p bin
30+
31+
objdir:
32+
mkdir -p obj
33+
34+
libdir:
35+
mkdir -p lib
36+
37+
clean:
38+
rm -rfv bin obj lib

example/heart.c

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <string.h>
4+
#include <unistd.h>
5+
#include <assert.h>
6+
#include <time.h>
7+
#include <signal.h>
8+
#include <errno.h>
9+
#include <sys/wait.h>
10+
#include <sys/time.h>
11+
#include <stdbool.h>
12+
13+
#include <jansson.h>
14+
15+
#include "raft.h"
16+
#include "util.h"
17+
18+
static void applier(void *state, raft_update_t update, bool snapshot) {
19+
json_error_t error;
20+
21+
json_t *patch = json_loadb(update.data, update.len, 0, &error);
22+
if (!patch) {
23+
shout(
24+
"error parsing json at position %d: %s\n",
25+
error.column,
26+
error.text
27+
);
28+
}
29+
30+
if (snapshot) {
31+
json_object_clear(state);
32+
}
33+
34+
if (json_object_update(state, patch)) {
35+
shout("error updating state\n");
36+
}
37+
38+
json_decref(patch);
39+
40+
char *encoded = json_dumps(state, JSON_INDENT(4) | JSON_SORT_KEYS);
41+
if (encoded) {
42+
debug(
43+
"applied %s: new state is %s\n",
44+
snapshot ? "a snapshot" : "an update",
45+
encoded
46+
);
47+
} else {
48+
shout(
49+
"applied %s, but new state could not be encoded\n",
50+
snapshot ? "a snapshot" : "an update"
51+
);
52+
}
53+
free(encoded);
54+
}
55+
56+
static raft_update_t snapshooter(void *state) {
57+
json_error_t error;
58+
59+
raft_update_t shot;
60+
shot.data = json_dumps(state, JSON_SORT_KEYS);
61+
shot.len = strlen(shot.data);
62+
if (shot.data) {
63+
debug("snapshot taken: %.*s\n", shot.len, shot.data);
64+
} else {
65+
shout("failed to take a snapshot\n");
66+
}
67+
68+
return shot;
69+
}
70+
71+
static void die(int signum) {
72+
debug("dying\n");
73+
exit(signum);
74+
}
75+
76+
static void usage(char *prog) {
77+
printf(
78+
"Usage: %s -i ID -r ID:HOST:PORT [-r ID:HOST:PORT ...] [-l LOGFILE]\n"
79+
" -l : Run as a daemon and write output to LOGFILE.\n",
80+
prog
81+
);
82+
}
83+
84+
raft_t raft;
85+
86+
static void main_loop(char *host, int port) {
87+
mstimer_t t;
88+
mstimer_reset(&t);
89+
90+
// create a UDP socket for raft
91+
int r = raft_create_udp_socket(raft);
92+
if (r == NOBODY) {
93+
die(EXIT_FAILURE);
94+
}
95+
96+
#define EMIT_EVERY_MS 1000
97+
int emit_ms = 0;
98+
while (true) {
99+
int ms;
100+
raft_msg_t m;
101+
int applied;
102+
103+
ms = mstimer_reset(&t);
104+
105+
raft_tick(raft, ms);
106+
m = raft_recv_message(raft);
107+
if (m) {
108+
raft_handle_message(raft, m);
109+
}
110+
111+
if (raft_is_leader(raft)) {
112+
emit_ms += ms;
113+
while (emit_ms > EMIT_EVERY_MS) {
114+
emit_ms -= EMIT_EVERY_MS;
115+
char buf[1000];
116+
char key = 'a' + rand() % 5;
117+
int value = rand() % 10000;
118+
sprintf(buf, "{\"key-%c\":%d}", key, value);
119+
shout("emit update: = %s\n", buf);
120+
raft_update_t update = {strlen(buf), buf, NULL};
121+
raft_emit(raft, update);
122+
}
123+
}
124+
}
125+
126+
close(r);
127+
}
128+
129+
int main(int argc, char **argv) {
130+
char *logfilename = NULL;
131+
bool daemonize = false;
132+
133+
int myid = NOBODY;
134+
int id;
135+
char *host;
136+
char *str;
137+
int port;
138+
int opt;
139+
140+
char *myhost = NULL;
141+
int myport;
142+
143+
json_t *state = json_object();
144+
145+
raft_config_t rc;
146+
rc.peernum_max = 64;
147+
rc.heartbeat_ms = 20;
148+
rc.election_ms_min = 150;
149+
rc.election_ms_max = 300;
150+
rc.log_len = 10;
151+
rc.chunk_len = 4;
152+
rc.msg_len_max = 500;
153+
rc.applier = applier;
154+
rc.snapshooter = snapshooter;
155+
raft = raft_init(&rc);
156+
157+
int peernum = 0;
158+
while ((opt = getopt(argc, argv, "hi:r:l:")) != -1) {
159+
switch (opt) {
160+
case 'i':
161+
myid = atoi(optarg);
162+
break;
163+
case 'r':
164+
if (myid == NOBODY) {
165+
usage(argv[0]);
166+
return EXIT_FAILURE;
167+
}
168+
169+
str = strtok(optarg, ":");
170+
if (str) {
171+
id = atoi(str);
172+
} else {
173+
usage(argv[0]);
174+
return EXIT_FAILURE;
175+
}
176+
177+
host = strtok(NULL, ":");
178+
179+
str = strtok(NULL, ":");
180+
if (str) {
181+
port = atoi(str);
182+
} else {
183+
usage(argv[0]);
184+
return EXIT_FAILURE;
185+
}
186+
187+
if (!raft_peer_up(raft, id, host, port, id == myid)) {
188+
usage(argv[0]);
189+
return EXIT_FAILURE;
190+
}
191+
if (id == myid) {
192+
myhost = host;
193+
myport = port;
194+
}
195+
peernum++;
196+
break;
197+
case 'l':
198+
logfilename = optarg;
199+
daemonize = true;
200+
break;
201+
case 'h':
202+
usage(argv[0]);
203+
return EXIT_SUCCESS;
204+
default:
205+
usage(argv[0]);
206+
return EXIT_FAILURE;
207+
}
208+
}
209+
if (!myhost) {
210+
usage(argv[0]);
211+
return EXIT_FAILURE;
212+
}
213+
214+
if (logfilename) {
215+
if (!freopen(logfilename, "a", stdout)) {
216+
// nowhere to report this failure
217+
return EXIT_FAILURE;
218+
}
219+
if (!freopen(logfilename, "a", stderr)) {
220+
// nowhere to report this failure
221+
return EXIT_FAILURE;
222+
}
223+
}
224+
225+
if (daemonize) {
226+
if (daemon(true, true) == -1) {
227+
shout("could not daemonize: %s\n", strerror(errno));
228+
return EXIT_FAILURE;
229+
}
230+
}
231+
232+
signal(SIGTERM, die);
233+
signal(SIGINT, die);
234+
235+
main_loop(myhost, myport);
236+
237+
return EXIT_SUCCESS;
238+
}

include/raft.h

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#ifndef RAFT_H
2+
#define RAFT_H
3+
4+
#define NOBODY -1
5+
6+
typedef struct raft_data_t *raft_t;
7+
typedef struct raft_msg_data_t *raft_msg_t;
8+
9+
typedef struct raft_update_t {
10+
int len;
11+
char *data;
12+
void *userdata; // use this to track which query caused this update
13+
} raft_update_t;
14+
15+
// --- Callbacks ---
16+
17+
// This should be a function that applies an 'update' to the state machine.
18+
// 'snapshot' is true if 'update' contains a snapshot. 'userdata' is the
19+
// userdata that raft was configured with.
20+
typedef void (*raft_applier_t)(void *userdata, raft_update_t update, bool snapshot);
21+
22+
// This should be a function that makes a snapshot of the state machine. Used
23+
// for raft log compaction. 'userdata' is the userdata that raft was configured
24+
// with.
25+
typedef raft_update_t (*raft_snapshooter_t)(void *userdata);
26+
27+
// --- Configuration ---
28+
29+
typedef struct raft_config_t {
30+
int peernum_max;
31+
32+
int heartbeat_ms;
33+
int election_ms_min;
34+
int election_ms_max;
35+
36+
int log_len;
37+
38+
int chunk_len;
39+
int msg_len_max;
40+
41+
void *userdata; // this will get passed to applier() and snapshooter()
42+
raft_applier_t applier;
43+
raft_snapshooter_t snapshooter;
44+
} raft_config_t;
45+
46+
// Initialize a raft instance. Returns NULL on failure.
47+
raft_t raft_init(raft_config_t *config);
48+
49+
// Add a peer named 'id'. 'self' should be true, if that peer is this instance.
50+
// Only one peer should have 'self' == true.
51+
bool raft_peer_up(raft_t r, int id, char *host, int port, bool self);
52+
53+
// Remove a previously added peer named 'id'.
54+
bool raft_peer_down(raft_t r, int id);
55+
56+
// --- Log Actions ---
57+
58+
// Emit an 'update'. Returns true if emitted successfully.
59+
bool raft_emit(raft_t r, raft_update_t update);
60+
61+
// --- Control ---
62+
63+
// Note, that UDP socket and raft messages are exposed to the user. This gives
64+
// the user the opportunity to incorporate the socket with other sockets in
65+
// select() or poll(). Thus, the messages will be processed as soon as they
66+
// come, not as soon as we call raft_tick().
67+
68+
// Perform various raft logic tied to time. Call this function once in a while
69+
// and pass the elapsed 'msec' from the previous call. This function will only
70+
// trigger time-related events, and will not receive and process messages (see
71+
// the note above).
72+
void raft_tick(raft_t r, int msec);
73+
74+
// Receive a raft message. Returns NULL if no message available.
75+
raft_msg_t raft_recv_message(raft_t r);
76+
77+
// Process the message.
78+
void raft_handle_message(raft_t r, raft_msg_t m);
79+
80+
// Create the raft socket.
81+
int raft_create_udp_socket(raft_t r);
82+
83+
// Returns true if this peer thinks it is the leader.
84+
bool raft_is_leader(raft_t r);
85+
86+
// Returns the id of the current leader, or NOBODY if no leader.
87+
int raft_get_leader(raft_t r);
88+
89+
#endif

0 commit comments

Comments
 (0)