Skip to content

Commit aea4e65

Browse files
committed
Wait until commit of local transaction
1 parent 58e3b4a commit aea4e65

File tree

2 files changed

+41
-52
lines changed

2 files changed

+41
-52
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 37 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
4444
static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4545
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4646
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
47-
static XidStatus DtmGetGloabalTransStatus(TransactionId xid);
4847
static void DtmUpdateRecentXmin(void);
4948
// static bool IsInDtmSnapshot(TransactionId xid);
5049
static bool DtmTransactionIsInProgress(TransactionId xid);
@@ -59,6 +58,7 @@ static DTMConn DtmConn;
5958

6059
#define XTM_TRACE(fmt, ...)
6160
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6262
#define XTM_CONNECT_ATTEMPTS 10
6363

6464
static void DtmEnsureConnection(void)
@@ -93,40 +93,46 @@ static void DumpSnapshot(Snapshot s, char *name)
9393
}
9494
}
9595
cursor += sprintf(cursor, "]");
96-
printf("%s\n", buf);
96+
XTM_INFO("%s\n", buf);
9797
}
9898

99+
/* DTM snapshot is sorted, so we can use bsearch */
100+
static bool IsInDtmSnapshot(Snapshot s, TransactionId xid)
101+
{
102+
return (xid >= s->xmax
103+
|| bsearch(&xid, s->xip, s->xcnt, sizeof(TransactionId), xidComparator) != NULL);
104+
}
105+
106+
99107
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
100108
{
101-
int i, j, n;
102-
static TransactionId* buf;
103-
TransactionId prev = InvalidTransactionId;
109+
int i;
110+
TransactionId xid;
104111

105112
DumpSnapshot(dst, "local");
106113
DumpSnapshot(src, "DTM");
107114

108-
if (buf == NULL) {
109-
buf = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId) * 2);
110-
}
111-
112-
GetLocalSnapshotData(dst);
113-
114-
if (dst->xmin > src->xmin) {
115-
dst->xmin = src->xmin;
116-
}
117-
if (dst->xmax > src->xmax) {
118-
dst->xmax = src->xmax;
115+
Wait:
116+
while (true) {
117+
GetLocalSnapshotData(dst);
118+
for (i = 0; i < dst->xcnt && IsInDtmSnapshot(src, dst->xip[i]); i++);
119+
if (i == dst->xcnt) {
120+
break;
121+
}
122+
pg_usleep(MIN_DELAY);
119123
}
120-
121-
memcpy(buf, dst->xip, dst->xcnt*sizeof(TransactionId));
122-
memcpy(buf + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
123-
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
124-
for (i = 0, j = 0, n = dst->xcnt + src->xcnt; i < n && buf[i] < dst->xmax; i++) {
125-
if (buf[i] != prev) {
126-
dst->xip[j++] = prev = buf[i];
124+
for (xid = dst->xmax; xid < src->xmax; xid++) {
125+
if (!IsInDtmSnapshot(src, xid)) {
126+
pg_usleep(MIN_DELAY);
127+
goto Wait;
127128
}
128129
}
129-
dst->xcnt = j;
130+
131+
132+
memcpy(dst->xip, src->xip, src->xcnt*sizeof(TransactionId));
133+
dst->xmin = src->xmin;
134+
dst->xmax = src->xmax;
135+
dst->xcnt = src->xcnt;
130136
DumpSnapshot(dst, "merged");
131137
}
132138

@@ -162,12 +168,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
162168
return snapshot;
163169
}
164170

165-
static bool IsInDtmSnapshot(TransactionId xid)
166-
{
167-
return DtmHasSnapshot
168-
&& (xid >= DtmSnapshot.xmax
169-
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL);
170-
}
171171

172172
static bool DtmTransactionIsInProgress(TransactionId xid)
173173
{
@@ -188,20 +188,6 @@ static bool DtmTransactionIsInProgress(TransactionId xid)
188188
return TransactionIdIsRunning(xid);// || IsInDtmSnapshot(xid);
189189
}
190190

191-
static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
192-
{
193-
XTM_TRACE("XTM: DtmGetGloabalTransStatus \n");
194-
while (true) {
195-
XidStatus status;
196-
DtmEnsureConnection();
197-
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, true);
198-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
199-
elog(ERROR, "DTMD reported status in progress");
200-
} else {
201-
return status;
202-
}
203-
}
204-
}
205191

206192
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
207193
{
@@ -231,20 +217,23 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
231217
if (DtmGlobalTransaction) {
232218
/* Already should be IN_PROGRESS */
233219
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
234-
235220
DtmHasSnapshot = false;
236221
DtmGlobalTransaction = false;
237222
DtmEnsureConnection();
238223
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
239224
elog(ERROR, "DTMD failed to set transaction status");
240225
}
241-
status = DtmGetGloabalTransStatus(xid);
226+
DtmEnsureConnection();
227+
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, true);
228+
XTM_INFO("Commit transaction %d\n", xid);
242229
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
243230
} else {
244231
elog(WARNING, "Set transaction %u status in local CLOG" , xid);
245232
}
246233
} else {
247-
XidStatus gs = DtmGetGloabalTransStatus(xid);
234+
XidStatus gs;
235+
DtmEnsureConnection();
236+
gs = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, false);
248237
if (gs != TRANSACTION_STATUS_UNKNOWN) {
249238
status = gs;
250239
}
@@ -303,6 +292,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
303292
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
304293
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
305294
DtmGlobalTransaction = true;
295+
XTM_INFO("Start transaction {%d,%d} at node %d\n", gtid.xids[0], gtid.xids[1], DtmNodeId);
306296
XTM_TRACE("XTM: dtm_begin_transaction \n");
307297
if (DtmNodeId == gtid.nodes[0]) {
308298
DtmEnsureConnection();
@@ -318,7 +308,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
318308
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
319309

320310
XTM_TRACE("XTM: dtm_get_snapshot \n");
321-
322311
/* Move it to DtmGlobalGetSnapshot? */
323312
Assert(!DtmHasSnapshot);
324313
DtmHasSnapshot = true;

contrib/pg_xtm/tests/transfers.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func prepare_db() {
7777
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
7878

7979
// first global statement
80-
// exec(conn1, "select dtm_get_snapshot()")
81-
// exec(conn2, "select dtm_get_snapshot()")
80+
exec(conn1, "select dtm_get_snapshot()")
81+
exec(conn2, "select dtm_get_snapshot()")
8282

8383
for i := 0; i < N_ACCOUNTS; i++ {
8484
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -126,8 +126,8 @@ func transfer(id int, wg *sync.WaitGroup) {
126126
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
127127

128128
// first global statement
129-
// exec(conn1, "select dtm_get_snapshot()")
130-
// exec(conn2, "select dtm_get_snapshot()")
129+
exec(conn1, "select dtm_get_snapshot()")
130+
exec(conn2, "select dtm_get_snapshot()")
131131

132132
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
133133
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)

0 commit comments

Comments
 (0)