Skip to content

Commit ff9dab5

Browse files
committed
Keep inspection connection alive
1 parent 49a636b commit ff9dab5

File tree

2 files changed

+49
-44
lines changed

2 files changed

+49
-44
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static NodeId DtmNodeId;
5353
static DTMConn DtmConn;
5454
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
5555
static bool DtmHasSnapshot = false;
56+
static bool DtmGlobalTransaction = false;
5657
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmTransactionIsInProgress };
5758
static DTMConn DtmConn;
5859

@@ -100,10 +101,12 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
100101
static TransactionId* buf;
101102
TransactionId prev = InvalidTransactionId;
102103

103-
XTM_TRACE("XTM: DtmCopySnapshot \n");
104+
XTM_TRACE("XTM: DtmCopySnapshot for transaction%u\n", GetCurrentTransactionId());
105+
DumpSnapshot(dst, "local");
106+
DumpSnapshot(src, "DTM");
104107

105108
if (buf == NULL) {
106-
buf = (TransactionId *)malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId) * 2);
109+
buf = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId) * 2);
107110
}
108111

109112
GetLocalSnapshotData(dst);
@@ -124,6 +127,7 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
124127
}
125128
}
126129
dst->xcnt = j;
130+
DumpSnapshot(dst, "Merged");
127131
}
128132

129133
static void DtmUpdateRecentXmin(void)
@@ -143,28 +147,26 @@ static void DtmUpdateRecentXmin(void)
143147
if (RecentGlobalXmin > xmin) {
144148
RecentGlobalXmin = xmin;
145149
}
146-
RecentXmin = xmin;
150+
//RecentXmin = xmin;
147151
}
148152
}
149153

150154
static Snapshot DtmGetSnapshot(Snapshot snapshot)
151155
{
152156
XTM_TRACE("XTM: DtmGetSnapshot \n");
153-
154-
if (DtmHasSnapshot) {
157+
snapshot = GetLocalSnapshotData(snapshot);
158+
if (DtmHasSnapshot) {
155159
DtmCopySnapshot(snapshot, &DtmSnapshot);
156-
DtmUpdateRecentXmin();
157-
} else {
158-
snapshot = GetLocalSnapshotData(snapshot);
160+
//DtmUpdateRecentXmin();
159161
}
160162
return snapshot;
161163
}
162164

163165
static bool IsInDtmSnapshot(TransactionId xid)
164166
{
165167
return DtmHasSnapshot
166-
&& (/*xid > DtmSnapshot.xmax
167-
|| */bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL);
168+
&& (xid >= DtmSnapshot.xmax
169+
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL);
168170
}
169171

170172
static bool DtmTransactionIsInProgress(TransactionId xid)
@@ -183,7 +185,7 @@ static bool DtmTransactionIsInProgress(TransactionId xid)
183185
}
184186
#endif
185187
XTM_TRACE("XTM: DtmTransactionIsInProgress \n");
186-
return TransactionIdIsRunning(xid) && !IsInDtmSnapshot(xid);
188+
return TransactionIdIsRunning(xid);// || IsInDtmSnapshot(xid);
187189
}
188190

189191
static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
@@ -207,8 +209,13 @@ static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
207209

208210
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
209211
{
210-
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
212+
#if 0
213+
if (IsInDtmSnapshot(xid)) {
214+
return TRANSACTION_STATUS_IN_PROGRESS;
215+
}
216+
#endif
211217
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
218+
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
212219
#if 0
213220
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
214221
status = DtmGetGloabalTransStatus(xid);
@@ -217,23 +224,23 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
217224
}
218225
}
219226
#endif
220-
return CLOGTransactionIdGetStatus(xid, lsn);
227+
return status;
221228
}
222229

223230

