@@ -1104,7 +1104,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
1104
1104
static void
1105
1105
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
1106
1106
{
1107
- MTM_LOG1 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1107
+ MTM_LOG2 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1108
1108
MyProcPid , x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> isTwoPhase , x -> gid , commit ? "commit" : "abort" );
1109
1109
if (x -> status != TRANSACTION_STATUS_ABORTED && x -> isDistributed && (x -> isPrepared || x -> isReplicated ) && !x -> isTwoPhase ) {
1110
1110
MtmTransState * ts = NULL ;
@@ -1122,7 +1122,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1122
1122
}
1123
1123
if (ts != NULL ) {
1124
1124
if (* ts -> gid )
1125
- MTM_LOG1 ("TRANSLOG: %s transaction %s status %d" , (commit ? "commit" : "rollback" ), ts -> gid , ts -> status );
1125
+ MTM_LOG2 ("TRANSLOG: %s transaction %s status %d" , (commit ? "commit" : "rollback" ), ts -> gid , ts -> status );
1126
1126
if (commit ) {
1127
1127
if (!(ts -> status == TRANSACTION_STATUS_UNKNOWN
1128
1128
|| (ts -> status == TRANSACTION_STATUS_IN_PROGRESS && Mtm -> status == MTM_RECOVERY )))
@@ -1177,6 +1177,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1177
1177
Mtm -> nActiveTransactions -= 1 ;
1178
1178
}
1179
1179
MtmTransactionListAppend (ts );
1180
+ if (* x -> gid ) {
1181
+ LogLogicalMessage ("A" , x -> gid , strlen (x -> gid ) + 1 , false);
1182
+ }
1180
1183
}
1181
1184
MtmSend2PCMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
1182
1185
} else if (x -> status == TRANSACTION_STATUS_ABORTED && x -> isReplicated && !x -> isPrepared ) {
@@ -1230,7 +1233,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
1230
1233
MtmSendMessage (& msg );
1231
1234
}
1232
1235
}
1233
- } else {
1236
+ } else if (! BIT_CHECK ( Mtm -> disabledNodeMask , ts -> gtid . node - 1 )) {
1234
1237
msg .node = ts -> gtid .node ;
1235
1238
msg .dxid = ts -> gtid .xid ;
1236
1239
MtmSendMessage (& msg );
@@ -1436,7 +1439,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
1436
1439
Assert (ts -> gid [0 ]);
1437
1440
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1438
1441
elog (LOG , "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d" , ts -> gid , MtmNodeId );
1439
- MtmFinishPreparedTransaction (disabledNodeId , ts , false);
1442
+ MtmFinishPreparedTransaction (ts , false);
1440
1443
} else {
1441
1444
MTM_LOG1 ("Poll state of transaction %d (%s)" , ts -> xid , ts -> gid );
1442
1445
MtmBroadcastPollMessage (ts );
@@ -1459,7 +1462,9 @@ static void MtmDisableNode(int nodeId)
1459
1462
if (nodeId != MtmNodeId ) {
1460
1463
Mtm -> nLiveNodes -= 1 ;
1461
1464
}
1465
+ MtmUnlock ();
1462
1466
MtmPollStatusOfPreparedTransactions (nodeId );
1467
+ MtmLock (LW_EXCLUSIVE );
1463
1468
}
1464
1469
1465
1470
static void MtmEnableNode (int nodeId )
@@ -2778,34 +2783,41 @@ void MtmReleaseRecoverySlot(int nodeId)
2778
2783
}
2779
2784
}
2780
2785
2781
- void MtmFinishPreparedTransaction ( int nodeId , MtmTransState * ts , bool commit )
2786
+ void MtmRollbackPreparedTransaction ( char const * gid )
2782
2787
{
2788
+ MTM_LOG1 ("Abort prepared transaction %s" , gid );
2789
+ if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_ABORTED ) == TRANSACTION_STATUS_UNKNOWN ) {
2790
+ MTM_LOG1 ("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2" , gid );
2791
+ MtmResetTransaction ();
2792
+ StartTransactionCommand ();
2793
+ MtmBeginSession (MtmReplicationNodeId );
2794
+ MtmSetCurrentTransactionGID (gid );
2795
+ FinishPreparedTransaction (gid , false);
2796
+ CommitTransactionCommand ();
2797
+ MtmEndSession (MtmReplicationNodeId , true);
2798
+ }
2799
+ }
2800
+
2801
+
2802
+ void MtmFinishPreparedTransaction (MtmTransState * ts , bool commit )
2803
+ {
2804
+ if (Mtm -> nodes [MtmNodeId - 1 ].originId == InvalidRepOriginId ) {
2805
+ /* This dummy origin is used for local commits/aborts which should not be replicated */
2806
+ Mtm -> nodes [MtmNodeId - 1 ].originId = replorigin_create (psprintf (MULTIMASTER_SLOT_PATTERN , MtmNodeId ));
2807
+ }
2783
2808
Assert (ts -> votingCompleted );
2784
2809
Assert (!IsTransactionState ());
2785
2810
MtmResetTransaction ();
2786
2811
StartTransactionCommand ();
2787
- MtmBeginSession (nodeId );
2812
+ MtmBeginSession (MtmNodeId );
2788
2813
MtmSetCurrentTransactionCSN (ts -> csn );
2789
2814
MtmSetCurrentTransactionGID (ts -> gid );
2790
2815
FinishPreparedTransaction (ts -> gid , commit );
2791
2816
CommitTransactionCommand ();
2792
- MtmEndSession (nodeId , true);
2817
+ MtmEndSession (MtmNodeId , true);
2793
2818
Assert (ts -> status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED );
2794
2819
}
2795
2820
2796
- #if 0
2797
- static void MtmFinishAllPreparedTransactions (void )
2798
- {
2799
- MtmTransState * ts ;
2800
- for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
2801
- if (ts -> status != TRANSACTION_STATUS_COMMITTED && ts -> status != TRANSACTION_STATUS_ABORTED ) {
2802
- MtmFinishPreparedTransaction (MtmReplicationNodeId , ts , false);
2803
- }
2804
- }
2805
- }
2806
- #endif
2807
-
2808
-
2809
2821
/*
2810
2822
* Determine when and how we should open replication slot.
2811
2823
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -2839,11 +2851,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2839
2851
Mtm -> nodes [i ].restartLsn = InvalidXLogRecPtr ;
2840
2852
}
2841
2853
MtmUnlock ();
2842
- #if 0
2843
- MtmBeginSession (MtmReplicationNodeId );
2844
- FinishAllPreparedTransactions (false);
2845
- MtmEndSession (MtmReplicationNodeId , true);
2846
- #endif
2847
2854
return REPLMODE_RECOVERY ;
2848
2855
}
2849
2856
}
0 commit comments