@@ -243,6 +243,7 @@ void MtmLock(LWLockMode mode)
243
243
#else
244
244
LWLockAcquire ((LWLockId )& Mtm -> locks [MTM_STATE_LOCK_ID ], mode );
245
245
#endif
246
+ Mtm -> lastLockHolder = MyProcPid ;
246
247
}
247
248
248
249
void MtmUnlock (void )
@@ -252,6 +253,7 @@ void MtmUnlock(void)
252
253
#else
253
254
LWLockRelease ((LWLockId )& Mtm -> locks [MTM_STATE_LOCK_ID ]);
254
255
#endif
256
+ Mtm -> lastLockHolder = 0 ;
255
257
}
256
258
257
259
void MtmLockNode (int nodeId )
@@ -550,16 +552,20 @@ MtmAdjustOldestXid(TransactionId xid)
550
552
551
553
static void MtmTransactionListAppend (MtmTransState * ts )
552
554
{
553
- ts -> next = NULL ;
554
- ts -> nSubxids = 0 ;
555
- * Mtm -> transListTail = ts ;
556
- Mtm -> transListTail = & ts -> next ;
555
+ if (!ts -> isEnqueued ) {
556
+ ts -> isEnqueued = true;
557
+ ts -> next = NULL ;
558
+ ts -> nSubxids = 0 ;
559
+ * Mtm -> transListTail = ts ;
560
+ Mtm -> transListTail = & ts -> next ;
561
+ }
557
562
}
558
563
559
564
static void MtmTransactionListInsertAfter (MtmTransState * after , MtmTransState * ts )
560
565
{
561
566
ts -> next = after -> next ;
562
567
after -> next = ts ;
568
+ ts -> isEnqueued = true;
563
569
if (Mtm -> transListTail == & after -> next ) {
564
570
Mtm -> transListTail = & ts -> next ;
565
571
}
@@ -700,6 +706,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
700
706
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
701
707
ts -> snapshot = x -> snapshot ;
702
708
ts -> isLocal = true;
709
+ if (!found ) {
710
+ ts -> isEnqueued = false;
711
+ }
703
712
if (TransactionIdIsValid (x -> gtid .xid )) {
704
713
Assert (x -> gtid .node != MtmNodeId );
705
714
ts -> gtid = x -> gtid ;
@@ -829,6 +838,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
829
838
Assert (x -> gid [0 ]);
830
839
tm -> state = ts ;
831
840
ts -> votingCompleted = true;
841
+ if (!found ) {
842
+ ts -> isEnqueued = false;
843
+ }
832
844
if (Mtm -> status != MTM_RECOVERY ) {
833
845
MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
834
846
if (!MtmUseDtm ) {
@@ -937,8 +949,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
937
949
*/
938
950
MTM_LOG1 ("%d: send ABORT notification abort transaction %d to coordinator %d" , MyProcPid , x -> gtid .xid , x -> gtid .node );
939
951
if (ts == NULL ) {
952
+ bool found ;
940
953
Assert (TransactionIdIsValid (x -> xid ));
941
- ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , NULL );
954
+ ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , & found );
955
+ if (!found ) {
956
+ ts -> isEnqueued = false;
957
+ }
942
958
ts -> status = TRANSACTION_STATUS_ABORTED ;
943
959
ts -> isLocal = true;
944
960
ts -> snapshot = x -> snapshot ;
@@ -1355,7 +1371,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1355
1371
*/
1356
1372
bool MtmRefreshClusterStatus (bool nowait )
1357
1373
{
1358
- nodemask_t mask , clique , disabled , enabled ;
1374
+ nodemask_t mask , clique , disabled ;
1359
1375
nodemask_t matrix [MAX_NODES ];
1360
1376
MtmTransState * ts ;
1361
1377
int clique_size ;
@@ -1382,28 +1398,29 @@ bool MtmRefreshClusterStatus(bool nowait)
1382
1398
MTM_LOG1 ("Find clique %lx, disabledNodeMask %lx" , (long ) clique , (long ) Mtm -> disabledNodeMask );
1383
1399
MtmLock (LW_EXCLUSIVE );
1384
1400
disabled = ~clique & (((nodemask_t )1 << Mtm -> nAllNodes )- 1 ) & ~Mtm -> disabledNodeMask ; /* new disabled nodes mask */
1385
- enabled = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1386
1401
1387
1402
for (i = 0 , mask = disabled ; mask != 0 ; i ++ , mask >>= 1 ) {
1388
1403
if (mask & 1 ) {
1389
1404
MtmDisableNode (i + 1 );
1390
1405
}
1391
- }
1392
-
1406
+ }
1407
+ #if 0 /* Do not enable nodes here: them will be enabled after completion of recovery */
1408
+ enabled = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1393
1409
for (i = 0 , mask = enabled ; mask != 0 ; i ++ , mask >>= 1 ) {
1394
1410
if (mask & 1 ) {
1395
1411
MtmEnableNode (i + 1 );
1396
1412
}
1397
1413
}
1398
- if (disabled |enabled ) {
1414
+ #endif
1415
+ if (disabled ) {
1399
1416
MtmCheckQuorum ();
1400
1417
}
1401
1418
/* Interrupt voting for active transaction and abort them */
1402
1419
for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
1403
1420
MTM_LOG3 ("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d" ,
1404
1421
ts -> gid , ts -> gtid .node , ts -> xid , ts -> status , ts -> gtid .xid );
1405
1422
if (MtmIsCoordinator (ts )) {
1406
- if (!ts -> votingCompleted && ( disabled | enabled ) != 0 && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1423
+ if (!ts -> votingCompleted && disabled != 0 && ts -> status != TRANSACTION_STATUS_ABORTED ) {
1407
1424
MtmAbortTransaction (ts );
1408
1425
MtmWakeUpBackend (ts );
1409
1426
}
@@ -2213,6 +2230,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
2213
2230
{
2214
2231
if (nodeId <= 0 || nodeId > Mtm -> nLiveNodes )
2215
2232
{
2233
+ MtmUnlock ();
2216
2234
elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nLiveNodes );
2217
2235
}
2218
2236
MtmDisableNode (nodeId );
@@ -2278,6 +2296,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2278
2296
MtmEnableNode (MtmReplicationNodeId );
2279
2297
MtmCheckQuorum ();
2280
2298
} else {
2299
+ MtmUnlock ();
2281
2300
elog (ERROR , "Disabled node %d tries to reconnect without recovery" , MtmReplicationNodeId );
2282
2301
}
2283
2302
} else {
0 commit comments