Skip to content

Commit 9d14fcb

Browse files
committed
Receive snapshots implicitly
1 parent 3d6eaab commit 9d14fcb

File tree

4 files changed

+44
-67
lines changed

4 files changed

+44
-67
lines changed

contrib/pg_xtm/libdtm.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ void DtmInitSnapshot(Snapshot snapshot);
1414
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shaposhot);
1515

1616
// Asks DTM for a fresh snapshot.
17-
void DtmGlobalNewSnapshot(TransactionId xid, Snapshot snapshot);
18-
19-
// Get existed DTM snapshot.
2017
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
2118

2219
// Commits transaction only once all participants have called this function,
@@ -26,7 +23,7 @@ void DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
2623
// Gets the status of the transaction identified by 'xid'. Returns the status
2724
// on success, or -1 otherwise. If 'wait' is true, then it does not return
2825
// until the transaction is finished.
29-
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
26+
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3027

3128
// Reserve XIDs for local transaction
3229
TransactioinId DtmGlobalReserve(int nXids);

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,10 @@ CREATE FUNCTION dtm_begin_transaction(n_participants integer) RETURNS integer
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

8-
CREATE FUNCTION dtm_get_snapshot(xid integer) RETURNS void
8+
CREATE FUNCTION dtm_join_transaction(xid integer) RETURNS void
99
AS 'MODULE_PATHNAME','dtm_get_snapshot'
1010
LANGUAGE C;
1111

12-
CREATE FUNCTION dtm_new_snapshot(xid integer) RETURNS void
13-
AS 'MODULE_PATHNAME','dtm_new_snapshot'
14-
LANGUAGE C;
15-
1612
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS integer
1713
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmin'
1814
LANGUAGE C;

contrib/pg_xtm/pg_dtm.c

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,12 @@ static void dtm_shmem_startup(void);
6969
static shmem_startup_hook_type prev_shmem_startup_hook;
7070
static HTAB* xid_in_doubt;
7171
static DtmState* dtm;
72-
static TransactionId DtmCurrentXid = InvalidTransactionId;
7372
static Snapshot CurrentTransactionSnapshot;
7473

7574
static TransactionId DtmNextXid;
7675
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7776
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
78-
static bool DtmIsGlobalTransaction = false;
77+
static bool DtmHasGlobalSnapshot;
7978
static int DtmLocalXidReserve;
8079
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot, DtmGetNextXid };
8180

@@ -185,7 +184,7 @@ static void DtmUpdateRecentXmin(void)
185184

186185
XTM_TRACE("XTM: DtmUpdateRecentXmin \n");
187186

