@@ -62,15 +62,19 @@ struct config
62
62
int nWriters;
63
63
int nIterations;
64
64
int nAccounts;
65
+ int startId;
66
+ int diapason;
65
67
char const * isolationLevel;
66
68
vector<string> connections;
67
69
68
70
config () {
69
71
nReaders = 1 ;
70
72
nWriters = 10 ;
71
73
nIterations = 1000 ;
72
- nAccounts = 1000 ;
74
+ nAccounts = 100000 ;
73
75
isolationLevel = " read committed" ;
76
+ startId = 1 ;
77
+ diapason = 100000 ;
74
78
}
75
79
};
76
80
@@ -154,23 +158,24 @@ void* reader(void* arg)
154
158
void * writer (void * arg)
155
159
{
156
160
thread& t = *(thread*)arg;
157
- vector< unique_ptr<connection> > conns (cfg.connections .size ());
158
- for (size_t i = 0 ; i < conns.size (); i++) {
159
- conns[i] = new connection (cfg.connections [i]);
160
- }
161
+ connection *srcCon, *dstCon;
162
+
163
+ srcCon = new connection (cfg.connections [t.id % cfg.connections .size ()]);
164
+ dstCon = new connection (cfg.connections [(t.id + 1 ) % cfg.connections .size ()]);
165
+
161
166
for (int i = 0 ; i < cfg.nIterations ; i++)
162
167
{
163
- int srcCon, dstCon ;
164
- int srcAcc = ( random () % (( cfg.nAccounts -cfg. nWriters )/cfg. nWriters ))*cfg. nWriters + t. id ;
165
- int dstAcc = ( random () % ((cfg. nAccounts -cfg. nWriters )/cfg. nWriters ))*cfg. nWriters + t. id ;
166
-
167
- do {
168
- srcCon = random () % cfg. connections . size () ;
169
- dstCon = random () % cfg. connections . size () ;
170
- } while (srcCon == dstCon);
171
-
172
- nontransaction srcTx (*conns[ srcCon] );
173
- nontransaction dstTx (*conns[ dstCon] );
168
+ int srcAcc = cfg. startId + random () % cfg. diapason ;
169
+ int dstAcc = cfg. startId + random () % cfg.diapason ;
170
+
171
+ if (srcAcc > dstAcc) {
172
+ int tmpAcc = dstAcc;
173
+ dstAcc = srcAcc ;
174
+ srcAcc = tmpAcc ;
175
+ }
176
+
177
+ nontransaction srcTx (*srcCon);
178
+ nontransaction dstTx (*dstCon);
174
179
175
180
xid_t xid = execQuery (srcTx, " select dtm_begin_transaction()" );
176
181
exec (dstTx, " select dtm_join_transaction(%u)" , xid);
@@ -188,13 +193,14 @@ void* writer(void* arg)
188
193
i -= 1 ;
189
194
continue ;
190
195
}
196
+
191
197
pipeline srcPipe (srcTx);
192
198
pipeline dstPipe (dstTx);
193
199
srcPipe.insert (" commit transaction" );
194
200
dstPipe.insert (" commit transaction" );
195
201
srcPipe.complete ();
196
202
dstPipe.complete ();
197
-
203
+
198
204
t.proceeded += 1 ;
199
205
}
200
206
return NULL ;
@@ -217,6 +223,12 @@ void initializeDatabase()
217
223
int main (int argc, char * argv[])
218
224
{
219
225
bool initialize = false ;
226
+
227
+ if (argc == 1 ){
228
+ printf (" Use -h to show usage options\n " );
229
+ return 1 ;
230
+ }
231
+
220
232
for (int i = 1 ; i < argc; i++) {
221
233
if (argv[i][0 ] == ' -' ) {
222
234
switch (argv[i][1 ]) {
@@ -232,11 +244,16 @@ int main (int argc, char* argv[])
232
244
case ' n' :
233
245
cfg.nIterations = atoi (argv[++i]);
234
246
continue ;
235
- case ' c' :
236
- cfg.connections .push_back (string (argv[++i]));
247
+ case ' s' :
248
+ cfg.startId = atoi (argv[++i]);
249
+ continue ;
250
+ case ' d' :
251
+ cfg.diapason = atoi (argv[++i]);
237
252
continue ;
238
253
case ' l' :
239
254
cfg.isolationLevel = argv[++i];
255
+ case ' C' :
256
+ cfg.connections .push_back (string (argv[++i]));
240
257
continue ;
241
258
case ' i' :
242
259
initialize = true ;
@@ -246,15 +263,26 @@ int main (int argc, char* argv[])
246
263
printf (" Options:\n "
247
264
" \t -r N\t number of readers (1)\n "
248
265
" \t -w N\t number of writers (10)\n "
249
- " \t -a N\t number of accounts (1000)\n "
266
+ " \t -a N\t number of accounts (100000)\n "
267
+ " \t -s N\t perform updates starting from this id (1)\n "
268
+ " \t -d N\t perform updates in this diapason (100000)\n "
250
269
" \t -n N\t number of iterations (1000)\n "
251
270
" \t -l STR\t isolation level (read committed)\n "
252
271
" \t -c STR\t database connection string\n "
272
+ " \t -C STR\t database connection string\n "
253
273
" \t -i\t initialize datanase\n " );
254
274
return 1 ;
255
275
}
276
+
277
+ if (cfg.startId + cfg.diapason - 1 > cfg.nAccounts ) {
278
+ printf (" startId + diapason should be less that nAccounts. Exiting.\n " );
279
+ return 1 ;
280
+ }
281
+
256
282
if (initialize) {
257
283
initializeDatabase ();
284
+ printf (" %d account inserted\n " , cfg.nAccounts );
285
+ return 0 ;
258
286
}
259
287
260
288
time_t start = getCurrentTime ();
@@ -287,6 +315,19 @@ int main (int argc, char* argv[])
287
315
}
288
316
289
317
time_t elapsed = getCurrentTime () - start;
290
- printf (" TPS(updates)=%f, TPS(selects)=%f, aborts=%ld\n " , (double )(nWrites*USEC)/elapsed, (double )(nReads*USEC)/elapsed, nAborts);
318
+
319
+ printf (
320
+ " {\" update_tps\" :%f, \" read_tps\" :%f,"
321
+ " \" readers\" :%d, \" writers\" :%d,"
322
+ " \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%d}\n " ,
323
+ (double )(nWrites*USEC)/elapsed,
324
+ (double )(nReads*USEC)/elapsed,
325
+ cfg.nReaders ,
326
+ cfg.nWriters ,
327
+ cfg.nAccounts ,
328
+ cfg.nIterations ,
329
+ cfg.connections .size ()
330
+ );
331
+
291
332
return 0 ;
292
333
}
0 commit comments