Skip to content

Commit c501386

Browse files
committed
Try locking during begin-start and get-global-snapshot.
1 parent ae47bde commit c501386

File tree

12 files changed

+80
-22
lines changed

12 files changed

+80
-22
lines changed

contrib/pg_gtm/dtmd/bin/dtmd

21.4 KB
Binary file not shown.

contrib/pg_gtm/tests/transfers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS = 4
1313
INIT_AMOUNT = 10000
14-
N_ITERATIONS = 1000
14+
N_ITERATIONS = 10000
1515
N_ACCOUNTS = 1//100000
1616
)
1717

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ static char *onabort(void *client, cmd_t *cmd) {
245245

246246
static void gen_snapshot(Snapshot *s, int node) {
247247
s->nactive = 0;
248-
s->xmin = s->xmax = xmax[node];
248+
s->xmin = xmax[node];
249+
s->xmax = s->xmin + 1;
249250
int i;
250251
for (i = 0; i < transactions_count; i++) {
251252
Transaction *t = transactions[i].participants + node;

contrib/pg_xtm/dtmd/src/snapshot.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ char *snapshot_serialize(Snapshot *s) {
4545
append_hex16(&cursor, s->active[i]);
4646
}
4747

48-
shout("cursor - data = %ld, len = %d\n", cursor - data, len);
48+
//shout("cursor - data = %ld, len = %d\n", cursor - data, len);
4949
assert(cursor - data == len);
5050
assert(data[len] == '\0');
5151
return data;

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,10 @@ CREATE FUNCTION dtm_get_snapshot() RETURNS void
99
AS 'MODULE_PATHNAME','dtm_get_snapshot'
1010
LANGUAGE C;
1111

12+
CREATE FUNCTION dtm_lock() RETURNS void
13+
AS 'MODULE_PATHNAME','dtm_lock'
14+
LANGUAGE C;
15+
16+
CREATE FUNCTION dtm_unlock() RETURNS void
17+
AS 'MODULE_PATHNAME','dtm_unlock'
18+
LANGUAGE C;

contrib/pg_xtm/pg_dtm.c

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,31 @@ static void DtmEnsureConnection(void)
6565

6666
static void DtmCopySnapshot(Snapshot dst, Snapshot src)
6767
{
68-
DtmInitSnapshot(dst);
69-
memcpy(dst->xip, src->xip, src->xcnt*sizeof(TransactionId));
70-
memcpy(dst->subxip, src->xip, src->xcnt*sizeof(TransactionId));
71-
dst->xmax = src->xmax;
72-
dst->xmin = src->xmin;
73-
dst->xcnt = src->xcnt;
74-
dst->subxcnt = src->xcnt;
75-
dst->curcid = src->curcid;
68+
int i, j, n;
69+
static TransactionId* buf;
70+
TransactionId prev = InvalidTransactionId;
71+
if (buf == NULL) {
72+
buf = (TransactionId *)malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId) * 2);
73+
}
74+
75+
GetLocalSnapshotData(dst);
76+
77+
if (dst->xmin > src->xmin) {
78+
dst->xmin = src->xmin;
79+
}
80+
if (dst->xmax > src->xmax) {
81+
dst->xmax = src->xmax;
82+
}
83+
84+
memcpy(buf, dst->xip, dst->xcnt*sizeof(TransactionId));
85+
memcpy(buf + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
86+
qsort(buf, dst->xcnt + src->xcnt, sizeof(TransactionId), xidComparator);
87+
for (i = 0, j = 0, n = dst->xcnt + src->xcnt; i < n && buf[i] < dst->xmax; i++) {
88+
if (buf[i] != prev) {
89+
dst->xip[j++] = prev = buf[i];
90+
}
91+
}
92+
dst->xcnt = j;
7693
}
7794

7895
static void DtmUpdateRecentXmin(void)
@@ -100,7 +117,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
100117
return snapshot;
101118
}
102119
snapshot = GetLocalSnapshotData(snapshot);
103-
DtmUpdateRecentXmin();
120+
// DtmUpdateRecentXmin();
104121
return snapshot;
105122
}
106123

@@ -114,7 +131,7 @@ static bool IsInDtmSnapshot(TransactionId xid)
114131

115132
static bool DtmTransactionIsInProgress(TransactionId xid)
116133
{
117-
return IsInDtmSnapshot(xid) || TransactionIdIsRunning(xid);
134+
return /*IsInDtmSnapshot(xid) || */ TransactionIdIsRunning(xid);
118135
}
119136

120137
static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
@@ -216,6 +233,8 @@ PG_MODULE_MAGIC;
216233

217234
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
218235
PG_FUNCTION_INFO_V1(dtm_get_snapshot);
236+
PG_FUNCTION_INFO_V1(dtm_lock);
237+
PG_FUNCTION_INFO_V1(dtm_unlock);
219238

220239
Datum
221240
dtm_begin_transaction(PG_FUNCTION_ARGS)
@@ -235,7 +254,9 @@ Datum
235254
dtm_get_snapshot(PG_FUNCTION_ARGS)
236255
{
237256
DtmEnsureConnection();
257+
LWLockAcquire(DtmLock, LW_EXCLUSIVE);
238258
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
259+
LWLockRelease(DtmLock);
239260

240261
// VacuumProcArray(&DtmSnapshot);
241262

@@ -246,3 +267,16 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
246267
PG_RETURN_VOID();
247268
}
248269

270+
Datum
271+
dtm_lock(PG_FUNCTION_ARGS)
272+
{
273+
LWLockAcquire(DtmLock, LW_EXCLUSIVE);
274+
PG_RETURN_VOID();
275+
}
276+
277+
Datum
278+
dtm_unlock(PG_FUNCTION_ARGS)
279+
{
280+
LWLockRelease(DtmLock);
281+
PG_RETURN_VOID();
282+
}

