Skip to content

Commit e337683

Browse files
committed
Fixes to stop/resume/recover mechanisms
1 parent 9f2b1a3 commit e337683

File tree

6 files changed

+77
-67
lines changed

6 files changed

+77
-67
lines changed

contrib/mmts/Cluster.pm

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,22 +350,25 @@ sub is_data_identic()
350350
sub add_node()
351351
{
352352
my ($self, %params) = @_;
353-
my $pgport = defined $params{port} ? $params{port} : (allocate_ports('127.0.0.1', 1))[0];
354-
my $arbiter_port = defined $params{arbiter_port} ? $params{arbiter_port} : (allocate_ports('127.0.0.1', 1))[0];
355353

354+
my $pgport;
355+
my $arbiter_port;
356356
my $connstrs;
357357
my $node_id;
358358

359359
if (defined $params{node_id})
360360
{
361361
$node_id = $params{node_id};
362+
$pgport = $params{port};
363+
$arbiter_port = $params{arbiter_port};
362364
$connstrs = $self->all_connstrs();
363365
}
364366
else
365367
{
366-
my $new_conn = ", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
367-
$connstrs = $self->all_connstrs() . $new_conn;
368368
$node_id = scalar(@{$self->{nodes}}) + 1;
369+
$pgport = (allocate_ports('127.0.0.1', 1))[0];
370+
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
371+
$connstrs = $self->all_connstrs() . ", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
369372
}
370373

371374
my $node = PostgresNode->get_new_node("node${node_id}x");
@@ -377,6 +380,8 @@ sub add_node()
377380

378381
$node->{_host} = '127.0.0.1';
379382
$node->{_port} = $pgport;
383+
$node->{port} = $pgport;
384+
$node->{host} = '127.0.0.1';
380385
$node->{arbiter_port} = $arbiter_port;
381386
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
382387
$node->append_conf("postgresql.conf", qq(

contrib/mmts/multimaster.c

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ static void MtmShmemStartup(void);
170170

171171
static BgwPool* MtmPoolConstructor(void);
172172
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
173-
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
173+
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int forceOnNode);
174174
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
175175

176176
static void MtmLockCluster(void);
@@ -978,7 +978,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
978978
x->isTwoPhase = false;
979979
x->isTransactionBlock = IsTransactionBlock();
980980
/* Application name can be changed using PGAPPNAME environment variable */
981-
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0 && !MtmBypass) {
981+
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0
982+
&& strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0
983+
&& !MtmBypass) {
982984
/* Reject all user's transactions at offline cluster.
983985
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
984986
*/
@@ -2410,7 +2412,7 @@ static void MtmInitialize()
24102412
for (i = 0; i < MtmNodes; i++) {
24112413
Mtm->nodes[i].oldestSnapshot = 0;
24122414
Mtm->nodes[i].disabledNodeMask = 0;
2413-
Mtm->nodes[i].connectivityMask = 7; // XXXX
2415+
Mtm->nodes[i].connectivityMask = (((nodemask_t)1 << MtmNodes) - 1);
24142416
Mtm->nodes[i].lockGraphUsed = 0;
24152417
Mtm->nodes[i].lockGraphAllocated = 0;
24162418
Mtm->nodes[i].lockGraphData = NULL;
@@ -2423,6 +2425,7 @@ static void MtmInitialize()
24232425
Mtm->nodes[i].originId = InvalidRepOriginId;
24242426
Mtm->nodes[i].timeline = 0;
24252427
Mtm->nodes[i].nHeartbeats = 0;
2428+
Mtm->nodes[i].manualRecovery = false;
24262429
Mtm->nodes[i].slotDeleted = false;
24272430
}
24282431
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
@@ -3345,9 +3348,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33453348
}
33463349

33473350
/* Await until node is connected and both receiver and sender are in clique */
3348-
while (BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3349-
!BIT_CHECK(Mtm->clique, nodeId - 1) ||
3350-
!BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3351+
while (BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, nodeId - 1) ||
3352+
BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, MtmNodeId - 1))
33513353
{
33523354
MtmUnlock();
33533355
if (*shutdown)
@@ -3402,6 +3404,7 @@ void MtmRecoverNode(int nodeId)
34023404
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
34033405
}
34043406
MtmLock(LW_EXCLUSIVE);
3407+
Mtm->nodes[nodeId-1].manualRecovery = true;
34053408
if (BIT_CHECK(Mtm->stoppedNodeMask, nodeId-1))
34063409
{
34073410
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
@@ -3412,8 +3415,8 @@ void MtmRecoverNode(int nodeId)
34123415

34133416
if (!MtmIsBroadcast())
34143417
{
3415-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", nodeId), true);
3416-
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)", nodeId), true);
3418+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", nodeId), true, 0);
3419+
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)", nodeId), true, 0);
34173420
}
34183421
}
34193422

