Skip to content

Commit 50483ab

Browse files
committed
2 parents 7cacd2b + 4aa6a83 commit 50483ab

File tree

26 files changed

+1953
-418
lines changed

26 files changed

+1953
-418
lines changed

contrib/pg_dtm/README

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ The format of all commands:
4343

4444
The commands:
4545

46+
'h': hello()
47+
The first message.
48+
49+
The arbiter replies with:
50+
[RES_OK] if ready
51+
[RES_FAILED] (or disconnection) if not ready
52+
4653
'r': reserve(minxid, minsize)
4754
Claims a sequence ≥ minsize of xids ≥ minxid for local usage. This will
4855
prevent the arbiter from using those values for global transactions.
@@ -88,7 +95,7 @@ The commands:
8895

8996
The reply and 'wait' logic is the same as for the 'status' command.
9097

91-
'h': snapshot(xid)
98+
't': snapshot(xid)
9299
Tells the arbiter to generate a snapshot for the global transaction
93100
identified by the given 'xid'. The arbiter will create a snapshot for
94101
every participant, so when each of them asks for the snapshot it will

contrib/pg_dtm/dtmd/Makefile

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ CC=gcc
22
CFLAGS=-g -O2 -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
33
SOCKHUB_PREFIX=../sockhub
44
SOCKHUB_CFLAGS=-I"$(SOCKHUB_PREFIX)"
5-
SOCKHUB_LDFLAGS=-lsockhub -L"$(SOCKHUB_PREFIX)"
65

76
SYSTEM=$(shell uname -s)
87
ifeq ($(SYSTEM),Darwin)
@@ -11,15 +10,20 @@ endif
1110

1211
.PHONY: all clean check bindir objdir
1312

14-
all: bin/dtmd
13+
all: bin/dtmd bin/heart
1514
@echo Done.
1615
@echo Feel free to run the tests with \'make check\'.
1716

18-
bin/dtmd: obj/server.o obj/main.o obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o obj/snapshot.o obj/ddd.o | bindir objdir
17+
bin/dtmd: obj/server.o obj/raft.o obj/main.o obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o obj/snapshot.o obj/ddd.o | bindir objdir
1918
$(CC) -o bin/dtmd $(CFLAGS) \
20-
obj/server.o obj/main.o \
19+
obj/server.o obj/raft.o obj/main.o \
2120
obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o \
22-
obj/snapshot.o obj/ddd.o \
21+
obj/snapshot.o obj/ddd.o
22+
23+
bin/heart: obj/heart.o obj/raft.o obj/util.o | bindir objdir
24+
$(CC) -o bin/heart $(CFLAGS) \
25+
obj/heart.o obj/raft.o obj/util.o \
26+
obj/snapshot.o \
2327
$(SOCKHUB_LDFLAGS)
2428

2529
obj/server.o: src/server.c | objdir
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#ifndef DTMD_LIMITS_H
2+
#define DTMD_LIMITS_H
3+
4+
// how many xids are reserved per raft term
5+
#define XIDS_PER_TERM 1000000
6+
7+
// start a new term when this number of xids is left
8+
#define NEW_TERM_THRESHOLD 100000
9+
10+
#define MAX_TRANSACTIONS 4096
11+
12+
#define BUFFER_SIZE (64 * 1024)
13+
#define LISTEN_QUEUE_SIZE 100
14+
#define MAX_STREAMS 4096
15+
16+
#define MAX_SERVERS 16
17+
#define HEARTBEAT_TIMEOUT_MS 20
18+
#define ELECTION_TIMEOUT_MS_MIN 150
19+
#define ELECTION_TIMEOUT_MS_MAX 300
20+
#define RAFT_LOGLEN 1024
21+
#define RAFT_KEEP_APPLIED 512 // how many applied entries to keep during compaction
22+
23+
#endif

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 0 additions & 10 deletions
This file was deleted.

contrib/pg_dtm/dtmd/include/proto.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
#ifndef PROTO_H
22
#define PROTO_H
33

4+
#define CMD_HELLO 'h'
45
#define CMD_RESERVE 'r'
56
#define CMD_BEGIN 'b'
67
#define CMD_FOR 'y'
78
#define CMD_AGAINST 'n'
8-
#define CMD_SNAPSHOT 'h'
9+
#define CMD_SNAPSHOT 't'
910
#define CMD_STATUS 's'
1011
#define CMD_DEADLOCK 'd'
1112

1213
#define RES_FAILED 0xDEADBEEF
1314
#define RES_OK 0xC0FFEE
15+
#define RES_REDIRECT 404
1416
#define RES_DEADLOCK 0xDEADDEED
1517
#define RES_TRANSACTION_COMMITTED 1
1618
#define RES_TRANSACTION_ABORTED 2

