@@ -45,8 +45,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src);
45
45
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
46
46
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
47
47
static void DtmUpdateRecentXmin (void );
48
- // static bool IsInDtmSnapshot(TransactionId xid);
49
48
static bool DtmTransactionIsInProgress (TransactionId xid );
49
+ static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid );
50
+ static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid );
50
51
51
52
static NodeId DtmNodeId ;
52
53
static DTMConn DtmConn ;
@@ -58,7 +59,7 @@ static DTMConn DtmConn;
58
59
59
60
#define XTM_TRACE (fmt , ...)
60
61
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61
- #define XTM_INFO (fmt , ...) fprintf(stderr , fmt, ## __VA_ARGS__)
62
+ #define XTM_INFO (fmt , ...) elog(WARNING , fmt, ## __VA_ARGS__)
62
63
#define XTM_CONNECT_ATTEMPTS 10
63
64
64
65
static void DtmEnsureConnection (void )
@@ -96,24 +97,26 @@ static void DumpSnapshot(Snapshot s, char *name)
96
97
XTM_INFO ("%s\n" , buf );
97
98
}
98
99
99
- static bool IsInSnapshot (Snapshot s , TransactionId xid )
100
+ static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid )
100
101
{
101
- int i ;
102
- if (xid < s -> xmin ) {
103
- return false;
104
- }
105
- if (xid >= s -> xmax ) {
106
- return true;
107
- }
108
- for (i = 0 ; i < s -> xcnt ; i ++ ) {
109
- if (s -> xip [i ] == xid ) {
102
+ return xid >= s -> xmax
103
+ || bsearch (& xid , s -> xip , s -> xcnt , sizeof (TransactionId ), xidComparator ) != NULL ;
104
+ }
105
+
106
+ static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid )
107
+ {
108
+ if (!TransactionIdIsInDtmSnapshot (s , xid )) {
109
+ XLogRecPtr lsn ;
110
+ XidStatus status = CLOGTransactionIdGetStatus (xid , & lsn );
111
+ if (status != TRANSACTION_STATUS_IN_PROGRESS ) {
112
+ XTM_INFO ("Wait for transaction %d to complete\n" , xid );
113
+ XactLockTableWait (xid , NULL , NULL , XLTW_None );
110
114
return true;
111
115
}
112
116
}
113
117
return false;
114
118
}
115
-
116
-
119
+
117
120
static void DtmCopySnapshot (Snapshot dst , Snapshot src )
118
121
{
119
122
int i , j , n ;
@@ -129,33 +132,21 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
129
132
130
133
Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
131
134
135
+ /* Check that globall competed transactions are not included in local snapshot */
132
136
RefreshLocalSnapshot :
133
137
GetLocalSnapshotData (dst );
134
- xid = src -> xmin < dst -> xmin ? src -> xmin : dst -> xmin ;
135
- for (i = 0 ; i < src -> xcnt ; i ++ ) {
136
- while (src -> xip [i ] > xid ) { /* XID is completed according to global snapshot... */
137
- if (IsInSnapshot (dst , xid )) { /* ...but still marked as running in local snapshot */
138
- pg_usleep (MIN_DELAY );
139
- goto RefreshLocalSnapshot ;
140
- } else {
141
- xid += 1 ; /* XID is also marked completed in local snapshot */
142
- }
143
- }
144
- /* XID is considered as running in global snapshot */
145
- /* doesn't matter what local snapshot thinks about it */
146
- xid = src -> xip [i ]+ 1 ;
138
+ for (i = 0 ; i < dst -> xcnt ; i ++ ) {
139
+ if (TransactionIdIsInDoubt (src , dst -> xip [i ])) {
140
+ goto RefreshLocalSnapshot ;
141
+ }
147
142
}
148
- while (xid < src -> xmax ) {
149
- if (IsInSnapshot (dst , xid )) { /* ...but still marked as running in local snapshot */
150
- pg_usleep (MIN_DELAY );
143
+ for (xid = dst -> xmax ; xid < src -> xmax ; xid ++ ) {
144
+ if (TransactionIdIsInDoubt (src , xid )) {
151
145
goto RefreshLocalSnapshot ;
152
- } else {
153
- xid += 1 ; /* XID is also marked completed in local snapshot */
154
146
}
155
- }
156
- /* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
147
+ }
157
148
158
- /* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
149
+ /* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
159
150
if (dst -> xmin > src -> xmin ) {
160
151
dst -> xmin = src -> xmin ;
161
152
}
@@ -233,6 +224,8 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
233
224
DtmHasSnapshot = false;
234
225
DtmGlobalTransaction = false;
235
226
DtmEnsureConnection ();
227
+ XTM_INFO ("Begin commit transaction %d\n" , xid );
228
+ CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , TRANSACTION_STATUS_COMMITTED , lsn );
236
229
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status , true) && status != TRANSACTION_STATUS_ABORTED ) {
237
230
elog (ERROR , "DTMD failed to set transaction status" );
238
231
}
0 commit comments