Skip to content

Commit 523a7bc

Browse files
committed
Fix snapshot merging
1 parent cfe0454 commit 523a7bc

File tree

13 files changed

+188
-105
lines changed

13 files changed

+188
-105
lines changed

contrib/pg_gtm/dtmd/bin/dtmd

5.2 KB
Binary file not shown.

contrib/pg_gtm/tests/transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
)
1010

1111
const (
12-
TRANSFER_CONNECTIONS = 4
12+
TRANSFER_CONNECTIONS = 8
1313
INIT_AMOUNT = 10000
1414
N_ITERATIONS = 10000
15-
N_ACCOUNTS = 1//100000
15+
N_ACCOUNTS = 8//100000
1616
)
1717

1818

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,14 +316,17 @@ static char *onabort(void *stream, void *clientdata, cmd_t *cmd) {
316316
static void gen_snapshot(Snapshot *s, int node) {
317317
s->nactive = 0;
318318
s->xmin = xmax[node];
319-
s->xmax = s->xmin + 1;
319+
s->xmax = 0;
320320
int i;
321321
for (i = 0; i < transactions_count; i++) {
322322
Transaction *t = transactions[i].participants + node;
323323
if (t->active) {
324324
if (t->xid < s->xmin) {
325325
s->xmin = t->xid;
326326
}
327+
if (t->xid >= s->xmax) {
328+
s->xmax = t->xid + 1;
329+
}
327330
s->active[s->nactive++] = t->xid;
328331
}
329332
}

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,11 @@
44
CREATE FUNCTION dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
7+
8+
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS bigint
9+
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmin'
10+
LANGUAGE C;
11+
12+
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS bigint
13+
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
14+
LANGUAGE C;

contrib/pg_xtm/pg_dtm.c

Lines changed: 128 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "utils/tqual.h"
3131
#include "utils/array.h"
3232
#include "utils/builtins.h"
33+
#include "utils/memutils.h"
3334

3435
#include "libdtm.h"
3536

@@ -41,41 +42,44 @@ typedef struct
4142

4243
#define DTM_SHMEM_SIZE (1024*1024)
4344
#define DTM_HASH_SIZE 1003
45+
#define XTM_CONNECT_ATTEMPTS 10
46+
4447

4548
void _PG_init(void);
4649
void _PG_fini(void);
4750

4851
static void DtmEnsureConnection(void);
49-
static Snapshot DtmGetSnapshot(Snapshot snapshot);
50-
static void DtmCopySnapshot(Snapshot dst, Snapshot src);
52+
static Snapshot DtmGetSnapshot(void);
53+
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
54+
static Snapshot DtmCopySnapshot(Snapshot snapshot);
5155
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
5256
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
5357
static void DtmUpdateRecentXmin(void);
5458
static void DtmInitialize();
5559
static void DtmXactCallback(XactEvent event, void *arg);
5660

57-
static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid);
58-
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid);
61+
static bool TransactionIdIsInDtmSnapshot(TransactionId xid);
62+
static bool TransactionIdIsInDoubt(TransactionId xid);
5963

6064
static void dtm_shmem_startup(void);
6165

6266
static shmem_startup_hook_type prev_shmem_startup_hook;
6367
static HTAB* xid_in_doubt;
6468
static DtmState* dtm;
6569
static TransactionId DtmCurrentXid = InvalidTransactionId;
70+
static Snapshot CurrentTransactionSnapshot;
6671

6772
static NodeId DtmNodeId;
6873
static DTMConn DtmConn;
6974
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
70-
static bool DtmHasSnapshot = false;
75+
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
7176
static bool DtmGlobalTransaction = false;
72-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot };
77+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot };
7378
static DTMConn DtmConn;
7479

7580
#define XTM_TRACE(fmt, ...)
76-
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
7781
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
78-
#define XTM_CONNECT_ATTEMPTS 10
82+
//#define XTM_INFO(fmt, ...)
7983

8084
static void DtmEnsureConnection(void)
8185
{
@@ -113,20 +117,24 @@ static void DumpSnapshot(Snapshot s, char *name)
113117
XTM_INFO("%s\n", buf);
114118
}
115119

116-
static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid)
120+
static bool TransactionIdIsInDtmSnapshot(TransactionId xid)
117121
{
118-
return xid >= s->xmax
119-
|| bsearch(&xid, s->xip, s->xcnt, sizeof(TransactionId), xidComparator) != NULL;
122+
return xid >= DtmSnapshot.xmax
123+
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL;
120124
}
121125

