Skip to content

MNT Use float64_t and intp_t directly in Cython for _pairwise_distances_reduction #24153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
7 changes: 3 additions & 4 deletions sklearn/feature_extraction/_hashing_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ from libcpp.vector cimport vector

cimport numpy as cnp
import numpy as np
from ..utils._typedefs cimport INT32TYPE_t, INT64TYPE_t
from ..utils.murmurhash cimport murmurhash3_bytes_s32
from ..utils._vector_sentinel cimport vector_to_nd_array

Expand All @@ -24,11 +23,11 @@ def transform(raw_X, Py_ssize_t n_features, dtype,
For constructing a scipy.sparse.csr_matrix.

"""
cdef INT32TYPE_t h
cdef cnp.int32_t h
cdef double value

cdef vector[INT32TYPE_t] indices
cdef vector[INT64TYPE_t] indptr
cdef vector[cnp.int32_t] indices
cdef vector[cnp.int64_t] indptr
indptr.push_back(0)

# Since Python array does not understand Numpy dtypes, we grow the indices
Expand Down
15 changes: 7 additions & 8 deletions sklearn/metrics/_pairwise_distances_reduction/_argkmin.pxd.tp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ implementation_specific_values = [
}}

cimport numpy as cnp
from ...utils._typedefs cimport ITYPE_t, DTYPE_t

cnp.import_array()

Expand All @@ -28,22 +27,22 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
"""{{name_suffix}}bit implementation of BaseDistanceReducer{{name_suffix}} for the `ArgKmin` reduction."""

cdef:
ITYPE_t k
cnp.intp_t k

ITYPE_t[:, ::1] argkmin_indices
DTYPE_t[:, ::1] argkmin_distances
cnp.intp_t[:, ::1] argkmin_indices
cnp.float64_t[:, ::1] argkmin_distances

# Used as array of pointers to private datastructures used in threads.
DTYPE_t ** heaps_r_distances_chunks
ITYPE_t ** heaps_indices_chunks
cnp.float64_t ** heaps_r_distances_chunks
cnp.intp_t ** heaps_indices_chunks


cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
"""EuclideanDistance-specialized {{name_suffix}}bit implementation of ArgKmin{{name_suffix}}."""
cdef:
GEMMTermComputer{{name_suffix}} gemm_term_computer
const DTYPE_t[::1] X_norm_squared
const DTYPE_t[::1] Y_norm_squared
const cnp.float64_t[::1] X_norm_squared
const cnp.float64_t[::1] Y_norm_squared

bint use_squared_distances

Expand Down
148 changes: 73 additions & 75 deletions sklearn/metrics/_pairwise_distances_reduction/_argkmin.pyx.tp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ from cython.parallel cimport parallel, prange

from ...utils._heap cimport heap_push
from ...utils._sorting cimport simultaneous_sort
from ...utils._typedefs cimport ITYPE_t, DTYPE_t

import numpy as np
import warnings
Expand All @@ -32,7 +31,6 @@ from numbers import Integral
from scipy.sparse import issparse
from ...utils import check_array, check_scalar, _in_unstable_openblas_configuration
from ...utils.fixes import threadpool_limits
from ...utils._typedefs import ITYPE, DTYPE


cnp.import_array()
Expand Down Expand Up @@ -60,7 +58,7 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
cls,
X,
Y,
ITYPE_t k,
cnp.intp_t k,
str metric="euclidean",
chunk_size=None,
dict metric_kwargs=None,
Expand Down Expand Up @@ -122,7 +120,7 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
DatasetsPair{{name_suffix}} datasets_pair,
chunk_size=None,
strategy=None,
ITYPE_t k=1,
cnp.intp_t k=1,
):
super().__init__(
datasets_pair=datasets_pair,
Expand All @@ -140,16 +138,16 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
# - when parallelizing on Y, the pointers of those heaps are referencing
# small heaps which are thread-wise-allocated and whose content will be
# merged with the main heaps'.
self.heaps_r_distances_chunks = <DTYPE_t **> malloc(
sizeof(DTYPE_t *) * self.chunks_n_threads
self.heaps_r_distances_chunks = <cnp.float64_t **> malloc(
sizeof(cnp.float64_t *) * self.chunks_n_threads
)
self.heaps_indices_chunks = <ITYPE_t **> malloc(
sizeof(ITYPE_t *) * self.chunks_n_threads
self.heaps_indices_chunks = <cnp.intp_t **> malloc(
sizeof(cnp.intp_t *) * self.chunks_n_threads
)

# Main heaps which will be returned as results by `ArgKmin{{name_suffix}}.compute`.
self.argkmin_indices = np.full((self.n_samples_X, self.k), 0, dtype=ITYPE)
self.argkmin_distances = np.full((self.n_samples_X, self.k), DBL_MAX, dtype=DTYPE)
self.argkmin_indices = np.full((self.n_samples_X, self.k), 0, dtype=np.intp)
self.argkmin_distances = np.full((self.n_samples_X, self.k), DBL_MAX, dtype=np.float64)

def __dealloc__(self):
if self.heaps_indices_chunks is not NULL:
Expand All @@ -160,18 +158,18 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):

cdef void _compute_and_reduce_distances_on_chunks(
self,
ITYPE_t X_start,
ITYPE_t X_end,
ITYPE_t Y_start,
ITYPE_t Y_end,
ITYPE_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
cnp.intp_t Y_start,
cnp.intp_t Y_end,
cnp.intp_t thread_num,
) nogil:
cdef:
ITYPE_t i, j
ITYPE_t n_samples_X = X_end - X_start
ITYPE_t n_samples_Y = Y_end - Y_start
DTYPE_t *heaps_r_distances = self.heaps_r_distances_chunks[thread_num]
ITYPE_t *heaps_indices = self.heaps_indices_chunks[thread_num]
cnp.intp_t i, j
cnp.intp_t n_samples_X = X_end - X_start
cnp.intp_t n_samples_Y = Y_end - Y_start
cnp.float64_t *heaps_r_distances = self.heaps_r_distances_chunks[thread_num]
cnp.intp_t *heaps_indices = self.heaps_indices_chunks[thread_num]

# Pushing the distances and their associated indices on a heap
# which by construction will keep track of the argkmin.
Expand All @@ -187,9 +185,9 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):

cdef void _parallel_on_X_init_chunk(
self,
ITYPE_t thread_num,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
# As this strategy is embarrassingly parallel, we can set each
# thread's heaps pointer to the proper position on the main heaps.
Expand All @@ -199,12 +197,12 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
@final
cdef void _parallel_on_X_prange_iter_finalize(
self,
ITYPE_t thread_num,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
cdef:
ITYPE_t idx, jdx
cnp.intp_t idx, jdx

# Sorting the main heaps portion associated to `X[X_start:X_end]`
# in ascending order w.r.t the distances.
Expand All @@ -220,8 +218,8 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
) nogil:
cdef:
# Maximum number of scalar elements (the last chunks can be smaller)
ITYPE_t heaps_size = self.X_n_samples_chunk * self.k
ITYPE_t thread_num
cnp.intp_t heaps_size = self.X_n_samples_chunk * self.k
cnp.intp_t thread_num

# The allocation is done in parallel for data locality purposes: this way
# the heaps used in each threads are allocated in pages which are closer
Expand All @@ -233,18 +231,18 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
# As chunks of X are shared across threads, so must their
# heaps. To solve this, each thread has its own heaps
# which are then synchronised back in the main ones.
self.heaps_r_distances_chunks[thread_num] = <DTYPE_t *> malloc(
heaps_size * sizeof(DTYPE_t)
self.heaps_r_distances_chunks[thread_num] = <cnp.float64_t *> malloc(
heaps_size * sizeof(cnp.float64_t)
)
self.heaps_indices_chunks[thread_num] = <ITYPE_t *> malloc(
heaps_size * sizeof(ITYPE_t)
self.heaps_indices_chunks[thread_num] = <cnp.intp_t *> malloc(
heaps_size * sizeof(cnp.intp_t)
)

cdef void _parallel_on_Y_parallel_init(
self,
ITYPE_t thread_num,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
# Initialising heaps (memset can't be used here)
for idx in range(self.X_n_samples_chunk * self.k):
Expand All @@ -254,11 +252,11 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
@final
cdef void _parallel_on_Y_synchronize(
self,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
cdef:
ITYPE_t idx, jdx, thread_num
cnp.intp_t idx, jdx, thread_num
with nogil, parallel(num_threads=self.effective_n_threads):
# Synchronising the thread heaps with the main heaps.
# This is done in parallel sample-wise (no need for locks).
Expand All @@ -282,7 +280,7 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):
self,
) nogil:
cdef:
ITYPE_t idx, thread_num
cnp.intp_t idx, thread_num

with nogil, parallel(num_threads=self.chunks_n_threads):
# Deallocating temporary datastructures
Expand All @@ -302,9 +300,9 @@ cdef class ArgKmin{{name_suffix}}(BaseDistanceReducer{{name_suffix}}):

cdef void compute_exact_distances(self) nogil:
cdef:
ITYPE_t i, j
ITYPE_t[:, ::1] Y_indices = self.argkmin_indices
DTYPE_t[:, ::1] distances = self.argkmin_distances
cnp.intp_t i, j
cnp.intp_t[:, ::1] Y_indices = self.argkmin_indices
cnp.float64_t[:, ::1] distances = self.argkmin_distances
for i in prange(self.n_samples_X, schedule='static', nogil=True,
num_threads=self.effective_n_threads):
for j in range(self.k):
Expand Down Expand Up @@ -339,7 +337,7 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
self,
X,
Y,
ITYPE_t k,
cnp.intp_t k,
bint use_squared_distances=False,
chunk_size=None,
strategy=None,
Expand Down Expand Up @@ -370,7 +368,7 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
DenseDenseDatasetsPair{{name_suffix}} datasets_pair = (
<DenseDenseDatasetsPair{{name_suffix}}> self.datasets_pair
)
ITYPE_t dist_middle_terms_chunks_size = self.Y_n_samples_chunk * self.X_n_samples_chunk
cnp.intp_t dist_middle_terms_chunks_size = self.Y_n_samples_chunk * self.X_n_samples_chunk

self.gemm_term_computer = GEMMTermComputer{{name_suffix}}(
datasets_pair.X,
Expand Down Expand Up @@ -407,7 +405,7 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
@final
cdef void _parallel_on_X_parallel_init(
self,
ITYPE_t thread_num,
cnp.intp_t thread_num,
) nogil:
ArgKmin{{name_suffix}}._parallel_on_X_parallel_init(self, thread_num)
self.gemm_term_computer._parallel_on_X_parallel_init(thread_num)
Expand All @@ -416,9 +414,9 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
@final
cdef void _parallel_on_X_init_chunk(
self,
ITYPE_t thread_num,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
ArgKmin{{name_suffix}}._parallel_on_X_init_chunk(self, thread_num, X_start, X_end)
self.gemm_term_computer._parallel_on_X_init_chunk(thread_num, X_start, X_end)
Expand All @@ -427,11 +425,11 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
@final
cdef void _parallel_on_X_pre_compute_and_reduce_distances_on_chunks(
self,
ITYPE_t X_start,
ITYPE_t X_end,
ITYPE_t Y_start,
ITYPE_t Y_end,
ITYPE_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
cnp.intp_t Y_start,
cnp.intp_t Y_end,
cnp.intp_t thread_num,
) nogil:
ArgKmin{{name_suffix}}._parallel_on_X_pre_compute_and_reduce_distances_on_chunks(
self,
Expand All @@ -448,17 +446,17 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
cdef void _parallel_on_Y_init(
self,
) nogil:
cdef ITYPE_t thread_num
cdef cnp.intp_t thread_num
ArgKmin{{name_suffix}}._parallel_on_Y_init(self)
self.gemm_term_computer._parallel_on_Y_init()


@final
cdef void _parallel_on_Y_parallel_init(
self,
ITYPE_t thread_num,
ITYPE_t X_start,
ITYPE_t X_end,
cnp.intp_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
) nogil:
ArgKmin{{name_suffix}}._parallel_on_Y_parallel_init(self, thread_num, X_start, X_end)
self.gemm_term_computer._parallel_on_Y_parallel_init(thread_num, X_start, X_end)
Expand All @@ -467,11 +465,11 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
@final
cdef void _parallel_on_Y_pre_compute_and_reduce_distances_on_chunks(
self,
ITYPE_t X_start,
ITYPE_t X_end,
ITYPE_t Y_start,
ITYPE_t Y_end,
ITYPE_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
cnp.intp_t Y_start,
cnp.intp_t Y_end,
cnp.intp_t thread_num,
) nogil:
ArgKmin{{name_suffix}}._parallel_on_Y_pre_compute_and_reduce_distances_on_chunks(
self,
Expand All @@ -487,22 +485,22 @@ cdef class EuclideanArgKmin{{name_suffix}}(ArgKmin{{name_suffix}}):
@final
cdef void _compute_and_reduce_distances_on_chunks(
self,
ITYPE_t X_start,
ITYPE_t X_end,
ITYPE_t Y_start,
ITYPE_t Y_end,
ITYPE_t thread_num,
cnp.intp_t X_start,
cnp.intp_t X_end,
cnp.intp_t Y_start,
cnp.intp_t Y_end,
cnp.intp_t thread_num,
) nogil:
cdef:
ITYPE_t i, j
DTYPE_t squared_dist_i_j
ITYPE_t n_X = X_end - X_start
ITYPE_t n_Y = Y_end - Y_start
DTYPE_t * dist_middle_terms = self.gemm_term_computer._compute_distances_on_chunks(
cnp.intp_t i, j
cnp.float64_t squared_dist_i_j
cnp.intp_t n_X = X_end - X_start
cnp.intp_t n_Y = Y_end - Y_start
cnp.float64_t * dist_middle_terms = self.gemm_term_computer._compute_distances_on_chunks(
X_start, X_end, Y_start, Y_end, thread_num
)
DTYPE_t * heaps_r_distances = self.heaps_r_distances_chunks[thread_num]
ITYPE_t * heaps_indices = self.heaps_indices_chunks[thread_num]
cnp.float64_t * heaps_r_distances = self.heaps_r_distances_chunks[thread_num]
cnp.intp_t * heaps_indices = self.heaps_indices_chunks[thread_num]


# Pushing the distance and their associated indices on heaps
Expand Down
Loading