Skip to content

Commit 610f7c5

Browse files
committed
Fix handling of subtransactions
1 parent 7841007 commit 610f7c5

File tree

4 files changed

+132
-38
lines changed

4 files changed

+132
-38
lines changed

pg_dtm--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@ LANGUAGE C;
2020
CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
2121
AS 'MODULE_PATHNAME','dtm_end_prepare'
2222
LANGUAGE C;
23+
24+
-- SQL entry point for the C function dtm_get_csn: takes a transaction id
-- (declared as integer) and returns a bigint.
CREATE FUNCTION dtm_get_csn(xid integer)
    RETURNS bigint
    AS 'MODULE_PATHNAME', 'dtm_get_csn'
    LANGUAGE C;

pg_dtm.c

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
8585
static TransactionId DtmAdjustOldestXid(TransactionId xid);
8686
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
8787
static bool DtmDetectGlobalDeadLock(PGPROC* proc);
88+
static cid_t DtmGetCsn(TransactionId xid);
8889

8990
static TransactionManager DtmTM = { PgTransactionIdGetStatus, DtmSetTransactionStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock };
9091

@@ -308,6 +309,7 @@ PG_FUNCTION_INFO_V1(dtm_access);
308309
PG_FUNCTION_INFO_V1(dtm_begin_prepare);
309310
PG_FUNCTION_INFO_V1(dtm_prepare);
310311
PG_FUNCTION_INFO_V1(dtm_end_prepare);
312+
PG_FUNCTION_INFO_V1(dtm_get_csn);
311313

312314
Datum
313315
dtm_extend(PG_FUNCTION_ARGS)
@@ -357,7 +359,13 @@ dtm_end_prepare(PG_FUNCTION_ARGS)
357359
PG_RETURN_VOID();
358360
}
359361

360-
362+
Datum
363+
dtm_get_csn(PG_FUNCTION_ARGS)
364+
{
365+
TransactionId xid = PG_GETARG_INT32(0);
366+
cid_t csn = DtmGetCsn(xid);
367+
PG_RETURN_INT64(csn);
368+
}
361369
/*
362370
* ***************************************************************************
363371
*/
@@ -392,13 +400,22 @@ static int dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
392400
return strcmp((GlobalTransactionId)key1, (GlobalTransactionId)key2);
393401
}
394402

395-
static void IncludeInTransactionList(DtmTransStatus* ts)
403+
/*
 * Append ts to the end of the transaction list tracked in `local`.
 *
 * local->trans_list_tail points at the `next` slot that terminates the list;
 * ts is stored into that slot and the tail is moved to ts's own `next` slot.
 *
 * NOTE(review): the visible call sites run before SpinLockRelease(&local->lock),
 * so the caller is presumably expected to hold that lock - confirm.
 */
static void DtmTransactionListAppend(DtmTransStatus* ts)
{
	DtmTransStatus** tail_slot = local->trans_list_tail;

	ts->next = NULL;
	*tail_slot = ts;
	local->trans_list_tail = &ts->next;
}
401409

410+
/*
 * Insert ts into the transaction list immediately after the element `after`.
 *
 * If `after` was the last element (the list tail pointed at after->next),
 * the tail is moved to ts->next so that later appends land behind ts.
 */
