@@ -800,7 +800,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
800
800
* Send notification only if ABORT happens during transaction processing at replicas,
801
801
* do not send notification if ABORT is receiver from master
802
802
*/
803
- MTM_TRACE ("%d: send ABORT notification to coordinator %d\n" , MyProcPid , x -> gtid .node );
803
+ MTM_INFO ("%d: send ABORT notification abort transaction %d to coordinator %d\n" , MyProcPid , x -> gtid . xid , x -> gtid .node );
804
804
if (ts == NULL ) {
805
805
Assert (TransactionIdIsValid (x -> xid ));
806
806
ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , NULL );
@@ -1604,6 +1604,11 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
1604
1604
return Mtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
1605
1605
}
1606
1606
1607
+ static bool MtmIsBroadcast ()
1608
+ {
1609
+ return application_name != NULL && strcmp (application_name , MULTIMASTER_BROADCAST_SERVICE ) == 0 ;
1610
+ }
1611
+
1607
1612
void MtmRecoverNode (int nodeId )
1608
1613
{
1609
1614
if (nodeId <= 0 || nodeId > Mtm -> nNodes )
@@ -1613,7 +1618,7 @@ void MtmRecoverNode(int nodeId)
1613
1618
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1614
1619
elog (ERROR , "Node %d was not disabled" , nodeId );
1615
1620
}
1616
- if (!IsTransactionBlock ())
1621
+ if (!MtmIsBroadcast ())
1617
1622
{
1618
1623
MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , nodeId ), true);
1619
1624
}
@@ -1630,7 +1635,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
1630
1635
}
1631
1636
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1632
1637
Mtm -> nNodes -= 1 ;
1633
- if (!IsTransactionBlock ())
1638
+ if (!MtmIsBroadcast ())
1634
1639
{
1635
1640
MtmBroadcastUtilityStmt (psprintf ("select mtm.drop_node(%d,%s)" , nodeId , dropSlot ? "true" : "false" ), true);
1636
1641
}
@@ -1650,7 +1655,6 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
1650
1655
static bool
1651
1656
MtmReplicationTxnFilterHook (struct PGLogicalTxnFilterArgs * args )
1652
1657
{
1653
- elog (WARNING , "MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d" , args -> origin_id , MtmReplicationNodeId );
1654
1658
return args -> origin_id == InvalidRepOriginId || MtmIsRecoveredNode (MtmReplicationNodeId );
1655
1659
}
1656
1660
@@ -1797,7 +1801,6 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
1797
1801
{
1798
1802
PGresult * result = PQexec (conn , sql );
1799
1803
int status = PQresultStatus (result );
1800
- char * errstr ;
1801
1804
1802
1805
bool ret = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK ;
1803
1806
@@ -1817,25 +1820,18 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
1817
1820
1818
1821
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError )
1819
1822
{
1820
- char * conn_str = pstrdup (MtmConnStrs );
1821
- char * conn_str_end = conn_str + strlen (conn_str );
1822
1823
int i = 0 ;
1823
1824
nodemask_t disabledNodeMask = Mtm -> disabledNodeMask ;
1824
1825
int failedNode = -1 ;
1825
1826
char const * errorMsg = NULL ;
1826
1827
PGconn * * conns = palloc0 (sizeof (PGconn * )* MtmNodes );
1827
1828
char * utility_errmsg ;
1828
1829
1829
- while ( conn_str < conn_str_end )
1830
+ for ( i = 0 ; i < MtmNodes ; i ++ )
1830
1831
{
1831
- char * p = strchr (conn_str , ',' );
1832
- if (p == NULL ) {
1833
- p = conn_str_end ;
1834
- }
1835
- * p = '\0' ;
1836
1832
if (!BIT_CHECK (disabledNodeMask , i ))
1837
1833
{
1838
- conns [i ] = PQconnectdb (conn_str );
1834
+ conns [i ] = PQconnectdb (psprintf ( "%s application_name=%s" , Mtm -> nodes [ i ]. con . connStr , MULTIMASTER_BROADCAST_SERVICE ) );
1839
1835
if (PQstatus (conns [i ]) != CONNECTION_OK )
1840
1836
{
1841
1837
if (ignoreError )
@@ -1847,12 +1843,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
1847
1843
do {
1848
1844
PQfinish (conns [i ]);
1849
1845
} while (-- i >= 0 );
1850
- elog (ERROR , "Failed to establish connection '%s' to node %d" , conn_str , failedNode );
1846
+ elog (ERROR , "Failed to establish connection '%s' to node %d" , Mtm -> nodes [ i ]. con . connStr , failedNode );
1851
1847
}
1852
1848
}
1853
1849
}
1854
- conn_str = p + 1 ;
1855
- i += 1 ;
1856
1850
}
1857
1851
Assert (i == MtmNodes );
1858
1852
0 commit comments