Skip to content

Commit 54d4ee6

Browse files
committed
Yet annother DTM version
1 parent 15a412f commit 54d4ee6

File tree

6 files changed

+107
-82
lines changed

6 files changed

+107
-82
lines changed

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATE FUNCTION dtm_global_transaction(nodes integer[], xids integer[]) RETURNS void
5-
AS 'MODULE_PATHNAME','dtm_global_transaction'
4+
CREATE FUNCTION dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
5+
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

88
CREATE FUNCTION dtm_get_snapshot() RETURNS void

contrib/pg_xtm/pg_dtm.c

Lines changed: 79 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
4141
static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4242
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4343
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
44-
static bool TransactionIsInDtmSnapshot(TransactionId xid);
44+
static XidStatus DtmGetGloabalTransStatus(TransactionId xid);
45+
static void DtmUpdateRecentXmin(void);
46+
static bool IsInDtmSnapshot(TransactionId xid);
4547

4648
static NodeId DtmNodeId;
47-
4849
static DTMConn DtmConn;
4950
static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC};
5051
static bool DtmHasSnapshot = false;
@@ -73,62 +74,102 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
7374
dst->curcid = src->curcid;
7475
}
7576

77+
static void DtmUpdateRecentXmin(void)
78+
{
79+
TransactionId xmin = DtmSnapshot.xmin;
80+
if (xmin != InvalidTransactionId) {
81+
xmin -= vacuum_defer_cleanup_age;
82+
if (!TransactionIdIsNormal(xmin)) {
83+
xmin = FirstNormalTransactionId;
84+
}
85+
if (RecentGlobalDataXmin > xmin) {
86+
RecentGlobalDataXmin = xmin;
87+
}
88+
if (RecentGlobalXmin > xmin) {
89+
RecentGlobalXmin = xmin;
90+
}
91+
RecentXmin = xmin;
92+
}
93+
}
94+
7695
static Snapshot DtmGetSnapshot(Snapshot snapshot)
7796
{
7897
if (DtmHasSnapshot) {
7998
DtmCopySnapshot(snapshot, &DtmSnapshot);
8099
return snapshot;
81100
}
82-
return GetLocalSnapshotData(snapshot);
101+
snapshot = GetLocalSnapshotData(snapshot);
102+
DtmUpdateRecentXmin();
103+
return snapshot;
104+
}
105+
106+
107+
static bool IsInDtmSnapshot(TransactionId xid)
108+
{
109+
return DtmHasSnapshot
110+
&& (xid > DtmSnapshot.xmax
111+
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL);
83112
}
113+
114+
115+
static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
116+
{
117+
unsigned delay = 1000;
118+
while (true) {
119+
DtmEnsureConnection();
120+
XidStatus status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid);
121+
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
122+
pg_usleep(delay);
123+
if (delay < 100000) {
124+
delay *= 2;
125+
}
126+
} else {
127+
return status;
128+
}
129+
}
130+
}
84131

85132
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
86133
{
87134
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
88-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
89135
#if 0
90-
&& DtmHasSnapshot && !TransactionIdIsInProgress(xid)) {
91-
unsigned delay = 1000;
92-
while (true) {
93-
DtmEnsureConnection();
94-
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid);
95-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
96-
pg_usleep(delay);
97-
if (delay < 100000) {
98-
delay *= 2;
99-
}
100-
} else {
101-
break;
102-
}
103-
}
104-
#endif
105-
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid);
106-
if (status != TRANSACTION_STATUS_IN_PROGRESS) {
107-
CLOGTransactionIdSetTreeStatus(xid, 0, NULL, status, InvalidXLogRecPtr);
136+
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
137+
status = DtmGetGloabalTransStatus(xid);
138+
if (status == TRANSACTION_STATUS_UNKNOWN) {
139+
status = TRANSACTION_STATUS_IN_PROGRESS;
108140
}
109141
}
142+
#endif
110143
return status;
111144
}
112145

