Skip to content

Commit 8a9c01b

Browse files
committed
WIP
1 parent e489993 commit 8a9c01b

File tree

6 files changed

+59
-32
lines changed

6 files changed

+59
-32
lines changed

contrib/pg_xtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC=gcc
2-
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE
2+
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
55
LIBUV_LDFLAGS=-luv -pthread

contrib/pg_xtm/libdtm.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,27 @@ static bool dtm_read_snapshot(DTMConn dtm, Snapshot s)
122122
static bool dtm_read_status(DTMConn dtm, XidStatus *s)
123123
{
124124
char statuschar;
125-
if (!dtm_read_char(dtm, &statuschar)) return false;
125+
if (!dtm_read_char(dtm, &statuschar)) {
126+
fprintf(stderr, "dtm_read_status: failed to get char\n");
127+
return false;
128+
}
126129

127130
switch (statuschar)
128131
{
129132
case '0':
130133
*s = TRANSACTION_STATUS_UNKNOWN;
134+
break;
131135
case 'c':
132136
*s = TRANSACTION_STATUS_COMMITTED;
137+
break;
133138
case 'a':
134139
*s = TRANSACTION_STATUS_ABORTED;
140+
break;
135141
case '?':
136142
*s = TRANSACTION_STATUS_IN_PROGRESS;
143+
break;
137144
default:
145+
fprintf(stderr, "dtm_read_status: unexpected char '%c'\n", statuschar);
138146
return false;
139147
}
140148
return true;
@@ -229,7 +237,10 @@ static DTMConn GetConnection()
229237
if (dtm == NULL)
230238
{
231239
// FIXME: add API for setting the host and port for dtm connection
232-
dtm = DtmConnect("localhost", 5431);
240+
dtm = DtmConnect("127.0.0.1", 5431);
241+
if (dtm == NULL) {
242+
elog(ERROR, "Failed to connect to DTMD");
243+
}
233244
}
234245
return dtm;
235246
}
@@ -329,11 +340,11 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
329340
{
330341
case TRANSACTION_STATUS_COMMITTED:
331342
// query
332-
if (!dtm_query(dtm, 'c', 2, xid, wait)) goto failure;
343+
if (!dtm_query(dtm, 'y', 2, xid, wait)) goto failure;
333344
break;
334345
case TRANSACTION_STATUS_ABORTED:
335346
// query
336-
if (!dtm_query(dtm, 'a', 2, xid, wait)) goto failure;
347+
if (!dtm_query(dtm, 'n', 2, xid, wait)) goto failure;
337348
break;
338349
default:
339350
assert(false); // should not happen

contrib/pg_xtm/pg_dtm.c

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ static TransactionId DtmNextXid;
7575
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7676
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
7777
static bool DtmHasGlobalSnapshot;
78+
static bool DtmIsGlobalTransaction;
7879
static int DtmLocalXidReserve;
7980
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot, DtmGetNextXid };
8081

@@ -238,16 +239,18 @@ static TransactionId DtmGetNextXid()
238239
{
239240
TransactionId xid;
240241
if (TransactionIdIsValid(DtmNextXid)) {
242+
XTM_INFO("Use global XID %d\n", DtmNextXid);
241243
xid = DtmNextXid;
242244
} else {
243245
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
244246
if (dtm->nReservedXids == 0) {
245-
dtm->nReservedXids = DtmGlobalReserve(dtm->nextXid, DtmLocalXidReserve, &xid);
247+
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &xid);
246248
ShmemVariableCache->nextXid = xid;
247249
dtm->nextXid = xid;
248250
}
249251
Assert(dtm->nextXid == ShmemVariableCache->nextXid);
250252
xid = ShmemVariableCache->nextXid;
253+
XTM_INFO("Obtain new local XID %d\n", xid);
251254
dtm->nextXid += 1;
252255
dtm->nReservedXids -= 1;
253256
LWLockRelease(dtm->xidLock);
@@ -260,12 +263,13 @@ static Snapshot DtmGetSnapshot()
260263
Snapshot snapshot = GetLocalTransactionSnapshot();
261264
if (TransactionIdIsValid(DtmNextXid)) {
262265
if (!DtmHasGlobalSnapshot) {
263-
Assert(!IsolationUsesXactSnapshot());
264266
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
265267
}
266268
DtmMergeSnapshots(snapshot, &DtmSnapshot);
267269
DtmUpdateRecentXmin();
268-
DtmHasGlobalSnapshot = false;
270+
if (!IsolationUsesXactSnapshot()) {
271+
DtmHasGlobalSnapshot = false;
272+
}
269273
}
270274
CurrentTransactionSnapshot = snapshot;
271275
return snapshot;
@@ -358,18 +362,17 @@ static void DtmInitialize()
358362
static void
359363
DtmXactCallback(XactEvent event, void *arg)
360364
{
361-
if (TransactionIdIsValid(DtmNextXid)) {
362-
switch (event) {
363-
case XACT_EVENT_COMMIT:
365+
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT) {
366+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmIsGlobalTransaction, DtmNextXid);
367+
if (DtmIsGlobalTransaction) {
368+
DtmIsGlobalTransaction = false;
369+
} else if (TransactionIdIsValid(DtmNextXid)) {
370+
if (event == XACT_EVENT_COMMIT) {
364371
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
365372
hash_search(xid_in_doubt, &DtmNextXid, HASH_REMOVE, NULL);
366373
LWLockRelease(dtm->hashLock);
367-
/* no break */
368-
case XACT_EVENT_ABORT:
369-
DtmNextXid = InvalidTransactionId;
370-
break;
371-
default:
372-
break;
374+
}
375+
DtmNextXid = InvalidTransactionId;
373376
}
374377
}
375378
}
@@ -472,8 +475,10 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
472475

473476
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot);
474477
Assert(TransactionIdIsValid(DtmNextXid));
478+
XTM_INFO("%d: Start global transaction %d\n", getpid(), DtmNextXid);
475479

