Skip to content

Commit 1db67dc

Browse files
committed
Support replication of DDL
1 parent 29c7901 commit 1db67dc

File tree

4 files changed

+258
-87
lines changed

4 files changed

+258
-87
lines changed

contrib/multimaster/dtmd/src/server.c

Lines changed: 116 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,61 @@
1616
#include <netinet/tcp.h>
1717
#include <netinet/in.h>
1818

19+
#ifdef USE_EPOLL
20+
#include <sys/epoll.h>
21+
#endif
22+
1923
#include "server.h"
2024
#include "limits.h"
2125
#include "util.h"
2226
#include "sockhub.h"
2327

2428
typedef struct buffer_t {
25-
int ready; // number of bytes that are ready to be sent/processed
29+
int ready; /* number of bytes that are ready to be sent/processed */
2630
ShubMessageHdr *curmessage;
27-
char *data; // dynamically allocated buffer
31+
char *data; /* dynamically allocated buffer */
2832
} buffer_t;
2933

3034
typedef struct stream_data_t *stream_t;
3135

3236
typedef struct client_data_t {
33-
stream_t stream; // NULL: client value is empty
37+
stream_t stream; /* NULL: client value is empty */
3438
void *userdata;
3539
unsigned int chan;
3640
} client_data_t;
3741

3842
typedef struct stream_data_t {
3943
int fd;
40-
bool good; // 'false': stop serving this stream and disconnect when possible
44+
bool good; /* 'false': stop serving this stream and disconnect when possible */
4145
buffer_t input;
4246
buffer_t output;
4347

44-
// a map: 'chan' -> client_data_t
45-
// 'chan' is expected to be < MAX_FDS which is pretty low
46-
client_data_t *clients; // dynamically allocated
48+
/* a map: 'chan' -> client_data_t */
49+
/* 'chan' is expected to be < MAX_FDS which is pretty low */
50+
client_data_t *clients; /* dynamically allocated */
51+
struct stream_data_t* next;
4752
} stream_data_t;
4853

4954
typedef struct server_data_t {
5055
char *host;
5156
int port;
5257

53-
int listener; // the listening socket
54-
fd_set all; // all sockets including the listener
58+
int listener; /* the listening socket */
59+
#ifdef USE_EPOLL
60+
int epollfd;
61+
#else
62+
fd_set all; /* all sockets including the listener */
5563
int maxfd;
56-
57-
int streamsnum;
58-
stream_data_t streams[MAX_STREAMS];
64+
#endif
65+
stream_t used_chain;
66+
stream_t free_chain;
5967

6068
onmessage_callback_t onmessage;
6169
onconnect_callback_t onconnect;
6270
ondisconnect_callback_t ondisconnect;
6371
} server_data_t;
6472

