@@ -71,7 +71,7 @@ void _PG_fini(void);
71
71
static Snapshot DtmGetSnapshot (Snapshot snapshot );
72
72
static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
73
73
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
74
- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
74
+ static bool DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
75
75
static void DtmUpdateRecentXmin (Snapshot snapshot );
76
76
static void DtmInitialize (void );
77
77
static void DtmXactCallback (XactEvent event , void * arg );
@@ -627,8 +627,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
627
627
return status ;
628
628
}
629
629
630
- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
630
+ static bool DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
631
631
{
632
+ bool acknowledged = true;
632
633
XTM_INFO ("%d: DtmSetTransactionStatus %u = %u\n" , getpid (), xid , status );
633
634
if (!RecoveryInProgress ())
634
635
{
@@ -640,7 +641,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
640
641
PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
641
642
DtmGlobalSetTransStatus (xid , status , false);
642
643
XTM_INFO ("Abort transaction %d\n" , xid );
643
- return ;
644
+ return true ;
644
645
}
645
646
else
646
647
{
@@ -649,8 +650,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
649
650
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
650
651
hash_search (xid_in_doubt , & DtmNextXid , HASH_ENTER , NULL );
651
652
LWLockRelease (dtm -> hashLock );
652
- DtmGlobalSetTransStatus (xid , status , true);
653
- XTM_INFO ("Commit transaction %d\n" , xid );
653
+ if (!DtmGlobalSetTransStatus (xid , status , true)) {
654
+ acknowledged = false;
655
+ XTM_INFO ("Commit of transaction %d in rejected by DTM\n" , xid );
656
+ status = TRANSACTION_STATUS_ABORTED ;
657
+ } else {
658
+ XTM_INFO ("Commit transaction %d\n" , xid );
659
+ }
654
660
}
655
661
}
656
662
else
@@ -661,11 +667,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
661
667
else
662
668
{
663
669
XidStatus gs ;
664
- gs = DtmGlobalGetTransStatus (xid , false);
665
- if (gs != TRANSACTION_STATUS_UNKNOWN )
670
+ gs = DtmGlobalGetTransStatus (xid , false);
671
+ if (gs != TRANSACTION_STATUS_UNKNOWN ) {
672
+ acknowledged = status == gs ;
666
673
status = gs ;
674
+ }
667
675
}
668
- PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
676
+ return PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn ) && acknowledged ;
669
677
}
670
678
671
679
static uint32 dtm_xid_hash_fn (const void * key , Size keysize )
@@ -991,27 +999,35 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
991
999
{
992
1000
ByteBuffer * buf = (ByteBuffer * )arg ;
993
1001
LOCK * lock = proclock -> tag .myLock ;
994
- PGPROC * proc = proclock -> tag .myProc ;
1002
+ PGPROC * proc = proclock -> tag .myProc ;
1003
+
995
1004
if (lock != NULL ) {
996
- if (proc -> waitLock == lock ) {
1005
+ PGXACT * srcPgXact = & ProcGlobal -> allPgXact [proc -> pgprocno ];
1006
+
1007
+ if (TransactionIdIsValid (srcPgXact -> xid ) && proc -> waitLock == lock ) {
997
1008
LockMethod lockMethodTable = GetLocksMethodTable (lock );
998
1009
int numLockModes = lockMethodTable -> numLockModes ;
999
1010
int conflictMask = lockMethodTable -> conflictTab [proc -> waitLockMode ];
1000
1011
SHM_QUEUE * procLocks = & (lock -> procLocks );
1001
1012
int lm ;
1002
1013
1003
- ByteBufferAppendInt32 (buf , proc -> lxid ); /* waiting transaction */
1014
+ ByteBufferAppendInt32 (buf , srcPgXact -> xid ); /* waiting transaction */
1004
1015
proclock = (PROCLOCK * ) SHMQueueNext (procLocks , procLocks ,
1005
1016
offsetof(PROCLOCK , lockLink ));
1006
1017
while (proclock )
1007
1018
{
1008
1019
if (proc != proclock -> tag .myProc ) {
1009
- for (lm = 1 ; lm <= numLockModes ; lm ++ )
1010
- {
1011
- if ((proclock -> holdMask & LOCKBIT_ON (lm )) && (conflictMask & LOCKBIT_ON (lm )))
1020
+ PGXACT * dstPgXact = & ProcGlobal -> allPgXact [proclock -> tag .myProc -> pgprocno ];
1021
+ if (TransactionIdIsValid (dstPgXact -> xid )) {
1022
+ Assert (srcPgXact -> xid != dstPgXact -> xid );
1023
+ for (lm = 1 ; lm <= numLockModes ; lm ++ )
1012
1024
{
1013
- ByteBufferAppendInt32 (buf , proclock -> tag .myProc -> lxid ); /* transaction holding lock */
1014
- break ;
1025
+ if ((proclock -> holdMask & LOCKBIT_ON (lm )) && (conflictMask & LOCKBIT_ON (lm )))
1026
+ {
1027
+ XTM_INFO ("%d: %u(%u) waits for %u(%u)\n" , getpid (), srcPgXact -> xid , proc -> pid , dstPgXact -> xid , proclock -> tag .myProc -> pid );
1028
+ ByteBufferAppendInt32 (buf , dstPgXact -> xid ); /* transaction holding lock */
1029
+ break ;
1030
+ }
1015
1031
}
1016
1032
}
1017
1033
}
@@ -1025,12 +1041,19 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
1025
1041
1026
1042
bool DtmDetectGlobalDeadLock (PGPROC * proc )
1027
1043
{
1028
- bool hasDeadlock ;
1044
+ bool hasDeadlock = false ;
1029
1045
ByteBuffer buf ;
1030
- ByteBufferAlloc (& buf );
1031
- EnumerateLocks (DtmSerializeLock , & buf );
1032
- hasDeadlock = DtmGlobalDetectDeadLock (PostPortNumber , proc -> lxid , buf .data , buf .used );
1033
- ByteBufferFree (& buf );
1034
- elog (NOTICE , "Deadlock detected for transaction %u" , proc -> lxid );
1046
+ PGXACT * pgxact = & ProcGlobal -> allPgXact [proc -> pgprocno ];
1047
+
1048
+ if (TransactionIdIsValid (pgxact -> xid )) {
1049
+ ByteBufferAlloc (& buf );
1050
+ XTM_INFO ("%d: wait graph begin\n" , getpid ());
1051
+ EnumerateLocks (DtmSerializeLock , & buf );
1052
+ XTM_INFO ("%d: wait graph end\n" , getpid ());
1053
+ hasDeadlock = DtmGlobalDetectDeadLock (PostPortNumber , pgxact -> xid , buf .data , buf .used );
1054
+ ByteBufferFree (& buf );
1055
+ XTM_INFO ("%d: deadlock detected for %u\n" , getpid (), pgxact -> xid );
1056
+ elog (WARNING , "Deadlock detected for transaction %u" , pgxact -> xid );
1057
+ }
1035
1058
return hasDeadlock ;
1036
1059
}
0 commit comments