@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865
865
866
866
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
867
867
{
868
- MtmLock (LW_EXCLUSIVE );
869
- MtmSyncClock (globalSnapshot );
870
- MtmUnlock ();
871
-
868
+ if (globalSnapshot != INVALID_CSN ) {
869
+ MtmLock (LW_EXCLUSIVE );
870
+ MtmSyncClock (globalSnapshot );
871
+ MtmUnlock ();
872
+ } else {
873
+ globalSnapshot = MtmTx .snapshot ;
874
+ }
872
875
if (!TransactionIdIsValid (gtid -> xid )) {
873
876
/* In case of recovery InvalidTransactionId is passed */
874
877
Assert (Mtm -> status == MTM_RECOVERY );
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
1877
1880
}
1878
1881
}
1879
1882
}
1883
+ static void
1884
+ MtmOnProcExit (int code , Datum arg )
1885
+ {
1886
+ if (MtmReplicationNodeId >= 0 ) {
1887
+ elog (WARNING , "WAL-sender to %d is terminated" , MtmReplicationNodeId );
1888
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1889
+ }
1890
+ }
1880
1891
1881
1892
static void
1882
1893
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1923
1934
elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
1924
1935
}
1925
1936
MtmUnlock ();
1937
+ on_proc_exit (MtmOnProcExit , 0 );
1926
1938
}
1927
1939
1928
1940
static void
1929
1941
MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
1930
1942
{
1931
- elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1932
- MtmOnNodeDisconnect (MtmReplicationNodeId );
1943
+ if (MtmReplicationNodeId >= 0 ) {
1944
+ elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1945
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1946
+ MtmReplicationNodeId = -1 ; /* defuse on_proc_exit hook */
1947
+ }
1933
1948
}
1934
1949
1935
1950
static bool
@@ -2167,7 +2182,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
2167
2182
return ret ;
2168
2183
}
2169
2184
2170
- void MtmNoticeReceiver (void * i , const PGresult * res )
2185
+ static void
2186
+ MtmNoticeReceiver (void * i , const PGresult * res )
2171
2187
{
2172
2188
char * notice = PQresultErrorMessage (res );
2173
2189
char * stripped_notice ;
0 commit comments