Skip to content

Commit 6f3d573

Browse files
committed
Remote dtm_get_snapshot
1 parent fcd31f3 commit 6f3d573

File tree

2 files changed

+7
-30
lines changed

2 files changed

+7
-30
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ static void DtmUpdateRecentXmin(void)
192192
static Snapshot DtmGetSnapshot(Snapshot snapshot)
193193
{
194194
XTM_TRACE("XTM: DtmGetSnapshot \n");
195+
if (DtmGlobalTransaction && !DtmHasSnapshot) {
196+
DtmHasSnapshot = true;
197+
DtmEnsureConnection();
198+
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
199+
}
195200
snapshot = GetLocalSnapshotData(snapshot);
196201
if (DtmHasSnapshot) {
197202
DtmCopySnapshot(snapshot, &DtmSnapshot);
@@ -204,7 +209,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
204209
static bool DtmTransactionIsInProgress(TransactionId xid)
205210
{
206211
XTM_TRACE("XTM: DtmTransactionIsInProgress \n");
207-
return TransactionIdIsRunning(xid);
212+
return TransactionIdIsRunning(xid);// || (DtmHasSnapshot && TransactionIdIsInDtmSnapshot(&DtmSnapshot, xid));
208213
}
209214

210215
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)

contrib/pg_xtm/tests/transfers.go

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,6 @@ func prepare_db() {
7676
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
7777
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
7878

79-
// first global statement
80-
exec(conn1, "select dtm_get_snapshot()")
81-
exec(conn2, "select dtm_get_snapshot()")
82-
8379
for i := 0; i < N_ACCOUNTS; i++ {
8480
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
8581
exec(conn2, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -97,7 +93,7 @@ func max(a, b int64) int64 {
9793

9894
func transfer(id int, wg *sync.WaitGroup) {
9995
var err error
100-
var sum1, sum2, sum int32
96+
// var sum1, sum2, sum int32
10197
var xids []int32 = make([]int32, 2)
10298

10399
conn1, err := pgx.Connect(cfg1)
@@ -125,31 +121,10 @@ func transfer(id int, wg *sync.WaitGroup) {
125121
// register global transaction in DTMD
126122
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
127123
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
128-
129-
// first global statement
130-
exec(conn1, "select dtm_get_snapshot()")
131-
exec(conn2, "select dtm_get_snapshot()")
132-
133-
sum1 = execQuery(conn1, "select sum(v) from t")
134-
sum2 = execQuery(conn2, "select sum(v) from t")
135-
sum = sum1 + sum2
136-
137-
exec(conn1, "select dtm_get_snapshot()")
138-
exec(conn2, "select dtm_get_snapshot()")
139124

140125
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
141126
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
142-
143-
exec(conn1, "select dtm_get_snapshot()")
144-
exec(conn2, "select dtm_get_snapshot()")
145127

146-
sum1 = execQuery(conn1, "select sum(v) from t")
147-
sum2 = execQuery(conn2, "select sum(v) from t")
148-
149-
if (sum1 + sum2 != sum) {
150-
fmt.Println("Before = ", sum, ", after=", sum1 + sum2, ", xids=", xids)
151-
}
152-
153128
commit(conn1, conn2)
154129
}
155130

@@ -180,9 +155,6 @@ func inspect(wg *sync.WaitGroup) {
180155
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
181156
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
182157

183-
exec(conn1, "select dtm_get_snapshot()")
184-
exec(conn2, "select dtm_get_snapshot()")
185-
186158
sum1 = execQuery(conn1, "select sum(v) from t")
187159
sum2 = execQuery(conn2, "select sum(v) from t")
188160

0 commit comments

Comments
 (0)