1
- /*
1
+ sisva /*
2
2
* pg_dtm.c
3
3
*
4
4
* Pluggable distributed transaction manager
36
36
37
37
typedef struct
38
38
{
39
- LWLockId lock ; /* protect access to hash table */
39
+ LWLockId hashLock ;
40
+ LWLockId xidLock ;
41
+ TransactionId nextXid ;
42
+ size_t nReservedXids ;
40
43
} DtmState ;
41
44
42
45
@@ -48,15 +51,15 @@ typedef struct
48
51
void _PG_init (void );
49
52
void _PG_fini (void );
50
53
51
- static void DtmEnsureConnection (void );
52
54
static Snapshot DtmGetSnapshot (void );
53
55
static void DtmMergeSnapshots (Snapshot dst , Snapshot src );
54
56
static Snapshot DtmCopySnapshot (Snapshot snapshot );
55
57
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
56
58
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
57
59
static void DtmUpdateRecentXmin (void );
58
- static void DtmInitialize ();
60
+ static void DtmInitialize (void );
59
61
static void DtmXactCallback (XactEvent event , void * arg );
62
+ static void DtmGetNextXid (void );
60
63
61
64
static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
62
65
static bool TransactionIdIsInDoubt (TransactionId xid );
@@ -69,33 +72,18 @@ static DtmState* dtm;
69
72
static TransactionId DtmCurrentXid = InvalidTransactionId ;
70
73
static Snapshot CurrentTransactionSnapshot ;
71
74
72
- static NodeId DtmNodeId ;
73
- static DTMConn DtmConn ;
75
+ static TransactionId DtmNextXid ;
74
76
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
75
77
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
76
- static bool DtmGlobalTransaction = false;
77
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot };
78
- static DTMConn DtmConn ;
78
+ static bool DtmIsGlobalTransaction = false;
79
+ static int DtmLocalXidReserve ;
80
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot , DtmGetNextXid };
81
+
79
82
80
83
#define XTM_TRACE (fmt , ...)
81
84
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
82
85
//#define XTM_INFO(fmt, ...)
83
86
84
- static void DtmEnsureConnection (void )
85
- {
86
- int attempt = 0 ;
87
- XTM_TRACE ("XTM: DtmEnsureConnection\n" );
88
- while (attempt < XTM_CONNECT_ATTEMPTS ) {
89
- if (DtmConn ) {
90
- return ;
91
- }
92
- XTM_TRACE ("XTM: DtmEnsureConnection, attempt #%u\n" , attempt );
93
- DtmConn = DtmConnect ("127.0.0.1" , 5431 );
94
- attempt ++ ;
95
- }
96
- elog (ERROR , "Failed to connect to DTMD" );
97
- }
98
-
99
87
static void DumpSnapshot (Snapshot s , char * name )
100
88
{
101
89
int i ;
@@ -247,17 +235,43 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
247
235
return newsnap ;
248
236
}
249
237
238
+ static TransactionId DtmGetNextXid ()
239
+ {
240
+ TransactionId xid ;
241
+ if (TransactionIdIsValid (DtmNextXid )) {
242
+ xid = DtmNextXid ;
243
+ DtmNextXid = InvalidTransactionId ;
244
+ } else {
245
+ LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
246
+ if (dtm -> nReservedXids == 0 ) {
247
+ xid = DtmGlobalReserve (DtmLocalXidReserve );
248
+ dtm -> nReservedXids = DtmLocalXidReserve ;
249
+ ShmemVariableCache -> nextXid = xid ;
250
+ }
251
+ Assert (dtm -> nextXid == ShmemVariableCache -> nextXid );
252
+ xid = ShmemVariableCache -> nextXid ;
253
+ dtm -> nextXid += 1 ;
254
+ dtm -> nReservedXids -= 1 ;
255
+ LWLockRelease (dtm -> xidLock );
256
+ }
257
+ return xid ;
258
+ }
250
259
251
260
static Snapshot DtmGetSnapshot ()
252
261
{
253
- CurrentTransactionSnapshot = GetLocalTransactionSnapshot ();
254
- if (DtmGlobalTransaction && !IsolationUsesXactSnapshot ()){ /* RU & RC */
255
- DtmEnsureConnection ();
256
- DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
257
- DtmMergeSnapshots (CurrentTransactionSnapshot , & DtmSnapshot );
262
+ Snapshot snapshot = GetLocalTransactionSnapshot ();
263
+
264
+ if (DtmIsGlobalTransaction ) {
265
+ if (!IsolationUsesXactSnapshot ()){ /* RU & RC */
266
+ DtmEnsureConnection ();
267
+ DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
268
+ }
269
+ /* Should we actually perform that for RR & S levels? */
270
+ DtmMergeSnapshots (snapshot , & DtmSnapshot );
258
271
DtmUpdateRecentXmin ();
259
272
}
260
- return CurrentTransactionSnapshot ;
273
+ CurrentTransactionSnapshot = snapshot ;
274
+ return snapshot ;
261
275
}
262
276
263
277
@@ -280,18 +294,16 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
280
294
CurrentTransactionSnapshot = NULL ;
281
295
if (status == TRANSACTION_STATUS_ABORTED ) {
282
296
CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
283
- DtmEnsureConnection ();
284
- DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , false);
297
+ DtmGlobalSetTransStatus (xid , status , false);
285
298
XTM_INFO ("Abort transaction %d\n" , xid );
286
299
return ;
287
300
} else {
288
- DtmEnsureConnection ();
289
301
XTM_INFO ("Begin commit transaction %d\n" , xid );
290
302
DtmCurrentXid = xid ;
291
- LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
303
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
292
304
hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
293
- LWLockRelease (dtm -> lock );
294
- if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true)) {
305
+ LWLockRelease (dtm -> hashLock );
306
+ if (!DtmGlobalSetTransStatus (xid , status , true)) {
295
307
elog (ERROR , "DTMD failed to set transaction status" );
296
308
}
297
309
XTM_INFO ("Commit transaction %d\n" , xid );
@@ -301,8 +313,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
301
313
}
302
314
} else {
303
315
XidStatus gs ;
304
- DtmEnsureConnection ();
305
- gs = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , false);
316
+ gs = DtmGlobalGetTransStatus (xid , false);
306
317
if (gs != TRANSACTION_STATUS_UNKNOWN ) {
307
318
status = gs ;
308
319
}
@@ -330,7 +341,9 @@ static void DtmInitialize()
330
341
dtm = ShmemInitStruct ("dtm" , sizeof (DtmState ), & found );
331
342
if (!found )
332
343
{
333
- dtm -> lock = LWLockAssign ();
344
+ dtm -> hashLock = LWLockAssign ();
345
+ dtm -> xidLock = LWLockAssign ();
346
+ dtm -> nReservedXids = 0 ;
334
347
}
335
348
LWLockRelease (AddinShmemInitLock );
336
349
@@ -352,9 +365,9 @@ static void
352
365
DtmXactCallback (XactEvent event , void * arg )
353
366
{
354
367
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId ) {
355
- LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
368
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
356
369
hash_search (xid_in_doubt , & DtmCurrentXid , HASH_REMOVE , NULL );
357
- LWLockRelease (dtm -> lock );
370
+ LWLockRelease (dtm -> hashLock );
358
371
}
359
372
}
360
373
@@ -383,14 +396,14 @@ _PG_init(void)
383
396
* resources in imcs_shmem_startup().
384
397
*/
385
398
RequestAddinShmemSpace (DTM_SHMEM_SIZE );
386
- RequestAddinLWLocks (1 );
399
+ RequestAddinLWLocks (2 );
387
400
388
- DefineCustomIntVariable ("dtm.node_id " ,
389
- "Identifier of node in distributed cluster for DTM " ,
401
+ DefineCustomIntVariable ("dtm.local_xid_reserve " ,
402
+ "Number of XIDs reserved by node for local transactions " ,
390
403
NULL ,
391
- & DtmNodeId ,
392
- 0 ,
393
- 0 ,
404
+ & DtmLocalXidReserve ,
405
+ 100 ,
406
+ 1 ,
394
407
INT_MAX ,
395
408
PGC_BACKEND ,
396
409
0 ,
@@ -430,45 +443,66 @@ static void dtm_shmem_startup(void)
430
443
PG_MODULE_MAGIC ;
431
444
432
445
PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
446
+ PG_FUNCTION_INFO_V1 (dtm_get_snapshot );
447
+ PG_FUNCTION_INFO_V1 (dtm_new_snapshot );
433
448
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
434
449
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
435
450
436
451
Datum
437
- dtm_begin_transaction (PG_FUNCTION_ARGS )
452
+ dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
438
453
{
439
- GlobalTransactionId gtid ;
440
- ArrayType * nodes = PG_GETARG_ARRAYTYPE_P (0 );
441
- ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
442
- gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
443
- gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
444
- gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
445
- DtmGlobalTransaction = true;
446
- Assert (gtid .xids [DtmNodeId ] == GetCurrentTransactionId ());
447
- XTM_INFO ("Start transaction {%d,%d} at node %d, iso=%d\n" , gtid .xids [0 ], gtid .xids [1 ], DtmNodeId , XactIsoLevel );
448
- if (DtmNodeId == gtid .nodes [0 ]) {
449
- DtmEnsureConnection ();
450
- DtmGlobalStartTransaction (DtmConn , & gtid );
451
- }
452
-
453
- if (IsolationUsesXactSnapshot ()){ /* RR & S */
454
- DtmEnsureConnection ();
455
- DtmGlobalGetSnapshot (DtmConn , DtmNodeId , gtid .xids [DtmNodeId ], & DtmSnapshot );
456
- Assert (CurrentTransactionSnapshot != NULL );
457
- DtmMergeSnapshots (CurrentTransactionSnapshot , & DtmSnapshot );
458
- DtmUpdateRecentXmin ();
459
- }
460
-
461
- PG_RETURN_VOID ();
454
+ // if (IsolationUsesXactSnapshot()){ /* RR & S */
455
+ // DtmEnsureConnection();
456
+ // DtmGlobalGetSnapshot(DtmConn, DtmNodeId, gtid.xids[DtmNodeId], &DtmSnapshot);
457
+ // Assert(CurrentTransactionSnapshot != NULL);
458
+ // DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
459
+ // DtmUpdateRecentXmin();
460
+ // }
461
+ PG_RETURN_INT32 (CurrentTransactionSnapshot -> xmin );
462
462
}
463
463
464
464
Datum
465
- dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
465
+ dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
466
466
{
467
- PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmin );
467
+ PG_RETURN_INT32 (CurrentTransactionSnapshot -> xmax );
468
468
}
469
469
470
+
470
471
Datum
471
- dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
472
+ dtm_begin_transaction (PG_FUNCTION_ARGS )
473
+ {
474
+ int nPaticipants = PG_GETARG_INT32 (0 );
475
+ Assert (!TransactionIdIsValid (DtmNextXid ));
476
+
477
+ DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot );
478
+ Assert (TransactionIdIsValid (DtmNextXid ));
479
+
480
+ DtmIsGlobalTransaction = true;
481
+
482
+ PG_RETURN_INT32 (DtmNextXid );
483
+ }
484
+
485
+ Datum dtm_get_snapshot (PG_FUNCTION_ARGS )
472
486
{
473
- PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmax );
487
+ Assert (!TransactionIdIsValid (DtmNextXid ));
488
+ DtmNextXid = PG_GETARG_INT32 (0 );
489
+ Assert (TransactionIdIsValid (DtmNextXid ));
490
+
491
+ DtmNextXid = DtmGlobalGetSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
492
+
493
+ DtmIsGlobalTransaction = true;
494
+
495
+ PG_RETURN_VOID ();
474
496
}
497
+
498
+ Datum dtm_new_snapshot (PG_FUNCTION_ARGS )
499
+ {
500
+ Assert (!TransactionIdIsValid (DtmNextXid ));
501
+ DtmNextXid = PG_GETARG_INT32 (0 );
502
+ Assert (TransactionIdIsValid (DtmNextXid ));
503
+
504
+ DtmNextXid = DtmGlobalNewSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
505
+
506
+ PG_RETURN_VOID ();
507
+ }
508
+
0 commit comments