static void DtmTransactionListInsertAfter(DtmTransStatus* after, DtmTransStatus* ts)
{
	/* Address comparison only, so it can be evaluated before relinking. */
	bool after_was_tail = (local->trans_list_tail == &after->next);

	ts->next = after->next;
	after->next = ts;

	if (after_was_tail) {
		local->trans_list_tail = &ts->next;
	}
}
418+
402419
static TransactionId DtmAdjustOldestXid(TransactionId xid)
403420
{
404421
if (TransactionIdIsValid(xid)) {
@@ -459,9 +476,12 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
459476
while (true)
460477
{
461478
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
462-
if (ts == NULL && TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
463-
xid = SubTransGetTopmostTransaction(xid);
464-
ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
479+
if (ts == NULL &&
480+
!(TransactionIdFollowsOrEquals(xid, snapshot->xmax) || TransactionIdPrecedes(xid, snapshot->xmin)))
481+
{
482+
//TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
483+
TransactionId subxid = SubTransGetTopmostTransaction(xid);
484+
ts = (DtmTransStatus*)hash_search(xid2status, &subxid, HASH_FIND, NULL);
465485
}
466486
if (ts != NULL)
467487
{
@@ -618,7 +638,7 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid)
618638
ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
619639
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
620640
ts->cid = dtm_get_cid();
621-
IncludeInTransactionList(ts);
641+
DtmTransactionListAppend(ts);
622642
}
623643
SpinLockRelease(&local->lock);
624644
}
@@ -685,7 +705,7 @@ void DtmLocalCommit(DtmTransState* x)
685705
} else if (!found) {
686706
//Assert(!found);
687707
ts->cid = dtm_get_cid();
688-
IncludeInTransactionList(ts);
708+
DtmTransactionListAppend(ts);
689709
}
690710
x->cid = ts->cid;
691711
ts->status = TRANSACTION_STATUS_COMMITTED;
@@ -724,7 +744,7 @@ void DtmLocalAbort(DtmTransState* x)
724744
} else if (!found) {
725745
//Assert(!found);
726746
ts->cid = dtm_get_cid();
727-
IncludeInTransactionList(ts);
747+
DtmTransactionListAppend(ts);
728748
}
729749
x->cid = ts->cid;
730750
ts->status = TRANSACTION_STATUS_ABORTED;
@@ -758,8 +778,7 @@ void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *sub
758778
Assert(!found);
759779
sts->status = status;
760780
sts->cid = ts->cid;
761-
sts->next = ts->next;
762-
ts->next = sts;
781+
DtmTransactionListInsertAfter(ts, sts);
763782
}
764783
}
765784
SpinLockRelease(&local->lock);
@@ -772,3 +791,18 @@ bool DtmDetectGlobalDeadLock(PGPROC* proc)
772791
elog(WARNING, "Global deadlock?");
773792
return true;
774793
}
794+
795+
/*
 * Look up xid in the xid2status hash table while holding local->lock.
 *
 * Returns the cid stored in the matching entry, or 0 when the table has no
 * entry for xid.
 */
static cid_t DtmGetCsn(TransactionId xid)
{
	DtmTransStatus* entry;
	cid_t result;

	SpinLockAcquire(&local->lock);
	entry = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
	result = (entry != NULL) ? entry->cid : 0;
	SpinLockRelease(&local->lock);

	return result;
}
808+

tests/dtmbench.cpp

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ struct thread
4141
pthread_t t;
4242
size_t proceeded;
4343
size_t aborts;
44+
time_t max_trans_duration;
4445
int id;
4546

4647
void start(int tid, thread_proc_t proc) {
4748
id = tid;
4849
proceeded = 0;
4950
aborts = 0;
51+
max_trans_duration = 0;
5052
pthread_create(&t, NULL, proc, this);
5153
}
5254

@@ -63,6 +65,9 @@ struct config
6365
int nAccounts;
6466
int startId;
6567
int diapason;
68+
bool deadlockFree;
69+
bool maxSnapshot;
70+
bool makeSavepoints;
6671
vector<string> connections;
6772

6873
config() {
@@ -72,6 +77,8 @@ struct config
7277
nAccounts = 100000;
7378
startId = 0;
7479
diapason = 100000;
80+
deadlockFree = false;
81+
makeSavepoints = false;
7582
}
7683
};
7784

@@ -87,6 +94,9 @@ static time_t getCurrentTime()
8794
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
8895
}
8996

97+
// Return the larger of two csn_t values; t1 is returned when neither is smaller.
inline csn_t max(csn_t t1, csn_t t2) {
    if (t1 < t2) {
        return t2;
    }
    return t1;
}
90100