122-
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
126+
static bool TransactionIdIsInDoubt(TransactionId xid)
123127
{
124128
bool inDoubt;
125129

126-
if (!TransactionIdIsInDtmSnapshot(s, xid)) {
130+
if (!TransactionIdIsInDtmSnapshot(xid)) {
127131
LWLockAcquire(dtm->lock, LW_SHARED);
128132
inDoubt = hash_search(xid_in_doubt, &xid, HASH_FIND, NULL) != NULL;
129133
LWLockRelease(dtm->lock);
134+
if (!inDoubt) {
135+
XLogRecPtr lsn;
136+
inDoubt = CLOGTransactionIdGetStatus(xid, &lsn) != TRANSACTION_STATUS_IN_PROGRESS;
137+
}
130138
if (inDoubt) {
131139
XTM_INFO("Wait for transaction %d to complete\n", xid);
132140
XactLockTableWait(xid, NULL, NULL, XLTW_None);
@@ -136,50 +144,47 @@ static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
136144
return false;
137145
}
138146

139-
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
147+
static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
140148
{
141149
int i, j, n;
142-
static TransactionId* buf;
143150
TransactionId xid;
144-
145-
if (buf == NULL) {
146-
buf = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId) * 2);
147-
}
148-
149-
DumpSnapshot(dst, "local");
150-
DumpSnapshot(src, "DTM");
151+
Snapshot local;
151152

152153
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
153154

154-
/* Check that globall competed transactions are not included in local snapshot */
155-
RefreshLocalSnapshot:
156-
GetLocalSnapshotData(dst);
157-
for (i = 0; i < dst->xcnt; i++) {
158-
if (TransactionIdIsInDoubt(src, dst->xip[i])) {
159-
goto RefreshLocalSnapshot;
160-
}
155+
GetLocalSnapshot:
156+
local = GetSnapshotData(&DtmLocalSnapshot);
157+
for (i = 0; i < local->xcnt; i++) {
158+
if (TransactionIdIsInDoubt(local->xip[i])) {
159+
goto GetLocalSnapshot;
160+
}
161161
}
162-
for (xid = dst->xmax; xid < src->xmax; xid++) {
163-
if (TransactionIdIsInDoubt(src, xid)) {
164-
goto RefreshLocalSnapshot;
162+
for (xid = local->xmax; xid < src->xmax; xid++) {
163+
if (TransactionIdIsInDoubt(xid)) {
164+
goto GetLocalSnapshot;
165165
}
166166
}
167+
DumpSnapshot(local, "local");
168+
DumpSnapshot(src, "DTM");
167169

168170
/* Merge two snapshots: produce the most restrictive snapshot, which includes running transactions from both of them */
169-
if (dst->xmin > src->xmin) {
170-
dst->xmin = src->xmin;
171-
}
172-
if (dst->xmax > src->xmax) {
173-
dst->xmax = src->xmax;
171+
dst->xmin = local->xmin < src->xmin ? local->xmin : src->xmin;
172+
dst->xmax = local->xmax < src->xmax ? local->xmax : src->xmax;
173+
174+
n = local->xcnt;
175+
for (xid = local->xmax; xid <= src->xmin; xid++) {
176+
local->xip[n++] = xid;
174177
}
178+
memcpy(local->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
179+
n += src->xcnt;
180+
Assert(n <= GetMaxSnapshotXidCount());
175181

176-
memcpy(buf, dst->xip, dst->xcnt*sizeof(TransactionId));
177-
memcpy(buf + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
178-
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
182+
qsort(local->xip, n, sizeof(TransactionId), xidComparator);
179183
xid = InvalidTransactionId;
180-
for (i = 0, j = 0, n = dst->xcnt + src->xcnt; i < n && buf[i] < dst->xmax; i++) {
181-
if (buf[i] != xid) {
182-
dst->xip[j++] = xid = buf[i];
184+
185+
for (i = 0, j = 0; i < n && local->xip[i] < dst->xmax; i++) {
186+
if (local->xip[i] != xid) {
187+
dst->xip[j++] = xid = local->xip[i];
183188
}
184189
}
185190
dst->xcnt = j;
@@ -203,28 +208,50 @@ static void DtmUpdateRecentXmin(void)
203208
if (RecentGlobalXmin > xmin) {
204209
RecentGlobalXmin = xmin;
205210
}
206-
RecentXmin = xmin;
211+
if (RecentXmin > xmin) {
212+
RecentXmin = xmin;
213+
}
207214
}
208215
}
209216

210-
static Snapshot DtmGetSnapshot(Snapshot snapshot)
217+
static Snapshot DtmCopySnapshot(Snapshot snapshot)
211218
{
212-
if (!IsMVCCSnapshot(snapshot) || snapshot == &CatalogSnapshotData) {
213-
snapshot = GetLocalSnapshotData(snapshot);
214-
} else {
215-
XTM_TRACE("XTM: DtmGetSnapshot \n");
216-
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
217-
DtmHasSnapshot = true;
218-
DtmEnsureConnection();
219-
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
220-
}
221-
snapshot = GetLocalSnapshotData(snapshot);
222-
if (DtmHasSnapshot) {
223-
DtmCopySnapshot(snapshot, &DtmSnapshot);
224-
DtmUpdateRecentXmin();
225-
}
219+
Snapshot newsnap;
220+
Size size = sizeof(SnapshotData) + GetMaxSnapshotXidCount() * sizeof(TransactionId);
221+
Size subxipoff = size;
222+
if (snapshot->subxcnt > 0) {
223+
size += snapshot->subxcnt * sizeof(TransactionId);
226224
}
227-
return snapshot;
225+
newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
226+
memcpy(newsnap, snapshot, sizeof(SnapshotData));
227+
228+
newsnap->regd_count = 0;
229+
newsnap->active_count = 0;
230+
newsnap->copied = true;
231+
232+
newsnap->xip = (TransactionId *) (newsnap + 1);
233+
if (snapshot->xcnt > 0)
234+
{
235+
memcpy(newsnap->xip, snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
236+
}
237+
if (snapshot->subxcnt > 0 &&
238+
(!snapshot->suboverflowed || snapshot->takenDuringRecovery))
239+
{
240+
newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
241+
memcpy(newsnap->subxip, snapshot->subxip,
242+
snapshot->subxcnt * sizeof(TransactionId));
243+
}
244+
else
245+
newsnap->subxip = NULL;
246+
247+
return newsnap;
248+
}
249+
250+
251+
static Snapshot DtmGetSnapshot()
252+
{
253+
CurrentTransactionSnapshot = GetLocalTransactionSnapshot();
254+
return CurrentTransactionSnapshot;
228255
}
229256

230257

@@ -243,22 +270,28 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243270
if (DtmGlobalTransaction) {
244271
/* Already should be IN_PROGRESS */
245272
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
246-
DtmHasSnapshot = false;
247273
DtmGlobalTransaction = false;
248-
DtmEnsureConnection();
249-
XTM_INFO("Begin commit transaction %d\n", xid);
250-
251-
DtmCurrentXid = xid;
252-
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
253-
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
254-
LWLockRelease(dtm->lock);
255-
256-
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true) && status != TRANSACTION_STATUS_ABORTED) {
257-
elog(ERROR, "DTMD failed to set transaction status");
274+
CurrentTransactionSnapshot = NULL;
275+
if (status == TRANSACTION_STATUS_ABORTED) {
276+
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
277+
DtmEnsureConnection();
278+
DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, false);
279+
XTM_INFO("Abort transaction %d\n", xid);
280+
return;
281+
} else {
282+
DtmEnsureConnection();
283+
XTM_INFO("Begin commit transaction %d\n", xid);
284+
DtmCurrentXid = xid;
285+
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
286+
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
287+
LWLockRelease(dtm->lock);
288+
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true)) {
289+
elog(ERROR, "DTMD failed to set transaction status");
290+
}
291+
XTM_INFO("Commit transaction %d\n", xid);
258292
}
259-
XTM_INFO("Commit transaction %d\n", xid);
260293
} else {
261-
elog(WARNING, "Set transaction %u status in local CLOG" , xid);
294+
XTM_INFO("Set transaction %u status in local CLOG" , xid);
262295
}
263296
} else {
264297
XidStatus gs;
@@ -304,6 +337,7 @@ static void DtmInitialize()
304337
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
305338

306339
RegisterXactCallback(DtmXactCallback, NULL);
340+
DtmInitSnapshot(&DtmLocalSnapshot);
307341

308342
TM = &DtmTM;
309343
}
@@ -390,22 +424,41 @@ static void dtm_shmem_startup(void)
390424
PG_MODULE_MAGIC;
391425

392426
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
427+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
428+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
393429

394430
Datum
395431
dtm_begin_transaction(PG_FUNCTION_ARGS)
396432
{
397433
GlobalTransactionId gtid;
398434
ArrayType* nodes = PG_GETARG_ARRAYTYPE_P(0);
399-
ArrayType* xids = PG_GETARG_ARRAYTYPE_P(1);
435+
ArrayType* xids = PG_GETARG_ARRAYTYPE_P(1);
400436
gtid.xids = (TransactionId*)ARR_DATA_PTR(xids);
401437
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
402438
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
403439
DtmGlobalTransaction = true;
440+
Assert(gtid.xids[DtmNodeId] == GetCurrentTransactionId());
404441
XTM_INFO("Start transaction {%d,%d} at node %d\n", gtid.xids[0], gtid.xids[1], DtmNodeId);
405-
XTM_TRACE("XTM: dtm_begin_transaction \n");
406442
if (DtmNodeId == gtid.nodes[0]) {
407443
DtmEnsureConnection();
408444
DtmGlobalStartTransaction(DtmConn, &gtid);
409445
}
446+
DtmEnsureConnection();
447+
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, gtid.xids[DtmNodeId], &DtmSnapshot);
448+
Assert(CurrentTransactionSnapshot != NULL);
449+
DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
450+
DtmUpdateRecentXmin();
410451
PG_RETURN_VOID();
411452
}
453+
454+
Datum
455+
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
456+
{
457+
PG_RETURN_INT64(CurrentTransactionSnapshot->xmin);
458+
}
459+
460+
Datum
461+
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
462+
{
463+
PG_RETURN_INT64(CurrentTransactionSnapshot->xmax);
464+
}

0 commit comments

Comments
 (0)