@@ -43,23 +43,30 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
43
43
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
44
44
static XidStatus DtmGetGloabalTransStatus (TransactionId xid );
45
45
static void DtmUpdateRecentXmin (void );
46
- static bool IsInDtmSnapshot (TransactionId xid );
46
+ // static bool IsInDtmSnapshot(TransactionId xid);
47
47
static bool DtmTransactionIsInProgress (TransactionId xid );
48
48
49
49
static NodeId DtmNodeId ;
50
50
static DTMConn DtmConn ;
51
- static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC };
51
+ static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
52
52
static bool DtmHasSnapshot = false;
53
53
static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmTransactionIsInProgress };
54
54
static DTMConn DtmConn ;
55
55
56
+ #define XTM_TRACE (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
57
+ #define XTM_CONNECT_ATTEMPTS 10
58
+
56
59
static void DtmEnsureConnection (void )
57
60
{
58
- while (true) {
61
+ int attempt = 0 ;
62
+ XTM_TRACE ("XTM: DtmEnsureConnection\n" );
63
+ while (attempt < XTM_CONNECT_ATTEMPTS ) {
59
64
if (DtmConn ) {
60
65
break ;
61
- }
66
+ }
67
+ XTM_TRACE ("XTM: DtmEnsureConnection, attempt #%u\n" , attempt );
62
68
DtmConn = DtmConnect ("127.0.0.1" , 5431 );
69
+ attempt ++ ;
63
70
}
64
71
}
65
72
@@ -68,6 +75,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
68
75
int i , j , n ;
69
76
static TransactionId * buf ;
70
77
TransactionId prev = InvalidTransactionId ;
78
+
79
+ XTM_TRACE ("XTM: DtmCopySnapshot \n" );
80
+
71
81
if (buf == NULL ) {
72
82
buf = (TransactionId * )malloc (GetMaxSnapshotSubxidCount () * sizeof (TransactionId ) * 2 );
73
83
}
@@ -95,6 +105,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
95
105
static void DtmUpdateRecentXmin (void )
96
106
{
97
107
TransactionId xmin = DtmSnapshot .xmin ;
108
+
109
+ XTM_TRACE ("XTM: DtmUpdateRecentXmin \n" );
110
+
98
111
if (xmin != InvalidTransactionId ) {
99
112
xmin -= vacuum_defer_cleanup_age ;
100
113
if (!TransactionIdIsNormal (xmin )) {
@@ -112,6 +125,8 @@ static void DtmUpdateRecentXmin(void)
112
125
113
126
static Snapshot DtmGetSnapshot (Snapshot snapshot )
114
127
{
128
+ XTM_TRACE ("XTM: DtmGetSnapshot \n" );
129
+
115
130
if (DtmHasSnapshot ) {
116
131
DtmCopySnapshot (snapshot , & DtmSnapshot );
117
132
DtmUpdateRecentXmin ();
@@ -132,12 +147,16 @@ static bool IsInDtmSnapshot(TransactionId xid)
132
147
133
148
static bool DtmTransactionIsInProgress (TransactionId xid )
134
149
{
150
+ XTM_TRACE ("XTM: DtmTransactionIsInProgress \n" );
135
151
return /*IsInDtmSnapshot(xid) || */ TransactionIdIsRunning (xid );
136
152
}
137
153
138
154
static XidStatus DtmGetGloabalTransStatus (TransactionId xid )
139
155
{
140
156
unsigned delay = 1000 ;
157
+
158
+ XTM_TRACE ("XTM: DtmGetGloabalTransStatus \n" );
159
+
141
160
while (true) {
142
161
XidStatus status ;
143
162
DtmEnsureConnection ();
@@ -155,6 +174,7 @@ static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
155
174
156
175
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
157
176
{
177
+ XTM_TRACE ("XTM: DtmGetTransactionStatus \n" );
158
178
XidStatus status = CLOGTransactionIdGetStatus (xid , lsn );
159
179
#if 0
160
180
if (status == TRANSACTION_STATUS_IN_PROGRESS ) {
@@ -170,6 +190,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
170
190
171
191
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
172
192
{
193
+ XTM_TRACE ("XTM: DtmSetTransactionStatus %u = %u \n" , xid , status );
173
194
if (!RecoveryInProgress ()) {
174
195
if (DtmHasSnapshot ) {
175
196
/* Already should be IN_PROGRESS */
@@ -179,6 +200,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
179
200
DtmEnsureConnection ();
180
201
if (!DtmGlobalSetTransStatus (DtmConn , DtmNodeId , xid , status ) && status != TRANSACTION_STATUS_ABORTED ) {
181
202
elog (ERROR , "DTMD failed to set transaction status" );
203
+ // elog(WARNING, "DTMD failed to set transaction status");
182
204
}
183
205
status = DtmGetGloabalTransStatus (xid );
184
206
Assert (status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED );
@@ -244,6 +266,9 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
244
266
gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
245
267
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
246
268
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ), ARR_DIMS (nodes ));
269
+
270
+ XTM_TRACE ("XTM: dtm_begin_transaction \n" );
271
+
247
272
DtmEnsureConnection ();
248
273
DtmGlobalStartTransaction (DtmConn , & gtid );
249
274
PG_RETURN_VOID ();
@@ -255,6 +280,8 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
255
280
DtmEnsureConnection ();
256
281
DtmGlobalGetSnapshot (DtmConn , DtmNodeId , GetCurrentTransactionId (), & DtmSnapshot );
257
282
283
+ XTM_TRACE ("XTM: dtm_get_snapshot \n" );
284
+
258
285
/* Move it to DtmGlobalGetSnapshot? */
259
286
DtmHasSnapshot = true;
260
287
PG_RETURN_VOID ();
0 commit comments