contrib/pg_xtm/tests/transfers.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ func prepare_db() {
6565
exec(conn2, "create table t(u int primary key, v int)")
6666

6767
// strt transaction
68+
exec(conn1, "select dtm_lock()")
69+
exec(conn2, "select dtm_lock()")
6870
exec(conn1, "begin")
6971
exec(conn2, "begin")
7072

@@ -74,6 +76,8 @@ func prepare_db() {
7476

7577
// register global transaction in DTMD
7678
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
79+
exec(conn1, "select dtm_unlock()")
80+
exec(conn2, "select dtm_unlock()")
7781

7882
// first global statement
7983
exec(conn1, "select dtm_get_snapshot()")
@@ -113,6 +117,8 @@ func transfer(id int, wg *sync.WaitGroup) {
113117
account2 := rand.Intn(N_ACCOUNTS)
114118

115119
// strt transaction
120+
exec(conn1, "select dtm_lock()")
121+
exec(conn2, "select dtm_lock()")
116122
exec(conn1, "begin")
117123
exec(conn2, "begin")
118124

@@ -122,6 +128,8 @@ func transfer(id int, wg *sync.WaitGroup) {
122128

123129
// register global transaction in DTMD
124130
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
131+
exec(conn1, "select dtm_unlock()")
132+
exec(conn2, "select dtm_unlock()")
125133

126134
// first global statement
127135
exec(conn1, "select dtm_get_snapshot()")
@@ -152,15 +160,19 @@ func total() int32 {
152160
defer conn2.Close()
153161

154162
for {
155-
exec(conn1, "begin transaction")
156-
exec(conn2, "begin transaction")
163+
exec(conn1, "select dtm_lock()")
164+
exec(conn2, "select dtm_lock()")
165+
exec(conn1, "begin")
166+
exec(conn2, "begin")
157167

158168
// obtain XIDs of paticipants
159169
xids[0] = execQuery(conn1, "select txid_current()")
160170
xids[1] = execQuery(conn2, "select txid_current()")
161171

162172
// register global transaction in DTMD
163173
exec(conn1, "select dtm_begin_transaction($1, $2)", nodes, xids)
174+
exec(conn1, "select dtm_unlock()")
175+
exec(conn2, "select dtm_unlock()")
164176

165177
exec(conn1, "select dtm_get_snapshot()")
166178
exec(conn2, "select dtm_get_snapshot()")
@@ -207,7 +219,7 @@ func main() {
207219

208220
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
209221
var err error
210-
_, err = conn.Exec(stmt, arguments... )
222+
_, _ = conn.Exec(stmt, arguments... )
211223
checkErr(err)
212224
}
213225

@@ -221,7 +233,8 @@ func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
221233

222234
func checkErr(err error) {
223235
if err != nil {
224-
panic(err)
236+
//panic(err)
237+
fmt.Println(err)
225238
}
226239
}
227240

src/backend/access/heap/heapam.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3270,7 +3270,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
32703270
oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
32713271
oldtup.t_len = ItemIdGetLength(lp);
32723272
oldtup.t_self = *otid;
3273-
3273+
32743274
/* the new tuple is ready, except for this: */
32753275
newtup->t_tableOid = RelationGetRelid(relation);
32763276

@@ -3630,6 +3630,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
36303630
newtup->t_data->t_infomask2 |= infomask2_new_tuple;
36313631
HeapTupleHeaderSetXmax(newtup->t_data, xmax_new_tuple);
36323632

3633+
Assert(xmax_new_tuple != xid || (newtup->t_data->t_infomask & HEAP_XMAX_LOCK_ONLY) != 0);
3634+
36333635
/*
36343636
* Replace cid with a combo cid if necessary. Note that we already put
36353637
* the plain cid into the new tuple.

src/backend/executor/execMain.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2239,11 +2239,9 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
22392239
}
22402240

22412241
/* otherwise xmin should not be dirty... */
2242-
#if 0
22432242
if (TransactionIdIsValid(SnapshotDirty.xmin)) {
22442243
elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
22452244
}
2246-
#endif
22472245

22482246
/*
22492247
* If tuple is being updated by other transaction then we have to

src/backend/executor/nodeModifyTable.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,7 @@ ExecModifyTable(ModifyTableState *node)
12671267
ItemPointerData tuple_ctid;
12681268
HeapTupleData oldtupdata;
12691269
HeapTuple oldtuple;
1270+
int n_tuples = 0;
12701271

12711272
/*
12721273
* This should NOT get called during EvalPlanQual; we should have passed a

src/backend/storage/lmgr/lmgr.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,8 +542,9 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
542542
error_context_stack = &callback;
543543
}
544544

545-
while (TransactionIdIsValid(xid))
545+
for (;;)
546546
{
547+
Assert(TransactionIdIsValid(xid));
547548
Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
548549

549550
SET_LOCKTAG_TRANSACTION(tag, xid);

src/include/storage/lwlock.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
139139
#define CommitTsControlLock (&MainLWLockArray[38].lock)
140140
#define CommitTsLock (&MainLWLockArray[39].lock)
141141
#define ReplicationOriginLock (&MainLWLockArray[40].lock)
142+
#define DtmLock (&MainLWLockArray[41].lock)
142143

143-
#define NUM_INDIVIDUAL_LWLOCKS 41
144+
#define NUM_INDIVIDUAL_LWLOCKS 42
144145

145146
/*
146147
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS

0 commit comments

Comments
 (0)