contrib/pg_dtm/dtmd/include/raft.h

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
#ifndef RAFT_H
2+
#define RAFT_H
3+
4+
#include <arpa/inet.h>
5+
#include <stdbool.h>
6+
#include "dtmdlimits.h"
7+
8+
#define NOBODY -1
9+
10+
#define MAJORITY_IS_NOT_ENOUGH // wait for unanimous ack for applying a new entry
11+
12+
#define DEFAULT_LISTENHOST "0.0.0.0"
13+
#define DEFAULT_LISTENPORT 5431
14+
15+
#define ROLE_FOLLOWER 0
16+
#define ROLE_CANDIDATE 1
17+
#define ROLE_LEADER 2
18+
19+
#if RAFT_KEEP_APPLIED >= RAFT_LOGLEN
20+
#error please ensure RAFT_KEEP_APPLIED < RAFT_LOGLEN
21+
#endif
22+
23+
#if HEARTBEAT_TIMEOUT_MS >= ELECTION_TIMEOUT_MS_MIN
24+
#error please ensure HEARTBEAT_TIMEOUT_MS < ELECTION_TIMEOUT_MS_MIN (considerably)
25+
#endif
26+
27+
#if ELECTION_TIMEOUT_MS_MIN >= ELECTION_TIMEOUT_MS_MAX
28+
#error please ensure ELECTION_TIMEOUT_MS_MIN < ELECTION_TIMEOUT_MS_MAX
29+
#endif
30+
31+
// raft module does not care what you mean by action and argument
32+
typedef struct raft_entry_t {
33+
int term;
34+
bool snapshot; // true if this is a snapshot entry
35+
union {
36+
struct { // snapshot == false
37+
int action;
38+
int argument;
39+
};
40+
struct { // snapshot == true
41+
int minarg;
42+
int maxarg;
43+
};
44+
};
45+
} raft_entry_t;
46+
47+
typedef void (*raft_applier_t)(int action, int argument);
48+
49+
typedef struct raft_log_t {
50+
int first;
51+
int size; // number of entries past first
52+
int acked; // number of entries replicated to the majority of servers
53+
int applied; // number of entries applied to the state machine
54+
raft_entry_t entries[RAFT_LOGLEN]; // wraps around
55+
} raft_log_t;
56+
57+
typedef struct raft_server_t {
58+
int seqno; // the rpc sequence number
59+
int tosend; // index of the next entry to send
60+
int acked; // index of the highest entry known to be replicated
61+
62+
char *host;
63+
int port;
64+
struct sockaddr_in addr;
65+
} raft_server_t;
66+
67+
typedef struct raft_t {
68+
int term; // current term (latest term we have seen)
69+
int vote; // who received our vote in current term
70+
int role;
71+
int me; // my id
72+
int votes; // how many votes are for me (if candidate)
73+
int leader; // the id of the leader
74+
raft_log_t log;
75+
76+
int sock;
77+
78+
int servernum;
79+
raft_server_t servers[MAX_SERVERS];
80+
81+
int timer;
82+
83+
raft_applier_t applier;
84+
} raft_t;
85+
86+
#define RAFT_LOG(RAFT, INDEX) ((RAFT)->log.entries[(INDEX) % (RAFT_LOGLEN)])
87+
88+
#define RAFT_MSG_UPDATE 0 // append entry
89+
#define RAFT_MSG_DONE 1 // entry appended
90+
#define RAFT_MSG_CLAIM 2 // vote for me
91+
#define RAFT_MSG_VOTE 3 // my vote
92+
93+
typedef struct raft_msg_t {
94+
int msgtype;
95+
int term;
96+
int from;
97+
int seqno;
98+
} raft_msg_t;
99+
100+
typedef struct raft_msg_update_t {
101+
raft_msg_t msg;
102+
int previndex; // the index of the preceding log entry
103+
int prevterm; // the term of the preceding log entry
104+
105+
bool empty; // the message is just a heartbeat if empty
106+
raft_entry_t entry;
107+
108+
int acked; // the leader's acked number
109+
} raft_msg_update_t;
110+
111+
typedef struct raft_msg_done_t {
112+
raft_msg_t msg;
113+
int index; // the index of the appended entry
114+
int term; // the term of the appended entry
115+
bool success;
116+
} raft_msg_done_t;
117+
118+
typedef struct raft_msg_claim_t {
119+
raft_msg_t msg;
120+
int index; // the index of my last entry
121+
int term; // the term of my last entry
122+
} raft_msg_claim_t;
123+
124+
typedef struct raft_msg_vote_t {
125+
raft_msg_t msg;
126+
bool granted;
127+
} raft_msg_vote_t;
128+
129+
// configuration
130+
void raft_init(raft_t *r);
131+
bool raft_add_server(raft_t *r, char *host, int port);
132+
bool raft_set_myid(raft_t *r, int myid);
133+
134+
// log actions
135+
bool raft_emit(raft_t *r, int action, int argument);
136+
int raft_apply(raft_t *r, raft_applier_t applier);
137+
138+
// control
139+
void raft_tick(raft_t *r, int msec);
140+
void raft_handle_message(raft_t *r, raft_msg_t *m);
141+
raft_msg_t *raft_recv_message(raft_t *r);
142+
int raft_create_udp_socket(raft_t *r);
143+
void raft_start_next_term(raft_t *r);
144+
145+
#endif

