Skip to content

Commit f2f2289

Browse files
committed
Merge branch 'mtm-stm' into PGPROEE9_6_MULTIMASTER
2 parents e88832e + 75f92cd commit f2f2289

24 files changed

+1240
-848
lines changed

contrib/mmts/Cluster.pm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ sub pgbench_async()
304304
-p => $self->{nodes}->[$node]->port(),
305305
'postgres',
306306
);
307-
# diag("running pgbench init");
307+
diag("running pgbench: " . join(" ", @pgbench_command));
308308
my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
309309
return $handle;
310310
}

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
4-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o
4+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o state.o
55
MODULE_big = multimaster
66

77
PG_CPPFLAGS = -I$(libpq_srcdir)

contrib/mmts/arbiter.c

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676

7777

7878
#include "multimaster.h"
79+
#include "state.h"
7980

8081
#define MAX_ROUTES 16
8182
#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189190
MtmUnregisterSocket(sockets[node]);
190191
pg_closesocket(sockets[node], MtmUseRDMA);
191192
sockets[node] = -1;
192-
MtmOnNodeDisconnect(node+1);
193193
}
194194

195195
static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316
} else {
317317
BIT_CLEAR(Mtm->currentLockNodeMask, resp->node-1);
318318
}
319-
if (
320-
( BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) || Mtm->status == MTM_IN_MINORITY )
321-
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
322-
&& Mtm->status != MTM_RECOVERY
323-
&& Mtm->status != MTM_RECOVERED
324-
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
325-
{
326-
MTM_ELOG(WARNING, "Node %d thinks that I'm dead, while I'm %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);
327-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
328-
Mtm->nConfigChanges += 1;
329-
MtmSwitchClusterMode(MTM_RECOVERY);
330-
} else if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) && sockets[resp->node-1] < 0) {
331-
/* We receive heartbeat from disabled node.
319+
320+
// if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321+
// {
322+
// MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323+
// }
324+
325+
if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) &&
326+
sockets[resp->node-1] < 0)
327+
{
328+
/* We've received heartbeat from disabled node.
332329
* Looks like it is restarted.
333330
* Try to reconnect to it.
334331
*/
335332
MTM_ELOG(WARNING, "Receive heartbeat from disabled node %d", resp->node);
336333
BIT_SET(Mtm->reconnectMask, resp->node-1);
337-
}
334+
}
338335
}
339336

340337
static void MtmScheduleHeartbeat()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543540
for (i = 0; i < nNodes; i++) {
544541
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
545542
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort);
546-
if (sockets[i] < 0) {
547-
MtmOnNodeDisconnect(i+1);
548-
}
549543
}
550544
}
551-
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1) { /* no quorum */
552-
MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
553-
MtmSwitchClusterMode(MTM_IN_MINORITY);
554-
} else if (Mtm->status == MTM_INITIALIZATION) {
555-
MtmSwitchClusterMode(MTM_CONNECTED);
556-
}
545+
MtmStateProcessEvent(MTM_ARBITER_RECEIVER_START);
557546
}
558547

559548

@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586575
}
587576
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
588577
if (sockets[node] < 0) {
589-
MtmOnNodeDisconnect(node+1);
590578
result = false;
591579
break;
592580
}
@@ -727,6 +715,7 @@ static void MtmSender(Datum arg)
727715
pqsignal(SIGINT, SetStop);
728716
pqsignal(SIGQUIT, SetStop);
729717
pqsignal(SIGTERM, SetStop);
718+
pqsignal(SIGHUP, PostgresSigHupHandler);
730719

731720
/* We're now ready to receive signals */
732721
BackgroundWorkerUnblockSignals();
@@ -745,6 +734,12 @@ static void MtmSender(Datum arg)
745734
PGSemaphoreLock(&Mtm->sendSemaphore);
746735
CHECK_FOR_INTERRUPTS();
747736

