Skip to content

Commit cfe0454

Browse files
committed
Use repeatable read isolation level
1 parent 8a7dfb1 commit cfe0454

File tree

2 files changed

+40
-24
lines changed

2 files changed

+40
-24
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,20 @@ static void DtmUpdateRecentXmin(void)
209209

210210
static Snapshot DtmGetSnapshot(Snapshot snapshot)
211211
{
212-
XTM_TRACE("XTM: DtmGetSnapshot \n");
213-
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
214-
DtmHasSnapshot = true;
215-
DtmEnsureConnection();
216-
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
217-
}
218-
snapshot = GetLocalSnapshotData(snapshot);
219-
if (DtmHasSnapshot) {
220-
DtmCopySnapshot(snapshot, &DtmSnapshot);
221-
DtmUpdateRecentXmin();
212+
if (!IsMVCCSnapshot(snapshot) || snapshot == &CatalogSnapshotData) {
213+
snapshot = GetLocalSnapshotData(snapshot);
214+
} else {
215+
XTM_TRACE("XTM: DtmGetSnapshot \n");
216+
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
217+
DtmHasSnapshot = true;
218+
DtmEnsureConnection();
219+
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
220+
}
221+
snapshot = GetLocalSnapshotData(snapshot);
222+
if (DtmHasSnapshot) {
223+
DtmCopySnapshot(snapshot, &DtmSnapshot);
224+
DtmUpdateRecentXmin();
225+
}
222226
}
223227
return snapshot;
224228
}

contrib/pg_xtm/tests/transfers.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
"math/rand"
6+
// "math/rand"
77
"github.com/jackc/pgx"
88
)
99

@@ -94,6 +94,7 @@ func max(a, b int64) int64 {
9494
func transfer(id int, wg *sync.WaitGroup) {
9595
var err error
9696
var xids []int32 = make([]int32, 2)
97+
var nConflicts = 0
9798

9899
conn1, err := pgx.Connect(cfg1)
99100
checkErr(err)
@@ -106,12 +107,12 @@ func transfer(id int, wg *sync.WaitGroup) {
106107
for i := 0; i < N_ITERATIONS; i++ {
107108
//amount := 2*rand.Intn(2) - 1
108109
amount := 1
109-
account1 := rand.Intn(N_ACCOUNTS)
110-
account2 := rand.Intn(N_ACCOUNTS)
110+
account1 := id//rand.Intn(N_ACCOUNTS)
111+
account2 := id//rand.Intn(N_ACCOUNTS)
111112

112113
// strt transaction
113-
exec(conn1, "begin")
114-
exec(conn2, "begin")
114+
exec(conn1, "begin transaction isolation level repeatable read")
115+
exec(conn2, "begin transaction isolation level repeatable read")
115116

116117
// obtain XIDs of paticipants
117118
xids[0] = execQuery(conn1, "select txid_current()")
@@ -121,13 +122,17 @@ func transfer(id int, wg *sync.WaitGroup) {
121122
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
122123
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
123124

124-
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
125-
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
126-
127-
commit(conn1, conn2)
125+
if !execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1) ||
126+
!execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2) {
127+
exec(conn1, "rollback")
128+
exec(conn2, "rollback")
129+
nConflicts += 1
130+
i -= 1
131+
} else {
132+
commit(conn1, conn2)
133+
}
128134
}
129-
130-
fmt.Println("Test completed")
135+
fmt.Println("Test completed with ",nConflicts," conflicts")
131136
wg.Done()
132137
}
133138

@@ -143,8 +148,8 @@ func inspect(wg *sync.WaitGroup) {
143148
conn2, err := pgx.Connect(cfg2)
144149
checkErr(err)
145150

146-
exec(conn1, "begin")
147-
exec(conn2, "begin")
151+
exec(conn1, "begin transaction isolation level repeatable read")
152+
exec(conn2, "begin transaction isolation level repeatable read")
148153

149154
// obtain XIDs of paticipants
150155
xids[0] = execQuery(conn1, "select txid_current()")
@@ -192,10 +197,17 @@ func main() {
192197
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
193198
var err error
194199
// fmt.Println(stmt)
195-
_, _ = conn.Exec(stmt, arguments... )
200+
_, err = conn.Exec(stmt, arguments... )
196201
checkErr(err)
197202
}
198203

204+
func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
205+
var err error
206+
// fmt.Println(stmt)
207+
_, err = conn.Exec(stmt, arguments... )
208+
return err == nil
209+
}
210+
199211
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
200212
var err error
201213
var result int64

0 commit comments

Comments
 (0)