@@ -44,7 +44,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
44
44
static void DtmCopySnapshot (Snapshot dst , Snapshot src );
45
45
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
46
46
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
47
- static XidStatus DtmGetGloabalTransStatus (TransactionId xid );
48
47
static void DtmUpdateRecentXmin (void );
49
48
// static bool IsInDtmSnapshot(TransactionId xid);
50
49
static bool DtmTransactionIsInProgress (TransactionId xid );
@@ -59,6 +58,7 @@ static DTMConn DtmConn;
59
58
60
59
#define XTM_TRACE (fmt , ...)
61
60
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61
+ #define XTM_INFO (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
62
62
#define XTM_CONNECT_ATTEMPTS 10
63
63
64
64
static void DtmEnsureConnection (void )
@@ -93,40 +93,46 @@ static void DumpSnapshot(Snapshot s, char *name)
93
93
}
94
94
}
95
95
cursor += sprintf (cursor , "]" );
96
- printf ("%s\n" , buf );
96
+ XTM_INFO ("%s\n" , buf );
97
97
}
98
98
99
+ /* DTM snapshot is sorted, so we can use bsearch */
100
+ static bool IsInDtmSnapshot (Snapshot s , TransactionId xid )
101
+ {
102
+ return (xid >= s -> xmax
103
+ || bsearch (& xid , s -> xip , s -> xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
104
+ }
105
+
106
+
99
107
static void DtmCopySnapshot (Snapshot dst , Snapshot src )
100
108
{
101
- int i , j , n ;
102
- static TransactionId * buf ;
103
- TransactionId prev = InvalidTransactionId ;
109
+ int i ;
110
+ TransactionId xid ;
104
111
105
112
DumpSnapshot (dst , "local" );
106
113
DumpSnapshot (src , "DTM" );
107
114
108
- if (buf == NULL ) {
109
- buf = (TransactionId * )malloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ) * 2 );
110
- }
111
-
112
- GetLocalSnapshotData (dst );
113
-
114
- if (dst -> xmin > src -> xmin ) {
115
- dst -> xmin = src -> xmin ;
116
- }
117
- if (dst -> xmax > src -> xmax ) {
118
- dst -> xmax = src -> xmax ;
115
+ Wait :
116
+ while (true) {
117
+ GetLocalSnapshotData (dst );
118
+ for (i = 0 ; i < dst -> xcnt && IsInDtmSnapshot (src , dst -> xip [i ]); i ++ );
119
+ if (i == dst -> xcnt ) {
120
+ break ;
121
+ }
122
+ pg_usleep (MIN_DELAY );
119
123
}
120
-
121
- memcpy (buf , dst -> xip , dst -> xcnt * sizeof (TransactionId ));
122
- memcpy (buf + dst -> xcnt , src -> xip , src -> xcnt * sizeof (TransactionId ));
123
- qsort (buf , dst -> xcnt + src -> xcnt , sizeof (TransactionId ), xidComparator );
124
- for (i = 0 , j = 0 , n = dst -> xcnt + src -> xcnt ; i < n && buf [i ] < dst -> xmax ; i ++ ) {
125
- if (buf [i ] != prev ) {
126
- dst -> xip [j ++ ] = prev = buf [i ];
124
+ for (xid = dst -> xmax ; xid < src -> xmax ; xid ++ ) {
125
+ if (!IsInDtmSnapshot (src , xid )) {
126
+ pg_usleep (MIN_DELAY );
127
+ goto Wait ;
127
128
}
128
129
}
129
- dst -> xcnt = j ;
130
+
131
+
132
+ memcpy (dst -> xip , src -> xip , src -> xcnt * sizeof (TransactionId ));
133
+ dst -> xmin = src -> xmin ;
134
+ dst -> xmax = src -> xmax ;
135
+ dst -> xcnt = src -> xcnt ;
130
136
DumpSnapshot (dst , "merged" );
131
137
}
132
138
@@ -162,12 +168,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
162
168
return snapshot ;
163
169
}
164
170
165
- static bool IsInDtmSnapshot (TransactionId xid )
166
- {
167
- return DtmHasSnapshot
168
- && (xid >= DtmSnapshot .xmax
169
- || bsearch (& xid , DtmSnapshot .xip , DtmSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
170
- }
171
171
172
172
static bool DtmTransactionIsInProgress (TransactionId xid )
173
173
{
@@ -188,20 +188,6 @@ static bool DtmTransactionIsInProgress(TransactionId xid)
188
188
return TransactionIdIsRunning (xid );// || IsInDtmSnapshot(xid);
189
189
}
190
190
191
- static XidStatus DtmGetGloabalTransStatus (TransactionId xid )
192
- {
193
- XTM_TRACE ("XTM: DtmGetGloabalTransStatus \n" );
194
- while (true) {
195
- XidStatus status ;
196
- DtmEnsureConnection ();
197
- status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , true);
198
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
199
- elog (ERROR , "DTMD reported status in progress" );
200
- } else {
201
- return status ;
202
- }
203
- }
204
- }
205
191
206
192
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
207
193
{
@@ -231,20 +217,23 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
231
217
if (DtmGlobalTransaction ) {
232
218
/* Already should be IN_PROGRESS */
233
219
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
234
-
235
220
DtmHasSnapshot = false;
236
221
DtmGlobalTransaction = false;
237
222
DtmEnsureConnection ();
238
223
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
239
224
elog (ERROR , "DTMD failed to set transaction status" );
240
225
}
241
- status = DtmGetGloabalTransStatus (xid );
226
+ DtmEnsureConnection ();
227
+ status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , true);
228
+ XTM_INFO ("Commit transaction %d\n" , xid );
242
229
Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
243
230
} else {
244
231
elog (WARNING , "Set transaction %u status in local CLOG" , xid );
245
232
}
246
233
} else {
247
- XidStatus gs = DtmGetGloabalTransStatus (xid );
234
+ XidStatus gs ;
235
+ DtmEnsureConnection ();
236
+ gs = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , false);
248
237
if (gs != TRANSACTION_STATUS_UNKNOWN ) {
249
238
status = gs ;
250
239
}
@@ -303,6 +292,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
303
292
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
304
293
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
305
294
DtmGlobalTransaction = true;
295
+ XTM_INFO ("Start transaction {%d,%d} at node %d\n" , gtid .xids [0 ], gtid .xids [1 ], DtmNodeId );
306
296
XTM_TRACE ("XTM: dtm_begin_transaction \n" );
307
297
if (DtmNodeId == gtid .nodes [0 ]) {
308
298
DtmEnsureConnection ();
@@ -318,7 +308,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
318
308
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
319
309
320
310
XTM_TRACE ("XTM: dtm_get_snapshot \n" );
321
-
322
311
/* Move it to DtmGlobalGetSnapshot? */
323
312
Assert (!DtmHasSnapshot );
324
313
DtmHasSnapshot = true;
0 commit comments