Skip to content

Commit 50c8a80

Browse files
committed
Add node hash table
1 parent 7c81f05 commit 50c8a80

File tree

9 files changed

+61
-17
lines changed

9 files changed

+61
-17
lines changed

contrib/pg_dtm/dtmd/include/ddd.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
#include <stdbool.h>
44
#include "transaction.h"
55

6-
typedef struct Instance {
6+
typedef struct Node {
7+
struct Node* collision;
78
struct Edge* edges; /* local subgraph */
8-
} Instance;
9+
nodeid_t node_id;
10+
} Node;
911

1012
typedef struct Edge {
1113
L2List node; /* node of list of outgoing eedges */
@@ -33,9 +35,13 @@ typedef struct Graph
3335
int min_deadlock_duration;
3436
} Graph;
3537

38+
typedef struct Cluster
39+
{
40+
Node* hashtable[MAX_STREAMS];
41+
} Cluster;
3642

3743
extern void initGraph(Graph* graph);
38-
extern void addSubgraph(Instance* instance, Graph* graph, xid_t* xids, int n_xids);
44+
extern void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids);
3945
extern bool detectDeadLock(Graph* graph, xid_t root);
4046

4147
#endif

contrib/pg_dtm/dtmd/include/int.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
#define INT_H
33

44
typedef unsigned xid_t;
5+
typedef unsigned long long nodeid_t;
56

67
#endif

contrib/pg_dtm/dtmd/include/server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,6 @@ bool client_message_finish(client_t client);
9898
*/
9999
bool client_message_shortcut(client_t client, xid_t arg);
100100

101+
unsigned client_get_ip_addr(client_t client);
102+
101103
#endif

contrib/pg_dtm/dtmd/src/ddd.c

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
static bool recursiveTraverseGraph(Vertex* root, Vertex* v, int marker);
77

8+
static Cluster cluster;
9+
810
void initGraph(Graph* graph)
911
{
1012
memset(graph->hashtable, 0, sizeof(graph->hashtable));
@@ -74,10 +76,28 @@ static inline Vertex* findVertex(Graph* graph, xid_t xid)
7476
return v;
7577
}
7678

77-
void addSubgraph(Instance* instance, Graph* graph, xid_t* xids, int n_xids)
79+
static inline Node* findNode(Cluster* cluster, nodeid_t node_id)
80+
{
81+
size_t h = node_id % MAX_STREAMS;
82+
Node* node;
83+
for (node = cluster->hashtable[h]; node != NULL; node = node->collision) {
84+
if (node->node_id == node_id) {
85+
return node;
86+
}
87+
}
88+
node = (Node*)malloc(sizeof(Node));
89+
node->node_id = node_id;
90+
node->edges = NULL;
91+
node->collision = cluster->hashtable[h];
92+
cluster->hashtable[h] = node;
93+
return node;
94+
}
95+
96+
void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids)
7897
{
7998
xid_t *last = xids + n_xids;
8099
Edge *e, *next, *edges = NULL;
100+
Node* node = findNode(&cluster, node_id);
81101
while (xids != last) {
82102
Vertex* src = findVertex(graph, *xids++);
83103
xid_t xid;
@@ -92,7 +112,7 @@ void addSubgraph(Instance* instance, Graph* graph, xid_t* xids, int n_xids)
92112
l2_list_link(&src->outgoingEdges, &e->node);
93113
}
94114
}
95-
for (e = instance->edges; e != NULL; e = next) {
115+
for (e = node->edges; e != NULL; e = next) {
96116
next = e->next;
97117
l2_list_unlink(&e->node);
98118
if (--e->dst->nIncomingEdges == 0 && l2_list_is_empty(&e->dst->outgoingEdges)) {
@@ -103,7 +123,7 @@ void addSubgraph(Instance* instance, Graph* graph, xid_t* xids, int n_xids)
103123
}
104124
freeEdge(graph, e);
105125
}
106-
instance->edges = edges;
126+
node->edges = edges;
107127
}
108128

109129
static bool recursiveTraverseGraph(Vertex* root, Vertex* v, int marker)

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ typedef struct client_userdata_t {
3333
int id;
3434
int snapshots_sent;
3535
xid_t xid;
36-
Instance instance; /* It has to be moved somewhere else, because this is per-backend structure */
3736
} client_userdata_t;
3837

