Skip to content

Commit 0f1b88d

Browse files
committed
FDW is workings
1 parent a09fc09 commit 0f1b88d

File tree

6 files changed

+40
-85
lines changed

6 files changed

+40
-85
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionSt
9696

9797

9898
#define XTM_TRACE(fmt, ...)
99-
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
100-
//#define XTM_INFO(fmt, ...)
99+
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
100+
#define XTM_INFO(fmt, ...)
101101

102102
static void DumpSnapshot(Snapshot s, char *name)
103103
{

contrib/pg_xtm/tests/transfers-fdw.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
const (
13-
TRANSFER_CONNECTIONS = 2
13+
TRANSFER_CONNECTIONS = 8
1414
INIT_AMOUNT = 10000
1515
N_ITERATIONS = 10000
1616
N_ACCOUNTS = TRANSFER_CONNECTIONS//100000
@@ -35,8 +35,6 @@ var cfg2 = pgx.ConnConfig{
3535
var running = false
3636

3737
func prepare_db() {
38-
// var xid int32
39-
4038
conn1, err := pgx.Connect(cfg1)
4139
checkErr(err)
4240
defer conn1.Close()
@@ -66,7 +64,7 @@ func prepare_db() {
6664

6765
for i := 0; i < N_ACCOUNTS; i++ {
6866
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
69-
exec(conn2, "insert into t values($1, $2)", -i, INIT_AMOUNT)
67+
exec(conn2, "insert into t values($1, $2)", ^i, INIT_AMOUNT)
7068
}
7169

7270
exec(conn1, "commit")
@@ -106,7 +104,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
106104
for myCommits < N_ITERATIONS {
107105
amount := 1
108106
account1 := rand.Intn(N_ACCOUNTS)
109-
account2 := -rand.Intn(N_ACCOUNTS)
107+
account2 := ^rand.Intn(N_ACCOUNTS)
110108

111109
exec(conn, "begin")
112110
xid = execQuery(conn, "select dtm_begin_transaction(2)")
@@ -150,6 +148,7 @@ func inspect(wg *sync.WaitGroup) {
150148
checkErr(err)
151149

152150
for running {
151+
153152
exec(conn, "begin")
154153
xid = execQuery(conn, "select dtm_begin_transaction(2)")
155154
exec(conn, "select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction(" + strconv.Itoa(int(xid)) + ")')")

contrib/pg_xtm/tests/transfers.go

Lines changed: 22 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS = 8
1313
INIT_AMOUNT = 10000
14-
N_ITERATIONS = 100000
14+
N_ITERATIONS = 10000
1515
N_ACCOUNTS = TRANSFER_CONNECTIONS//100000
16-
//ISOLATION_LEVEL = "repeatable read"
17-
ISOLATION_LEVEL = "read committed"
18-
GLOBAL_UPDATES = true
19-
LOCAL_UPDATES = false
16+
ISOLATION_LEVEL = "repeatable read"
17+
//ISOLATION_LEVEL = "read committed"
2018
)
2119

2220

@@ -50,8 +48,6 @@ func commit(conn1, conn2 *pgx.Conn) {
5048
}
5149

5250
func prepare_db() {
53-
// var xid int32
54-
5551
conn1, err := pgx.Connect(cfg1)
5652
checkErr(err)
5753
defer conn1.Close()
@@ -70,9 +66,6 @@ func prepare_db() {
7066
exec(conn2, "drop table if exists t")
7167
exec(conn2, "create table t(u int primary key, v int)")
7268

73-
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
74-
// exec(conn2, "select dtm_join_transaction($1)", xid)
75-
7669
// strt transaction
7770
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
7871
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
@@ -134,63 +127,27 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
134127
account1 := rand.Intn(N_ACCOUNTS)
135128
account2 := rand.Intn(N_ACCOUNTS)
136129

137-
if (account1 >= account2) {
138-
continue
139-
}
130+
src := conn[0]
131+
dst := conn[1]
140132

141-
srci := rand.Intn(2)
142-
dsti := rand.Intn(2)
143-
if (srci > dsti) {
144-
continue
145-
}
133+
xid = execQuery(src, "select dtm_begin_transaction(2)")
134+
exec(dst, "select dtm_join_transaction($1)", xid)
146135

147-
src := conn[srci]
148-
dst := conn[dsti]
136+
// start transaction
137+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
138+
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
149139

150-
if src == dst {
151-
// local update
152-
if !LOCAL_UPDATES {
153-
// which we do not want
154-
continue
155-
}
140+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
141+
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
156142

157-
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
158-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
159-
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, account2)
160-
if !ok1 || !ok2 {
161-
exec(src, "rollback")
162-
nAborts += 1
163-
} else {
164-
exec(src, "commit")
165-
nCommits += 1
166-
myCommits += 1
167-
}
143+
if !ok1 || !ok2 {
144+
exec(src, "rollback")
145+
exec(dst, "rollback")
146+
nAborts += 1
168147
} else {
169-
// global update
170-
if !GLOBAL_UPDATES {
171-
// which we do not want
172-
continue
173-
}
174-
175-
xid = execQuery(src, "select dtm_begin_transaction(2)")
176-
exec(dst, "select dtm_join_transaction($1)", xid)
177-
178-
// start transaction
179-
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
180-
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
181-
182-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
183-
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
184-
185-
if !ok1 || !ok2 {
186-
exec(src, "rollback")
187-
exec(dst, "rollback")
188-
nAborts += 1
189-
} else {
190-
commit(src, dst)
191-
nCommits += 1
192-
myCommits += 1
193-
}
148+
commit(src, dst)
149+
nCommits += 1
150+
myCommits += 1
194151
}
195152

196153
if time.Since(start).Seconds() > 1 {
@@ -285,9 +242,9 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
285242
var err error
286243
// fmt.Println(stmt)
287244
_, err = conn.Exec(stmt, arguments... )
288-
if err != nil {
289-
fmt.Println(err)
290-
}
245+
//if err != nil {
246+
// fmt.Println(err)
247+
//}
291248
return err == nil
292249
}
293250

src/backend/access/heap/heapam.c

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3632,20 +3632,15 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
36323632

36333633
#if 0
36343634
{
3635-
char buf[256];
3636-
sprintf(buf, "backend-%d.trace", getpid());
3637-
FILE* f = fopen(buf, "a");
36383635
Snapshot s = GetTransactionSnapshot();
3639-
fprintf(f, "xid=%d: old.ctid=[%x-%x,%x], old.xmin=%d, old.xmax=%d, old.mask=%x, new.xmin=%d, new.xmax=%d, new.flags=%x, snap.xmin=%d, snap.xmax=%d, xcnt=%d, xip={%d,%d,%d,%d,%d}\n",
3640-
xid,
3636+
fprintf(stderr, "pid=%d transaction %d update tuple: old.ctid=[%x-%x,%x], old.xmin=%d, old.xmax=%d, old.mask=%x, new.xmin=%d, new.xmax=%d, new.flags=%x, snap.xmin=%d, snap.xmax=%d\n",
3637+
getpid(), xid,
36413638
oldtup.t_data->t_ctid.ip_blkid.bi_hi,
36423639
oldtup.t_data->t_ctid.ip_blkid.bi_lo,
36433640
oldtup.t_data->t_ctid.ip_posid,
36443641
HeapTupleHeaderGetRawXmin(oldtup.t_data), HeapTupleHeaderGetRawXmax(oldtup.t_data), oldtup.t_data->t_infomask,
36453642
xid, xmax_new_tuple, infomask_new_tuple,
3646-
s->xmin, s->xmax, s->xcnt, s->xip[0], s->xip[1], s->xip[2], s->xip[3], s->xip[4]
3647-
);
3648-
fclose(f);
3643+
s->xmin, s->xmax);
36493644
}
36503645
Assert(xmax_new_tuple != xid || (newtup->t_data->t_infomask & HEAP_XMAX_LOCK_ONLY) != 0);
36513646
#endif

src/backend/access/heap/visibilitymap.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
254254
Page page;
255255
char *map;
256256

257-
#if 1
257+
#if 0
258258
fprintf(stderr, "Visibilitymap cutoff %d, RecentLocalDataXmin=%d\n", cutoff_xid, RecentGlobalDataXmin);
259259
// return;
260260
#endif

src/backend/utils/time/tqual.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
970970
* and more contention on the PGXACT array.
971971
*/
972972
bool
973-
_HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
973+
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
974974
Buffer buffer)
975975
{
976976
HeapTupleHeader tuple = htup->t_data;
@@ -1156,7 +1156,7 @@ _HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11561156

11571157
return false;
11581158
}
1159-
#if 1
1159+
#if 0
11601160
bool
11611161
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11621162
Buffer buffer)
@@ -1166,8 +1166,12 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11661166
HeapTupleHeader tuple = htup->t_data;
11671167
TransactionId curxid = GetCurrentTransactionId();
11681168
if (TransactionIdIsNormal(curxid)) {
1169-
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d} %x = %d\n",
1170-
getpid(), curxid, snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), tuple->t_infomask, result);
1169+
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple [%x-%x,%x] {%d,%d} %x = %d\n",
1170+
getpid(), curxid, snapshot->xmin, snapshot->xmax,
1171+
tuple->t_ctid.ip_blkid.bi_hi,
1172+
tuple->t_ctid.ip_blkid.bi_lo,
1173+
tuple->t_ctid.ip_posid,
1174+
HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), tuple->t_infomask, result);
11711175
}
11721176

11731177
return result;

0 commit comments

Comments
 (0)