Skip to content

Commit 65c7884

Browse files
committed
Add support of single node global transactions in transfers test.
1 parent 7e25ccd commit 65c7884

File tree

1 file changed

+60
-15
lines changed

1 file changed

+60
-15
lines changed

contrib/pg_dtm/tests/transfers.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -299,22 +299,67 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
299299
dst := conns[rand.Intn(len(conns))]
300300

301301
if src == dst {
302-
// local update
303-
if !cfg.Writers.AllowLocal {
304-
// which we do not want
305-
continue
306-
}
307-
308-
exec(src, "begin transaction isolation level " + cfg.Isolation)
309-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, from_acc)
310-
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, to_acc)
311-
if !ok1 || !ok2 {
312-
exec(src, "rollback")
313-
nAborts += 1
302+
if cfg.Writers.AllowLocal {
303+
// local update
304+
exec(src, "begin transaction isolation level " + cfg.Isolation)
305+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, from_acc)
306+
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, to_acc)
307+
if !ok1 || !ok2 {
308+
exec(src, "rollback")
309+
nAborts += 1
310+
} else {
311+
exec(src, "commit")
312+
nCommits += 1
313+
myCommits += 1
314+
}
314315
} else {
315-
exec(src, "commit")
316-
nCommits += 1
317-
myCommits += 1
316+
if len(conns) > 1 {
317+
continue
318+
}
319+
320+
// global single-node update
321+
execQuery(src, "select dtm_begin_transaction()")
322+
323+
// start transaction
324+
exec(src, "begin transaction isolation level " + cfg.Isolation)
325+
326+
ok := true
327+
if (cfg.Writers.UseCursors) {
328+
exec(
329+
src,
330+
"declare cur0 cursor for select * from t where u=$1 for update",
331+
from_acc,
332+
)
333+
334+
ok = execUpdate(src, "fetch from cur0") && ok
335+
336+
ok = execUpdate(
337+
src, "update t set v = v - $1 where current of cur0",
338+
amount,
339+
) && ok
340+
ok = execUpdate(
341+
src, "update t set v = v + $1 where current of cur0",
342+
amount,
343+
) && ok
344+
} else {
345+
ok = execUpdate(
346+
src, "update t set v = v - $1 where u=$2",
347+
amount, from_acc,
348+
) && ok
349+
ok = execUpdate(
350+
src, "update t set v = v + $1 where u=$2",
351+
amount, to_acc,
352+
) && ok
353+
}
354+
355+
if ok {
356+
commit(src)
357+
nCommits += 1
358+
myCommits += 1
359+
} else {
360+
exec(src, "rollback")
361+
nAborts += 1
362+
}
318363
}
319364
} else {
320365
// global update

0 commit comments

Comments
 (0)