Skip to content

Commit 8d25b12

Browse files
committed
Change libdtm to support "multi-CLOG" version of DTM.
1 parent 55892f8 commit 8d25b12

File tree

5 files changed

+88
-156
lines changed

5 files changed

+88
-156
lines changed

contrib/pg_xtm/dtmd/src/snapshot.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <assert.h>
44

55
#include "snapshot.h"
6+
#include "util.h"
67

78
static void append_char(char **cursorp, char c) {
89
*((*cursorp)++) = c;
@@ -18,22 +19,24 @@ char *snapshot_serialize(Snapshot *s) {
1819
assert(s->seqno > 0);
1920

2021
int numberlen = 16;
21-
int numbers = 3 + s->nactive;
22-
int len = 1 + numberlen * numbers + 1;
23-
char *data = malloc(len);
22+
int numbers = 3 + s->nactive; // xmin, xmax, n, active...
23+
int len = 1 + numberlen * numbers; // +1 for '+'
24+
char *data = malloc(len + 1); // +1 for '\0'
2425

2526
char *cursor = data;
2627

2728
append_char(&cursor, '+');
2829
append_hex16(&cursor, s->xmin);
2930
append_hex16(&cursor, s->xmax);
31+
append_hex16(&cursor, s->nactive);
3032

3133
int i;
3234
for (i = 0; i < s->nactive; i++) {
3335
append_hex16(&cursor, s->active[i]);
3436
}
3537

38+
shout("cursor - data = %ld, len = %d\n", cursor - data, len);
3639
assert(cursor - data == len);
37-
assert(data[len - 1] == '\0');
40+
assert(data[len] == '\0');
3841
return data;
3942
}

contrib/pg_xtm/libdtm.c

Lines changed: 67 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
#include <stdio.h>
55
#include <unistd.h>
66
#include <stdlib.h>
7+
#include <assert.h>
78

89
#include "libdtm.h"
910

10-
#if 0
11+
#ifdef TEST
12+
// standalone test without postgres functions
1113
#define palloc malloc
1214
#define pfree free
1315
#endif
@@ -119,159 +121,109 @@ void DtmDisconnect(DTMConn dtm) {
119121
free(dtm);
120122
}
121123

122-
// Asks DTM for a fresh snapshot. Returns a snapshot on success, or NULL
123-
// otherwise. Please free the snapshot memory yourself after use.
124-
Snapshot DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot s) {
124+
static bool dtm_query(DTMConn dtm, char cmd, int argc, ...) {
125+
va_list argv;
126+
int i;
127+
128+
if (!dtm_write_char(dtm, cmd)) return false;
129+
if (!dtm_write_hex16(dtm, argc)) return false;
130+
131+
va_start(argv, argc);
132+
for (i = 0; i < argc; i++) {
133+
xid_t arg = va_arg(argv, xid_t);
134+
if (!dtm_write_hex16(dtm, arg)) {
135+
va_end(argv);
136+
return false;
137+
}
138+
}
139+
va_end(argv);
140+
141+
return true;
142+
}
143+
144+
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
145+
// otherwise.
146+
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot s) {
125147
bool ok;
126148
int i;
127149
xid_t number;
128150

129-
if (!dtm_write_char(dtm, 'h')) {
130-
return NULL;
131-
}
151+
assert(s != NULL);
132152

133-
if (!dtm_read_bool(dtm, &ok)) {
134-
return NULL;
135-
}
136-
if (!ok) {
137-
return NULL;
138-
}
153+
// query
154+
if (!dtm_query(dtm, 'h', 2, nodeid, xid)) return false;
139155

140-
if (!dtm_read_hex16(dtm, &number)) {
141-
goto cleanup_snapshot;
142-
}
156+
// response
157+
if (!dtm_read_bool(dtm, &ok)) return false;
158+
if (!ok) return false;
159+
160+
if (!dtm_read_hex16(dtm, &number)) return false;
143161
s->xmin = number;
144162
Assert(s->xmin == number); // the number should fits into xmin field size
145163

146-
if (!dtm_read_hex16(dtm, &number)) {
147-
goto cleanup_snapshot;
148-
}
164+
if (!dtm_read_hex16(dtm, &number)) return false;
149165
s->xmax = number;
150166
Assert(s->xmax == number); // the number should fit into xmax field size
151167

152-
if (!dtm_read_hex16(dtm, &number)) {
153-
goto cleanup_snapshot;
154-
}
168+
if (!dtm_read_hex16(dtm, &number)) return false;
155169
s->xcnt = number;
156170
Assert(s->xcnt == number); // the number should definitely fit into xcnt field size
157171

172+
if (s->xip) pfree(s->xip);
158173
s->xip = palloc(s->xcnt * sizeof(TransactionId));
159174
for (i = 0; i < s->xcnt; i++) {
160-
if (!dtm_read_hex16(dtm, &number)) {
161-
goto cleanup_active_list;
162-
}
175+
if (!dtm_read_hex16(dtm, &number)) return false;
163176
s->xip[i] = number;
164177
Assert(s->xip[i] == number); // the number should fit into xip[i] size
165178
}
166179

167-
return s;
168-
169-
cleanup_active_list:
170-
pfree(s->xip);
171-
cleanup_snapshot:
172-
pfree(s);
173-
return NULL;
174-
}
175-
176-
#if 0
177-
// Starts a transaction. Returns the 'gxid' on success, or INVALID_GXID otherwise.
178-
xid_t DtmGlobalBegin(DTMConn dtm) {
179-
bool ok;
180-
xid_t gxid;
181-
182-
if (!dtm_write_char(dtm, 'b')) {
183-
return INVALID_GXID;
184-
}
185-
186-
if (!dtm_read_bool(dtm, &ok)) {
187-
return INVALID_GXID;
188-
}
189-
if (!ok) {
190-
return INVALID_GXID;
191-
}
192-
193-
if (!dtm_read_hex16(dtm, &gxid)) {
194-
return INVALID_GXID;
195-
}
196-
197-
return gxid;
180+
return true;
198181
}
199-
#endif
200182

