Skip to content

Commit 10f125a

Browse files
committed
Yet another algorithm of merging global and local snapshots
1 parent 4815181 commit 10f125a

File tree

2 files changed

+32
-39
lines changed

2 files changed

+32
-39
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4545
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4646
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
4747
static void DtmUpdateRecentXmin(void);
48-
// static bool IsInDtmSnapshot(TransactionId xid);
4948
static bool DtmTransactionIsInProgress(TransactionId xid);
49+
static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid);
50+
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid);
5051

5152
static NodeId DtmNodeId;
5253
static DTMConn DtmConn;
@@ -58,7 +59,7 @@ static DTMConn DtmConn;
5859

5960
#define XTM_TRACE(fmt, ...)
6061
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61-
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
62+
#define XTM_INFO(fmt, ...) elog(WARNING, fmt, ## __VA_ARGS__)
6263
#define XTM_CONNECT_ATTEMPTS 10
6364

6465
static void DtmEnsureConnection(void)
@@ -96,24 +97,26 @@ static void DumpSnapshot(Snapshot s, char *name)
9697
XTM_INFO("%s\n", buf);
9798
}
9899

99-
static bool IsInSnapshot(Snapshot s, TransactionId xid)
100+
static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid)
100101
{
101-
int i;
102-
if (xid < s->xmin) {
103-
return false;
104-
}
105-
if (xid >= s->xmax) {
106-
return true;
107-
}
108-
for (i = 0; i < s->xcnt; i++) {
109-
if (s->xip[i] == xid) {
102+
return xid >= s->xmax
103+
|| bsearch(&xid, s->xip, s->xcnt, sizeof(TransactionId), xidComparator) != NULL;
104+
}
105+
106+
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
107+
{
108+
if (!TransactionIdIsInDtmSnapshot(s, xid)) {
109+
XLogRecPtr lsn;
110+
XidStatus status = CLOGTransactionIdGetStatus(xid, &lsn);
111+
if (status != TRANSACTION_STATUS_IN_PROGRESS) {
112+
XTM_INFO("Wait for transaction %d to complete\n", xid);
113+
XactLockTableWait(xid, NULL, NULL, XLTW_None);
110114
return true;
111115
}
112116
}
113117
return false;
114118
}
115-
116-
119+
117120
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
118121
{
119122
int i, j, n;
@@ -129,33 +132,21 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
129132

130133
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
131134

135+
/* Check that globall competed transactions are not included in local snapshot */
132136
RefreshLocalSnapshot:
133137
GetLocalSnapshotData(dst);
134-
xid = src->xmin < dst->xmin ? src->xmin : dst->xmin;
135-
for (i = 0; i < src->xcnt; i++) {
136-
while (src->xip[i] > xid) { /* XID is completed according to global snapshot... */
137-
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
138-
pg_usleep(MIN_DELAY);
139-
goto RefreshLocalSnapshot;
140-
} else {
141-
xid += 1; /* XID is also marked completed in local snapshot */
142-
}
143-
}
144-
/* XID is considered as running in global snapshot */
145-
/* doesn't matter what local snapshot thinks about it */
146-
xid = src->xip[i]+1;
138+
for (i = 0; i < dst->xcnt; i++) {
139+
if (TransactionIdIsInDoubt(src, dst->xip[i])) {
140+
goto RefreshLocalSnapshot;
141+
}
147142
}
148-
while (xid < src->xmax) {
149-
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
150-
pg_usleep(MIN_DELAY);
143+
for (xid = dst->xmax; xid < src->xmax; xid++) {
144+
if (TransactionIdIsInDoubt(src, xid)) {
151145
goto RefreshLocalSnapshot;
152-
} else {
153-
xid += 1; /* XID is also marked completed in local snapshot */
154146
}
155-
}
156-
/* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
147+
}
157148

158-
/* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
149+
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
159150
if (dst->xmin > src->xmin) {
160151
dst->xmin = src->xmin;
161152
}
@@ -233,6 +224,8 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
233224
DtmHasSnapshot = false;
234225
DtmGlobalTransaction = false;
235226
DtmEnsureConnection();
227+
XTM_INFO("Begin commit transaction %d\n", xid);
228+
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_COMMITTED, lsn);
236229
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true) && status != TRANSACTION_STATUS_ABORTED) {
237230
elog(ERROR, "DTMD failed to set transaction status");
238231
}

contrib/pg_xtm/tests/transfers.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func prepare_db() {
7777
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
7878

7979
// first global statement
80-
exec(conn1, "select dtm_get_snapshot()")
81-
exec(conn2, "select dtm_get_snapshot()")
80+
//exec(conn1, "select dtm_get_snapshot()")
81+
//exec(conn2, "select dtm_get_snapshot()")
8282

8383
for i := 0; i < N_ACCOUNTS; i++ {
8484
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -126,8 +126,8 @@ func transfer(id int, wg *sync.WaitGroup) {
126126
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
127127

128128
// first global statement
129-
exec(conn1, "select dtm_get_snapshot()")
130-
exec(conn2, "select dtm_get_snapshot()")
129+
// exec(conn1, "select dtm_get_snapshot()")
130+
// exec(conn2, "select dtm_get_snapshot()")
131131

132132
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
133133
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)

0 commit comments

Comments
 (0)