Skip to content

Commit 3f68177

Browse files
committed
Merge branch 'master' into uniq-arbiter
2 parents eb05cb7 + 9219f15 commit 3f68177

File tree

5 files changed

+43
-24
lines changed

5 files changed

+43
-24
lines changed

contrib/multimaster/multimaster.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ static TransactionId DtmNextXid;
123123
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
124124
static bool DtmHasGlobalSnapshot;
125125
static int DtmLocalXidReserve;
126+
static CommandId DtmCurcid;
126127
static Snapshot DtmLastSnapshot;
127128
static TransactionManager DtmTM = {
128129
DtmGetTransactionStatus,
@@ -630,11 +631,12 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
630631
{
631632
if (TransactionIdIsValid(DtmNextXid) && snapshot != &CatalogSnapshotData)
632633
{
633-
if (!DtmHasGlobalSnapshot) {
634+
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != GetCurrentCommandId(false))) {
634635
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
635636
}
636637
DtmLastSnapshot = snapshot;
637638
DtmMergeWithGlobalSnapshot(snapshot);
639+
DtmCurcid = snapshot->curcid;
638640
if (!IsolationUsesXactSnapshot())
639641
{
640642
/* Use single global snapshot during all transaction for repeatable read isolation level,

contrib/pg_dtm/pg_dtm.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
107107
static bool DtmHasGlobalSnapshot;
108108
static bool DtmGlobalXidAssigned;
109109
static int DtmLocalXidReserve;
110+
static CommandId DtmCurcid;
110111
static Snapshot DtmLastSnapshot;
111112
static TransactionManager DtmTM = {
112113
DtmGetTransactionStatus,
@@ -600,7 +601,6 @@ DtmGetNewTransactionId(bool isSubXact)
600601
return xid;
601602
}
602603

603-
604604
static Snapshot DtmGetSnapshot(Snapshot snapshot)
605605
{
606606
if (DtmGlobalXidAssigned)
@@ -612,12 +612,13 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
612612
return PgGetSnapshotData(snapshot);
613613
}
614614
if (TransactionIdIsValid(DtmNextXid) && snapshot != &CatalogSnapshotData)
615-
{
616-
if (!DtmHasGlobalSnapshot) {
615+
{
616+
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != GetCurrentCommandId(false))) {
617617
ArbiterGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
618618
}
619619
DtmLastSnapshot = snapshot;
620620
DtmMergeWithGlobalSnapshot(snapshot);
621+
DtmCurcid = snapshot->curcid;
621622
if (!IsolationUsesXactSnapshot())
622623
{
623624
/* Use single global snapshot during all transaction for repeatable read isolation level,

src/backend/commands/indexcmds.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ AlterIndex(Oid indexRelationId, IndexStmt *stmt)
368368
SPI_cursor_fetch(portal, true, 1);
369369
if (!SPI_processed) {
370370
break;
371-
}
371+
}
372372
tuple = SPI_tuptable->vals[0];
373373
tupleid = &tuple->t_data->t_ctid;
374374
ExecStoreTuple(tuple, slot, InvalidBuffer, false);

src/bin/insbench/insbench.cpp

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ void* inserter(void* arg)
9898
} else {
9999
con.prepare("insert", "insert into t (select generate_series($1::integer,$2::integer),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000),ceil(random()*1000000000))");
100100
}
101-
101+
time_t curr = currTimestamp;
102+
102103
for (int i = 0; i < cfg.nIterations; i++)
103104
{
104105
work txn(con);
@@ -109,8 +110,9 @@ void* inserter(void* arg)
109110
txn.prepared("insert")(getCurrentTime())(random())(random())(random())(random())(random())(random())(random())(random()).exec();
110111
}
111112
} else {
112-
currTimestamp = i*cfg.transactionSize;
113-
txn.prepared("insert")(i*cfg.transactionSize)((i+1)*cfg.transactionSize-1).exec();
113+
txn.prepared("insert")(curr)(curr+cfg.transactionSize-1).exec();
114+
curr += cfg.transactionSize;
115+
currTimestamp = curr;
114116
}
115117
txn.commit();
116118
}
@@ -124,10 +126,11 @@ void* indexUpdater(void* arg)
124126
sleep(cfg.indexUpdateInterval);
125127
printf("Alter indexes\n");
126128
time_t now = getCurrentTime();
129+
time_t limit = cfg.useSystemTime ? now : currTimestamp;
127130
{
128131
work txn(con);
129132
for (int i = 0; i < cfg.nIndexes; i++) {
130-
exec(txn, "alter index idx%d where pk<%lu", i, cfg.useSystemTime ? now : currTimestamp);
133+
exec(txn, "alter index idx%d where pk<%lu", i, limit);
131134
}
132135
txn.commit();
133136
}
@@ -149,18 +152,6 @@ void initializeDatabase()
149152
time_t now = getCurrentTime();
150153
exec(txn, "drop table if exists t");
151154
exec(txn, "create table t (pk bigint, k1 bigint, k2 bigint, k3 bigint, k4 bigint, k5 bigint, k6 bigint, k7 bigint, k8 bigint)");
152-
if (!cfg.noPK) {
153-
exec(txn, "create index pk on t(pk)");
154-
}
155-
for (int i = 0; i < cfg.nIndexes; i++) {
156-
if (cfg.indexUpdateInterval == 0) {
157-
exec(txn, "create index idx%d on t(k%d)", i, i+1);
158-
} else if (cfg.useSystemTime) {
159-
exec(txn, "create index idx%d on t(k%d) where pk<%ld", i, i+1, now);
160-
} else {
161-
exec(txn, "create index idx%d on t(k%d) where pk<%ld", i, i+1, 0);
162-
}
163-
}
164155

165156
if (cfg.initialSize)
166157
{
@@ -184,9 +175,26 @@ void initializeDatabase()
184175
txn.prepared("insert")(cfg.initialSize)(cfg.initialSize-1).exec();
185176
currTimestamp = cfg.initialSize;
186177
}
187-
txn.exec("vacuum analyze");
178+
}
179+
if (!cfg.noPK) {
180+
exec(txn, "create index pk on t(pk)");
181+
}
182+
for (int i = 0; i < cfg.nIndexes; i++) {
183+
if (cfg.indexUpdateInterval == 0) {
184+
exec(txn, "create index idx%d on t(k%d)", i, i+1);
185+
} else if (cfg.useSystemTime) {
186+
exec(txn, "create index idx%d on t(k%d) where pk<%ld", i, i+1, now);
187+
} else {
188+
exec(txn, "create index idx%d on t(k%d) where pk<%ld", i, i+1, currTimestamp);
189+
}
188190
}
189191
txn.commit();
192+
{
193+
nontransaction txn(con);
194+
txn.exec("vacuum analyze");
195+
sleep(2);
196+
}
197+
printf("Database intialized\n");
190198
}
191199

192200

@@ -234,7 +242,8 @@ int main (int argc, char* argv[])
234242
"\t-w N\tnumber of inserters (1)\n"
235243
"\t-u N\tindex update interval (0)\n"
236244
"\t-n N\tnumber of iterations (10000)\n"
237-
"\t-i N\tnumber of indexes (8)\n"
245+
"\t-x N\tnumber of indexes (8)\n"
246+
"\t-i N\tinitial table size (1000000)\n"
238247
"\t-q\tuse system time and libpq\n"
239248
"\t-p\tno primary key\n"
240249
"\t-c STR\tdatabase connection string\n");
@@ -257,10 +266,11 @@ int main (int argc, char* argv[])
257266
for (int i = 0; i < cfg.nInserters; i++) {
258267
inserters[i].wait();
259268
}
269+
time_t elapsed = getCurrentTime() - start;
270+
260271
running = false;
261272
bgw.wait();
262273

263-
time_t elapsed = getCurrentTime() - start;
264274

265275
printf(
266276
"{\"tps\":%f, \"index_updates\":%d, \"max_update_time\":%ld, \"avg_update_time\":%f,"

src/bin/insbench/run.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
echo Insert with 1 index
2+
./insbench -c "dbname=postgres host=localhost port=5432 sslmode=disable" -x 0
3+
echo Insert with 9 indexex
4+
./insbench -c "dbname=postgres host=localhost port=5432 sslmode=disable" -x 8
5+
echo Insert with 9 partial indexes
6+
./insbench -c "dbname=postgres host=localhost port=5432 sslmode=disable" -x 8 -u 1

0 commit comments

Comments
 (0)