65-
// Returns the created socket, or -1 if failed.
73+
/* Returns the created socket, or -1 if failed. */
6674
static int create_listening_socket(const char *host, int port) {
6775
int s = socket(AF_INET, SOCK_STREAM, 0);
6876
if (s == -1) {
@@ -113,32 +121,56 @@ server_t server_init(
113121
return server;
114122
}
115123

124+
bool register_socket(server_t server, int fd, stream_t stream)
125+
{
126+
#ifdef USE_EPOLL
127+
struct epoll_event ev;
128+
ev.events = EPOLLIN;
129+
ev.data.ptr = (void*)stream;
130+
if (epoll_ctl(server->epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
131+
return false;
132+
}
133+
#else
134+
FD_SET(fd, &server->all);
135+
if (fd > server->maxfd) {
136+
server->maxfd = fd;
137+
}
138+
#endif
139+
return true;
140+
}
141+
116142
bool server_start(server_t server) {
117143
debug("starting the server\n");
118-
server->streamsnum = 0;
119-
144+
server->free_chain = NULL;
145+
server->used_chain = NULL;
146+
120147
server->listener = create_listening_socket(server->host, server->port);
121148
if (server->listener == -1) {
122149
return false;
123150
}
124151

152+
#ifdef USE_EPOLL
153+
server->epollfd = epoll_create(MAX_EVENTS);
154+
if (server->epollfd < 0) {
155+
return false;
156+
}
157+
#else
125158
FD_ZERO(&server->all);
126-
FD_SET(server->listener, &server->all);
127-
server->maxfd = server->listener;
128-
129-
return true;
159+
server->maxfd = 0;
160+
#endif
161+
return register_socket(server, server->listener, NULL);
130162
}
131163

132164
static bool stream_flush(stream_t stream) {
133165
int tosend = stream->output.ready;
134166
if (tosend == 0) {
135-
// nothing to do
167+
/* nothing to do */
136168
return true;
137169
}
138170

139171
char *cursor = stream->output.data;
140172
while (tosend > 0) {
141-
// repeat sending until we send everything
173+
/* repeat sending until we send everything */
142174
int sent = send(stream->fd, cursor, tosend, 0);
143175
if (sent == -1) {
144176
shout("failed to flush the stream\n");
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153185
stream->output.ready = 0;
154186
ShubMessageHdr *msg = stream->output.curmessage;
155187
if (msg) {
156-
// move the unfinished message to the start of the buffer
188+
/* move the unfinished message to the start of the buffer */
157189
memmove(stream->output.data, msg, msg->size + sizeof(ShubMessageHdr));
158190
stream->output.curmessage = (ShubMessageHdr*)stream->output.data;
159191
}
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163195

164196
static void server_flush(server_t server) {
165197
debug("flushing the streams\n");
166-
int i;
167-
for (i = 0; i < server->streamsnum; i++) {
168-
stream_t stream = server->streams + i;
169-
stream_flush(stream);
198+
stream_t s;
199+
for (s = server->used_chain; s != NULL; s = s->next) {
200+
stream_flush(s);
170201
}
171202
}
172203

@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187218

188219
stream->clients = malloc(MAX_TRANSACTIONS * sizeof(client_data_t));
189220
assert(stream->clients);
190-
// mark all clients as empty
221+
/* mark all clients as empty */
191222
for (i = 0; i < MAX_TRANSACTIONS; i++) {
192223
stream->clients[i].stream = NULL;
193224
}
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207238
}
208239
}
209240
}
210-
211-
FD_CLR(stream->fd, &server->all);
241+
#ifdef USE_EPOLL
242+
epoll_ctl(server->epollfd, EPOLL_CTL_DEL, stream->fd, NULL);
243+
#else
244+
FD_CLR(stream->fd, &server->all);
245+
#endif
212246
close(stream->fd);
213247
free(stream->clients);
214248
free(stream->input.data);
215249
free(stream->output.data);
216250
}
217251

218-
static void stream_move(stream_t dst, stream_t src) {
219-
int i;
220-
*dst = *src;
221-
for (i = 0; i < MAX_TRANSACTIONS; i++) {
222-
if (dst->clients[i].stream) {
223-
dst->clients[i].stream = dst;
224-
}
225-
}
226-
}
227-
228252
static void server_close_bad_streams(server_t server) {
229-
int i;
230-
for (i = server->streamsnum - 1; i >= 0; i--) {
231-
stream_t stream = server->streams + i;
232-
if (!stream->good) {
233-
server_stream_destroy(server, stream);
234-
if (i != server->streamsnum - 1) {
235-
// move the last one here
236-
*stream = server->streams[server->streamsnum - 1];
237-
stream_move(stream, server->streams + server->streamsnum - 1);
238-
}
239-
server->streamsnum--;
253+
stream_t s, next, *spp;
254+
for (spp = &server->used_chain; (s = *spp) != NULL; s = next) {
255+
next = s->next;
256+
if (!s->good) {
257+
server_stream_destroy(server, s);
258+
*spp = next;
259+
s->next = server->free_chain;
260+
server->free_chain = s;
261+
} else {
262+
spp = &s->next;
240263
}
241264
}
242265
}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279302

280303
int newsize = stream->output.curmessage->size + sizeof(ShubMessageHdr) + len;
281304
if (newsize > BUFFER_SIZE) {
282-
// the flushing will not help here
305+
/* the flushing will not help here */
283306
shout("the message cannot be bigger than the buffer size\n");
284307
stream->good = false;
285308
return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326349
return stream_message_finish(client->stream);
327350
}
328351

329-
bool client_message_shortcut(client_t client, xid_t arg) {
352+
bool client_message_shortcut(client_t client, xid_t arg)
353+
{
330354
if (!stream_message_start(client->stream, client->chan)) {
331355
return false;
332356
}
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348372
return false;
349373
}
350374
debug("a new connection accepted\n");
351-
352-
if (server->streamsnum >= MAX_STREAMS) {
353-
shout("streams limit hit, disconnecting the accepted connection\n");
354-
close(fd);
355-
return false;
375+
376+
stream_t s = server->free_chain;
377+
if (s == NULL) {
378+
s = malloc(sizeof(stream_data_t));
379+
} else {
380+
server->free_chain = s->next;
356381
}
382+
/* add new stream */
383+
s->next = server->used_chain;
384+
server->used_chain = s;
357385

358-
// add new stream
359-
stream_t s = server->streams + server->streamsnum++;
360386
stream_init(s, fd);
361387

362-
FD_SET(fd, &server->all);
363-
if (fd > server->maxfd) {
364-
server->maxfd = fd;
365-
}
366-
367-
return true;
388+
return register_socket(server, fd, s);
368389
}
369390

370391
static client_t stream_get_client(stream_t stream, unsigned int chan, bool *isnew) {
371392
assert(chan < MAX_TRANSACTIONS);
372393
client_t client = stream->clients + chan;
373394
if (client->stream == NULL) {
374-
// client is new
395+
/* client is new */
375396
client->stream = stream;
376397
client->chan = chan;
377398
*isnew = true;
378399
client->userdata = NULL;
379400
} else {
380-
// collisions should not happen
401+
/* collisions should not happen */
381402
assert(client->chan == chan);
382403
*isnew = false;
383404
}
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412433
ShubMessageHdr *msg = (ShubMessageHdr*)cursor;
413434
int header_and_data = sizeof(ShubMessageHdr) + msg->size;
414435
if (header_and_data <= toprocess) {
415-
// handle message
436+
/* handle message */
416437
bool isnew;
417438
client_t client = stream_get_client(stream, msg->chan, &isnew);
418439
if (isnew) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457478
void server_loop(server_t server) {
458479
while (1) {
459480
int i;
481+
int numready;
482+
#ifdef USE_EPOLL
483+
struct epoll_event events[MAX_EVENTS];
484+
numready = epoll_wait(server->epollfd, events, MAX_EVENTS, -1);
485+
if (numready < 0) {
486+
shout("failed to select: %s\n", strerror(errno));
487+
return;
488+
}
489+
for (i = 0; i < numready; i++) {
490+
stream_t stream = (stream_t)events[i].data.ptr;
491+
if (stream == NULL) {
492+
server_accept(server);
493+
} else {
494+
if (events[i].events & EPOLLERR) {
495+
stream->good = false;
496+
} else if (events[i].events & EPOLLIN) {
497+
server_stream_handle(server, stream);
498+
}
499+
}
500+
}
501+
#else
460502
fd_set readfds = server->all;
461-
debug("selecting\n");
462503
int numready = select(server->maxfd + 1, &readfds, NULL, NULL, NULL);
504+
stream_t s;
463505
if (numready == -1) {
464506
shout("failed to select: %s\n", strerror(errno));
465507
return;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470512
server_accept(server);
471513
}
472514

473-
for (i = 0; (i < server->streamsnum) && (numready > 0); i++) {
474-
stream_t stream = server->streams + i;
475-
if (FD_ISSET(stream->fd, &readfds)) {
476-
server_stream_handle(server, stream);
515+
for (s = server_used_chain; s != NULL && numready > 0; s = s->next) {
516+
if (FD_ISSET(s->fd, &readfds)) {
517+
server_stream_handle(server, s);
477518
numready--;
478519
}
479520
}
480-
521+
#endif
481522
server_close_bad_streams(server);
482523
server_flush(server);
483524
}
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501542
}
502543

503544
#if 0
504-
// usage example
545+
/* usage example */
505546

506547
void test_onconnect(client_t client) {
507548
char *name = "hello";

0 commit comments

Comments
 (0)