contrib/pg_dtm/dtmd/include/server.h

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,37 @@ server_t server_init(
4040
ondisconnect_callback_t ondisconnect
4141
);
4242

43+
/*
44+
* Assigns the given raft socket to the server. The server will add the socket
45+
* to the 'select' calls and give you the incoming messages.
46+
*/
47+
void server_set_raft_socket(server_t server, int sock);
48+
4349
/*
4450
* Starts the server. Returns 'true' on success, 'false' otherwise.
4551
*/
4652
bool server_start(server_t server);
4753

4854
/*
49-
* The main server loop. Does not return, so use the callbacks and signal
50-
* handlers to add more logic.
55+
* The main server loop. Returns true if there is a raft message ready, or NULL
56+
* if timed out. Use the callbacks and signal handlers to add more logic.
57+
*/
58+
bool server_tick(server_t server, int timeout_ms);
59+
60+
/*
61+
* Closes all client connections on the server and refuses to accept new ones.
62+
*/
63+
void server_disable(server_t server);
64+
65+
/*
66+
* Allows the server to accept new connections.
5167
*/
52-
void server_loop(server_t server);
68+
void server_enable(server_t server);
69+
70+
/*
71+
* Enables or disables the server depending on the argument.
72+
*/
73+
void server_set_enabled(server_t server, bool enable);
5374

5475
/*
5576
* These two methods allow you to set and get your custom 'userdata' for the
@@ -98,6 +119,13 @@ bool client_message_finish(client_t client);
98119
*/
99120
bool client_message_shortcut(client_t client, xid_t arg);
100121

122+
/*
123+
* A shortcut to send the 'redirect' message.
124+
*
125+
* Returns 'true' on success, 'false' otherwise.
126+
*/
127+
bool client_redirect(client_t client, unsigned addr, int port);
128+
101129
unsigned client_get_ip_addr(client_t client);
102130

103131
#endif

contrib/pg_dtm/dtmd/include/snapshot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#define SNAPSHOT_H
33

44
#include "int.h"
5-
#include "limits.h"
5+
#include "dtmdlimits.h"
66

77
typedef struct Snapshot {
88
xid_t xmin;

contrib/pg_dtm/dtmd/include/transaction.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "int.h"
66
#include "clog.h"
77
#include "snapshot.h"
8-
#include "limits.h"
8+
#include "dtmdlimits.h"
99

1010
#define MAX_SNAPSHOTS_PER_TRANS 8
1111

@@ -68,6 +68,6 @@ int transaction_status(Transaction *t);
6868
void transaction_clear(Transaction *t);
6969
void transaction_push_listener(Transaction *t, char cmd, void *listener);
7070
void *transaction_pop_listener(Transaction *t, char cmd);
71-
bool transaction_participate(Transaction *t, int clientid);
71+
bool transaction_remove_listener(Transaction *t, char cmd, void *listener);
7272

7373
#endif

contrib/pg_dtm/dtmd/include/util.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,40 @@
77

88
#include <stdbool.h>
99
#include <sys/stat.h>
10+
#include <sys/time.h>
1011
#include <fcntl.h>
1112
#include <stdio.h>
13+
#include <stdlib.h>
1214

1315
#include "int.h"
1416

1517
char *join_path(const char *dir, const char *file);
1618
bool inrange(xid_t min, xid_t x, xid_t max);
1719
int falloc(int fd, off64_t size);
1820

21+
static inline int min(int a, int b) {
22+
return a < b ? a : b;
23+
}
24+
25+
static inline int max(int a, int b) {
26+
return a > b ? a : b;
27+
}
28+
29+
static inline int rand_between(int min, int max) {
30+
return rand() % (max - min + 1) + min;
31+
}
32+
33+
// ------ timing ------
34+
35+
typedef struct mstimer_t {
36+
struct timeval tv;
37+
} mstimer_t;
38+
39+
int mstimer_reset(mstimer_t *t);
40+
struct timeval ms2tv(int ms);
41+
42+
// ------ logging ------
43+
1944
#ifndef DEBUG
2045
#define debug(...)
2146
#else

0 commit comments

Comments
 (0)