Skip to content

Commit 28cbfd2

Browse files
committed
merge
2 parents fe84577 + 16cb19c commit 28cbfd2

File tree

10 files changed

+285
-189
lines changed

10 files changed

+285
-189
lines changed

contrib/pg_xtm/README

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
6060
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
6161

6262
// Reserves at least 'nXids' successive xids for local transactions. The xids
63-
// reserved are not less than 'xid' in value. Returns the actual number
64-
// of xids reserved, and sets the 'first' xid accordingly. The number of xids
65-
// reserved is guaranteed to be at least nXids.
63+
// reserved are not less than 'xid' in value. Returns the actual number of xids
64+
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
65+
// is guaranteed to be at least nXids.
6666
// In other words, *first ≥ xid and result ≥ nXids.
67-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
67+
// Also sets the 'active' snapshot, which is used as a container for the list
68+
// of active global transactions.
69+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active);
6870

6971
--------------------
7072
Backend-DTM Protocol
@@ -82,10 +84,11 @@ The commands:
8284

8385
'r': reserve(minxid, minsize)
8486
Claims a sequence ≥ minsize of xids ≥ minxid for local usage. This will
85-
prevent DTM from using those values for global transactions.
87+
prevent DTM from using those values for global transactions. The
88+
'snapshot' represent the list of currently active global transactions.
8689

8790
The DTM replies with:
88-
'+'<hex16 min><hex16 max> if reserved a range [min, max]
91+
'+'<hex16 min><hex16 max><snapshot> if reserved a range [min, max]
8992
'-' on failure
9093

9194
'b': begin(size)

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef struct Transaction {
2828

2929
Snapshot *transaction_latest_snapshot(Transaction *t);
3030
Snapshot *transaction_snapshot(Transaction *t, int snapno);
31+
Snapshot *transaction_next_snapshot(Transaction *t);
3132
int transaction_status(Transaction *t);
3233
void transaction_clear(Transaction *t);
3334
void transaction_push_listener(Transaction *t, char cmd, void *listener);

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,32 @@ static xid_t max(xid_t a, xid_t b) {
160160
return a > b ? a : b;
161161
}
162162

163+
static void gen_snapshot(Snapshot *s) {
164+
s->times_sent = 0;
165+
s->nactive = 0;
166+
s->xmin = MAX_XID;
167+
s->xmax = MIN_XID;
168+
int i;
169+
for (i = 0; i < transactions_count; i++) {
170+
Transaction *t = transactions + i;
171+
if (t->xid < s->xmin) {
172+
s->xmin = t->xid;
173+
}
174+
if (t->xid >= s->xmax) {
175+
s->xmax = t->xid + 1;
176+
}
177+
s->active[s->nactive++] = t->xid;
178+
}
179+
if (s->nactive > 0) {
180+
assert(s->xmin < MAX_XID);
181+
assert(s->xmax > MIN_XID);
182+
assert(s->xmin <= s->xmax);
183+
snapshot_sort(s);
184+
} else {
185+
s->xmin = s->xmax = 0;
186+
}
187+
}
188+
163189
static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
164190
CHECK(
165191
cmd->argc == 2,
@@ -195,49 +221,29 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
195221
minxid, maxxid
196222
);
197223

198-
char response[1+16+16+1];
199-
sprintf(response, "+%016llx%016llx", minxid, maxxid);
200-
return strdup(response);
201-
}
224+
char head[1+16+16+1];
225+
sprintf(head, "+%016llx%016llx", minxid, maxxid);
202226

203-
static void gen_snapshot(Transaction *t) {
204-
t->snapshots_count += 1;
205-
Snapshot *s = transaction_latest_snapshot(t);
227+
Snapshot s;
228+
gen_snapshot(&s);
229+
char *snapser = snapshot_serialize(&s);
206230

207-
s->times_sent = 0;
208-
s->nactive = 0;
209-
s->xmin = MAX_XID;
210-
s->xmax = MIN_XID;
211-
int i;
212-
for (i = 0; i < transactions_count; i++) {
213-
Transaction *t = transactions + i;
214-
if (t->xid < s->xmin) {
215-
s->xmin = t->xid;
216-
}
217-
if (t->xid >= s->xmax) {
218-
s->xmax = t->xid + 1;
219-
}
220-
s->active[s->nactive++] = t->xid;
221-
}
222-
assert(s->xmin < MAX_XID);
223-
assert(s->xmax > MIN_XID);
224-
assert(s->xmin <= s->xmax);
225-
snapshot_sort(s);
231+
return destructive_concat(strdup(head), snapser);
226232
}
227233

