@@ -69,13 +69,12 @@ static void dtm_shmem_startup(void);
69
69
static shmem_startup_hook_type prev_shmem_startup_hook ;
70
70
static HTAB * xid_in_doubt ;
71
71
static DtmState * dtm ;
72
- static TransactionId DtmCurrentXid = InvalidTransactionId ;
73
72
static Snapshot CurrentTransactionSnapshot ;
74
73
75
74
static TransactionId DtmNextXid ;
76
75
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
77
76
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
78
- static bool DtmIsGlobalTransaction = false ;
77
+ static bool DtmHasGlobalSnapshot ;
79
78
static int DtmLocalXidReserve ;
80
79
static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot , DtmGetNextXid };
81
80
@@ -185,7 +184,7 @@ static void DtmUpdateRecentXmin(void)
185
184
186
185
XTM_TRACE ("XTM: DtmUpdateRecentXmin \n" );
187
186
188
- if (xmin != InvalidTransactionId ) {
187
+ if (TransactionIdIsValid ( xmin ) ) {
189
188
xmin -= vacuum_defer_cleanup_age ;
190
189
if (!TransactionIdIsNormal (xmin )) {
191
190
xmin = FirstNormalTransactionId ;
@@ -238,9 +237,8 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
238
237
static TransactionId DtmGetNextXid ()
239
238
{
240
239
TransactionId xid ;
241
- if (TransactionIdIsValid (DtmNextXid )) {
240
+ if (TransactionIdIsValid (DtmNextXid )) {
242
241
xid = DtmNextXid ;
243
- DtmNextXid = InvalidTransactionId ;
244
242
} else {
245
243
LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
246
244
if (dtm -> nReservedXids == 0 ) {
@@ -260,9 +258,14 @@ static TransactionId DtmGetNextXid()
260
258
static Snapshot DtmGetSnapshot ()
261
259
{
262
260
Snapshot snapshot = GetLocalTransactionSnapshot ();
263
- if (DtmIsGlobalTransaction ) {
261
+ if (TransactionIdIsValid (DtmNextXid )) {
262
+ if (!DtmHasGlobalSnapshot ) {
263
+ Assert (!IsolationUsesXactSnapshot ());
264
+ DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot );
265
+ }
264
266
DtmMergeSnapshots (snapshot , & DtmSnapshot );
265
267
DtmUpdateRecentXmin ();
268
+ DtmHasGlobalSnapshot = false;
266
269
}
267
270
CurrentTransactionSnapshot = snapshot ;
268
271
return snapshot ;
@@ -293,9 +296,8 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
293
296
return ;
294
297
} else {
295
298
XTM_INFO ("Begin commit transaction %d\n" , xid );
296
- DtmCurrentXid = xid ;
297
299
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
298
- hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
300
+ hash_search (xid_in_doubt , & DtmNextXid , HASH_ENTER , NULL );
299
301
LWLockRelease (dtm -> hashLock );
300
302
if (!DtmGlobalSetTransStatus (xid , status , true)) {
301
303
elog (ERROR , "DTMD failed to set transaction status" );
@@ -358,10 +360,16 @@ static void DtmInitialize()
358
360
static void
359
361
DtmXactCallback (XactEvent event , void * arg )
360
362
{
361
- if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId ) {
362
- LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
363
- hash_search (xid_in_doubt , & DtmCurrentXid , HASH_REMOVE , NULL );
364
- LWLockRelease (dtm -> hashLock );
363
+ if (TransactionIdIsValid (DtmNextXid )) {
364
+ switch (event ) {
365
+ case XACT_EVENT_COMMIT :
366
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
367
+ hash_search (xid_in_doubt , & DtmNextXid , HASH_REMOVE , NULL );
368
+ LWLockRelease (dtm -> hashLock );
369
+ /* no break */
370
+ case XACT_EVENT_ABORT :
371
+ DtmNextXid = InvalidTransactionId ;
372
+ }
365
373
}
366
374
}
367
375
@@ -437,8 +445,7 @@ static void dtm_shmem_startup(void)
437
445
PG_MODULE_MAGIC ;
438
446
439
447
PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
440
- PG_FUNCTION_INFO_V1 (dtm_get_snapshot );
441
- PG_FUNCTION_INFO_V1 (dtm_new_snapshot );
448
+ PG_FUNCTION_INFO_V1 (dtm_join_transaction );
442
449
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
443
450
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
444
451
@@ -464,32 +471,22 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
464
471
DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot );
465
472
Assert (TransactionIdIsValid (DtmNextXid ));
466
473
467
- DtmIsGlobalTransaction = true;
474
+ DtmHasGlobalSnapshot = true;
468
475
469
476
PG_RETURN_INT32 (DtmNextXid );
470
477
}
471
478
472
- Datum dtm_get_snapshot (PG_FUNCTION_ARGS )
479
+ Datum dtm_join_transaction (PG_FUNCTION_ARGS )
473
480
{
474
481
Assert (!TransactionIdIsValid (DtmNextXid ));
475
482
DtmNextXid = PG_GETARG_INT32 (0 );
476
483
Assert (TransactionIdIsValid (DtmNextXid ));
477
484
478
- DtmNextXid = DtmGlobalGetSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
485
+ DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot );
479
486
480
- DtmIsGlobalTransaction = true;
487
+ DtmHasGlobalSnapshot = true;
481
488
482
489
PG_RETURN_VOID ();
483
490
}
484
491
485
- Datum dtm_new_snapshot (PG_FUNCTION_ARGS )
486
- {
487
- Assert (!TransactionIdIsValid (DtmNextXid ));
488
- DtmNextXid = PG_GETARG_INT32 (0 );
489
- Assert (TransactionIdIsValid (DtmNextXid ));
490
-
491
- DtmNextXid = DtmGlobalNewSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
492
492
493
- PG_RETURN_VOID ();
494
- }
495
-
0 commit comments