Skip to content

Commit dbad0e8

Browse files
committed
Support work with large snapshots
1 parent 5317b56 commit dbad0e8

File tree

5 files changed

+39
-20
lines changed

5 files changed

+39
-20
lines changed

contrib/pg_dtm/libdtm.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ void DtmInitSnapshot(Snapshot snapshot)
301301
* we are in recovery, see later comments.
302302
*/
303303
snapshot->xip = (TransactionId *)
304-
malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
304+
malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
305305
if (snapshot->xip == NULL)
306306
ereport(ERROR,
307307
(errcode(ERRCODE_OUT_OF_MEMORY),

contrib/pg_dtm/pg_dtm.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,22 +205,39 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
205205
if (src->xmin < dst->xmin) {
206206
dst->xmin = src->xmin;
207207
}
208-
209-
n = dst->xcnt;
210-
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
211-
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
212-
n += src->xcnt;
213-
214-
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
215-
xid = InvalidTransactionId;
216-
217-
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
218-
if (dst->xip[i] != xid) {
219-
dst->xip[j++] = xid = dst->xip[i];
220-
}
221-
}
222-
dst->xcnt = j;
223-
208+
Assert(src->subxcnt == 0);
209+
210+
if (src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotXidCount()) {
211+
Assert(dst->subxcnt == 0);
212+
memcpy(dst->xip + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
213+
n = dst->xcnt + src->xcnt;
214+
215+
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
216+
xid = InvalidTransactionId;
217+
218+
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
219+
if (dst->xip[i] != xid) {
220+
dst->xip[j++] = xid = dst->xip[i];
221+
}
222+
}
223+
dst->xcnt = j;
224+
} else {
225+
Assert(src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotSubxidCount());
226+
memcpy(dst->subxip + dst->subxcnt, dst->xip, dst->xcnt*sizeof(TransactionId));
227+
memcpy(dst->subxip + dst->subxcnt + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
228+
n = dst->xcnt + dst->subxcnt + src->xcnt;
229+
230+
qsort(dst->subxip, n, sizeof(TransactionId), xidComparator);
231+
xid = InvalidTransactionId;
232+
233+
for (i = 0, j = 0; i < n && dst->subxip[i] < dst->xmax; i++) {
234+
if (dst->subxip[i] != xid) {
235+
dst->subxip[j++] = xid = dst->subxip[i];
236+
}
237+
}
238+
dst->subxcnt = j;
239+
dst->xcnt = 0;
240+
}
224241
DumpSnapshot(dst, "merged");
225242
}
226243

contrib/pg_dtm/sockhub/start-clients.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
n_clients=100
1+
n_clients=200
22
n_iters=100000
33
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
44
for ((i=0;i<n_clients;i++))

contrib/pg_dtm/tests/transfers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
207207
}
208208
exec(conn, "drop table if exists t")
209209
exec(conn, "create table t(u int primary key, v int)")
210-
210+
exec(conn, "insert into t (select generate_series(0,1000000), $1)", cfg.Accounts.Balance)
211+
/*
211212
exec(conn, "begin transaction isolation level " + cfg.Isolation)
212213
213214
start := time.Now()
@@ -223,7 +224,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
223224
start = time.Now()
224225
}
225226
}
226-
227+
*/
227228
exec(conn, "commit")
228229
wg.Done()
229230
}

contrib/pg_dtm/tests/transfers.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ go run transfers.go \
33
-d 'dbname=postgres port=5432' \
44
-d 'dbname=postgres port=5433' \
55
-m \
6+
-w 128 \
67
-g

0 commit comments

Comments
 (0)