91101
void exec(transaction_base& txn, char const* sql, ...)
92102
{
@@ -119,27 +129,41 @@ void* reader(void* arg)
119129
int64_t prevSum = 0;
120130

121131
while (running) {
122-
csn_t snapshot;
132+
csn_t snapshot = 0;
123133
vector< unique_ptr<work> > txns(conns.size());
134+
time_t start = getCurrentTime();
124135
for (size_t i = 0; i < conns.size(); i++) {
125136
txns[i] = new work(*conns[i]);
126137
}
127-
for (size_t i = 0; i < txns.size(); i++) {
128-
if (i == 0) {
129-
snapshot = execQuery(*txns[i], "select dtm_extend()");
130-
} else {
131-
snapshot = execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
138+
if (cfg.maxSnapshot) {
139+
for (size_t i = 0; i < txns.size(); i++) {
140+
snapshot = max(snapshot, execQuery(*txns[i], "select dtm_extend()"));
141+
}
142+
for (size_t i = 0; i < txns.size(); i++) {
143+
execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
144+
}
145+
} else {
146+
for (size_t i = 0; i < txns.size(); i++) {
147+
if (i == 0) {
148+
snapshot = execQuery(*txns[i], "select dtm_extend()");
149+
} else {
150+
snapshot = execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
151+
}
132152
}
133153
}
134154
int64_t sum = 0;
135155
for (size_t i = 0; i < txns.size(); i++) {
136156
sum += execQuery(*txns[i], "select sum(v) from t");
137157
}
138158
if (sum != prevSum) {
139-
printf("Total=%ld snapshot=%ld\n", sum, snapshot);
159+
printf("Total=%ld snapshot=%ldm delta=%ld usec\n", sum, snapshot, getCurrentTime()-snapshot);
140160
prevSum = sum;
141161
}
142162
t.proceeded += 1;
163+
time_t elapsed = getCurrentTime() - start;
164+
if (elapsed > t.max_trans_duration) {
165+
t.max_trans_duration = elapsed;
166+
}
143167
}
144168
return NULL;
145169
}
@@ -156,33 +180,37 @@ void* writer(void* arg)
156180
{
157181
char gtid[32];
158182

159-
// int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
160-
// int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
161-
162183
int srcAcc = cfg.startId + random() % cfg.diapason;
163184
int dstAcc = cfg.startId + random() % cfg.diapason;
164185

165-
#if 1 // avoid deadlocks
166-
if (srcAcc > dstAcc) {
186+
if (cfg.deadlockFree && srcAcc > dstAcc) { // avoid deadlocks
167187
int tmpAcc = dstAcc;
168188
dstAcc = srcAcc;
169189
srcAcc = tmpAcc;
170190
}
171-
#endif
172191
sprintf(gtid, "%d.%d.%d", cfg.startId, t.id, i);
173192

174193
nontransaction srcTx(*srcCon);
175194
nontransaction dstTx(*dstCon);
176195

196+
time_t start = getCurrentTime();
197+
177198
exec(srcTx, "begin transaction");
178199
exec(dstTx, "begin transaction");
179200

180-
csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
181-
snapshot = execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
182-
183-
exec(srcTx, "savepoint c1");
184-
exec(dstTx, "savepoint c2");
185-
201+
if (cfg.maxSnapshot) {
202+
csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
203+
snapshot = max(snapshot, execQuery(dstTx, "select dtm_extend('%s')", gtid));
204+
execQuery(srcTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
205+
execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
206+
} else {
207+
csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
208+
snapshot = execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
209+
}
210+
if (cfg.makeSavepoints) {
211+
exec(srcTx, "savepoint c1");
212+
exec(dstTx, "savepoint c2");
213+
}
186214
try {
187215
exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
188216
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
@@ -204,7 +232,12 @@ void* writer(void* arg)
204232
exec(dstTx, "select dtm_end_prepare('%s', %ld)", gtid, csn);
205233
exec(srcTx, "commit prepared '%s'", gtid);
206234
exec(dstTx, "commit prepared '%s'", gtid);
207-
235+
236+
time_t elapsed = getCurrentTime() - start;
237+
if (elapsed > t.max_trans_duration) {
238+
t.max_trans_duration = elapsed;
239+
}
240+
208241
t.proceeded += 1;
209242
}
210243
return NULL;
@@ -219,7 +252,7 @@ void initializeDatabase()
219252
exec(txn, "create extension pg_dtm");
220253
exec(txn, "drop table if exists t");
221254
exec(txn, "create table t(u int primary key, v int)");
222-
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts, 0);
255+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
223256
txn.commit();
224257
}
225258
}
@@ -255,8 +288,18 @@ int main (int argc, char* argv[])
255288
cfg.diapason = atoi(argv[++i]);
256289
continue;
257290
case 'C':
291+
case 'c':
258292
cfg.connections.push_back(string(argv[++i]));
259293
continue;
294+
case 'f':
295+
cfg.deadlockFree = true;
296+
continue;
297+
case 'm':
298+
cfg.maxSnapshot = true;
299+
continue;
300+
case 'x':
301+
cfg.makeSavepoints = true;
302+
continue;
260303
case 'i':
261304
initialize = true;
262305
continue;
@@ -266,16 +309,19 @@ int main (int argc, char* argv[])
266309
"\t-r N\tnumber of readers (1)\n"
267310
"\t-w N\tnumber of writers (10)\n"
268311
"\t-a N\tnumber of accounts (100000)\n"
269-
"\t-s N\tperform updates starting from this id (1)\n"
270-
"\t-d N\tperform updates in this diapason (100000)\n"
312+
"\t-s N\tperform updates starting from this id (0)\n"
313+
"\t-d N\tperform updates in this diapason (#accounts)\n"
271314
"\t-n N\tnumber of iterations (1000)\n"
272-
"\t-C STR\tdatabase connection string\n"
315+
"\t-c STR\tdatabase connection string\n"
316+
"\t-f\tavoid deadlocks by ordering accounts\n"
317+
"\t-m\tchoose maximal snapshot\n"
318+
"\t-x\tmake savepoints\n"
273319
"\t-i\tinitialize datanase\n");
274320
return 1;
275321
}
276322

