Skip to content

Commit 1460198

Browse files
committed
Revert "Merge branch 'PGPROEE9_6' into PGPROEE9_6_MULTIMASTER"
This reverts commit 8581033, reversing changes made to 573beb1. It reverts multimaster to the state of release 9.6.5.1.
1 parent b80997d commit 1460198

38 files changed

+957
-2482
lines changed

contrib/Makefile

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,6 @@ SUBDIRS = \
4747
pg_visibility \
4848
pg_wait_sampling \
4949
postgres_fdw \
50-
referee \
5150
rum \
5251
seg \
5352
spi \

contrib/mmts/Cluster.pm

Lines changed: 9 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,6 @@ sub new
6666
my $node = new PostgresNode("node$i", $host, $pgport);
6767
$node->{id} = $i;
6868
$node->{arbiter_port} = $arbiter_port;
69-
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
7069
push(@$nodes, $node);
7170
}
7271

@@ -90,54 +89,47 @@ sub init
9089
}
9190
}
9291

93-
sub all_connstrs
94-
{
95-
my ($self) = @_;
96-
my $nodes = $self->{nodes};
97-
return join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
98-
}
99-
100-
10192
sub configure
10293
{
10394
my ($self) = @_;
10495
my $nodes = $self->{nodes};
96+
my $nnodes = scalar @{ $nodes };
10597

106-
my $connstr = $self->all_connstrs();
98+
my $connstr = join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
10799

108100
foreach my $node (@$nodes)
109101
{
110102
my $id = $node->{id};
111103
my $host = $node->host;
112104
my $pgport = $node->port;
113105
my $arbiter_port = $node->{arbiter_port};
114-
my $unix_sock_dir = $ENV{PGHOST};
115106

116107
$node->append_conf("postgresql.conf", qq(
117108
log_statement = none
118109
listen_addresses = '$host'
119-
unix_socket_directories = '$unix_sock_dir'
110+
unix_socket_directories = ''
120111
port = $pgport
121112
max_prepared_transactions = 10
122113
max_connections = 10
123114
max_worker_processes = 100
124115
wal_level = logical
125-
max_wal_senders = 6
116+
max_wal_senders = 5
126117
wal_sender_timeout = 0
127118
default_transaction_isolation = 'repeatable read'
128-
max_replication_slots = 6
119+
max_replication_slots = 5
129120
shared_preload_libraries = 'multimaster'
130121
shared_buffers = 16MB
131122
132123
multimaster.arbiter_port = $arbiter_port
133124
multimaster.workers = 1
134125
multimaster.node_id = $id
135126
multimaster.conn_strings = '$connstr'
136-
multimaster.heartbeat_recv_timeout = 1050
127+
multimaster.heartbeat_recv_timeout = 2050
137128
multimaster.heartbeat_send_timeout = 250
138-
multimaster.max_nodes = 6
139-
multimaster.ignore_tables_without_pk = false
129+
multimaster.max_nodes = $nnodes
130+
multimaster.ignore_tables_without_pk = true
140131
multimaster.queue_size = 4194304
132+
multimaster.min_2pc_timeout = 150000
141133
log_line_prefix = '%t: '
142134
));
143135

contrib/mmts/Dockerfile

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ COPY ./ /pg/mmts/
66
RUN export USE_PGXS=1 && \
77
cd /pg/mmts && make clean && make install
88

9-
RUN export USE_PGXS=1 && \
10-
cd /pg/src/contrib/referee && make clean && make install
11-
129
# pg_regress client assumes such dir exists on server
1310
RUN cp /pg/src/src/test/regress/*.so /pg/install/lib/postgresql/
1411
USER postgres

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
4-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o state.o
4+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o
55
MODULE_big = multimaster
66

77
PG_CPPFLAGS = -I$(libpq_srcdir)

contrib/mmts/arbiter.c

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676

7777

7878
#include "multimaster.h"
79-
#include "state.h"
8079

8180
#define MAX_ROUTES 16
8281
#define INIT_BUFFER_SIZE 1024
@@ -190,6 +189,7 @@ static void MtmDisconnect(int node)
190189
MtmUnregisterSocket(sockets[node]);
191190
pg_closesocket(sockets[node], MtmUseRDMA);
192191
sockets[node] = -1;
192+
MtmOnNodeDisconnect(node+1);
193193
}
194194

195195
static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
@@ -316,22 +316,25 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316
} else {
317317
BIT_CLEAR(Mtm->currentLockNodeMask, resp->node-1);
318318
}
319-
320-
// if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321-
// {
322-
// MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323-
// }
324-
325-
if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) &&
326-
sockets[resp->node-1] < 0)
327-
{
328-
/* We've received heartbeat from disabled node.
319+
if (
320+
( BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) || Mtm->status == MTM_IN_MINORITY )
321+
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
322+
&& Mtm->status != MTM_RECOVERY
323+
&& Mtm->status != MTM_RECOVERED
324+
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
325+
{
326+
MTM_ELOG(WARNING, "Node %d thinks that I'm dead, while I'm %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);
327+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
328+
Mtm->nConfigChanges += 1;
329+
MtmSwitchClusterMode(MTM_RECOVERY);
330+
} else if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) && sockets[resp->node-1] < 0) {
331+
/* We received a heartbeat from a disabled node.
329332
* Looks like it is restarted.
330333
* Try to reconnect to it.
331334
*/
332335
MTM_ELOG(WARNING, "Receive heartbeat from disabled node %d", resp->node);
333336
BIT_SET(Mtm->reconnectMask, resp->node-1);
334-
}
337+
}
335338
}
336339