201-
void DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status)
183+
// Commits transaction only once all participants have called this function,
184+
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
185+
// something failed on the daemon side.
186+
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status)
202187
{
203-
}
204-
#if 0
205-
// Marks a given transaction as 'committed'. Returns 'true' on success,
206-
// 'false' otherwise.
207-
bool DtmGlobalCommit(DTMConn dtm, xid_t gxid) {
208-
bool result;
209-
210-
if (!dtm_write_char(dtm, 'c')) {
211-
return false;
212-
}
213-
214-
if (!dtm_write_hex16(dtm, gxid)) {
215-
return false;
216-
}
217-
218-
if (!dtm_read_bool(dtm, &result)) {
219-
return false;
220-
}
221-
222-
return result;
223-
}
224-
225-
// Marks a given transaction as 'aborted'.
226-
void DtmGlobalRollback(DTMConn dtm, xid_t gxid) {
227-
bool result;
228-
229-
if (!dtm_write_char(dtm, 'a')) {
230-
return;
188+
bool ok;
189+
switch (status) {
190+
case TRANSACTION_STATUS_COMMITTED:
191+
// query
192+
if (!dtm_query(dtm, 'c', 2, nodeid, xid)) return false;
193+
break;
194+
case TRANSACTION_STATUS_ABORTED:
195+
// query
196+
if (!dtm_query(dtm, 'a', 2, nodeid, xid)) return false;
197+
break;
198+
default:
199+
assert(false); // should not happen
200+
return false;
231201
}
232202

233-
if (!dtm_write_hex16(dtm, gxid)) {
234-
return;
235-
}
203+
if (!dtm_read_bool(dtm, &ok)) return false;
236204

237-
if (!dtm_read_bool(dtm, &result)) {
238-
return;
239-
}
205+
return ok;
240206
}
241-
#endif
242207

243-
// Gets the status of the transaction identified by 'gxid'. Returns the status
208+
// Gets the status of the transaction identified by 'xid'. Returns the status
244209
// on success, or -1 otherwise.
245-
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId gxid) {
210+
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid) {
246211
bool result;
247212
char statuschar;
248213

249-
if (!dtm_write_char(dtm, 's')) {
250-
return -1;
251-
}
252-
253-
if (!dtm_write_hex16(dtm, gxid)) {
254-
return -1;
255-
}
214+
if (!dtm_query(dtm, 's', 2, nodeid, xid)) return -1;
256215

257-
if (!dtm_read_bool(dtm, &result)) {
258-
return -1;
259-
}
260-
if (!result) {
261-
return -1;
262-
}
263-
264-
if (!dtm_read_char(dtm, &statuschar)) {
265-
return -1;
266-
}
216+
if (!dtm_read_bool(dtm, &result)) return -1;
217+
if (!result) return -1;
218+
if (!dtm_read_char(dtm, &statuschar)) return -1;
267219

268220
switch (statuschar) {
269221
case 'c':
270-
return COMMIT_YES;
222+
return TRANSACTION_STATUS_COMMITTED;
271223
case 'a':
272-
return COMMIT_NO;
224+
return TRANSACTION_STATUS_ABORTED;
273225
case '?':
274-
return COMMIT_UNKNOWN;
226+
return TRANSACTION_STATUS_IN_PROGRESS;
275227
default:
276228
return -1;
277229
}

contrib/pg_xtm/libdtm.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
#include "access/clog.h"
77

88
#define INVALID_XID 0
9-
#define COMMIT_UNKNOWN 0
10-
#define COMMIT_YES 1
11-
#define COMMIT_NO 2
129

1310
typedef int NodeId;
1411
typedef unsigned long long xid_t;
@@ -22,20 +19,26 @@ DTMConn DtmConnect(char *host, int port);
2219
// bad things will happen.
2320
void DtmDisconnect(DTMConn dtm);
2421

25-
2622
typedef struct {
2723
TransactionId* xids;
2824
NodeId* nodes;
2925
int nNodes;
3026
} GlobalTransactionId;
3127

32-
/* create entry for new global transaction */
28+
// Creates an entry for a new global transaction.
3329
void DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
3430

35-
Snapshot DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
31+
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
32+
// otherwise.
33+
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
3634

37-
void DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status); /* commit transaction only once all participants are committed, before it do not change CLOG */
35+
// Commits transaction only once all participants have called this function,
36+
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
37+
// something failed on the daemon side.
38+
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
3839

