Skip to content

Commit 262db5c

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 7e2bfda + c8be45a commit 262db5c

File tree

3 files changed

+76
-77
lines changed

3 files changed

+76
-77
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 68 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
4444
static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4545
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4646
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
47-
static XidStatus DtmGetGlobalTransStatus(TransactionId xid);
4847
static void DtmUpdateRecentXmin(void);
4948
// static bool IsInDtmSnapshot(TransactionId xid);
5049
static bool DtmTransactionIsInProgress(TransactionId xid);
@@ -57,7 +56,9 @@ static bool DtmGlobalTransaction = false;
5756
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmTransactionIsInProgress };
5857
static DTMConn DtmConn;
5958

60-
#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
59+
#define XTM_TRACE(fmt, ...)
60+
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
61+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6162
#define XTM_CONNECT_ATTEMPTS 10
6263

6364
static void DtmEnsureConnection(void)
@@ -81,8 +82,8 @@ static void DumpSnapshot(Snapshot s, char *name)
8182
char *cursor = buf;
8283
cursor += sprintf(
8384
cursor,
84-
"snapshot %s: xmin=%d, xmax=%d, active=[",
85-
name, s->xmin, s->xmax
85+
"snapshot %s for transaction %d: xmin=%d, xmax=%d, active=[",
86+
name, GetCurrentTransactionId(), s->xmin, s->xmax
8687
);
8788
for (i = 0; i < s->xcnt; i++) {
8889
if (i == 0) {
@@ -92,25 +93,69 @@ static void DumpSnapshot(Snapshot s, char *name)
9293
}
9394
}
9495
cursor += sprintf(cursor, "]");
95-
XTM_TRACE("%s\n", buf);
96+
XTM_INFO("%s\n", buf);
9697
}
9798

99+
static bool IsInSnapshot(Snapshot s, TransactionId xid)
100+
{
101+
int i;
102+
if (xid < s->xmin) {
103+
return false;
104+
}
105+
if (xid >= s->xmax) {
106+
return true;
107+
}
108+
for (i = 0; i < s->xcnt; i++) {
109+
if (s->xip[i] == xid) {
110+
return true;
111+
}
112+
}
113+
return false;
114+
}
115+
116+
98117
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
99118
{
100119
int i, j, n;
101120
static TransactionId* buf;
102-
TransactionId prev = InvalidTransactionId;
103-
104-
XTM_TRACE("XTM: DtmCopySnapshot for transaction%u\n", GetCurrentTransactionId());
105-
DumpSnapshot(dst, "local");
106-
DumpSnapshot(src, "DTM");
121+
TransactionId xid;
107122

108123
if (buf == NULL) {
109124
buf = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId) * 2);
110125
}
111126

