Skip to content

Commit 3f42006

Browse files
committed
Add distributed deadlock detection
1 parent 0e2369f commit 3f42006

File tree

3 files changed

+165
-5
lines changed

3 files changed

+165
-5
lines changed

contrib/pg_dtm/dtmd/include/transaction.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ typedef struct Transaction {
3434
void *listeners[CHAR_TO_INDEX('z')]; // we are going to use 'a' to 'z' for indexing
3535
} Transaction;
3636

37+
static inline void l2_list_is_empty(L2List* elem)
38+
{
39+
return elem->next == elem;
40+
}
41+
42+
static inline void l2_list_init(L2List* elem)
43+
{
44+
elem->next = elem->prev = elem;
45+
}
46+
3747
static inline void l2_list_link(L2List* after, L2List* elem)
3848
{
3949
elem->next = after->next;

contrib/pg_dtm/dtmd/src/ddd.c

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#include "transaction.h"
2+
3+
typedef struct Instance {
4+
struct Edge* edges; /* local subgraph */
5+
} Instance;
6+
7+
typedef struct Edge {
8+
L2List node; /* node of list of outgoing eedges */
9+
struct Edge* next; /* list of edges of local subgraph */
10+
struct Vertex* dst;
11+
struct Vertex* src;
12+
} Edge;
13+
14+
typedef struct Vertex
15+
{
16+
L2List outgoingEdges;
17+
xid_t xid;
18+
int nIncomingEdges;
19+
bool visited;
20+
} Vertex;
21+
22+
typedef struct Graph
23+
{
24+
Vertex* hashtable[MAX_TRANSACTIONS];
25+
Edge* freeEdges;
26+
Vertex* freeVertexes;
27+
} Graph;
28+
29+
void initGraph(Graph* graph)
30+
{
31+
memset(graph->hashtable, 0. sizeof(graph->hashtable));
32+
graph->freeEdges = NULL;
33+
graph->freeVertexes = NULL;
34+
}
35+
36+
Edge* newEdge(Graph* graph)
37+
{
38+
Edge* edge = graph->freeEdges;
39+
if (edge == NULL) {
40+
edge = (Edge*)malloc(sizeof(Edge));
41+
} else {
42+
graph->freeEdges = edge->next;
43+
}
44+
return edge;
45+
}
46+
47+
void freeVertex(Graph* graph, Vertex* vertex)
48+
{
49+
vertex->node.next = (L2List*)graph->freeVertexes;
50+
graph->freeVertexes = vertex;
51+
}
52+
53+
void freeEdge(Graph* graph, Edge* edge)
54+
{
55+
edge->next = graph->freeEdges;
56+
graph->freeEdges = edge;
57+
}
58+
59+
Vertex* newVertex(Graph* graph)
60+
{
61+
Vertex* v = graph->freeVertexes;
62+
if (v == NULL) {
63+
v = (Vertex*)malloc(sizeof(Vertex));
64+
} else {
65+
graph->freeVertexes = (Vertex*)v.node.next;
66+
}
67+
return v;
68+
}
69+
70+
Vertex* findVertex(Graph* graph, xid_t xid)
71+
{
72+
xid_t h = xid;
73+
Vertex* v;
74+
while ((v = graph->hashtable[h % MAX_TRANSACTIONS]) != NULL) {
75+
if (v->xid == xid) {
76+
return v;
77+
}
78+
h += 1;
79+
}
80+
v = newVertex(graph);
81+
l2_list_init(v->outgoingEdges);
82+
v->xid = xid;
83+
v->nIncomingEdges = 0;
84+
graph->hashtable[h % MAX_TRANSACTIONS] = v;
85+
return v;
86+
}
87+
88+
void addSubgraph(Instance* instance, Graph* graph, xid_t* xids, int n_xids)
89+
{
90+
xid_t *last = xids + n_xids;
91+
Edge *e, *next, *edges = NULL;
92+
while (xids != last) {
93+
Vertex* src = findVertex(graph, *xids++);
94+
xid_t xid;
95+
while ((xid = *xids++) != 0) {
96+
Vertex* dst = findVertex(graph, xid);
97+
e = newEdge(graph);
98+
dst->nIncomingEdges += 1;
99+
e->dst = dst;
100+
e->src = src;
101+
e->next = edges;
102+
edges = e;
103+
l2_list_link(&src->outgoingEdges, &e->node);
104+
}
105+
}
106+
for (e = instance->edges; e != NULL; e = next) {
107+
next = e->next;
108+
l2_list_unlink(&e->node);
109+
if (--e->dst->nIncomingEdges == 0 && l2_list_is_empty(&e->dst->outgoingEdges)) {
110+
freeVertex(e->dst);
111+
}
112+
if (e->src->nIncomingEdges == 0 && l2_list_is_empty(&e->src->outgoingEdges)) {
113+
freeVertex(e->src);
114+
}
115+
freeEdge(e);
116+
}
117+
}
118+
119+
bool findLoop(Graph* graph)
120+
{
121+
}

contrib/pg_dtm/pg_dtm.c

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ static void DtmBackgroundWorker(Datum arg);
9191

9292
static void ByteBufferAlloc(ByteBuffer* buf);
9393
static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
94+
static void ByteBufferAppendInt32(ByteBuffer* buf, int data);
9495
static void ByteBufferFree(ByteBuffer* buf);
9596

9697

@@ -976,21 +977,49 @@ static void ByteBufferAppend(ByteBuffer* buf, void* data, int len)
976977
buf->used += len;
977978
}
978979