224231
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
225232
{
226233
XTM_TRACE("XTM: DtmSetTransactionStatus %u = %u \n", xid, status);
227234
if (!RecoveryInProgress()) {
228-
if (DtmHasSnapshot) {
235+
if (DtmGlobalTransaction) {
229236
/* Already should be IN_PROGRESS */
230237
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
231238

232239
DtmHasSnapshot = false;
240+
DtmGlobalTransaction = false;
233241
DtmEnsureConnection();
234242
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
235243
elog(ERROR, "DTMD failed to set transaction status");
236-
// elog(WARNING, "DTMD failed to set transaction status");
237244
}
238245
status = DtmGetGloabalTransStatus(xid);
239246
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
@@ -299,11 +306,12 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
299306
gtid.xids = (TransactionId*)ARR_DATA_PTR(xids);
300307
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
301308
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
302-
309+
DtmGlobalTransaction = true;
303310
XTM_TRACE("XTM: dtm_begin_transaction \n");
304-
305-
DtmEnsureConnection();
306-
DtmGlobalStartTransaction(DtmConn, &gtid);
311+
if (DtmNodeId == gtid.nodes[0]) {
312+
DtmEnsureConnection();
313+
DtmGlobalStartTransaction(DtmConn, &gtid);
314+
}
307315
PG_RETURN_VOID();
308316
}
309317

@@ -316,7 +324,9 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
316324
XTM_TRACE("XTM: dtm_get_snapshot \n");
317325

318326
/* Move it to DtmGlobalGetSnapshot? */
327+
Assert(!DtmHasSnapshot);
319328
DtmHasSnapshot = true;
329+
DtmGlobalTransaction = true;
320330
PG_RETURN_VOID();
321331
}
322332

contrib/pg_xtm/tests/transfers.go

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,11 @@ func prepare_db() {
7474

7575
// register global transaction in DTMD
7676
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
77+
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
7778

7879
// first global statement
79-
exec(conn1, "select dtm_get_snapshot()")
80-
exec(conn2, "select dtm_get_snapshot()")
80+
// exec(conn1, "select dtm_get_snapshot()")
81+
// exec(conn2, "select dtm_get_snapshot()")
8182

8283
for i := 0; i < N_ACCOUNTS; i++ {
8384
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -122,10 +123,11 @@ func transfer(id int, wg *sync.WaitGroup) {
122123

123124
// register global transaction in DTMD
124125
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
126+
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
125127

126128
// first global statement
127-
exec(conn1, "select dtm_get_snapshot()")
128-
exec(conn2, "select dtm_get_snapshot()")
129+
// exec(conn1, "select dtm_get_snapshot()")
130+
// exec(conn2, "select dtm_get_snapshot()")
129131

130132
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
131133
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
@@ -137,21 +139,18 @@ func transfer(id int, wg *sync.WaitGroup) {
137139
wg.Done()
138140
}
139141

140-
func total() int32 {
141-
var err error
142-
var sum1 int32
143-
var sum2 int32
142+
func inspect(wg *sync.WaitGroup) {
143+
var sum1, sum2, sum int32
144+
var prevSum int32 = 0
144145
var xids []int32 = make([]int32, 2)
145146

146-
conn1, err := pgx.Connect(cfg1)
147-
checkErr(err)
148-
defer conn1.Close()
147+
conn1, err := pgx.Connect(cfg1)
148+
checkErr(err)
149149

150-
conn2, err := pgx.Connect(cfg2)
151-
checkErr(err)
152-
defer conn2.Close()
150+
conn2, err := pgx.Connect(cfg2)
151+
checkErr(err)
153152

154-
for {
153+
for running {
155154
exec(conn1, "begin")
156155
exec(conn2, "begin")
157156

@@ -161,6 +160,7 @@ func total() int32 {
161160

162161
// register global transaction in DTMD
163162
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
163+
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
164164

165165
exec(conn1, "select dtm_get_snapshot()")
166166
exec(conn2, "select dtm_get_snapshot()")
@@ -170,19 +170,14 @@ func total() int32 {
170170

171171
commit(conn1, conn2)
172172

173-
return sum1 + sum2
174-
}
175-
}
176-
177-
func totalrep(wg *sync.WaitGroup) {
178-
var prevSum int32 = 0
179-
for running {
180-
sum := total()
173+
sum = sum1 + sum2
181174
if (sum != prevSum) {
182175
fmt.Println("Total = ", sum)
183176
prevSum = sum
184-
}
177+
}
185178
}
179+
conn1.Close()
180+
conn2.Close()
186181
wg.Done()
187182
}
188183

@@ -198,7 +193,7 @@ func main() {
198193
}
199194
running = true
200195
inspectWg.Add(1)
201-
go totalrep(&inspectWg)
196+
go inspect(&inspectWg)
202197

203198
transferWg.Wait()
204199
running = false

0 commit comments

Comments
 (0)