Skip to content

Commit 64911de

Browse files
committed
2 parents 129a907 + 197b9f5 commit 64911de

File tree

9 files changed

+77
-21
lines changed

9 files changed

+77
-21
lines changed

contrib/mmts/Cluster.pm

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,46 @@ sub start
126126
}
127127
}
128128

129+
# Stop a single cluster node with pg_ctl.
#
# Arguments:
#   $node - node object; this sub calls its port, data_dir, name and
#           _update_pid methods and reads/clears $node->{_pid}.
#   $mode - shutdown mode handed to "pg_ctl -m"; defaults to 'fast'.
#
# Returns 1 when pg_ctl reported success, or when the node has no recorded
# pid and there is therefore nothing to stop; returns 0 when pg_ctl failed.
sub stopnode
{
	my ($node, $mode) = @_;
	my $port   = $node->port;
	my $pgdata = $node->data_dir;
	my $name   = $node->name;
	$mode = 'fast' unless defined $mode;
	diag("stopping node $name ${mode}ly at $pgdata port $port");

	# No recorded pid: nothing to stop. This must be a "return", not a
	# "next": there is no loop in this sub, so "next" would either leak out
	# into the caller's loop (with an "Exiting subroutine via next" warning)
	# or die outright when the sub is called outside of any loop.
	return 1 unless defined $node->{_pid};

	# Pass the requested mode through; hard-coding 'fast' here would make
	# an 'immediate' retry by the caller just repeat the same fast shutdown.
	my $ret = TestLib::system_log('pg_ctl', '-D', $pgdata, '-m', $mode, 'stop');

	# Forget the old pid, then let the node object refresh it.
	# NOTE(review): _update_pid presumably re-reads postmaster.pid, so a
	# node that failed to stop gets its pid back -- confirm against the
	# node class.
	$node->{_pid} = undef;
	$node->_update_pid;

	if ($ret != 0) {
		diag("$name failed to stop ${mode}ly");
		return 0;
	}

	return 1;
}
149+
129150
# Stop every node of the cluster.
#
# Arguments:
#   $self - cluster object; its nodes are taken from $self->{nodes}.
#   $mode - shutdown mode passed on to stopnode(); defaults to 'fast'.
#
# Each node is stopped with the requested mode first. If that fails, the
# node is retried with mode 'immediate'; if the retry fails as well the
# whole test run is aborted through BAIL_OUT.
#
# Returns 1 when every node stopped on the first attempt, 0 when at least
# one node needed the 'immediate' retry.
sub stop
{
	my ($self, $mode) = @_;
	my $nodes = $self->{nodes};
	$mode = 'fast' unless defined $mode;

	my $ok = 1;
	diag("stopping cluster ${mode}ly");
	foreach my $node (@$nodes)
	{
		if (!stopnode($node, $mode)) {
			$ok = 0;
			if (!stopnode($node, 'immediate')) {
				# Report the node by name; interpolating the object itself
				# would only print something like "Class=HASH(0x...)".
				BAIL_OUT("failed to stop " . $node->name . " immediately");
			}
		}
	}

	return $ok;
}
139170

140171
sub teardown

contrib/mmts/multimaster.c

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1850,8 +1850,6 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
18501850
elog(ERROR, "Invalid raftable port: %s", port+9);
18511851
}
18521852
n += 9;
1853-
memmove(port, port+n, connStrLen - n + 1);
1854-
connStrLen -= n;
18551853
} else {
18561854
conn->raftablePort = 0;
18571855
}
@@ -1863,8 +1861,6 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
18631861
elog(ERROR, "Invalid arbiter port: %s", port+12);
18641862
}
18651863
n += 12;
1866-
memmove(port, port+n, connStrLen - n + 1);
1867-
connStrLen -= n;
18681864
} else {
18691865
conn->arbiterPort = 0;
18701866
}
@@ -2787,6 +2783,32 @@ typedef struct
27872783
int nodeId;
27882784
} MtmGetClusterInfoCtx;
27892785