40+
// Gets the status of the transaction identified by 'xid'. Returns the status
41+
// on success, or -1 otherwise.
3942
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid);
4043

4144
#endif

contrib/pg_xtm/libdtm/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CC=clang
1+
CC=clang -DTEST
22
CFLAGS=-g -Wall -I"../../../src/include"
33

44

contrib/pg_xtm/libdtm/src/example.c

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,10 @@ int main() {
1010
exit(1);
1111
}
1212

13-
xid_t gxid;
14-
Snapshot s;
13+
xid_t xid = 0xdeadbeefcafebabe;
14+
Snapshot s = malloc(sizeof(SnapshotData));
1515

16-
gxid = DtmGlobalBegin(conn);
17-
if (gxid == INVALID_GXID) {
18-
fprintf(stderr, "failed to begin a transaction\n");
19-
exit(1);
20-
}
21-
fprintf(stdout, "began gxid = %llu\n", gxid);
22-
23-
s = DtmGlobalGetSnapshot(conn);
24-
fprintf(stdout, "snapshot is %d xids wide\n", s->xcnt);
25-
26-
if (!DtmGlobalCommit(conn, gxid)) {
27-
fprintf(stderr, "failed to commit gxid = %llu\n", gxid);
28-
exit(1);
29-
}
30-
31-
gxid = DtmGlobalBegin(conn);
32-
if (gxid == INVALID_GXID) {
33-
fprintf(stderr, "failed to begin a transaction\n");
34-
exit(1);
35-
}
36-
fprintf(stdout, "began gxid = %llu\n", gxid);
37-
38-
DtmGlobalRollback(conn, gxid);
39-
int status = DtmGlobalGetTransStatus(conn, gxid);
40-
fprintf(stdout, "status of %llu is %d\n", gxid, status);
41-
42-
s = DtmGlobalGetSnapshot(conn);
16+
DtmGlobalGetSnapshot(conn, 0, xid, s);
4317
fprintf(stdout, "snapshot is %d xids wide\n", s->xcnt);
4418

4519
DtmDisconnect(conn);

0 commit comments

Comments
 (0)