17
17
#include "storage/lmgr.h"
18
18
#include "storage/shmem.h"
19
19
#include "storage/ipc.h"
20
+ #include "storage/proc.h"
21
+ #include "storage/procarray.h"
20
22
#include "access/xlogdefs.h"
21
23
#include "access/xact.h"
22
24
#include "access/transam.h"
23
25
#include "access/xlog.h"
24
26
#include "access/twophase.h"
25
27
#include "utils/hsearch.h"
26
28
#include "utils/tqual.h"
29
+ #include "utils/snapmgr.h"
27
30
28
31
#include "libdtm.h"
29
32
#include "pg_dtm.h"
@@ -48,6 +51,8 @@ typedef struct
48
51
{
49
52
nodeid_t node_id ;
50
53
cid_t last_cid ;
54
+ TransactionId last_xid ;
55
+ TransactionId first_xid ;
51
56
volatile slock_t lock ;
52
57
} DtmNodeState ;
53
58
@@ -66,7 +71,7 @@ typedef struct
66
71
} DtmTransId ;
67
72
68
73
//#define DTM_TRACE(x)
69
- #define DTM_TRACE (x ) fprintf x
74
+ #define DTM_TRACE (x ) elog x
70
75
71
76
static shmem_startup_hook_type prev_shmem_startup_hook ;
72
77
static HTAB * xid2status ;
@@ -75,6 +80,8 @@ static DtmNodeState* local;
75
80
static DtmTransState dtm_tx ;
76
81
static DTMConn dtm_conn ;
77
82
83
+ static SnapshotProvider DefaultSnapshotProvider ;
84
+
78
85
void _PG_init (void );
79
86
void _PG_fini (void );
80
87
@@ -91,7 +98,7 @@ static void dtm_ensure_connection(void);
91
98
void
92
99
_PG_init (void )
93
100
{
94
- DTM_TRACE ((stderr , "DTM_PG_init \n" ));
101
+ DTM_TRACE ((WARNING , "DTM_PG_init \n" ));
95
102
96
103
/*
97
104
* In order to create our shared memory area, we have to be loaded via
@@ -161,7 +168,7 @@ static GlobalTransactionId dtm_get_global_trans_id()
161
168
static void
162
169
dtm_xact_callback (XactEvent event , void * arg )
163
170
{
164
- DTM_TRACE ((stderr , "Backend %d dtm_xact_callback %d\n" , getpid (), event ));
171
+ DTM_TRACE ((WARNING , "Backend %d dtm_xact_callback %d\n" , getpid (), event ));
165
172
switch (event )
166
173
{
167
174
case XACT_EVENT_START :
@@ -220,7 +227,7 @@ dtm_extend(PG_FUNCTION_ARGS)
220
227
{
221
228
GlobalTransactionId gtid = PG_GETARG_CSTRING (0 );
222
229
cid_t cid = DtmLocalExtend (& dtm_tx , gtid );
223
- DTM_TRACE ((stderr , "Backend %d extends transaction %u(%s) to global with cid=%llu\n" , getpid (), dtm_tx .xid , gtid , cid ));
230
+ DTM_TRACE ((WARNING , "Backend %d extends transaction %u(%s) to global with cid=%llu\n" , getpid (), dtm_tx .xid , gtid , cid ));
224
231
PG_RETURN_INT64 (cid );
225
232
}
226
233
@@ -229,7 +236,7 @@ dtm_access(PG_FUNCTION_ARGS)
229
236
{
230
237
cid_t cid = PG_GETARG_INT64 (0 );
231
238
GlobalTransactionId gtid = PG_GETARG_CSTRING (1 );
232
- DTM_TRACE ((stderr , "Backend %d joins transaction %u(%s) with cid=%llu\n" , getpid (), dtm_tx .xid , gtid , cid ));
239
+ DTM_TRACE ((WARNING , "Backend %d joins transaction %u(%s) with cid=%llu\n" , getpid (), dtm_tx .xid , gtid , cid ));
233
240
cid = DtmLocalAccess (& dtm_tx , gtid , cid );
234
241
PG_RETURN_INT64 (cid );
235
242
}
@@ -240,7 +247,7 @@ dtm_begin_prepare(PG_FUNCTION_ARGS)
240
247
GlobalTransactionId gtid = PG_GETARG_CSTRING (0 );
241
248
nodeid_t coordinator = PG_GETARG_INT32 (1 );
242
249
DtmLocalBeginPrepare (gtid , coordinator );
243
- DTM_TRACE ((stderr , "Backend %d begins prepare of transaction '%s'\n" , getpid (), gtid ));
250
+ DTM_TRACE ((WARNING , "Backend %d begins prepare of transaction '%s'\n" , getpid (), gtid ));
244
251
PG_RETURN_VOID ();
245
252
}
246
253
@@ -250,7 +257,7 @@ dtm_prepare(PG_FUNCTION_ARGS)
250
257
GlobalTransactionId gtid = PG_GETARG_CSTRING (0 );
251
258
cid_t cid = PG_GETARG_INT64 (1 );
252
259
cid = DtmLocalPrepare (gtid , cid );
253
- DTM_TRACE ((stderr , "Backend %d prepares transaction '%s' with cid=%llu\n" , getpid (), gtid , cid ));
260
+ DTM_TRACE ((WARNING , "Backend %d prepares transaction '%s' with cid=%llu\n" , getpid (), gtid , cid ));
254
261
PG_RETURN_INT64 (cid );
255
262
}
256
263
@@ -259,7 +266,7 @@ dtm_end_prepare(PG_FUNCTION_ARGS)
259
266
{
260
267
GlobalTransactionId gtid = PG_GETARG_CSTRING (0 );
261
268
cid_t cid = PG_GETARG_INT64 (1 );
262
- DTM_TRACE ((stderr , "Backend %d ends prepare of transaction '%s' with cid=%llu\n" , getpid (), gtid , cid ));
269
+ DTM_TRACE ((WARNING , "Backend %d ends prepare of transaction '%s' with cid=%llu\n" , getpid (), gtid , cid ));
263
270
DtmLocalEndPrepare (gtid , cid );
264
271
PG_RETURN_VOID ();
265
272
}
@@ -324,12 +331,13 @@ static VisibilityCheckResult DtmVisibilityCheck(TransactionId xid)
324
331
if (ts != NULL )
325
332
{
326
333
if (ts -> cid > dtm_tx .snapshot ) {
334
+ DTM_TRACE ((WARNING , "Backend %d xid %u=>%llu is not visible for snapshot %llu\n" , getpid (), xid , ts -> cid , dtm_tx .snapshot ));
327
335
result = XID_INVISIBLE ;
328
336
break ;
329
337
}
330
338
if (ts -> status == XID_INPROGRESS )
331
339
{
332
- DTM_TRACE ((stderr , "Wait for in-doubt transaction %u\n" , xid ));
340
+ DTM_TRACE ((WARNING , "Wait for in-doubt transaction %u\n" , xid ));
333
341
SpinLockRelease (& local -> lock );
334
342
pg_usleep (delay );
335
343
if (delay * 2 <= MAX_WAIT_TIMEOUT ) {
@@ -339,19 +347,22 @@ static VisibilityCheckResult DtmVisibilityCheck(TransactionId xid)
339
347
}
340
348
else
341
349
{
350
+ if (ts -> status != XID_COMMITTED ) {
351
+ DTM_TRACE ((WARNING , "Backend %d xid %u=>%llu belongs to aborted transaction\n" , getpid (), xid , ts -> cid ));
352
+ }
342
353
result = ts -> status == XID_COMMITTED ? XID_VISIBLE : XID_INVISIBLE ;
343
- DTM_TRACE ((stderr , "Backend %d visibility check for %u=%d\n" , getpid (), xid , result ));
344
354
break ;
345
355
}
346
356
}
347
357
else
348
358
{
349
- //DTM_TRACE((stderr , "Visibility check is skept for transaction %u\n", xid));
359
+ //DTM_TRACE((WARNING , "Visibility check is skept for transaction %u\n", xid));
350
360
result = XID_IN_DOUBT ;
351
361
break ;
352
362
}
353
363
}
354
- SpinLockRelease (& local -> lock );
364
+ DTM_TRACE ((WARNING , "Backend %d visibility check for %u=%d\n" , getpid (), xid , result ));
365
+ SpinLockRelease (& local -> lock );
355
366
return result ;
356
367
}
357
368
@@ -381,12 +392,15 @@ void DtmInitialize()
381
392
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY );
382
393
383
394
RegisterTransactionVisibilityCallback (DtmVisibilityCheck );
395
+ DefaultSnapshotProvider = SetSnapshotProvider (DtmSnapshotProvider );
384
396
385
397
LWLockAcquire (AddinShmemInitLock , LW_EXCLUSIVE );
386
398
local = (DtmNodeState * )ShmemInitStruct ("dtm" , sizeof (DtmNodeState ), & found );
387
399
if (!found )
388
400
{
389
401
local -> last_cid = INVALID_CID ;
402
+ local -> last_xid = InvalidTransactionId ;
403
+ local -> first_xid = InvalidTransactionId ;
390
404
local -> node_id = -1 ;
391
405
SpinLockInit (& local -> lock );
392
406
}
@@ -399,18 +413,38 @@ void DtmInitialize()
399
413
}
400
414
401
415
416
+ Snapshot DtmSnapshotProvider (Snapshot snapshot )
417
+ {
418
+ if (dtm_tx .local_snapshot == NULL ) {
419
+ dtm_tx .local_snapshot = DefaultSnapshotProvider (snapshot );
420
+ }
421
+ if (local -> last_xid != InvalidTransactionId && RecentGlobalDataXmin > local -> last_xid ) {
422
+ RecentGlobalDataXmin = local -> last_xid ;
423
+ }
424
+ //if (local->first_xid != InvalidTransactionId && RecentGlobalXmin > local->first_xid) {
425
+ // RecentGlobalXmin = local->first_xid;
426
+ //}
427
+ return dtm_tx .local_snapshot ;
428
+ }
429
+
402
430
void DtmLocalBegin (DtmTransState * x )
403
431
{
404
432
if (x -> xid == InvalidTransactionId ) {
405
- // SpinLockAcquire(&local->lock);
433
+ SpinLockAcquire (& local -> lock );
406
434
x -> xid = GetCurrentTransactionId ();
407
435
Assert (x -> xid != InvalidTransactionId );
408
436
x -> cid = INVALID_CID ;
409
437
x -> is_global = false;
410
438
x -> is_prepared = false;
411
439
x -> snapshot = local -> last_cid ;
412
- // SpinLockRelease(&local->lock);
413
- DTM_TRACE ((stderr , "DtmLocalBegin: transaction %u uses local snapshot %llu\n" , x -> xid , x -> snapshot ));
440
+ if (RecentGlobalDataXmin > local -> last_xid ) {
441
+ RecentGlobalDataXmin = local -> last_xid ;
442
+ }
443
+ //if (RecentGlobalXmin > local->first_xid) {
444
+ // RecentGlobalXmin = local->first_xid;
445
+ //}
446
+ SpinLockRelease (& local -> lock );
447
+ DTM_TRACE ((WARNING , "DtmLocalBegin: transaction %u uses local snapshot %llu\n" , x -> xid , x -> snapshot ));
414
448
}
415
449
}
416
450
@@ -487,7 +521,7 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
487
521
Assert (ts != NULL );
488
522
ts -> cid = cid ;
489
523
490
- DTM_TRACE ((stderr , "Prepare transaction %u(%s) with CSN %llu\n" , id -> xid , gtid , cid ));
524
+ DTM_TRACE ((WARNING , "Prepare transaction %u(%s) with CSN %llu\n" , id -> xid , gtid , cid ));
491
525
}
492
526
SpinLockRelease (& local -> lock );
493
527
}
@@ -503,7 +537,7 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
503
537
504
538
x -> is_prepared = true;
505
539
x -> xid = id -> xid ;
506
- DTM_TRACE ((stderr , "Global transaction %u(%s) is prepared\n" , x -> xid , gtid ));
540
+ DTM_TRACE ((WARNING , "Global transaction %u(%s) is prepared\n" , x -> xid , gtid ));
507
541
}
508
542
SpinLockRelease (& local -> lock );
509
543
}
@@ -519,12 +553,22 @@ void DtmLocalCommit(DtmTransState* x)
519
553
Assert (ts );
520
554
if (ts -> cid > local -> last_cid ) {
521
555
local -> last_cid = ts -> cid ;
556
+ local -> last_xid = x -> xid ;
557
+ if (local -> first_xid == InvalidTransactionId ) {
558
+ local -> first_xid = x -> xid ;
559
+ }
560
+ if (RecentGlobalDataXmin > local -> last_xid ) {
561
+ RecentGlobalDataXmin = local -> last_xid ;
562
+ }
563
+ //if (RecentGlobalXmin > local->first_xid) {
564
+ // RecentGlobalXmin = local->first_xid;
565
+ //}
522
566
}
523
567
if (ts -> is_coordinator ) {
524
568
gcid = ts -> cid ;
525
569
}
526
570
ts -> status = XID_COMMITTED ;
527
- DTM_TRACE ((stderr , "Transaction %u is committed at %llu\n" , x -> xid , ts -> cid ));
571
+ DTM_TRACE ((WARNING , "Transaction %u is committed at %llu\n" , x -> xid , ts -> cid ));
528
572
}
529
573
SpinLockRelease (& local -> lock );
530
574
@@ -548,7 +592,7 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
548
592
x -> is_prepared = true;
549
593
x -> xid = id -> xid ;
550
594
551
- DTM_TRACE ((stderr , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
595
+ DTM_TRACE ((WARNING , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
552
596
}
553
597
SpinLockRelease (& local -> lock );
554
598
}
@@ -566,7 +610,7 @@ void DtmLocalAbort(DtmTransState* x)
566
610
gcid = ts -> cid ;
567
611
}
568
612
ts -> status = XID_ABORTED ;
569
- DTM_TRACE ((stderr , "Local transaction %u is aborted at %llu\n" , x -> xid , x -> cid ));
613
+ DTM_TRACE ((WARNING , "Local transaction %u is aborted at %llu\n" , x -> xid , x -> cid ));
570
614
}
571
615
SpinLockRelease (& local -> lock );
572
616
@@ -583,5 +627,6 @@ void DtmLocalEnd(DtmTransState* x)
583
627
x -> is_prepared = false;
584
628
x -> xid = InvalidTransactionId ;
585
629
x -> cid = INVALID_CID ;
630
+ x -> local_snapshot = NULL ;
586
631
}
587
632
0 commit comments