33
33
34
34
#include "libdtm.h"
35
35
36
- #define MIN_DELAY 10000
37
- #define MAX_DELAY 100000
36
+ typedef struct
37
+ {
38
+ LWLockId lock ; /* protect access to hash table */
39
+ } DtmState ;
40
+
41
+
42
+ #define DTM_SHMEM_SIZE (1024*1024)
43
+ #define DTM_HASH_SIZE 1003
38
44
39
45
void _PG_init (void );
40
46
void _PG_fini (void );
@@ -45,9 +51,19 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src);
45
51
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
46
52
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
47
53
static void DtmUpdateRecentXmin (void );
54
+ static void DtmInitialize ();
55
+ static void DtmXactCallback (XactEvent event , void * arg );
56
+
48
57
static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid );
49
58
static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid );
50
59
60
+ static void dtm_shmem_startup (void );
61
+
62
+ static shmem_startup_hook_type prev_shmem_startup_hook ;
63
+ static HTAB * xid_in_doubt ;
64
+ static DtmState * dtm ;
65
+ static TransactionId DtmCurrentXid = InvalidTransactionId ;
66
+
51
67
static NodeId DtmNodeId ;
52
68
static DTMConn DtmConn ;
53
69
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
@@ -105,10 +121,13 @@ static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid)
105
121
106
122
static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid )
107
123
{
124
+ bool inDoubt ;
125
+
108
126
if (!TransactionIdIsInDtmSnapshot (s , xid )) {
109
- XLogRecPtr lsn ;
110
- XidStatus status = CLOGTransactionIdGetStatus (xid , & lsn );
111
- if (status != TRANSACTION_STATUS_IN_PROGRESS ) {
127
+ LWLockAcquire (dtm -> lock , LW_SHARED );
128
+ inDoubt = hash_search (xid_in_doubt , & xid , HASH_FIND , NULL ) != NULL ;
129
+ LWLockRelease (dtm -> lock );
130
+ if (inDoubt ) {
112
131
XTM_INFO ("Wait for transaction %d to complete\n" , xid );
113
132
XactLockTableWait (xid , NULL , NULL , XLTW_None );
114
133
return true;
@@ -191,7 +210,7 @@ static void DtmUpdateRecentXmin(void)
191
210
static Snapshot DtmGetSnapshot (Snapshot snapshot )
192
211
{
193
212
XTM_TRACE ("XTM: DtmGetSnapshot \n" );
194
- if (DtmGlobalTransaction && !DtmHasSnapshot ) {
213
+ if (DtmGlobalTransaction /* && !DtmHasSnapshot*/ ) {
195
214
DtmHasSnapshot = true;
196
215
DtmEnsureConnection ();
197
216
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
@@ -224,7 +243,12 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
224
243
DtmGlobalTransaction = false;
225
244
DtmEnsureConnection ();
226
245
XTM_INFO ("Begin commit transaction %d\n" , xid );
227
- CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , TRANSACTION_STATUS_COMMITTED , lsn );
246
+
247
+ DtmCurrentXid = xid ;
248
+ LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
249
+ hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
250
+ LWLockRelease (dtm -> lock );
251
+
228
252
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true) && status != TRANSACTION_STATUS_ABORTED ) {
229
253
elog (ERROR , "DTMD failed to set transaction status" );
230
254
}
@@ -243,15 +267,80 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243
267
CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
244
268
}
245
269
270
+ static uint32 dtm_xid_hash_fn (const void * key , Size keysize )
271
+ {
272
+ return (uint32 )* (TransactionId * )key ;
273
+ }
274
+
275
+ static int dtm_xid_match_fn (const void * key1 , const void * key2 , Size keysize )
276
+ {
277
+ return * (TransactionId * )key1 - * (TransactionId * )key2 ;
278
+ }
279
+
280
+
281
+ static void DtmInitialize ()
282
+ {
283
+ bool found ;
284
+ static HASHCTL info ;
285
+
286
+ LWLockAcquire (AddinShmemInitLock , LW_EXCLUSIVE );
287
+ dtm = ShmemInitStruct ("dtm" , sizeof (DtmState ), & found );
288
+ if (!found )
289
+ {
290
+ dtm -> lock = LWLockAssign ();
291
+ }
292
+ LWLockRelease (AddinShmemInitLock );
293
+
294
+ info .keysize = sizeof (TransactionId );
295
+ info .entrysize = sizeof (TransactionId );
296
+ info .hash = dtm_xid_hash_fn ;
297
+ info .match = dtm_xid_match_fn ;
298
+ xid_in_doubt = ShmemInitHash ("xid_in_doubt" , DTM_HASH_SIZE , DTM_HASH_SIZE ,
299
+ & info ,
300
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE );
301
+
302
+ RegisterXactCallback (DtmXactCallback , NULL );
303
+
304
+ TM = & DtmTM ;
305
+ }
306
+
307
+ static void
308
+ DtmXactCallback (XactEvent event , void * arg )
309
+ {
310
+ if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId ) {
311
+ LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
312
+ hash_search (xid_in_doubt , & DtmCurrentXid , HASH_REMOVE , NULL );
313
+ LWLockRelease (dtm -> lock );
314
+ }
315
+ }
316
+
317
+
246
318
/*
247
319
* ***************************************************************************
248
320
*/
249
321
250
322
void
251
323
_PG_init (void )
252
324
{
253
- TM = & DtmTM ;
254
-
325
+ /*
326
+ * In order to create our shared memory area, we have to be loaded via
327
+ * shared_preload_libraries. If not, fall out without hooking into any of
328
+ * the main system. (We don't throw error here because it seems useful to
329
+ * allow the cs_* functions to be created even when the
330
+ * module isn't active. The functions must protect themselves against
331
+ * being called then, however.)
332
+ */
333
+ if (!process_shared_preload_libraries_in_progress )
334
+ return ;
335
+
336
+ /*
337
+ * Request additional shared resources. (These are no-ops if we're not in
338
+ * the postmaster process.) We'll allocate or attach to the shared
339
+ * resources in imcs_shmem_startup().
340
+ */
341
+ RequestAddinShmemSpace (DTM_SHMEM_SIZE );
342
+ RequestAddinLWLocks (1 );
343
+
255
344
DefineCustomIntVariable ("dtm.node_id" ,
256
345
"Identifier of node in distributed cluster for DTM" ,
257
346
NULL ,
@@ -264,6 +353,12 @@ _PG_init(void)
264
353
NULL ,
265
354
NULL ,
266
355
NULL );
356
+
357
+ /*
358
+ * Install hooks.
359
+ */
360
+ prev_shmem_startup_hook = shmem_startup_hook ;
361
+ shmem_startup_hook = dtm_shmem_startup ;
267
362
}
268
363
269
364
/*
@@ -272,6 +367,16 @@ _PG_init(void)
272
367
void
273
368
_PG_fini (void )
274
369
{
370
+ shmem_startup_hook = prev_shmem_startup_hook ;
371
+ }
372
+
373
+
374
+ static void dtm_shmem_startup (void )
375
+ {
376
+ if (prev_shmem_startup_hook ) {
377
+ prev_shmem_startup_hook ();
378
+ }
379
+ DtmInitialize ();
275
380
}
276
381
277
382
/*
0 commit comments