Skip to content

Commit 720e45b

Browse files
committed
GTM seems to work
1 parent 3157940 commit 720e45b

File tree

5 files changed

+137
-53
lines changed

5 files changed

+137
-53
lines changed

contrib/pg_gtm/pg_dtm.c

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
#include "storage/lmgr.h"
1818
#include "storage/shmem.h"
1919
#include "storage/ipc.h"
20+
#include "storage/proc.h"
21+
#include "storage/procarray.h"
2022
#include "access/xlogdefs.h"
2123
#include "access/xact.h"
2224
#include "access/transam.h"
2325
#include "access/xlog.h"
2426
#include "access/twophase.h"
2527
#include "utils/hsearch.h"
2628
#include "utils/tqual.h"
29+
#include "utils/snapmgr.h"
2730

2831
#include "libdtm.h"
2932
#include "pg_dtm.h"
@@ -48,6 +51,8 @@ typedef struct
4851
{
4952
nodeid_t node_id;
5053
cid_t last_cid;
54+
TransactionId last_xid;
55+
TransactionId first_xid;
5156
volatile slock_t lock;
5257
} DtmNodeState;
5358

@@ -66,7 +71,7 @@ typedef struct
6671
} DtmTransId;
6772

6873
//#define DTM_TRACE(x)
69-
#define DTM_TRACE(x) fprintf x
74+
#define DTM_TRACE(x) elog x
7075

7176
static shmem_startup_hook_type prev_shmem_startup_hook;
7277
static HTAB* xid2status;
@@ -75,6 +80,8 @@ static DtmNodeState* local;
7580
static DtmTransState dtm_tx;
7681
static DTMConn dtm_conn;
7782

83+
static SnapshotProvider DefaultSnapshotProvider;
84+
7885
void _PG_init(void);
7986
void _PG_fini(void);
8087

