Skip to content

Commit bf09fde

Browse files
committed
Revert "Revert "Merge branch 'PGPROEE9_6' into PGPROEE9_6_MULTIMASTER""
This reverts commit 1460198.
1 parent a9317a7 commit bf09fde

38 files changed

+2482
-957
lines changed

contrib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ SUBDIRS = \
4747
pg_visibility \
4848
pg_wait_sampling \
4949
postgres_fdw \
50+
referee \
5051
rum \
5152
seg \
5253
spi \

contrib/mmts/Cluster.pm

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ sub new
6666
my $node = new PostgresNode("node$i", $host, $pgport);
6767
$node->{id} = $i;
6868
$node->{arbiter_port} = $arbiter_port;
69+
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
6970
push(@$nodes, $node);
7071
}
7172

@@ -89,47 +90,54 @@ sub init
8990
}
9091
}
9192

93+
sub all_connstrs
94+
{
95+
my ($self) = @_;
96+
my $nodes = $self->{nodes};
97+
return join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
98+
}
99+
100+
92101
sub configure
93102
{
94103
my ($self) = @_;
95104
my $nodes = $self->{nodes};
96-
my $nnodes = scalar @{ $nodes };
97105

98-
my $connstr = join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
106+
my $connstr = $self->all_connstrs();
99107

100108
foreach my $node (@$nodes)
101109
{
102110
my $id = $node->{id};
103111
my $host = $node->host;
104112
my $pgport = $node->port;
105113
my $arbiter_port = $node->{arbiter_port};
114+
my $unix_sock_dir = $ENV{PGHOST};
106115

107116
$node->append_conf("postgresql.conf", qq(
108117
log_statement = none
109118
listen_addresses = '$host'
110-
unix_socket_directories = ''
119+
unix_socket_directories = '$unix_sock_dir'
111120
port = $pgport
112121
max_prepared_transactions = 10
113122
max_connections = 10
114123
max_worker_processes = 100
115124
wal_level = logical
116-
max_wal_senders = 5
125+
max_wal_senders = 6
117126
wal_sender_timeout = 0
118127
default_transaction_isolation = 'repeatable read'
119-
max_replication_slots = 5
128+
max_replication_slots = 6
120129
shared_preload_libraries = 'multimaster'
121130
shared_buffers = 16MB
122131
123132
multimaster.arbiter_port = $arbiter_port
124133
multimaster.workers = 1
125134
multimaster.node_id = $id
126135
multimaster.conn_strings = '$connstr'
127-
multimaster.heartbeat_recv_timeout = 2050
136+
multimaster.heartbeat_recv_timeout = 1050
128137
multimaster.heartbeat_send_timeout = 250
129-
multimaster.max_nodes = $nnodes
130-
multimaster.ignore_tables_without_pk = true
138+
multimaster.max_nodes = 6
139+
multimaster.ignore_tables_without_pk = false
131140
multimaster.queue_size = 4194304
132-
multimaster.min_2pc_timeout = 150000
133141
log_line_prefix = '%t: '
134142
));
135143

contrib/mmts/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ COPY ./ /pg/mmts/
66
RUN export USE_PGXS=1 && \
77
cd /pg/mmts && make clean && make install
88

9+
RUN export USE_PGXS=1 && \
10+
cd /pg/src/contrib/referee && make clean && make install
11+
912
# pg_regress client assumes such dir exists on server
1013
RUN cp /pg/src/src/test/regress/*.so /pg/install/lib/postgresql/
1114
USER postgres

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
4-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o
4+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o state.o
55
MODULE_big = multimaster
66

77
PG_CPPFLAGS = -I$(libpq_srcdir)

contrib/mmts/arbiter.c

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676

7777

7878
#include "multimaster.h"
79+
#include "state.h"
7980

8081
#define MAX_ROUTES 16
8182
#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189190
MtmUnregisterSocket(sockets[node]);
190191
pg_closesocket(sockets[node], MtmUseRDMA);
191192
sockets[node] = -1;
192-
MtmOnNodeDisconnect(node+1);
193193
}
194194

195195
static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316
} else {
317317
BIT_CLEAR(Mtm->currentLockNodeMask, resp->node-1);
318318
}
319-
if (
320-
( BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) || Mtm->status == MTM_IN_MINORITY )
321-
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
322-
&& Mtm->status != MTM_RECOVERY
323-
&& Mtm->status != MTM_RECOVERED
324-
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
325-
{
326-
MTM_ELOG(WARNING, "Node %d thinks that I'm dead, while I'm %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);
327-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
328-
Mtm->nConfigChanges += 1;
329-
MtmSwitchClusterMode(MTM_RECOVERY);
330-
} else if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) && sockets[resp->node-1] < 0) {
331-
/* We receive heartbeat from disabled node.
319+
320+
// if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321+
// {
322+
// MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323+
// }
324+
325+
if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) &&
326+
sockets[resp->node-1] < 0)
327+
{
328+
/* We've received heartbeat from disabled node.
332329
* Looks like it is restarted.
333330
* Try to reconnect to it.
334331
*/
335332
MTM_ELOG(WARNING, "Receive heartbeat from disabled node %d", resp->node);
336333
BIT_SET(Mtm->reconnectMask, resp->node-1);
337-
}
334+
}
338335
}
339336

340337
static void MtmScheduleHeartbeat()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543540
for (i = 0; i < nNodes; i++) {
544541
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
545542
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort);
546-
if (sockets[i] < 0) {
547-
MtmOnNodeDisconnect(i+1);
548-
}
549543
}
550544
}
551-
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1) { /* no quorum */
552-
MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
553-
MtmSwitchClusterMode(MTM_IN_MINORITY);
554-
} else if (Mtm->status == MTM_INITIALIZATION) {
555-
MtmSwitchClusterMode(MTM_CONNECTED);
556-
}
545+
MtmStateProcessEvent(MTM_ARBITER_RECEIVER_START);
557546
}
558547

