1
- /*
1
+ sisva /*
2
2
* pg_dtm.c
3
3
*
4
4
* Pluggable distributed transaction manager
36
36
37
37
typedef struct
38
38
{
39
- LWLockId lock ; /* protect access to hash table */
39
+ LWLockId hashLock ;
40
+ LWLockId xidLock ;
41
+ TransactionId nextXid ;
42
+ size_t nReservedXids ;
40
43
} DtmState ;
41
44
42
45
@@ -48,15 +51,15 @@ typedef struct
48
51
void _PG_init (void );
49
52
void _PG_fini (void );
50
53
51
- static void DtmEnsureConnection (void );
52
54
static Snapshot DtmGetSnapshot (void );
53
55
static void DtmMergeSnapshots (Snapshot dst , Snapshot src );
54
56
static Snapshot DtmCopySnapshot (Snapshot snapshot );
55
57
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
56
58
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
57
59
static void DtmUpdateRecentXmin (void );
58
- static void DtmInitialize ();
60
+ static void DtmInitialize (void );
59
61
static void DtmXactCallback (XactEvent event , void * arg );
62
+ static void DtmGetNextXid (void );
60
63
61
64
static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
62
65
static bool TransactionIdIsInDoubt (TransactionId xid );
@@ -69,33 +72,18 @@ static DtmState* dtm;
69
72
static TransactionId DtmCurrentXid = InvalidTransactionId ;
70
73
static Snapshot CurrentTransactionSnapshot ;
71
74
72
- static NodeId DtmNodeId ;
73
- static DTMConn DtmConn ;
75
+ static TransactionId DtmNextXid ;
74
76
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
75
77
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
76
- static bool DtmGlobalTransaction = false;
77
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot };
78
- static DTMConn DtmConn ;
78
+ static bool DtmIsGlobalTransaction = false;
79
+ static int DtmLocalXidReserve ;
80
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot , DtmGetNextXid };
81
+
79
82
80
83
#define XTM_TRACE (fmt , ...)
81
84
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
82
85
//#define XTM_INFO(fmt, ...)
83
86
84
- static void DtmEnsureConnection (void )
85
- {
86
- int attempt = 0 ;
87
- XTM_TRACE ("XTM: DtmEnsureConnection\n" );
88
- while (attempt < XTM_CONNECT_ATTEMPTS ) {
89
- if (DtmConn ) {
90
- return ;
91
- }
92
- XTM_TRACE ("XTM: DtmEnsureConnection, attempt #%u\n" , attempt );
93
- DtmConn = DtmConnect ("127.0.0.1" , 5431 );
94
- attempt ++ ;
95
- }
96
- elog (ERROR , "Failed to connect to DTMD" );
97
- }
98
-
99
87
static void DumpSnapshot (Snapshot s , char * name )
100
88
{
101
89
int i ;
@@ -247,11 +235,37 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
247
235
return newsnap ;
248
236
}
249
237
238
+ static TransactionId DtmGetNextXid ()
239
+ {
240
+ TransactionId xid ;
241
+ if (TransactionIdIsValid (DtmNextXid )) {
242
+ xid = DtmNextXid ;
243
+ DtmNextXid = InvalidTransactionId ;
244
+ } else {
245
+ LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
246
+ if (dtm -> nReservedXids == 0 ) {
247
+ xid = DtmGlobalReserve (DtmLocalXidReserve );
248
+ dtm -> nReservedXids = DtmLocalXidReserve ;
249
+ ShmemVariableCache -> nextXid = xid ;
250
+ }
251
+ Assert (dtm -> nextXid == ShmemVariableCache -> nextXid );
252
+ xid = ShmemVariableCache -> nextXid ;
253
+ dtm -> nextXid += 1 ;
254
+ dtm -> nReservedXids -= 1 ;
255
+ LWLockRelease (dtm -> xidLock );
256
+ }
257
+ return xid ;
258
+ }
250
259
251
260
static Snapshot DtmGetSnapshot ()
252
261
{
253
- CurrentTransactionSnapshot = GetLocalTransactionSnapshot ();
254
- return CurrentTransactionSnapshot ;
262
+ Snapshot snapshot = GetLocalTransactionSnapshot ();
263
+ if (DtmIsGlobalTransaction ) {
264
+ DtmMergeSnapshots (snapshot , & DtmSnapshot );
265
+ DtmUpdateRecentXmin ();
266
+ }
267
+ CurrentTransactionSnapshot = snapshot ;
268
+ return snapshot ;
255
269
}
256
270
257
271
@@ -274,18 +288,16 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
274
288
CurrentTransactionSnapshot = NULL ;
275
289
if (status == TRANSACTION_STATUS_ABORTED ) {
276
290
CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
277
- DtmEnsureConnection ();
278
- DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , false);
291
+ DtmGlobalSetTransStatus (xid , status , false);
279
292
XTM_INFO ("Abort transaction %d\n" , xid );
280
293
return ;
281
294
} else {
282
- DtmEnsureConnection ();
283
295
XTM_INFO ("Begin commit transaction %d\n" , xid );
284
296
DtmCurrentXid = xid ;
285
- LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
297
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
286
298
hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
287
- LWLockRelease (dtm -> lock );
288
- if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true)) {
299
+ LWLockRelease (dtm -> hashLock );
300
+ if (!DtmGlobalSetTransStatus (xid , status , true)) {
289
301
elog (ERROR , "DTMD failed to set transaction status" );
290
302
}
291
303
XTM_INFO ("Commit transaction %d\n" , xid );
@@ -295,8 +307,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
295
307
}
296
308
} else {
297
309
XidStatus gs ;
298
- DtmEnsureConnection ();
299
- gs = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , false);
310
+ gs = DtmGlobalGetTransStatus (xid , false);
300
311
if (gs != TRANSACTION_STATUS_UNKNOWN ) {
301
312
status = gs ;
302
313
}
@@ -324,7 +335,9 @@ static void DtmInitialize()
324
335
dtm = ShmemInitStruct ("dtm" , sizeof (DtmState ), & found );
325
336
if (!found )
326
337
{
327
- dtm -> lock = LWLockAssign ();
338
+ dtm -> hashLock = LWLockAssign ();
339
+ dtm -> xidLock = LWLockAssign ();
340
+ dtm -> nReservedXids = 0 ;
328
341
}
329
342
LWLockRelease (AddinShmemInitLock );
330
343
@@ -346,9 +359,9 @@ static void
346
359
DtmXactCallback (XactEvent event , void * arg )
347
360
{
348
361
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId ) {
349
- LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
362
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
350
363
hash_search (xid_in_doubt , & DtmCurrentXid , HASH_REMOVE , NULL );
351
- LWLockRelease (dtm -> lock );
364
+ LWLockRelease (dtm -> hashLock );
352
365
}
353
366
}
354
367
@@ -377,14 +390,14 @@ _PG_init(void)
377
390
* resources in imcs_shmem_startup().
378
391
*/
379
392
RequestAddinShmemSpace (DTM_SHMEM_SIZE );
380
- RequestAddinLWLocks (1 );
393
+ RequestAddinLWLocks (2 );
381
394
382
- DefineCustomIntVariable ("dtm.node_id " ,
383
- "Identifier of node in distributed cluster for DTM " ,
395
+ DefineCustomIntVariable ("dtm.local_xid_reserve " ,
396
+ "Number of XIDs reserved by node for local transactions " ,
384
397
NULL ,
385
- & DtmNodeId ,
386
- 0 ,
387
- 0 ,
398
+ & DtmLocalXidReserve ,
399
+ 100 ,
400
+ 1 ,
388
401
INT_MAX ,
389
402
PGC_BACKEND ,
390
403
0 ,
@@ -424,41 +437,59 @@ static void dtm_shmem_startup(void)
424
437
PG_MODULE_MAGIC ;
425
438
426
439
PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
440
+ PG_FUNCTION_INFO_V1 (dtm_get_snapshot );
441
+ PG_FUNCTION_INFO_V1 (dtm_new_snapshot );
427
442
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
428
443
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
429
444
430
445
Datum
431
- dtm_begin_transaction (PG_FUNCTION_ARGS )
446
+ dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
432
447
{
433
- GlobalTransactionId gtid ;
434
- ArrayType * nodes = PG_GETARG_ARRAYTYPE_P (0 );
435
- ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
436
- gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
437
- gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
438
- gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
439
- DtmGlobalTransaction = true;
440
- Assert (gtid .xids [DtmNodeId ] == GetCurrentTransactionId ());
441
- XTM_INFO ("Start transaction {%d,%d} at node %d\n" , gtid .xids [0 ], gtid .xids [1 ], DtmNodeId );
442
- if (DtmNodeId == gtid .nodes [0 ]) {
443
- DtmEnsureConnection ();
444
- DtmGlobalStartTransaction (DtmConn , & gtid );
445
- }
446
- DtmEnsureConnection ();
447
- DtmGlobalGetSnapshot (DtmConn , DtmNodeId , gtid .xids [DtmNodeId ], & DtmSnapshot );
448
- Assert (CurrentTransactionSnapshot != NULL );
449
- DtmMergeSnapshots (CurrentTransactionSnapshot , & DtmSnapshot );
450
- DtmUpdateRecentXmin ();
451
- PG_RETURN_VOID ();
448
+ PG_RETURN_INT32 (CurrentTransactionSnapshot -> xmin );
452
449
}
453
450
454
451
Datum
455
- dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
452
+ dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
456
453
{
457
- PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmin );
454
+ PG_RETURN_INT32 (CurrentTransactionSnapshot -> xmax );
458
455
}
459
456
457
+
460
458
Datum
461
- dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
459
+ dtm_begin_transaction (PG_FUNCTION_ARGS )
460
+ {
461
+ int nPaticipants = PG_GETARG_INT32 (0 );
462
+ Assert (!TransactionIdIsValid (DtmNextXid ));
463
+
464
+ DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot );
465
+ Assert (TransactionIdIsValid (DtmNextXid ));
466
+
467
+ DtmIsGlobalTransaction = true;
468
+
469
+ PG_RETURN_INT32 (DtmNextXid );
470
+ }
471
+
472
+ Datum dtm_get_snapshot (PG_FUNCTION_ARGS )
473
+ {
474
+ Assert (!TransactionIdIsValid (DtmNextXid ));
475
+ DtmNextXid = PG_GETARG_INT32 (0 );
476
+ Assert (TransactionIdIsValid (DtmNextXid ));
477
+
478
+ DtmNextXid = DtmGlobalGetSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
479
+
480
+ DtmIsGlobalTransaction = true;
481
+
482
+ PG_RETURN_VOID ();
483
+ }
484
+
485
+ Datum dtm_new_snapshot (PG_FUNCTION_ARGS )
462
486
{
463
- PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmax );
487
+ Assert (!TransactionIdIsValid (DtmNextXid ));
488
+ DtmNextXid = PG_GETARG_INT32 (0 );
489
+ Assert (TransactionIdIsValid (DtmNextXid ));
490
+
491
+ DtmNextXid = DtmGlobalNewSnapshot (DtmConn , DtmNextXid , & DtmSnapshot );
492
+
493
+ PG_RETURN_VOID ();
464
494
}
495
+
0 commit comments