Skip to content

MAINT cleanup utils.__init__: move chunking tools into dedicated submodule #28516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sklearn/cluster/_optics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from ..metrics import pairwise_distances
from ..metrics.pairwise import _VALID_METRICS, PAIRWISE_BOOLEAN_FUNCTIONS
from ..neighbors import NearestNeighbors
from ..utils import gen_batches, get_chunk_n_rows
from ..utils import gen_batches
from ..utils._chunking import get_chunk_n_rows
from ..utils._param_validation import (
HasMethods,
Interval,
Expand Down
4 changes: 2 additions & 2 deletions sklearn/datasets/_arff_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ..externals import _arff
from ..externals._arff import ArffSparseDataType
from ..utils import _chunk_generator, get_chunk_n_rows
from ..utils._chunking import chunk_generator, get_chunk_n_rows
from ..utils._optional_dependencies import check_pandas_support
from ..utils.fixes import pd_fillna

Expand Down Expand Up @@ -192,7 +192,7 @@ def _io_to_generator(gzip_file):
# read arff data with chunks
columns_to_keep = [col for col in columns_names if col in columns_to_select]
dfs = [first_df[columns_to_keep]]
for data in _chunk_generator(arff_container["data"], chunksize):
for data in chunk_generator(arff_container["data"], chunksize):
dfs.append(
pd.DataFrame(data, columns=columns_names, copy=False)[columns_to_keep]
)
Expand Down
2 changes: 1 addition & 1 deletion sklearn/ensemble/_iforest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
check_array,
check_random_state,
gen_batches,
get_chunk_n_rows,
)
from ..utils._chunking import get_chunk_n_rows
from ..utils._param_validation import Interval, RealNotInt, StrOptions
from ..utils.validation import _num_samples, check_is_fitted
from ._bagging import BaseBagging
Expand Down
2 changes: 1 addition & 1 deletion sklearn/metrics/pairwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
check_array,
gen_batches,
gen_even_slices,
get_chunk_n_rows,
)
from ..utils._chunking import get_chunk_n_rows
from ..utils._mask import _get_mask
from ..utils._missing import is_scalar_nan
from ..utils._param_validation import (
Expand Down
173 changes: 4 additions & 169 deletions sklearn/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import numpy as np
from scipy.sparse import issparse

from .. import get_config
from ..exceptions import DataConversionWarning
from . import _joblib, metadata_routing
from ._bunch import Bunch
from ._chunking import gen_batches, gen_even_slices
from ._estimator_html_repr import estimator_html_repr
from ._param_validation import Integral, Interval, validate_params
from ._param_validation import Interval, validate_params
from .class_weight import compute_class_weight, compute_sample_weight
from .deprecation import deprecated
from .discovery import all_estimators
Expand Down Expand Up @@ -76,6 +76,8 @@
"Bunch",
"metadata_routing",
"safe_sqr",
"gen_batches",
"gen_even_slices",
]

IS_PYPY = platform.python_implementation() == "PyPy"
Expand Down Expand Up @@ -745,132 +747,6 @@ def shuffle(*arrays, random_state=None, n_samples=None):
)


def _chunk_generator(gen, chunksize):
"""Chunk generator, ``gen`` into lists of length ``chunksize``. The last
chunk may have a length less than ``chunksize``."""
while True:
chunk = list(islice(gen, chunksize))
if chunk:
yield chunk
else:
return


