@@ -59,13 +59,17 @@ struct config
59
59
int nWriters;
60
60
int nIterations;
61
61
int nAccounts;
62
+ int startId;
63
+ int diapason;
62
64
vector<string> connections;
63
65
64
66
config () {
65
67
nReaders = 1 ;
66
68
nWriters = 10 ;
67
69
nIterations = 1000 ;
68
- nAccounts = 1000 ;
70
+ nAccounts = 100000 ;
71
+ startId = 1 ;
72
+ diapason = 100000 ;
69
73
}
70
74
};
71
75
@@ -141,36 +145,41 @@ void* reader(void* arg)
141
145
void * writer (void * arg)
142
146
{
143
147
thread& t = *(thread*)arg;
144
- vector< unique_ptr<connection> > conns (cfg.connections .size ());
145
- for (size_t i = 0 ; i < conns.size (); i++) {
146
- conns[i] = new connection (cfg.connections [i]);
147
- }
148
+ connection *srcCon, *dstCon;
149
+
150
+ srcCon = new connection (cfg.connections [t.id % cfg.connections .size ()]);
151
+ dstCon = new connection (cfg.connections [(t.id + 1 ) % cfg.connections .size ()]);
152
+
148
153
for (int i = 0 ; i < cfg.nIterations ; i++)
149
154
{
150
155
char gtid[32 ];
151
- int srcCon, dstCon;
152
- int srcAcc = (random () % ((cfg.nAccounts -cfg.nWriters )/cfg.nWriters ))*cfg.nWriters + t.id ;
153
- int dstAcc = (random () % ((cfg.nAccounts -cfg.nWriters )/cfg.nWriters ))*cfg.nWriters + t.id ;
154
156
155
- sprintf (gtid, " %d.%d" , t.id , i);
157
+ // int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
158
+ // int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
156
159
157
- do {
158
- srcCon = random () % cfg.connections .size ();
159
- dstCon = random () % cfg.connections .size ();
160
- } while (srcCon == dstCon);
161
-
162
- nontransaction srcTx (*conns[srcCon]);
163
- nontransaction dstTx (*conns[dstCon]);
160
+ int srcAcc = cfg.startId + random () % cfg.diapason ;
161
+ int dstAcc = cfg.startId + random () % cfg.diapason ;
162
+
163
+ if (srcAcc > dstAcc) {
164
+ int tmpAcc = dstAcc;
165
+ dstAcc = srcAcc;
166
+ srcAcc = tmpAcc;
167
+ }
168
+
169
+ sprintf (gtid, " %d.%d.%d" , cfg.startId , t.id , i);
170
+
171
+ nontransaction srcTx (*srcCon);
172
+ nontransaction dstTx (*dstCon);
164
173
165
174
exec (srcTx, " begin transaction" );
166
175
exec (dstTx, " begin transaction" );
167
176
168
177
csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
169
178
snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
170
-
179
+
171
180
exec (srcTx, " update t set v = v - 1 where u=%d" , srcAcc);
172
181
exec (dstTx, " update t set v = v + 1 where u=%d" , dstAcc);
173
-
182
+
174
183
exec (srcTx, " prepare transaction '%s'" , gtid);
175
184
exec (dstTx, " prepare transaction '%s'" , gtid);
176
185
exec (srcTx, " select dtm_begin_prepare('%s')" , gtid);
@@ -196,17 +205,20 @@ void initializeDatabase()
196
205
exec (txn, " create extension pg_dtm" );
197
206
exec (txn, " drop table if exists t" );
198
207
exec (txn, " create table t(u int primary key, v int)" );
199
- exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts - 1 , 0 );
208
+ exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts , 0 );
200
209
txn.commit ();
201
-
202
- // nontransaction vacTx(conn);
203
- // exec(vacTx, "vacuum full");
204
210
}
205
211
}
206
212
207
213
int main (int argc, char * argv[])
208
214
{
209
215
bool initialize = false ;
216
+
217
+ if (argc == 1 ){
218
+ printf (" Use -h to show usage options\n " );
219
+ return 1 ;
220
+ }
221
+
210
222
for (int i = 1 ; i < argc; i++) {
211
223
if (argv[i][0 ] == ' -' ) {
212
224
switch (argv[i][1 ]) {
@@ -222,6 +234,12 @@ int main (int argc, char* argv[])
222
234
case ' n' :
223
235
cfg.nIterations = atoi (argv[++i]);
224
236
continue ;
237
+ case ' s' :
238
+ cfg.startId = atoi (argv[++i]);
239
+ continue ;
240
+ case ' d' :
241
+ cfg.diapason = atoi (argv[++i]);
242
+ continue ;
225
243
case ' C' :
226
244
cfg.connections .push_back (string (argv[++i]));
227
245
continue ;
@@ -233,14 +251,24 @@ int main (int argc, char* argv[])
233
251
printf (" Options:\n "
234
252
" \t -r N\t number of readers (1)\n "
235
253
" \t -w N\t number of writers (10)\n "
236
- " \t -a N\t number of accounts (1000)\n "
254
+ " \t -a N\t number of accounts (100000)\n "
255
+ " \t -s N\t perform updates starting from this id (1)\n "
256
+ " \t -d N\t perform updates in this diapason (100000)\n "
237
257
" \t -n N\t number of iterations (1000)\n "
238
- " \t -c STR\t database connection string\n "
258
+ " \t -C STR\t database connection string\n "
239
259
" \t -i\t initialize datanase\n " );
240
260
return 1 ;
241
261
}
262
+
263
+ if (cfg.startId + cfg.diapason - 1 > cfg.nAccounts ) {
264
+ printf (" startId + diapason should be less that nAccounts. Exiting.\n " );
265
+ return 1 ;
266
+ }
267
+
242
268
if (initialize) {
243
269
initializeDatabase ();
270
+ printf (" %d account inserted\n " , cfg.nAccounts );
271
+ return 0 ;
244
272
}
245
273
246
274
time_t start = getCurrentTime ();
@@ -275,7 +303,9 @@ int main (int argc, char* argv[])
275
303
276
304
277
305
printf (
278
- " {\" update_tps\" :%f, \" read_tps\" :%f, \" readers\" :%d, \" writers\" :%d, \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%d}\n " ,
306
+ " {\" update_tps\" :%f, \" read_tps\" :%f,"
307
+ " \" readers\" :%d, \" writers\" :%d,"
308
+ " \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%d}\n " ,
279
309
(double )(nWrites*USEC)/elapsed,
280
310
(double )(nReads*USEC)/elapsed,
281
311
cfg.nReaders ,
0 commit comments