Skip to content

Commit f4c9a73

Browse files
committed
2 parents 09cf6e4 + 71cf250 commit f4c9a73

File tree

5 files changed

+116
-71
lines changed

5 files changed

+116
-71
lines changed

contrib/mmts/arbiter.c

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ MtmResolveHostByName(const char *hostname, unsigned* addrs, unsigned* n_addrs)
164164
return 1;
165165
}
166166

167+
static int stop = 0;
168+
static void SetStop(int sig)
169+
{
170+
stop = 1;
171+
}
172+
167173
#if USE_EPOLL
168174
static int epollfd;
169175
#else
@@ -530,17 +536,24 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
530536

531537
static void MtmTransSender(Datum arg)
532538
{
539+
sigset_t sset;
533540
int nNodes = MtmNodes;
534541
int i;
535542
MtmBuffer* txBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
536-
543+
544+
signal(SIGINT, SetStop);
545+
signal(SIGQUIT, SetStop);
546+
signal(SIGTERM, SetStop);
547+
sigfillset(&sset);
548+
sigprocmask(SIG_UNBLOCK, &sset, NULL);
549+
537550
MtmOpenConnections();
538551

539552
for (i = 0; i < nNodes; i++) {
540553
txBuffer[i].used = 0;
541554
}
542555

543-
while (true) {
556+
while (!stop) {
544557
MtmTransState* ts;
545558
PGSemaphoreLock(&Mtm->votingSemaphore);
546559
CHECK_FOR_INTERRUPTS();
@@ -605,6 +618,7 @@ static bool MtmRecovery()
605618

606619
static void MtmTransReceiver(Datum arg)
607620
{
621+
sigset_t sset;
608622
int nNodes = MtmNodes;
609623
int nResponses;
610624
int i, j, n, rc;
@@ -617,14 +631,20 @@ static void MtmTransReceiver(Datum arg)
617631
FD_ZERO(&inset);
618632
max_fd = 0;
619633
#endif
634+
635+
signal(SIGINT, SetStop);
636+
signal(SIGQUIT, SetStop);
637+
signal(SIGTERM, SetStop);
638+
sigfillset(&sset);
639+
sigprocmask(SIG_UNBLOCK, &sset, NULL);
620640

621641
MtmAcceptIncomingConnections();
622642

623643
for (i = 0; i < nNodes; i++) {
624644
rxBuffer[i].used = 0;
625645
}
626646

627-
while (true) {
647+
while (!stop) {
628648
#if USE_EPOLL
629649
n = epoll_wait(epollfd, events, nNodes, MtmKeepaliveTimeout/1000);
630650
if (n < 0) {

contrib/mmts/t/000_deadlock.pl

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 2;
6+
use Test::More tests => 1;
77

88
use DBI;
99
use DBD::Pg ':async';
@@ -14,23 +14,23 @@ sub query_row
1414
my $sth = $dbi->prepare($sql) || die;
1515
$sth->execute(@keys) || die;
1616
my $ret = $sth->fetchrow_array || undef;
17-
print "query_row('$sql') -> $ret \n";
17+
diag("query_row('$sql') -> $ret\n");
1818
return $ret;
1919
}
2020

2121
sub query_exec
2222
{
2323
my ($dbi, $sql) = @_;
2424
my $rv = $dbi->do($sql) || die;
25-
print "query_exec('$sql')\n";
25+
diag("query_exec('$sql') = $rv\n");
2626
return $rv;
2727
}
2828

2929
sub query_exec_async
3030
{
3131
my ($dbi, $sql) = @_;
3232
my $rv = $dbi->do($sql, {pg_async => PG_ASYNC}) || die;
33-
print "query_exec('$sql')\n";
33+
diag("query_exec_async('$sql')\n");
3434
return $rv;
3535
}
3636

@@ -44,7 +44,7 @@ sub allocate_ports
4444
{
4545
my $port = int(rand() * 16384) + 49152;
4646
next if $allocated_ports{$port};
47-
print "# Checking for port $port\n";
47+
diag("Checking for port $port\n");
4848
if (!TestLib::run_log(['pg_isready', '-h', $host, '-p', $port]))
4949
{
5050
$allocated_ports{$port} = 1;
@@ -73,8 +73,8 @@ sub allocate_ports
7373
my $mm_connstr = join(',', map { "${ \$_->connstr('postgres') }" } @nodes);
7474
my $raft_peers = join(',', map { join(':', $_->{id}, $_->host, $_->{raftport}) } @nodes);
7575

76-
print("# mm_connstr = $mm_connstr\n");
77-
print("# raft_peers = $raft_peers\n");
76+
diag("mm_connstr = $mm_connstr\n");
77+
diag("raft_peers = $raft_peers\n");
7878

7979
# Init and Configure
8080
foreach my $node (@nodes)
@@ -89,22 +89,19 @@ sub allocate_ports
8989
listen_addresses = '$host'
9090
unix_socket_directories = ''
9191
port = $pgport
92-
max_connections = 200
93-
shared_buffers = 1GB
94-
max_prepared_transactions = 200
95-
max_worker_processes = 100
92+
max_prepared_transactions = 10
93+
max_worker_processes = 10
9694
wal_level = logical
9795
fsync = off
98-
max_wal_size = 100GB
99-
min_wal_size = 1GB
10096
max_wal_senders = 10
10197
wal_sender_timeout = 0
10298
max_replication_slots = 10
10399
shared_preload_libraries = 'raftable,multimaster'
104-
multimaster.workers = 8
105-
multimaster.queue_size = 104857600 # 100mb
100+
multimaster.workers = 4
101+
multimaster.queue_size = 10485760 # 10mb
106102
multimaster.node_id = $id
107103
multimaster.conn_strings = '$mm_connstr'
104+
multimaster.use_raftable = true
108105
raftable.id = $id
109106
raftable.peers = '$raft_peers'
110107
));
@@ -122,27 +119,55 @@ sub allocate_ports
122119
$node->start();
123120
}
124121

125-
$nodes[0]->psql("create table t(k int primary key, v text)");
126-
$nodes[0]->psql("insert into t values (1, 'hello'), (2, 'world')");
127-
128-
#my @conns = map { DBI->connect('DBI:Pg:' . $_->connstr()) } @nodes;
129-
#
130-
#query_exec($conns[0], "begin");
131-
#query_exec($conns[1], "begin");
132-
#
133-
#query_exec($conns[0], "update t set v = 'foo' where k = 1");
134-
#query_exec($conns[1], "update t set v = 'bar' where k = 2");
135-
#
136-
#query_exec($conns[0], "update t set v = 'bar' where k = 2");
137-
#query_exec($conns[1], "update t set v = 'foo' where k = 1");
138-
#
139-
#query_exec_async($conns[0], "commit");
140-
#query_exec_async($conns[1], "commit");
141-
#
142-
#my $ready = 0;
143-
#$ready++ if $conns[0]->pg_ready;
144-
#$ready++ if $conns[1]->pg_ready;
145-
#
146-
#is($ready, 1, "one of the connections is deadlocked");
147-
#
148-
#sleep(2);
122+
my ($rc, $out, $err);
123+
sleep(10);
124+
125+
$nodes[0]->psql('postgres', "create table t(k int primary key, v text)");
126+
$nodes[0]->psql('postgres', "insert into t values (1, 'hello'), (2, 'world')");
127+
128+
my @conns = map { DBI->connect('DBI:Pg:' . $_->connstr()) } @nodes;
129+
130+
query_exec($conns[0], "begin");
131+
query_exec($conns[1], "begin");
132+
133+
query_exec($conns[0], "update t set v = 'asd' where k = 1");
134+
query_exec($conns[1], "update t set v = 'bsd'");
135+
136+
query_exec($conns[0], "update t set v = 'bar' where k = 2");
137+
query_exec($conns[1], "update t set v = 'foo'");
138+
139+
query_exec_async($conns[0], "commit");
140+
query_exec_async($conns[1], "commit");
141+
142+
my $timeout = 5;
143+
while ($timeout > 0)
144+
{
145+
my $r0 = $conns[0]->pg_ready();
146+
my $r1 = $conns[1]->pg_ready();
147+
if ($r0 && $r1) {
148+
last;
149+
}
150+
diag("queries still running: [0]=$r0 [1]=$r1\n");
151+
sleep(1);
152+
}
153+
154+
if ($timeout > 0)
155+
{
156+
diag("queries finished\n");
157+
158+
my $succeeded = 0;
159+
$succeeded++ if $conns[0]->pg_result();
160+
$succeeded++ if $conns[1]->pg_result();
161+
162+
pass("queries finished");
163+
}
164+
else
165+
{
166+
diag("queries timed out\n");
167+
$conns[0]->pg_cancel() unless $conns[0]->pg_ready();
168+
$conns[1]->pg_cancel() unless $conns[1]->pg_ready();
169+
170+
fail("queries timed out");
171+
}
172+
173+
query_row($conns[0], "select * from t where k = 1");

contrib/mmts/t/001_basic_recovery.pl

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use warnings;
33
use PostgresNode;
44
use TestLib;
5-
use Test::More tests => 2;
5+
use Test::More tests => 3;
66
use DBI;
77
use DBD::Pg ':async';
88

@@ -54,6 +54,7 @@ sub PostgresNode::inet_connstr {
5454
for (my $i=0; $i < $nnodes; $i++) {
5555
$nodes[$i]->append_conf('postgresql.conf', $pgconf_common);
5656
$nodes[$i]->append_conf('postgresql.conf', qq(
57+
#port = ${ \$nodes[$i]->port }
5758
multimaster.node_id = @{[ $i + 1 ]}
5859
multimaster.conn_strings = '$mm_connstr'
5960
#multimaster.arbiter_port = ${ \$nodes[0]->port }
@@ -69,8 +70,8 @@ sub PostgresNode::inet_connstr {
6970
###############################################################################
7071

7172
my $psql_out;
72-
# XXX: change to poll_untill
73-
sleep(7);
73+
# XXX: create extension on start and poll_until status is Online
74+
sleep(5);
7475

7576
###############################################################################
7677
# Replication check
@@ -79,11 +80,9 @@ sub PostgresNode::inet_connstr {
7980
$nodes[0]->psql('postgres', "
8081
create extension multimaster;
8182
create table if not exists t(k int primary key, v int);
82-
insert into t values(1, 10);
83-
");
84-
83+
insert into t values(1, 10);");
8584
$nodes[1]->psql('postgres', "select v from t where k=1;", stdout => \$psql_out);
86-
is($psql_out, '10', "Check sanity while all nodes are up.");
85+
is($psql_out, '10', "Check replication while all nodes are up.");
8786

8887
###############################################################################
8988
# Isolation regress checks
@@ -97,30 +96,27 @@ sub PostgresNode::inet_connstr {
9796

9897
$nodes[2]->teardown_node;
9998

100-
# $nodes[0]->poll_query_until('postgres',
101-
# "select disconnected = true from mtm.get_nodes_state() where id=3;")
102-
# or die "Timed out while waiting for node to disconnect";
103-
104-
$nodes[0]->psql('postgres', "
105-
insert into t values(2, 20);
106-
");
99+
diag("sleeping");
100+
sleep(15);
107101

102+
diag("inserting 2");
103+
$nodes[0]->psql('postgres', "insert into t values(2, 20);");
104+
diag("selecting");
108105
$nodes[1]->psql('postgres', "select v from t where k=2;", stdout => \$psql_out);
109-
is($psql_out, '20', "Check that we can commit after one node disconnect.");
110-
111-
112-
113-
114-
115-
116-
117-
118-
119-
120-
121-
122-
106+
diag("selected");
107+
is($psql_out, '20', "Check replication after node failure.");
123108

109+
###############################################################################
110+
# Work after node start
111+
###############################################################################
124112

113+
$nodes[2]->start;
114+
sleep(15); # XXX: here we can poll
115+
diag("inserting 3");
116+
$nodes[0]->psql('postgres', "insert into t values(3, 30);");
117+
diag("selecting");
118+
$nodes[2]->psql('postgres', "select v from t where k=3;", stdout => \$psql_out);
119+
diag("selected");
120+
is($psql_out, '30', "Check replication after failed node recovery.");
125121

126122

contrib/raftable/raft/src/raft.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
812812
raft_entry_t *e = &r->log.newentry;
813813
raft_update_t *u = &e->update;
814814

815-
if (!m->snapshot && !raft_appendable(r, m->previndex, m->prevterm)) goto finish;
815+
if (!m->empty && !m->snapshot && !raft_appendable(r, m->previndex, m->prevterm)) goto finish;
816816

817817
if (reply.progress.entries > 0) {
818818
reply.term = RAFT_LOG(r, reply.progress.entries - 1).term;

contrib/raftable/worker.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,12 @@ void parse_peers(HostPort *peers, char *peerstr)
457457
char *host;
458458
int id, port;
459459
int i;
460+
peerstr = pstrdup(peerstr);
460461

461462
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
462463
peers[i].up = false;
463464

465+
464466
fprintf(stderr, "parsing '%s'\n", peerstr);
465467
peer = strtok_r(peerstr, ",", &state);
466468
while (peer)
@@ -488,4 +490,6 @@ void parse_peers(HostPort *peers, char *peerstr)
488490

489491
peer = strtok_r(NULL, ",", &state);
490492
}
493+
494+
pfree(peerstr);
491495
}

0 commit comments

Comments
 (0)