@validate_params(
{
"n": [Interval(numbers.Integral, 1, None, closed="left")],
"batch_size": [Interval(numbers.Integral, 1, None, closed="left")],
"min_batch_size": [Interval(numbers.Integral, 0, None, closed="left")],
},
prefer_skip_nested_validation=True,
)
def gen_batches(n, batch_size, *, min_batch_size=0):
"""Generator to create slices containing `batch_size` elements from 0 to `n`.

The last slice may contain less than `batch_size` elements, when
`batch_size` does not divide `n`.

Parameters
----------
n : int
Size of the sequence.
batch_size : int
Number of elements in each batch.
min_batch_size : int, default=0
Minimum number of elements in each batch.

Yields
------
slice of `batch_size` elements

See Also
--------
gen_even_slices: Generator to create n_packs slices going up to n.

Examples
--------
>>> from sklearn.utils import gen_batches
>>> list(gen_batches(7, 3))
[slice(0, 3, None), slice(3, 6, None), slice(6, 7, None)]
>>> list(gen_batches(6, 3))
[slice(0, 3, None), slice(3, 6, None)]
>>> list(gen_batches(2, 3))
[slice(0, 2, None)]
>>> list(gen_batches(7, 3, min_batch_size=0))
[slice(0, 3, None), slice(3, 6, None), slice(6, 7, None)]
>>> list(gen_batches(7, 3, min_batch_size=2))
[slice(0, 3, None), slice(3, 7, None)]
"""
start = 0
for _ in range(int(n // batch_size)):
end = start + batch_size
if end + min_batch_size > n:
continue
yield slice(start, end)
start = end
if start < n:
yield slice(start, n)


@validate_params(
{
"n": [Interval(Integral, 1, None, closed="left")],
"n_packs": [Interval(Integral, 1, None, closed="left")],
"n_samples": [Interval(Integral, 1, None, closed="left"), None],
},
prefer_skip_nested_validation=True,
)
def gen_even_slices(n, n_packs, *, n_samples=None):
"""Generator to create `n_packs` evenly spaced slices going up to `n`.

If `n_packs` does not divide `n`, except for the first `n % n_packs`
slices, remaining slices may contain fewer elements.

Parameters
----------
n : int
Size of the sequence.
n_packs : int
Number of slices to generate.
n_samples : int, default=None
Number of samples. Pass `n_samples` when the slices are to be used for
sparse matrix indexing; slicing off-the-end raises an exception, while
it works for NumPy arrays.

Yields
------
`slice` representing a set of indices from 0 to n.

See Also
--------
gen_batches: Generator to create slices containing batch_size elements
from 0 to n.

Examples
--------
>>> from sklearn.utils import gen_even_slices
>>> list(gen_even_slices(10, 1))
[slice(0, 10, None)]
>>> list(gen_even_slices(10, 10))
[slice(0, 1, None), slice(1, 2, None), ..., slice(9, 10, None)]
>>> list(gen_even_slices(10, 5))
[slice(0, 2, None), slice(2, 4, None), ..., slice(8, 10, None)]
>>> list(gen_even_slices(10, 3))
[slice(0, 4, None), slice(4, 7, None), slice(7, 10, None)]
"""
start = 0
for pack_num in range(n_packs):
this_n = n // n_packs
if pack_num < n % n_packs:
this_n += 1
if this_n > 0:
end = start + this_n
if n_samples is not None:
end = min(n_samples, end)
yield slice(start, end, None)
start = end


def tosequence(x):
"""Cast iterable x to a Sequence, avoiding a copy if possible.

Expand Down Expand Up @@ -1012,44 +888,3 @@ def _print_elapsed_time(source, message=None):
start = timeit.default_timer()
yield
print(_message_with_time(source, message, timeit.default_timer() - start))


def get_chunk_n_rows(row_bytes, *, max_n_rows=None, working_memory=None):
"""Calculate how many rows can be processed within `working_memory`.

Parameters
----------
row_bytes : int
The expected number of bytes of memory that will be consumed
during the processing of each row.
max_n_rows : int, default=None
The maximum return value.
working_memory : int or float, default=None
The number of rows to fit inside this number of MiB will be
returned. When None (default), the value of
``sklearn.get_config()['working_memory']`` is used.

Returns
-------
int
The number of rows which can be processed within `working_memory`.

Warns
-----
Issues a UserWarning if `row_bytes exceeds `working_memory` MiB.
"""

if working_memory is None:
working_memory = get_config()["working_memory"]

chunk_n_rows = int(working_memory * (2**20) // row_bytes)
if max_n_rows is not None:
chunk_n_rows = min(chunk_n_rows, max_n_rows)
if chunk_n_rows < 1:
warnings.warn(
"Could not adhere to working_memory config. "
"Currently %.0fMiB, %.0fMiB required."
% (working_memory, np.ceil(row_bytes * 2**-20))
)
chunk_n_rows = 1
return chunk_n_rows
Loading