Skip to content

Commit 535c5a2

Browse files
committed
Fix fatal bugs in raftable.
1 parent 1349cbf commit 535c5a2

File tree

3 files changed

+33
-10
lines changed

3 files changed

+33
-10
lines changed

contrib/raftable/raftable.c

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ static void *get_shared_state(void)
5252
return shared.state;
5353
}
5454

55-
static void try_next_peer(void)
55+
static void select_next_peer(void)
5656
{
57-
while (!wcfg.peers[*shared.leader].up)
57+
do {
5858
*shared.leader = (*shared.leader + 1) % RAFTABLE_PEERS_MAX;
59+
} while (!wcfg.peers[*shared.leader].up);
5960
}
6061

6162
static void disconnect_leader(void)
@@ -64,7 +65,7 @@ static void disconnect_leader(void)
6465
{
6566
close(leadersock);
6667
}
67-
try_next_peer();
68+
select_next_peer();
6869
leadersock = -1;
6970
}
7071

@@ -74,6 +75,9 @@ static bool connect_leader(void)
7475
struct addrinfo hint;
7576
char portstr[6];
7677
struct addrinfo *a;
78+
int rc;
79+
80+
if (*shared.leader == NOBODY) select_next_peer();
7781

7882
HostPort *leaderhp = wcfg.peers + *shared.leader;
7983

@@ -83,10 +87,12 @@ static bool connect_leader(void)
8387
snprintf(portstr, 6, "%d", leaderhp->port);
8488
hint.ai_protocol = getprotobyname("tcp")->p_proto;
8589

86-
if (getaddrinfo(leaderhp->host, portstr, &hint, &addrs))
90+
if ((rc = getaddrinfo(leaderhp->host, portstr, &hint, &addrs)))
8791
{
8892
disconnect_leader();
89-
perror("failed to resolve address");
93+
fprintf(stderr, "failed to resolve address '%s:%d': %s",
94+
leaderhp->host, leaderhp->port,
95+
gai_strerror(rc));
9096
return false;
9197
}
9298

@@ -168,11 +174,14 @@ void raftable_set(char *key, char *value)
168174
size += vallen;
169175
ru = palloc(size);
170176

177+
ru->expector = wcfg.id;
178+
ru->fieldnum = 1;
179+
171180
RaftableField *f = (RaftableField *)ru->data;
172181
f->keylen = keylen;
173182
f->vallen = vallen;
174183
memcpy(f->data, key, keylen);
175-
memcpy(f->data + keylen, key, vallen);
184+
memcpy(f->data + keylen, value, vallen);
176185

177186
bool ok = false;
178187
while (!ok)
@@ -201,6 +210,18 @@ void raftable_set(char *key, char *value)
201210
}
202211
sent += newbytes;
203212
}
213+
214+
if (ok)
215+
{
216+
int status;
217+
int recved = read(s, &status, sizeof(status));
218+
if (recved != sizeof(status))
219+
{
220+
disconnect_leader();
221+
fprintf(stderr, "failed to recv the update status from the leader\n");
222+
ok = false;
223+
}
224+
}
204225
}
205226

206227
pfree(ru);
@@ -358,6 +379,7 @@ _PG_init(void)
358379
);
359380
parse_peers(wcfg.peers, peerstr);
360381

382+
request_shmem();
361383
worker_register(&wcfg);
362384

363385
PreviousShmemStartupHook = shmem_startup_hook;

contrib/raftable/state.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ bool state_next(StateP state, void *scan, char **key, char **value)
239239
}
240240
else
241241
{
242-
hash_seq_term((HASH_SEQ_STATUS *)scan);
243242
LWLockRelease(state->lock);
244243
pfree(scan);
245244
return false;
@@ -248,11 +247,13 @@ bool state_next(StateP state, void *scan, char **key, char **value)
248247

249248
void state_shmem_request()
250249
{
250+
int flags;
251251
HASHCTL info;
252252
info.keysize = sizeof(RaftableKey);
253253
info.entrysize = sizeof(RaftableEntry);
254254
info.dsize = info.max_dsize = hash_select_dirsize(RAFTABLE_HASH_SIZE);
255-
RequestAddinShmemSpace(RAFTABLE_BLOCK_MEM + sizeof(State) + hash_get_shared_size(&info, HASH_ELEM));
255+
flags = HASH_SHARED_MEM | HASH_ALLOC | HASH_DIRSIZE | HASH_ELEM;
256+
RequestAddinShmemSpace(RAFTABLE_BLOCK_MEM + sizeof(State) + hash_get_shared_size(&info, flags));
256257
RequestAddinLWLocks(1);
257258
}
258259

contrib/raftable/worker.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,9 @@ static void worker_main(Datum arg)
442442

443443
void worker_register(WorkerConfig *cfg)
444444
{
445-
BackgroundWorker worker;
445+
BackgroundWorker worker = {};
446446
strcpy(worker.bgw_name, "raftable worker");
447-
worker.bgw_flags = 0;
447+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
448448
worker.bgw_start_time = BgWorkerStart_PostmasterStart;
449449
worker.bgw_restart_time = 1;
450450
worker.bgw_main = worker_main;

0 commit comments

Comments
 (0)