@@ -95,7 +95,10 @@ static int pthread_join(pthread_t th, void **thread_return);
95
95
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
96
96
#define DEFAULT_NXACTS 10 /* default nxacts */
97
97
98
+ #define ZIPF_CACHE_SIZE 15 /* cache cells number */
99
+
98
100
#define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
101
+ #define MAX_ZIPFIAN_PARAM 1000 /* maximum parameter for zipfian */
99
102
100
103
int nxacts = 0 ; /* number of transactions per client */
101
104
int duration = 0 ; /* duration in seconds */
@@ -330,6 +333,35 @@ typedef struct
330
333
int ecnt ; /* error count */
331
334
} CState ;
332
335
336
+ /*
337
+ * Cache cell for zipfian_random call
338
+ */
339
+ typedef struct
340
+ {
341
+ /* cell keys */
342
+ double s ; /* s - parameter of zipfan_random function */
343
+ int64 n ; /* number of elements in range (max - min + 1) */
344
+
345
+ double harmonicn ; /* generalizedHarmonicNumber(n, s) */
346
+ double alpha ;
347
+ double beta ;
348
+ double eta ;
349
+
350
+ uint64 last_used ; /* last used logical time */
351
+ } ZipfCell ;
352
+
353
+ /*
354
+ * Zipf cache for zeta values
355
+ */
356
+ typedef struct
357
+ {
358
+ uint64 current ; /* counter for LRU cache replacement algorithm */
359
+
360
+ int nb_cells ; /* number of filled cells */
361
+ int overflowCount ; /* number of cache overflows */
362
+ ZipfCell cells [ZIPF_CACHE_SIZE ];
363
+ } ZipfCache ;
364
+
333
365
/*
334
366
* Thread state
335
367
*/
@@ -342,6 +374,8 @@ typedef struct
342
374
unsigned short random_state [3 ]; /* separate randomness for each thread */
343
375
int64 throttle_trigger ; /* previous/next throttling (us) */
344
376
FILE * logfile ; /* where to log, or NULL */
377
+ ZipfCache zipf_cache ; /* for thread-safe zipfian random number
378
+ * generation */
345
379
346
380
/* per thread collected stats */
347
381
instr_time start_time ; /* thread start time */
@@ -746,6 +780,137 @@ getPoissonRand(TState *thread, int64 center)
746
780
return (int64 ) (- log (uniform ) * ((double ) center ) + 0.5 );
747
781
}
748
782
783
+ /* helper function for getZipfianRand */
784
+ static double
785
+ generalizedHarmonicNumber (int64 n , double s )
786
+ {
787
+ int i ;
788
+ double ans = 0.0 ;
789
+
790
+ for (i = n ; i > 1 ; i -- )
791
+ ans += pow (i , - s );
792
+ return ans + 1.0 ;
793
+ }
794
+
795
+ /* set harmonicn and other parameters to cache cell */
796
+ static void
797
+ zipfSetCacheCell (ZipfCell * cell , int64 n , double s )
798
+ {
799
+ double harmonic2 ;
800
+
801
+ cell -> n = n ;
802
+ cell -> s = s ;
803
+
804
+ harmonic2 = generalizedHarmonicNumber (2 , s );
805
+ cell -> harmonicn = generalizedHarmonicNumber (n , s );
806
+
807
+ cell -> alpha = 1.0 / (1.0 - s );
808
+ cell -> beta = pow (0.5 , s );
809
+ cell -> eta = (1.0 - pow (2.0 / n , 1.0 - s )) / (1.0 - harmonic2 / cell -> harmonicn );
810
+ }
811
+
812
+ /*
813
+ * search for cache cell with keys (n, s)
814
+ * and create new cell if it does not exist
815
+ */
816
+ static ZipfCell *
817
+ zipfFindOrCreateCacheCell (ZipfCache * cache , int64 n , double s )
818
+ {
819
+ int i ,
820
+ least_recently_used = 0 ;
821
+ ZipfCell * cell ;
822
+
823
+ /* search cached cell for given parameters */
824
+ for (i = 0 ; i < cache -> nb_cells ; i ++ )
825
+ {
826
+ cell = & cache -> cells [i ];
827
+ if (cell -> n == n && cell -> s == s )
828
+ return & cache -> cells [i ];
829
+
830
+ if (cell -> last_used < cache -> cells [least_recently_used ].last_used )
831
+ least_recently_used = i ;
832
+ }
833
+
834
+ /* create new one if it does not exist */
835
+ if (cache -> nb_cells < ZIPF_CACHE_SIZE )
836
+ i = cache -> nb_cells ++ ;
837
+ else
838
+ {
839
+ /* replace LRU cell if cache is full */
840
+ i = least_recently_used ;
841
+ cache -> overflowCount ++ ;
842
+ }
843
+
844
+ zipfSetCacheCell (& cache -> cells [i ], n , s );
845
+
846
+ cache -> cells [i ].last_used = cache -> current ++ ;
847
+ return & cache -> cells [i ];
848
+ }
849
+
850
+ /*
851
+ * Computing zipfian using rejection method, based on
852
+ * "Non-Uniform Random Variate Generation",
853
+ * Luc Devroye, p. 550-551, Springer 1986.
854
+ */
855
+ static int64
856
+ computeIterativeZipfian (TState * thread , int64 n , double s )
857
+ {
858
+ double b = pow (2.0 , s - 1.0 );
859
+ double x ,
860
+ t ,
861
+ u ,
862
+ v ;
863
+
864
+ while (true)
865
+ {
866
+ /* random variates */
867
+ u = pg_erand48 (thread -> random_state );
868
+ v = pg_erand48 (thread -> random_state );
869
+
870
+ x = floor (pow (u , -1.0 / (s - 1.0 )));
871
+
872
+ t = pow (1.0 + 1.0 / x , s - 1.0 );
873
+ /* reject if too large or out of bound */
874
+ if (v * x * (t - 1.0 ) / (b - 1.0 ) <= t / b && x <= n )
875
+ break ;
876
+ }
877
+ return (int64 ) x ;
878
+ }
879
+
880
+ /*
881
+ * Computing zipfian using harmonic numbers, based on algorithm described in
882
+ * "Quickly Generating Billion-Record Synthetic Databases",
883
+ * Jim Gray et al, SIGMOD 1994
884
+ */
885
+ static int64
886
+ computeHarmonicZipfian (TState * thread , int64 n , double s )
887
+ {
888
+ ZipfCell * cell = zipfFindOrCreateCacheCell (& thread -> zipf_cache , n , s );
889
+ double uniform = pg_erand48 (thread -> random_state );
890
+ double uz = uniform * cell -> harmonicn ;
891
+
892
+ if (uz < 1.0 )
893
+ return 1 ;
894
+ if (uz < 1.0 + cell -> beta )
895
+ return 2 ;
896
+ return 1 + (int64 ) (cell -> n * pow (cell -> eta * uniform - cell -> eta + 1.0 , cell -> alpha ));
897
+ }
898
+
899
+ /* random number generator: zipfian distribution from min to max inclusive */
900
+ static int64
901
+ getZipfianRand (TState * thread , int64 min , int64 max , double s )
902
+ {
903
+ int64 n = max - min + 1 ;
904
+
905
+ /* abort if parameter is invalid */
906
+ Assert (s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM );
907
+
908
+
909
+ return min - 1 + ((s > 1 )
910
+ ? computeIterativeZipfian (thread , n , s )
911
+ : computeHarmonicZipfian (thread , n , s ));
912
+ }
913
+
749
914
/*
750
915
* Initialize the given SimpleStats struct to all zeroes
751
916
*/
@@ -1303,7 +1468,6 @@ coerceToDouble(PgBenchValue *pval, double *dval)
1303
1468
return true;
1304
1469
}
1305
1470
}
1306
-
1307
1471
/* assign an integer value */
1308
1472
static void
1309
1473
setIntValue (PgBenchValue * pv , int64 ival )
@@ -1605,6 +1769,7 @@ evalFunc(TState *thread, CState *st,
1605
1769
case PGBENCH_RANDOM :
1606
1770
case PGBENCH_RANDOM_EXPONENTIAL :
1607
1771
case PGBENCH_RANDOM_GAUSSIAN :
1772
+ case PGBENCH_RANDOM_ZIPFIAN :
1608
1773
{
1609
1774
int64 imin ,
1610
1775
imax ;
@@ -1655,6 +1820,18 @@ evalFunc(TState *thread, CState *st,
1655
1820
setIntValue (retval ,
1656
1821
getGaussianRand (thread , imin , imax , param ));
1657
1822
}
1823
+ else if (func == PGBENCH_RANDOM_ZIPFIAN )
1824
+ {
1825
+ if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM )
1826
+ {
1827
+ fprintf (stderr ,
1828
+ "zipfian parameter must be in range (0, 1) U (1, %d]"
1829
+ " (got %f)\n" , MAX_ZIPFIAN_PARAM , param );
1830
+ return false;
1831
+ }
1832
+ setIntValue (retval ,
1833
+ getZipfianRand (thread , imin , imax , param ));
1834
+ }
1658
1835
else /* exponential */
1659
1836
{
1660
1837
if (param <= 0.0 )
@@ -3683,6 +3860,8 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
3683
3860
tps_include ,
3684
3861
tps_exclude ;
3685
3862
int64 ntx = total -> cnt - total -> skipped ;
3863
+ int i ,
3864
+ totalCacheOverflows = 0 ;
3686
3865
3687
3866
time_include = INSTR_TIME_GET_DOUBLE (total_time );
3688
3867
@@ -3710,6 +3889,15 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
3710
3889
printf ("number of transactions actually processed: " INT64_FORMAT "\n" ,
3711
3890
ntx );
3712
3891
}
3892
+ /* Report zipfian cache overflow */
3893
+ for (i = 0 ; i < nthreads ; i ++ )
3894
+ {
3895
+ totalCacheOverflows += threads [i ].zipf_cache .overflowCount ;
3896
+ }
3897
+ if (totalCacheOverflows > 0 )
3898
+ {
3899
+ printf ("zipfian cache array overflowed %d time(s)\n" , totalCacheOverflows );
3900
+ }
3713
3901
3714
3902
/* Remaining stats are nonsensical if we failed to execute any xacts */
3715
3903
if (total -> cnt <= 0 )
@@ -4513,6 +4701,9 @@ main(int argc, char **argv)
4513
4701
thread -> random_state [2 ] = random ();
4514
4702
thread -> logfile = NULL ; /* filled in later */
4515
4703
thread -> latency_late = 0 ;
4704
+ thread -> zipf_cache .nb_cells = 0 ;
4705
+ thread -> zipf_cache .current = 0 ;
4706
+ thread -> zipf_cache .overflowCount = 0 ;
4516
4707
initStats (& thread -> stats , 0 );
4517
4708
4518
4709
nclients_dealt += thread -> nstate ;
0 commit comments