146+
113147
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
114148
{
115-
if (DtmHasSnapshot) {
116-
/* Already should be IN_PROGRESS */
117-
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
118-
if (status == TRANSACTION_STATUS_COMMITTED) {
119-
ProcArrayAdd(&ProcGlobal->allProcs[AllocGXid(xid)]);
149+
if (!RecoveryInProgress()) {
150+
if (DtmHasSnapshot) {
151+
/* Already should be IN_PROGRESS */
152+
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
153+
154+
DtmHasSnapshot = false;
155+
DtmEnsureConnection();
156+
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
157+
elog(ERROR, "DTMD failed to set transaction status");
158+
}
159+
status = DtmGetGloabalTransStatus(xid);
160+
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
161+
} else {
162+
elog(WARNING, "Set transaction %u status in local CLOG" , xid);
120163
}
121-
DtmHasSnapshot = false;
122-
DtmEnsureConnection();
123-
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
124-
elog(ERROR, "DTMD failed to set transaction status");
164+
} else {
165+
XidStatus gs = DtmGetGloabalTransStatus(xid);
166+
if (gs != TRANSACTION_STATUS_UNKNOWN) {
167+
status = gs;
125168
}
126-
} else {
127-
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
128169
}
170+
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
129171
}
130172

131-
132173
/*
133174
* ***************************************************************************
134175
*/
@@ -167,11 +208,11 @@ _PG_fini(void)
167208

168209
PG_MODULE_MAGIC;
169210

170-
PG_FUNCTION_INFO_V1(dtm_global_transaction);
211+
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
171212
PG_FUNCTION_INFO_V1(dtm_get_snapshot);
172213

173214
Datum
174-
dtm_global_transaction(PG_FUNCTION_ARGS)
215+
dtm_begin_transaction(PG_FUNCTION_ARGS)
175216
{
176217
GlobalTransactionId gtid;
177218
ArrayType* nodes = PG_GETARG_ARRAYTYPE_P(0);
@@ -184,36 +225,16 @@ dtm_global_transaction(PG_FUNCTION_ARGS)
184225
PG_RETURN_VOID();
185226
}
186227

187-
188-
189-
190-
191-
192-
193228
Datum
194229
dtm_get_snapshot(PG_FUNCTION_ARGS)
195230
{
196-
TransactionId xmin;
197231
DtmEnsureConnection();
198232
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
199233

200-
VacuumProcArray(&DtmSnapshot);
234+
// VacuumProcArray(&DtmSnapshot);
201235

202236
/* Move it to DtmGlobalGetSnapshot? */
203-
xmin = DtmSnapshot.xmin;
204-
if (xmin != InvalidTransactionId) {
205-
xmin -= vacuum_defer_cleanup_age;
206-
if (!TransactionIdIsNormal(xmin)) {
207-
xmin = FirstNormalTransactionId;
208-
}
209-
if (RecentGlobalDataXmin > xmin) {
210-
RecentGlobalDataXmin = xmin;
211-
}
212-
if (RecentGlobalXmin > xmin) {
213-
RecentGlobalXmin = xmin;
214-
}
215-
RecentXmin = xmin;
216-
}
237+
DtmUpdateRecentXmin();
217238
DtmSnapshot.curcid = GetCurrentCommandId(false);
218239
DtmHasSnapshot = true;
219240
PG_RETURN_VOID();

contrib/pg_xtm/tests/setup.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export PATH=~/go/bin:~/postgrespro/dist/bin/:$PATH
2+
export LD_LIBRARY_PATH=/usr/local/lib
3+
export GOPATH=~/golib
4+
export GOROOT=~/go

contrib/pg_xtm/tests/transfers.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,19 @@ var cfg2 = pgx.ConnConfig{
3030
var running = false
3131
var nodes []int32 = []int32{0,1}
3232

33+
func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
34+
exec(conn, "commit")
35+
wg.Done()
36+
}
37+
38+
func commit(conn1, conn2 *pgx.Conn) {
39+
var wg sync.WaitGroup
40+
wg.Add(2)
41+
go asyncCommit(conn1, &wg)
42+
go asyncCommit(conn2, &wg)
43+
wg.Wait()
44+
}
45+
3346
func prepare_db() {
3447
var xids []int32 = make([]int32, 2)
3548

@@ -60,7 +73,7 @@ func prepare_db() {
6073
xids[1] = execQuery(conn2, "select txid_current()")
6174

6275
// register global transaction in DTMD
63-
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
76+
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
6477

6578
// first global statement
6679
exec(conn1, "select dtm_get_snapshot()")
@@ -71,14 +84,7 @@ func prepare_db() {
7184
exec(conn2, "insert into t values($1, $2)", i, INIT_AMOUNT)
7285
}
7386

74-
// second global statement
75-
exec(conn1, "select dtm_get_snapshot()")
76-
exec(conn2, "select dtm_get_snapshot()")
77-
78-
// commit work
79-
exec(conn1, "commit")
80-
exec(conn2, "commit")
81-
// at this moment transaction should be globally committed
87+
commit(conn1, conn2)
8288
}
8389

8490
func max(a, b int64) int64 {
@@ -115,23 +121,16 @@ func transfer(id int, wg *sync.WaitGroup) {
115121
xids[1] = execQuery(conn2, "select txid_current()")
116122

117123
// register global transaction in DTMD
118-
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
124+
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
119125

120126
// first global statement
121127
exec(conn1, "select dtm_get_snapshot()")
122128
exec(conn2, "select dtm_get_snapshot()")
123129

124130
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
125131
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
126-
127-
// second global statement
128-
exec(conn1, "select dtm_get_snapshot()")
129-
exec(conn2, "select dtm_get_snapshot()")
130-
131-
// commit work
132-
exec(conn1, "commit")
133-
exec(conn2, "commit")
134-
// at this moment transaction should be globally committed
132+
133+
commit(conn1, conn2)
135134
}
136135

137136
fmt.Println("Test completed")
@@ -161,16 +160,15 @@ func total() int32 {
161160
xids[1] = execQuery(conn2, "select txid_current()")
162161

163162
// register global transaction in DTMD
164-
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
163+
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
165164

166165
exec(conn1, "select dtm_get_snapshot()")
167166
exec(conn2, "select dtm_get_snapshot()")
168167

169168
sum1 = execQuery(conn1, "select sum(v) from t")
170169
sum2 = execQuery(conn2, "select sum(v) from t")
171170

172-
exec(conn1, "commit")
173-
exec(conn2, "commit")
171+
commit(conn1, conn2)
174172

175173
return sum1 + sum2
176174
}

src/backend/access/transam/clog.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "access/xloginsert.h"
4040
#include "access/xlogutils.h"
4141
#include "access/xtm.h"
42+
#include "access/xact.h"
4243
#include "storage/procarray.h"
4344
#include "miscadmin.h"
4445
#include "pg_trace.h"

src/include/access/clog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ typedef int XidStatus;
2727
#define TRANSACTION_STATUS_COMMITTED 0x01
2828
#define TRANSACTION_STATUS_ABORTED 0x02
2929
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03
30+
#define TRANSACTION_STATUS_UNKNOWN 0x03
3031

3132

3233
extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,

0 commit comments

Comments
 (0)