2786+
/*
 * Blank out every "<option>=<value>" setting in a connection string.
 *
 * option  - keyword name without the trailing '='.
 * connstr - writable, NUL-terminated connection string; modified in place.
 *
 * Each matching setting is overwritten with spaces up to (not including)
 * the next space or the terminating NUL, so the string keeps its length.
 *
 * A match is only accepted when the keyword starts the string or follows a
 * space, and is directly followed by '='. A plain substring search would
 * also hit longer keywords or values that merely end in "<option>=" and
 * corrupt them.
 *
 * An empty option name matches nothing and leaves the string untouched.
 */
static void erase_option_from_connstr(const char *option, char *connstr)
{
	size_t	optlen = strlen(option);
	char   *cursor = connstr;
	char   *found;

	if (optlen == 0)
		return;

	while ((found = strstr(cursor, option)) != NULL) {
		int at_keyword_start = (found == connstr || found[-1] == ' ');

		if (at_keyword_start && found[optlen] == '=') {
			/* Wipe keyword, '=' and value up to the next separator. */
			while (*found != '\0' && *found != ' ') {
				*found = ' ';
				found++;
			}
			cursor = found;
		} else {
			/* Not a standalone keyword; keep searching after this hit. */
			cursor = found + 1;
		}
	}
}
2799+
2800+
/*
 * Connect via PQconnectdb() using a private copy of conninfo from which the
 * "raftport" and "arbiterport" settings have been blanked out.
 *
 * The caller's string is not modified. The returned connection is whatever
 * PQconnectdb() produced for the scrubbed string; the caller checks its
 * status and eventually releases it.
 */
PGconn *PQconnectdb_safe(const char *conninfo)
{
	static const char *const scrubbed_options[] = {"raftport", "arbiterport"};
	char	   *scratch = pstrdup(conninfo);
	PGconn	   *result;
	size_t		idx;

	for (idx = 0; idx < sizeof(scrubbed_options) / sizeof(scrubbed_options[0]); idx++)
		erase_option_from_connstr(scrubbed_options[idx], scratch);

	result = PQconnectdb(scratch);
	pfree(scratch);

	return result;
}
27902812

27912813
Datum
27922814
mtm_get_cluster_info(PG_FUNCTION_ARGS)
@@ -2819,9 +2841,9 @@ mtm_get_cluster_info(PG_FUNCTION_ARGS)
28192841
if (usrfctx->nodeId > Mtm->nAllNodes) {
28202842
SRF_RETURN_DONE(funcctx);
28212843
}
2822-
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2844+
conn = PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
28232845
if (PQstatus(conn) != CONNECTION_OK) {
2824-
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2846+
elog(ERROR, "Failed to establish connection '%s' to node %d: error = %s", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId, PQerrorMessage(conn));
28252847
}
28262848
result = PQexec(conn, "select * from mtm.get_cluster_state()");
28272849

@@ -2996,7 +3018,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
29963018
{
29973019
if (!BIT_CHECK(disabledNodeMask, i))
29983020
{
2999-
conns[i] = PQconnectdb(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
3021+
conns[i] = PQconnectdb_safe(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
30003022
if (PQstatus(conns[i]) != CONNECTION_OK)
30013023
{
30023024
if (ignoreError)
@@ -3008,7 +3030,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
30083030
do {
30093031
PQfinish(conns[i]);
30103032
} while (--i >= 0);
3011-
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[failedNode].con.connStr, failedNode+1);
3033+
elog(ERROR, "Failed to establish connection '%s' to node %d, error = %s", Mtm->nodes[failedNode].con.connStr, failedNode+1, PQerrorMessage(conns[i]));
30123034
}
30133035
}
30143036
PQsetNoticeReceiver(conns[i], MtmNoticeReceiver, &i);

contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "access/clog.h"
99
#include "pglogical_output/hooks.h"
10+
#include "libpq-fe.h"
1011

1112
#define DEBUG_LEVEL 0
1213

@@ -283,7 +284,7 @@ extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
283284
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
284285
extern bool MtmWatchdog(timestamp_t now);
285286
extern void MtmCheckHeartbeat(void);
286-
287+
extern PGconn *PQconnectdb_safe(const char *conninfo);
287288

288289

289290
#endif

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,9 @@ pglogical_receiver_main(Datum main_arg)
217217
char *copybuf = NULL;
218218
int spill_file = -1;
219219
StringInfoData spill_info;
220+
char *slotName;
220221
char* connString = psprintf("replication=database %s", Mtm->nodes[nodeId-1].con.connStr);
221-
char* slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);
222+
slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);
222223

