Skip to content

Commit 69f6a0b

Browse files
committed
Distributed deadlock detection
1 parent 3c5b2fb commit 69f6a0b

File tree

5 files changed

+54
-34
lines changed

5 files changed

+54
-34
lines changed

contrib/pg_dtm/dtmd/src/ddd.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ void initGraph(Graph* graph)
1313
graph->freeEdges = NULL;
1414
graph->freeVertexes = NULL;
1515
graph->marker = 0;
16-
graph->min_deadlock_duration = 3;
1716
}
1817

1918
static inline Edge* newEdge(Graph* graph)
@@ -54,7 +53,6 @@ static inline Vertex* newVertex(Graph* graph)
5453
} else {
5554
graph->freeVertexes = v->next;
5655
}
57-
v->deadlock_duration = 0;
5856
return v;
5957
}
6058

@@ -148,9 +146,8 @@ bool detectDeadLock(Graph* graph, xid_t root)
148146
for (v = graph->hashtable[root % MAX_TRANSACTIONS]; v != NULL; v = v->next) {
149147
if (v->xid == root) {
150148
if (recursiveTraverseGraph(v, v, ++graph->marker)) {
151-
return ++v->deadlock_duration >= graph->min_deadlock_duration;
149+
return true;
152150
}
153-
v->deadlock_duration = 0;
154151
break;
155152
}
156153
}

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ static void onmessage(client_t client, size_t len, char *data) {
599599

600600
static void usage(char *prog) {
601601
printf(
602-
"Usage: %s [-d DATADIR] [-k] [-a HOST] [-p PORT] [-l LOGFILE] [-m MIN_DEADLOCK_DURATION]\n"
602+
"Usage: %s [-d DATADIR] [-k] [-a HOST] [-p PORT] [-l LOGFILE]\n"
603603
" arbiter will try to kill the other one running at\n"
604604
" the same DATADIR.\n"
605605
" -l : Run as a daemon and write output to LOGFILE.\n"
@@ -716,9 +716,6 @@ int main(int argc, char **argv) {
716716
case 'k':
717717
assassin = true;
718718
break;
719-
case 'm':
720-
graph.min_deadlock_duration = atoi(optarg);
721-
break;
722719
default:
723720
usage(argv[0]);
724721
return EXIT_FAILURE;

contrib/pg_dtm/pg_dtm.c

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void _PG_fini(void);
7171
static Snapshot DtmGetSnapshot(Snapshot snapshot);
7272
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
7373
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
74-
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
74+
static bool DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
7575
static void DtmUpdateRecentXmin(Snapshot snapshot);
7676
static void DtmInitialize(void);
7777
static void DtmXactCallback(XactEvent event, void *arg);
@@ -627,8 +627,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
627627
return status;
628628
}
629629

630-
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
630+
static bool DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
631631
{
632+
bool acknowledged = true;
632633
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
633634
if (!RecoveryInProgress())
634635
{
@@ -640,7 +641,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
640641
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
641642
DtmGlobalSetTransStatus(xid, status, false);
642643
XTM_INFO("Abort transaction %d\n", xid);
643-
return;
644+
return true;
644645
}
645646
else
646647
{
@@ -649,8 +650,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
649650
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
650651
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
651652
LWLockRelease(dtm->hashLock);
652-
DtmGlobalSetTransStatus(xid, status, true);
653-
XTM_INFO("Commit transaction %d\n", xid);
653+
if (!DtmGlobalSetTransStatus(xid, status, true)) {
654+
acknowledged = false;
655+
XTM_INFO("Commit of transaction %d in rejected by DTM\n", xid);
656+
status = TRANSACTION_STATUS_ABORTED;
657+
} else {
658+
XTM_INFO("Commit transaction %d\n", xid);
659+
}
654660
}
655661
}
656662
else
@@ -661,11 +667,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
661667
else
662668
{
663669
XidStatus gs;
664-
gs = DtmGlobalGetTransStatus(xid, false);
665-
if (gs != TRANSACTION_STATUS_UNKNOWN)
670+
gs = DtmGlobalGetTransStatus(xid, false);
671+
if (gs != TRANSACTION_STATUS_UNKNOWN) {
672+
acknowledged = status == gs;
666673
status = gs;
674+
}
667675
}
668-
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
676+
return PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn) && acknowledged;
669677
}
670678

671679
static uint32 dtm_xid_hash_fn(const void *key, Size keysize)
@@ -991,27 +999,35 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
991999
{
9921000
ByteBuffer* buf = (ByteBuffer*)arg;
9931001
LOCK* lock = proclock->tag.myLock;
994-
PGPROC* proc = proclock->tag.myProc;
1002+
PGPROC* proc = proclock->tag.myProc;
1003+
9951004
if (lock != NULL) {
996-
if (proc->waitLock == lock) {
1005+
PGXACT* srcPgXact = &ProcGlobal->allPgXact[proc->pgprocno];
1006+
1007+
if (TransactionIdIsValid(srcPgXact->xid) && proc->waitLock == lock) {
9971008
LockMethod lockMethodTable = GetLocksMethodTable(lock);
9981009
int numLockModes = lockMethodTable->numLockModes;
9991010
int conflictMask = lockMethodTable->conflictTab[proc->waitLockMode];
10001011
SHM_QUEUE *procLocks = &(lock->procLocks);
10011012
int lm;
10021013

1003-
ByteBufferAppendInt32(buf, proc->lxid); /* waiting transaction */
1014+
ByteBufferAppendInt32(buf, srcPgXact->xid); /* waiting transaction */
10041015
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
10051016
offsetof(PROCLOCK, lockLink));
10061017
while (proclock)
10071018
{
10081019
if (proc != proclock->tag.myProc) {
1009-
for (lm = 1; lm <= numLockModes; lm++)
1010-
{
1011-
if ((proclock->holdMask & LOCKBIT_ON(lm)) && (conflictMask & LOCKBIT_ON(lm)))
1020+
PGXACT* dstPgXact = &ProcGlobal->allPgXact[proclock->tag.myProc->pgprocno];
1021+
if (TransactionIdIsValid(dstPgXact->xid)) {
1022+
Assert(srcPgXact->xid != dstPgXact->xid);
1023+
for (lm = 1; lm <= numLockModes; lm++)
10121024
{
1013-
ByteBufferAppendInt32(buf, proclock->tag.myProc->lxid); /* transaction holding lock */
1014-
break;
1025+
if ((proclock->holdMask & LOCKBIT_ON(lm)) && (conflictMask & LOCKBIT_ON(lm)))
1026+
{
1027+
XTM_INFO("%d: %u(%u) waits for %u(%u)\n", getpid(), srcPgXact->xid, proc->pid, dstPgXact->xid, proclock->tag.myProc->pid);
1028+
ByteBufferAppendInt32(buf, dstPgXact->xid); /* transaction holding lock */
1029+
break;
1030+
}
10151031
}
10161032
}
10171033
}
@@ -1025,12 +1041,19 @@ static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
10251041

10261042
bool DtmDetectGlobalDeadLock(PGPROC* proc)
10271043
{
1028-
bool hasDeadlock;
1044+
bool hasDeadlock = false;
10291045
ByteBuffer buf;
1030-
ByteBufferAlloc(&buf);
1031-
EnumerateLocks(DtmSerializeLock, &buf);
1032-
hasDeadlock = DtmGlobalDetectDeadLock(PostPortNumber, proc->lxid, buf.data, buf.used);
1033-
ByteBufferFree(&buf);
1034-
elog(NOTICE, "Deadlock detected for transaction %u", proc->lxid);
1046+
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
1047+
1048+
if (TransactionIdIsValid(pgxact->xid)) {
1049+
ByteBufferAlloc(&buf);
1050+
XTM_INFO("%d: wait graph begin\n", getpid());
1051+
EnumerateLocks(DtmSerializeLock, &buf);
1052+
XTM_INFO("%d: wait graph end\n", getpid());
1053+
hasDeadlock = DtmGlobalDetectDeadLock(PostPortNumber, pgxact->xid, buf.data, buf.used);
1054+
ByteBufferFree(&buf);
1055+
XTM_INFO("%d: deadlock detected for %u\n", getpid(), pgxact->xid);
1056+
elog(WARNING, "Deadlock detected for transaction %u", pgxact->xid);
1057+
}
10351058
return hasDeadlock;
10361059
}

contrib/pg_dtm/tests/dtmbench.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,10 @@ void* writer(void* arg)
161161
for (int i = 0; i < cfg.nIterations; i++)
162162
{
163163
int srcCon, dstCon;
164-
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165-
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
164+
//int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165+
//int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
166+
int srcAcc = random() % cfg.nAccounts;
167+
int dstAcc = random() % cfg.nAccounts;
166168

167169
do {
168170
srcCon = random() % cfg.connections.size();
@@ -183,7 +185,7 @@ void* writer(void* arg)
183185
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
184186
} catch (pqxx_exception const& x) {
185187
exec(srcTx, "rollback");
186-
exec(srcTx, "rollback");
188+
exec(dstTx, "rollback");
187189
t.aborts += 1;
188190
i -= 1;
189191
continue;

src/backend/access/transam/xact.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,7 @@ RecordTransactionCommit(void)
13121312
MyPgXact->delayChkpt = false;
13131313
END_CRIT_SECTION();
13141314
if (!committed) {
1315+
CurrentTransactionState->state = TRANS_ABORT;
13151316
elog(ERROR, "Transaction commit rejected by XTM");
13161317
}
13171318
}
@@ -5405,7 +5406,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
54055406
StandbyReleaseLockTree(xid, 0, NULL);
54065407
}
54075408
if (!committed) {
5408-
elog(NOTICE, "XTM rejected recovert of tran saction %u", xid);
5409+
elog(WARNING, "XTM rejected recovery of transaction %u", xid);
54095410
}
54105411
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
54115412
{

0 commit comments

Comments
 (0)