Skip to content

Commit 8ed9197

Browse files
committed
Add active snapshot
2 parents 99affcb + eeb95fe commit 8ed9197

File tree

2 files changed

+67
-26
lines changed

2 files changed

+67
-26
lines changed

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,17 @@ static void gen_snapshot(Transaction *t) {
227227

228228
static xid_t get_global_xmin() {
229229
int i, j;
230-
xid_t xmin = MAX_XID;
230+
xid_t xmin = INVALID_XID;
231231
Transaction *t;
232232
for (i = 0; i < transactions_count; i++) {
233233
t = transactions + i;
234-
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
235-
while (--j >= 0) {
236-
Snapshot* s = transaction_snapshot(t, j);
237-
if (s->xmin < xmin) {
238-
xmin = s->xmin;
239-
}
240-
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
234+
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
235+
while (--j >= 0) {
236+
Snapshot* s = transaction_snapshot(t, j);
237+
if ((xmin == INVALID_XID) || (s->xmin < xmin)) {
238+
xmin = s->xmin;
239+
}
240+
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
241241
}
242242
}
243243
return xmin;

contrib/pg_xtm/tests/transfers.go

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const (
1515
N_ACCOUNTS = TRANSFER_CONNECTIONS//100000
1616
//ISOLATION_LEVEL = "repeatable read"
1717
ISOLATION_LEVEL = "read committed"
18+
GLOBAL_UPDATES = true
19+
LOCAL_UPDATES = false
1820
)
1921

2022

@@ -115,13 +117,15 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
115117
var nCommits = 0
116118
var myCommits = 0
117119

118-
conn1, err := pgx.Connect(cfg1)
120+
var conn [2]*pgx.Conn
121+
122+
conn[0], err = pgx.Connect(cfg1)
119123
checkErr(err)
120-
defer conn1.Close()
124+
defer conn[0].Close()
121125

122-
conn2, err := pgx.Connect(cfg2)
126+
conn[1], err = pgx.Connect(cfg2)
123127
checkErr(err)
124-
defer conn2.Close()
128+
defer conn[1].Close()
125129

126130
start := time.Now()
127131
for myCommits < N_ITERATIONS {
@@ -130,23 +134,57 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130134
account1 := rand.Intn(N_ACCOUNTS)
131135
account2 := rand.Intn(N_ACCOUNTS)
132136

133-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
134-
exec(conn2, "select dtm_join_transaction($1)", xid)
137+
if (account1 >= account2) {
138+
continue
139+
}
140+
141+
src := conn[rand.Intn(2)]
142+
dst := conn[rand.Intn(2)]
135143

136-
// start transaction
137-
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
138-
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
144+
if src == dst {
145+
// local update
146+
if !LOCAL_UPDATES {
147+
// which we do not want
148+
continue
149+
}
139150

140-
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
141-
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
142-
if !ok1 || !ok2 {
143-
exec(conn1, "rollback")
144-
exec(conn2, "rollback")
145-
nAborts += 1
151+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
152+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
153+
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, account2)
154+
if !ok1 || !ok2 {
155+
exec(src, "rollback")
156+
nAborts += 1
157+
} else {
158+
exec(src, "commit")
159+
nCommits += 1
160+
myCommits += 1
161+
}
146162
} else {
147-
commit(conn1, conn2)
148-
nCommits += 1
149-
myCommits += 1
163+
// global update
164+
if !GLOBAL_UPDATES {
165+
// which we do not want
166+
continue
167+
}
168+
169+
xid = execQuery(src, "select dtm_begin_transaction(2)")
170+
exec(dst, "select dtm_join_transaction($1)", xid)
171+
172+
// start transaction
173+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
174+
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
175+
176+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
177+
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
178+
179+
if !ok1 || !ok2 {
180+
exec(src, "rollback")
181+
exec(dst, "rollback")
182+
nAborts += 1
183+
} else {
184+
commit(src, dst)
185+
nCommits += 1
186+
myCommits += 1
187+
}
150188
}
151189

152190
if time.Since(start).Seconds() > 1 {
@@ -241,6 +279,9 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
241279
var err error
242280
// fmt.Println(stmt)
243281
_, err = conn.Exec(stmt, arguments... )
282+
if err != nil {
283+
fmt.Println(err)
284+
}
244285
return err == nil
245286
}
246287

0 commit comments

Comments
 (0)