Skip to content

Commit 316e377

Browse files
committed
Corectly handle rollback of remote transaction
1 parent 39d8402 commit 316e377

File tree

2 files changed

+254
-8
lines changed

2 files changed

+254
-8
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static Snapshot CurrentTransactionSnapshot;
8888
static TransactionId DtmNextXid;
8989
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
9090
static bool DtmHasGlobalSnapshot;
91-
static bool DtmIsGlobalTransaction;
91+
static bool DtmGlobalXidAssigned;
9292
static int DtmLocalXidReserve;
9393
static int DtmCurcid;
9494
static Snapshot DtmLastSnapshot;
@@ -329,6 +329,7 @@ DtmGetNewTransactionId(bool isSubXact)
329329
TransactionId xid;
330330

331331
XTM_INFO("%d: GetNewTransactionId\n", getpid());
332+
Assert(!DtmGlobalXidAssigned);
332333

333334
/*
334335
* Workers synchronize transaction state at the beginning of each parallel
@@ -550,6 +551,9 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid)
550551

551552
static Snapshot DtmGetSnapshot(Snapshot snapshot)
552553
{
554+
if (DtmGlobalXidAssigned) {
555+
return GetLocalSnapshotData(snapshot);
556+
}
553557
if (TransactionIdIsValid(DtmNextXid) /*&& IsMVCCSnapshot(snapshot)*/ && snapshot != &CatalogSnapshotData) {
554558
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
555559
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
@@ -584,7 +588,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
584588
{
585589
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
586590
if (!RecoveryInProgress()) {
587-
if (!DtmIsGlobalTransaction && TransactionIdIsValid(DtmNextXid)) {
591+
if (!DtmGlobalXidAssigned && TransactionIdIsValid(DtmNextXid)) {
588592
/* Already should be IN_PROGRESS */
589593
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
590594
CurrentTransactionSnapshot = NULL;
@@ -664,15 +668,19 @@ static void
664668
DtmXactCallback(XactEvent event, void *arg)
665669
{
666670
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT) {
667-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmIsGlobalTransaction, DtmNextXid);
668-
if (DtmIsGlobalTransaction) {
669-
DtmIsGlobalTransaction = false;
671+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
672+
if (DtmGlobalXidAssigned) {
673+
DtmGlobalXidAssigned = false;
670674
} else if (TransactionIdIsValid(DtmNextXid)) {
671675
if (event == XACT_EVENT_COMMIT) {
672676
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
673677
hash_search(xid_in_doubt, &DtmNextXid, HASH_REMOVE, NULL);
674678
LWLockRelease(dtm->hashLock);
675-
}
679+
} else {
680+
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
681+
DtmGlobalSetTransStatus(DtmNextXid, TRANSACTION_STATUS_ABORTED, false);
682+
}
683+
}
676684
DtmNextXid = InvalidTransactionId;
677685
DtmLastSnapshot = NULL;
678686
}
@@ -780,7 +788,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
780788
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
781789

782790
DtmHasGlobalSnapshot = true;
783-
DtmIsGlobalTransaction = true;
791+
DtmGlobalXidAssigned = true;
784792
DtmLastSnapshot = NULL;
785793

786794
PG_RETURN_INT32(DtmNextXid);
@@ -796,7 +804,7 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
796804
XTM_INFO("%d: Join global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
797805

798806
DtmHasGlobalSnapshot = true;
799-
DtmIsGlobalTransaction = true;
807+
DtmGlobalXidAssigned = true;
800808
DtmLastSnapshot = NULL;
801809

802810
PG_RETURN_VOID();

contrib/pg_xtm/tests/transfers-fdw.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"strconv"
7+
"math/rand"
8+
"time"
9+
"github.com/jackc/pgx"
10+
)
11+
12+
const (
13+
TRANSFER_CONNECTIONS = 2
14+
INIT_AMOUNT = 10000
15+
N_ITERATIONS = 10000
16+
N_ACCOUNTS = TRANSFER_CONNECTIONS//100000
17+
ISOLATION_LEVEL = "repeatable read"
18+
//ISOLATION_LEVEL = "read committed"
19+
)
20+
21+
22+
var cfg1 = pgx.ConnConfig{
23+
Host: "127.0.0.1",
24+
Port: 5432,
25+
Database: "postgres",
26+
}
27+
28+
var cfg2 = pgx.ConnConfig{
29+
Host: "127.0.0.1",
30+
Port: 5433,
31+
Database: "postgres",
32+
}
33+
34+
35+
var running = false
36+
37+
func prepare_db() {
38+
// var xid int32
39+
40+
conn1, err := pgx.Connect(cfg1)
41+
checkErr(err)
42+
defer conn1.Close()
43+
44+
conn2, err := pgx.Connect(cfg2)
45+
checkErr(err)
46+
defer conn2.Close()
47+
48+
exec(conn1, "drop extension if exists pg_dtm")
49+
exec(conn1, "create extension pg_dtm")
50+
exec(conn1, "drop table if exists t")
51+
exec(conn1, "create table t(u int primary key, v int)")
52+
53+
exec(conn2, "drop extension if exists pg_dtm")
54+
exec(conn2, "create extension pg_dtm")
55+
exec(conn2, "drop table if exists t")
56+
exec(conn2, "create table t(u int primary key, v int)")
57+
58+
exec(conn1, "CREATE EXTENSION postgres_fdw");
59+
exec(conn1, "CREATE SERVER dtm FOREIGN DATA WRAPPER postgres_fdw options (dbname 'postgres', host '127.0.0.1', port '5433')");
60+
exec(conn1, "CREATE FOREIGN TABLE t_fdw() inherits (t) server dtm options(table_name 't')");
61+
exec(conn1, "CREATE USER MAPPING for knizhnik SERVER dtm options (user 'knizhnik')");
62+
63+
// start transaction
64+
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
65+
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
66+
67+
for i := 0; i < N_ACCOUNTS; i++ {
68+
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
69+
exec(conn2, "insert into t values($1, $2)", -i, INIT_AMOUNT)
70+
}
71+
72+
exec(conn1, "commit")
73+
exec(conn2, "commit")
74+
}
75+
76+
func progress(total int, cCommits chan int, cAborts chan int) {
77+
commits := 0
78+
aborts := 0
79+
start := time.Now()
80+
for newcommits := range cCommits {
81+
newaborts := <-cAborts
82+
commits += newcommits
83+
aborts += newaborts
84+
if time.Since(start).Seconds() > 10 {
85+
fmt.Printf(
86+
"progress %0.2f%%: %d commits, %d aborts\n",
87+
float32(commits) * 100.0 / float32(total), commits, aborts,
88+
)
89+
start = time.Now()
90+
}
91+
}
92+
}
93+
94+
func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
95+
var err error
96+
var xid int32
97+
var nAborts = 0
98+
var nCommits = 0
99+
var myCommits = 0
100+
101+
conn, err := pgx.Connect(cfg1)
102+
checkErr(err)
103+
defer conn.Close()
104+
105+
start := time.Now()
106+
for myCommits < N_ITERATIONS {
107+
amount := 1
108+
account1 := rand.Intn(N_ACCOUNTS)
109+
account2 := -rand.Intn(N_ACCOUNTS)
110+
111+
exec(conn, "begin")
112+
xid = execQuery(conn, "select dtm_begin_transaction(2)")
113+
exec(conn, "select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction(" + strconv.Itoa(int(xid)) + ")')")
114+
exec(conn, "commit")
115+
116+
exec(conn, "begin transaction isolation level " + ISOLATION_LEVEL)
117+
118+
ok1 := execUpdate(conn, "update t set v = v - $1 where u=$2", amount, account1)
119+
ok2 := execUpdate(conn, "update t set v = v + $1 where u=$2", amount, account2)
120+
121+
if !ok1 || !ok2 {
122+
exec(conn, "rollback")
123+
nAborts += 1
124+
} else {
125+
exec(conn, "commit")
126+
nCommits += 1
127+
myCommits += 1
128+
}
129+
130+
if time.Since(start).Seconds() > 10 {
131+
cCommits <- nCommits
132+
cAborts <- nAborts
133+
nCommits = 0
134+
nAborts = 0
135+
start = time.Now()
136+
}
137+
}
138+
cCommits <- nCommits
139+
cAborts <- nAborts
140+
wg.Done()
141+
}
142+
143+
func inspect(wg *sync.WaitGroup) {
144+
var sum int64
145+
var prevSum int64 = 0
146+
var xid int32
147+
148+
{
149+
conn, err := pgx.Connect(cfg1)
150+
checkErr(err)
151+
152+
for running {
153+
exec(conn, "begin")
154+
xid = execQuery(conn, "select dtm_begin_transaction(2)")
155+
exec(conn, "select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction(" + strconv.Itoa(int(xid)) + ")')")
156+
exec(conn, "commit")
157+
158+
exec(conn, "begin transaction isolation level " + ISOLATION_LEVEL)
159+
160+
sum = execQuery64(conn, "select sum(v) from t")
161+
162+
if (sum != prevSum) {
163+
fmt.Printf("Total=%d xid=%d\n", sum, xid)
164+
prevSum = sum
165+
}
166+
167+
exec(conn, "commit")
168+
}
169+
conn.Close()
170+
}
171+
wg.Done()
172+
}
173+
174+
func main() {
175+
var transferWg sync.WaitGroup
176+
var inspectWg sync.WaitGroup
177+
178+
prepare_db()
179+
180+
cCommits := make(chan int)
181+
cAborts := make(chan int)
182+
go progress(TRANSFER_CONNECTIONS * N_ITERATIONS, cCommits, cAborts)
183+
184+
transferWg.Add(TRANSFER_CONNECTIONS)
185+
for i:=0; i<TRANSFER_CONNECTIONS; i++ {
186+
go transfer(i, cCommits, cAborts, &transferWg)
187+
}
188+
running = true
189+
inspectWg.Add(1)
190+
go inspect(&inspectWg)
191+
192+
transferWg.Wait()
193+
running = false
194+
inspectWg.Wait()
195+
196+
fmt.Printf("done\n")
197+
}
198+
199+
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
200+
var err error
201+
// fmt.Println(stmt)
202+
_, err = conn.Exec(stmt, arguments... )
203+
checkErr(err)
204+
}
205+
206+
func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
207+
var err error
208+
// fmt.Println(stmt)
209+
_, err = conn.Exec(stmt, arguments... )
210+
//if err != nil {
211+
// fmt.Println(err)
212+
//}
213+
return err == nil
214+
}
215+
216+
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
217+
var err error
218+
var result int32
219+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
220+
checkErr(err)
221+
return result
222+
}
223+
224+
func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
225+
var err error
226+
var result int64
227+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
228+
checkErr(err)
229+
return result
230+
}
231+
232+
func checkErr(err error) {
233+
if err != nil {
234+
panic(err)
235+
}
236+
}
237+
238+
// vim: expandtab ts=4 sts=4 sw=4

0 commit comments

Comments
 (0)