737+
if (ConfigReloadPending)
738+
{
739+
ConfigReloadPending = false;
740+
ProcessConfigFile(PGC_SIGHUP);
741+
}
742+
748743
MtmCheckHeartbeat();
749744
/*
750745
* Use shared lock to improve locality,
@@ -806,6 +801,7 @@ static void MtmMonitor(Datum arg)
806801
pqsignal(SIGINT, SetStop);
807802
pqsignal(SIGQUIT, SetStop);
808803
pqsignal(SIGTERM, SetStop);
804+
pqsignal(SIGHUP, PostgresSigHupHandler);
809805

810806
MtmBackgroundWorker = true;
811807

@@ -820,6 +816,13 @@ static void MtmMonitor(Datum arg)
820816
if (rc & WL_POSTMASTER_DEATH) {
821817
break;
822818
}
819+
820+
if (ConfigReloadPending)
821+
{
822+
ConfigReloadPending = false;
823+
ProcessConfigFile(PGC_SIGHUP);
824+
}
825+
823826
MtmRefreshClusterStatus();
824827
}
825828
}
@@ -845,6 +848,7 @@ static void MtmReceiver(Datum arg)
845848
pqsignal(SIGINT, SetStop);
846849
pqsignal(SIGQUIT, SetStop);
847850
pqsignal(SIGTERM, SetStop);
851+
pqsignal(SIGHUP, PostgresSigHupHandler);
848852

849853
MtmBackgroundWorker = true;
850854

@@ -880,7 +884,14 @@ static void MtmReceiver(Datum arg)
880884
for (j = 0; j < n; j++) {
881885
if (events[j].events & EPOLLIN)
882886
#else
883-
fd_set events;
887+
fd_set events;
888+
889+
if (ConfigReloadPending)
890+
{
891+
ConfigReloadPending = false;
892+
ProcessConfigFile(PGC_SIGHUP);
893+
}
894+
884895
do {
885896
struct timeval tv;
886897
events = inset;
@@ -1007,7 +1018,7 @@ static void MtmReceiver(Datum arg)
10071018
default:
10081019
break;
10091020
}
1010-
if (BIT_CHECK(msg->disabledNodeMask, node-1)) {
1021+
if (BIT_CHECK(msg->disabledNodeMask, node-1) || BIT_CHECK(Mtm->disabledNodeMask, node-1)) {
10111022
MTM_ELOG(WARNING, "Ignore message from dead node %d\n", node);
10121023
continue;
10131024
}
@@ -1085,7 +1096,7 @@ static void MtmReceiver(Datum arg)
10851096
if (ts->status != TRANSACTION_STATUS_ABORTED) {
10861097
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d", ts->gid, (long64)ts->xid, node);
10871098
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1088-
ts->aborted_by_node = node;
1099+
ts->abortedByNode = node;
10891100
MtmAbortTransaction(ts);
10901101
}
10911102
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {

contrib/mmts/bgwpool.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "bgwpool.h"
1616
#include "multimaster.h"
17+
#include "utils/guc.h"
1718

1819
bool MtmIsLogicalReceiver;
1920
int MtmMaxWorkers;
@@ -43,14 +44,21 @@ static void BgwPoolMainLoop(BgwPool* pool)
4344
pqsignal(SIGINT, BgwShutdownWorker);
4445
pqsignal(SIGQUIT, BgwShutdownWorker);
4546
pqsignal(SIGTERM, BgwShutdownWorker);
47+
pqsignal(SIGHUP, PostgresSigHupHandler);
4648

4749
BackgroundWorkerUnblockSignals();
4850
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
4951
ActivePortal = &fakePortal;
5052
ActivePortal->status = PORTAL_ACTIVE;
5153
ActivePortal->sourceText = "";
5254

53-
while (true) {
55+
while (true) {
56+
if (ConfigReloadPending)
57+
{
58+
ConfigReloadPending = false;
59+
ProcessConfigFile(PGC_SIGHUP);
60+
}
61+
5462
PGSemaphoreLock(&pool->available);
5563
SpinLockAcquire(&pool->lock);
5664
if (pool->shutdown) {

contrib/mmts/multimaster--1.0.sql

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,27 @@ $$
151151
LANGUAGE plpgsql;
152152

153153
-- select mtm.alter_sequences();
154+
155+
-- referee stuff
156+
CREATE TABLE IF NOT EXISTS mtm.referee_decision(key text primary key not null, node_id int);
157+
158+
CREATE OR REPLACE FUNCTION mtm.referee_get_winner(applicant_id int) RETURNS int AS
159+
$$
160+
DECLARE
161+
winner_id int;
162+
BEGIN
163+
insert into mtm.referee_decision values ('winner', applicant_id);
164+
select node_id into winner_id from mtm.referee_decision where key = 'winner';
165+
return winner_id;
166+
EXCEPTION WHEN others THEN
167+
select node_id into winner_id from mtm.referee_decision where key = 'winner';
168+
return winner_id;
169+
END
170+
$$
171+
LANGUAGE plpgsql;
172+
173+
CREATE OR REPLACE FUNCTION mtm.referee_clean() RETURNS void AS
174+
$$
175+
delete from mtm.referee_decision where key = 'winner';
176+
$$
177+
LANGUAGE sql;

0 commit comments

Comments
 (0)