188-
if (xmin != InvalidTransactionId) {
187+
if (TransactionIdIsValid(xmin)) {
189188
xmin -= vacuum_defer_cleanup_age;
190189
if (!TransactionIdIsNormal(xmin)) {
191190
xmin = FirstNormalTransactionId;
@@ -238,9 +237,8 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
238237
static TransactionId DtmGetNextXid()
239238
{
240239
TransactionId xid;
241-
if (TransactionIdIsValid(DtmNextXid)) {
240+
if (TransactionIdIsValid(DtmNextXid)) {
242241
xid = DtmNextXid;
243-
DtmNextXid = InvalidTransactionId;
244242
} else {
245243
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
246244
if (dtm->nReservedXids == 0) {
@@ -260,9 +258,14 @@ static TransactionId DtmGetNextXid()
260258
static Snapshot DtmGetSnapshot()
261259
{
262260
Snapshot snapshot = GetLocalTransactionSnapshot();
263-
if (DtmIsGlobalTransaction) {
261+
if (TransactionIdIsValid(DtmNextXid)) {
262+
if (!DtmHasGlobalSnapshot) {
263+
Assert(!IsolationUsesXactSnapshot());
264+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
265+
}
264266
DtmMergeSnapshots(snapshot, &DtmSnapshot);
265267
DtmUpdateRecentXmin();
268+
DtmHasGlobalSnapshot = false;
266269
}
267270
CurrentTransactionSnapshot = snapshot;
268271
return snapshot;
@@ -293,9 +296,8 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
293296
return;
294297
} else {
295298
XTM_INFO("Begin commit transaction %d\n", xid);
296-
DtmCurrentXid = xid;
297299
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
298-
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
300+
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
299301
LWLockRelease(dtm->hashLock);
300302
if (!DtmGlobalSetTransStatus(xid, status, true)) {
301303
elog(ERROR, "DTMD failed to set transaction status");
@@ -358,10 +360,16 @@ static void DtmInitialize()
358360
static void
359361
DtmXactCallback(XactEvent event, void *arg)
360362
{
361-
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId) {
362-
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
363-
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_REMOVE, NULL);
364-
LWLockRelease(dtm->hashLock);
363+
if (TransactionIdIsValid(DtmNextXid)) {
364+
switch (event) {
365+
case XACT_EVENT_COMMIT:
366+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
367+
hash_search(xid_in_doubt, &DtmNextXid, HASH_REMOVE, NULL);
368+
LWLockRelease(dtm->hashLock);
369+
/* no break */
370+
case XACT_EVENT_ABORT:
371+
DtmNextXid = InvalidTransactionId;
372+
}
365373
}
366374
}
367375

@@ -437,8 +445,7 @@ static void dtm_shmem_startup(void)
437445
PG_MODULE_MAGIC;
438446

439447
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
440-
PG_FUNCTION_INFO_V1(dtm_get_snapshot);
441-
PG_FUNCTION_INFO_V1(dtm_new_snapshot);
448+
PG_FUNCTION_INFO_V1(dtm_join_transaction);
442449
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
443450
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
444451

@@ -464,32 +471,22 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
464471
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot);
465472
Assert(TransactionIdIsValid(DtmNextXid));
466473

467-
DtmIsGlobalTransaction = true;
474+
DtmHasGlobalSnapshot = true;
468475

469476
PG_RETURN_INT32(DtmNextXid);
470477
}
471478

472-
Datum dtm_get_snapshot(PG_FUNCTION_ARGS)
479+
Datum dtm_join_transaction(PG_FUNCTION_ARGS)
473480
{
474481
Assert(!TransactionIdIsValid(DtmNextXid));
475482
DtmNextXid = PG_GETARG_INT32(0);
476483
Assert(TransactionIdIsValid(DtmNextXid));
477484

478-
DtmNextXid = DtmGlobalGetSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
485+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
479486

480-
DtmIsGlobalTransaction = true;
487+
DtmHasGlobalSnapshot = true;
481488

482489
PG_RETURN_VOID();
483490
}
484491

485-
Datum dtm_new_snapshot(PG_FUNCTION_ARGS)
486-
{
487-
Assert(!TransactionIdIsValid(DtmNextXid));
488-
DtmNextXid = PG_GETARG_INT32(0);
489-
Assert(TransactionIdIsValid(DtmNextXid));
490-
491-
DtmNextXid = DtmGlobalNewSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
492492

493-
PG_RETURN_VOID();
494-
}
495-

contrib/pg_xtm/tests/transfers.go

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func commit(conn1, conn2 *pgx.Conn) {
4444
}
4545

4646
func prepare_db() {
47-
var xids []int32 = make([]int32, 2)
47+
var xid int32
4848

4949
conn1, err := pgx.Connect(cfg1)
5050
checkErr(err)
@@ -64,18 +64,13 @@ func prepare_db() {
6464
exec(conn2, "drop table if exists t")
6565
exec(conn2, "create table t(u int primary key, v int)")
6666

67+
xid = execQuery(conn1, "select dtm_begin_transaction(2))
68+
exec(conn2, "select dtm_join_transaction(xid))
69+
6770
// strt transaction
6871
exec(conn1, "begin transaction isolation level repeatable read")
6972
exec(conn2, "begin transaction isolation level repeatable read")
70-
71-
// obtain XIDs of paticipants
72-
xids[0] = execQuery(conn1, "select txid_current()")
73-
xids[1] = execQuery(conn2, "select txid_current()")
74-
75-
// register global transaction in DTMD
76-
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
77-
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
78-
73+
7974
for i := 0; i < N_ACCOUNTS; i++ {
8075
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
8176
exec(conn2, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -93,7 +88,7 @@ func max(a, b int64) int64 {
9388

9489
func transfer(id int, wg *sync.WaitGroup) {
9590
var err error
96-
var xids []int32 = make([]int32, 2)
91+
var xid int32
9792
var nConflicts = 0
9893

9994
conn1, err := pgx.Connect(cfg1)
@@ -110,18 +105,13 @@ func transfer(id int, wg *sync.WaitGroup) {
110105
account1 := rand.Intn(N_ACCOUNTS)
111106
account2 := rand.Intn(N_ACCOUNTS)
112107

113-
// strt transaction
108+
xid = execQuery(conn1, "select dtm_begin_transaction(2))
109+
exec(conn2, "select dtm_join_transaction(xid))
110+
111+
// start transaction
114112
exec(conn1, "begin transaction isolation level repeatable read")
115113
exec(conn2, "begin transaction isolation level repeatable read")
116114

117-
// obtain XIDs of paticipants
118-
xids[0] = execQuery(conn1, "select txid_current()")
119-
xids[1] = execQuery(conn2, "select txid_current()")
120-
121-
// register global transaction in DTMD
122-
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
123-
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
124-
125115
if !execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1) ||
126116
!execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2) {
127117
exec(conn1, "rollback")
@@ -139,7 +129,7 @@ func transfer(id int, wg *sync.WaitGroup) {
139129
func inspect(wg *sync.WaitGroup) {
140130
var sum1, sum2, sum int32
141131
var prevSum int32 = 0
142-
var xids []int32 = make([]int32, 2)
132+
var xid int32
143133

144134
{
145135
conn1, err := pgx.Connect(cfg1)
@@ -149,17 +139,14 @@ func inspect(wg *sync.WaitGroup) {
149139
checkErr(err)
150140

151141
for running {
142+
143+
144+
xid = execQuery(conn1, "select dtm_begin_transaction(2))
145+
exec(conn2, "select dtm_join_transaction(xid))
146+
152147
exec(conn1, "begin transaction isolation level repeatable read")
153148
exec(conn2, "begin transaction isolation level repeatable read")
154149

155-
// obtain XIDs of paticipants
156-
xids[0] = execQuery(conn1, "select txid_current()")
157-
xids[1] = execQuery(conn2, "select txid_current()")
158-
159-
// register global transaction in DTMD
160-
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
161-
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
162-
163150
sum1 = execQuery(conn1, "select sum(v) from t")
164151
sum2 = execQuery(conn2, "select sum(v) from t")
165152

@@ -212,7 +199,7 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
212199

213200
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
214201
var err error
215-
var result int64
202+
var result int32
216203
err = conn.QueryRow(stmt, arguments...).Scan(&result)
217204
checkErr(err)
218205
return int32(result)

0 commit comments

Comments
 (0)