@@ -41,10 +41,11 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
41
41
static void DtmCopySnapshot (Snapshot dst , Snapshot src );
42
42
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
43
43
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
44
- static bool TransactionIsInDtmSnapshot (TransactionId xid );
44
+ static XidStatus DtmGetGloabalTransStatus (TransactionId xid );
45
+ static void DtmUpdateRecentXmin (void );
46
+ static bool IsInDtmSnapshot (TransactionId xid );
45
47
46
48
static NodeId DtmNodeId ;
47
-
48
49
static DTMConn DtmConn ;
49
50
static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC };
50
51
static bool DtmHasSnapshot = false;
@@ -73,62 +74,102 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
73
74
dst -> curcid = src -> curcid ;
74
75
}
75
76
77
+ static void DtmUpdateRecentXmin (void )
78
+ {
79
+ TransactionId xmin = DtmSnapshot .xmin ;
80
+ if (xmin != InvalidTransactionId ) {
81
+ xmin -= vacuum_defer_cleanup_age ;
82
+ if (!TransactionIdIsNormal (xmin )) {
83
+ xmin = FirstNormalTransactionId ;
84
+ }
85
+ if (RecentGlobalDataXmin > xmin ) {
86
+ RecentGlobalDataXmin = xmin ;
87
+ }
88
+ if (RecentGlobalXmin > xmin ) {
89
+ RecentGlobalXmin = xmin ;
90
+ }
91
+ RecentXmin = xmin ;
92
+ }
93
+ }
94
+
76
95
static Snapshot DtmGetSnapshot (Snapshot snapshot )
77
96
{
78
97
if (DtmHasSnapshot ) {
79
98
DtmCopySnapshot (snapshot , & DtmSnapshot );
80
99
return snapshot ;
81
100
}
82
- return GetLocalSnapshotData (snapshot );
101
+ snapshot = GetLocalSnapshotData (snapshot );
102
+ DtmUpdateRecentXmin ();
103
+ return snapshot ;
104
+ }
105
+
106
+
107
+ static bool IsInDtmSnapshot (TransactionId xid )
108
+ {
109
+ return DtmHasSnapshot
110
+ && (xid > DtmSnapshot .xmax
111
+ || bsearch (& xid , DtmSnapshot .xip , DtmSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL );
83
112
}
113
+
114
+
115
+ static XidStatus DtmGetGloabalTransStatus (TransactionId xid )
116
+ {
117
+ unsigned delay = 1000 ;
118
+ while (true) {
119
+ DtmEnsureConnection ();
120
+ XidStatus status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid );
121
+ if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
122
+ pg_usleep (delay );
123
+ if (delay < 100000 ) {
124
+ delay *= 2 ;
125
+ }
126
+ } else {
127
+ return status ;
128
+ }
129
+ }
130
+ }
84
131
85
132
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
86
133
{
87
134
XidStatus status = CLOGTransactionIdGetStatus (xid , lsn );
88
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
89
135
#if 0
90
- & & DtmHasSnapshot && !TransactionIdIsInProgress (xid )) {
91
- unsigned delay = 1000 ;
92
- while (true) {
93
- DtmEnsureConnection ();
94
- status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid );
95
- if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
96
- pg_usleep (delay );
97
- if (delay < 100000 ) {
98
- delay *= 2 ;
99
- }
100
- } else {
101
- break ;
102
- }
103
- }
104
- #endif
105
- status = DtmGlobalGetTransStatus (DtmConn , DtmNodeId , xid );
106
- if (status != TRANSACTION_STATUS_IN_PROGRESS ) {
107
- CLOGTransactionIdSetTreeStatus (xid , 0 , NULL , status , InvalidXLogRecPtr );
136
+ if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
137
+ status = DtmGetGloabalTransStatus (xid );
138
+ if (status == TRANSACTION_STATUS_UNKNOWN ) {
139
+ status = TRANSACTION_STATUS_IN_PROGRESS ;
108
140
}
109
141
}
142
+ #endif
110
143
return status ;
111
144
}
112
145
146
+
113
147
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
114
148
{
115
- if (DtmHasSnapshot ) {
116
- /* Already should be IN_PROGRESS */
117
- /* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
118
- if (status == TRANSACTION_STATUS_COMMITTED ) {
119
- ProcArrayAdd (& ProcGlobal -> allProcs [AllocGXid (xid )]);
149
+ if (!RecoveryInProgress ()) {
150
+ if (DtmHasSnapshot ) {
151
+ /* Already should be IN_PROGRESS */
152
+ /* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
153
+
154
+ DtmHasSnapshot = false;
155
+ DtmEnsureConnection ();
156
+ if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
157
+ elog (ERROR , "DTMD failed to set transaction status" );
158
+ }
159
+ status = DtmGetGloabalTransStatus (xid );
160
+ Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
161
+ } else {
162
+ elog (WARNING , "Set transaction %u status in local CLOG" , xid );
120
163
}
121
- DtmHasSnapshot = false;
122
- DtmEnsureConnection ( );
123
- if (! DtmGlobalSetTransStatus ( DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
124
- elog ( ERROR , "DTMD failed to set transaction status" ) ;
164
+ } else {
165
+ XidStatus gs = DtmGetGloabalTransStatus ( xid );
166
+ if (gs != TRANSACTION_STATUS_UNKNOWN ) {
167
+ status = gs ;
125
168
}
126
- } else {
127
- CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
128
169
}
170
+ CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
129
171
}
130
172
131
-
132
173
/*
133
174
* ***************************************************************************
134
175
*/
@@ -167,11 +208,11 @@ _PG_fini(void)
167
208
168
209
PG_MODULE_MAGIC ;
169
210
170
- PG_FUNCTION_INFO_V1 (dtm_global_transaction );
211
+ PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
171
212
PG_FUNCTION_INFO_V1 (dtm_get_snapshot );
172
213
173
214
Datum
174
- dtm_global_transaction (PG_FUNCTION_ARGS )
215
+ dtm_begin_transaction (PG_FUNCTION_ARGS )
175
216
{
176
217
GlobalTransactionId gtid ;
177
218
ArrayType * nodes = PG_GETARG_ARRAYTYPE_P (0 );
@@ -184,36 +225,16 @@ dtm_global_transaction(PG_FUNCTION_ARGS)
184
225
PG_RETURN_VOID ();
185
226
}
186
227
187
-
188
-
189
-
190
-
191
-
192
-
193
228
Datum
194
229
dtm_get_snapshot (PG_FUNCTION_ARGS )
195
230
{
196
- TransactionId xmin ;
197
231
DtmEnsureConnection ();
198
232
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
199
233
200
- VacuumProcArray (& DtmSnapshot );
234
+ // VacuumProcArray(&DtmSnapshot);
201
235
202
236
/* Move it to DtmGlobalGetSnapshot? */
203
- xmin = DtmSnapshot .xmin ;
204
- if (xmin != InvalidTransactionId ) {
205
- xmin -= vacuum_defer_cleanup_age ;
206
- if (!TransactionIdIsNormal (xmin )) {
207
- xmin = FirstNormalTransactionId ;
208
- }
209
- if (RecentGlobalDataXmin > xmin ) {
210
- RecentGlobalDataXmin = xmin ;
211
- }
212
- if (RecentGlobalXmin > xmin ) {
213
- RecentGlobalXmin = xmin ;
214
- }
215
- RecentXmin = xmin ;
216
- }
237
+ DtmUpdateRecentXmin ();
217
238
DtmSnapshot .curcid = GetCurrentCommandId (false);
218
239
DtmHasSnapshot = true;
219
240
PG_RETURN_VOID ();
0 commit comments