@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
91
91
static void MtmSendHeartbeat (void );
92
92
static bool MtmSendToNode (int node , void const * buf , int size );
93
93
94
- /*
95
- static char const* const messageText[] =
94
+ static char const * const messageKindText [] =
96
95
{
97
96
"INVALID" ,
98
97
"HANDSHAKE" ,
99
- "READY",
100
- "PREPARE",
101
98
"PREPARED" ,
99
+ "PRECOMMIT" ,
100
+ "PRECOMMITTED" ,
102
101
"ABORTED" ,
103
102
"STATUS" ,
104
103
"HEARTBEAT" ,
105
104
"POLL_REQUEST" ,
106
105
"POLL_STATUS"
107
106
};
108
- */
109
107
110
108
static BackgroundWorker MtmSenderWorker = {
111
109
"mtm-sender" ,
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364
362
MTM_LOG2 ("Send heartbeat to node %d with timestamp %ld" , i + 1 , now );
365
363
}
366
364
} else {
367
- MTM_LOG1 ("Do not send heartbeat to node %d, busy mask %lld, status %d" , i + 1 , (long long ) busy_mask , Mtm -> status );
365
+ MTM_LOG2 ("Do not send heartbeat to node %d, busy mask %lld, status %d" , i + 1 , (long long ) busy_mask , Mtm -> status );
368
366
}
369
367
}
370
368
}
@@ -902,9 +900,14 @@ static void MtmReceiver(Datum arg)
902
900
msg -> status = TRANSACTION_STATUS_ABORTED ;
903
901
} else {
904
902
msg -> status = tm -> state -> status ;
903
+ msg -> csn = tm -> state -> csn ;
905
904
MTM_LOG1 ("Send response %d for transaction %s to node %d" , msg -> status , msg -> gid , msg -> node );
906
905
}
907
- msg -> code = MSG_POLL_STATUS ;
906
+ msg -> disabledNodeMask = Mtm -> disabledNodeMask ;
907
+ msg -> connectivityMask = Mtm -> connectivityMask ;
908
+ msg -> oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
909
+ msg -> code = MSG_POLL_STATUS ;
910
+ msg -> csn = ts -> csn ;
908
911
MtmSendMessage (msg );
909
912
continue ;
910
913
case MSG_POLL_STATUS :
@@ -915,41 +918,34 @@ static void MtmReceiver(Datum arg)
915
918
} else {
916
919
ts = tm -> state ;
917
920
BIT_SET (ts -> votedMask , node - 1 );
918
- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
919
- if (msg -> status == TRANSACTION_STATUS_UNKNOWN || msg -> status == TRANSACTION_STATUS_COMMITTED ) {
920
- elog (LOG , "Commit transaction %s because it is in state %d at node %d" ,
921
+ if (ts -> status == TRANSACTION_STATUS_UNKNOWN ) {
922
+ if (msg -> status == TRANSACTION_STATUS_IN_PROGRESS || msg -> status == TRANSACTION_STATUS_ABORTED ) {
923
+ elog (LOG , "Abort transaction %s because it is in state %d at node %d" ,
921
924
msg -> gid , ts -> status , node );
922
- Assert (!IsTransactionState ());
923
- StartTransactionCommand ();
924
- MtmSetCurrentTransactionGID (ts -> gid );
925
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
926
- FinishPreparedTransaction (ts -> gid , true);
927
- CommitTransactionCommand ();
928
- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED );
929
- } else if (msg -> status == TRANSACTION_STATUS_ABORTED
930
- || ((ts -> participantsMask & ~Mtm -> disabledNodeMask ) & ~ts -> votedMask ) == 0 )
925
+ MtmFinishPreparedTransaction (node , ts , false);
926
+ }
927
+ else if (msg -> status == TRANSACTION_STATUS_COMMITTED || msg -> status == TRANSACTION_STATUS_UNKNOWN )
931
928
{
932
- if (msg -> status == TRANSACTION_STATUS_ABORTED ) {
933
- elog (LOG , "Abort transaction %s because it is aborted at node %d" , msg -> gid , node );
934
- } else {
935
- elog (LOG , "Abort transaction %s because it is not prepared at any online node" , msg -> gid );
929
+ if (msg -> csn > ts -> csn ) {
930
+ ts -> csn = msg -> csn ;
931
+ MtmSyncClock (ts -> csn );
932
+ }
933
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
934
+ elog (LOG , "Commit transaction %s because it is prepared at all live nodes" , msg -> gid );
935
+ MtmFinishPreparedTransaction (node , ts , true);
936
936
}
937
- Assert (!IsTransactionState ());
938
- StartTransactionCommand ();
939
- MtmSetCurrentTransactionGID (ts -> gid );
940
- FinishPreparedTransaction (ts -> gid , false);
941
- CommitTransactionCommand ();
942
- Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
943
937
} else {
944
938
elog (LOG , "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx" ,
945
- msg -> status , msg -> gid , node , (long long ) ts -> votedMask ,
946
- (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
939
+ msg -> status , msg -> gid , node , (long long ) ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ));
947
940
continue ;
948
941
}
949
942
} else if (ts -> status == TRANSACTION_STATUS_ABORTED && msg -> status == TRANSACTION_STATUS_COMMITTED ) {
950
943
elog (WARNING , "Transaction %s is aborted at node %d but committed at node %d" , msg -> gid , MtmNodeId , node );
951
944
} else if (msg -> status == TRANSACTION_STATUS_ABORTED && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
952
945
elog (WARNING , "Transaction %s is committed at node %d but aborted at node %d" , msg -> gid , MtmNodeId , node );
946
+ } else {
947
+ elog (LOG , "Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx" ,
948
+ msg -> status , msg -> gid , ts -> status , node , (long long ) ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
953
949
}
954
950
}
955
951
continue ;
@@ -965,50 +961,49 @@ static void MtmReceiver(Datum arg)
965
961
elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , node );
966
962
continue ;
967
963
}
964
+ if (BIT_CHECK (ts -> votedMask , node - 1 )) {
965
+ elog (WARNING , "Receive deteriorated %s response for transaction %d (%s) from node %d" ,
966
+ messageKindText [msg -> code ], ts -> xid , ts -> gid , node );
967
+ continue ;
968
+ }
968
969
MtmCheckResponse (msg );
969
-
970
+ BIT_SET (ts -> votedMask , node - 1 );
971
+
970
972
if (MtmIsCoordinator (ts )) {
971
973
switch (msg -> code ) {
972
- case MSG_READY :
973
- MTM_TXTRACE (ts , "MtmTransReceiver got MSG_READY " );
974
+ case MSG_PREPARED :
975
+ MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PREPARED " );
974
976
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
975
- elog (WARNING , "Receive READY response for already committed transaction %d from node %d" ,
977
+ elog (WARNING , "Receive PREPARED response for already committed transaction %d from node %d" ,
976
978
ts -> xid , node );
977
979
continue ;
978
980
}
979
- if (ts -> nVotes >= Mtm -> nLiveNodes ) {
980
- elog (WARNING , "Receive deteriorated READY response for transaction %d (%s) from node %d" ,
981
- ts -> xid , ts -> gid , node );
981
+ Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
982
+ ts -> xids [node - 1 ] = msg -> sxid ;
983
+
984
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
985
+ /* Coordinator's disabled mask is wider than that of this node, so reject the transaction to avoid
986
+ committing on a smaller subset of nodes */
987
+ elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
988
+ node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
982
989
MtmAbortTransaction (ts );
983
- MtmWakeUpBackend (ts );
984
- } else {
985
- Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
986
- ts -> xids [node - 1 ] = msg -> sxid ;
987
-
988
- if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
989
- /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
990
- commit on smaller subset of nodes */
991
- elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
992
- node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
993
- MtmAbortTransaction (ts );
994
- }
995
-
996
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
997
- /* All nodes are finished their transactions */
998
- if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
999
- MtmWakeUpBackend (ts );
990
+ }
991
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
992
+ /* All nodes have finished their transactions */
993
+ if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
994
+ MtmWakeUpBackend (ts );
995
+ } else {
996
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
997
+ ts -> isPrepared = true;
998
+ if (ts -> isTwoPhase ) {
999
+ MtmWakeUpBackend (ts );
1000
+ } else if (MtmUseDtm ) {
1001
+ ts -> votedMask = 0 ;
1002
+ MTM_TXTRACE (ts , "MtmTransReceiver send MSG_PRECOMMIT" );
1003
+ MtmSend2PCMessage (ts , MSG_PRECOMMIT );
1000
1004
} else {
1001
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1002
- if (ts -> isTwoPhase ) {
1003
- MtmWakeUpBackend (ts );
1004
- } else if (MtmUseDtm ) {
1005
- ts -> nVotes = 1 ; /* I voted myself */
1006
- MTM_TXTRACE (ts , "MtmTransReceiver send MSG_PREPARE" );
1007
- MtmSend2PCMessage (ts , MSG_PREPARE );
1008
- } else {
1009
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1010
- MtmWakeUpBackend (ts );
1011
- }
1005
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1006
+ MtmWakeUpBackend (ts );
1012
1007
}
1013
1008
}
1014
1009
}
@@ -1023,47 +1018,40 @@ static void MtmReceiver(Datum arg)
1023
1018
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1024
1019
MtmAbortTransaction (ts );
1025
1020
}
1026
- if (++ ts -> nVotes >= Mtm -> nLiveNodes ) {
1021
+ if (( ts -> participantsMask & ~ Mtm -> disabledNodeMask & ~ ts -> votedMask ) == 0 ) {
1027
1022
MtmWakeUpBackend (ts );
1028
1023
}
1029
1024
break ;
1030
- case MSG_PREPARED :
1031
- MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PREPARED" );
1032
- if (ts -> nVotes >= Mtm -> nLiveNodes ) {
1033
- elog (WARNING , "Receive deteriorated PREPARED response for transaction %d (%s) from node %d" ,
1034
- ts -> xid , ts -> gid , node );
1035
- MtmAbortTransaction (ts );
1036
- MtmWakeUpBackend (ts );
1025
+ case MSG_PRECOMMITTED :
1026
+ MTM_TXTRACE (ts , "MtmTransReceiver got MSG_PRECOMMITTED" );
1027
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1028
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1029
+ if (msg -> csn > ts -> csn ) {
1030
+ ts -> csn = msg -> csn ;
1031
+ MtmSyncClock (ts -> csn );
1032
+ }
1033
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
1034
+ ts -> csn = MtmAssignCSN ();
1035
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1036
+ MtmWakeUpBackend (ts );
1037
+ }
1037
1038
} else {
1038
- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1039
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1040
- if (msg -> csn > ts -> csn ) {
1041
- ts -> csn = msg -> csn ;
1042
- MtmSyncClock (ts -> csn );
1043
- }
1044
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1045
- ts -> csn = MtmAssignCSN ();
1046
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1047
- MtmWakeUpBackend (ts );
1048
- }
1049
- } else {
1050
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1051
- MtmWakeUpBackend (ts );
1052
- }
1053
- }
1054
- }
1039
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
1040
+ MtmWakeUpBackend (ts );
1041
+ }
1042
+ }
1055
1043
break ;
1056
1044
default :
1057
1045
Assert (false);
1058
1046
}
1059
1047
} else {
1060
1048
switch (msg -> code ) {
1061
- case MSG_PREPARE :
1049
+ case MSG_PRECOMMIT :
1062
1050
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1063
1051
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1064
1052
ts -> csn = MtmAssignCSN ();
1065
1053
MtmAdjustSubtransactions (ts );
1066
- MtmSend2PCMessage (ts , MSG_PREPARED );
1054
+ MtmSend2PCMessage (ts , MSG_PRECOMMITTED );
1067
1055
} else {
1068
1056
Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
1069
1057
MtmSend2PCMessage (ts , MSG_ABORTED );
0 commit comments