277323
if (cfg.startId + cfg.diapason > cfg.nAccounts) {
278-
printf("startId + diapason should be less that nAccounts. Exiting.\n");
324+
cfg.diapason = cfg.nAccounts - cfg.startId;
279325
return 1;
280326
}
281327

@@ -293,7 +339,9 @@ int main (int argc, char* argv[])
293339
size_t nReads = 0;
294340
size_t nWrites = 0;
295341
size_t nAborts = 0;
296-
342+
time_t maxReadDuration = 0;
343+
time_t maxWriteDuration = 0;
344+
297345
for (int i = 0; i < cfg.nReaders; i++) {
298346
readers[i].start(i, reader);
299347
}
@@ -305,14 +353,20 @@ int main (int argc, char* argv[])
305353
writers[i].wait();
306354
nWrites += writers[i].proceeded;
307355
nAborts += writers[i].aborts;
356+
if (writers[i].max_trans_duration > maxWriteDuration) {
357+
maxWriteDuration = writers[i].max_trans_duration;
358+
}
308359
}
309360

310361
running = false;
311362

312363
for (int i = 0; i < cfg.nReaders; i++) {
313364
readers[i].wait();
314365
nReads += readers[i].proceeded;
315-
}
366+
if (readers[i].max_trans_duration > maxReadDuration) {
367+
maxReadDuration = readers[i].max_trans_duration;
368+
}
369+
}
316370

317371
time_t elapsed = getCurrentTime() - start;
318372

@@ -321,13 +375,15 @@ int main (int argc, char* argv[])
321375
printf(
322376
"{\"update_tps\":%f, \"read_tps\":%f,"
323377
" \"readers\":%d, \"writers\":%d, \"aborts\":%ld, \"abort_percent\": %d,"
378+
" \"max_read_duration\":%ld, \"max_write_duration\":%ld,"
324379
" \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
325380
(double)(nWrites*USEC)/elapsed,
326381
(double)(nReads*USEC)/elapsed,
327382
cfg.nReaders,
328383
cfg.nWriters,
329384
nAborts,
330385
(int)(nAborts*100/nWrites),
386+
maxReadDuration, maxWriteDuration,
331387
cfg.nAccounts,
332388
cfg.nIterations,
333389
cfg.connections.size()

tests/makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CXX=g++
2-
CXXFLAGS=-g -Wall -O2 -pthread
2+
CXXFLAGS=-g -Wall -O0 -pthread
33

44
all: dtmbench
55

0 commit comments

Comments
 (0)