@@ -156,7 +156,8 @@ char const* const MtmNodeStatusMnem[] =
156
156
"Offline" ,
157
157
"Connected" ,
158
158
"Online" ,
159
- "Recovery"
159
+ "Recovery" ,
160
+ "InMinor"
160
161
};
161
162
162
163
bool MtmDoReplication ;
@@ -633,10 +634,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
633
634
x -> isDistributed = MtmIsUserTransaction ();
634
635
x -> isPrepared = false;
635
636
x -> isTransactionBlock = IsTransactionBlock ();
636
- if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
637
+ /* Application name can be cahnged usnig PGAPPNAME environment variable */
638
+ if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
637
639
/* reject all user's transactions at offline cluster */
638
640
MtmUnlock ();
639
- Assert (Mtm -> status == MTM_ONLINE );
640
641
elog (ERROR , "Multimaster node is not online: current status %s" , MtmNodeStatusMnem [Mtm -> status ]);
641
642
}
642
643
x -> containsDML = false;
@@ -983,11 +984,14 @@ bool MtmIsRecoveredNode(int nodeId)
983
984
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
984
985
* Is there some better way to establish mapping between nodes ad WAL-seconder?
985
986
*/
987
+ elog (WARNING ,"Node %d is catching up" , nodeId );
986
988
MtmLock (LW_EXCLUSIVE );
987
989
BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
988
990
BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
989
991
Mtm -> nLockers += 1 ;
990
992
MtmUnlock ();
993
+ } else {
994
+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" , nodeId , MyWalSnd -> sentPtr , GetXLogInsertRecPtr (), Mtm -> nLockers );
991
995
}
992
996
return true;
993
997
}
@@ -1024,7 +1028,7 @@ MtmCheckClusterLock()
1024
1028
break ;
1025
1029
} else {
1026
1030
/* recovered replica catched up with master */
1027
- elog (WARNING , "WAL-sender %d complete receovery " , i );
1031
+ elog (WARNING , "WAL-sender %d complete recovery " , i );
1028
1032
BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1029
1033
}
1030
1034
}
@@ -1610,8 +1614,9 @@ void MtmReceiverStarted(int nodeId)
1610
1614
if (!BIT_CHECK (Mtm -> pglogicalNodeMask , nodeId - 1 )) {
1611
1615
BIT_SET (Mtm -> pglogicalNodeMask , nodeId - 1 );
1612
1616
if (++ Mtm -> nReceivers == Mtm -> nNodes - 1 ) {
1613
- Assert (Mtm -> status == MTM_CONNECTED );
1614
- MtmSwitchClusterMode (MTM_ONLINE );
1617
+ if (Mtm -> status == MTM_CONNECTED ) {
1618
+ MtmSwitchClusterMode (MTM_ONLINE );
1619
+ }
1615
1620
}
1616
1621
}
1617
1622
SpinLockRelease (& Mtm -> spinlock );
@@ -1624,19 +1629,28 @@ void MtmReceiverStarted(int nodeId)
1624
1629
*/
1625
1630
MtmSlotMode MtmReceiverSlotMode (int nodeId )
1626
1631
{
1632
+ bool recovery = false;
1627
1633
while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) {
1634
+ MTM_INFO ("%d: receiver slot mode %s\n" , MyProcPid , MtmNodeStatusMnem [Mtm -> status ]);
1628
1635
if (Mtm -> status == MTM_RECOVERY ) {
1636
+ recovery = true;
1629
1637
if (Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId ) {
1630
1638
/* Choose for recovery first available slot */
1639
+ elog (WARNING , "Start recovery from node %d" , nodeId );
1631
1640
Mtm -> recoverySlot = nodeId ;
1632
1641
return SLOT_OPEN_EXISTED ;
1633
1642
}
1634
1643
}
1635
1644
/* delay opening of other slots until recovery is completed */
1636
1645
MtmSleep (STATUS_POLL_DELAY );
1637
1646
}
1647
+ if (recovery ) {
1648
+ elog (WARNING , "Recreate replication slot for node %d after end of recovery" , nodeId );
1649
+ } else {
1650
+ MTM_INFO ("%d: Reuse replication slot for node %d\n" , MyProcPid , nodeId );
1651
+ }
1638
1652
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1639
- return Mtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
1653
+ return recovery ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
1640
1654
}
1641
1655
1642
1656
static bool MtmIsBroadcast ()
@@ -1692,7 +1706,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
1692
1706
static bool
1693
1707
MtmReplicationTxnFilterHook (struct PGLogicalTxnFilterArgs * args )
1694
1708
{
1695
- return args -> origin_id == InvalidRepOriginId || MtmIsRecoveredNode (MtmReplicationNodeId );
1709
+ bool res = Mtm -> status != MTM_RECOVERY
1710
+ && (args -> origin_id == InvalidRepOriginId
1711
+ || MtmIsRecoveredNode (MtmReplicationNodeId ));
1712
+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1713
+ return res ;
1696
1714
}
1697
1715
1698
1716
void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
0 commit comments