@@ -91,7 +98,7 @@ static void dtm_ensure_connection(void);
9198
void
9299
_PG_init(void)
93100
{
94-
DTM_TRACE((stderr, "DTM_PG_init \n"));
101+
DTM_TRACE((WARNING, "DTM_PG_init \n"));
95102

96103
/*
97104
* In order to create our shared memory area, we have to be loaded via
@@ -161,7 +168,7 @@ static GlobalTransactionId dtm_get_global_trans_id()
161168
static void
162169
dtm_xact_callback(XactEvent event, void *arg)
163170
{
164-
DTM_TRACE((stderr, "Backend %d dtm_xact_callback %d\n", getpid(), event));
171+
DTM_TRACE((WARNING, "Backend %d dtm_xact_callback %d\n", getpid(), event));
165172
switch (event)
166173
{
167174
case XACT_EVENT_START:
@@ -220,7 +227,7 @@ dtm_extend(PG_FUNCTION_ARGS)
220227
{
221228
GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
222229
cid_t cid = DtmLocalExtend(&dtm_tx, gtid);
223-
DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%llu\n", getpid(), dtm_tx.xid, gtid, cid));
230+
DTM_TRACE((WARNING, "Backend %d extends transaction %u(%s) to global with cid=%llu\n", getpid(), dtm_tx.xid, gtid, cid));
224231
PG_RETURN_INT64(cid);
225232
}
226233

@@ -229,7 +236,7 @@ dtm_access(PG_FUNCTION_ARGS)
229236
{
230237
cid_t cid = PG_GETARG_INT64(0);
231238
GlobalTransactionId gtid = PG_GETARG_CSTRING(1);
232-
DTM_TRACE((stderr, "Backend %d joins transaction %u(%s) with cid=%llu\n", getpid(), dtm_tx.xid, gtid, cid));
239+
DTM_TRACE((WARNING, "Backend %d joins transaction %u(%s) with cid=%llu\n", getpid(), dtm_tx.xid, gtid, cid));
233240
cid = DtmLocalAccess(&dtm_tx, gtid, cid);
234241
PG_RETURN_INT64(cid);
235242
}
@@ -240,7 +247,7 @@ dtm_begin_prepare(PG_FUNCTION_ARGS)
240247
GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
241248
nodeid_t coordinator = PG_GETARG_INT32(1);
242249
DtmLocalBeginPrepare(gtid, coordinator);
243-
DTM_TRACE((stderr, "Backend %d begins prepare of transaction '%s'\n", getpid(), gtid));
250+
DTM_TRACE((WARNING, "Backend %d begins prepare of transaction '%s'\n", getpid(), gtid));
244251
PG_RETURN_VOID();
245252
}
246253

@@ -250,7 +257,7 @@ dtm_prepare(PG_FUNCTION_ARGS)
250257
GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
251258
cid_t cid = PG_GETARG_INT64(1);
252259
cid = DtmLocalPrepare(gtid, cid);
253-
DTM_TRACE((stderr, "Backend %d prepares transaction '%s' with cid=%llu\n", getpid(), gtid, cid));
260+
DTM_TRACE((WARNING, "Backend %d prepares transaction '%s' with cid=%llu\n", getpid(), gtid, cid));
254261
PG_RETURN_INT64(cid);
255262
}
256263

@@ -259,7 +266,7 @@ dtm_end_prepare(PG_FUNCTION_ARGS)
259266
{
260267
GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
261268
cid_t cid = PG_GETARG_INT64(1);
262-
DTM_TRACE((stderr, "Backend %d ends prepare of transaction '%s' with cid=%llu\n", getpid(), gtid, cid));
269+
DTM_TRACE((WARNING, "Backend %d ends prepare of transaction '%s' with cid=%llu\n", getpid(), gtid, cid));
263270
DtmLocalEndPrepare(gtid, cid);
264271
PG_RETURN_VOID();
265272
}
@@ -324,12 +331,13 @@ static VisibilityCheckResult DtmVisibilityCheck(TransactionId xid)
324331
if (ts != NULL)
325332
{
326333
if (ts->cid > dtm_tx.snapshot) {
334+
DTM_TRACE((WARNING, "Backend %d xid %u=>%llu is not visible for snapshot %llu\n", getpid(), xid, ts->cid, dtm_tx.snapshot));
327335
result = XID_INVISIBLE;
328336
break;
329337
}
330338
if (ts->status == XID_INPROGRESS)
331339
{
332-
DTM_TRACE((stderr, "Wait for in-doubt transaction %u\n", xid));
340+
DTM_TRACE((WARNING, "Wait for in-doubt transaction %u\n", xid));
333341
SpinLockRelease(&local->lock);
334342
pg_usleep(delay);
335343
if (delay*2 <= MAX_WAIT_TIMEOUT) {
@@ -339,19 +347,22 @@ static VisibilityCheckResult DtmVisibilityCheck(TransactionId xid)
339347
}
340348
else
341349
{
350+
if (ts->status != XID_COMMITTED) {
351+
DTM_TRACE((WARNING, "Backend %d xid %u=>%llu belongs to aborted transaction\n", getpid(), xid, ts->cid));
352+
}
342353
result = ts->status == XID_COMMITTED ? XID_VISIBLE : XID_INVISIBLE;
343-
DTM_TRACE((stderr, "Backend %d visibility check for %u=%d\n", getpid(), xid, result));
344354
break;
345355
}
346356
}
347357
else
348358
{
349-
//DTM_TRACE((stderr, "Visibility check is skept for transaction %u\n", xid));
359+
//DTM_TRACE((WARNING, "Visibility check is skept for transaction %u\n", xid));
350360
result = XID_IN_DOUBT;
351361
break;
352362
}
353363
}
354-
SpinLockRelease(&local->lock);
364+
DTM_TRACE((WARNING, "Backend %d visibility check for %u=%d\n", getpid(), xid, result));
365+
SpinLockRelease(&local->lock);
355366
return result;
356367
}
357368

@@ -381,12 +392,15 @@ void DtmInitialize()
381392
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
382393

383394
RegisterTransactionVisibilityCallback(DtmVisibilityCheck);
395+
DefaultSnapshotProvider = SetSnapshotProvider(DtmSnapshotProvider);
384396

385397
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
386398
local = (DtmNodeState*)ShmemInitStruct("dtm", sizeof(DtmNodeState), &found);
387399
if (!found)
388400
{
389401
local->last_cid = INVALID_CID;
402+
local->last_xid = InvalidTransactionId;
403+
local->first_xid = InvalidTransactionId;
390404
local->node_id = -1;
391405
SpinLockInit(&local->lock);
392406
}
@@ -399,18 +413,38 @@ void DtmInitialize()
399413
}
400414

401415

416+
Snapshot DtmSnapshotProvider(Snapshot snapshot)
417+
{
418+
if (dtm_tx.local_snapshot == NULL) {
419+
dtm_tx.local_snapshot = DefaultSnapshotProvider(snapshot);
420+
}
421+
if (local->last_xid != InvalidTransactionId && RecentGlobalDataXmin > local->last_xid) {
422+
RecentGlobalDataXmin = local->last_xid;
423+
}
424+
//if (local->first_xid != InvalidTransactionId && RecentGlobalXmin > local->first_xid) {
425+
// RecentGlobalXmin = local->first_xid;
426+
//}
427+
return dtm_tx.local_snapshot;
428+
}
429+
402430
void DtmLocalBegin(DtmTransState* x)
403431
{
404432
if (x->xid == InvalidTransactionId) {
405-
// SpinLockAcquire(&local->lock);
433+
SpinLockAcquire(&local->lock);
406434
x->xid = GetCurrentTransactionId();
407435
Assert(x->xid != InvalidTransactionId);
408436
x->cid = INVALID_CID;
409437
x->is_global = false;
410438
x->is_prepared = false;
411439
x->snapshot = local->last_cid;
412-
// SpinLockRelease(&local->lock);
413-
DTM_TRACE((stderr, "DtmLocalBegin: transaction %u uses local snapshot %llu\n", x->xid, x->snapshot));
440+
if (RecentGlobalDataXmin > local->last_xid) {
441+
RecentGlobalDataXmin = local->last_xid;
442+
}
443+
//if (RecentGlobalXmin > local->first_xid) {
444+
// RecentGlobalXmin = local->first_xid;
445+
//}
446+
SpinLockRelease(&local->lock);
447+
DTM_TRACE((WARNING, "DtmLocalBegin: transaction %u uses local snapshot %llu\n", x->xid, x->snapshot));
414448
}
415449
}
416450

@@ -487,7 +521,7 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
487521
Assert(ts != NULL);
488522
ts->cid = cid;
489523

490-
DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %llu\n", id->xid, gtid, cid));
524+
DTM_TRACE((WARNING, "Prepare transaction %u(%s) with CSN %llu\n", id->xid, gtid, cid));
491525
}
492526
SpinLockRelease(&local->lock);
493527
}
@@ -503,7 +537,7 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
503537

504538
x->is_prepared = true;
505539
x->xid = id->xid;
506-
DTM_TRACE((stderr, "Global transaction %u(%s) is prepared\n", x->xid, gtid));
540+
DTM_TRACE((WARNING, "Global transaction %u(%s) is prepared\n", x->xid, gtid));
507541
}
508542
SpinLockRelease(&local->lock);
509543
}
@@ -519,12 +553,22 @@ void DtmLocalCommit(DtmTransState* x)
519553
Assert(ts);
520554
if (ts->cid > local->last_cid) {
521555
local->last_cid = ts->cid;
556+
local->last_xid = x->xid;
557+
if (local->first_xid == InvalidTransactionId) {
558+
local->first_xid = x->xid;
559+
}
560+
if (RecentGlobalDataXmin > local->last_xid) {
561+
RecentGlobalDataXmin = local->last_xid;
562+
}
563+
//if (RecentGlobalXmin > local->first_xid) {
564+
// RecentGlobalXmin = local->first_xid;
565+
//}
522566
}
523567
if (ts->is_coordinator) {
524568
gcid = ts->cid;
525569
}
526570
ts->status = XID_COMMITTED;
527-
DTM_TRACE((stderr, "Transaction %u is committed at %llu\n", x->xid, ts->cid));
571+
DTM_TRACE((WARNING, "Transaction %u is committed at %llu\n", x->xid, ts->cid));
528572
}
529573
SpinLockRelease(&local->lock);
530574

@@ -548,7 +592,7 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
548592
x->is_prepared = true;
549593
x->xid = id->xid;
550594

551-
DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
595+
DTM_TRACE((WARNING, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
552596
}
553597
SpinLockRelease(&local->lock);
554598
}
@@ -566,7 +610,7 @@ void DtmLocalAbort(DtmTransState* x)
566610
gcid = ts->cid;
567611
}
568612
ts->status = XID_ABORTED;
569-
DTM_TRACE((stderr, "Local transaction %u is aborted at %llu\n", x->xid, x->cid));
613+
DTM_TRACE((WARNING, "Local transaction %u is aborted at %llu\n", x->xid, x->cid));
570614
}
571615
SpinLockRelease(&local->lock);
572616

@@ -583,5 +627,6 @@ void DtmLocalEnd(DtmTransState* x)
583627
x->is_prepared = false;
584628
x->xid = InvalidTransactionId;
585629
x->cid = INVALID_CID;
630+
x->local_snapshot = NULL;
586631
}
587632

contrib/pg_gtm/pg_dtm.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ typedef struct {
99
bool is_prepared;
1010
cid_t cid;
1111
cid_t snapshot;
12+
Snapshot local_snapshot;
1213
} DtmTransState;
1314

1415
typedef char const* GlobalTransactionId;
@@ -37,5 +38,6 @@ void DtmLocalCommit(DtmTransState* x);
3738
void DtmLocalAbort(DtmTransState* x);
3839
/* Invoked at the end of any local or global transaction: free transaction state */
3940
void DtmLocalEnd(DtmTransState* x);
41+
Snapshot DtmSnapshotProvider(Snapshot snashot);
4042

4143
#endif

src/backend/storage/ipc/procarray.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ typedef struct ProcArrayStruct
9595
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
9696
} ProcArrayStruct;
9797

98+
static Snapshot
99+
GetCurrentSnapshotData(Snapshot snapshot);
100+
101+
static SnapshotProvider GetSnapshotImpl = GetCurrentSnapshotData;
102+
98103
static ProcArrayStruct *procArray;
99104

100105
static PGPROC *allProcs;
@@ -1492,6 +1497,20 @@ GetMaxSnapshotSubxidCount(void)
14921497
*/
14931498
Snapshot
14941499
GetSnapshotData(Snapshot snapshot)
1500+
{
1501+
return (*GetSnapshotImpl)(snapshot);
1502+
}
1503+
1504+
SnapshotProvider
1505+
SetSnapshotProvider(SnapshotProvider provider)
1506+
{
1507+
SnapshotProvider old = GetSnapshotImpl;
1508+
GetSnapshotImpl = provider;
1509+
return old;
1510+
}
1511+
1512+
static Snapshot
1513+
GetCurrentSnapshotData(Snapshot snapshot)
14951514
{
14961515
ProcArrayStruct *arrayP = procArray;
14971516
TransactionId xmin;

0 commit comments

Comments
 (0)