Skip to content

Commit 660f187

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents b1cd83c + 78dd16d commit 660f187

File tree

4 files changed

+107
-53
lines changed

4 files changed

+107
-53
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ static TransactionId DtmMinXid;
8686
static bool DtmHasGlobalSnapshot;
8787
static bool DtmIsGlobalTransaction;
8888
static int DtmLocalXidReserve;
89+
static int DtmCurcid;
90+
static Snapshot DtmLastSnapshot;
8991
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin };
9092

9193

9294
#define XTM_TRACE(fmt, ...)
93-
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
94-
#define XTM_INFO(fmt, ...)
95+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
96+
//#define XTM_INFO(fmt, ...)
9597

9698
static void DumpSnapshot(Snapshot s, char *name)
9799
{
@@ -100,8 +102,8 @@ static void DumpSnapshot(Snapshot s, char *name)
100102
char *cursor = buf;
101103
cursor += sprintf(
102104
cursor,
103-
"snapshot %s for transaction %d: xmin=%d, xmax=%d, active=[",
104-
name, GetCurrentTransactionId(), s->xmin, s->xmax
105+
"snapshot %s(%p) for transaction %d: xmin=%d, xmax=%d, active=[",
106+
name, s, GetCurrentTransactionId(), s->xmin, s->xmax
105107
);
106108
for (i = 0; i < s->xcnt; i++) {
107109
if (i == 0) {
@@ -480,10 +482,12 @@ DtmGetNewTransactionId(bool isSubXact)
480482

481483
static Snapshot DtmGetSnapshot(Snapshot snapshot)
482484
{
483-
if (TransactionIdIsValid(DtmNextXid)) {
484-
if (!DtmHasGlobalSnapshot) {
485+
if (TransactionIdIsValid(DtmNextXid) && IsMVCCSnapshot(snapshot) && snapshot != &CatalogSnapshotData) {
486+
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
485487
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &DtmMinXid);
486488
}
489+
DtmCurcid = snapshot->curcid;
490+
DtmLastSnapshot = snapshot;
487491
DtmMergeSnapshots(snapshot, &DtmSnapshot);
488492
if (!IsolationUsesXactSnapshot()) {
489493
DtmHasGlobalSnapshot = false;
@@ -595,6 +599,7 @@ DtmXactCallback(XactEvent event, void *arg)
595599
LWLockRelease(dtm->hashLock);
596600
}
597601
DtmNextXid = InvalidTransactionId;
602+
DtmLastSnapshot = NULL;
598603
}
599604
}
600605
}
@@ -701,6 +706,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
701706

702707
DtmHasGlobalSnapshot = true;
703708
DtmIsGlobalTransaction = true;
709+
DtmLastSnapshot = NULL;
704710

705711
PG_RETURN_INT32(DtmNextXid);
706712
}
@@ -716,6 +722,7 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
716722

717723
DtmHasGlobalSnapshot = true;
718724
DtmIsGlobalTransaction = true;
725+
DtmLastSnapshot = NULL;
719726

720727
PG_RETURN_VOID();
721728
}

contrib/pg_xtm/tests/daemons.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
"-D", datadir,
9292
"-p", strconv.Itoa(port),
9393
"-c", "dtm.node_id=" + strconv.Itoa(nodeid),
94+
"-c", "autovacuum=off",
95+
"-c", "fsync=off",
96+
"-c", "synchronous_commit=off",
9497
}
9598
name := "postgres " + datadir
9699
c := make(chan string)

contrib/pg_xtm/tests/transfers.go

Lines changed: 89 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sync"
66
"math/rand"
7+
"time"
78
"github.com/jackc/pgx"
89
)
910

@@ -34,10 +35,10 @@ var running = false
3435
var nodes []int32 = []int32{0,1}
3536

3637
func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
37-
exec(conn, "commit")
38+
exec(conn, "commit")
3839
wg.Done()
3940
}
40-
41+
4142
func commit(conn1, conn2 *pgx.Conn) {
4243
var wg sync.WaitGroup
4344
wg.Add(2)
@@ -66,33 +67,53 @@ func prepare_db() {
6667
exec(conn2, "create extension pg_dtm")
6768
exec(conn2, "drop table if exists t")
6869
exec(conn2, "create table t(u int primary key, v int)")
69-
70+
7071
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
7172
// exec(conn2, "select dtm_join_transaction($1)", xid)
7273

7374
// strt transaction
7475
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
7576
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
76-
77+
7778
for i := 0; i < N_ACCOUNTS; i++ {
7879
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
7980
exec(conn2, "insert into t values($1, $2)", i, INIT_AMOUNT)
8081
}
81-
82+
8283
commit(conn1, conn2)
8384
}
8485

8586
func max(a, b int64) int64 {
8687
if a >= b {
8788
return a
88-
}
89+
}
8990
return b
9091
}
9192

