@@ -1104,7 +1104,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
1104
1104
static void
1105
1105
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
1106
1106
{
1107
- MTM_LOG1 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1107
+ MTM_LOG2 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1108
1108
MyProcPid , x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> isTwoPhase , x -> gid , commit ? "commit" : "abort" );
1109
1109
if (x -> status != TRANSACTION_STATUS_ABORTED && x -> isDistributed && (x -> isPrepared || x -> isReplicated ) && !x -> isTwoPhase ) {
1110
1110
MtmTransState * ts = NULL ;
@@ -1122,7 +1122,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1122
1122
}
1123
1123
if (ts != NULL ) {
1124
1124
if (* ts -> gid )
1125
- MTM_LOG1 ("TRANSLOG: %s transaction %s status %d" , (commit ? "commit" : "rollback" ), ts -> gid , ts -> status );
1125
+ MTM_LOG2 ("TRANSLOG: %s transaction %s status %d" , (commit ? "commit" : "rollback" ), ts -> gid , ts -> status );
1126
1126
if (commit ) {
1127
1127
if (!(ts -> status == TRANSACTION_STATUS_UNKNOWN
1128
1128
|| (ts -> status == TRANSACTION_STATUS_IN_PROGRESS && Mtm -> status == MTM_RECOVERY )))
@@ -1177,6 +1177,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1177
1177
Mtm -> nActiveTransactions -= 1 ;
1178
1178
}
1179
1179
MtmTransactionListAppend (ts );
1180
+ if (* x -> gid ) {
1181
+ LogLogicalMessage ("A" , x -> gid , strlen (x -> gid ) + 1 , false);
1182
+ }
1180
1183
}
1181
1184
MtmSend2PCMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
1182
1185
} else if (x -> status == TRANSACTION_STATUS_ABORTED && x -> isReplicated && !x -> isPrepared ) {
@@ -1230,7 +1233,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
1230
1233
MtmSendMessage (& msg );
1231
1234
}
1232
1235
}
1233
- } else {
1236
+ } else if (! BIT_CHECK ( Mtm -> disabledNodeMask , ts -> gtid . node - 1 )) {
1234
1237
msg .node = ts -> gtid .node ;
1235
1238
msg .dxid = ts -> gtid .xid ;
1236
1239
MtmSendMessage (& msg );
@@ -1436,7 +1439,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
1436
1439
Assert (ts -> gid [0 ]);
1437
1440
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1438
1441
elog (LOG , "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d" , ts -> gid , MtmNodeId );
1439
- MtmFinishPreparedTransaction (disabledNodeId , ts , false);
1442
+ MtmFinishPreparedTransaction (ts , false);
1440
1443
} else {
1441
1444
MTM_LOG1 ("Poll state of transaction %d (%s)" , ts -> xid , ts -> gid );
1442
1445
MtmBroadcastPollMessage (ts );
@@ -1459,7 +1462,9 @@ static void MtmDisableNode(int nodeId)
1459
1462
if (nodeId != MtmNodeId ) {
1460
1463
Mtm -> nLiveNodes -= 1 ;
1461
1464
}
1465
+ MtmUnlock ();
1462
1466
MtmPollStatusOfPreparedTransactions (nodeId );
1467
+ MtmLock (LW_EXCLUSIVE );
1463
1468
}
1464
1469
1465
1470
static void MtmEnableNode (int nodeId )
@@ -2780,34 +2785,41 @@ void MtmReleaseRecoverySlot(int nodeId)
2780
2785
}
2781
2786
}
2782
2787
2783
- void MtmFinishPreparedTransaction (int nodeId , MtmTransState * ts , bool commit )
2788
+ void MtmRollbackPreparedTransaction (char const * gid )
2789
+ {
2790
+ MTM_LOG1 ("Abort prepared transaction %s" , gid );
2791
+ if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_ABORTED ) == TRANSACTION_STATUS_UNKNOWN ) {
2792
+ MTM_LOG1 ("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2" , gid );
2793
+ MtmResetTransaction ();
2794
+ StartTransactionCommand ();
2795
+ MtmBeginSession (MtmReplicationNodeId );
2796
+ MtmSetCurrentTransactionGID (gid );
2797
+ FinishPreparedTransaction (gid , false);
2798
+ CommitTransactionCommand ();
2799
+ MtmEndSession (MtmReplicationNodeId , true);
2800
+ }
2801
+ }
2802
+
2803
+
2804
+ void MtmFinishPreparedTransaction (MtmTransState * ts , bool commit )
2784
2805
{
2785
2806
Assert (ts -> votingCompleted );
2786
2807
Assert (!IsTransactionState ());
2787
2808
MtmResetTransaction ();
2788
2809
StartTransactionCommand ();
2789
- MtmBeginSession (nodeId );
2810
+ if (Mtm -> nodes [MtmNodeId - 1 ].originId == InvalidRepOriginId ) {
2811
+ /* This dummy origin is used for local commits/aborts which should not be replicated */
2812
+ Mtm -> nodes [MtmNodeId - 1 ].originId = replorigin_create (psprintf (MULTIMASTER_SLOT_PATTERN , MtmNodeId ));
2813
+ }
2814
+ MtmBeginSession (MtmNodeId );
2790
2815
MtmSetCurrentTransactionCSN (ts -> csn );
2791
2816
MtmSetCurrentTransactionGID (ts -> gid );
2792
2817
FinishPreparedTransaction (ts -> gid , commit );
2793
2818
CommitTransactionCommand ();
2794
- MtmEndSession (nodeId , true);
2819
+ MtmEndSession (MtmNodeId , true);
2795
2820
Assert (ts -> status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED );
2796
2821
}
2797
2822
2798
- #if 0
2799
- static void MtmFinishAllPreparedTransactions (void )
2800
- {
2801
- MtmTransState * ts ;
2802
- for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
2803
- if (ts -> status != TRANSACTION_STATUS_COMMITTED && ts -> status != TRANSACTION_STATUS_ABORTED ) {
2804
- MtmFinishPreparedTransaction (MtmReplicationNodeId , ts , false);
2805
- }
2806
- }
2807
- }
2808
- #endif
2809
-
2810
-
2811
2823
/*
2812
2824
* Determine when and how we should open replication slot.
2813
2825
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -2841,11 +2853,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2841
2853
Mtm -> nodes [i ].restartLsn = InvalidXLogRecPtr ;
2842
2854
}
2843
2855
MtmUnlock ();
2844
- #if 0
2845
- MtmBeginSession (MtmReplicationNodeId );
2846
- FinishAllPreparedTransactions (false);
2847
- MtmEndSession (MtmReplicationNodeId , true);
2848
- #endif
2849
2856
return REPLMODE_RECOVERY ;
2850
2857
}
2851
2858
}
0 commit comments