@@ -96,43 +96,83 @@ static void DumpSnapshot(Snapshot s, char *name)
96
96
XTM_INFO ("%s\n" , buf );
97
97
}
98
98
99
- /* DTM snapshot is sorted, so we can use bsearch */
100
- static bool IsInDtmSnapshot (Snapshot s , TransactionId xid )
99
+ static bool IsInSnapshot (Snapshot s , TransactionId xid )
101
100
{
102
- return (xid >= s -> xmax
103
- || bsearch (& xid , s -> xip , s -> xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
101
+ int i ;
102
+ if (xid < s -> xmin ) {
103
+ return false;
104
+ }
105
+ if (xid >= s -> xmax ) {
106
+ return true;
107
+ }
108
+ for (i = 0 ; i < s -> xcnt ; i ++ ) {
109
+ if (s -> xip [i ] == xid ) {
110
+ return true;
111
+ }
112
+ }
113
+ return false;
104
114
}
105
115
106
116
107
117
static void DtmCopySnapshot (Snapshot dst , Snapshot src )
108
118
{
109
- int i ;
119
+ int i , j , n ;
120
+ static TransactionId * buf ;
110
121
TransactionId xid ;
111
122
123
+ if (buf == NULL ) {
124
+ buf = (TransactionId * )malloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ) * 2 );
125
+ }
126
+
112
127
DumpSnapshot (dst , "local" );
113
128
DumpSnapshot (src , "DTM" );
114
129
115
- Wait :
116
- while (true) {
117
- GetLocalSnapshotData (dst );
118
- for (i = 0 ; i < dst -> xcnt && IsInDtmSnapshot (src , dst -> xip [i ]); i ++ );
119
- if (i == dst -> xcnt ) {
120
- break ;
121
- }
122
- pg_usleep (MIN_DELAY );
130
+ Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
131
+
132
+ RefreshLocalSnapshot :
133
+ GetLocalSnapshotData (dst );
134
+ xid = src -> xmin < dst -> xmin ? src -> xmin : dst -> xmin ;
135
+ for (i = 0 ; i < src -> xcnt ; i ++ ) {
136
+ while (src -> xip [i ] > xid ) { /* XID is completed according to global snapshot... */
137
+ if (IsInSnapshot (dst , xid )) { /* ...but still marked as running in local snapshot */
138
+ pg_usleep (MIN_DELAY );
139
+ goto RefreshLocalSnapshot ;
140
+ } else {
141
+ xid += 1 ; /* XID is also marked completed in local snapshot */
142
+ }
143
+ }
144
+ /* XID is considered as running in global snapshot */
145
+ /* doesn't matter what local snapshot thinks about it */
146
+ xid = src -> xip [i ]+ 1 ;
123
147
}
124
- for (xid = dst -> xmax ; xid < src -> xmax ; xid ++ ) {
125
- if (! IsInDtmSnapshot ( src , xid )) {
148
+ while (xid < src -> xmax ) {
149
+ if (IsInSnapshot ( dst , xid )) { /* ...but still marked as running in local snapshot */
126
150
pg_usleep (MIN_DELAY );
127
- goto Wait ;
151
+ goto RefreshLocalSnapshot ;
152
+ } else {
153
+ xid += 1 ; /* XID is also marked completed in local snapshot */
128
154
}
129
- }
155
+ }
156
+ /* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
130
157
158
+ /* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
159
+ if (dst -> xmin > src -> xmin ) {
160
+ dst -> xmin = src -> xmin ;
161
+ }
162
+ if (dst -> xmax > src -> xmax ) {
163
+ dst -> xmax = src -> xmax ;
164
+ }
131
165
132
- memcpy (dst -> xip , src -> xip , src -> xcnt * sizeof (TransactionId ));
133
- dst -> xmin = src -> xmin ;
134
- dst -> xmax = src -> xmax ;
135
- dst -> xcnt = src -> xcnt ;
166
+ memcpy (buf , dst -> xip , dst -> xcnt * sizeof (TransactionId ));
167
+ memcpy (buf + dst -> xcnt , src -> xip , src -> xcnt * sizeof (TransactionId ));
168
+ qsort (buf , dst -> xcnt + src -> xcnt , sizeof (TransactionId ), xidComparator );
169
+ xid = InvalidTransactionId ;
170
+ for (i = 0 , j = 0 , n = dst -> xcnt + src -> xcnt ; i < n && buf [i ] < dst -> xmax ; i ++ ) {
171
+ if (buf [i ] != xid ) {
172
+ dst -> xip [j ++ ] = xid = buf [i ];
173
+ }
174
+ }
175
+ dst -> xcnt = j ;
136
176
DumpSnapshot (dst , "merged" );
137
177
}
138
178
@@ -171,41 +211,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
171
211
172
212
static bool DtmTransactionIsInProgress (TransactionId xid )
173
213
{
174
- #if 0
175
- if (IsInDtmSnapshot (xid )) {
176
- unsigned delay = MIN_DELAY ;
177
- XLogRecPtr lsn ;
178
- while (CLOGTransactionIdGetStatus (xid , & lsn ) == TRANSACTION_STATUS_IN_PROGRESS ) {
179
- pg_usleep (delay );
180
- if (delay < MAX_DELAY ) {
181
- delay *= 2 ;
182
- }
183
- }
184
- return false;
185
- }
186
- #endif
187
214
XTM_TRACE ("XTM: DtmTransactionIsInProgress \n" );
188
- return TransactionIdIsRunning (xid );// || IsInDtmSnapshot(xid);
215
+ return TransactionIdIsRunning (xid );
189
216
}
190
217
191
218
192
219
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
193
220
{
194
- #if 0
195
- if (IsInDtmSnapshot (xid )) {
196
- return TRANSACTION_STATUS_IN_PROGRESS ;
197
- }
198
- #endif
199
221
XidStatus status = CLOGTransactionIdGetStatus (xid , lsn );
200
222
XTM_TRACE ("XTM: DtmGetTransactionStatus \n" );
201
- #if 0
202
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
203
- status = DtmGetGloabalTransStatus (xid );
204
- if (status == TRANSACTION_STATUS_UNKNOWN ) {
205
- status = TRANSACTION_STATUS_IN_PROGRESS ;
206
- }
207
- }
208
- #endif
209
223
return status ;
210
224
}
211
225
@@ -223,7 +237,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
223
237
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
224
238
elog (ERROR , "DTMD failed to set transaction status" );
225
239
}
226
- DtmEnsureConnection ();
227
240
status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid , true);
228
241
XTM_INFO ("Commit transaction %d\n" , xid );
229
242
Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
249
262
_PG_init (void )
250
263
{
251
264
TM = & DtmTM ;
252
- // TransactionIsInCurrentSnapshot = TransactionIsInDtmSnapshot;
253
265
254
266
DefineCustomIntVariable ("dtm.node_id" ,
255
267
"Identifier of node in distributed cluster for DTM" ,
@@ -308,7 +320,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
308
320
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
309
321
310
322
XTM_TRACE ("XTM: dtm_get_snapshot \n" );
311
- /* Move it to DtmGlobalGetSnapshot? */
312
323
Assert (!DtmHasSnapshot );
313
324
DtmHasSnapshot = true;
314
325
DtmGlobalTransaction = true;
0 commit comments