@@ -3443,7 +3446,7 @@ void MtmResumeNode(int nodeId)
34433446

34443447
if (!MtmIsBroadcast())
34453448
{
3446-
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)", nodeId), true);
3449+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)", nodeId), true, nodeId);
34473450
}
34483451
}
34493452

@@ -3458,20 +3461,19 @@ void MtmStopNode(int nodeId, bool dropSlot)
34583461
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
34593462
}
34603463

3461-
MtmLock(LW_EXCLUSIVE);
3464+
if (!MtmIsBroadcast())
3465+
{
3466+
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true, nodeId);
3467+
}
34623468

3469+
MtmLock(LW_EXCLUSIVE);
34633470
BIT_SET(Mtm->stoppedNodeMask, nodeId-1);
3464-
34653471
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
34663472
{
34673473
MtmDisableNode(nodeId);
34683474
}
34693475
MtmUnlock();
34703476

3471-
if (!MtmIsBroadcast())
3472-
{
3473-
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
3474-
}
34753477
if (dropSlot)
34763478
{
34773479
MtmDropSlot(nodeId);
@@ -3545,12 +3547,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35453547
}
35463548

35473549
if (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1)) {
3548-
MTM_ELOG(WARNING, "Stopped node %d tries to initiate recovery", MtmReplicationNodeId);
3549-
do {
3550-
MtmUnlock();
3551-
MtmSleep(STATUS_POLL_DELAY);
3552-
MtmLock(LW_EXCLUSIVE);
3553-
} while (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1));
3550+
MtmUnlock();
3551+
MTM_ELOG(ERROR, "Stopped node %d tries to connect", MtmReplicationNodeId);
35543552
}
35553553

35563554
if (MtmIsRecoverySession) {
@@ -3857,8 +3855,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
38573855
}
38583856
if (!MtmIsBroadcast())
38593857
{
3860-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", Mtm->nAllNodes+1), true);
3861-
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')", connStr), true);
3858+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", Mtm->nAllNodes+1), true, 0);
3859+
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')", connStr), true, 0);
38623860
}
38633861
else
38643862
{
@@ -4403,7 +4401,7 @@ MtmNoticeReceiver(void *i, const PGresult *res)
44034401
pfree(stripped_notice);
44044402
}
44054403