92-
func transfer(id int, wg *sync.WaitGroup) {
93+
func progress(total int, cCommits chan int, cAborts chan int) {
94+
commits := 0
95+
aborts := 0
96+
start := time.Now()
97+
for newcommits := range cCommits {
98+
newaborts := <-cAborts
99+
commits += newcommits
100+
aborts += newaborts
101+
if time.Since(start).Seconds() > 1 {
102+
fmt.Printf(
103+
"progress %0.2f%%: %d commits, %d aborts\n",
104+
float32(commits) * 100.0 / float32(total), commits, aborts,
105+
)
106+
start = time.Now()
107+
}
108+
}
109+
}
110+
111+
func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
93112
var err error
94113
var xid int32
95-
var nConflicts = 0
114+
var nAborts = 0
115+
var nCommits = 0
116+
var myCommits = 0
96117

97118
conn1, err := pgx.Connect(cfg1)
98119
checkErr(err)
@@ -102,10 +123,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102123
checkErr(err)
103124
defer conn2.Close()
104125

105-
for i := 0; i < N_ITERATIONS; i++ {
106-
//amount := 2*rand.Intn(2) - 1
107-
amount := 1
108-
account1 := rand.Intn(N_ACCOUNTS)
126+
start := time.Now()
127+
for myCommits < N_ITERATIONS {
128+
amount := 2*rand.Intn(2000) - 1
129+
//amount := 1
130+
account1 := rand.Intn(N_ACCOUNTS)
109131
account2 := rand.Intn(N_ACCOUNTS)
110132

111133
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
@@ -114,25 +136,35 @@ func transfer(id int, wg *sync.WaitGroup) {
114136
// start transaction
115137
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
116138
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
117-
118-
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
119-
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
120-
if !ok1 || !ok2 {
139+
140+
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
141+
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
142+
if !ok1 || !ok2 {
121143
exec(conn1, "rollback")
122144
exec(conn2, "rollback")
123-
nConflicts += 1
124-
i -= 1
125-
} else {
145+
nAborts += 1
146+
} else {
126147
commit(conn1, conn2)
127-
}
148+
nCommits += 1
149+
myCommits += 1
150+
}
151+
152+
if time.Since(start).Seconds() > 1 {
153+
cCommits <- nCommits
154+
cAborts <- nAborts
155+
nCommits = 0
156+
nAborts = 0
157+
start = time.Now()
158+
}
128159
}
129-
fmt.Println("Test completed with ",nConflicts," conflicts")
160+
cCommits <- nCommits
161+
cAborts <- nAborts
130162
wg.Done()
131163
}
132164

133165
func inspect(wg *sync.WaitGroup) {
134166
var sum1, sum2, sum int64
135-
var prevSum int64 = 0
167+
var prevSum int64 = 0
136168
var xid int32
137169

138170
{
@@ -142,28 +174,33 @@ func inspect(wg *sync.WaitGroup) {
142174
conn2, err := pgx.Connect(cfg2)
143175
checkErr(err)
144176

145-
for running {
177+
for running {
178+
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
179+
exec(conn2, "select dtm_join_transaction($1)", xid)
180+
181+
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
182+
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
183+
184+
sum1 = execQuery64(conn1, "select sum(v) from t")
185+
sum2 = execQuery64(conn2, "select sum(v) from t")
186+
187+
sum = sum1 + sum2
188+
if (sum != prevSum) {
189+
xmin1 := execQuery(conn1, "select dtm_get_current_snapshot_xmin()")
190+
xmax1 := execQuery(conn1, "select dtm_get_current_snapshot_xmax()")
191+
xmin2 := execQuery(conn2, "select dtm_get_current_snapshot_xmin()")
192+
xmax2 := execQuery(conn2, "select dtm_get_current_snapshot_xmax()")
193+
fmt.Printf(
194+
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
195+
sum, xid, xmin1, xmax1, xmin2, xmax2,
196+
)
197+
prevSum = sum
198+
}
146199

147-
148-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
149-
exec(conn2, "select dtm_join_transaction($1)", xid)
150-
151-
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
152-
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
153-
154-
sum1 = execQuery64(conn1, "select sum(v) from t")
155-
sum2 = execQuery64(conn2, "select sum(v) from t")
156-
157-
sum = sum1 + sum2
158-
if (sum != prevSum) {
159-
fmt.Println("Total = ", sum, "xid=", xid, "snap1={", execQuery(conn1, "select dtm_get_current_snapshot_xmin()"), execQuery(conn1, "select dtm_get_current_snapshot_xmax()"), "}, snap2={", execQuery(conn2, "select dtm_get_current_snapshot_xmin()"), execQuery(conn2, "select dtm_get_current_snapshot_xmax()"), "}")
160-
prevSum = sum
161-
}
162-
163-
commit(conn1, conn2)
164-
}
165-
conn1.Close()
166-
conn2.Close()
200+
commit(conn1, conn2)
201+
}
202+
conn1.Close()
203+
conn2.Close()
167204
}
168205
wg.Done()
169206
}
@@ -174,9 +211,13 @@ func main() {
174211

175212
prepare_db()
176213

214+
cCommits := make(chan int)
215+
cAborts := make(chan int)
216+
go progress(TRANSFER_CONNECTIONS * N_ITERATIONS, cCommits, cAborts)
217+
177218
transferWg.Add(TRANSFER_CONNECTIONS)
178219
for i:=0; i<TRANSFER_CONNECTIONS; i++ {
179-
go transfer(i, &transferWg)
220+
go transfer(i, cCommits, cAborts, &transferWg)
180221
}
181222
running = true
182223
inspectWg.Add(1)
@@ -185,6 +226,8 @@ func main() {
185226
transferWg.Wait()
186227
running = false
187228
inspectWg.Wait()
229+
230+
fmt.Printf("done\n")
188231
}
189232

190233
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
@@ -216,10 +259,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216259
checkErr(err)
217260
return result
218261
}
262+
219263
func checkErr(err error) {
220264
if err != nil {
221265
panic(err)
222266
}
223267
}
224268

225-
269+
// vim: expandtab ts=4 sts=4 sw=4

src/backend/utils/time/tqual.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
970970
* and more contention on the PGXACT array.
971971
*/
972972
bool
973-
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
973+
_HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
974974
Buffer buffer)
975975
{
976976
HeapTupleHeader tuple = htup->t_data;
@@ -1156,7 +1156,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11561156

11571157
return false;
11581158
}
1159-
#if 0
1159+
#if 1
11601160
bool
11611161
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11621162
Buffer buffer)

0 commit comments

Comments
 (0)