476480
DtmHasGlobalSnapshot = true;
481+
DtmIsGlobalTransaction = true;
477482

478483
PG_RETURN_INT32(DtmNextXid);
479484
}
@@ -483,10 +488,12 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
483488
Assert(!TransactionIdIsValid(DtmNextXid));
484489
DtmNextXid = PG_GETARG_INT32(0);
485490
Assert(TransactionIdIsValid(DtmNextXid));
491+
XTM_INFO("%d: Join global transaction %d\n", getpid(), DtmNextXid);
486492

487493
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
488494

489495
DtmHasGlobalSnapshot = true;
496+
DtmIsGlobalTransaction = true;
490497

491498
PG_RETURN_VOID();
492499
}

contrib/pg_xtm/tests/transfers.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
const (
11-
TRANSFER_CONNECTIONS = 8
11+
TRANSFER_CONNECTIONS = 2
1212
INIT_AMOUNT = 10000
1313
N_ITERATIONS = 10000
1414
N_ACCOUNTS = TRANSFER_CONNECTIONS//100000
@@ -47,7 +47,7 @@ func commit(conn1, conn2 *pgx.Conn) {
4747
}
4848

4949
func prepare_db() {
50-
var xid int32
50+
// var xid int32
5151

5252
conn1, err := pgx.Connect(cfg1)
5353
checkErr(err)
@@ -67,8 +67,8 @@ func prepare_db() {
6767
exec(conn2, "drop table if exists t")
6868
exec(conn2, "create table t(u int primary key, v int)")
6969

70-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
71-
exec(conn2, "select dtm_join_transaction(xid)")
70+
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
71+
// exec(conn2, "select dtm_join_transaction($1)", xid)
7272

7373
// strt transaction
7474
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
@@ -109,14 +109,15 @@ func transfer(id int, wg *sync.WaitGroup) {
109109
account2 := rand.Intn(N_ACCOUNTS)
110110

111111
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
112-
exec(conn2, "select dtm_join_transaction(xid)")
112+
exec(conn2, "select dtm_join_transaction($1)", xid)
113113

114114
// start transaction
115115
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
116116
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
117117

118-
if !execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1) ||
119-
!execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2) {
118+
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
119+
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
120+
if !ok1 || !ok2 {
120121
exec(conn1, "rollback")
121122
exec(conn2, "rollback")
122123
nConflicts += 1
@@ -130,8 +131,8 @@ func transfer(id int, wg *sync.WaitGroup) {
130131
}
131132

132133
func inspect(wg *sync.WaitGroup) {
133-
var sum1, sum2, sum int32
134-
var prevSum int32 = 0
134+
var sum1, sum2, sum int64
135+
var prevSum int64 = 0
135136
var xid int32
136137

137138
{
@@ -145,17 +146,17 @@ func inspect(wg *sync.WaitGroup) {
145146

146147

147148
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
148-
exec(conn2, "select dtm_join_transaction(xid)")
149+
exec(conn2, "select dtm_join_transaction($1)", xid)
149150

150151
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
151152
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
152153

153-
sum1 = execQuery(conn1, "select sum(v) from t")
154-
sum2 = execQuery(conn2, "select sum(v) from t")
154+
sum1 = execQuery64(conn1, "select sum(v) from t")
155+
sum2 = execQuery64(conn2, "select sum(v) from t")
155156

156157
sum = sum1 + sum2
157158
if (sum != prevSum) {
158-
fmt.Println("Total = ", sum, "xids=", xids, "snap1={", execQuery(conn1, "select dtm_get_current_snapshot_xmin()"), execQuery(conn1, "select dtm_get_current_snapshot_xmax()"), "}, snap2={", execQuery(conn2, "select dtm_get_current_snapshot_xmin()"), execQuery(conn2, "select dtm_get_current_snapshot_xmax()"), "}")
159+
fmt.Println("Total = ", sum, "xid=", xid, "snap1={", execQuery(conn1, "select dtm_get_current_snapshot_xmin()"), execQuery(conn1, "select dtm_get_current_snapshot_xmax()"), "}, snap2={", execQuery(conn2, "select dtm_get_current_snapshot_xmin()"), execQuery(conn2, "select dtm_get_current_snapshot_xmax()"), "}")
159160
prevSum = sum
160161
}
161162

@@ -205,9 +206,16 @@ func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
205206
var result int32
206207
err = conn.QueryRow(stmt, arguments...).Scan(&result)
207208
checkErr(err)
208-
return int32(result)
209+
return result
209210
}
210211

212+
func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
213+
var err error
214+
var result int64
215+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
216+
checkErr(err)
217+
return result
218+
}
211219
func checkErr(err error) {
212220
if err != nil {
213221
panic(err)

src/backend/access/transam/varsup.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "access/transam.h"
2020
#include "access/xact.h"
2121
#include "access/xlog.h"
22+
#include "access/xtm.h"
2223
#include "commands/dbcommands.h"
2324
#include "miscadmin.h"
2425
#include "postmaster/autovacuum.h"

src/include/access/transam.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ extern TransactionId TransactionIdLatest(TransactionId mainxid,
169169
extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
170170

171171
/* in transam/varsup.c */
172-
extern TransactionId GetNextTransactionId();
172+
extern TransactionId GetNextTransactionId(void);
173173
extern TransactionId GetNewTransactionId(bool isSubXact);
174174
extern TransactionId ReadNewTransactionId(void);
175175
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,

0 commit comments

Comments
 (0)