4406-
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
4404+
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int forceOnNode)
44074405
{
44084406
int i = 0;
44094407
nodemask_t disabledNodeMask = Mtm->disabledNodeMask;
@@ -4415,7 +4413,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
44154413

44164414
for (i = 0; i < nNodes; i++)
44174415
{
4418-
if (!BIT_CHECK(disabledNodeMask, i))
4416+
if (!BIT_CHECK(disabledNodeMask, i) || (i + 1 == forceOnNode))
44194417
{
44204418
conns[i] = PQconnectdb_safe(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
44214419
if (PQstatus(conns[i]) != CONNECTION_OK)

contrib/mmts/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ typedef ulong64 lsn_t;
9595
typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9696

9797
#define SELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
98+
#define EFFECTIVE_CONNECTIVITY_MASK ( SELF_CONNECTIVITY_MASK | Mtm->stoppedNodeMask | ~Mtm->clique )
9899

99100
typedef enum
100101
{
@@ -232,6 +233,7 @@ typedef struct
232233
int lockGraphAllocated;
233234
int lockGraphUsed;
234235
uint64 nHeartbeats;
236+
bool manualRecovery;
235237
bool slotDeleted; /* Signalizes that node is already deleted our slot and
236238
* recovery from that node isn't possible.
237239
*/

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ pglogical_receiver_main(Datum main_arg)
309309

310310
/* Start logical replication at specified position */
311311
originStartPos = replorigin_get_progress(originId, false);
312-
if (originStartPos == INVALID_LSN) {
312+
if (originStartPos == INVALID_LSN || Mtm->nodes[nodeId-1].manualRecovery) {
313313
/*
314314
* We are just creating new replication slot.
315315
* It is assumed that state of local and remote nodes is the same at this moment.
@@ -331,6 +331,7 @@ pglogical_receiver_main(Datum main_arg)
331331
}
332332
PQclear(res);
333333
resetPQExpBuffer(query);
334+
Mtm->nodes[nodeId-1].manualRecovery = false;
334335
} else {
335336
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
336337
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);

contrib/mmts/state.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,23 @@ MtmCheckState(void)
8585
int nReceivers = Mtm->nAllNodes - countZeroBits(Mtm->pglogicalReceiverMask, Mtm->nAllNodes);
8686
int nSenders = Mtm->nAllNodes - countZeroBits(Mtm->pglogicalSenderMask, Mtm->nAllNodes);
8787

88-
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d)",
88+
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d, stopped=%s)",
8989
maskToString(Mtm->disabledNodeMask, Mtm->nAllNodes),
9090
maskToString(SELF_CONNECTIVITY_MASK, Mtm->nAllNodes),
9191
maskToString(Mtm->clique, Mtm->nAllNodes),
9292
maskToString(Mtm->pglogicalReceiverMask, Mtm->nAllNodes),
9393
maskToString(Mtm->pglogicalSenderMask, Mtm->nAllNodes),
9494
Mtm->nAllNodes,
95-
(MtmMajorNode || Mtm->refereeGrant) );
95+
(MtmMajorNode || Mtm->refereeGrant),
96+
maskToString(Mtm->stoppedNodeMask, Mtm->nAllNodes));
9697

9798
isEnabledState =
9899
( (nConnected >= Mtm->nAllNodes/2+1) /* majority */
99100
// XXXX: should we restrict major with two nodes setup?
100101
|| (nConnected == Mtm->nAllNodes/2 && MtmMajorNode) /* or half + major node */
101102
|| (nConnected == Mtm->nAllNodes/2 && Mtm->refereeGrant) ) /* or half + referee */
102-
&& BIT_CHECK(Mtm->clique, MtmNodeId-1); /* in clique */
103+
&& BIT_CHECK(Mtm->clique, MtmNodeId-1) /* in clique */
104+
&& !BIT_CHECK(Mtm->stoppedNodeMask, MtmNodeId-1); /* is not stopped */
103105

104106
/* ANY -> MTM_DISABLED */
105107
if (!isEnabledState)
@@ -336,7 +338,7 @@ void MtmOnNodeConnect(int nodeId)
336338
// MtmRefreshClusterStatus();
337339
}
338340

339-
void MtmReconnectNode(int nodeId)
341+
void MtmReconnectNode(int nodeId) // XXXX evict that
340342
{
341343
// MTM_LOG1("[STATE] ReconnectNode for node %u", nodeId);
342344
MtmLock(LW_EXCLUSIVE);
@@ -391,13 +393,11 @@ MtmRefreshClusterStatus()
391393
* Check for referee decidion when pnly half of nodes are visible.
392394
*/
393395
if (MtmRefereeConnStr && *MtmRefereeConnStr && !Mtm->refereeGrant &&
394-
// XXXX visibility & ~clique?
395-
countZeroBits(SELF_CONNECTIVITY_MASK, Mtm->nAllNodes) == Mtm->nAllNodes/2)
396+
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK, Mtm->nAllNodes) == Mtm->nAllNodes/2)
396397
{
397398
int winner_node_id = MtmGetRefereeWinner();
398399
if (winner_node_id != -1 &&
399-
// XXXX visibility & ~clique?
400-
!BIT_CHECK(SELF_CONNECTIVITY_MASK, winner_node_id - 1))
400+
!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, winner_node_id - 1))
401401
{
402402
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
403403
winner_node_id);

contrib/mmts/t/005_add_stop_node.pl

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use PostgresNode;
44
use Cluster;
55
use TestLib;
6-
use Test::More tests => 5;
6+
use Test::More tests => 7;
77

88
my $cluster = new Cluster(3);
99
$cluster->init();
@@ -68,39 +68,43 @@
6868
$cluster->{nodes}->[2]->psql('postgres', "select 't'",
6969
stdout => \$stopped_out, stderr => \$stopped_err);
7070
is($cluster->is_data_identic( (0,1,3) ), 1, "soft stop / resume");
71+
print("::$stopped_out ::$stopped_err\n");
7172
is($stopped_out eq '' && $stopped_err ne '', 1, "soft stop / resume");
7273

7374
$cluster->psql(0, 'postgres', "select mtm.resume_node(3)");
7475
$cluster->{nodes}->[2]->poll_query_until('postgres', "select 't'");
7576
$cluster->pgbench(2, ('-N', '-n', -T => '1') );
7677
is($cluster->is_data_identic( (0,1,2,3) ), 1, "soft stop / resume");
7778

78-
# ################################################################################
79-
# # hard stop / basebackup / recover
80-
# ################################################################################
79+
################################################################################
80+
# hard stop / basebackup / recover
81+
################################################################################
82+
83+
diag('Stopping node with slot drop');
84+
$cluster->psql(0, 'postgres', "select mtm.stop_node(3,'t')");
85+
# await for comletion?
86+
$cluster->{nodes}->[2]->stop('fast');
87+
88+
$cluster->pgbench(0, ('-N', '-n', -T => '1') );
89+
$cluster->pgbench(1, ('-N', '-n', -T => '1') );
90+
$cluster->pgbench(3, ('-N', '-n', -T => '1') );
91+
is($cluster->is_data_identic( (0,1,3) ), 1, "hard stop / resume");
92+
93+
$cluster->psql(0, 'postgres', "select mtm.recover_node(3)");
94+
95+
# now we need to perform backup from live node
96+
$cluster->add_node(port => $cluster->{nodes}->[2]->{_port},
97+
arbiter_port => $cluster->{nodes}->[2]->{arbiter_port},
98+
node_id => 3);
99+
100+
my $dd = $cluster->{nodes}->[4]->data_dir;
101+
diag("preparing to start $dd");
81102

82-
# diag('Stopping node with slot drop');
83-
# $cluster->psql(0, 'postgres', "select mtm.stop_node(3,'t')");
84-
# # await for comletion?
85-
# $cluster->{nodes}->[2]->stop();
86-
87-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
88-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
89-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
90-
# is($cluster->is_data_identic( (0,1,3) ), 1, "hard stop / resume");
91-
92-
# $cluster->psql(0, 'postgres', "select mtm.recover_node(3)");
93-
94-
# # now we need to perform backup from live node
95-
# $cluster->add_node(port => $cluster->{nodes}->[2]->{port},
96-
# arbiter_port => $cluster->{nodes}->[2]->{arbiter_port},
97-
# node_id => 3);
98-
# diag("preparing to start");
99-
# $cluster->{nodes}->[4]->start;
100-
# $cluster->{nodes}->[4]->poll_query_until('postgres', "select 't'");
101-
102-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
103-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
104-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
105-
# $cluster->pgbench(4, ('-N', '-n', -T => '1') );
106-
# is($cluster->is_data_identic( (0,1,3,4) ), 1, "hard stop / resume");
103+
$cluster->{nodes}->[4]->start;
104+
$cluster->{nodes}->[4]->poll_query_until('postgres', "select 't'");
105+
106+
$cluster->pgbench(0, ('-N', '-n', -T => '1') );
107+
$cluster->pgbench(1, ('-N', '-n', -T => '1') );
108+
$cluster->pgbench(3, ('-N', '-n', -T => '1') );
109+
$cluster->pgbench(4, ('-N', '-n', -T => '1') );
110+
is($cluster->is_data_identic( (0,1,3,4) ), 1, "hard stop / resume");

0 commit comments

Comments
 (0)