223224
initStringInfo(&spill_info);
224225

@@ -261,7 +262,7 @@ pglogical_receiver_main(Datum main_arg)
261262
count = Mtm->recoveryCount;
262263

263264
/* Establish connection to remote server */
264-
conn = PQconnectdb(connString);
265+
conn = PQconnectdb_safe(connString);
265266
status = PQstatus(conn);
266267
if (status != CONNECTION_OK)
267268
{

contrib/mmts/t/000_deadlock.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use Cluster;
55
use TestLib;
6-
use Test::More tests => 1;
6+
use Test::More tests => 2;
77

88
use DBI;
99
use DBD::Pg ':async';
@@ -93,5 +93,5 @@ sub query_exec_async
9393

9494
query_row($conns[0], "select * from t where k = 1");
9595

96-
$cluster->stop();
96+
ok($cluster->stop(), "cluster stops");
9797
1;

contrib/mmts/t/001_basic_recovery.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use warnings;
33
use Cluster;
44
use TestLib;
5-
use Test::More tests => 7;
5+
use Test::More tests => 8;
66

77
my $cluster = new Cluster(3);
88
$cluster->init();
@@ -108,5 +108,5 @@
108108

109109
is($psql_out, '90', "Check replication after failed node recovery.");
110110

111-
$cluster->stop();
111+
ok($cluster->stop(), "cluster stops");
112112
1;

contrib/mmts/t/002_cross.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use Cluster;
55
use TestLib;
6-
use Test::More tests => 1;
6+
use Test::More tests => 2;
77
use IPC::Run qw(start finish);
88
use Cwd;
99

@@ -150,5 +150,5 @@ sub parse_state
150150

151151
is($anomalies, 0, "no cross anomalies after $selects selects");
152152

153-
$cluster->stop();
153+
ok($cluster->stop(), "cluster stops");
154154
1;

contrib/mmts/t/003_pgbench.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use Cluster;
55
use TestLib;
6-
use Test::More tests => 2;
6+
use Test::More tests => 3;
77
use IPC::Run qw(start finish);
88
use Cwd;
99

@@ -110,5 +110,5 @@ sub writer
110110
($rc, $out, $err) = $cluster->psql(0, 'postgres', "select count(*) from reader_log where v = 0;");
111111
isnt($out, 0, "there are some zeros in reader_log");
112112

113-
$cluster->stop();
113+
ok($cluster->stop(), "cluster stops");
114114
1;

contrib/mmts/tests/daemons.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func postgres(bin string, datadir string, postgresi []string, port int, nodeid i
113113
"-c", "max_replication_slots=10",
114114
"-c", "max_wal_senders=10",
115115
"-c", "max_worker_processes=100",
116+
"-c", "max_prepared_transactions=200",
116117
"-c", "default_transaction_isolation=repeatable read",
117118
"-c", "multimaster.conn_strings=" + strings.Join(postgresi, ","),
118119
"-c", "multimaster.node_id=" + strconv.Itoa(nodeid + 1),

0 commit comments

Comments
 (0)