980+
static void ByteBufferAppendInt32(ByteBuffer* buf, int data)
981+
{
982+
ByteBufferAppend(buf, &data, sizeof data);
983+
}
984+
979985
static void ByteBufferFree(ByteBuffer* buf)
980986
{
981987
pfree(buf->data);
982988
}
983989

984-
#define APPEND(buf, x) ByteBufferAppend(buf, &x, sizeof(x))
985-
986990
static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
987991
{
988992
ByteBuffer* buf = (ByteBuffer*)arg;
989993
LOCK* lock = proclock->tag.myLock;
994+
PGPROC* proc = proclock->tag.myProc;
990995
if (lock != NULL) {
991-
APPEND(buf, proclock->tag.myProc->lxid);
992-
APPEND(buf, proclock->holdMask);
993-
APPEND(buf, lock->tag.locktag_lockmethodid);
996+
if (proc->waitLock == lock) {
997+
LockMethod lockMethodTable = GetLocksMethodTable(lock);
998+
int numLockModes = lockMethodTable->numLockModes;
999+
int conflictMask = lockMethodTable->conflictTab[proc->waitLockMode];
1000+
SHM_QUEUE *procLocks = &(lock->procLocks);
1001+
int lm;
1002+
1003+
ByteBufferAppendInt32(buf, proc->lxid); /* waiting transaction */
1004+
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1005+
offsetof(PROCLOCK, lockLink));
1006+
while (proclock)
1007+
{
1008+
if (proc != proclock->tag.myProc) {
1009+
for (lm = 1; lm <= numLockModes; lm++)
1010+
{
1011+
if ((proclock->holdMask & LOCKBIT_ON(lm)) && (conflictMask & LOCKBIT_ON(lm)))
1012+
{
1013+
ByteBufferAppendInt32(buf, proclock->tag.myProc->lxid); /* transaction holding lock */
1014+
break;
1015+
}
1016+
}
1017+
}
1018+
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
1019+
offsetof(PROCLOCK, lockLink));
1020+
}
1021+
ByteBufferAppendInt32(buf, 0); /* end of lock owners list */
1022+
}
9941023
}
9951024
}
9961025

0 commit comments

Comments
 (0)