@@ -170,7 +170,7 @@ static void MtmShmemStartup(void);
170
170
171
171
static BgwPool * MtmPoolConstructor (void );
172
172
static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg );
173
- static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
173
+ static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError , int forceOnNode );
174
174
static void MtmProcessDDLCommand (char const * queryString , bool transactional );
175
175
176
176
static void MtmLockCluster (void );
@@ -978,7 +978,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
978
978
x -> isTwoPhase = false;
979
979
x -> isTransactionBlock = IsTransactionBlock ();
980
980
/* Application name can be changed using PGAPPNAME environment variable */
981
- if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 && !MtmBypass ) {
981
+ if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0
982
+ && strcmp (application_name , MULTIMASTER_BROADCAST_SERVICE ) != 0
983
+ && !MtmBypass ) {
982
984
/* Reject all user's transactions at offline cluster.
983
985
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
984
986
*/
@@ -2410,7 +2412,7 @@ static void MtmInitialize()
2410
2412
for (i = 0 ; i < MtmNodes ; i ++ ) {
2411
2413
Mtm -> nodes [i ].oldestSnapshot = 0 ;
2412
2414
Mtm -> nodes [i ].disabledNodeMask = 0 ;
2413
- Mtm -> nodes [i ].connectivityMask = 7 ; // XXXX
2415
+ Mtm -> nodes [i ].connectivityMask = ((( nodemask_t ) 1 << MtmNodes ) - 1 );
2414
2416
Mtm -> nodes [i ].lockGraphUsed = 0 ;
2415
2417
Mtm -> nodes [i ].lockGraphAllocated = 0 ;
2416
2418
Mtm -> nodes [i ].lockGraphData = NULL ;
@@ -2423,6 +2425,7 @@ static void MtmInitialize()
2423
2425
Mtm -> nodes [i ].originId = InvalidRepOriginId ;
2424
2426
Mtm -> nodes [i ].timeline = 0 ;
2425
2427
Mtm -> nodes [i ].nHeartbeats = 0 ;
2428
+ Mtm -> nodes [i ].manualRecovery = false;
2426
2429
Mtm -> nodes [i ].slotDeleted = false;
2427
2430
}
2428
2431
Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
@@ -3345,9 +3348,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
3345
3348
}
3346
3349
3347
3350
/* Await until node is connected and both receiver and sender are in clique */
3348
- while (BIT_CHECK (SELF_CONNECTIVITY_MASK , nodeId - 1 ) ||
3349
- !BIT_CHECK (Mtm -> clique , nodeId - 1 ) ||
3350
- !BIT_CHECK (Mtm -> clique , MtmNodeId - 1 ) )
3351
+ while (BIT_CHECK (EFFECTIVE_CONNECTIVITY_MASK , nodeId - 1 ) ||
3352
+ BIT_CHECK (EFFECTIVE_CONNECTIVITY_MASK , MtmNodeId - 1 ))
3351
3353
{
3352
3354
MtmUnlock ();
3353
3355
if (* shutdown )
@@ -3402,6 +3404,7 @@ void MtmRecoverNode(int nodeId)
3402
3404
MTM_ELOG (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nAllNodes );
3403
3405
}
3404
3406
MtmLock (LW_EXCLUSIVE );
3407
+ Mtm -> nodes [nodeId - 1 ].manualRecovery = true;
3405
3408
if (BIT_CHECK (Mtm -> stoppedNodeMask , nodeId - 1 ))
3406
3409
{
3407
3410
Assert (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ));
@@ -3412,8 +3415,8 @@ void MtmRecoverNode(int nodeId)
3412
3415
3413
3416
if (!MtmIsBroadcast ())
3414
3417
{
3415
- MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , nodeId ), true);
3416
- MtmBroadcastUtilityStmt (psprintf ("select mtm.recover_node(%d)" , nodeId ), true);
3418
+ MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , nodeId ), true, 0 );
3419
+ MtmBroadcastUtilityStmt (psprintf ("select mtm.recover_node(%d)" , nodeId ), true, 0 );
3417
3420
}
3418
3421
}
3419
3422
@@ -3443,7 +3446,7 @@ void MtmResumeNode(int nodeId)
3443
3446
3444
3447
if (!MtmIsBroadcast ())
3445
3448
{
3446
- MtmBroadcastUtilityStmt (psprintf ("select mtm.resume_node(%d)" , nodeId ), true);
3449
+ MtmBroadcastUtilityStmt (psprintf ("select mtm.resume_node(%d)" , nodeId ), true, nodeId );
3447
3450
}
3448
3451
}
3449
3452
@@ -3458,20 +3461,19 @@ void MtmStopNode(int nodeId, bool dropSlot)
3458
3461
MTM_ELOG (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nAllNodes );
3459
3462
}
3460
3463
3461
- MtmLock (LW_EXCLUSIVE );
3464
+ if (!MtmIsBroadcast ())
3465
+ {
3466
+ MtmBroadcastUtilityStmt (psprintf ("select mtm.stop_node(%d,%s)" , nodeId , dropSlot ? "true" : "false" ), true, nodeId );
3467
+ }
3462
3468
3469
+ MtmLock (LW_EXCLUSIVE );
3463
3470
BIT_SET (Mtm -> stoppedNodeMask , nodeId - 1 );
3464
-
3465
3471
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ))
3466
3472
{
3467
3473
MtmDisableNode (nodeId );
3468
3474
}
3469
3475
MtmUnlock ();
3470
3476
3471
- if (!MtmIsBroadcast ())
3472
- {
3473
- MtmBroadcastUtilityStmt (psprintf ("select mtm.stop_node(%d,%s)" , nodeId , dropSlot ? "true" : "false" ), true);
3474
- }
3475
3477
if (dropSlot )
3476
3478
{
3477
3479
MtmDropSlot (nodeId );
@@ -3545,12 +3547,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
3545
3547
}
3546
3548
3547
3549
if (BIT_CHECK (Mtm -> stoppedNodeMask , MtmReplicationNodeId - 1 )) {
3548
- MTM_ELOG (WARNING , "Stopped node %d tries to initiate recovery" , MtmReplicationNodeId );
3549
- do {
3550
- MtmUnlock ();
3551
- MtmSleep (STATUS_POLL_DELAY );
3552
- MtmLock (LW_EXCLUSIVE );
3553
- } while (BIT_CHECK (Mtm -> stoppedNodeMask , MtmReplicationNodeId - 1 ));
3550
+ MtmUnlock ();
3551
+ MTM_ELOG (ERROR , "Stopped node %d tries to connect" , MtmReplicationNodeId );
3554
3552
}
3555
3553
3556
3554
if (MtmIsRecoverySession ) {
@@ -3857,8 +3855,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
3857
3855
}
3858
3856
if (!MtmIsBroadcast ())
3859
3857
{
3860
- MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , Mtm -> nAllNodes + 1 ), true);
3861
- MtmBroadcastUtilityStmt (psprintf ("select mtm.add_node('%s')" , connStr ), true);
3858
+ MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , Mtm -> nAllNodes + 1 ), true, 0 );
3859
+ MtmBroadcastUtilityStmt (psprintf ("select mtm.add_node('%s')" , connStr ), true, 0 );
3862
3860
}
3863
3861
else
3864
3862
{
@@ -4403,7 +4401,7 @@ MtmNoticeReceiver(void *i, const PGresult *res)
4403
4401
pfree (stripped_notice );
4404
4402
}
4405
4403
4406
- static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError )
4404
+ static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError , int forceOnNode )
4407
4405
{
4408
4406
int i = 0 ;
4409
4407
nodemask_t disabledNodeMask = Mtm -> disabledNodeMask ;
@@ -4415,7 +4413,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
4415
4413
4416
4414
for (i = 0 ; i < nNodes ; i ++ )
4417
4415
{
4418
- if (!BIT_CHECK (disabledNodeMask , i ))
4416
+ if (!BIT_CHECK (disabledNodeMask , i ) || ( i + 1 == forceOnNode ) )
4419
4417
{
4420
4418
conns [i ] = PQconnectdb_safe (psprintf ("%s application_name=%s" , Mtm -> nodes [i ].con .connStr , MULTIMASTER_BROADCAST_SERVICE ));
4421
4419
if (PQstatus (conns [i ]) != CONNECTION_OK )
0 commit comments