@@ -41,12 +41,14 @@ struct thread
41
41
pthread_t t;
42
42
size_t proceeded;
43
43
size_t aborts;
44
+ time_t max_trans_duration;
44
45
int id;
45
46
46
47
void start (int tid, thread_proc_t proc) {
47
48
id = tid;
48
49
proceeded = 0 ;
49
50
aborts = 0 ;
51
+ max_trans_duration = 0 ;
50
52
pthread_create (&t, NULL , proc, this );
51
53
}
52
54
@@ -63,6 +65,9 @@ struct config
63
65
int nAccounts;
64
66
int startId;
65
67
int diapason;
68
+ bool deadlockFree;
69
+ bool maxSnapshot;
70
+ bool makeSavepoints;
66
71
vector<string> connections;
67
72
68
73
config () {
@@ -72,6 +77,8 @@ struct config
72
77
nAccounts = 100000 ;
73
78
startId = 0 ;
74
79
diapason = 100000 ;
80
+ deadlockFree = false ;
81
+ makeSavepoints = false ;
75
82
}
76
83
};
77
84
@@ -87,6 +94,9 @@ static time_t getCurrentTime()
87
94
return (time_t )tv.tv_sec *USEC + tv.tv_usec ;
88
95
}
89
96
97
+ inline csn_t max (csn_t t1, csn_t t2) {
98
+ return t1 < t2 ? t2 : t1;
99
+ }
90
100
91
101
void exec (transaction_base& txn, char const * sql, ...)
92
102
{
@@ -119,27 +129,41 @@ void* reader(void* arg)
119
129
int64_t prevSum = 0 ;
120
130
121
131
while (running) {
122
- csn_t snapshot;
132
+ csn_t snapshot = 0 ;
123
133
vector< unique_ptr<work> > txns (conns.size ());
134
+ time_t start = getCurrentTime ();
124
135
for (size_t i = 0 ; i < conns.size (); i++) {
125
136
txns[i] = new work (*conns[i]);
126
137
}
127
- for (size_t i = 0 ; i < txns.size (); i++) {
128
- if (i == 0 ) {
129
- snapshot = execQuery (*txns[i], " select dtm_extend()" );
130
- } else {
131
- snapshot = execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
138
+ if (cfg.maxSnapshot ) {
139
+ for (size_t i = 0 ; i < txns.size (); i++) {
140
+ snapshot = max (snapshot, execQuery (*txns[i], " select dtm_extend()" ));
141
+ }
142
+ for (size_t i = 0 ; i < txns.size (); i++) {
143
+ execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
144
+ }
145
+ } else {
146
+ for (size_t i = 0 ; i < txns.size (); i++) {
147
+ if (i == 0 ) {
148
+ snapshot = execQuery (*txns[i], " select dtm_extend()" );
149
+ } else {
150
+ snapshot = execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
151
+ }
132
152
}
133
153
}
134
154
int64_t sum = 0 ;
135
155
for (size_t i = 0 ; i < txns.size (); i++) {
136
156
sum += execQuery (*txns[i], " select sum(v) from t" );
137
157
}
138
158
if (sum != prevSum) {
139
- printf (" Total=%ld snapshot=%ld \n " , sum, snapshot);
159
+ printf (" Total=%ld snapshot=%ldm delta=%ld usec \n " , sum, snapshot, getCurrentTime ()- snapshot);
140
160
prevSum = sum;
141
161
}
142
162
t.proceeded += 1 ;
163
+ time_t elapsed = getCurrentTime () - start;
164
+ if (elapsed > t.max_trans_duration ) {
165
+ t.max_trans_duration = elapsed;
166
+ }
143
167
}
144
168
return NULL ;
145
169
}
@@ -156,33 +180,37 @@ void* writer(void* arg)
156
180
{
157
181
char gtid[32 ];
158
182
159
- // int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
160
- // int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
161
-
162
183
int srcAcc = cfg.startId + random () % cfg.diapason ;
163
184
int dstAcc = cfg.startId + random () % cfg.diapason ;
164
185
165
- #if 1 // avoid deadlocks
166
- if (srcAcc > dstAcc) {
186
+ if (cfg.deadlockFree && srcAcc > dstAcc) { // avoid deadlocks
167
187
int tmpAcc = dstAcc;
168
188
dstAcc = srcAcc;
169
189
srcAcc = tmpAcc;
170
190
}
171
- #endif
172
191
sprintf (gtid, " %d.%d.%d" , cfg.startId , t.id , i);
173
192
174
193
nontransaction srcTx (*srcCon);
175
194
nontransaction dstTx (*dstCon);
176
195
196
+ time_t start = getCurrentTime ();
197
+
177
198
exec (srcTx, " begin transaction" );
178
199
exec (dstTx, " begin transaction" );
179
200
180
- csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
181
- snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
182
-
183
- exec (srcTx, " savepoint c1" );
184
- exec (dstTx, " savepoint c2" );
185
-
201
+ if (cfg.maxSnapshot ) {
202
+ csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
203
+ snapshot = max (snapshot, execQuery (dstTx, " select dtm_extend('%s')" , gtid));
204
+ execQuery (srcTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
205
+ execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
206
+ } else {
207
+ csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
208
+ snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
209
+ }
210
+ if (cfg.makeSavepoints ) {
211
+ exec (srcTx, " savepoint c1" );
212
+ exec (dstTx, " savepoint c2" );
213
+ }
186
214
try {
187
215
exec (srcTx, " update t set v = v - 1 where u=%d" , srcAcc);
188
216
exec (dstTx, " update t set v = v + 1 where u=%d" , dstAcc);
@@ -204,7 +232,12 @@ void* writer(void* arg)
204
232
exec (dstTx, " select dtm_end_prepare('%s', %ld)" , gtid, csn);
205
233
exec (srcTx, " commit prepared '%s'" , gtid);
206
234
exec (dstTx, " commit prepared '%s'" , gtid);
207
-
235
+
236
+ time_t elapsed = getCurrentTime () - start;
237
+ if (elapsed > t.max_trans_duration ) {
238
+ t.max_trans_duration = elapsed;
239
+ }
240
+
208
241
t.proceeded += 1 ;
209
242
}
210
243
return NULL ;
@@ -219,7 +252,7 @@ void initializeDatabase()
219
252
exec (txn, " create extension pg_dtm" );
220
253
exec (txn, " drop table if exists t" );
221
254
exec (txn, " create table t(u int primary key, v int)" );
222
- exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts , 0 );
255
+ exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts - 1 , 0 );
223
256
txn.commit ();
224
257
}
225
258
}
@@ -255,8 +288,18 @@ int main (int argc, char* argv[])
255
288
cfg.diapason = atoi (argv[++i]);
256
289
continue ;
257
290
case ' C' :
291
+ case ' c' :
258
292
cfg.connections .push_back (string (argv[++i]));
259
293
continue ;
294
+ case ' f' :
295
+ cfg.deadlockFree = true ;
296
+ continue ;
297
+ case ' m' :
298
+ cfg.maxSnapshot = true ;
299
+ continue ;
300
+ case ' x' :
301
+ cfg.makeSavepoints = true ;
302
+ continue ;
260
303
case ' i' :
261
304
initialize = true ;
262
305
continue ;
@@ -266,16 +309,19 @@ int main (int argc, char* argv[])
266
309
" \t -r N\t number of readers (1)\n "
267
310
" \t -w N\t number of writers (10)\n "
268
311
" \t -a N\t number of accounts (100000)\n "
269
- " \t -s N\t perform updates starting from this id (1 )\n "
270
- " \t -d N\t perform updates in this diapason (100000 )\n "
312
+ " \t -s N\t perform updates starting from this id (0 )\n "
313
+ " \t -d N\t perform updates in this diapason (#accounts )\n "
271
314
" \t -n N\t number of iterations (1000)\n "
272
- " \t -C STR\t database connection string\n "
315
+ " \t -c STR\t database connection string\n "
316
+ " \t -f\t avoid deadlocks by ordering accounts\n "
317
+ " \t -m\t choose maximal snapshot\n "
318
+ " \t -x\t make savepoints\n "
273
319
" \t -i\t initialize datanase\n " );
274
320
return 1 ;
275
321
}
276
322
277
323
if (cfg.startId + cfg.diapason > cfg.nAccounts ) {
278
- printf ( " startId + diapason should be less that nAccounts. Exiting. \n " ) ;
324
+ cfg. diapason = cfg. nAccounts - cfg. startId ;
279
325
return 1 ;
280
326
}
281
327
@@ -293,7 +339,9 @@ int main (int argc, char* argv[])
293
339
size_t nReads = 0 ;
294
340
size_t nWrites = 0 ;
295
341
size_t nAborts = 0 ;
296
-
342
+ time_t maxReadDuration = 0 ;
343
+ time_t maxWriteDuration = 0 ;
344
+
297
345
for (int i = 0 ; i < cfg.nReaders ; i++) {
298
346
readers[i].start (i, reader);
299
347
}
@@ -305,14 +353,20 @@ int main (int argc, char* argv[])
305
353
writers[i].wait ();
306
354
nWrites += writers[i].proceeded ;
307
355
nAborts += writers[i].aborts ;
356
+ if (writers[i].max_trans_duration > maxWriteDuration) {
357
+ maxWriteDuration = writers[i].max_trans_duration ;
358
+ }
308
359
}
309
360
310
361
running = false ;
311
362
312
363
for (int i = 0 ; i < cfg.nReaders ; i++) {
313
364
readers[i].wait ();
314
365
nReads += readers[i].proceeded ;
315
- }
366
+ if (readers[i].max_trans_duration > maxReadDuration) {
367
+ maxReadDuration = readers[i].max_trans_duration ;
368
+ }
369
+ }
316
370
317
371
time_t elapsed = getCurrentTime () - start;
318
372
@@ -321,13 +375,15 @@ int main (int argc, char* argv[])
321
375
printf (
322
376
" {\" update_tps\" :%f, \" read_tps\" :%f,"
323
377
" \" readers\" :%d, \" writers\" :%d, \" aborts\" :%ld, \" abort_percent\" : %d,"
378
+ " \" max_read_duration\" :%ld, \" max_write_duration\" :%ld,"
324
379
" \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%ld}\n " ,
325
380
(double )(nWrites*USEC)/elapsed,
326
381
(double )(nReads*USEC)/elapsed,
327
382
cfg.nReaders ,
328
383
cfg.nWriters ,
329
384
nAborts,
330
385
(int )(nAborts*100 /nWrites),
386
+ maxReadDuration, maxWriteDuration,
331
387
cfg.nAccounts ,
332
388
cfg.nIterations ,
333
389
cfg.connections .size ()
0 commit comments