Skip to content

Commit c8be45a

Browse files
committed
New snapshot merge algorithm
1 parent aea4e65 commit c8be45a

File tree

1 file changed

+62
-51
lines changed

1 file changed

+62
-51
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 62 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -96,43 +96,83 @@ static void DumpSnapshot(Snapshot s, char *name)
9696
XTM_INFO("%s\n", buf);
9797
}
9898

99-
/* DTM snapshot is sorted, so we can use bsearch */
100-
static bool IsInDtmSnapshot(Snapshot s, TransactionId xid)
99+
static bool IsInSnapshot(Snapshot s, TransactionId xid)
101100
{
102-
return (xid >= s->xmax
103-
|| bsearch(&xid, s->xip, s->xcnt, sizeof(TransactionId), xidComparator) != NULL);
101+
int i;
102+
if (xid < s->xmin) {
103+
return false;
104+
}
105+
if (xid >= s->xmax) {
106+
return true;
107+
}
108+
for (i = 0; i < s->xcnt; i++) {
109+
if (s->xip[i] == xid) {
110+
return true;
111+
}
112+
}
113+
return false;
104114
}
105115

106116

107117
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
108118
{
109-
int i;
119+
int i, j, n;
120+
static TransactionId* buf;
110121
TransactionId xid;
111122

123+
if (buf == NULL) {
124+
buf = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId) * 2);
125+
}
126+
112127
DumpSnapshot(dst, "local");
113128
DumpSnapshot(src, "DTM");
114129

115-
Wait:
116-
while (true) {
117-
GetLocalSnapshotData(dst);
118-
for (i = 0; i < dst->xcnt && IsInDtmSnapshot(src, dst->xip[i]); i++);
119-
if (i == dst->xcnt) {
120-
break;
121-
}
122-
pg_usleep(MIN_DELAY);
130+
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
131+
132+
RefreshLocalSnapshot:
133+
GetLocalSnapshotData(dst);
134+
xid = src->xmin < dst->xmin ? src->xmin : dst->xmin;
135+
for (i = 0; i < src->xcnt; i++) {
136+
while (src->xip[i] > xid) { /* XID is completed according to global snapshot... */
137+
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
138+
pg_usleep(MIN_DELAY);
139+
goto RefreshLocalSnapshot;
140+
} else {
141+
xid += 1; /* XID is also marked completed in local snapshot */
142+
}
143+
}
144+
/* XID is considered as running in global snapshot */
145+
/* doesn't matter what local snapshot thinks about it */
146+
xid = src->xip[i]+1;
123147
}
124-
for (xid = dst->xmax; xid < src->xmax; xid++) {
125-
if (!IsInDtmSnapshot(src, xid)) {
148+
while (xid < src->xmax) {
149+
if (IsInSnapshot(dst, xid)) { /* ...but still marked as running in local snapshot */
126150
pg_usleep(MIN_DELAY);
127-
goto Wait;
151+
goto RefreshLocalSnapshot;
152+
} else {
153+
xid += 1; /* XID is also marked completed in local snapshot */
128154
}
129-
}
155+
}
156+
/* At this point we are sure that all transactions marked as completed in global snapshot are also finished locally */
130157

158+
/* merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
159+
if (dst->xmin > src->xmin) {
160+
dst->xmin = src->xmin;
161+
}
162+
if (dst->xmax > src->xmax) {
163+
dst->xmax = src->xmax;
164+
}
131165

132-
memcpy(dst->xip, src->xip, src->xcnt*sizeof(TransactionId));
133-
dst->xmin = src->xmin;
134-
dst->xmax = src->xmax;
135-
dst->xcnt = src->xcnt;
166+
memcpy(buf, dst->xip, dst->xcnt*sizeof(TransactionId));
167+
memcpy(buf + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
168+
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
169+
xid = InvalidTransactionId;
170+
for (i = 0, j = 0, n = dst->xcnt + src->xcnt; i < n && buf[i] < dst->xmax; i++) {
171+
if (buf[i] != xid) {
172+
dst->xip[j++] = xid = buf[i];
173+
}
174+
}
175+
dst->xcnt = j;
136176
DumpSnapshot(dst, "merged");
137177
}
138178

@@ -171,41 +211,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
171211

172212
static bool DtmTransactionIsInProgress(TransactionId xid)
173213
{
174-
#if 0
175-
if (IsInDtmSnapshot(xid)) {
176-
unsigned delay = MIN_DELAY;
177-
XLogRecPtr lsn;
178-
while (CLOGTransactionIdGetStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS) {
179-
pg_usleep(delay);
180-
if (delay < MAX_DELAY) {
181-
delay *= 2;
182-
}
183-
}
184-
return false;
185-
}
186-
#endif
187214
XTM_TRACE("XTM: DtmTransactionIsInProgress \n");
188-
return TransactionIdIsRunning(xid);// || IsInDtmSnapshot(xid);
215+
return TransactionIdIsRunning(xid);
189216
}
190217

191218

192219
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
193220
{
194-
#if 0
195-
if (IsInDtmSnapshot(xid)) {
196-
return TRANSACTION_STATUS_IN_PROGRESS;
197-
}
198-
#endif
199221
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
200222
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
201-
#if 0
202-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
203-
status = DtmGetGloabalTransStatus(xid);
204-
if (status == TRANSACTION_STATUS_UNKNOWN) {
205-
status = TRANSACTION_STATUS_IN_PROGRESS;
206-
}
207-
}
208-
#endif
209223
return status;
210224
}
211225

@@ -223,7 +237,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
223237
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
224238
elog(ERROR, "DTMD failed to set transaction status");
225239
}
226-
DtmEnsureConnection();
227240
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, true);
228241
XTM_INFO("Commit transaction %d\n", xid);
229242
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
@@ -249,7 +262,6 @@ void
249262
_PG_init(void)
250263
{
251264
TM = &DtmTM;
252-
// TransactionIsInCurrentSnapshot = TransactionIsInDtmSnapshot;
253265

254266
DefineCustomIntVariable("dtm.node_id",
255267
"Identifier of node in distributed cluster for DTM",
@@ -308,7 +320,6 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
308320
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
309321

310322
XTM_TRACE("XTM: dtm_get_snapshot \n");
311-
/* Move it to DtmGlobalGetSnapshot? */
312323
Assert(!DtmHasSnapshot);
313324
DtmHasSnapshot = true;
314325
DtmGlobalTransaction = true;

0 commit comments

Comments
 (0)