Skip to content

Commit f1faa89

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents 62d4762 + 7f077cf commit f1faa89

File tree

11 files changed

+391
-132
lines changed

11 files changed

+391
-132
lines changed

contrib/mmts/Cluster.pm

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use Cwd;
1010

1111
use Socket;
1212

13+
use IPC::Run;
14+
1315
sub check_port
1416
{
1517
my ($host, $port) = @_;
@@ -27,7 +29,6 @@ sub check_port
2729
}
2830

2931
close(SOCK);
30-
diag("checking for port $port = $available\n");
3132
return $available;
3233
}
3334

@@ -148,6 +149,7 @@ sub start
148149
foreach my $node (@$nodes)
149150
{
150151
$node->start();
152+
diag "Starting node with connstr 'dbname=postgres port=@{[ $node->port() ]} host=@{[ $node->host() ]}'";
151153
}
152154
}
153155

@@ -272,4 +274,37 @@ sub poll
272274
return 0;
273275
}
274276

277+
sub pgbench()
278+
{
279+
my ($self, $node, @args) = @_;
280+
my $pgbench_handle = $self->pgbench_async($node, @args);
281+
$self->pgbench_await($pgbench_handle);
282+
}
283+
284+
sub pgbench_async()
285+
{
286+
my ($self, $node, @args) = @_;
287+
288+
my ($in, $out, $err, $rc);
289+
$in = '';
290+
$out = '';
291+
292+
my @pgbench_command = (
293+
'pgbench',
294+
@args,
295+
-h => $self->{nodes}->[$node]->host(),
296+
-p => $self->{nodes}->[$node]->port(),
297+
'postgres',
298+
);
299+
# diag("running pgbench init");
300+
my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
301+
return $handle;
302+
}
303+
304+
sub pgbench_await()
305+
{
306+
my ($self, $pgbench_handle) = @_;
307+
IPC::Run::finish($pgbench_handle) || BAIL_OUT("pgbench exited with $?");
308+
}
309+
275310
1;

contrib/mmts/arbiter.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
314314
} else {
315315
BIT_CLEAR(Mtm->currentLockNodeMask, resp->node-1);
316316
}
317-
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
317+
if (
318+
( BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) || Mtm->status == MTM_IN_MINORITY )
318319
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
319320
&& Mtm->status != MTM_RECOVERY
320321
&& Mtm->status != MTM_RECOVERED
@@ -714,6 +715,8 @@ static void MtmSender(Datum arg)
714715
int nNodes = MtmMaxNodes;
715716
int i;
716717

718+
MtmBackgroundWorker = true;
719+
717720
MtmBuffer* txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
718721
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
719722
InitializeTimeouts();
@@ -801,6 +804,8 @@ static void MtmMonitor(Datum arg)
801804
pqsignal(SIGQUIT, SetStop);
802805
pqsignal(SIGTERM, SetStop);
803806

807+
MtmBackgroundWorker = true;
808+
804809
/* We're now ready to receive signals */
805810
BackgroundWorkerUnblockSignals();
806811

@@ -837,7 +842,9 @@ static void MtmReceiver(Datum arg)
837842
pqsignal(SIGINT, SetStop);
838843
pqsignal(SIGQUIT, SetStop);
839844
pqsignal(SIGTERM, SetStop);
840-
845+
846+
MtmBackgroundWorker = true;
847+
841848
/* We're now ready to receive signals */
842849
BackgroundWorkerUnblockSignals();
843850

@@ -1075,6 +1082,7 @@ static void MtmReceiver(Datum arg)
10751082
if (ts->status != TRANSACTION_STATUS_ABORTED) {
10761083
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d", ts->gid, (long64)ts->xid, node);
10771084
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1085+
ts->aborted_by_node = node;
10781086
MtmAbortTransaction(ts);
10791087
}
10801088
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {

contrib/mmts/bgwpool.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3636

3737
MTM_ELOG(LOG, "Start background worker %d, shutdown=%d", MyProcPid, pool->shutdown);
3838

39+
MtmBackgroundWorker = true;
3940
MtmIsLogicalReceiver = true;
4041
MtmPool = pool;
4142

contrib/mmts/multimaster--1.0.sql

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS 'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. Node is didsabled, If drop_slot is true, then replication slot is dropped and node can be recovered using basebackup and recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATE FUNCTION mtm.stop_node(node integer, drop_slot bool default false) RETURNS void
2224
AS 'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exeed maximal number of nodes in the cluster.
2528
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
2629
AS 'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previously stopped
32+
-- Create replication slot for the node which was previously stalled (its replicatoin slot was deleted)
3033
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
3134
AS 'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create slot and returns error if node is stalled (slot eas dropped)
39+
CREATE FUNCTION mtm.resume_node(node integer) RETURNS void
40+
AS 'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
3645
AS 'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
75+
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
6776
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
79+
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
7180
AS 'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -103,9 +112,9 @@ CREATE FUNCTION mtm.referee_poll(xid bigint) RETURNS bigint
103112
AS 'MODULE_PATHNAME','mtm_referee_poll'
104113
LANGUAGE C;
105114

106-
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
115+
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema name, rel_name name, primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
117+
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
109118
$$
110119
DECLARE
111120
seq_class record;

0 commit comments

Comments
 (0)