diff --git a/Lib/statistics.py b/Lib/statistics.py index 5f0a6e67d4..f66245380a 100644 --- a/Lib/statistics.py +++ b/Lib/statistics.py @@ -11,7 +11,7 @@ Function Description ================== ================================================== mean Arithmetic mean (average) of data. -fmean Fast, floating-point arithmetic mean. +fmean Fast, floating point arithmetic mean. geometric_mean Geometric mean of data. harmonic_mean Harmonic mean of data. median Median (middle value) of data. @@ -112,8 +112,6 @@ 'fmean', 'geometric_mean', 'harmonic_mean', - 'kde', - 'kde_random', 'linear_regression', 'mean', 'median', @@ -132,20 +130,14 @@ import math import numbers import random -import sys from fractions import Fraction from decimal import Decimal -from itertools import count, groupby, repeat +from itertools import groupby, repeat from bisect import bisect_left, bisect_right -from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum, sumprod -from math import isfinite, isinf, pi, cos, sin, tan, cosh, asin, atan, acos -from functools import reduce +from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum from operator import itemgetter -from collections import Counter, namedtuple, defaultdict - -_SQRT2 = sqrt(2.0) -_random = random +from collections import Counter, namedtuple # === Exceptions === @@ -188,12 +180,11 @@ def _sum(data): allowed. """ count = 0 - types = set() - types_add = types.add partials = {} partials_get = partials.get + T = int for typ, values in groupby(data, type): - types_add(typ) + T = _coerce(T, typ) # or raise TypeError for n, d in map(_exact_ratio, values): count += 1 partials[d] = partials_get(d, 0) + n @@ -205,51 +196,9 @@ def _sum(data): else: # Sum all the partial sums using builtin sum. total = sum(Fraction(n, d) for d, n in partials.items()) - T = reduce(_coerce, types, int) # or raise TypeError return (T, total, count) -def _ss(data, c=None): - """Return the exact mean and sum of square deviations of sequence data. - - Calculations are done in a single pass, allowing the input to be an iterator. - - If given *c* is used the mean; otherwise, it is calculated from the data. - Use the *c* argument with care, as it can lead to garbage results. - - """ - if c is not None: - T, ssd, count = _sum((d := x - c) * d for x in data) - return (T, ssd, c, count) - count = 0 - types = set() - types_add = types.add - sx_partials = defaultdict(int) - sxx_partials = defaultdict(int) - for typ, values in groupby(data, type): - types_add(typ) - for n, d in map(_exact_ratio, values): - count += 1 - sx_partials[d] += n - sxx_partials[d] += n * n - if not count: - ssd = c = Fraction(0) - elif None in sx_partials: - # The sum will be a NAN or INF. We can ignore all the finite - # partials, and just look at this special one. - ssd = c = sx_partials[None] - assert not _isfinite(ssd) - else: - sx = sum(Fraction(n, d) for d, n in sx_partials.items()) - sxx = sum(Fraction(n, d*d) for d, n in sxx_partials.items()) - # This formula has poor numeric properties for floats, - # but with fractions it is exact. - ssd = (count * sxx - sx * sx) / count - c = sx / count - T = reduce(_coerce, types, int) # or raise TypeError - return (T, ssd, c, count) - - def _isfinite(x): try: return x.is_finite() # Likely a Decimal. @@ -296,28 +245,6 @@ def _exact_ratio(x): x is expected to be an int, Fraction, Decimal or float. """ - - # XXX We should revisit whether using fractions to accumulate exact - # ratios is the right way to go. 
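# A standalone sketch of the exact-summation idea _sum() uses above: numerators
# are grouped by denominator via as_integer_ratio(), and the per-denominator
# totals are combined exactly with Fraction, so rounding happens only when the
# caller converts the result back.  The helper name here is illustrative, not
# part of the module.
from fractions import Fraction

def exact_sum_sketch(values):
    partials = {}                          # denominator -> accumulated numerator
    for x in values:
        n, d = x.as_integer_ratio()        # exact ratio for int, float, Fraction, Decimal
        partials[d] = partials.get(d, 0) + n
    return sum(Fraction(n, d) for d, n in partials.items())

print(float(exact_sum_sketch([0.1] * 10)))   # 1.0, whereas built-in sum() gives 0.9999999999999999
# The cost described in the comment below is real: mixing magnitudes makes the
# exact value enormous, e.g. Fraction(3.14e300) + Fraction(3.14e-300) carries a
# numerator of roughly 616 digits.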
- - # The integer ratios for binary floats can have numerators or - # denominators with over 300 decimal digits. The problem is more - # acute with decimal floats where the default decimal context - # supports a huge range of exponents from Emin=-999999 to - # Emax=999999. When expanded with as_integer_ratio(), numbers like - # Decimal('3.14E+5000') and Decimal('3.14E-5000') have large - # numerators or denominators that will slow computation. - - # When the integer ratios are accumulated as fractions, the size - # grows to cover the full range from the smallest magnitude to the - # largest. For example, Fraction(3.14E+300) + Fraction(3.14E-300), - # has a 616 digit numerator. Likewise, - # Fraction(Decimal('3.14E+5000')) + Fraction(Decimal('3.14E-5000')) - # has 10,003 digit numerator. - - # This doesn't seem to have been problem in practice, but it is a - # potential pitfall. - try: return x.as_integer_ratio() except AttributeError: @@ -352,6 +279,22 @@ def _convert(value, T): raise +def _find_lteq(a, x): + 'Locate the leftmost value exactly equal to x' + i = bisect_left(a, x) + if i != len(a) and a[i] == x: + return i + raise ValueError + + +def _find_rteq(a, l, x): + 'Locate the rightmost value exactly equal to x' + i = bisect_right(a, x, lo=l) + if i != (len(a) + 1) and a[i - 1] == x: + return i - 1 + raise ValueError + + def _fail_neg(values, errmsg='negative value'): """Iterate over values, failing if any are less than zero.""" for x in values: @@ -360,113 +303,6 @@ def _fail_neg(values, errmsg='negative value'): yield x -def _rank(data, /, *, key=None, reverse=False, ties='average', start=1) -> list[float]: - """Rank order a dataset. The lowest value has rank 1. - - Ties are averaged so that equal values receive the same rank: - - >>> data = [31, 56, 31, 25, 75, 18] - >>> _rank(data) - [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] - - The operation is idempotent: - - >>> _rank([3.5, 5.0, 3.5, 2.0, 6.0, 1.0]) - [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] - - It is possible to rank the data in reverse order so that the - highest value has rank 1. Also, a key-function can extract - the field to be ranked: - - >>> goals = [('eagles', 45), ('bears', 48), ('lions', 44)] - >>> _rank(goals, key=itemgetter(1), reverse=True) - [2.0, 1.0, 3.0] - - Ranks are conventionally numbered starting from one; however, - setting *start* to zero allows the ranks to be used as array indices: - - >>> prize = ['Gold', 'Silver', 'Bronze', 'Certificate'] - >>> scores = [8.1, 7.3, 9.4, 8.3] - >>> [prize[int(i)] for i in _rank(scores, start=0, reverse=True)] - ['Bronze', 'Certificate', 'Gold', 'Silver'] - - """ - # If this function becomes public at some point, more thought - # needs to be given to the signature. A list of ints is - # plausible when ties is "min" or "max". When ties is "average", - # either list[float] or list[Fraction] is plausible. - - # Default handling of ties matches scipy.stats.mstats.spearmanr. 
- if ties != 'average': - raise ValueError(f'Unknown tie resolution method: {ties!r}') - if key is not None: - data = map(key, data) - val_pos = sorted(zip(data, count()), reverse=reverse) - i = start - 1 - result = [0] * len(val_pos) - for _, g in groupby(val_pos, key=itemgetter(0)): - group = list(g) - size = len(group) - rank = i + (size + 1) / 2 - for value, orig_pos in group: - result[orig_pos] = rank - i += size - return result - - -def _integer_sqrt_of_frac_rto(n: int, m: int) -> int: - """Square root of n/m, rounded to the nearest integer using round-to-odd.""" - # Reference: https://www.lri.fr/~melquion/doc/05-imacs17_1-expose.pdf - a = math.isqrt(n // m) - return a | (a*a*m != n) - - -# For 53 bit precision floats, the bit width used in -# _float_sqrt_of_frac() is 109. -_sqrt_bit_width: int = 2 * sys.float_info.mant_dig + 3 - - -def _float_sqrt_of_frac(n: int, m: int) -> float: - """Square root of n/m as a float, correctly rounded.""" - # See principle and proof sketch at: https://bugs.python.org/msg407078 - q = (n.bit_length() - m.bit_length() - _sqrt_bit_width) // 2 - if q >= 0: - numerator = _integer_sqrt_of_frac_rto(n, m << 2 * q) << q - denominator = 1 - else: - numerator = _integer_sqrt_of_frac_rto(n << -2 * q, m) - denominator = 1 << -q - return numerator / denominator # Convert to float - - -def _decimal_sqrt_of_frac(n: int, m: int) -> Decimal: - """Square root of n/m as a Decimal, correctly rounded.""" - # Premise: For decimal, computing (n/m).sqrt() can be off - # by 1 ulp from the correctly rounded result. - # Method: Check the result, moving up or down a step if needed. - if n <= 0: - if not n: - return Decimal('0.0') - n, m = -n, -m - - root = (Decimal(n) / Decimal(m)).sqrt() - nr, dr = root.as_integer_ratio() - - plus = root.next_plus() - np, dp = plus.as_integer_ratio() - # test: n / m > ((root + plus) / 2) ** 2 - if 4 * n * (dr*dp)**2 > m * (dr*np + dp*nr)**2: - return plus - - minus = root.next_minus() - nm, dm = minus.as_integer_ratio() - # test: n / m < ((root + minus) / 2) ** 2 - if 4 * n * (dr*dm)**2 < m * (dr*nm + dm*nr)**2: - return minus - - return root - - # === Measures of central tendency (averages) === def mean(data): @@ -485,13 +321,17 @@ def mean(data): If ``data`` is empty, StatisticsError will be raised. """ - T, total, n = _sum(data) + if iter(data) is data: + data = list(data) + n = len(data) if n < 1: raise StatisticsError('mean requires at least one data point') + T, total, count = _sum(data) + assert count == n return _convert(total / n, T) -def fmean(data, weights=None): +def fmean(data): """Convert data to floats and compute the arithmetic mean. This runs faster than the mean() function and it always returns a float. @@ -500,40 +340,29 @@ def fmean(data, weights=None): >>> fmean([3.5, 4.0, 5.25]) 4.25 """ - if weights is None: - try: - n = len(data) - except TypeError: - # Handle iterators that do not define __len__(). - n = 0 - def count(iterable): - nonlocal n - for n, x in enumerate(iterable, start=1): - yield x - data = count(data) + try: + n = len(data) + except TypeError: + # Handle iterators that do not define __len__(). 
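# The generator-plus-nonlocal pattern that follows can be read as a standalone
# recipe: fsum() consumes a one-shot iterator in a single pass while the length
# is captured as a side effect of enumerate().  Sketch only; the helper name is
# made up.
from math import fsum

def fsum_and_count(iterable):
    n = 0
    def counted(it):
        nonlocal n
        for n, x in enumerate(it, start=1):
            yield x
    return fsum(counted(iterable)), n

print(fsum_and_count(x / 10 for x in range(5)))   # (1.0, 5) from an iterator with no len()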
+ n = 0 + def count(iterable): + nonlocal n + for n, x in enumerate(iterable, start=1): + yield x + total = fsum(count(data)) + else: total = fsum(data) - if not n: - raise StatisticsError('fmean requires at least one data point') - return total / n - if not isinstance(weights, (list, tuple)): - weights = list(weights) try: - num = sumprod(data, weights) - except ValueError: - raise StatisticsError('data and weights must be the same length') - den = fsum(weights) - if not den: - raise StatisticsError('sum of weights must be non-zero') - return num / den + return total / n + except ZeroDivisionError: + raise StatisticsError('fmean requires at least one data point') from None def geometric_mean(data): """Convert data to floats and compute the geometric mean. - Raises a StatisticsError if the input dataset is empty - or if it contains a negative value. - - Returns zero if the product of inputs is zero. + Raises a StatisticsError if the input dataset is empty, + if it contains a zero, or if it contains a negative value. No special efforts are made to achieve exact results. (However, this may change in the future.) @@ -541,25 +370,11 @@ def geometric_mean(data): >>> round(geometric_mean([54, 24, 36]), 9) 36.0 """ - n = 0 - found_zero = False - def count_positive(iterable): - nonlocal n, found_zero - for n, x in enumerate(iterable, start=1): - if x > 0.0 or math.isnan(x): - yield x - elif x == 0.0: - found_zero = True - else: - raise StatisticsError('No negative inputs allowed', x) - total = fsum(map(log, count_positive(data))) - if not n: - raise StatisticsError('Must have a non-empty dataset') - if math.isnan(total): - return math.nan - if found_zero: - return math.nan if total == math.inf else 0.0 - return exp(total / n) + try: + return exp(fmean(map(log, data))) + except ValueError: + raise StatisticsError('geometric mean requires a non-empty dataset ' + 'containing positive numbers') from None def harmonic_mean(data, weights=None): @@ -683,75 +498,58 @@ def median_high(data): return data[n // 2] -def median_grouped(data, interval=1.0): - """Estimates the median for numeric data binned around the midpoints - of consecutive, fixed-width intervals. - - The *data* can be any iterable of numeric data with each value being - exactly the midpoint of a bin. At least one value must be present. +def median_grouped(data, interval=1): + """Return the 50th percentile (median) of grouped continuous data. - The *interval* is width of each bin. + >>> median_grouped([1, 2, 2, 3, 4, 4, 4, 4, 4, 5]) + 3.7 + >>> median_grouped([52, 52, 53, 54]) + 52.5 - For example, demographic information may have been summarized into - consecutive ten-year age groups with each group being represented - by the 5-year midpoints of the intervals: + This calculates the median as the 50th percentile, and should be + used when your data is continuous and grouped. In the above example, + the values 1, 2, 3, etc. actually represent the midpoint of classes + 0.5-1.5, 1.5-2.5, 2.5-3.5, etc. The middle value falls somewhere in + class 3.5-4.5, and interpolation is used to estimate it. - >>> demographics = Counter({ - ... 25: 172, # 20 to 30 years old - ... 35: 484, # 30 to 40 years old - ... 45: 387, # 40 to 50 years old - ... 55: 22, # 50 to 60 years old - ... 65: 6, # 60 to 70 years old - ... }) + Optional argument ``interval`` represents the class interval, and + defaults to 1. 
Changing the class interval naturally will change the + interpolated 50th percentile value: - The 50th percentile (median) is the 536th person out of the 1071 - member cohort. That person is in the 30 to 40 year old age group. - - The regular median() function would assume that everyone in the - tricenarian age group was exactly 35 years old. A more tenable - assumption is that the 484 members of that age group are evenly - distributed between 30 and 40. For that, we use median_grouped(). - - >>> data = list(demographics.elements()) - >>> median(data) - 35 - >>> round(median_grouped(data, interval=10), 1) - 37.5 - - The caller is responsible for making sure the data points are separated - by exact multiples of *interval*. This is essential for getting a - correct result. The function does not check this precondition. - - Inputs may be any numeric type that can be coerced to a float during - the interpolation step. + >>> median_grouped([1, 3, 3, 5, 7], interval=1) + 3.25 + >>> median_grouped([1, 3, 3, 5, 7], interval=2) + 3.5 + This function does not check whether the data points are at least + ``interval`` apart. """ data = sorted(data) n = len(data) - if not n: + if n == 0: raise StatisticsError("no median for empty data") - + elif n == 1: + return data[0] # Find the value at the midpoint. Remember this corresponds to the - # midpoint of the class interval. + # centre of the class interval. x = data[n // 2] - - # Using O(log n) bisection, find where all the x values occur in the data. - # All x will lie within data[i:j]. - i = bisect_left(data, x) - j = bisect_right(data, x, lo=i) - - # Coerce to floats, raising a TypeError if not possible + for obj in (x, interval): + if isinstance(obj, (str, bytes)): + raise TypeError('expected number but got %r' % obj) try: - interval = float(interval) - x = float(x) - except ValueError: - raise TypeError(f'Value cannot be converted to a float') - - # Interpolate the median using the formula found at: - # https://www.cuemath.com/data/median-of-grouped-data/ - L = x - interval / 2.0 # Lower limit of the median interval - cf = i # Cumulative frequency of the preceding interval - f = j - i # Number of elements in the median internal + L = x - interval / 2 # The lower limit of the median interval. + except TypeError: + # Mixed type. For now we just coerce to float. + L = float(x) - float(interval) / 2 + + # Uses bisection search to search for x in data with log(n) time complexity + # Find the position of leftmost occurrence of x in data + l1 = _find_lteq(data, x) + # Find the position of rightmost occurrence of x in data[l1...len(data)] + # Assuming always l1 <= l2 + l2 = _find_rteq(data, l1, x) + cf = l1 + f = l2 - l1 + 1 return L + interval * (n / 2 - cf) / f @@ -798,212 +596,9 @@ def multimode(data): >>> multimode('') [] """ - counts = Counter(iter(data)) - if not counts: - return [] - maxcount = max(counts.values()) - return [value for value, count in counts.items() if count == maxcount] - - -def kde(data, h, kernel='normal', *, cumulative=False): - """Kernel Density Estimation: Create a continuous probability density - function or cumulative distribution function from discrete samples. - - The basic idea is to smooth the data using a kernel function - to help draw inferences about a population from a sample. - - The degree of smoothing is controlled by the scaling parameter h - which is called the bandwidth. Smaller values emphasize local - features while larger values give smoother results. 
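# A minimal sketch of the estimator described above, restricted to the normal
# (Gaussian) kernel; kde() itself supports many kernels and a cumulative mode.
# The function names are illustrative, not the module's.
from math import exp, pi, sqrt

def gauss_kde_sketch(data, h):
    n = len(data)
    c = 1.0 / (n * h * sqrt(2.0 * pi))
    def pdf(x):
        return c * sum(exp(-0.5 * ((x - x_i) / h) ** 2) for x_i in data)
    return pdf

f_hat = gauss_kde_sketch([-2.1, -1.3, -0.4, 1.9, 5.1, 6.2], h=1.5)
print(round(sum(f_hat(x) for x in range(-20, 20)), 4))   # 1.0 -- the density integrates to one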
- - The kernel determines the relative weights of the sample data - points. Generally, the choice of kernel shape does not matter - as much as the more influential bandwidth smoothing parameter. - - Kernels that give some weight to every sample point: - - normal (gauss) - logistic - sigmoid - - Kernels that only give weight to sample points within - the bandwidth: - - rectangular (uniform) - triangular - parabolic (epanechnikov) - quartic (biweight) - triweight - cosine - - If *cumulative* is true, will return a cumulative distribution function. - - A StatisticsError will be raised if the data sequence is empty. - - Example - ------- - - Given a sample of six data points, construct a continuous - function that estimates the underlying probability density: - - >>> sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2] - >>> f_hat = kde(sample, h=1.5) - - Compute the area under the curve: - - >>> area = sum(f_hat(x) for x in range(-20, 20)) - >>> round(area, 4) - 1.0 - - Plot the estimated probability density function at - evenly spaced points from -6 to 10: - - >>> for x in range(-6, 11): - ... density = f_hat(x) - ... plot = ' ' * int(density * 400) + 'x' - ... print(f'{x:2}: {density:.3f} {plot}') - ... - -6: 0.002 x - -5: 0.009 x - -4: 0.031 x - -3: 0.070 x - -2: 0.111 x - -1: 0.125 x - 0: 0.110 x - 1: 0.086 x - 2: 0.068 x - 3: 0.059 x - 4: 0.066 x - 5: 0.082 x - 6: 0.082 x - 7: 0.058 x - 8: 0.028 x - 9: 0.009 x - 10: 0.002 x - - Estimate P(4.5 < X <= 7.5), the probability that a new sample value - will be between 4.5 and 7.5: - - >>> cdf = kde(sample, h=1.5, cumulative=True) - >>> round(cdf(7.5) - cdf(4.5), 2) - 0.22 - - References - ---------- - - Kernel density estimation and its application: - https://www.itm-conferences.org/articles/itmconf/pdf/2018/08/itmconf_sam2018_00037.pdf - - Kernel functions in common use: - https://en.wikipedia.org/wiki/Kernel_(statistics)#kernel_functions_in_common_use - - Interactive graphical demonstration and exploration: - https://demonstrations.wolfram.com/KernelDensityEstimation/ - - Kernel estimation of cumulative distribution function of a random variable with bounded support - https://www.econstor.eu/bitstream/10419/207829/1/10.21307_stattrans-2016-037.pdf - - """ - - n = len(data) - if not n: - raise StatisticsError('Empty data sequence') - - if not isinstance(data[0], (int, float)): - raise TypeError('Data sequence must contain ints or floats') - - if h <= 0.0: - raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}') - - if kernel == "normal" or kernel == "gauss": - sqrt2pi = sqrt(2 * pi) - sqrt2 = sqrt(2) - K = lambda t: exp(-1/2 * t * t) / sqrt2pi - W = lambda t: 1/2 * (1.0 + erf(t / sqrt2)) - support = None - elif kernel == "logistic": - # 1.0 / (exp(t) + 2.0 + exp(-t)) - K = lambda t: 1/2 / (1.0 + cosh(t)) - W = lambda t: 1.0 - 1.0 / (exp(t) + 1.0) - support = None - elif kernel == "sigmoid": - # (2/pi) / (exp(t) + exp(-t)) - c1 = 1 / pi - c2 = 2 / pi - K = lambda t: c1 / cosh(t) - W = lambda t: c2 * atan(exp(t)) - support = None - elif kernel == "rectangular" or kernel == "uniform": - K = lambda t: 1/2 - W = lambda t: 1/2 * t + 1/2 - support = 1.0 - elif kernel == "triangular": - K = lambda t: 1.0 - abs(t) - W = lambda t: t*t * (1/2 if t < 0.0 else -1/2) + t + 1/2 - support = 1.0 - elif kernel == "parabolic" or kernel == "epanechnikov": - K = lambda t: 3/4 * (1.0 - t * t) - W = lambda t: -1/4 * t**3 + 3/4 * t + 1/2 - support = 1. 
- elif kernel == "quartic" or kernel == "biweight": - K = lambda t: 15/16 * (1.0 - t * t) ** 2 - W = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2 - support = 1.0 - elif kernel == "triweight": - K = lambda t: 35/32 * (1.0 - t * t) ** 3 - W = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2 - support = 1.0 - elif kernel == "cosine": - c1 = pi / 4 - c2 = pi / 2 - K = lambda t: c1 * cos(c2 * t) - W = lambda t: 1/2 * sin(c2 * t) + 1/2 - support = 1.0 - else: - raise StatisticsError(f'Unknown kernel name: {kernel!r}') - - if support is None: - - def pdf(x): - n = len(data) - return sum(K((x - x_i) / h) for x_i in data) / (n * h) - - def cdf(x): - n = len(data) - return sum(W((x - x_i) / h) for x_i in data) / n - - else: - - sample = sorted(data) - bandwidth = h * support - - def pdf(x): - nonlocal n, sample - if len(data) != n: - sample = sorted(data) - n = len(data) - i = bisect_left(sample, x - bandwidth) - j = bisect_right(sample, x + bandwidth) - supported = sample[i : j] - return sum(K((x - x_i) / h) for x_i in supported) / (n * h) - - def cdf(x): - nonlocal n, sample - if len(data) != n: - sample = sorted(data) - n = len(data) - i = bisect_left(sample, x - bandwidth) - j = bisect_right(sample, x + bandwidth) - supported = sample[i : j] - return sum((W((x - x_i) / h) for x_i in supported), i) / n - - if cumulative: - cdf.__doc__ = f'CDF estimate with {h=!r} and {kernel=!r}' - return cdf - - else: - pdf.__doc__ = f'PDF estimate with {h=!r} and {kernel=!r}' - return pdf + counts = Counter(iter(data)).most_common() + maxcount, mode_items = next(groupby(counts, key=itemgetter(1)), (0, [])) + return list(map(itemgetter(0), mode_items)) # Notes on methods for computing quantiles @@ -1064,10 +659,7 @@ def quantiles(data, *, n=4, method='exclusive'): data = sorted(data) ld = len(data) if ld < 2: - if ld == 1: - return data * (n - 1) - raise StatisticsError('must have at least one data point') - + raise StatisticsError('must have at least two data points') if method == 'inclusive': m = ld - 1 result = [] @@ -1076,7 +668,6 @@ def quantiles(data, *, n=4, method='exclusive'): interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n result.append(interpolated) return result - if method == 'exclusive': m = ld + 1 result = [] @@ -1087,7 +678,6 @@ def quantiles(data, *, n=4, method='exclusive'): interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n result.append(interpolated) return result - raise ValueError(f'Unknown method: {method!r}') @@ -1095,6 +685,41 @@ def quantiles(data, *, n=4, method='exclusive'): # See http://mathworld.wolfram.com/Variance.html # http://mathworld.wolfram.com/SampleVariance.html +# http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance +# +# Under no circumstances use the so-called "computational formula for +# variance", as that is only suitable for hand calculations with a small +# amount of low-precision data. It has terrible numeric properties. +# +# See a comparison of three computational methods here: +# http://www.johndcook.com/blog/2008/09/26/comparing-three-methods-of-computing-standard-deviation/ + +def _ss(data, c=None): + """Return sum of square deviations of sequence data. + + If ``c`` is None, the mean is calculated in one pass, and the deviations + from the mean are calculated in a second pass. Otherwise, deviations are + calculated from ``c`` as given. Use the second case with care, as it can + lead to garbage results. 
+ """ + if c is not None: + T, total, count = _sum((x-c)**2 for x in data) + return (T, total) + T, total, count = _sum(data) + mean_n, mean_d = (total / count).as_integer_ratio() + partials = Counter() + for n, d in map(_exact_ratio, data): + diff_n = n * mean_d - d * mean_n + diff_d = d * mean_d + partials[diff_d * diff_d] += diff_n * diff_n + if None in partials: + # The sum will be a NAN or INF. We can ignore all the finite + # partials, and just look at this special one. + total = partials[None] + assert not _isfinite(total) + else: + total = sum(Fraction(n, d) for d, n in partials.items()) + return (T, total) def variance(data, xbar=None): @@ -1135,9 +760,12 @@ def variance(data, xbar=None): Fraction(67, 108) """ - T, ss, c, n = _ss(data, xbar) + if iter(data) is data: + data = list(data) + n = len(data) if n < 2: raise StatisticsError('variance requires at least two data points') + T, ss = _ss(data, xbar) return _convert(ss / (n - 1), T) @@ -1176,9 +804,12 @@ def pvariance(data, mu=None): Fraction(13, 72) """ - T, ss, c, n = _ss(data, mu) + if iter(data) is data: + data = list(data) + n = len(data) if n < 1: raise StatisticsError('pvariance requires at least one data point') + T, ss = _ss(data, mu) return _convert(ss / n, T) @@ -1191,13 +822,14 @@ def stdev(data, xbar=None): 1.0810874155219827 """ - T, ss, c, n = _ss(data, xbar) - if n < 2: - raise StatisticsError('stdev requires at least two data points') - mss = ss / (n - 1) - if issubclass(T, Decimal): - return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) - return _float_sqrt_of_frac(mss.numerator, mss.denominator) + # Fixme: Despite the exact sum of squared deviations, some inaccuracy + # remain because there are two rounding steps. The first occurs in + # the _convert() step for variance(), the second occurs in math.sqrt(). + var = variance(data, xbar) + try: + return var.sqrt() + except AttributeError: + return math.sqrt(var) def pstdev(data, mu=None): @@ -1209,47 +841,14 @@ def pstdev(data, mu=None): 0.986893273527251 """ - T, ss, c, n = _ss(data, mu) - if n < 1: - raise StatisticsError('pstdev requires at least one data point') - mss = ss / n - if issubclass(T, Decimal): - return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) - return _float_sqrt_of_frac(mss.numerator, mss.denominator) - - -def _mean_stdev(data): - """In one pass, compute the mean and sample standard deviation as floats.""" - T, ss, xbar, n = _ss(data) - if n < 2: - raise StatisticsError('stdev requires at least two data points') - mss = ss / (n - 1) + # Fixme: Despite the exact sum of squared deviations, some inaccuracy + # remain because there are two rounding steps. The first occurs in + # the _convert() step for pvariance(), the second occurs in math.sqrt(). + var = pvariance(data, mu) try: - return float(xbar), _float_sqrt_of_frac(mss.numerator, mss.denominator) + return var.sqrt() except AttributeError: - # Handle Nans and Infs gracefully - return float(xbar), float(xbar) / float(ss) - -def _sqrtprod(x: float, y: float) -> float: - "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow." - h = sqrt(x * y) - if not isfinite(h): - if isinf(h) and not isinf(x) and not isinf(y): - # Finite inputs overflowed, so scale down, and recompute. - scale = 2.0 ** -512 # sqrt(1 / sys.float_info.max) - return _sqrtprod(scale * x, scale * y) / scale - return h - if not h: - if x and y: - # Non-zero inputs underflowed, so scale up, and recompute. 
- # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon) - scale = 2.0 ** 537 - return _sqrtprod(scale * x, scale * y) / scale - return h - # Improve accuracy with a differential correction. - # https://www.wolframalpha.com/input/?i=Maclaurin+series+sqrt%28h**2+%2B+x%29+at+x%3D0 - d = sumprod((x, h), (y, -h)) - return h + d / (2.0 * h) + return math.sqrt(var) # === Statistics for relations between two inputs === @@ -1283,16 +882,18 @@ def covariance(x, y, /): raise StatisticsError('covariance requires at least two data points') xbar = fsum(x) / n ybar = fsum(y) / n - sxy = sumprod((xi - xbar for xi in x), (yi - ybar for yi in y)) + sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y)) return sxy / (n - 1) -def correlation(x, y, /, *, method='linear'): +def correlation(x, y, /): """Pearson's correlation coefficient Return the Pearson's correlation coefficient for two inputs. Pearson's - correlation coefficient *r* takes values between -1 and +1. It measures - the strength and direction of a linear relationship. + correlation coefficient *r* takes values between -1 and +1. It measures the + strength and direction of the linear relationship, where +1 means very + strong, positive linear relationship, -1 very strong, negative linear + relationship, and 0 no linear relationship. >>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9] >>> y = [9, 8, 7, 6, 5, 4, 3, 2, 1] @@ -1301,36 +902,19 @@ def correlation(x, y, /, *, method='linear'): >>> correlation(x, y) -1.0 - If *method* is "ranked", computes Spearman's rank correlation coefficient - for two inputs. The data is replaced by ranks. Ties are averaged - so that equal values receive the same rank. The resulting coefficient - measures the strength of a monotonic relationship. - - Spearman's rank correlation coefficient is appropriate for ordinal - data or for continuous data that doesn't meet the linear proportion - requirement for Pearson's correlation coefficient. """ n = len(x) if len(y) != n: raise StatisticsError('correlation requires that both inputs have same number of data points') if n < 2: raise StatisticsError('correlation requires at least two data points') - if method not in {'linear', 'ranked'}: - raise ValueError(f'Unknown method: {method!r}') - if method == 'ranked': - start = (n - 1) / -2 # Center rankings around zero - x = _rank(x, start=start) - y = _rank(y, start=start) - else: - xbar = fsum(x) / n - ybar = fsum(y) / n - x = [xi - xbar for xi in x] - y = [yi - ybar for yi in y] - sxy = sumprod(x, y) - sxx = sumprod(x, x) - syy = sumprod(y, y) + xbar = fsum(x) / n + ybar = fsum(y) / n + sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y)) + sxx = fsum((xi - xbar) ** 2.0 for xi in x) + syy = fsum((yi - ybar) ** 2.0 for yi in y) try: - return sxy / _sqrtprod(sxx, syy) + return sxy / sqrt(sxx * syy) except ZeroDivisionError: raise StatisticsError('at least one of the inputs is constant') @@ -1338,13 +922,13 @@ def correlation(x, y, /, *, method='linear'): LinearRegression = namedtuple('LinearRegression', ('slope', 'intercept')) -def linear_regression(x, y, /, *, proportional=False): +def linear_regression(x, y, /): """Slope and intercept for simple linear regression. Return the slope and intercept of simple linear regression parameters estimated using ordinary least squares. 
Simple linear regression describes relationship between an independent variable - *x* and a dependent variable *y* in terms of a linear function: + *x* and a dependent variable *y* in terms of linear function: y = slope * x + intercept + noise @@ -1360,20 +944,7 @@ def linear_regression(x, y, /, *, proportional=False): >>> noise = NormalDist().samples(5, seed=42) >>> y = [3 * x[i] + 2 + noise[i] for i in range(5)] >>> linear_regression(x, y) #doctest: +ELLIPSIS - LinearRegression(slope=3.17495..., intercept=1.00925...) - - If *proportional* is true, the independent variable *x* and the - dependent variable *y* are assumed to be directly proportional. - The data is fit to a line passing through the origin. - - Since the *intercept* will always be 0.0, the underlying linear - function simplifies to: - - y = slope * x + noise - - >>> y = [3 * x[i] + noise[i] for i in range(5)] - >>> linear_regression(x, y, proportional=True) #doctest: +ELLIPSIS - LinearRegression(slope=2.90475..., intercept=0.0) + LinearRegression(slope=3.09078914170..., intercept=1.75684970486...) """ n = len(x) @@ -1381,18 +952,15 @@ def linear_regression(x, y, /, *, proportional=False): raise StatisticsError('linear regression requires that both inputs have same number of data points') if n < 2: raise StatisticsError('linear regression requires at least two data points') - if not proportional: - xbar = fsum(x) / n - ybar = fsum(y) / n - x = [xi - xbar for xi in x] # List because used three times below - y = (yi - ybar for yi in y) # Generator because only used once below - sxy = sumprod(x, y) + 0.0 # Add zero to coerce result to a float - sxx = sumprod(x, x) + xbar = fsum(x) / n + ybar = fsum(y) / n + sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y)) + sxx = fsum((xi - xbar) ** 2.0 for xi in x) try: slope = sxy / sxx # equivalent to: covariance(x, y) / variance(x) except ZeroDivisionError: raise StatisticsError('x is constant') - intercept = 0.0 if proportional else ybar - slope * xbar + intercept = ybar - slope * xbar return LinearRegression(slope=slope, intercept=intercept) @@ -1500,29 +1068,29 @@ def __init__(self, mu=0.0, sigma=1.0): @classmethod def from_samples(cls, data): "Make a normal distribution instance from sample data." - return cls(*_mean_stdev(data)) + if not isinstance(data, (list, tuple)): + data = list(data) + xbar = fmean(data) + return cls(xbar, stdev(data, xbar)) def samples(self, n, *, seed=None): "Generate *n* samples for a given mean and standard deviation." - rnd = random.random if seed is None else random.Random(seed).random - inv_cdf = _normal_dist_inv_cdf - mu = self._mu - sigma = self._sigma - return [inv_cdf(rnd(), mu, sigma) for _ in repeat(None, n)] + gauss = random.gauss if seed is None else random.Random(seed).gauss + mu, sigma = self._mu, self._sigma + return [gauss(mu, sigma) for i in range(n)] def pdf(self, x): "Probability density function. P(x <= X < x+dx) / dx" - variance = self._sigma * self._sigma + variance = self._sigma ** 2.0 if not variance: raise StatisticsError('pdf() not defined when sigma is zero') - diff = x - self._mu - return exp(diff * diff / (-2.0 * variance)) / sqrt(tau * variance) + return exp((x - self._mu)**2.0 / (-2.0*variance)) / sqrt(tau*variance) def cdf(self, x): "Cumulative distribution function. 
P(X <= x)" if not self._sigma: raise StatisticsError('cdf() not defined when sigma is zero') - return 0.5 * (1.0 + erf((x - self._mu) / (self._sigma * _SQRT2))) + return 0.5 * (1.0 + erf((x - self._mu) / (self._sigma * sqrt(2.0)))) def inv_cdf(self, p): """Inverse cumulative distribution function. x : P(X <= x) = p @@ -1536,6 +1104,8 @@ def inv_cdf(self, p): """ if p <= 0.0 or p >= 1.0: raise StatisticsError('p must be in the range 0.0 < p < 1.0') + if self._sigma <= 0.0: + raise StatisticsError('cdf() not defined when sigma at or below zero') return _normal_dist_inv_cdf(p, self._mu, self._sigma) def quantiles(self, n=4): @@ -1576,9 +1146,9 @@ def overlap(self, other): dv = Y_var - X_var dm = fabs(Y._mu - X._mu) if not dv: - return 1.0 - erf(dm / (2.0 * X._sigma * _SQRT2)) + return 1.0 - erf(dm / (2.0 * X._sigma * sqrt(2.0))) a = X._mu * Y_var - Y._mu * X_var - b = X._sigma * Y._sigma * sqrt(dm * dm + dv * log(Y_var / X_var)) + b = X._sigma * Y._sigma * sqrt(dm**2.0 + dv * log(Y_var / X_var)) x1 = (a + b) / dv x2 = (a - b) / dv return 1.0 - (fabs(Y.cdf(x1) - X.cdf(x1)) + fabs(Y.cdf(x2) - X.cdf(x2))) @@ -1621,7 +1191,7 @@ def stdev(self): @property def variance(self): "Square of the standard deviation." - return self._sigma * self._sigma + return self._sigma ** 2.0 def __add__(x1, x2): """Add a constant or another NormalDist instance. @@ -1695,102 +1265,3 @@ def __hash__(self): def __repr__(self): return f'{type(self).__name__}(mu={self._mu!r}, sigma={self._sigma!r})' - - def __getstate__(self): - return self._mu, self._sigma - - def __setstate__(self, state): - self._mu, self._sigma = state - - -## kde_random() ############################################################## - -def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12): - def f_inv(y): - "Return x such that f(x) ≈ y within the specified tolerance." 
- x = f_inv_estimate(y) - while abs(diff := f(x) - y) > tolerance: - x -= diff / f_prime(x) - return x - return f_inv - -def _quartic_invcdf_estimate(p): - sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p) - x = (2.0 * p) ** 0.4258865685331 - 1.0 - if p >= 0.004 < 0.499: - x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953) - return x * sign - -_quartic_invcdf = _newton_raphson( - f_inv_estimate = _quartic_invcdf_estimate, - f = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2, - f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2) - -def _triweight_invcdf_estimate(p): - sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p) - x = (2.0 * p) ** 0.3400218741872791 - 1.0 - return x * sign - -_triweight_invcdf = _newton_raphson( - f_inv_estimate = _triweight_invcdf_estimate, - f = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2, - f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3) - -_kernel_invcdfs = { - 'normal': NormalDist().inv_cdf, - 'logistic': lambda p: log(p / (1 - p)), - 'sigmoid': lambda p: log(tan(p * pi/2)), - 'rectangular': lambda p: 2*p - 1, - 'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3), - 'quartic': _quartic_invcdf, - 'triweight': _triweight_invcdf, - 'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p), - 'cosine': lambda p: 2 * asin(2*p - 1) / pi, -} -_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal'] -_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular'] -_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic'] -_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic'] - -def kde_random(data, h, kernel='normal', *, seed=None): - """Return a function that makes a random selection from the estimated - probability density function created by kde(data, h, kernel). - - Providing a *seed* allows reproducible selections within a single - thread. The seed may be an integer, float, str, or bytes. - - A StatisticsError will be raised if the *data* sequence is empty. - - Example: - - >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2] - >>> rand = kde_random(data, h=1.5, seed=8675309) - >>> new_selections = [rand() for i in range(10)] - >>> [round(x, 1) for x in new_selections] - [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6] - - """ - n = len(data) - if not n: - raise StatisticsError('Empty data sequence') - - if not isinstance(data[0], (int, float)): - raise TypeError('Data sequence must contain ints or floats') - - if h <= 0.0: - raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}') - - kernel_invcdf = _kernel_invcdfs.get(kernel) - if kernel_invcdf is None: - raise StatisticsError(f'Unknown kernel name: {kernel!r}') - - prng = _random.Random(seed) - random = prng.random - choice = prng.choice - - def rand(): - return choice(data) + h * kernel_invcdf(random()) - - rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}' - - return rand diff --git a/Lib/test/test_statistics.py b/Lib/test/test_statistics.py index 7b0dfd05e0..8fcbcf3540 100644 --- a/Lib/test/test_statistics.py +++ b/Lib/test/test_statistics.py @@ -1,4 +1,4 @@ -x = """Test suite for statistics module, including helper NumericTestCase and +"""Test suite for statistics module, including helper NumericTestCase and approx_equal function. 
""" @@ -9,14 +9,13 @@ import copy import decimal import doctest -import itertools import math import pickle import random import sys import unittest from test import support -from test.support import import_helper, requires_IEEE_754 +from test.support import import_helper from decimal import Decimal from fractions import Fraction @@ -28,12 +27,6 @@ # === Helper functions and class === -# Test copied from Lib/test/test_math.py -# detect evidence of double-rounding: fsum is not always correctly -# rounded on machines that suffer from double rounding. -x, y = 1e16, 2.9999 # use temporary values to defeat peephole optimizer -HAVE_DOUBLE_ROUNDING = (x + y == 1e16 + 4) - def sign(x): """Return -1.0 for negatives, including -0.0, otherwise +1.0.""" return math.copysign(1, x) @@ -698,6 +691,14 @@ def test_check_all(self): 'missing name "%s" in __all__' % name) +class DocTests(unittest.TestCase): + @unittest.skipIf(sys.flags.optimize >= 2, + "Docstrings are omitted with -OO and above") + def test_doc_tests(self): + failed, tried = doctest.testmod(statistics, optionflags=doctest.ELLIPSIS) + self.assertGreater(tried, 0) + self.assertEqual(failed, 0) + class StatisticsErrorTest(unittest.TestCase): def test_has_exception(self): errmsg = ( @@ -1038,6 +1039,50 @@ def test_error_msg(self): self.assertEqual(errmsg, msg) +class FindLteqTest(unittest.TestCase): + # Test _find_lteq private function. + + def test_invalid_input_values(self): + for a, x in [ + ([], 1), + ([1, 2], 3), + ([1, 3], 2) + ]: + with self.subTest(a=a, x=x): + with self.assertRaises(ValueError): + statistics._find_lteq(a, x) + + def test_locate_successfully(self): + for a, x, expected_i in [ + ([1, 1, 1, 2, 3], 1, 0), + ([0, 1, 1, 1, 2, 3], 1, 1), + ([1, 2, 3, 3, 3], 3, 2) + ]: + with self.subTest(a=a, x=x): + self.assertEqual(expected_i, statistics._find_lteq(a, x)) + + +class FindRteqTest(unittest.TestCase): + # Test _find_rteq private function. + + def test_invalid_input_values(self): + for a, l, x in [ + ([1], 2, 1), + ([1, 3], 0, 2) + ]: + with self.assertRaises(ValueError): + statistics._find_rteq(a, l, x) + + def test_locate_successfully(self): + for a, l, x, expected_i in [ + ([1, 1, 1, 2, 3], 0, 1, 2), + ([0, 1, 1, 1, 2, 3], 0, 1, 3), + ([1, 2, 3, 3, 3], 0, 3, 4) + ]: + with self.subTest(a=a, l=l, x=x): + self.assertEqual(expected_i, statistics._find_rteq(a, l, x)) + + # === Tests for public functions === class UnivariateCommonMixin: @@ -1072,7 +1117,7 @@ def test_no_inplace_modifications(self): def test_order_doesnt_matter(self): # Test that the order of data points doesn't change the result. - # CAUTION: due to floating-point rounding errors, the result actually + # CAUTION: due to floating point rounding errors, the result actually # may depend on the order. Consider this test representing an ideal. # To avoid this test failing, only test with exact values such as ints # or Fractions. @@ -1165,9 +1210,6 @@ def __pow__(self, other): def __add__(self, other): return type(self)(super().__add__(other)) __radd__ = __add__ - def __mul__(self, other): - return type(self)(super().__mul__(other)) - __rmul__ = __mul__ return (float, Decimal, Fraction, MyFloat) def test_types_conserved(self): @@ -1740,12 +1782,6 @@ def test_repeated_single_value(self): data = [x]*count self.assertEqual(self.func(data), float(x)) - def test_single_value(self): - # Override method from AverageMixin. - # Average of a single value is the value as a float. 
- for x in (23, 42.5, 1.3e15, Fraction(15, 19), Decimal('0.28')): - self.assertEqual(self.func([x]), float(x)) - def test_odd_fractions(self): # Test median_grouped works with an odd number of Fractions. F = Fraction @@ -1925,27 +1961,6 @@ def test_special_values(self): with self.assertRaises(ValueError): fmean([Inf, -Inf]) - def test_weights(self): - fmean = statistics.fmean - StatisticsError = statistics.StatisticsError - self.assertEqual( - fmean([10, 10, 10, 50], [0.25] * 4), - fmean([10, 10, 10, 50])) - self.assertEqual( - fmean([10, 10, 20], [0.25, 0.25, 0.50]), - fmean([10, 10, 20, 20])) - self.assertEqual( # inputs are iterators - fmean(iter([10, 10, 20]), iter([0.25, 0.25, 0.50])), - fmean([10, 10, 20, 20])) - with self.assertRaises(StatisticsError): - fmean([10, 20, 30], [1, 2]) # unequal lengths - with self.assertRaises(StatisticsError): - fmean(iter([10, 20, 30]), iter([1, 2])) # unequal lengths - with self.assertRaises(StatisticsError): - fmean([10, 20], [-1, 1]) # sum of weights is zero - with self.assertRaises(StatisticsError): - fmean(iter([10, 20]), iter([-1, 1])) # sum of weights is zero - # === Tests for variances and standard deviations === @@ -2122,104 +2137,6 @@ def test_center_not_at_mean(self): self.assertEqual(self.func(data), 2.5) self.assertEqual(self.func(data, mu=0.5), 6.5) -class TestSqrtHelpers(unittest.TestCase): - - def test_integer_sqrt_of_frac_rto(self): - for n, m in itertools.product(range(100), range(1, 1000)): - r = statistics._integer_sqrt_of_frac_rto(n, m) - self.assertIsInstance(r, int) - if r*r*m == n: - # Root is exact - continue - # Inexact, so the root should be odd - self.assertEqual(r&1, 1) - # Verify correct rounding - self.assertTrue(m * (r - 1)**2 < n < m * (r + 1)**2) - - @requires_IEEE_754 - @support.requires_resource('cpu') - def test_float_sqrt_of_frac(self): - - def is_root_correctly_rounded(x: Fraction, root: float) -> bool: - if not x: - return root == 0.0 - - # Extract adjacent representable floats - r_up: float = math.nextafter(root, math.inf) - r_down: float = math.nextafter(root, -math.inf) - assert r_down < root < r_up - - # Convert to fractions for exact arithmetic - frac_root: Fraction = Fraction(root) - half_way_up: Fraction = (frac_root + Fraction(r_up)) / 2 - half_way_down: Fraction = (frac_root + Fraction(r_down)) / 2 - - # Check a closed interval. - # Does not test for a midpoint rounding rule. 
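# Sketch of the round-to-odd rule checked above: when sqrt(n/m) is not exact,
# the low bit of the integer estimate is forced to 1.  Keeping an odd "sticky"
# result is the usual guard against double rounding (see the reference cited in
# the removed helper); the function name here is illustrative.
from math import isqrt

def isqrt_frac_rto(n, m):
    a = isqrt(n // m)
    return a | (a * a * m != n)

print(isqrt_frac_rto(16, 1))   # 4 -- exact root, returned unchanged
print(isqrt_frac_rto(17, 1))   # 5 -- isqrt gives 4, inexact, so the low bit is set
print(isqrt_frac_rto(10, 1))   # 3 -- inexact but already odd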
- return half_way_down ** 2 <= x <= half_way_up ** 2 - - randrange = random.randrange - - for i in range(60_000): - numerator: int = randrange(10 ** randrange(50)) - denonimator: int = randrange(10 ** randrange(50)) + 1 - with self.subTest(numerator=numerator, denonimator=denonimator): - x: Fraction = Fraction(numerator, denonimator) - root: float = statistics._float_sqrt_of_frac(numerator, denonimator) - self.assertTrue(is_root_correctly_rounded(x, root)) - - # Verify that corner cases and error handling match math.sqrt() - self.assertEqual(statistics._float_sqrt_of_frac(0, 1), 0.0) - with self.assertRaises(ValueError): - statistics._float_sqrt_of_frac(-1, 1) - with self.assertRaises(ValueError): - statistics._float_sqrt_of_frac(1, -1) - - # Error handling for zero denominator matches that for Fraction(1, 0) - with self.assertRaises(ZeroDivisionError): - statistics._float_sqrt_of_frac(1, 0) - - # The result is well defined if both inputs are negative - self.assertEqual(statistics._float_sqrt_of_frac(-2, -1), statistics._float_sqrt_of_frac(2, 1)) - - def test_decimal_sqrt_of_frac(self): - root: Decimal - numerator: int - denominator: int - - for root, numerator, denominator in [ - (Decimal('0.4481904599041192673635338663'), 200874688349065940678243576378, 1000000000000000000000000000000), # No adj - (Decimal('0.7924949131383786609961759598'), 628048187350206338833590574929, 1000000000000000000000000000000), # Adj up - (Decimal('0.8500554152289934068192208727'), 722594208960136395984391238251, 1000000000000000000000000000000), # Adj down - ]: - with decimal.localcontext(decimal.DefaultContext): - self.assertEqual(statistics._decimal_sqrt_of_frac(numerator, denominator), root) - - # Confirm expected root with a quad precision decimal computation - with decimal.localcontext(decimal.DefaultContext) as ctx: - ctx.prec *= 4 - high_prec_ratio = Decimal(numerator) / Decimal(denominator) - ctx.rounding = decimal.ROUND_05UP - high_prec_root = high_prec_ratio.sqrt() - with decimal.localcontext(decimal.DefaultContext): - target_root = +high_prec_root - self.assertEqual(root, target_root) - - # Verify that corner cases and error handling match Decimal.sqrt() - self.assertEqual(statistics._decimal_sqrt_of_frac(0, 1), 0.0) - with self.assertRaises(decimal.InvalidOperation): - statistics._decimal_sqrt_of_frac(-1, 1) - with self.assertRaises(decimal.InvalidOperation): - statistics._decimal_sqrt_of_frac(1, -1) - - # Error handling for zero denominator matches that for Fraction(1, 0) - with self.assertRaises(ZeroDivisionError): - statistics._decimal_sqrt_of_frac(1, 0) - - # The result is well defined if both inputs are negative - self.assertEqual(statistics._decimal_sqrt_of_frac(-2, -1), statistics._decimal_sqrt_of_frac(2, 1)) - - class TestStdev(VarianceStdevMixin, NumericTestCase): # Tests for sample standard deviation. def setUp(self): @@ -2234,7 +2151,7 @@ def test_compare_to_variance(self): # Test that stdev is, in fact, the square root of variance. 
data = [random.uniform(-2, 9) for _ in range(1000)] expected = math.sqrt(statistics.variance(data)) - self.assertAlmostEqual(self.func(data), expected) + self.assertEqual(self.func(data), expected) def test_center_not_at_mean(self): data = (1.0, 2.0) @@ -2303,11 +2220,9 @@ def test_error_cases(self): with self.assertRaises(StatisticsError): geometric_mean([]) # empty input with self.assertRaises(StatisticsError): - geometric_mean([3.5, -4.0, 5.25]) # negative input - with self.assertRaises(StatisticsError): - geometric_mean([0.0, -4.0, 5.25]) # negative input with zero + geometric_mean([3.5, 0.0, 5.25]) # zero input with self.assertRaises(StatisticsError): - geometric_mean([3.5, -math.inf, 5.25]) # negative infinity + geometric_mean([3.5, -4.0, 5.25]) # negative input with self.assertRaises(StatisticsError): geometric_mean(iter([])) # empty iterator with self.assertRaises(TypeError): @@ -2330,200 +2245,6 @@ def test_special_values(self): with self.assertRaises(ValueError): geometric_mean([Inf, -Inf]) - # Cases with zero - self.assertEqual(geometric_mean([3, 0.0, 5]), 0.0) # Any zero gives a zero - self.assertEqual(geometric_mean([3, -0.0, 5]), 0.0) # Negative zero allowed - self.assertTrue(math.isnan(geometric_mean([0, NaN]))) # NaN beats zero - self.assertTrue(math.isnan(geometric_mean([0, Inf]))) # Because 0.0 * Inf -> NaN - - def test_mixed_int_and_float(self): - # Regression test for b.p.o. issue #28327 - geometric_mean = statistics.geometric_mean - expected_mean = 3.80675409583932 - values = [ - [2, 3, 5, 7], - [2, 3, 5, 7.0], - [2, 3, 5.0, 7.0], - [2, 3.0, 5.0, 7.0], - [2.0, 3.0, 5.0, 7.0], - ] - for v in values: - with self.subTest(v=v): - actual_mean = geometric_mean(v) - self.assertAlmostEqual(actual_mean, expected_mean, places=5) - - -class TestKDE(unittest.TestCase): - - def test_kde(self): - kde = statistics.kde - StatisticsError = statistics.StatisticsError - - kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular', - 'uniform', 'triangular', 'parabolic', 'epanechnikov', - 'quartic', 'biweight', 'triweight', 'cosine'] - - sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2] - - # The approximate integral of a PDF should be close to 1.0 - - def integrate(func, low, high, steps=10_000): - "Numeric approximation of a definite function integral." 
- dx = (high - low) / steps - midpoints = (low + (i + 1/2) * dx for i in range(steps)) - return sum(map(func, midpoints)) * dx - - for kernel in kernels: - with self.subTest(kernel=kernel): - f_hat = kde(sample, h=1.5, kernel=kernel) - area = integrate(f_hat, -20, 20) - self.assertAlmostEqual(area, 1.0, places=4) - - # Check CDF against an integral of the PDF - - data = [3, 5, 10, 12] - h = 2.3 - x = 10.5 - for kernel in kernels: - with self.subTest(kernel=kernel): - cdf = kde(data, h, kernel, cumulative=True) - f_hat = kde(data, h, kernel) - area = integrate(f_hat, -20, x, 100_000) - self.assertAlmostEqual(cdf(x), area, places=4) - - # Check error cases - - with self.assertRaises(StatisticsError): - kde([], h=1.0) # Empty dataset - with self.assertRaises(TypeError): - kde(['abc', 'def'], 1.5) # Non-numeric data - with self.assertRaises(TypeError): - kde(iter(sample), 1.5) # Data is not a sequence - with self.assertRaises(StatisticsError): - kde(sample, h=0.0) # Zero bandwidth - with self.assertRaises(StatisticsError): - kde(sample, h=-1.0) # Negative bandwidth - with self.assertRaises(TypeError): - kde(sample, h='str') # Wrong bandwidth type - with self.assertRaises(StatisticsError): - kde(sample, h=1.0, kernel='bogus') # Invalid kernel - with self.assertRaises(TypeError): - kde(sample, 1.0, 'gauss', True) # Positional cumulative argument - - # Test name and docstring of the generated function - - h = 1.5 - kernel = 'cosine' - f_hat = kde(sample, h, kernel) - self.assertEqual(f_hat.__name__, 'pdf') - self.assertIn(kernel, f_hat.__doc__) - self.assertIn(repr(h), f_hat.__doc__) - - # Test closed interval for the support boundaries. - # In particular, 'uniform' should non-zero at the boundaries. - - f_hat = kde([0], 1.0, 'uniform') - self.assertEqual(f_hat(-1.0), 1/2) - self.assertEqual(f_hat(1.0), 1/2) - - # Test online updates to data - - data = [1, 2] - f_hat = kde(data, 5.0, 'triangular') - self.assertEqual(f_hat(100), 0.0) - data.append(100) - self.assertGreater(f_hat(100), 0.0) - - def test_kde_kernel_invcdfs(self): - kernel_invcdfs = statistics._kernel_invcdfs - kde = statistics.kde - - # Verify that cdf / invcdf will round trip - xarr = [i/100 for i in range(-100, 101)] - for kernel, invcdf in kernel_invcdfs.items(): - with self.subTest(kernel=kernel): - cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True) - for x in xarr: - self.assertAlmostEqual(invcdf(cdf(x)), x, places=5) - - @support.requires_resource('cpu') - def test_kde_random(self): - kde_random = statistics.kde_random - StatisticsError = statistics.StatisticsError - kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular', - 'uniform', 'triangular', 'parabolic', 'epanechnikov', - 'quartic', 'biweight', 'triweight', 'cosine'] - sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2] - - # Smoke test - - for kernel in kernels: - with self.subTest(kernel=kernel): - rand = kde_random(sample, h=1.5, kernel=kernel) - selections = [rand() for i in range(10)] - - # Check error cases - - with self.assertRaises(StatisticsError): - kde_random([], h=1.0) # Empty dataset - with self.assertRaises(TypeError): - kde_random(['abc', 'def'], 1.5) # Non-numeric data - with self.assertRaises(TypeError): - kde_random(iter(sample), 1.5) # Data is not a sequence - with self.assertRaises(StatisticsError): - kde_random(sample, h=-1.0) # Zero bandwidth - with self.assertRaises(StatisticsError): - kde_random(sample, h=0.0) # Negative bandwidth - with self.assertRaises(TypeError): - kde_random(sample, h='str') # Wrong bandwidth type - with 
self.assertRaises(StatisticsError): - kde_random(sample, h=1.0, kernel='bogus') # Invalid kernel - - # Test name and docstring of the generated function - - h = 1.5 - kernel = 'cosine' - rand = kde_random(sample, h, kernel) - self.assertEqual(rand.__name__, 'rand') - self.assertIn(kernel, rand.__doc__) - self.assertIn(repr(h), rand.__doc__) - - # Approximate distribution test: Compare a random sample to the expected distribution - - data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2, 7.8, 14.3, 15.1, 15.3, 15.8, 17.0] - xarr = [x / 10 for x in range(-100, 250)] - n = 1_000_000 - h = 1.75 - dx = 0.1 - - def p_observed(x): - # P(x <= X < x+dx) - i = bisect.bisect_left(big_sample, x) - j = bisect.bisect_left(big_sample, x + dx) - return (j - i) / len(big_sample) - - def p_expected(x): - # P(x <= X < x+dx) - return F_hat(x + dx) - F_hat(x) - - for kernel in kernels: - with self.subTest(kernel=kernel): - - rand = kde_random(data, h, kernel, seed=8675309**2) - big_sample = sorted([rand() for i in range(n)]) - F_hat = statistics.kde(data, h, kernel, cumulative=True) - - for x in xarr: - self.assertTrue(math.isclose(p_observed(x), p_expected(x), abs_tol=0.0005)) - - # Test online updates to data - - data = [1, 2] - rand = kde_random(data, 5, 'triangular') - self.assertLess(max([rand() for i in range(5000)]), 10) - data.append(100) - self.assertGreater(max(rand() for i in range(5000)), 10) - class TestQuantiles(unittest.TestCase): @@ -2634,11 +2355,6 @@ def f(x): data = random.choices(range(100), k=k) q1, q2, q3 = quantiles(data, method='inclusive') self.assertEqual(q2, statistics.median(data)) - # Base case with a single data point: When estimating quantiles from - # a sample, we want to be able to add one sample point at a time, - # getting increasingly better estimates. 
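# Quick contrast of the two cut-point methods this class tests: the default
# "exclusive" method assumes the data is a sample whose extremes may not span
# the population, while "inclusive" treats the data as already including the
# extremes.  Expected values were worked out by hand and are meant only as
# illustration:
import statistics

data = list(range(1, 11))
print(statistics.quantiles(data, n=4))                        # [2.75, 5.5, 8.25]
print(statistics.quantiles(data, n=4, method='inclusive'))    # [3.25, 5.5, 7.75]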
- self.assertEqual(quantiles([10], n=4), [10.0, 10.0, 10.0]) - self.assertEqual(quantiles([10], n=4, method='exclusive'), [10.0, 10.0, 10.0]) def test_equal_inputs(self): quantiles = statistics.quantiles @@ -2689,7 +2405,7 @@ def test_error_cases(self): with self.assertRaises(ValueError): quantiles([10, 20, 30], method='X') # method is unknown with self.assertRaises(StatisticsError): - quantiles([], n=4) # not enough data points + quantiles([10], n=4) # not enough data points with self.assertRaises(TypeError): quantiles([10, None, 30], n=4) # data is non-numeric @@ -2748,95 +2464,6 @@ def test_different_scales(self): self.assertAlmostEqual(statistics.correlation(x, y), 1) self.assertAlmostEqual(statistics.covariance(x, y), 0.1) - def test_sqrtprod_helper_function_fundamentals(self): - # Verify that results are close to sqrt(x * y) - for i in range(100): - x = random.expovariate() - y = random.expovariate() - expected = math.sqrt(x * y) - actual = statistics._sqrtprod(x, y) - with self.subTest(x=x, y=y, expected=expected, actual=actual): - self.assertAlmostEqual(expected, actual) - - x, y, target = 0.8035720646477457, 0.7957468097636939, 0.7996498651651661 - self.assertEqual(statistics._sqrtprod(x, y), target) - self.assertNotEqual(math.sqrt(x * y), target) - - # Test that range extremes avoid underflow and overflow - smallest = sys.float_info.min * sys.float_info.epsilon - self.assertEqual(statistics._sqrtprod(smallest, smallest), smallest) - biggest = sys.float_info.max - self.assertEqual(statistics._sqrtprod(biggest, biggest), biggest) - - # Check special values and the sign of the result - special_values = [0.0, -0.0, 1.0, -1.0, 4.0, -4.0, - math.nan, -math.nan, math.inf, -math.inf] - for x, y in itertools.product(special_values, repeat=2): - try: - expected = math.sqrt(x * y) - except ValueError: - expected = 'ValueError' - try: - actual = statistics._sqrtprod(x, y) - except ValueError: - actual = 'ValueError' - with self.subTest(x=x, y=y, expected=expected, actual=actual): - if isinstance(expected, str) and expected == 'ValueError': - self.assertEqual(actual, 'ValueError') - continue - self.assertIsInstance(actual, float) - if math.isnan(expected): - self.assertTrue(math.isnan(actual)) - continue - self.assertEqual(actual, expected) - self.assertEqual(sign(actual), sign(expected)) - - @requires_IEEE_754 - @unittest.skipIf(HAVE_DOUBLE_ROUNDING, - "accuracy not guaranteed on machines with double rounding") - @support.cpython_only # Allow for a weaker sumprod() implmentation - def test_sqrtprod_helper_function_improved_accuracy(self): - # Test a known example where accuracy is improved - x, y, target = 0.8035720646477457, 0.7957468097636939, 0.7996498651651661 - self.assertEqual(statistics._sqrtprod(x, y), target) - self.assertNotEqual(math.sqrt(x * y), target) - - def reference_value(x: float, y: float) -> float: - x = decimal.Decimal(x) - y = decimal.Decimal(y) - with decimal.localcontext() as ctx: - ctx.prec = 200 - return float((x * y).sqrt()) - - # Verify that the new function with improved accuracy - # agrees with a reference value more often than old version. 
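# With correlation()'s method='ranked' removed by this patch, the Spearman
# coefficient that the test below checks can still be obtained by ranking both
# inputs (ties averaged) and taking Pearson's r of the ranks.  Sketch with
# made-up helper names:
from statistics import correlation

def _average_ranks(values):
    order = sorted(range(len(values)), key=values.__getitem__)
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        for k in range(i, j + 1):
            ranks[order[k]] = (i + j) / 2 + 1   # average 1-based rank of the tie group
        i = j + 1
    return ranks

def spearman_r(x, y):
    return correlation(_average_ranks(x), _average_ranks(y))

reading = [56, 75, 45, 71, 61, 64, 58, 80, 76, 61]
mathematics = [66, 70, 40, 60, 65, 56, 59, 77, 67, 63]
print(round(spearman_r(reading, mathematics), 6))   # 0.668696, the value the removed test expects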
- new_agreements = 0 - old_agreements = 0 - for i in range(10_000): - x = random.expovariate() - y = random.expovariate() - new = statistics._sqrtprod(x, y) - old = math.sqrt(x * y) - ref = reference_value(x, y) - new_agreements += (new == ref) - old_agreements += (old == ref) - self.assertGreater(new_agreements, old_agreements) - - def test_correlation_spearman(self): - # https://statistics.laerd.com/statistical-guides/spearmans-rank-order-correlation-statistical-guide-2.php - # Compare with: - # >>> import scipy.stats.mstats - # >>> scipy.stats.mstats.spearmanr(reading, mathematics) - # SpearmanrResult(correlation=0.6686960980480712, pvalue=0.03450954165178532) - # And Wolfram Alpha gives: 0.668696 - # https://www.wolframalpha.com/input?i=SpearmanRho%5B%7B56%2C+75%2C+45%2C+71%2C+61%2C+64%2C+58%2C+80%2C+76%2C+61%7D%2C+%7B66%2C+70%2C+40%2C+60%2C+65%2C+56%2C+59%2C+77%2C+67%2C+63%7D%5D - reading = [56, 75, 45, 71, 61, 64, 58, 80, 76, 61] - mathematics = [66, 70, 40, 60, 65, 56, 59, 77, 67, 63] - self.assertAlmostEqual(statistics.correlation(reading, mathematics, method='ranked'), - 0.6686960980480712) - - with self.assertRaises(ValueError): - statistics.correlation(reading, mathematics, method='bad_method') class TestLinearRegression(unittest.TestCase): @@ -2860,22 +2487,6 @@ def test_results(self): self.assertAlmostEqual(intercept, true_intercept) self.assertAlmostEqual(slope, true_slope) - def test_proportional(self): - x = [10, 20, 30, 40] - y = [180, 398, 610, 799] - slope, intercept = statistics.linear_regression(x, y, proportional=True) - self.assertAlmostEqual(slope, 20 + 1/150) - self.assertEqual(intercept, 0.0) - - def test_float_output(self): - x = [Fraction(2, 3), Fraction(3, 4)] - y = [Fraction(4, 5), Fraction(5, 6)] - slope, intercept = statistics.linear_regression(x, y) - self.assertTrue(isinstance(slope, float)) - self.assertTrue(isinstance(intercept, float)) - slope, intercept = statistics.linear_regression(x, y, proportional=True) - self.assertTrue(isinstance(slope, float)) - self.assertTrue(isinstance(intercept, float)) class TestNormalDist: @@ -3029,8 +2640,6 @@ def test_cdf(self): self.assertTrue(math.isnan(X.cdf(float('NaN')))) @support.skip_if_pgo_task - @support.requires_resource('cpu') - @unittest.skip("TODO: RUSTPYTHON Flaky") def test_inv_cdf(self): NormalDist = self.module.NormalDist @@ -3088,10 +2697,9 @@ def test_inv_cdf(self): iq.inv_cdf(1.0) # p is one with self.assertRaises(self.module.StatisticsError): iq.inv_cdf(1.1) # p over one - - # Supported case: - iq = NormalDist(100, 0) # sigma is zero - self.assertEqual(iq.inv_cdf(0.5), 100) + with self.assertRaises(self.module.StatisticsError): + iq = NormalDist(100, 0) # sigma is zero + iq.inv_cdf(0.5) # Special values self.assertTrue(math.isnan(Z.inv_cdf(float('NaN')))) @@ -3274,19 +2882,14 @@ def __init__(self, mu, sigma): nd = NormalDist(100, 15) self.assertNotEqual(nd, lnd) - def test_copy(self): + def test_pickle_and_copy(self): nd = self.module.NormalDist(37.5, 5.625) nd1 = copy.copy(nd) self.assertEqual(nd, nd1) nd2 = copy.deepcopy(nd) self.assertEqual(nd, nd2) - - def test_pickle(self): - nd = self.module.NormalDist(37.5, 5.625) - for proto in range(pickle.HIGHEST_PROTOCOL + 1): - with self.subTest(proto=proto): - pickled = pickle.loads(pickle.dumps(nd, protocol=proto)) - self.assertEqual(nd, pickled) + nd3 = pickle.loads(pickle.dumps(nd)) + self.assertEqual(nd, nd3) def test_hashability(self): ND = self.module.NormalDist @@ -3308,7 +2911,7 @@ def setUp(self): def tearDown(self): sys.modules['statistics'] = 
statistics - + @unittest.skipUnless(c_statistics, 'requires _statistics') class TestNormalDistC(unittest.TestCase, TestNormalDist): @@ -3325,7 +2928,6 @@ def tearDown(self): def load_tests(loader, tests, ignore): """Used for doctest/unittest integration.""" tests.addTests(doctest.DocTestSuite()) - tests.addTests(doctest.DocTestSuite(statistics)) return tests