@@ -1531,8 +1531,8 @@ static void MtmEnableNode(int nodeId)
1531
1531
void MtmRecoveryCompleted (void )
1532
1532
{
1533
1533
int i ;
1534
- MTM_LOG1 ("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d" ,
1535
- MtmNodeId , (long long ) Mtm -> disabledNodeMask , (long long ) Mtm -> connectivityMask , Mtm -> nLiveNodes );
1534
+ MTM_LOG1 ("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d" ,
1535
+ MtmNodeId , (long long ) Mtm -> disabledNodeMask , (long long ) Mtm -> connectivityMask , GetXLogInsertRecPtr (), Mtm -> nLiveNodes );
1536
1536
MtmLock (LW_EXCLUSIVE );
1537
1537
Mtm -> recoverySlot = 0 ;
1538
1538
Mtm -> recoveredLSN = GetXLogInsertRecPtr ();
@@ -1542,7 +1542,7 @@ void MtmRecoveryCompleted(void)
1542
1542
for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
1543
1543
Mtm -> nodes [i ].lastHeartbeat = 0 ; /* defuse watchdog until first heartbeat is received */
1544
1544
}
1545
- /* Mode will be changed to online once all logical reciever are connected */
1545
+ /* Mode will be changed to online once all logical receiver are connected */
1546
1546
MtmSwitchClusterMode (MTM_CONNECTED );
1547
1547
MtmUnlock ();
1548
1548
}
@@ -2131,7 +2131,6 @@ static void MtmInitialize()
2131
2131
Mtm -> nodes [i ].restartLSN = InvalidXLogRecPtr ;
2132
2132
Mtm -> nodes [i ].originId = InvalidRepOriginId ;
2133
2133
Mtm -> nodes [i ].timeline = 0 ;
2134
- Mtm -> nodes [i ].recoveredLSN = InvalidXLogRecPtr ;
2135
2134
}
2136
2135
Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
2137
2136
/* All transaction originated from the current node should be ignored during recovery */
@@ -2882,13 +2881,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2882
2881
{
2883
2882
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED ;
2884
2883
2884
+ MtmLock (LW_EXCLUSIVE );
2885
2885
while ((Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) || BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ))
2886
2886
{
2887
2887
if (* shutdown )
2888
2888
{
2889
+ MtmUnlock ();
2889
2890
return REPLMODE_EXIT ;
2890
2891
}
2891
- MtmLock (LW_EXCLUSIVE );
2892
2892
if (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
2893
2893
mode = REPLMODE_CREATE_NEW ;
2894
2894
}
@@ -2911,6 +2911,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2911
2911
MtmUnlock ();
2912
2912
/* delay opening of other slots until recovery is completed */
2913
2913
MtmSleep (STATUS_POLL_DELAY );
2914
+ MtmLock (LW_EXCLUSIVE );
2914
2915
}
2915
2916
if (mode == REPLMODE_RECOVERED ) {
2916
2917
MTM_LOG1 ("%d: Restart replication from node %d after end of recovery" , MyProcPid , nodeId );
@@ -2919,6 +2920,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2919
2920
} else {
2920
2921
MTM_LOG1 ("%d: Continue replication from node %d" , MyProcPid , nodeId );
2921
2922
}
2923
+ MtmUnlock ();
2922
2924
return mode ;
2923
2925
}
2924
2926
@@ -3012,7 +3014,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
3012
3014
}
3013
3015
} else if (strcmp ("mtm_recovered_pos" , elem -> defname ) == 0 ) {
3014
3016
if (elem -> arg != NULL && strVal (elem -> arg ) != NULL ) {
3015
- sscanf (strVal (elem -> arg ), "%lx" , & Mtm -> nodes [MtmReplicationNodeId - 1 ].recoveredLSN );
3017
+ XLogRecPtr recoveredLSN ;
3018
+ sscanf (strVal (elem -> arg ), "%lx" , & recoveredLSN );
3019
+ MTM_LOG1 ("Recovered position of node %d is %lx" , MtmReplicationNodeId , recoveredLSN );
3020
+ if (Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN < recoveredLSN ) {
3021
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN = recoveredLSN ;
3022
+ }
3016
3023
} else {
3017
3024
elog (ERROR , "Recovered position is not specified" );
3018
3025
}
@@ -3127,16 +3134,21 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
3127
3134
return isDistributed ;
3128
3135
}
3129
3136
3137
+ /*
3138
+ * Filter received transacyions at destination side.
3139
+ * This function is executed by receiver, so there are no race conditions and it is possible to update nodes[i].restaetLSN without lock
3140
+ */
3130
3141
bool MtmFilterTransaction (char * record , int size )
3131
3142
{
3132
3143
StringInfoData s ;
3133
3144
uint8 flags ;
3134
3145
XLogRecPtr origin_lsn ;
3135
3146
XLogRecPtr end_lsn ;
3147
+ XLogRecPtr restart_lsn ;
3136
3148
int replication_node ;
3137
3149
int origin_node ;
3138
3150
char const * gid = "" ;
3139
- bool duplicate ;
3151
+ bool duplicate = false ;
3140
3152
3141
3153
s .data = record ;
3142
3154
s .len = size ;
@@ -3172,11 +3184,17 @@ bool MtmFilterTransaction(char* record, int size)
3172
3184
default :
3173
3185
break ;
3174
3186
}
3187
+ restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn ;
3188
+ if (Mtm -> nodes [origin_node - 1 ].restartLSN < restart_lsn ) {
3189
+ Mtm -> nodes [origin_node - 1 ].restartLSN = restart_lsn ;
3190
+ } else {
3191
+ duplicate = true;
3192
+ }
3193
+
3175
3194
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3176
- duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm -> nodes [origin_node - 1 ].restartLSN ;
3177
3195
3178
3196
MTM_LOG1 ("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3179
- duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , flags , origin_node , origin_lsn , Mtm -> nodes [ origin_node - 1 ]. restartLSN );
3197
+ duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3180
3198
return duplicate ;
3181
3199
}
3182
3200
0 commit comments