Skip to content

MNT avoid thread limit for non nested parts of KMeans #16499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions sklearn/cluster/_k_means_elkan.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import numpy as np
cimport numpy as np
from threadpoolctl import threadpool_limits
cimport cython
from cython cimport floating
from cython.parallel import prange, parallel
Expand All @@ -29,7 +30,19 @@ from ._k_means_fast cimport _center_shift
np.import_array()


def _init_bounds_dense(
# Threadpoolctl wrappers to limit the number of threads in second level of
# nested parallelism (i.e. BLAS) to avoid oversubscription.
def elkan_iter_chunked_dense(*args, **kwargs):
with threadpool_limits(limits=1, user_api="blas"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts regarding using the CM that deep in the code, versus doing it sooner e.g. in _kmeans_single_lloyd after _init_centroids is called?

I guess we may miss oversubscription issues if one day we decide to re-write _inertia_*

Only curious what you think

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we may miss oversubscription issues if one day we decide to re-write `_inertia_*`

True, but right now it uses no prange and no blas so I don't see it changing soon :)
And more importantly, we should think of the implications each time we want to add a prange somewhere.

Thoughts regarding using the CM that deep in the code, versus doing it sooner e.g. in _kmeans_single_lloyd after _init_centroids is called?

I wanted to put it where we're sure there will be no BLAS call taking place inside the context manager. That way we make sure that we can change the python code more safely.

Besides, In _kmeans_single_elkan you have:

for i in range(max_iter):
    elkan_iter(...)
    ...
    euclidean_distances(...)  # BLAS

we don't want to include euclidean distances in the CM. So we have to put the CM around elkan_iter.

_elkan_iter_chunked_dense(*args, **kwargs)


def elkan_iter_chunked_sparse(*args, **kwargs):
    """Run the chunked Elkan iteration on sparse data with BLAS capped at 1 thread.

    Thin pass-through wrapper around the Cython routine
    ``_elkan_iter_chunked_sparse``. The threadpoolctl context manager keeps
    the second (BLAS) level of nested parallelism single-threaded to avoid
    oversubscription.
    """
    single_threaded_blas = threadpool_limits(limits=1, user_api="blas")
    with single_threaded_blas:
        _elkan_iter_chunked_sparse(*args, **kwargs)


def init_bounds_dense(
np.ndarray[floating, ndim=2, mode='c'] X, # IN
floating[:, ::1] centers, # IN
floating[:, ::1] center_half_distances, # IN
Expand Down Expand Up @@ -99,7 +112,7 @@ def _init_bounds_dense(
upper_bounds[i] = min_dist


def _init_bounds_sparse(
def init_bounds_sparse(
X, # IN
floating[:, ::1] centers, # IN
floating[:, ::1] center_half_distances, # IN
Expand Down
13 changes: 13 additions & 0 deletions sklearn/cluster/_k_means_lloyd.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import numpy as np
cimport numpy as np
from threadpoolctl import threadpool_limits
from cython cimport floating
from cython.parallel import prange, parallel
from libc.stdlib cimport malloc, calloc, free
Expand All @@ -25,6 +26,18 @@ from ._k_means_fast cimport _average_centers, _center_shift
np.import_array()


# Threadpoolctl wrappers to limit the number of threads in second level of
# nested parallelism (i.e. BLAS) to avoid oversubscription.
def lloyd_iter_chunked_dense(*args, **kwargs):
    """Run the chunked Lloyd iteration on dense data with BLAS capped at 1 thread.

    Thin pass-through wrapper around the Cython routine
    ``_lloyd_iter_chunked_dense``. The threadpoolctl context manager keeps
    the second (BLAS) level of nested parallelism single-threaded to avoid
    oversubscription.
    """
    single_threaded_blas = threadpool_limits(limits=1, user_api="blas")
    with single_threaded_blas:
        _lloyd_iter_chunked_dense(*args, **kwargs)


def lloyd_iter_chunked_sparse(*args, **kwargs):
    """Run the chunked Lloyd iteration on sparse data with BLAS capped at 1 thread.

    Thin pass-through wrapper around the Cython routine
    ``_lloyd_iter_chunked_sparse``. The threadpoolctl context manager keeps
    the second (BLAS) level of nested parallelism single-threaded to avoid
    oversubscription.
    """
    single_threaded_blas = threadpool_limits(limits=1, user_api="blas")
    with single_threaded_blas:
        _lloyd_iter_chunked_sparse(*args, **kwargs)


def _lloyd_iter_chunked_dense(
np.ndarray[floating, ndim=2, mode='c'] X, # IN
floating[::1] sample_weight, # IN
Expand Down
65 changes: 32 additions & 33 deletions sklearn/cluster/_kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import numpy as np
import scipy.sparse as sp
from threadpoolctl import threadpool_limits

from ..base import BaseEstimator, ClusterMixin, TransformerMixin
from ..metrics.pairwise import euclidean_distances
Expand All @@ -32,12 +31,12 @@
from ._k_means_fast import _inertia_dense
from ._k_means_fast import _inertia_sparse
from ._k_means_fast import _mini_batch_update_csr
from ._k_means_lloyd import _lloyd_iter_chunked_dense
from ._k_means_lloyd import _lloyd_iter_chunked_sparse
from ._k_means_elkan import _init_bounds_dense
from ._k_means_elkan import _init_bounds_sparse
from ._k_means_elkan import _elkan_iter_chunked_dense
from ._k_means_elkan import _elkan_iter_chunked_sparse
from ._k_means_lloyd import lloyd_iter_chunked_dense
from ._k_means_lloyd import lloyd_iter_chunked_sparse
from ._k_means_elkan import init_bounds_dense
from ._k_means_elkan import init_bounds_sparse
from ._k_means_elkan import elkan_iter_chunked_dense
from ._k_means_elkan import elkan_iter_chunked_sparse


###############################################################################
Expand Down Expand Up @@ -420,12 +419,12 @@ def _kmeans_single_elkan(X, sample_weight, n_clusters, max_iter=300,
center_shift = np.zeros(n_clusters, dtype=X.dtype)

if sp.issparse(X):
init_bounds = _init_bounds_sparse
elkan_iter = _elkan_iter_chunked_sparse
init_bounds = init_bounds_sparse
elkan_iter = elkan_iter_chunked_sparse
_inertia = _inertia_sparse
else:
init_bounds = _init_bounds_dense
elkan_iter = _elkan_iter_chunked_dense
init_bounds = init_bounds_dense
elkan_iter = elkan_iter_chunked_dense
_inertia = _inertia_dense

init_bounds(X, centers, center_half_distances,
Expand Down Expand Up @@ -559,10 +558,10 @@ def _kmeans_single_lloyd(X, sample_weight, n_clusters, max_iter=300,
center_shift = np.zeros(n_clusters, dtype=X.dtype)

if sp.issparse(X):
lloyd_iter = _lloyd_iter_chunked_sparse
lloyd_iter = lloyd_iter_chunked_sparse
_inertia = _inertia_sparse
else:
lloyd_iter = _lloyd_iter_chunked_dense
lloyd_iter = lloyd_iter_chunked_dense
_inertia = _inertia_dense

for i in range(max_iter):
Expand Down Expand Up @@ -594,7 +593,8 @@ def _kmeans_single_lloyd(X, sample_weight, n_clusters, max_iter=300,
return labels, inertia, centers, i + 1


def _labels_inertia(X, sample_weight, x_squared_norms, centers, n_threads=1):
def _labels_inertia(X, sample_weight, x_squared_norms, centers,
n_threads=None):
"""E step of the K-means EM algorithm.

Compute the labels and the inertia of the given samples and centers.
Expand All @@ -615,7 +615,7 @@ def _labels_inertia(X, sample_weight, x_squared_norms, centers, n_threads=1):
centers : ndarray, shape (n_clusters, n_features)
The cluster centers.

n_threads : int, default=1
n_threads : int, default=None
The number of OpenMP threads to use for the computation. Parallelism is
sample-wise on the main cython loop which assigns each sample to its
closest center.
Expand All @@ -631,16 +631,18 @@ def _labels_inertia(X, sample_weight, x_squared_norms, centers, n_threads=1):
n_samples = X.shape[0]
n_clusters = centers.shape[0]

n_threads = _openmp_effective_n_threads(n_threads)

sample_weight = _check_normalize_sample_weight(sample_weight, X)
labels = np.full(n_samples, -1, dtype=np.int32)
weight_in_clusters = np.zeros(n_clusters, dtype=centers.dtype)
center_shift = np.zeros_like(weight_in_clusters)

if sp.issparse(X):
_labels = _lloyd_iter_chunked_sparse
_labels = lloyd_iter_chunked_sparse
_inertia = _inertia_sparse
else:
_labels = _lloyd_iter_chunked_dense
_labels = lloyd_iter_chunked_dense
_inertia = _inertia_dense

_labels(X, sample_weight, x_squared_norms, centers, centers,
Expand Down Expand Up @@ -1031,22 +1033,19 @@ def fit(self, X, y=None, sample_weight=None):
# seeds for the initializations of the kmeans runs.
seeds = random_state.randint(np.iinfo(np.int32).max, size=n_init)

# limit number of threads in second level of nested parallelism
# (i.e. BLAS) to avoid oversubsciption.
with threadpool_limits(limits=1, user_api="blas"):
for seed in seeds:
# run a k-means once
labels, inertia, centers, n_iter_ = kmeans_single(
X, sample_weight, self.n_clusters, max_iter=self.max_iter,
init=init, verbose=self.verbose, tol=tol,
x_squared_norms=x_squared_norms, random_state=seed,
n_threads=self._n_threads)
# determine if these results are the best so far
if best_inertia is None or inertia < best_inertia:
best_labels = labels.copy()
best_centers = centers.copy()
best_inertia = inertia
best_n_iter = n_iter_
for seed in seeds:
# run a k-means once
labels, inertia, centers, n_iter_ = kmeans_single(
X, sample_weight, self.n_clusters, max_iter=self.max_iter,
init=init, verbose=self.verbose, tol=tol,
x_squared_norms=x_squared_norms, random_state=seed,
n_threads=self._n_threads)
# determine if these results are the best so far
if best_inertia is None or inertia < best_inertia:
best_labels = labels.copy()
best_centers = centers.copy()
best_inertia = inertia
best_n_iter = n_iter_

if not sp.issparse(X):
if not self.copy_x:
Expand Down