Skip to content

Commit 7841007

Browse files
committed
Support subtransactions
1 parent d9e190e commit 7841007

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

pg_dtm.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
8484
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
8585
static TransactionId DtmAdjustOldestXid(TransactionId xid);
8686
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
87+
static bool DtmDetectGlobalDeadLock(PGPROC* proc);
8788

88-
static TransactionManager DtmTM = { PgTransactionIdGetStatus, DtmSetTransactionStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot };
89+
static TransactionManager DtmTM = { PgTransactionIdGetStatus, DtmSetTransactionStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock };
8990

9091
void _PG_init(void);
9192
void _PG_fini(void);
@@ -765,3 +766,9 @@ void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *sub
765766
}
766767
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
767768
}
769+
770+
bool DtmDetectGlobalDeadLock(PGPROC* proc)
771+
{
772+
elog(WARNING, "Global deadlock?");
773+
return true;
774+
}

tests/dtmbench.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ struct thread
4040
{
4141
pthread_t t;
4242
size_t proceeded;
43+
size_t aborts;
4344
int id;
4445

4546
void start(int tid, thread_proc_t proc) {
4647
id = tid;
4748
proceeded = 0;
49+
aborts = 0;
4850
pthread_create(&t, NULL, proc, this);
4951
}
5052

@@ -160,12 +162,13 @@ void* writer(void* arg)
160162
int srcAcc = cfg.startId + random() % cfg.diapason;
161163
int dstAcc = cfg.startId + random() % cfg.diapason;
162164

165+
#if 1 // avoid deadlocks
163166
if (srcAcc > dstAcc) {
164167
int tmpAcc = dstAcc;
165168
dstAcc = srcAcc;
166169
srcAcc = tmpAcc;
167170
}
168-
171+
#endif
169172
sprintf(gtid, "%d.%d.%d", cfg.startId, t.id, i);
170173

171174
nontransaction srcTx(*srcCon);
@@ -180,8 +183,16 @@ void* writer(void* arg)
180183
exec(srcTx, "savepoint c1");
181184
exec(dstTx, "savepoint c2");
182185

183-
exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
184-
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
186+
try {
187+
exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
188+
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
189+
} catch (pqxx_exception const& x) {
190+
exec(srcTx, "rollback");
191+
exec(dstTx, "rollback");
192+
t.aborts += 1;
193+
i -= 1;
194+
continue;
195+
}
185196

186197
exec(srcTx, "prepare transaction '%s'", gtid);
187198
exec(dstTx, "prepare transaction '%s'", gtid);
@@ -281,7 +292,8 @@ int main (int argc, char* argv[])
281292
vector<thread> writers(cfg.nWriters);
282293
size_t nReads = 0;
283294
size_t nWrites = 0;
284-
295+
size_t nAborts = 0;
296+
285297
for (int i = 0; i < cfg.nReaders; i++) {
286298
readers[i].start(i, reader);
287299
}
@@ -292,6 +304,7 @@ int main (int argc, char* argv[])
292304
for (int i = 0; i < cfg.nWriters; i++) {
293305
writers[i].wait();
294306
nWrites += writers[i].proceeded;
307+
nAborts += writers[i].aborts;
295308
}
296309

297310
running = false;
@@ -307,12 +320,14 @@ int main (int argc, char* argv[])
307320

308321
printf(
309322
"{\"update_tps\":%f, \"read_tps\":%f,"
310-
" \"readers\":%d, \"writers\":%d,"
323+
" \"readers\":%d, \"writers\":%d, \"aborts\":%ld, \"abort_percent\": %d,"
311324
" \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
312325
(double)(nWrites*USEC)/elapsed,
313326
(double)(nReads*USEC)/elapsed,
314327
cfg.nReaders,
315328
cfg.nWriters,
329+
nAborts,
330+
(int)(nAborts*100/nWrites),
316331
cfg.nAccounts,
317332
cfg.nIterations,
318333
cfg.connections.size()

0 commit comments

Comments
 (0)