@@ -29,7 +29,7 @@ struct thread
29
29
size_t aborts;
30
30
int id;
31
31
32
- void start (int tid, thread_proc_t proc) {
32
+ void start (int tid, thread_proc_t proc) {
33
33
id = tid;
34
34
updates = 0 ;
35
35
selects = 0 ;
@@ -38,7 +38,7 @@ struct thread
38
38
pthread_create (&t, NULL , proc, this );
39
39
}
40
40
41
- void wait () {
41
+ void wait () {
42
42
pthread_join (t, NULL );
43
43
}
44
44
};
@@ -96,7 +96,7 @@ T execQuery( transaction_base& txn, char const* sql, ...)
96
96
va_end (args);
97
97
result r = txn.exec (buf);
98
98
return r[0 ][0 ].as (T ());
99
- }
99
+ }
100
100
101
101
void * reader (void * arg)
102
102
{
@@ -118,32 +118,32 @@ void* reader(void* arg)
118
118
}
119
119
return NULL ;
120
120
}
121
-
121
+
122
122
void * writer (void * arg)
123
123
{
124
124
thread& t = *(thread*)arg;
125
125
connection conn (cfg.connection );
126
126
for (int i = 0 ; i < cfg.nIterations ; i++)
127
- {
127
+ {
128
128
work txn (conn);
129
129
int srcAcc = random () % cfg.nAccounts ;
130
130
int dstAcc = random () % cfg.nAccounts ;
131
- try {
132
- if (random () % 100 < cfg.updatePercent ) {
131
+ try {
132
+ if (random () % 100 < cfg.updatePercent ) {
133
133
exec (txn, " update t set v = v - 1 where u=%d" , srcAcc);
134
134
exec (txn, " update t set v = v + 1 where u=%d" , dstAcc);
135
135
t.updates += 2 ;
136
- } else {
136
+ } else {
137
137
int64_t sum = execQuery<int64_t >(txn, " select v from t where u=%d" , srcAcc)
138
138
+ execQuery<int64_t >(txn, " select v from t where u=%d" , dstAcc);
139
- if (sum > cfg.nIterations *cfg.nWriters || sum < -cfg.nIterations *cfg.nWriters ) {
139
+ if (sum > cfg.nIterations *cfg.nWriters || sum < -cfg.nIterations *cfg.nWriters ) {
140
140
printf (" Wrong sum=%ld\n " , sum);
141
141
}
142
142
t.selects += 2 ;
143
143
}
144
- txn.commit ();
144
+ txn.commit ();
145
145
t.transactions += 1 ;
146
- } catch (pqxx_exception const & x) {
146
+ } catch (pqxx_exception const & x) {
147
147
txn.abort ();
148
148
t.aborts += 1 ;
149
149
i -= 1 ;
@@ -152,13 +152,13 @@ void* writer(void* arg)
152
152
}
153
153
return NULL ;
154
154
}
155
-
155
+
156
156
void initializeDatabase ()
157
157
{
158
158
connection conn (cfg.connection );
159
159
int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1 )/cfg.nShards ;
160
- for (int i = 0 ; i < cfg.nShards ; i++)
161
- {
160
+ for (int i = 0 ; i < cfg.nShards ; i++)
161
+ {
162
162
work txn (conn);
163
163
exec (txn, " alter table t_fdw%i add check (u between %d and %d)" , i+1 , accountsPerShard*i, accountsPerShard*(i+1 )-1 );
164
164
exec (txn, " insert into t_fdw%i (select generate_series(%d,%d), %d)" , i+1 , accountsPerShard*i, accountsPerShard*(i+1 )-1 , 0 );
@@ -175,15 +175,15 @@ int main (int argc, char* argv[])
175
175
return 1 ;
176
176
}
177
177
178
- for (int i = 1 ; i < argc; i++) {
179
- if (argv[i][0 ] == ' -' ) {
180
- switch (argv[i][1 ]) {
178
+ for (int i = 1 ; i < argc; i++) {
179
+ if (argv[i][0 ] == ' -' ) {
180
+ switch (argv[i][1 ]) {
181
181
case ' r' :
182
182
cfg.nReaders = atoi (argv[++i]);
183
183
continue ;
184
184
case ' w' :
185
185
cfg.nWriters = atoi (argv[++i]);
186
- continue ;
186
+ continue ;
187
187
case ' a' :
188
188
cfg.nAccounts = atoi (argv[++i]);
189
189
continue ;
@@ -213,7 +213,7 @@ int main (int argc, char* argv[])
213
213
return 1 ;
214
214
}
215
215
216
- if (initialize) {
216
+ if (initialize) {
217
217
initializeDatabase ();
218
218
printf (" %d accounts inserted\n " , cfg.nAccounts );
219
219
return 0 ;
@@ -229,29 +229,29 @@ int main (int argc, char* argv[])
229
229
size_t nSelects = 0 ;
230
230
size_t nTransactions = 0 ;
231
231
232
- for (int i = 0 ; i < cfg.nReaders ; i++) {
232
+ for (int i = 0 ; i < cfg.nReaders ; i++) {
233
233
readers[i].start (i, reader);
234
234
}
235
- for (int i = 0 ; i < cfg.nWriters ; i++) {
235
+ for (int i = 0 ; i < cfg.nWriters ; i++) {
236
236
writers[i].start (i, writer);
237
237
}
238
-
239
- for (int i = 0 ; i < cfg.nWriters ; i++) {
238
+
239
+ for (int i = 0 ; i < cfg.nWriters ; i++) {
240
240
writers[i].wait ();
241
241
nUpdates += writers[i].updates ;
242
242
nSelects += writers[i].selects ;
243
243
nAborts += writers[i].aborts ;
244
244
nTransactions += writers[i].transactions ;
245
245
}
246
-
246
+
247
247
running = false ;
248
248
249
- for (int i = 0 ; i < cfg.nReaders ; i++) {
249
+ for (int i = 0 ; i < cfg.nReaders ; i++) {
250
250
readers[i].wait ();
251
251
nSelects += readers[i].selects ;
252
252
nTransactions += writers[i].transactions ;
253
253
}
254
-
254
+
255
255
time_t elapsed = getCurrentTime () - start;
256
256
257
257
printf (
@@ -260,10 +260,10 @@ int main (int argc, char* argv[])
260
260
" \" readers\" :%d, \" writers\" :%d, \" update_percent\" :%d, \" accounts\" :%d, \" iterations\" :%d ,\" shards\" :%d}\n " ,
261
261
(double )(nTransactions*USEC)/elapsed,
262
262
nTransactions,
263
- nSelects,
263
+ nSelects,
264
264
nUpdates,
265
265
nAborts,
266
- (int )(nAborts*100 /nTransactions),
266
+ (int )(nAborts*100 /nTransactions),
267
267
cfg.nReaders ,
268
268
cfg.nWriters ,
269
269
cfg.updatePercent ,
0 commit comments