Skip to content

Commit 6309492

Browse files
committed
Implement the "multi-CLOG" version of DTMD.
1 parent acab04b commit 6309492

File tree

15 files changed

+557
-341
lines changed

15 files changed

+557
-341
lines changed

contrib/pg_xtm/dtmd/Makefile

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,22 @@ all: bin/dtmd
1515
@echo Done.
1616
@echo Feel free to run the tests with \'make check\'.
1717

18-
bin/dtmd: obj/intset.o obj/eventwrap.o obj/main.o obj/parser.o obj/clog.o obj/clogfile.o obj/util.o | bindir objdir
18+
bin/dtmd: obj/eventwrap.o obj/main.o obj/parser.o obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o obj/snapshot.o | bindir objdir
1919
$(CC) -o bin/dtmd $(CFLAGS) $(LIBUV_CFLAGS) \
20-
obj/intset.o obj/eventwrap.o obj/main.o obj/parser.o \
21-
obj/clog.o obj/clogfile.o obj/util.o \
20+
obj/eventwrap.o obj/main.o obj/parser.o \
21+
obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o \
22+
obj/snapshot.o \
2223
$(LIBUV_LDFLAGS)
2324

2425
obj/eventwrap.o: src/eventwrap.c | objdir
2526
$(CC) -c -o obj/eventwrap.o $(CFLAGS) $(LIBUV_CFLAGS) src/eventwrap.c
2627

27-
check: bin/intset-test bin/util-test bin/clog-test bin/parser-test
28-
./check.sh intset util parser clog
28+
check: bin/util-test bin/clog-test bin/parser-test
29+
./check.sh util parser clog
2930

3031
obj/%.o: src/%.c | objdir
3132
$(CC) $(CFLAGS) -c -o $@ $<
3233

33-
bin/intset-test: obj/intset-test.o obj/intset.o | bindir
34-
$(CC) -o bin/intset-test $(CFLAGS) obj/intset-test.o obj/intset.o
35-
3634
bin/util-test: obj/util-test.o obj/util.o | bindir
3735
$(CC) -o bin/util-test $(CFLAGS) obj/util-test.o obj/util.o
3836

contrib/pg_xtm/dtmd/include/clog.h

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
#define MIN_GXID 42
1313
#define MAX_GXID 0xdeadbeefcafebabe
1414

15-
#define COMMIT_UNKNOWN 0
16-
#define COMMIT_YES 1
17-
#define COMMIT_NO 2
15+
#define NEUTRAL 0
16+
#define POSITIVE 1
17+
#define NEGATIVE 2
1818

1919
typedef struct clog_data_t *clog_t;
2020

@@ -30,13 +30,6 @@ int clog_read(clog_t clog, xid_t gxid);
3030
// 'false' otherwise.
3131
bool clog_write(clog_t clog, xid_t gxid, int status);
3232

33-
// Allocate a fresh unused gxid. Return INVALID_GXID on error.
34-
xid_t clog_advance(clog_t clog);
35-
36-
// Get the first unknown commit id (used as a snapshot). Return INVALID_GXID on
37-
// error.
38-
xid_t clog_horizon(clog_t clog);
39-
4033
// Forget about the commits before the given one ('until'), and free the
4134
// occupied space if possible. Return 'true' on success, 'false' otherwise.
4235
bool clog_forget(clog_t clog, xid_t until);

contrib/pg_xtm/dtmd/include/limits.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#ifndef LIMITS_H
2+
#define LIMITS_H
3+
4+
#define MAX_TRANSACTIONS_PER_CLIENT 1024
5+
#define MAX_TRANSACTIONS 1024
6+
7+
#define BITS_PER_NODE 4
8+
#define MAX_NODES (1 << BITS_PER_NODE)
9+
10+
#define MUX_XID(NODE, XID) (((XID) << (BITS_PER_NODE)) + NODE)
11+
12+
#endif

contrib/pg_xtm/dtmd/include/parser.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <stdbool.h>
55

66
#include "int.h"
7+
#include "limits.h"
78

89
#define CMD_BEGIN 'b'
910
#define CMD_COMMIT 'c'
@@ -13,26 +14,27 @@
1314

1415
typedef struct cmd_t {
1516
char cmd;
16-
xid_t arg;
17+
int argc;
18+
xid_t argv[MAX_NODES * 2];
1719
} cmd_t;
1820

1921
// Do not rely on the inner structure, it may change tomorrow.
2022
typedef struct parser_data_t *parser_t;
2123

22-
// Allocate and initialize a parser.
24+
// Allocates and initializes a parser.
2325
parser_t parser_create();
2426

25-
// Destroy the parser. The 'p' handle becomes invalid, so do not refer to it
27+
// Destroys the parser. The 'p' handle becomes invalid, so do not refer to it
2628
// after destroying the parser.
2729
void parser_destroy(parser_t p);
2830

