@@ -44,7 +44,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
44
44
static void DtmCopySnapshot (Snapshot dst , Snapshot src );
45
45
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
46
46
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
47
- static XidStatus DtmGetGlobalTransStatus (TransactionId xid );
48
47
static void DtmUpdateRecentXmin (void );
49
48
// static bool IsInDtmSnapshot(TransactionId xid);
50
49
static bool DtmTransactionIsInProgress (TransactionId xid );
@@ -57,7 +56,9 @@ static bool DtmGlobalTransaction = false;
57
56
static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmTransactionIsInProgress };
58
57
static DTMConn DtmConn ;
59
58
60
- #define XTM_TRACE (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
59
+ #define XTM_TRACE (fmt , ...)
60
+ //#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61
+ #define XTM_INFO (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61
62
#define XTM_CONNECT_ATTEMPTS 10
62
63
63
64
static void DtmEnsureConnection (void )
@@ -81,8 +82,8 @@ static void DumpSnapshot(Snapshot s, char *name)
81
82
char * cursor = buf ;
82
83
cursor += sprintf (
83
84
cursor ,
84
- "snapshot %s: xmin=%d, xmax=%d, active=[" ,
85
- name , s -> xmin , s -> xmax
85
+ "snapshot %s for transaction %d : xmin=%d, xmax=%d, active=[" ,
86
+ name , GetCurrentTransactionId (), s -> xmin , s -> xmax
86
87
);
87
88
for (i = 0 ; i < s -> xcnt ; i ++ ) {
88
89
if (i == 0 ) {
@@ -92,25 +93,69 @@ static void DumpSnapshot(Snapshot s, char *name)
92
93
}
93
94
}
94
95
cursor += sprintf (cursor , "]" );
95
- XTM_TRACE ("%s\n" , buf );
96
+ XTM_INFO ("%s\n" , buf );
96
97
}
97
98
99
+ static bool IsInSnapshot (Snapshot s , TransactionId xid )
100
+ {
101
+ int i ;
102
+ if (xid < s -> xmin ) {
103
+ return false;
104
+ }
105
+ if (xid >= s -> xmax ) {
106
+ return true;
107
+ }
108
+ for (i = 0 ; i < s -> xcnt ; i ++ ) {
109
+ if (s -> xip [i ] == xid ) {
110
+ return true;
111
+ }
112
+ }
113
+ return false;
114
+ }
115
+
116
+
98
117
static void DtmCopySnapshot (Snapshot dst , Snapshot src )
99
118
{
100
119
int i , j , n ;
101
120
static TransactionId * buf ;
102
- TransactionId prev = InvalidTransactionId ;
103
-
104
- XTM_TRACE ("XTM: DtmCopySnapshot for transaction%u\n" , GetCurrentTransactionId ());
105
- DumpSnapshot (dst , "local" );
106
- DumpSnapshot (src , "DTM" );
121
+ TransactionId xid ;
107
122
108
123
if (buf == NULL ) {
109
124
buf = (TransactionId * )malloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ) * 2 );
110
125
}
111
126
127
+ DumpSnapshot (dst , "local" );
128
+ DumpSnapshot (src , "DTM" );
129
+
130
+ Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
131
+
132
+ RefreshLocalSnapshot :
112
133
GetLocalSnapshotData (dst );
134
+ xid = src -> xmin < dst -> xmin ? src -> xmin : dst -> xmin ;
135
+ for (i = 0 ; i < src -> xcnt ; i ++ ) {
136
+ while (src -> xip [i ] > xid ) { /* XID is completed according to global snapshot... */
137
+ if (IsInSnapshot (dst , xid )) { /* ...but still marked as running in local snapshot */
138
+ pg_usleep (MIN_DELAY );
139
+ goto RefreshLocalSnapshot ;
140
+ } else {
141
+ xid += 1 ; /* XID is also marked completed in local snapshot */
142
+ }
143
+ }
144
+ /* XID is considered as running in global snapshot */
145
+ /* doesn't matter what local snapshot thinks about it */
146
+ xid = src -> xip [i ]+ 1 ;
147
+ }
148
+ while (xid < src -> xmax ) {
149
+ if (IsInSnapshot (dst , xid )) { /* ...but still marked as running in local snapshot */
150
+ pg_usleep (MIN_DELAY );
151
+ goto RefreshLocalSnapshot ;
152
+ } else {
153
+ xid += 1 ; /* XID is also marked completed in local snapshot */
154
+ }
155
+ }
156
+ /* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
113
157
158
+ /* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
114
159
if (dst -> xmin > src -> xmin ) {
115
160
dst -> xmin = src -> xmin ;
116
161
}
@@ -120,14 +165,15 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
120
165
121
166
memcpy (buf , dst -> xip , dst -> xcnt * sizeof (TransactionId ));
122
167
memcpy (buf + dst -> xcnt , src -> xip , src -> xcnt * sizeof (TransactionId ));
123
- qsort (buf , dst -> xcnt + src -> xcnt , sizeof (TransactionId ), xidComparator );
168
+ qsort (buf , dst -> xcnt + src -> xcnt , sizeof (TransactionId ), xidComparator );
169
+ xid = InvalidTransactionId ;
124
170
for (i = 0 , j = 0 , n = dst -> xcnt + src -> xcnt ; i < n && buf [i ] < dst -> xmax ; i ++ ) {
125
- if (buf [i ] != prev ) {
126
- dst -> xip [j ++ ] = prev = buf [i ];
171
+ if (buf [i ] != xid ) {
172
+ dst -> xip [j ++ ] = xid = buf [i ];
127
173
}
128
174
}
129
175
dst -> xcnt = j ;
130
- DumpSnapshot (dst , "Merged " );
176
+ DumpSnapshot (dst , "merged " );
131
177
}
132
178
133
179
static void DtmUpdateRecentXmin (void )
@@ -157,69 +203,22 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
157
203
snapshot = GetLocalSnapshotData (snapshot );
158
204
if (DtmHasSnapshot ) {
159
205
DtmCopySnapshot (snapshot , & DtmSnapshot );
160
- // DtmUpdateRecentXmin();
206
+ DtmUpdateRecentXmin ();
161
207
}
162
208
return snapshot ;
163
209
}
164
210
165
- static bool IsInDtmSnapshot (TransactionId xid )
166
- {
167
- return DtmHasSnapshot
168
- && (xid >= DtmSnapshot .xmax
169
- || bsearch (& xid , DtmSnapshot .xip , DtmSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
170
- }
171
211
172
212
static bool DtmTransactionIsInProgress (TransactionId xid )
173
213
{
174
- #if 0
175
- if (IsInDtmSnapshot (xid )) {
176
- unsigned delay = MIN_DELAY ;
177
- XLogRecPtr lsn ;
178
- while (CLOGTransactionIdGetStatus (xid , & lsn ) == TRANSACTION_STATUS_IN_PROGRESS ) {
179
- pg_usleep (delay );
180
- if (delay < MAX_DELAY ) {
181
- delay *= 2 ;
182
- }
183
- }
184
- return false;
185
- }
186
- #endif
187
214
XTM_TRACE ("XTM: DtmTransactionIsInProgress \n" );
188
- return TransactionIdIsRunning (xid );// || IsInDtmSnapshot(xid);
189
- }
190
-
191
- static XidStatus DtmGetGlobalTransStatus (TransactionId xid )
192
- {
193
- XTM_TRACE ("XTM: DtmGetGlobalTransStatus \n" );
194
- while (true) {
195
- XidStatus status ;
196
- DtmEnsureConnection ();
197
- status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , true);
198
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
199
- elog (ERROR , "DTMD reported status in progress" );
200
- } else {
201
- return status ;
202
- }
203
- }
215
+ return TransactionIdIsRunning (xid );
204
216
}
205
217
206
218
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
207
219
{
208
- #if 0
209
- if (IsInDtmSnapshot (xid )) {
210
- return TRANSACTION_STATUS_IN_PROGRESS ;
211
- }
212
- #endif
213
220
XidStatus status = CLOGTransactionIdGetStatus (xid , lsn );
214
221
XTM_TRACE ("XTM: DtmGetTransactionStatus \n" );
215
- #if 0
216
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
217
- status = DtmGetGlobalTransStatus (xid );
218
- if (status == TRANSACTION_STATUS_UNKNOWN ) {
219
- status = TRANSACTION_STATUS_IN_PROGRESS ;
220
- }
221
- }
222
- #endif
223
222
return status ;
224
223
}
225
224
@@ -231,20 +230,22 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
231
230
if (DtmGlobalTransaction ) {
232
231
/* Already should be IN_PROGRESS */
233
232
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
234
-
235
233
DtmHasSnapshot = false;
236
234
DtmGlobalTransaction = false;
237
235
DtmEnsureConnection ();
238
236
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
239
237
elog (ERROR , "DTMD failed to set transaction status" );
240
238
}
241
- status = DtmGetGlobalTransStatus (xid );
239
+ status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , true);
240
+ XTM_INFO ("Commit transaction %d\n" , xid );
242
241
Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
243
242
} else {
244
243
elog (WARNING , "Set transaction %u status in local CLOG" , xid );
245
244
}
246
245
} else {
247
- XidStatus gs = DtmGetGlobalTransStatus (xid );
246
+ XidStatus gs ;
247
+ DtmEnsureConnection ();
248
+ gs = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , false);
248
249
if (gs != TRANSACTION_STATUS_UNKNOWN ) {
249
250
status = gs ;
250
251
}
260
261
_PG_init (void )
261
262
{
262
263
TM = & DtmTM ;
263
- // TransactionIsInCurrentSnapshot = TransactionIsInDtmSnapshot;
264
264
265
265
DefineCustomIntVariable ("dtm.node_id" ,
266
266
"Identifier of node in distributed cluster for DTM" ,
@@ -303,6 +303,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
303
303
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
304
304
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
305
305
DtmGlobalTransaction = true;
306
+ XTM_INFO ("Start transaction {%d,%d} at node %d\n" , gtid .xids [0 ], gtid .xids [1 ], DtmNodeId );
306
307
XTM_TRACE ("XTM: dtm_begin_transaction \n" );
307
308
if (DtmNodeId == gtid .nodes [0 ]) {
308
309
DtmEnsureConnection ();
@@ -318,8 +319,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
318
319
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
319
320
320
321
XTM_TRACE ("XTM: dtm_get_snapshot \n" );
321
-
322
- /* Move it to DtmGlobalGetSnapshot? */
323
322
Assert (!DtmHasSnapshot );
324
323
DtmHasSnapshot = true;
325
324
DtmGlobalTransaction = true;
0 commit comments