559548

@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586575
}
587576
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
588577
if (sockets[node] < 0) {
589-
MtmOnNodeDisconnect(node+1);
590578
result = false;
591579
break;
592580
}
@@ -716,16 +704,18 @@ static void MtmSender(Datum arg)
716704
{
717705
int nNodes = MtmMaxNodes;
718706
int i;
707+
MtmBuffer* txBuffer;
719708

720709
MtmBackgroundWorker = true;
721710

722-
MtmBuffer* txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
711+
txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
723712
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
724713
InitializeTimeouts();
725714

726715
pqsignal(SIGINT, SetStop);
727716
pqsignal(SIGQUIT, SetStop);
728717
pqsignal(SIGTERM, SetStop);
718+
pqsignal(SIGHUP, PostgresSigHupHandler);
729719

730720
/* We're now ready to receive signals */
731721
BackgroundWorkerUnblockSignals();
@@ -744,6 +734,12 @@ static void MtmSender(Datum arg)
744734
PGSemaphoreLock(&Mtm->sendSemaphore);
745735
CHECK_FOR_INTERRUPTS();
746736

737+
if (ConfigReloadPending)
738+
{
739+
ConfigReloadPending = false;
740+
ProcessConfigFile(PGC_SIGHUP);
741+
}
742+
747743
MtmCheckHeartbeat();
748744
/*
749745
* Use shared lock to improve locality,
@@ -805,6 +801,7 @@ static void MtmMonitor(Datum arg)
805801
pqsignal(SIGINT, SetStop);
806802
pqsignal(SIGQUIT, SetStop);
807803
pqsignal(SIGTERM, SetStop);
804+
pqsignal(SIGHUP, PostgresSigHupHandler);
808805

809806
MtmBackgroundWorker = true;
810807

@@ -819,6 +816,13 @@ static void MtmMonitor(Datum arg)
819816
if (rc & WL_POSTMASTER_DEATH) {
820817
break;
821818
}
819+
820+
if (ConfigReloadPending)
821+
{
822+
ConfigReloadPending = false;
823+
ProcessConfigFile(PGC_SIGHUP);
824+
}
825+
822826
MtmRefreshClusterStatus();
823827
}
824828
}
@@ -844,6 +848,7 @@ static void MtmReceiver(Datum arg)
844848
pqsignal(SIGINT, SetStop);
845849
pqsignal(SIGQUIT, SetStop);
846850
pqsignal(SIGTERM, SetStop);
851+
pqsignal(SIGHUP, PostgresSigHupHandler);
847852

848853
MtmBackgroundWorker = true;
849854

@@ -879,7 +884,14 @@ static void MtmReceiver(Datum arg)
879884
for (j = 0; j < n; j++) {
880885
if (events[j].events & EPOLLIN)
881886
#else
882-
fd_set events;
887+
fd_set events;
888+
889+
if (ConfigReloadPending)
890+
{
891+
ConfigReloadPending = false;
892+
ProcessConfigFile(PGC_SIGHUP);
893+
}
894+
883895
do {
884896
struct timeval tv;
885897
events = inset;
@@ -1006,7 +1018,7 @@ static void MtmReceiver(Datum arg)
10061018
default:
10071019
break;
10081020
}
1009-
if (BIT_CHECK(msg->disabledNodeMask, node-1)) {
1021+
if (BIT_CHECK(msg->disabledNodeMask, node-1) || BIT_CHECK(Mtm->disabledNodeMask, node-1)) {
10101022
MTM_ELOG(WARNING, "Ignore message from dead node %d\n", node);
10111023
continue;
10121024
}
@@ -1084,7 +1096,7 @@ static void MtmReceiver(Datum arg)
10841096
if (ts->status != TRANSACTION_STATUS_ABORTED) {
10851097
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d", ts->gid, (long64)ts->xid, node);
10861098
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1087-
ts->aborted_by_node = node;
1099+
ts->abortedByNode = node;
10881100
MtmAbortTransaction(ts);
10891101
}
10901102
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
@@ -1161,4 +1173,3 @@ static void MtmReceiver(Datum arg)
11611173
}
11621174
proc_exit(1); /* force restart of this bgwroker */
11631175
}
1164-

contrib/mmts/bgwpool.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "bgwpool.h"
1616
#include "multimaster.h"
17+
#include "utils/guc.h"
1718

1819
bool MtmIsLogicalReceiver;
1920
int MtmMaxWorkers;
@@ -43,14 +44,21 @@ static void BgwPoolMainLoop(BgwPool* pool)
4344
pqsignal(SIGINT, BgwShutdownWorker);
4445
pqsignal(SIGQUIT, BgwShutdownWorker);
4546
pqsignal(SIGTERM, BgwShutdownWorker);
47+
pqsignal(SIGHUP, PostgresSigHupHandler);
4648

4749
BackgroundWorkerUnblockSignals();
4850
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
4951
ActivePortal = &fakePortal;
5052
ActivePortal->status = PORTAL_ACTIVE;
5153
ActivePortal->sourceText = "";
5254

53-
while (true) {
55+
while (true) {
56+
if (ConfigReloadPending)
57+
{
58+
ConfigReloadPending = false;
59+
ProcessConfigFile(PGC_SIGHUP);
60+
}
61+
5462
PGSemaphoreLock(&pool->available);
5563
SpinLockAcquire(&pool->lock);
5664
if (pool->shutdown) {

0 commit comments

Comments
 (0)