Skip to content

Commit b26ac71

Browse files
committed
Integrate deadlock detection in DTMD
1 parent ebb4958 commit b26ac71

File tree

4 files changed

+63
-2
lines changed

4 files changed

+63
-2
lines changed

contrib/pg_dtm/dtmd/include/proto.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
#define CMD_AGAINST 'n'
88
#define CMD_SNAPSHOT 'h'
99
#define CMD_STATUS 's'
10+
#define CMD_DEADLOCK 'd'
1011

1112
#define RES_FAILED 0xDEADBEEF
1213
#define RES_OK 0xC0FFEE
14+
#define RES_DEADLOCK 0xDEADDEED
1315
#define RES_TRANSACTION_COMMITTED 1
1416
#define RES_TRANSACTION_ABORTED 2
1517
#define RES_TRANSACTION_INPROGRESS 3

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "util.h"
1414
#include "transaction.h"
1515
#include "proto.h"
16+
#include "ddd.h"
1617

1718
#define DEFAULT_DATADIR "/tmp/clog"
1819
#define DEFAULT_LISTENHOST "0.0.0.0"
@@ -32,6 +33,7 @@ typedef struct client_userdata_t {
3233
int id;
3334
int snapshots_sent;
3435
xid_t xid;
36+
Instance instance; /* It has to be moved somewhere else, because this is per-backend structure */
3537
} client_userdata_t;
3638

3739
clog_t clg;
@@ -158,6 +160,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
158160
case CMD_AGAINST : cmdname = "AGAINST"; break;
159161
case CMD_SNAPSHOT: cmdname = "SNAPSHOT"; break;
160162
case CMD_STATUS : cmdname = "STATUS"; break;
163+
case CMD_DEADLOCK: cmdname = "DEADLOCK"; break;
161164
default : cmdname = "unknown";
162165
}
163166
debug("[%d] %s", CLIENT_ID(client), cmdname);
@@ -530,6 +533,25 @@ static void onnoise(client_t client, int argc, xid_t *argv) {
530533
client_message_shortcut(client, RES_FAILED);
531534
}
532535

536+
static Graph graph;
537+
538+
static void ondeadlock(client_t client, int argc, xid_t *argv) {
539+
if (argc < 3) {
540+
shout(
541+
"[%d] DEADLOCK: wrong number of arguments %d, expected > 3\n",
542+
CLIENT_ID(client), argc
543+
);
544+
client_message_shortcut(client, RES_FAILED);
545+
return;
546+
}
547+
xid_t root = argv[1];
548+
Instance* instance = &CLIENT_USERDATA(client)->instance;
549+
addSubgraph(instance, &graph, argv+2, argc-2);
550+
bool hasDeadLock = findLoop(&graph, root);
551+
client_message_shortcut(client, hasDeadLock ? RES_DEADLOCK : RES_OK);
552+
}
553+
554+
533555
static void oncmd(client_t client, int argc, xid_t *argv) {
534556
debug_cmd(client, argc, argv);
535557

@@ -553,6 +575,9 @@ static void oncmd(client_t client, int argc, xid_t *argv) {
553575
case CMD_STATUS:
554576
onstatus(client, argc, argv);
555577
break;
578+
case CMD_DEADLOCK:
579+
ondeadlock(client, argc, argv);
580+
break;
556581
default:
557582
onnoise(client, argc, argv);
558583
}

contrib/pg_dtm/dtmd/src/util.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ char *join_path(const char *dir, const char *file) {
3131
}
3232
} else {
3333
// 'file' is empty
34-
return strndup(dir, dirlen);
34+
return strdup(dir);
3535
}
3636

3737
size_t pathlen = dirlen + 1 + filelen;

contrib/pg_dtm/libdtm.c

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,5 +502,39 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
502502

503503
bool DtmGlobalDetectDeadLock(TransactionId xid, void* data, int size)
504504
{
505-
return false;
505+
int msg_size = size + sizeof(xid)*2;
506+
int data_size = sizeof(ShubMessageHdr) + msg_size;
507+
char* buf = (char*)malloc(data_size);
508+
ShubMessageHdr* msg = (ShubMessageHdr*)buf;
509+
xid_t* body = (xid_t*)(msg+1);
510+
int sent;
511+
int reslen;
512+
xid_t results[RESULTS_SIZE];
513+
DTMConn dtm = GetConnection();
514+
515+
msg->chan = 0;
516+
msg->code = MSG_FIRST_USER_CODE;
517+
msg->size = msg_size;
518+
519+
*body++ = CMD_DEADLOCK;
520+
*body++ = xid;
521+
memcpy(body, data, size);
522+
523+
sent = 0;
524+
while (sent < data_size)
525+
{
526+
int new_bytes = write(dtm->sock, buf + sent, data_size - sent);
527+
if (new_bytes == -1)
528+
{
529+
elog(ERROR, "Failed to send a command to arbiter");
530+
return false;
531+
}
532+
sent += new_bytes;
533+
}
534+
reslen = dtm_recv_results(dtm, RESULTS_SIZE, results);
535+
if (reslen != 1 || (results[0] != RES_OK && results[0] != RES_DEADLOCK)) {
536+
fprintf(stderr, "DtmGlobalDetectDeadLock: failed to check deadlocks for transaction %u\n", xid);
537+
return false;
538+
}
539+
return results[0] == RES_DEADLOCK;
506540
}

0 commit comments

Comments
 (0)