228234
static xid_t get_global_xmin() {
229235
int i, j;
230-
xid_t xmin = MAX_XID;
236+
xid_t xmin = INVALID_XID;
231237
Transaction *t;
232238
for (i = 0; i < transactions_count; i++) {
233239
t = transactions + i;
234-
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
235-
while (--j >= 0) {
236-
Snapshot* s = transaction_snapshot(t, j);
237-
if (s->xmin < xmin) {
238-
xmin = s->xmin;
239-
}
240-
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
240+
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
241+
while (--j >= 0) {
242+
Snapshot* s = transaction_snapshot(t, j);
243+
if ((xmin == INVALID_XID) || (s->xmin < xmin)) {
244+
xmin = s->xmin;
245+
}
246+
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
241247
}
242248
}
243249
return xmin;
@@ -293,7 +299,7 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
293299

294300
transactions_count++;
295301

296-
gen_snapshot(t);
302+
gen_snapshot(transaction_next_snapshot(t));
297303
// will wrap around if exceeded max snapshots
298304
Snapshot *snap = transaction_latest_snapshot(t);
299305
char *snapser = snapshot_serialize(snap);
@@ -442,7 +448,7 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
442448

443449
if (CLIENT_SNAPSENT(clientdata) == t->snapshots_count) {
444450
// a fresh snapshot is needed
445-
gen_snapshot(t);
451+
gen_snapshot(transaction_next_snapshot(t));
446452
}
447453

448454
char head[1+16+1];

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,7 @@ Snapshot *transaction_snapshot(Transaction *t, int snapno) {
6464
Snapshot *transaction_latest_snapshot(Transaction *t) {
6565
return transaction_snapshot(t, t->snapshots_count - 1);
6666
}
67+
68+
Snapshot *transaction_next_snapshot(Transaction *t) {
69+
return transaction_snapshot(t, t->snapshots_count++);
70+
}

contrib/pg_xtm/libdtm.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,11 +403,13 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
403403
}
404404

405405
// Reserves at least 'nXids' successive xids for local transactions. The xids
406-
// reserved are not less than 'xid' in value. Returns the actual number
407-
// of xids reserved, and sets the 'first' xid accordingly. The number of xids
408-
// reserved is guaranteed to be at least nXids.
406+
// reserved are not less than 'xid' in value. Returns the actual number of xids
407+
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
408+
// is guaranteed to be at least nXids.
409409
// In other words, *first ≥ xid and result ≥ nXids.
410-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
410+
// Also sets the 'active' snapshot, which is used as a container for the list
411+
// of active global transactions.
412+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active)
411413
{
412414
bool ok;
413415
xid_t xmin, xmax;
@@ -421,6 +423,7 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
421423

422424
if (!dtm_read_hex16(dtm, &xmin)) goto failure;
423425
if (!dtm_read_hex16(dtm, &xmax)) goto failure;
426+
if (!dtm_read_snapshot(dtm, active)) goto failure;
424427

425428
*first = xmin;
426429
count = xmax - xmin + 1;

contrib/pg_xtm/libdtm.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
3232
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3333

3434
// Reserves at least 'nXids' successive xids for local transactions. The xids
35-
// reserved are not less than 'xid' in value. Returns the actual number
36-
// of xids reserved, and sets the 'first' xid accordingly. The number of xids
37-
// reserved is guaranteed to be at least nXids.
35+
// reserved are not less than 'xid' in value. Returns the actual number of xids
36+
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
37+
// is guaranteed to be at least nXids.
3838
// In other words, *first ≥ xid and result ≥ nXids.
39-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
39+
// Also sets the 'active' snapshot, which is used as a container for the list
40+
// of active global transactions.
41+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active);
4042

4143
#endif

0 commit comments

Comments
 (0)