127+
DumpSnapshot(dst, "local");
128+
DumpSnapshot(src, "DTM");
129+
130+
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
131+
132+
RefreshLocalSnapshot:
112133
GetLocalSnapshotData(dst);
134+
xid = src->xmin < dst->xmin ? src->xmin : dst->xmin;
135+
for (i = 0; i < src->xcnt; i++) {
136+
while (src->xip[i] > xid) { /* XID is completed according to global snapshot... */
137+
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
138+
pg_usleep(MIN_DELAY);
139+
goto RefreshLocalSnapshot;
140+
} else {
141+
xid += 1; /* XID is also marked completed in local snapshot */
142+
}
143+
}
144+
/* XID is considered as running in global snapshot */
145+
/* doesn't matter what local snapshot thinks about it */
146+
xid = src->xip[i]+1;
147+
}
148+
while (xid < src->xmax) {
149+
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
150+
pg_usleep(MIN_DELAY);
151+
goto RefreshLocalSnapshot;
152+
} else {
153+
xid += 1; /* XID is also marked completed in local snapshot */
154+
}
155+
}
156+
/* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
113157

158+
/* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
114159
if (dst->xmin > src->xmin) {
115160
dst->xmin = src->xmin;
116161
}
@@ -120,14 +165,15 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
120165

121166
memcpy(buf, dst->xip, dst->xcnt*sizeof(TransactionId));
122167
memcpy(buf + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
123-
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
168+
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
169+
xid = InvalidTransactionId;
124170
for (i = 0, j = 0, n = dst->xcnt + src->xcnt; i < n && buf[i] < dst->xmax; i++) {
125-
if (buf[i] != prev) {
126-
dst->xip[j++] = prev = buf[i];
171+
if (buf[i] != xid) {
172+
dst->xip[j++] = xid = buf[i];
127173
}
128174
}
129175
dst->xcnt = j;
130-
DumpSnapshot(dst, "Merged");
176+
DumpSnapshot(dst, "merged");
131177
}
132178

133179
static void DtmUpdateRecentXmin(void)
@@ -157,69 +203,22 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
157203
snapshot = GetLocalSnapshotData(snapshot);
158204
if (DtmHasSnapshot) {
159205
DtmCopySnapshot(snapshot, &DtmSnapshot);
160-
//DtmUpdateRecentXmin();
206+
DtmUpdateRecentXmin();
161207
}
162208
return snapshot;
163209
}
164210

165-
static bool IsInDtmSnapshot(TransactionId xid)
166-
{
167-
return DtmHasSnapshot
168-
&& (xid >= DtmSnapshot.xmax
169-
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL);
170-
}
171211

172212
static bool DtmTransactionIsInProgress(TransactionId xid)
173213
{
174-
#if 0
175-
if (IsInDtmSnapshot(xid)) {
176-
unsigned delay = MIN_DELAY;
177-
XLogRecPtr lsn;
178-
while (CLOGTransactionIdGetStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS) {
179-
pg_usleep(delay);
180-
if (delay < MAX_DELAY) {
181-
delay *= 2;
182-
}
183-
}
184-
return false;
185-
}
186-
#endif
187214
XTM_TRACE("XTM: DtmTransactionIsInProgress \n");
188-
return TransactionIdIsRunning(xid);// || IsInDtmSnapshot(xid);
189-
}
190-
191-
static XidStatus DtmGetGlobalTransStatus(TransactionId xid)
192-
{
193-
XTM_TRACE("XTM: DtmGetGlobalTransStatus \n");
194-
while (true) {
195-
XidStatus status;
196-
DtmEnsureConnection();
197-
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, true);
198-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
199-
elog(ERROR, "DTMD reported status in progress");
200-
} else {
201-
return status;
202-
}
203-
}
215+
return TransactionIdIsRunning(xid);
204216
}
205217

206218
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
207219
{
208-
#if 0
209-
if (IsInDtmSnapshot(xid)) {
210-
return TRANSACTION_STATUS_IN_PROGRESS;
211-
}
212-
#endif
213220
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
214221
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
215-
#if 0
216-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
217-
status = DtmGetGlobalTransStatus(xid);
218-
if (status == TRANSACTION_STATUS_UNKNOWN) {
219-
status = TRANSACTION_STATUS_IN_PROGRESS;
220-
}
221-
}
222-
#endif
223222
return status;
224223
}
225224

@@ -231,20 +230,22 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
231230
if (DtmGlobalTransaction) {
232231
/* Already should be IN_PROGRESS */
233232
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
234-
235233
DtmHasSnapshot = false;
236234
DtmGlobalTransaction = false;
237235
DtmEnsureConnection();
238236
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
239237
elog(ERROR, "DTMD failed to set transaction status");
240238
}
241-
status = DtmGetGlobalTransStatus(xid);
239+
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, true);
240+
XTM_INFO("Commit transaction %d\n", xid);
242241
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
243242
} else {
244243
elog(WARNING, "Set transaction %u status in local CLOG" , xid);
245244
}
246245
} else {
247-
XidStatus gs = DtmGetGlobalTransStatus(xid);
246+
XidStatus gs;
247+
DtmEnsureConnection();
248+
gs = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, false);
248249
if (gs != TRANSACTION_STATUS_UNKNOWN) {
249250
status = gs;
250251
}
@@ -260,7 +261,6 @@ void
260261
_PG_init(void)
261262
{
262263
TM = &DtmTM;
263-
// TransactionIsInCurrentSnapshot = TransactionIsInDtmSnapshot;
264264

265265
DefineCustomIntVariable("dtm.node_id",
266266
"Identifier of node in distributed cluster for DTM",
@@ -303,6 +303,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
303303
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
304304
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
305305
DtmGlobalTransaction = true;
306+
XTM_INFO("Start transaction {%d,%d} at node %d\n", gtid.xids[0], gtid.xids[1], DtmNodeId);
306307
XTM_TRACE("XTM: dtm_begin_transaction \n");
307308
if (DtmNodeId == gtid.nodes[0]) {
308309
DtmEnsureConnection();
@@ -318,8 +319,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
318319
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
319320

320321
XTM_TRACE("XTM: dtm_get_snapshot \n");
321-
322-
/* Move it to DtmGlobalGetSnapshot? */
323322
Assert(!DtmHasSnapshot);
324323
DtmHasSnapshot = true;
325324
DtmGlobalTransaction = true;

contrib/pg_xtm/tests/transfers.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func prepare_db() {
7777
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
7878

7979
// first global statement
80-
// exec(conn1, "select dtm_get_snapshot()")
81-
// exec(conn2, "select dtm_get_snapshot()")
80+
exec(conn1, "select dtm_get_snapshot()")
81+
exec(conn2, "select dtm_get_snapshot()")
8282

8383
for i := 0; i < N_ACCOUNTS; i++ {
8484
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
@@ -126,8 +126,8 @@ func transfer(id int, wg *sync.WaitGroup) {
126126
exec(conn2, "select dtm_begin_transaction($1, $2)", nodes, xids)
127127

128128
// first global statement
129-
// exec(conn1, "select dtm_get_snapshot()")
130-
// exec(conn2, "select dtm_get_snapshot()")
129+
exec(conn1, "select dtm_get_snapshot()")
130+
exec(conn2, "select dtm_get_snapshot()")
131131

132132
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
133133
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
@@ -144,13 +144,13 @@ func inspect(wg *sync.WaitGroup) {
144144
var prevSum int32 = 0
145145
var xids []int32 = make([]int32, 2)
146146

147+
for running {
147148
conn1, err := pgx.Connect(cfg1)
148149
checkErr(err)
149150

150151
conn2, err := pgx.Connect(cfg2)
151152
checkErr(err)
152153

153-
for running {
154154
exec(conn1, "begin")
155155
exec(conn2, "begin")
156156

@@ -172,12 +172,12 @@ func inspect(wg *sync.WaitGroup) {
172172

173173
sum = sum1 + sum2
174174
if (sum != prevSum) {
175-
fmt.Println("Total = ", sum)
175+
fmt.Println("Total = ", sum, "xids=", xids)
176176
prevSum = sum
177177
}
178-
}
179178
conn1.Close()
180179
conn2.Close()
180+
}
181181
wg.Done()
182182
}
183183

src/backend/storage/ipc/procarray.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
965965
bool
966966
TransactionIdIsInProgress(TransactionId xid)
967967
{
968-
TM->IsInProgress(xid);
968+
return TM->IsInProgress(xid);
969969
}
970970

971971
/*

0 commit comments

Comments
 (0)