29-
// Initialize the parser.
31+
// Initializes the parser.
3032
void parser_init(parser_t p);
3133

32-
// Check if parser has failed.
34+
// Checks if parser has failed.
3335
bool parser_failed(parser_t p);
3436

35-
// Get the error message for the parser.
37+
// Gets the error message for the parser.
3638
char *parser_errormsg(parser_t p);
3739

3840
// Feeds a character to the parser, and returns a parsed command if it is
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#ifndef SNAPSHOT_H
2+
#define SNAPSHOT_H
3+
4+
#include "int.h"
5+
#include "limits.h"
6+
7+
typedef struct Snapshot {
8+
// initially 0, which means 'invalid snapshot'
9+
int seqno;
10+
11+
xid_t xmin;
12+
xid_t xmax;
13+
int nactive;
14+
xid_t active[MAX_TRANSACTIONS];
15+
} Snapshot;
16+
17+
char *snapshot_serialize(Snapshot *s);
18+
19+
#endif
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#ifndef TRANSACTION_H
2+
#define TRANSACTION_H
3+
4+
#include <stdbool.h>
5+
#include "int.h"
6+
#include "clog.h"
7+
#include "snapshot.h"
8+
#include "limits.h"
9+
10+
typedef struct Transaction {
11+
// true if the transaction was started on the node
12+
bool active;
13+
14+
int node;
15+
int vote;
16+
17+
xid_t xid;
18+
Snapshot snapshot;
19+
20+
// if this is equal to seqno, we need to generate a new snapshot (for each node)
21+
int sent_seqno;
22+
} Transaction;
23+
24+
typedef struct GlobalTransaction {
25+
Transaction participants[MAX_NODES];
26+
} GlobalTransaction;
27+
28+
int global_transaction_status(GlobalTransaction *gt);
29+
bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status);
30+
31+
#endif

contrib/pg_xtm/dtmd/include/util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <stdbool.h>
99
#include <sys/stat.h>
1010
#include <fcntl.h>
11+
#include <stdio.h>
1112

1213
#include "int.h"
1314

contrib/pg_xtm/dtmd/src/clog-test.c

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,32 @@ bool test_clog(char *datadir) {
1414
clog_t clog;
1515
if (!(clog = clog_open(datadir))) return false;
1616

17-
xid_t gxid;
18-
19-
if ((gxid = clog_horizon(clog)) == INVALID_GXID) return false;
20-
printf("horizon = %llu\n", gxid);
21-
22-
int count = 2000;
23-
printf("allocating %d gxids\n", count);
24-
while (count-- > 0) {
25-
if ((gxid = clog_advance(clog)) == INVALID_GXID) return false;
26-
}
27-
28-
xid_t last_gxid = clog_advance(clog);
2917
clog_close(clog);
3018
if (!(clog = clog_open(datadir))) return false;
31-
if ((gxid = clog_advance(clog)) == INVALID_GXID) return false;
32-
if (gxid == last_gxid) {
33-
printf("clog_advance() gave out the same value %llu twice because of clog reopening\n", gxid);
34-
return false;
35-
}
36-
37-
if ((gxid = clog_horizon(clog)) == INVALID_GXID) return false;
38-
printf("horizon = %llu\n", gxid);
3919

20+
if (!clog_write(clog, 42, NEGATIVE)) return false;
21+
if (!clog_write(clog, 1000, NEGATIVE)) return false;
4022
printf("commit %d status %d\n", 42, clog_read(clog, 42));
4123
printf("commit %d status %d\n", 1000, clog_read(clog, 1000));
42-
if (!clog_write(clog, 1000, COMMIT_YES)) return false;
43-
if (!clog_write(clog, 1500, COMMIT_NO)) return false;
24+
if (!clog_write(clog, 1000, POSITIVE)) return false;
25+
if (!clog_write(clog, 1500, NEGATIVE)) return false;
4426

4527
if (!clog_close(clog)) return false;
4628
if (!(clog = clog_open(datadir))) return false;
4729

4830
int status;
4931

5032
printf("commit %d status %d (should be 2)\n", 42, status = clog_read(clog, 42));
51-
if (status != COMMIT_NO) return false;
33+
if (status != NEGATIVE) return false;
5234

5335
printf("commit %d status %d (should be 1)\n", 1000, status = clog_read(clog, 1000));
54-
if (status != COMMIT_YES) return false;
36+
if (status != POSITIVE) return false;
5537

5638
printf("commit %d status %d (should be 2)\n", 1500, status = clog_read(clog, 1500));
57-
if (status != COMMIT_NO) return false;
39+
if (status != NEGATIVE) return false;
5840

5941
printf("commit %d status %d (should be 0)\n", 2044, status = clog_read(clog, 2044));
60-
if (status != COMMIT_UNKNOWN) return false;
42+
if (status != NEUTRAL) return false;
6143

6244
if (!clog_close(clog)) return false;
6345

0 commit comments

Comments
 (0)