@@ -553,6 +553,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
553
553
Assert (!found );
554
554
sts -> status = ts -> status ;
555
555
sts -> csn = ts -> csn ;
556
+ sts -> votingCompleted = true;
556
557
MtmTransactionListInsertAfter (ts , sts );
557
558
}
558
559
}
@@ -745,7 +746,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
745
746
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
746
747
MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
747
748
Assert (x -> gid [0 ]);
748
- tm -> state = ts ;
749
+ tm -> state = ts ;
750
+ ts -> votingCompleted = true;
749
751
if (Mtm -> status != MTM_RECOVERY ) {
750
752
MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
751
753
} else {
@@ -777,9 +779,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
777
779
MtmLock (LW_EXCLUSIVE );
778
780
tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
779
781
Assert (tm != NULL );
780
- tm -> state -> status = TRANSACTION_STATUS_ABORTED ;
781
- MtmAdjustSubtransactions (tm -> state );
782
- Mtm -> nActiveTransactions -= 1 ;
782
+ MtmAbortTransaction (tm -> state );
783
783
MtmUnlock ();
784
784
x -> status = TRANSACTION_STATUS_ABORTED ;
785
785
}
@@ -835,6 +835,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
835
835
ts -> gtid = x -> gtid ;
836
836
ts -> nSubxids = 0 ;
837
837
ts -> cmd = MSG_INVALID ;
838
+ ts -> votingCompleted = true;
838
839
MtmTransactionListAppend (ts );
839
840
}
840
841
MtmSendNotificationMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
@@ -937,6 +938,20 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
937
938
return csn ;
938
939
}
939
940
941
+ void MtmWakeUpBackend (MtmTransState * ts )
942
+ {
943
+ MTM_TRACE ("Wakeup backed procno=%d, pid=%d\n" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
944
+ ts -> votingCompleted = true;
945
+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
946
+ }
947
+
948
+ void MtmAbortTransaction (MtmTransState * ts )
949
+ {
950
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
951
+ MtmAdjustSubtransactions (ts );
952
+ Mtm -> nActiveTransactions -= 1 ;
953
+ }
954
+
940
955
/*
941
956
* -------------------------------------------
942
957
* HA functions
@@ -1213,9 +1228,10 @@ void MtmCheckQuorum(void)
1213
1228
}
1214
1229
}
1215
1230
1216
-
1217
1231
void MtmOnNodeDisconnect (int nodeId )
1218
- {
1232
+ {
1233
+ MtmTransState * ts ;
1234
+
1219
1235
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1220
1236
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
1221
1237
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1229,6 +1245,16 @@ void MtmOnNodeDisconnect(int nodeId)
1229
1245
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1230
1246
Mtm -> nNodes -= 1 ;
1231
1247
MtmCheckQuorum ();
1248
+ /* Interrupt voting for active transaction and abort them */
1249
+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
1250
+ if (!ts -> votingCompleted ) {
1251
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1252
+ elog (WARNING , "Rollback active transaction %d:%d" , ts -> gtid .node , ts -> gtid .xid );
1253
+ MtmAbortTransaction (ts );
1254
+ }
1255
+ MtmWakeUpBackend (ts );
1256
+ }
1257
+ }
1232
1258
}
1233
1259
MtmUnlock ();
1234
1260
}
0 commit comments