3938
clog_t clg;
@@ -536,17 +535,22 @@ static void onnoise(client_t client, int argc, xid_t *argv) {
536535
static Graph graph;
537536

538537
static void ondeadlock(client_t client, int argc, xid_t *argv) {
539-
if (argc < 3) {
538+
int port;
539+
xid_t root;
540+
nodeid_t node_id;
541+
542+
if (argc < 4) {
540543
shout(
541-
"[%d] DEADLOCK: wrong number of arguments %d, expected > 3\n",
544+
"[%d] DEADLOCK: wrong number of arguments %d, expected > 4\n",
542545
CLIENT_ID(client), argc
543546
);
544547
client_message_shortcut(client, RES_FAILED);
545548
return;
546549
}
547-
xid_t root = argv[1];
548-
Instance* instance = &CLIENT_USERDATA(client)->instance;
549-
addSubgraph(instance, &graph, argv+2, argc-2);
550+
port = argv[1];
551+
root = argv[2];
552+
node_id = ((nodeid_t)port << 32) | client_get_ip_addr(client);
553+
addSubgraph(&graph, node_id, argv+3, argc-3);
550554
bool hasDeadLock = detectDeadLock(&graph, root);
551555
client_message_shortcut(client, hasDeadLock ? RES_DEADLOCK : RES_OK);
552556
}

contrib/pg_dtm/dtmd/src/server.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <sys/socket.h>
1515
#include <arpa/inet.h>
1616
#include <netinet/tcp.h>
17+
#include <netinet/in.h>
1718

1819
#include "server.h"
1920
#include "limits.h"
@@ -490,6 +491,15 @@ void *client_get_userdata(client_t client) {
490491
return client->userdata;
491492
}
492493

494+
unsigned client_get_ip_addr(client_t client)
495+
{
496+
struct sockaddr_in inet_addr;
497+
socklen_t inet_addr_len = sizeof(inet_addr);
498+
inet_addr.sin_addr.s_addr = 0;
499+
getpeername(client->stream->fd, (struct sockaddr *)&inet_addr, &inet_addr_len);
500+
return inet_addr.sin_addr.s_addr;
501+
}
502+
493503
#if 0
494504
// usage example
495505

contrib/pg_dtm/libdtm.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,13 +481,13 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
481481
return 0;
482482
}
483483

484-
bool DtmGlobalDetectDeadLock(TransactionId xid, void* data, int size)
484+
bool DtmGlobalDetectDeadLock(int port, TransactionId xid, void* data, int size)
485485
{
486-
int msg_size = size + sizeof(xid)*2;
486+
int msg_size = size + sizeof(xid)*3;
487487
int data_size = sizeof(ShubMessageHdr) + msg_size;
488488
char* buf = (char*)malloc(data_size);
489489
ShubMessageHdr* msg = (ShubMessageHdr*)buf;
490-
xid_t* body = (xid_t*)(msg+1);
490+
xid_t* body = (xid_t*)(msg+2);
491491
int sent;
492492
int reslen;
493493
xid_t results[RESULTS_SIZE];
@@ -498,6 +498,7 @@ bool DtmGlobalDetectDeadLock(TransactionId xid, void* data, int size)
498498
msg->size = msg_size;
499499

500500
*body++ = CMD_DEADLOCK;
501+
*body++ = port;
501502
*body++ = xid;
502503
memcpy(body, data, size);
503504

contrib/pg_dtm/libdtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,6 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
6262
* resource graph if a new local graph is received from this cluster node (not
6363
* backend).
6464
*/
65-
bool DtmGlobalDetectDeadLock(TransactionId xid, void* graph, int size);
65+
bool DtmGlobalDetectDeadLock(int port, TransactionId xid, void* graph, int size);
6666

6767
#endif

contrib/pg_dtm/pg_dtm.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,7 @@ bool DtmDetectGlobalDeadLock(PGPROC* proc)
10291029
ByteBuffer buf;
10301030
ByteBufferAlloc(&buf);
10311031
EnumerateLocks(DtmSerializeLock, &buf);
1032-
hasDeadlock = DtmGlobalDetectDeadLock(proc->lxid, buf.data, buf.used);
1032+
hasDeadlock = DtmGlobalDetectDeadLock(PostPortNumber, proc->lxid, buf.data, buf.used);
10331033
ByteBufferFree(&buf);
10341034
return hasDeadlock;
10351035
}

0 commit comments

Comments
 (0)