Skip to content

Commit 3b549f5

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 8aaae6a + 466f3c1 commit 3b549f5

File tree

3 files changed

+47
-25
lines changed

3 files changed

+47
-25
lines changed

contrib/mmts/multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1290,7 +1290,7 @@ void MtmCheckQuorum(void)
12901290
}
12911291
} else {
12921292
if (Mtm->status == MTM_IN_MINORITY) {
1293-
MTM_LOG1("Node is in majority: dissbled mask %lx", (long) Mtm->disabledNodeMask);
1293+
MTM_LOG1("Node is in majority: disabled mask %lx", (long) Mtm->disabledNodeMask);
12941294
MtmSwitchClusterMode(MTM_ONLINE);
12951295
}
12961296
}

contrib/mmts/pglogical_receiver.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ static char const* const MtmReplicationModeName[] =
194194
{
195195
"recovered", /* SLOT_CREATE_NEW: recovery of node is completed so drop old slot and restart replication from the current position in WAL */
196196
"recovery", /* SLOT_OPEN_EXISTED: perform recorvery of the node by applying all data from theslot from specified point */
197-
"normal" /* SLOT_OPEN_ALWAYS: normal mode: use existeed slot or create new one and start receiving data from it from the specified position */
197+
"normal" /* SLOT_OPEN_ALWAYS: normal mode: use existed slot or create new one and start receiving data from it from the specified position */
198198
};
199199

200200
static void
@@ -248,6 +248,7 @@ pglogical_receiver_main(Datum main_arg)
248248
PQfinish(conn);
249249
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
250250
worker_proc)));
251+
/* Do not make decision about node status here because at startup peer node may just no yet started */
251252
/* MtmOnNodeDisconnect(nodeId); */
252253
proc_exit(1);
253254
}
@@ -271,6 +272,7 @@ pglogical_receiver_main(Datum main_arg)
271272
PQclear(res);
272273
ereport(ERROR, (errmsg("%s: Could not create logical slot",
273274
worker_proc)));
275+
MtmOnNodeDisconnect(nodeId);
274276
proc_exit(1);
275277
}
276278
}
@@ -312,6 +314,7 @@ pglogical_receiver_main(Datum main_arg)
312314
PQclear(res);
313315
ereport(WARNING, (errmsg("%s: Could not start logical replication",
314316
worker_proc)));
317+
MtmOnNodeDisconnect(nodeId);
315318
proc_exit(1);
316319
}
317320
PQclear(res);
@@ -402,6 +405,7 @@ pglogical_receiver_main(Datum main_arg)
402405
{
403406
ereport(LOG, (errmsg("%s: streaming header too small: %d",
404407
worker_proc, rc)));
408+
MtmOnNodeDisconnect(nodeId);
405409
proc_exit(1);
406410
}
407411
replyRequested = copybuf[pos];
@@ -421,15 +425,18 @@ pglogical_receiver_main(Datum main_arg)
421425
int64 now = feGetCurrentTimestamp();
422426

423427
/* Leave is feedback is not sent properly */
424-
if (!sendFeedback(conn, now, nodeId))
428+
if (!sendFeedback(conn, now, nodeId)) {
429+
MtmOnNodeDisconnect(nodeId);
425430
proc_exit(1);
431+
}
426432
}
427433
continue;
428434
}
429435
else if (copybuf[0] != 'w')
430436
{
431437
ereport(LOG, (errmsg("%s: Incorrect streaming header",
432438
worker_proc)));
439+
MtmOnNodeDisconnect(nodeId);
433440
proc_exit(1);
434441
}
435442

@@ -538,6 +545,7 @@ pglogical_receiver_main(Datum main_arg)
538545
{
539546
ereport(LOG, (errmsg("%s: Incorrect status received... Leaving.",
540547
worker_proc)));
548+
MtmOnNodeDisconnect(nodeId);
541549
proc_exit(1);
542550
}
543551

@@ -546,6 +554,7 @@ pglogical_receiver_main(Datum main_arg)
546554
{
547555
ereport(LOG, (errmsg("%s: Data remaining on the socket... Leaving.",
548556
worker_proc)));
557+
MtmOnNodeDisconnect(nodeId);
549558
proc_exit(1);
550559
}
551560
continue;
@@ -564,6 +573,7 @@ pglogical_receiver_main(Datum main_arg)
564573
{
565574
ereport(LOG, (errmsg("%s: Failure while receiving changes...",
566575
worker_proc)));
576+
MtmOnNodeDisconnect(nodeId);
567577
proc_exit(1);
568578
}
569579
}

contrib/raftable/raft/src/raft.c

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -499,11 +499,44 @@ static void raft_beat(raft_t r, int dst) {
499499
free(m);
500500
}
501501

502+
static void raft_reset_bytes_acked(raft_t r) {
503+
for (int i = 0; i < r->config.peernum_max; i++) {
504+
r->peers[i].acked.bytes = 0;
505+
}
506+
}
507+
508+
static void raft_reset_silent_time(raft_t r, int id) {
509+
for (int i = 0; i < r->config.peernum_max; i++) {
510+
if ((i == id) || (id == NOBODY)) {
511+
r->peers[i].silent_ms = 0;
512+
}
513+
}
514+
}
515+
516+
// Returns true if we got the support of a majority and became the leader
517+
static bool raft_become_leader(raft_t r) {
518+
if (r->votes * 2 > r->peernum) {
519+
// got the support of a majority
520+
r->role = LEADER;
521+
r->leader = r->me;
522+
raft_reset_bytes_acked(r);
523+
raft_reset_silent_time(r, NOBODY);
524+
raft_reset_timer(r);
525+
shout("became the leader\n");
526+
return true;
527+
}
528+
return false;
529+
}
530+
502531
static void raft_claim(raft_t r) {
503532
assert(r->role == CANDIDATE);
504533
assert(r->leader == NOBODY);
505534

506535
r->votes = 1; // vote for self
536+
if (raft_become_leader(r)) {
537+
// no need to send any messages, since we are alone
538+
return;
539+
}
507540

508541
raft_msg_claim_t m;
509542

@@ -995,20 +1028,6 @@ static void raft_handle_claim(raft_t r, raft_msg_claim_t *m) {
9951028
raft_send(r, candidate, &reply, sizeof(reply));
9961029
}
9971030

998-
static void raft_reset_bytes_acked(raft_t r) {
999-
for (int i = 0; i < r->config.peernum_max; i++) {
1000-
r->peers[i].acked.bytes = 0;
1001-
}
1002-
}
1003-
1004-
static void raft_reset_silent_time(raft_t r, int id) {
1005-
for (int i = 0; i < r->config.peernum_max; i++) {
1006-
if ((i == id) || (id == NOBODY)) {
1007-
r->peers[i].silent_ms = 0;
1008-
}
1009-
}
1010-
}
1011-
10121031
static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
10131032
int sender = m->msg.from;
10141033
raft_peer_t *peer = r->peers + sender;
@@ -1022,14 +1041,7 @@ static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
10221041
r->votes++;
10231042
}
10241043

1025-
if (r->votes * 2 > r->peernum) {
1026-
// got the support of a majority
1027-
r->role = LEADER;
1028-
r->leader = r->me;
1029-
raft_reset_bytes_acked(r);
1030-
raft_reset_silent_time(r, NOBODY);
1031-
raft_reset_timer(r);
1032-
}
1044+
raft_become_leader(r);
10331045
}
10341046

10351047
void raft_handle_message(raft_t r, raft_msg_t m) {

0 commit comments

Comments
 (0)