30
30
#include "utils/tqual.h"
31
31
#include "utils/array.h"
32
32
#include "utils/builtins.h"
33
+ #include "utils/memutils.h"
33
34
34
35
#include "libdtm.h"
35
36
@@ -41,41 +42,44 @@ typedef struct
41
42
42
43
#define DTM_SHMEM_SIZE (1024*1024)
43
44
#define DTM_HASH_SIZE 1003
45
+ #define XTM_CONNECT_ATTEMPTS 10
46
+
44
47
45
48
void _PG_init (void );
46
49
void _PG_fini (void );
47
50
48
51
static void DtmEnsureConnection (void );
49
- static Snapshot DtmGetSnapshot (Snapshot snapshot );
50
- static void DtmCopySnapshot (Snapshot dst , Snapshot src );
52
+ static Snapshot DtmGetSnapshot (void );
53
+ static void DtmMergeSnapshots (Snapshot dst , Snapshot src );
54
+ static Snapshot DtmCopySnapshot (Snapshot snapshot );
51
55
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
52
56
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
53
57
static void DtmUpdateRecentXmin (void );
54
58
static void DtmInitialize ();
55
59
static void DtmXactCallback (XactEvent event , void * arg );
56
60
57
- static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid );
58
- static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid );
61
+ static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
62
+ static bool TransactionIdIsInDoubt (TransactionId xid );
59
63
60
64
static void dtm_shmem_startup (void );
61
65
62
66
static shmem_startup_hook_type prev_shmem_startup_hook ;
63
67
static HTAB * xid_in_doubt ;
64
68
static DtmState * dtm ;
65
69
static TransactionId DtmCurrentXid = InvalidTransactionId ;
70
+ static Snapshot CurrentTransactionSnapshot ;
66
71
67
72
static NodeId DtmNodeId ;
68
73
static DTMConn DtmConn ;
69
74
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
70
- static bool DtmHasSnapshot = false ;
75
+ static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC } ;
71
76
static bool DtmGlobalTransaction = false;
72
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot };
77
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmCopySnapshot };
73
78
static DTMConn DtmConn ;
74
79
75
80
#define XTM_TRACE (fmt , ...)
76
- //#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
77
81
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
78
- #define XTM_CONNECT_ATTEMPTS 10
82
+ // #define XTM_INFO(fmt, ...)
79
83
80
84
static void DtmEnsureConnection (void )
81
85
{
@@ -113,20 +117,24 @@ static void DumpSnapshot(Snapshot s, char *name)
113
117
XTM_INFO ("%s\n" , buf );
114
118
}
115
119
116
- static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid )
120
+ static bool TransactionIdIsInDtmSnapshot (TransactionId xid )
117
121
{
118
- return xid >= s -> xmax
119
- || bsearch (& xid , s -> xip , s -> xcnt , sizeof (TransactionId ), xidComparator ) != NULL ;
122
+ return xid >= DtmSnapshot . xmax
123
+ || bsearch (& xid , DtmSnapshot . xip , DtmSnapshot . xcnt , sizeof (TransactionId ), xidComparator ) != NULL ;
120
124
}
121
125
122
- static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid )
126
+ static bool TransactionIdIsInDoubt (TransactionId xid )
123
127
{
124
128
bool inDoubt ;
125
129
126
- if (!TransactionIdIsInDtmSnapshot (s , xid )) {
130
+ if (!TransactionIdIsInDtmSnapshot (xid )) {
127
131
LWLockAcquire (dtm -> lock , LW_SHARED );
128
132
inDoubt = hash_search (xid_in_doubt , & xid , HASH_FIND , NULL ) != NULL ;
129
133
LWLockRelease (dtm -> lock );
134
+ if (!inDoubt ) {
135
+ XLogRecPtr lsn ;
136
+ inDoubt = CLOGTransactionIdGetStatus (xid , & lsn ) != TRANSACTION_STATUS_IN_PROGRESS ;
137
+ }
130
138
if (inDoubt ) {
131
139
XTM_INFO ("Wait for transaction %d to complete\n" , xid );
132
140
XactLockTableWait (xid , NULL , NULL , XLTW_None );
@@ -136,50 +144,47 @@ static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
136
144
return false;
137
145
}
138
146
139
- static void DtmCopySnapshot (Snapshot dst , Snapshot src )
147
+ static void DtmMergeSnapshots (Snapshot dst , Snapshot src )
140
148
{
141
149
int i , j , n ;
142
- static TransactionId * buf ;
143
150
TransactionId xid ;
144
-
145
- if (buf == NULL ) {
146
- buf = (TransactionId * )malloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ) * 2 );
147
- }
148
-
149
- DumpSnapshot (dst , "local" );
150
- DumpSnapshot (src , "DTM" );
151
+ Snapshot local ;
151
152
152
153
Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
153
154
154
- /* Check that globall competed transactions are not included in local snapshot */
155
- RefreshLocalSnapshot :
156
- GetLocalSnapshotData (dst );
157
- for (i = 0 ; i < dst -> xcnt ; i ++ ) {
158
- if (TransactionIdIsInDoubt (src , dst -> xip [i ])) {
159
- goto RefreshLocalSnapshot ;
160
- }
155
+ GetLocalSnapshot :
156
+ local = GetSnapshotData (& DtmLocalSnapshot );
157
+ for (i = 0 ; i < local -> xcnt ; i ++ ) {
158
+ if (TransactionIdIsInDoubt (local -> xip [i ])) {
159
+ goto GetLocalSnapshot ;
160
+ }
161
161
}
162
- for (xid = dst -> xmax ; xid < src -> xmax ; xid ++ ) {
163
- if (TransactionIdIsInDoubt (src , xid )) {
164
- goto RefreshLocalSnapshot ;
162
+ for (xid = local -> xmax ; xid < src -> xmax ; xid ++ ) {
163
+ if (TransactionIdIsInDoubt (xid )) {
164
+ goto GetLocalSnapshot ;
165
165
}
166
166
}
167
+ DumpSnapshot (local , "local" );
168
+ DumpSnapshot (src , "DTM" );
167
169
168
170
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
169
- if (dst -> xmin > src -> xmin ) {
170
- dst -> xmin = src -> xmin ;
171
- }
172
- if (dst -> xmax > src -> xmax ) {
173
- dst -> xmax = src -> xmax ;
171
+ dst -> xmin = local -> xmin < src -> xmin ? local -> xmin : src -> xmin ;
172
+ dst -> xmax = local -> xmax < src -> xmax ? local -> xmax : src -> xmax ;
173
+
174
+ n = local -> xcnt ;
175
+ for (xid = local -> xmax ; xid <= src -> xmin ; xid ++ ) {
176
+ local -> xip [n ++ ] = xid ;
174
177
}
178
+ memcpy (local -> xip + n , src -> xip , src -> xcnt * sizeof (TransactionId ));
179
+ n += src -> xcnt ;
180
+ Assert (n <= GetMaxSnapshotXidCount ());
175
181
176
- memcpy (buf , dst -> xip , dst -> xcnt * sizeof (TransactionId ));
177
- memcpy (buf + dst -> xcnt , src -> xip , src -> xcnt * sizeof (TransactionId ));
178
- qsort (buf , dst -> xcnt + src -> xcnt , sizeof (TransactionId ), xidComparator );
182
+ qsort (local -> xip , n , sizeof (TransactionId ), xidComparator );
179
183
xid = InvalidTransactionId ;
180
- for (i = 0 , j = 0 , n = dst -> xcnt + src -> xcnt ; i < n && buf [i ] < dst -> xmax ; i ++ ) {
181
- if (buf [i ] != xid ) {
182
- dst -> xip [j ++ ] = xid = buf [i ];
184
+
185
+ for (i = 0 , j = 0 ; i < n && local -> xip [i ] < dst -> xmax ; i ++ ) {
186
+ if (local -> xip [i ] != xid ) {
187
+ dst -> xip [j ++ ] = xid = local -> xip [i ];
183
188
}
184
189
}
185
190
dst -> xcnt = j ;
@@ -203,28 +208,50 @@ static void DtmUpdateRecentXmin(void)
203
208
if (RecentGlobalXmin > xmin ) {
204
209
RecentGlobalXmin = xmin ;
205
210
}
206
- RecentXmin = xmin ;
211
+ if (RecentXmin > xmin ) {
212
+ RecentXmin = xmin ;
213
+ }
207
214
}
208
215
}
209
216
210
- static Snapshot DtmGetSnapshot (Snapshot snapshot )
217
+ static Snapshot DtmCopySnapshot (Snapshot snapshot )
211
218
{
212
- if (!IsMVCCSnapshot (snapshot ) || snapshot == & CatalogSnapshotData ) {
213
- snapshot = GetLocalSnapshotData (snapshot );
214
- } else {
215
- XTM_TRACE ("XTM: DtmGetSnapshot \n" );
216
- if (DtmGlobalTransaction /* && !DtmHasSnapshot*/ ) {
217
- DtmHasSnapshot = true;
218
- DtmEnsureConnection ();
219
- DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
220
- }
221
- snapshot = GetLocalSnapshotData (snapshot );
222
- if (DtmHasSnapshot ) {
223
- DtmCopySnapshot (snapshot , & DtmSnapshot );
224
- DtmUpdateRecentXmin ();
225
- }
219
+ Snapshot newsnap ;
220
+ Size size = sizeof (SnapshotData ) + GetMaxSnapshotXidCount () * sizeof (TransactionId );
221
+ Size subxipoff = size ;
222
+ if (snapshot -> subxcnt > 0 ) {
223
+ size += snapshot -> subxcnt * sizeof (TransactionId );
226
224
}
227
- return snapshot ;
225
+ newsnap = (Snapshot ) MemoryContextAlloc (TopTransactionContext , size );
226
+ memcpy (newsnap , snapshot , sizeof (SnapshotData ));
227
+
228
+ newsnap -> regd_count = 0 ;
229
+ newsnap -> active_count = 0 ;
230
+ newsnap -> copied = true;
231
+
232
+ newsnap -> xip = (TransactionId * ) (newsnap + 1 );
233
+ if (snapshot -> xcnt > 0 )
234
+ {
235
+ memcpy (newsnap -> xip , snapshot -> xip , snapshot -> xcnt * sizeof (TransactionId ));
236
+ }
237
+ if (snapshot -> subxcnt > 0 &&
238
+ (!snapshot -> suboverflowed || snapshot -> takenDuringRecovery ))
239
+ {
240
+ newsnap -> subxip = (TransactionId * ) ((char * ) newsnap + subxipoff );
241
+ memcpy (newsnap -> subxip , snapshot -> subxip ,
242
+ snapshot -> subxcnt * sizeof (TransactionId ));
243
+ }
244
+ else
245
+ newsnap -> subxip = NULL ;
246
+
247
+ return newsnap ;
248
+ }
249
+
250
+
251
+ static Snapshot DtmGetSnapshot ()
252
+ {
253
+ CurrentTransactionSnapshot = GetLocalTransactionSnapshot ();
254
+ return CurrentTransactionSnapshot ;
228
255
}
229
256
230
257
@@ -243,22 +270,28 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243
270
if (DtmGlobalTransaction ) {
244
271
/* Already should be IN_PROGRESS */
245
272
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
246
- DtmHasSnapshot = false;
247
273
DtmGlobalTransaction = false;
248
- DtmEnsureConnection ();
249
- XTM_INFO ("Begin commit transaction %d\n" , xid );
250
-
251
- DtmCurrentXid = xid ;
252
- LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
253
- hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
254
- LWLockRelease (dtm -> lock );
255
-
256
- if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true) && status != TRANSACTION_STATUS_ABORTED ) {
257
- elog (ERROR , "DTMD failed to set transaction status" );
274
+ CurrentTransactionSnapshot = NULL ;
275
+ if (status == TRANSACTION_STATUS_ABORTED ) {
276
+ CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
277
+ DtmEnsureConnection ();
278
+ DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , false);
279
+ XTM_INFO ("Abort transaction %d\n" , xid );
280
+ return ;
281
+ } else {
282
+ DtmEnsureConnection ();
283
+ XTM_INFO ("Begin commit transaction %d\n" , xid );
284
+ DtmCurrentXid = xid ;
285
+ LWLockAcquire (dtm -> lock , LW_EXCLUSIVE );
286
+ hash_search (xid_in_doubt , & DtmCurrentXid , HASH_ENTER , NULL );
287
+ LWLockRelease (dtm -> lock );
288
+ if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true)) {
289
+ elog (ERROR , "DTMD failed to set transaction status" );
290
+ }
291
+ XTM_INFO ("Commit transaction %d\n" , xid );
258
292
}
259
- XTM_INFO ("Commit transaction %d\n" , xid );
260
293
} else {
261
- elog ( WARNING , "Set transaction %u status in local CLOG" , xid );
294
+ XTM_INFO ( "Set transaction %u status in local CLOG" , xid );
262
295
}
263
296
} else {
264
297
XidStatus gs ;
@@ -304,6 +337,7 @@ static void DtmInitialize()
304
337
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE );
305
338
306
339
RegisterXactCallback (DtmXactCallback , NULL );
340
+ DtmInitSnapshot (& DtmLocalSnapshot );
307
341
308
342
TM = & DtmTM ;
309
343
}
@@ -390,22 +424,41 @@ static void dtm_shmem_startup(void)
390
424
PG_MODULE_MAGIC ;
391
425
392
426
PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
427
+ PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
428
+ PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
393
429
394
430
Datum
395
431
dtm_begin_transaction (PG_FUNCTION_ARGS )
396
432
{
397
433
GlobalTransactionId gtid ;
398
434
ArrayType * nodes = PG_GETARG_ARRAYTYPE_P (0 );
399
- ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
435
+ ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
400
436
gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
401
437
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
402
438
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
403
439
DtmGlobalTransaction = true;
440
+ Assert (gtid .xids [DtmNodeId ] == GetCurrentTransactionId ());
404
441
XTM_INFO ("Start transaction {%d,%d} at node %d\n" , gtid .xids [0 ], gtid .xids [1 ], DtmNodeId );
405
- XTM_TRACE ("XTM: dtm_begin_transaction \n" );
406
442
if (DtmNodeId == gtid .nodes [0 ]) {
407
443
DtmEnsureConnection ();
408
444
DtmGlobalStartTransaction (DtmConn , & gtid );
409
445
}
446
+ DtmEnsureConnection ();
447
+ DtmGlobalGetSnapshot (DtmConn , DtmNodeId , gtid .xids [DtmNodeId ], & DtmSnapshot );
448
+ Assert (CurrentTransactionSnapshot != NULL );
449
+ DtmMergeSnapshots (CurrentTransactionSnapshot , & DtmSnapshot );
450
+ DtmUpdateRecentXmin ();
410
451
PG_RETURN_VOID ();
411
452
}
453
+
454
+ Datum
455
+ dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
456
+ {
457
+ PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmin );
458
+ }
459
+
460
+ Datum
461
+ dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
462
+ {
463
+ PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmax );
464
+ }
0 commit comments