@@ -1028,6 +1028,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
1028
1028
MTM_TXTRACE (x , "PostPrepareTransaction Start" );
1029
1029
1030
1030
if (!x -> isDistributed ) {
1031
+ MTM_TXTRACE (x , "not distributed?" );
1031
1032
return ;
1032
1033
}
1033
1034
@@ -1040,25 +1041,34 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
1040
1041
Assert (ts != NULL );
1041
1042
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
1042
1043
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
1044
+ MTM_TXTRACE (x , "recovery?" );
1043
1045
Assert (x -> gid [0 ]);
1044
1046
ts -> votingCompleted = true;
1045
- if (Mtm -> status != MTM_RECOVERY || Mtm -> recoverySlot != MtmReplicationNodeId ) {
1047
+ MTM_TXTRACE (x , "recovery? 1" );
1048
+ if (Mtm -> status != MTM_RECOVERY || Mtm -> recoverySlot != MtmReplicationNodeId ) {
1049
+ MTM_TXTRACE (x , "recovery? 2" );
1046
1050
MtmSend2PCMessage (ts , MSG_PREPARED ); /* send notification to coordinator */
1047
1051
if (!MtmUseDtm ) {
1048
1052
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1049
1053
}
1050
1054
} else {
1055
+ MTM_TXTRACE (x , "recovery? 3" );
1051
1056
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1052
1057
}
1058
+ MTM_TXTRACE (x , "recovery? 4" );
1053
1059
MtmUnlock ();
1060
+ MTM_TXTRACE (x , "recovery? 5" );
1054
1061
MtmResetTransaction ();
1062
+ MTM_TXTRACE (x , "recovery? 6" );
1055
1063
} else {
1064
+ MTM_TXTRACE (x , "not recovery?" );
1056
1065
Mtm2PCVoting (x , ts );
1057
1066
MtmUnlock ();
1058
1067
if (x -> isTwoPhase ) {
1059
1068
MtmResetTransaction ();
1060
1069
}
1061
1070
}
1071
+ MTM_TXTRACE (x , "recovery? 7" );
1062
1072
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
1063
1073
if (Mtm -> inject2PCError == 3 ) {
1064
1074
Mtm -> inject2PCError = 0 ;
@@ -1136,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
1136
1146
strcpy (msg .gid , gid );
1137
1147
msg .origin_node = nodeId ;
1138
1148
msg .origin_lsn = replorigin_session_origin_lsn ;
1149
+ MTM_LOG2 ("[TRACE] MtmLogAbortLogicalMessage(%d, %s)" , nodeId , gid );
1139
1150
XLogFlush (LogLogicalMessage ("A" , (char * )& msg , sizeof msg , false));
1140
1151
}
1141
1152
@@ -1224,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1224
1235
MtmTransactionListAppend (ts );
1225
1236
if (* x -> gid ) {
1226
1237
replorigin_session_origin_lsn = InvalidXLogRecPtr ;
1238
+ MTM_TXTRACE (x , "MtmEndTransaction/MtmLogAbortLogicalMessage" );
1227
1239
MtmLogAbortLogicalMessage (MtmNodeId , x -> gid );
1228
1240
}
1229
1241
}
@@ -2878,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
2878
2890
CommitTransactionCommand ();
2879
2891
MtmEndSession (nodeId , true);
2880
2892
} else if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
2893
+ MtmBeginSession (nodeId );
2881
2894
MtmLogAbortLogicalMessage (nodeId , gid );
2895
+ MtmEndSession (nodeId , true);
2882
2896
}
2883
2897
}
2884
2898
@@ -3045,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
3045
3059
sscanf (strVal (elem -> arg ), "%lx" , & recoveredLSN );
3046
3060
MTM_LOG1 ("Recovered position of node %d is %lx" , MtmReplicationNodeId , recoveredLSN );
3047
3061
if (Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN < recoveredLSN ) {
3062
+ MTM_LOG2 ("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)" , MtmReplicationNodeId , Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN , recoveredLSN );
3048
3063
Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN = recoveredLSN ;
3049
3064
}
3050
3065
} else {
@@ -3210,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
3210
3225
}
3211
3226
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn ;
3212
3227
if (Mtm -> nodes [origin_node - 1 ].restartLSN < restart_lsn ) {
3228
+ MTM_LOG2 ("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)" , MtmReplicationNodeId , Mtm -> nodes [MtmReplicationNodeId - 1 ].restartLSN , restart_lsn );
3213
3229
Mtm -> nodes [origin_node - 1 ].restartLSN = restart_lsn ;
3214
3230
} else {
3215
3231
duplicate = true;
3216
3232
}
3217
3233
3218
3234
if (duplicate ) {
3219
- MTM_LOG1 ("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn =%lx, current lsn =%lx" ,
3220
- gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3235
+ MTM_LOG1 ("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = ( origin node %d == MtmReplicationNodeId %d) ? end_lsn =%lx, origin_lsn =%lx" ,
3236
+ gid , replication_node , flags , Mtm -> nodes [ origin_node - 1 ]. restartLSN , origin_node , MtmReplicationNodeId , end_lsn , origin_lsn );
3221
3237
} else {
3222
3238
MTM_LOG2 ("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3223
3239
gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3224
3240
}
3241
+
3225
3242
return duplicate ;
3226
3243
}
3227
3244
@@ -4127,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
4127
4144
4128
4145
case T_VacuumStmt :
4129
4146
skipCommand = true;
4130
- if (context == PROCESS_UTILITY_TOPLEVEL ) {
4131
- MtmProcessDDLCommand (queryString , false, true);
4132
- MtmTx .isDistributed = false;
4133
- } else if (MtmApplyContext != NULL ) {
4134
- MemoryContext oldContext = MemoryContextSwitchTo (MtmApplyContext );
4135
- Assert (oldContext != MtmApplyContext );
4136
- MtmVacuumStmt = (VacuumStmt * )copyObject (parsetree );
4137
- MemoryContextSwitchTo (oldContext );
4138
- return ;
4139
- }
4147
+ // if (context == PROCESS_UTILITY_TOPLEVEL) {
4148
+ // MtmProcessDDLCommand(queryString, false, true);
4149
+ // MtmTx.isDistributed = false;
4150
+ // } else if (MtmApplyContext != NULL) {
4151
+ // MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152
+ // Assert(oldContext != MtmApplyContext);
4153
+ // MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154
+ // MemoryContextSwitchTo(oldContext);
4155
+ // return;
4156
+ // }
4140
4157
break ;
4141
4158
4142
4159
case T_CreateDomainStmt :
@@ -4231,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
4231
4248
if (indexStmt -> concurrent )
4232
4249
{
4233
4250
if (context == PROCESS_UTILITY_TOPLEVEL ) {
4234
- MtmProcessDDLCommand (queryString , false, true);
4251
+ // MtmProcessDDLCommand(queryString, false, true);
4235
4252
MtmTx .isDistributed = false;
4236
4253
skipCommand = true;
4237
4254
/*
@@ -4258,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
4258
4275
if (stmt -> removeType == OBJECT_INDEX && stmt -> concurrent )
4259
4276
{
4260
4277
if (context == PROCESS_UTILITY_TOPLEVEL ) {
4261
- MtmProcessDDLCommand (queryString , false, true);
4278
+ // MtmProcessDDLCommand(queryString, false, true);
4262
4279
MtmTx .isDistributed = false;
4263
4280
skipCommand = true;
4264
4281
} else if (MtmApplyContext != NULL ) {
0 commit comments