337340
static void MtmScheduleHeartbeat()
@@ -540,9 +543,17 @@ static void MtmOpenConnections()
540543
for (i = 0; i < nNodes; i++) {
541544
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
542545
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort);
546+
if (sockets[i] < 0) {
547+
MtmOnNodeDisconnect(i+1);
548+
}
543549
}
544550
}
545-
MtmStateProcessEvent(MTM_ARBITER_RECEIVER_START);
551+
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1) { /* no quorum */
552+
MTM_ELOG(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
553+
MtmSwitchClusterMode(MTM_IN_MINORITY);
554+
} else if (Mtm->status == MTM_INITIALIZATION) {
555+
MtmSwitchClusterMode(MTM_CONNECTED);
556+
}
546557
}
547558

548559

@@ -575,6 +586,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
575586
}
576587
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
577588
if (sockets[node] < 0) {
589+
MtmOnNodeDisconnect(node+1);
578590
result = false;
579591
break;
580592
}
@@ -704,18 +716,16 @@ static void MtmSender(Datum arg)
704716
{
705717
int nNodes = MtmMaxNodes;
706718
int i;
707-
MtmBuffer* txBuffer;
708719

709720
MtmBackgroundWorker = true;
710721

711-
txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
722+
MtmBuffer* txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
712723
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
713724
InitializeTimeouts();
714725

715726
pqsignal(SIGINT, SetStop);
716727
pqsignal(SIGQUIT, SetStop);
717728
pqsignal(SIGTERM, SetStop);
718-
pqsignal(SIGHUP, PostgresSigHupHandler);
719729

720730
/* We're now ready to receive signals */
721731
BackgroundWorkerUnblockSignals();
@@ -734,12 +744,6 @@ static void MtmSender(Datum arg)
734744
PGSemaphoreLock(&Mtm->sendSemaphore);
735745
CHECK_FOR_INTERRUPTS();
736746

737-
if (ConfigReloadPending)
738-
{
739-
ConfigReloadPending = false;
740-
ProcessConfigFile(PGC_SIGHUP);
741-
}
742-
743747
MtmCheckHeartbeat();
744748
/*
745749
* Use shared lock to improve locality,
@@ -801,7 +805,6 @@ static void MtmMonitor(Datum arg)
801805
pqsignal(SIGINT, SetStop);
802806
pqsignal(SIGQUIT, SetStop);
803807
pqsignal(SIGTERM, SetStop);
804-
pqsignal(SIGHUP, PostgresSigHupHandler);
805808

806809
MtmBackgroundWorker = true;
807810

@@ -816,13 +819,6 @@ static void MtmMonitor(Datum arg)
816819
if (rc & WL_POSTMASTER_DEATH) {
817820
break;
818821
}
819-
820-
if (ConfigReloadPending)
821-
{
822-
ConfigReloadPending = false;
823-
ProcessConfigFile(PGC_SIGHUP);
824-
}
825-
826822
MtmRefreshClusterStatus();
827823
}
828824
}
@@ -848,7 +844,6 @@ static void MtmReceiver(Datum arg)
848844
pqsignal(SIGINT, SetStop);
849845
pqsignal(SIGQUIT, SetStop);
850846
pqsignal(SIGTERM, SetStop);
851-
pqsignal(SIGHUP, PostgresSigHupHandler);
852847

853848
MtmBackgroundWorker = true;
854849

@@ -884,14 +879,7 @@ static void MtmReceiver(Datum arg)
884879
for (j = 0; j < n; j++) {
885880
if (events[j].events & EPOLLIN)
886881
#else
887-
fd_set events;
888-
889-
if (ConfigReloadPending)
890-
{
891-
ConfigReloadPending = false;
892-
ProcessConfigFile(PGC_SIGHUP);
893-
}
894-
882+
fd_set events;
895883
do {
896884
struct timeval tv;
897885
events = inset;
@@ -1018,7 +1006,7 @@ static void MtmReceiver(Datum arg)
10181006
default:
10191007
break;
10201008
}
1021-
if (BIT_CHECK(msg->disabledNodeMask, node-1) || BIT_CHECK(Mtm->disabledNodeMask, node-1)) {
1009+
if (BIT_CHECK(msg->disabledNodeMask, node-1)) {
10221010
MTM_ELOG(WARNING, "Ignore message from dead node %d\n", node);
10231011
continue;
10241012
}
@@ -1096,7 +1084,7 @@ static void MtmReceiver(Datum arg)
10961084
if (ts->status != TRANSACTION_STATUS_ABORTED) {
10971085
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d", ts->gid, (long64)ts->xid, node);
10981086
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1099-
ts->abortedByNode = node;
1087+
ts->aborted_by_node = node;
11001088
MtmAbortTransaction(ts);
11011089
}
11021090
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
@@ -1173,3 +1161,4 @@ static void MtmReceiver(Datum arg)
11731161
}
11741162
proc_exit(1); /* force restart of this bgworker */
11751163
}
1164+

contrib/mmts/bgwpool.c

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
#include "bgwpool.h"
1616
#include "multimaster.h"
17-
#include "utils/guc.h"
1817

1918
bool MtmIsLogicalReceiver;
2019
int MtmMaxWorkers;
@@ -44,21 +43,14 @@ static void BgwPoolMainLoop(BgwPool* pool)
4443
pqsignal(SIGINT, BgwShutdownWorker);
4544
pqsignal(SIGQUIT, BgwShutdownWorker);
4645
pqsignal(SIGTERM, BgwShutdownWorker);
47-
pqsignal(SIGHUP, PostgresSigHupHandler);
4846

4947
BackgroundWorkerUnblockSignals();
5048
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
5149
ActivePortal = &fakePortal;
5250
ActivePortal->status = PORTAL_ACTIVE;
5351
ActivePortal->sourceText = "";
5452

55-
while (true) {
56-
if (ConfigReloadPending)
57-
{
58-
ConfigReloadPending = false;
59-
ProcessConfigFile(PGC_SIGHUP);
60-
}
61-
53+
while (true) {
6254
PGSemaphoreLock(&pool->available);
6355
SpinLockAcquire(&pool->lock);
6456
if (pool->shutdown) {

0 commit comments

Comments
 (0)