@@ -53,6 +53,7 @@ static NodeId DtmNodeId;
53
53
static DTMConn DtmConn ;
54
54
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
55
55
static bool DtmHasSnapshot = false;
56
+ static bool DtmGlobalTransaction = false;
56
57
static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmTransactionIsInProgress };
57
58
static DTMConn DtmConn ;
58
59
@@ -100,10 +101,12 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
100
101
static TransactionId * buf ;
101
102
TransactionId prev = InvalidTransactionId ;
102
103
103
- XTM_TRACE ("XTM: DtmCopySnapshot \n" );
104
+ XTM_TRACE ("XTM: DtmCopySnapshot for transaction%u\n" , GetCurrentTransactionId ());
105
+ DumpSnapshot (dst , "local" );
106
+ DumpSnapshot (src , "DTM" );
104
107
105
108
if (buf == NULL ) {
106
- buf = (TransactionId * )malloc (GetMaxSnapshotSubxidCount () * sizeof (TransactionId ) * 2 );
109
+ buf = (TransactionId * )malloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ) * 2 );
107
110
}
108
111
109
112
GetLocalSnapshotData (dst );
@@ -124,6 +127,7 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
124
127
}
125
128
}
126
129
dst -> xcnt = j ;
130
+ DumpSnapshot (dst , "Merged" );
127
131
}
128
132
129
133
static void DtmUpdateRecentXmin (void )
@@ -143,28 +147,26 @@ static void DtmUpdateRecentXmin(void)
143
147
if (RecentGlobalXmin > xmin ) {
144
148
RecentGlobalXmin = xmin ;
145
149
}
146
- RecentXmin = xmin ;
150
+ // RecentXmin = xmin;
147
151
}
148
152
}
149
153
150
154
static Snapshot DtmGetSnapshot (Snapshot snapshot )
151
155
{
152
156
XTM_TRACE ("XTM: DtmGetSnapshot \n" );
153
-
154
- if (DtmHasSnapshot ) {
157
+ snapshot = GetLocalSnapshotData ( snapshot );
158
+ if (DtmHasSnapshot ) {
155
159
DtmCopySnapshot (snapshot , & DtmSnapshot );
156
- DtmUpdateRecentXmin ();
157
- } else {
158
- snapshot = GetLocalSnapshotData (snapshot );
160
+ //DtmUpdateRecentXmin();
159
161
}
160
162
return snapshot ;
161
163
}
162
164
163
165
static bool IsInDtmSnapshot (TransactionId xid )
164
166
{
165
167
return DtmHasSnapshot
166
- && (/* xid > DtmSnapshot.xmax
167
- || */ bsearch (& xid , DtmSnapshot .xip , DtmSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
168
+ && (xid >= DtmSnapshot .xmax
169
+ || bsearch (& xid , DtmSnapshot .xip , DtmSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
168
170
}
169
171
170
172
static bool DtmTransactionIsInProgress (TransactionId xid )
@@ -183,7 +185,7 @@ static bool DtmTransactionIsInProgress(TransactionId xid)
183
185
}
184
186
#endif
185
187
XTM_TRACE ("XTM: DtmTransactionIsInProgress \n" );
186
- return TransactionIdIsRunning (xid ) && ! IsInDtmSnapshot (xid );
188
+ return TransactionIdIsRunning (xid ); // || IsInDtmSnapshot(xid);
187
189
}
188
190
189
191
static XidStatus DtmGetGloabalTransStatus (TransactionId xid )
@@ -207,8 +209,13 @@ static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
207
209
208
210
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
209
211
{
210
- XTM_TRACE ("XTM: DtmGetTransactionStatus \n" );
212
+ #if 0
213
+ if (IsInDtmSnapshot (xid )) {
214
+ return TRANSACTION_STATUS_IN_PROGRESS ;
215
+ }
216
+ #endif
211
217
XidStatus status = CLOGTransactionIdGetStatus (xid , lsn );
218
+ XTM_TRACE ("XTM: DtmGetTransactionStatus \n" );
212
219
#if 0
213
220
if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
214
221
status = DtmGetGloabalTransStatus (xid );
@@ -217,23 +224,23 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
217
224
}
218
225
}
219
226
#endif
220
- return CLOGTransactionIdGetStatus ( xid , lsn ) ;
227
+ return status ;
221
228
}
222
229
223
230
224
231
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
225
232
{
226
233
XTM_TRACE ("XTM: DtmSetTransactionStatus %u = %u \n" , xid , status );
227
234
if (!RecoveryInProgress ()) {
228
- if (DtmHasSnapshot ) {
235
+ if (DtmGlobalTransaction ) {
229
236
/* Already should be IN_PROGRESS */
230
237
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
231
238
232
239
DtmHasSnapshot = false;
240
+ DtmGlobalTransaction = false;
233
241
DtmEnsureConnection ();
234
242
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
235
243
elog (ERROR , "DTMD failed to set transaction status" );
236
- // elog(WARNING, "DTMD failed to set transaction status");
237
244
}
238
245
status = DtmGetGloabalTransStatus (xid );
239
246
Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
@@ -299,11 +306,12 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
299
306
gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
300
307
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
301
308
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
302
-
309
+ DtmGlobalTransaction = true;
303
310
XTM_TRACE ("XTM: dtm_begin_transaction \n" );
304
-
305
- DtmEnsureConnection ();
306
- DtmGlobalStartTransaction (DtmConn , & gtid );
311
+ if (DtmNodeId == gtid .nodes [0 ]) {
312
+ DtmEnsureConnection ();
313
+ DtmGlobalStartTransaction (DtmConn , & gtid );
314
+ }
307
315
PG_RETURN_VOID ();
308
316
}
309
317
@@ -316,7 +324,9 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
316
324
XTM_TRACE ("XTM: dtm_get_snapshot \n" );
317
325
318
326
/* Move it to DtmGlobalGetSnapshot? */
327
+ Assert (!DtmHasSnapshot );
319
328
DtmHasSnapshot = true;
329
+ DtmGlobalTransaction = true;
320
330
PG_RETURN_VOID ();
321
331
}
322
332
0 commit comments