diff --git a/deslib/base.py b/deslib/base.py index 2b58b16e..d2ad35cf 100644 --- a/deslib/base.py +++ b/deslib/base.py @@ -244,7 +244,7 @@ class labels of each example in X. # validate the value of k self._validate_k() self._set_region_of_competence_algorithm() - self._fit_region_competence(X_dsel, y_dsel) + self.roc_algorithm_.fit(X_dsel, y_dsel) # validate the IH if self.with_IH: @@ -302,24 +302,10 @@ def _encode_base_labels(self, y): else: return self.enc_.transform(y) - def _fit_region_competence(self, X, y): - """Fit the k-NN classifier inside the dynamic selection method. - - Parameters - ---------- - X : array of shape (n_samples, n_features) - The Input data. - - y : array of shape (n_samples) - class labels of each sample in X. - - """ - self.roc_algorithm_.fit(X, y) - def _set_dsel(self, X, y): """Pre-Process the input X and y data into the dynamic selection dataset(DSEL) and get information about the structure of the data - (e.g., n_classes, N_samples, classes) + (e.g., n_classes, n_samples, classes) Parameters ---------- @@ -334,7 +320,8 @@ class labels of each sample in X. self.n_classes_ = self.classes_.size self.n_features_ = X.shape[1] self.n_samples_ = self.DSEL_target_.size - self.DSEL_processed_, self.BKS_DSEL_ = self._preprocess_dsel() + self.BKS_DSEL_ = self._predict_base(self.DSEL_data_) + self.DSEL_processed_ = self.BKS_DSEL_ == y[:, np.newaxis] def _set_region_of_competence_algorithm(self): @@ -411,34 +398,12 @@ def predict(self, X): # Check if the DS model was trained check_is_fitted(self, ["DSEL_processed_", "DSEL_data_", "DSEL_target_"]) - - # Check if X is a valid input X = check_array(X) - self._check_num_features(X) - - n_samples = X.shape[0] - predicted_labels = np.empty(n_samples, dtype=np.intp) - - if self.needs_proba: - base_probabilities = self._predict_proba_base(X) - base_predictions = base_probabilities.argmax(axis=2) - else: - base_probabilities = None - base_predictions = self._predict_base(X) + predicted_labels = np.empty(X.shape[0], dtype=np.intp) - all_agree_vector = BaseDS._all_classifier_agree(base_predictions) - ind_all_agree = np.where(all_agree_vector)[0] - - # Since the predictions are always the same, get the predictions of the - # first base classifier. - if ind_all_agree.size: - predicted_labels[ind_all_agree] = base_predictions[ - ind_all_agree, 0] - - # For the samples with disagreement, perform the dynamic selection - # steps. First step is to collect the samples with disagreement - # between base classifiers - ind_disagreement = np.where(~all_agree_vector)[0] + base_predictions, base_probabilities = self._preprocess_predictions(X) + ind_disagreement = self._prediction_by_agreement(base_predictions, + predicted_labels) if ind_disagreement.size: X_DS = X[ind_disagreement, :] @@ -452,87 +417,18 @@ def predict(self, X): # we need to call DS routines distances, neighbors = self._get_region_competence(X_DS) - if self.with_IH: - # if IH is used, calculate the hardness level associated with - # each sample - hardness = hardness_region_competence(neighbors, - self.DSEL_target_, - self.safe_k) - - # Get the index associated with the easy and hard samples. - # Samples with low hardness are passed down to the knn - # classifier while samples with high hardness are passed down - # to the DS methods. So, here we split the samples that are - # passed to down to each stage by calculating their indices_. 
- easy_samples_mask = hardness < self.IH_rate - ind_knn_classifier = np.where(easy_samples_mask)[0] - ind_ds_classifier = np.where(~easy_samples_mask)[0] - - if ind_knn_classifier.size: - # all samples with low hardness should be classified by - # the knn method here: - # First get the class associated with each neighbor - y_neighbors = self.DSEL_target_[ - neighbors[ind_knn_classifier, :self.safe_k]] - - # Accessing which samples in the original matrix are - # associated with the low instance hardness indices_. This - # is important since the low hardness indices - # ind_knn_classifier was estimated based on a subset - # of samples - ind_knn_original_matrix = ind_disagreement[ - ind_knn_classifier] - prediction_knn, _ = mode(y_neighbors, axis=1) - predicted_labels[ - ind_knn_original_matrix] = prediction_knn.reshape(-1, ) - - # Remove from the neighbors and distance matrices the - # samples that were classified using the KNN - neighbors = np.delete(neighbors, ind_knn_classifier, - axis=0) - distances = np.delete(distances, ind_knn_classifier, - axis=0) - else: - # IH was not considered. So all samples with disagreement are - # passed down to the DS algorithm - ind_ds_classifier = np.arange(ind_disagreement.size) - - # At this stage the samples which all base classifiers agrees or - # that are associated with low hardness were already classified. - # The remaining samples are now passed down to the DS techniques - # for classification. + distances, ind_ds_classifier, neighbors = self._IH_prediction( + X_DS, distances, ind_disagreement, + neighbors, predicted_labels, False + ) # First check whether there are still samples to be classified. if ind_ds_classifier.size: - # IF the DFP pruning is considered, calculate the DFP mask - # for all samples in X - if self.DFP: - DFP_mask = frienemy_pruning_preprocessed( - neighbors, self.DSEL_target_, self.DSEL_processed_ - ) - else: - DFP_mask = np.ones( - (ind_ds_classifier.size, self.n_classifiers_)) - - # Get the real indices_ of the samples that will be classified - # using a DS algorithm. - ind_ds_original_matrix = ind_disagreement[ind_ds_classifier] - - if self.needs_proba: - selected_probabilities = base_probabilities[ - ind_ds_original_matrix] - else: - selected_probabilities = None - - pred_ds = self.classify_with_ds(X_DS[ind_ds_classifier], - base_predictions[ - ind_ds_original_matrix], - selected_probabilities, - neighbors=neighbors, - distances=distances, - DFP_mask=DFP_mask) - predicted_labels[ind_ds_original_matrix] = pred_ds + self._predict_DS(X_DS, base_predictions, base_probabilities, + distances, ind_disagreement, + ind_ds_classifier, neighbors, + predicted_labels) return self.classes_.take(predicted_labels) @@ -549,23 +445,19 @@ def predict_proba(self, X): predicted_proba : array of shape (n_samples, n_classes) Probabilities estimates for each sample in X. """ - # Check if the DS model was trained check_is_fitted(self, ["DSEL_processed_", "DSEL_data_", "DSEL_target_"]) - # Check if X is a valid input X = check_array(X, ensure_2d=False) - # Check if the base classifiers are able to estimate posterior - # probabilities (implements predict_proba method). 
self._check_predict_proba() - base_probabilities = self._predict_proba_base(X) base_predictions = base_probabilities.argmax(axis=2) - n_samples = X.shape[0] - predicted_proba = np.zeros((n_samples, self.n_classes_)) - + predicted_proba = np.zeros((X.shape[0], self.n_classes_)) + ind_disagreement = self._prediction_by_agreement(base_predictions, + predicted_proba, + base_probabilities) all_agree_vector = BaseDS._all_classifier_agree(base_predictions) ind_all_agree = np.where(all_agree_vector)[0] @@ -577,65 +469,18 @@ def predict_proba(self, X): if ind_disagreement.size: X_DS = X[ind_disagreement, :] - - # Always calculating the neighborhood. Passing that to classify - # later - # TODO: Check problems with DES Clustering method. Maybe add a - # check to prevent that here. (or do clustering instead) - # Then, we estimate the nearest neighbors for all samples that we - # need to call DS routines distances, neighbors = self._get_region_competence(X_DS) - - if self.with_IH: - # if IH is used, calculate the hardness level associated with - # each sample - hardness = hardness_region_competence(neighbors, - self.DSEL_target_, - self.safe_k) - - # Get the index associated with the easy and hard samples. - # Samples with low hardness are passed down to the knn - # classifier while samples with high hardness are passed down - # to the DS methods. So, here we split the samples that are - # passed to down to each stage by calculating their indices_. - easy_samples_mask = hardness < self.IH_rate - ind_knn_classifier = np.where(easy_samples_mask)[0] - ind_ds_classifier = np.where(~easy_samples_mask)[0] - - if ind_knn_classifier.size: - # all samples with low hardness should be classified by - # the knn method here: - # First get the class associated with each neighbor - - # Accessing which samples in the original matrix are - # associated with the low instance hardness indices_. - ind_knn_original_matrix = ind_disagreement[ - ind_knn_classifier] - - predicted_proba[ind_knn_original_matrix] = \ - self.roc_algorithm_.predict_proba( - X_DS[ind_knn_classifier]) - - # Remove from the neighbors and distance matrices the - # samples that were classified using the KNN - neighbors = np.delete(neighbors, ind_knn_classifier, - axis=0) - distances = np.delete(distances, ind_knn_classifier, - axis=0) - else: - # IH was not considered. 
So all samples with disagreement are
-                # passed down to the DS algorithm
-                ind_ds_classifier = np.arange(ind_disagreement.size)
-
+            distances, ind_ds_classifier, neighbors = self._IH_prediction(
+                X_DS,
+                distances,
+                ind_disagreement,
+                neighbors,
+                predicted_proba,
+                True
+            )
             if ind_ds_classifier.size:
                 # Check if the dynamic frienemy pruning should be used
-                if self.DFP:
-                    DFP_mask = frienemy_pruning_preprocessed(
-                        neighbors, self.DSEL_target_, self.DSEL_processed_
-                    )
-                else:
-                    DFP_mask = np.ones(
-                        (ind_ds_classifier.size, self.n_classifiers_))
+                DFP_mask = self._apply_dfp(ind_ds_classifier, neighbors)
 
                 ind_ds_original_matrix = ind_disagreement[ind_ds_classifier]
 
@@ -653,6 +498,136 @@ def predict_proba(self, X):
 
         return predicted_proba
 
+    def _preprocess_predictions(self, X, req_proba=False):
+        if self.needs_proba or req_proba:
+            base_probabilities = self._predict_proba_base(X)
+            base_predictions = base_probabilities.argmax(axis=2)
+        else:
+            base_probabilities = None
+            base_predictions = self._predict_base(X)
+        return base_predictions, base_probabilities
+
+    def _prediction_by_agreement(self, base_predictions, predictions,
+                                 base_probabilities=None):
+        all_agree_vector = BaseDS._all_classifier_agree(base_predictions)
+        ind_all_agree = np.where(all_agree_vector)[0]
+        # When all base classifiers agree, take the prediction of the first
+        # one (or the averaged probabilities, when probabilities are needed).
+        if ind_all_agree.size:
+            if base_probabilities is not None:
+                predictions[ind_all_agree] = base_probabilities[
+                    ind_all_agree].mean(axis=1)
+            else:
+                predictions[ind_all_agree] = base_predictions[
+                    ind_all_agree, 0]
+        # Return the indices of the samples with disagreement
+        ind_disagreement = np.where(~all_agree_vector)[0]
+        return ind_disagreement
+
+    def _IH_prediction(self, X_DS, distances, ind_disagreement, neighbors,
+                       predictions, is_proba=False):
+
+        # TODO: make this if outside?
+        if self.with_IH:
+            ind_hard, ind_easy = self._split_easy_samples(neighbors)
+            distances, neighbors = self._predict_easy_samples(X_DS, distances,
+                                                              ind_disagreement,
+                                                              ind_easy,
+                                                              neighbors,
+                                                              predictions,
+                                                              is_proba)
+        else:
+            # IH was not considered. So all samples with disagreement are
+            # passed down to the DS algorithm
+            ind_hard = np.arange(ind_disagreement.size)
+        return distances, ind_hard, neighbors
+
+    def _predict_easy_samples(self, X_DS, distances, ind_disagreement,
+                              ind_easy, neighbors, predictions, is_proba):
+        # TODO: Make this if outside?
+        if ind_easy.size:
+            # All samples with low hardness should be classified by
+            # the k-NN method here.
+
+            # Map the low instance-hardness indices back to the original
+            # matrix, since they were computed on a subset of the samples.
+            ind_knn_original_matrix = ind_disagreement[ind_easy]
+
+            if is_proba:
+                predictions[ind_knn_original_matrix] = \
+                    self.roc_algorithm_.predict_proba(
+                        X_DS[ind_easy])
+            else:
+                # Get the class associated with each neighbor
+                y_neighbors = self.DSEL_target_[neighbors[ind_easy,
+                                                :self.safe_k]]
+                predictions_knn, _ = mode(y_neighbors, axis=1)
+                predictions[ind_knn_original_matrix] = predictions_knn.reshape(
+                    -1, )
+
+            # Remove from the neighbors and distance matrices the
+            # samples that were classified using the KNN
+            neighbors = np.delete(neighbors, ind_easy,
+                                  axis=0)
+            distances = np.delete(distances, ind_easy,
+                                  axis=0)
+        return distances, neighbors
+
+    def _split_easy_samples(self, neighbors):
+        # If IH is used, calculate the hardness level associated with
+        # each sample
+        hardness = hardness_region_competence(neighbors,
+                                              self.DSEL_target_,
+                                              self.safe_k)
+        # Get the indices associated with the easy and hard samples.
+        # Samples with low hardness are passed down to the k-NN
+        # classifier, while samples with high hardness are passed down
+        # to the DS method. So, here we split the samples that are
+        # passed down to each stage by calculating their indices.
+        easy_samples_mask = hardness < self.IH_rate
+        ind_knn_classifier = np.where(easy_samples_mask)[0]
+        ind_ds_classifier = np.where(~easy_samples_mask)[0]
+        return ind_ds_classifier, ind_knn_classifier
+
+    def _predict_DS(self, X_DS, base_predictions, base_probabilities,
+                    distances, ind_disagreement, ind_ds_classifier, neighbors,
+                    predicted, is_proba=False):
+
+        # If DFP pruning is considered, calculate the DFP mask
+        # for all samples in X
+        DFP_mask = self._apply_dfp(ind_ds_classifier, neighbors)
+        # Get the real indices of the samples that will be classified
+        # using a DS algorithm.
+        ind_ds_original_matrix = ind_disagreement[ind_ds_classifier]
+        if self.needs_proba or is_proba:
+            selected_probabilities = base_probabilities[
+                ind_ds_original_matrix]
+        else:
+            selected_probabilities = None
+
+        args = [X_DS[ind_ds_classifier],
+                base_predictions[ind_ds_original_matrix],
+                selected_probabilities,
+                neighbors,
+                distances,
+                DFP_mask]
+        if is_proba:
+            preds = self.predict_proba_with_ds(*args)
+        else:
+            preds = self.classify_with_ds(*args)
+
+        predicted[ind_ds_original_matrix] = preds
+
+    def _apply_dfp(self, ind_ds_classifier, neighbors):
+        if self.DFP:
+            DFP_mask = frienemy_pruning_preprocessed(neighbors,
+                                                     self.DSEL_target_,
+                                                     self.DSEL_processed_)
+        else:
+            DFP_mask = np.ones(
+                (ind_ds_classifier.size, self.n_classifiers_))
+        return DFP_mask
+
     def _preprocess_dsel(self):
         """Compute the prediction of each base classifier for
         all samples in DSEL. Used to speed-up the test phase, by
@@ -719,26 +694,6 @@ def _predict_proba_base(self, X):
                 probabilities[:, index] = clf.predict_proba(X)
         return probabilities
 
-    def _preprocess_dsel_scores(self):
-        """Compute the output profiles of the dynamic selection dataset (DSEL)
-        Each position of the output profiles vector is the score obtained by a
-        base classifier :math:`c_{i}`
-        for the classes of the input sample.
-
-        Returns
-        -------
-        scores : array of shape (n_samples, n_classifiers, n_classes)
-            Scores (probabilities) for each class obtained by each base
-            classifier in the generated_pool
-            for each sample in X.
- """ - scores = np.empty( - (self.n_samples_, self.n_classifiers_, self.n_classes_)) - for index, clf in enumerate(self.pool_classifiers_): - scores[:, index, :] = clf.predict_proba(self.DSEL_data_) - - return scores - @staticmethod def _all_classifier_agree(predictions): """Check whether there is a difference in opinion among the classifiers @@ -782,11 +737,9 @@ def _validate_parameters(self): "parameter safe_k must be equal or less than parameter k." "input safe_k is {} and k is {}".format(self.k, self.safe_k)) - if not isinstance(self.IH_rate, float): raise TypeError( "parameter IH_rate should be a float between [0.0, 0.5]") - if self.IH_rate < 0 or self.IH_rate > 0.5: raise ValueError("Parameter IH_rate should be between [0.0, 0.5]." "IH_rate = {}".format(self.IH_rate)) @@ -806,29 +759,6 @@ def _validate_pool(self): raise ValueError("n_classifiers must be greater than zero, " "got {}.".format(self.n_classifiers_)) - def _check_num_features(self, X): - """ Verify if the number of features (n_features) of X is equals to - the number of features used to fit the model. Raises an error if - n_features is different. - - Parameters - ---------- - X : array of shape (classes, n_features) - The input data. - - Raises - ------- - ValueError - If X has a different dimensionality than the training data. - """ - n_features = X.shape[1] - if self.n_features_ != n_features: - raise ValueError("Number of features of the model must " - "match the input. Model n_features_ is {} and " - "input n_features_ is {} ".format( - self.n_features_, - n_features)) - def _check_predict_proba(self): """ Checks if each base classifier in the pool implements the predict_proba method. diff --git a/deslib/dcs/a_posteriori.py b/deslib/dcs/a_posteriori.py index 76f3c2c5..cc6f87ef 100644 --- a/deslib/dcs/a_posteriori.py +++ b/deslib/dcs/a_posteriori.py @@ -148,7 +148,7 @@ class labels of each example in X. super(APosteriori, self).fit(X, y) self._check_predict_proba() - self.dsel_scores_ = self._preprocess_dsel_scores() + self.dsel_scores_ = self._predict_proba_base(self.DSEL_data_) return self def estimate_competence(self, query, neighbors, distances, diff --git a/deslib/dcs/a_priori.py b/deslib/dcs/a_priori.py index 6289d868..a72c5723 100644 --- a/deslib/dcs/a_priori.py +++ b/deslib/dcs/a_priori.py @@ -141,7 +141,7 @@ class labels of each example in X. super(APriori, self).fit(X, y) self._check_predict_proba() - self.dsel_scores_ = self._preprocess_dsel_scores() + self.dsel_scores_ = self._predict_proba_base(self.DSEL_data_) return self def estimate_competence(self, query, neighbors, distances, diff --git a/deslib/des/des_clustering.py b/deslib/des/des_clustering.py index 8c30f628..42d14264 100644 --- a/deslib/des/des_clustering.py +++ b/deslib/des/des_clustering.py @@ -176,6 +176,11 @@ class labels of each example in X. self._preprocess_clusters() return self + def _get_region_competence(self, query, k=None): + distances = self.clustering_.transform(query) + region = self.clustering_.predict(query) + return distances, region + def _preprocess_clusters(self): """Preprocess the competence as well as the average diversity of each base classifier for each specific cluster. diff --git a/deslib/des/knop.py b/deslib/des/knop.py index 1c2c2abc..162a5d46 100644 --- a/deslib/des/knop.py +++ b/deslib/des/knop.py @@ -141,7 +141,7 @@ class labels of each example in X. raise ValueError( "Error. 
KNOP does not accept one class datasets!") self._check_predict_proba() - self.dsel_scores_ = self._preprocess_dsel_scores() + self.dsel_scores_ = self._predict_proba_base(self.DSEL_data_) # Reshape DSEL_scores as a 2-D array for nearest neighbor calculations dsel_output_profiles = self.dsel_scores_.reshape(self.n_samples_, self.n_classifiers_ * diff --git a/deslib/des/meta_des.py b/deslib/des/meta_des.py index f0cc481e..6b3a0ec9 100644 --- a/deslib/des/meta_des.py +++ b/deslib/des/meta_des.py @@ -194,7 +194,7 @@ class labels of each example in X. # Check if the base classifier is able to estimate probabilities self._check_predict_proba() - self.dsel_scores_ = self._preprocess_dsel_scores() + self.dsel_scores_ = self._predict_proba_base(self.DSEL_data_) # Reshape DSEL_scores as a 2-D array for nearest neighbor calculations dsel_output_profiles = self.dsel_scores_.reshape(self.n_samples_, diff --git a/deslib/des/probabilistic/base.py b/deslib/des/probabilistic/base.py index 260fb6f3..726a323b 100644 --- a/deslib/des/probabilistic/base.py +++ b/deslib/des/probabilistic/base.py @@ -72,7 +72,7 @@ class labels of each example in X. self._check_predict_proba() - self.dsel_scores_ = self._preprocess_dsel_scores() + self.dsel_scores_ = self._predict_proba_base(self.DSEL_data_) # Pre process the source of competence for the entire DSEL, # making the method faster during generalization. diff --git a/deslib/tests/conftest.py b/deslib/tests/conftest.py index a9edd947..56c449e4 100644 --- a/deslib/tests/conftest.py +++ b/deslib/tests/conftest.py @@ -135,18 +135,19 @@ def create_base_classifier(return_value, return_prob=None): @pytest.fixture def create_pool_classifiers(): clf_0 = create_base_classifier(return_value=np.zeros(1), - return_prob=np.atleast_2d([0.5, 0.5])) + return_prob=np.array([[0.5, 0.5]])) clf_1 = create_base_classifier(return_value=np.ones(1), - return_prob=np.atleast_2d([1.0, 0.0])) + return_prob=np.array([[1.0, 0.0]])) clf_2 = create_base_classifier(return_value=np.zeros(1), - return_prob=np.atleast_2d([0.33, 0.67])) + return_prob=np.array([[0.33, 0.67]])) pool_classifiers = [clf_0, clf_1, clf_2] return pool_classifiers @pytest.fixture def create_pool_all_agree(): - return [create_base_classifier(return_value=np.zeros(1))] * 100 + return [create_base_classifier(return_value=np.zeros(1), + return_prob=np.array([[0.61, 0.39]]))] * 100 @pytest.fixture diff --git a/deslib/tests/test_base.py b/deslib/tests/test_base.py index 2a76abbd..749b2573 100644 --- a/deslib/tests/test_base.py +++ b/deslib/tests/test_base.py @@ -168,7 +168,7 @@ def test_preprocess_dsel_scores(create_X_y, create_pool_classifiers): X, y = create_X_y ds_test = BaseDS(create_pool_classifiers) ds_test.fit(X, y) - dsel_scores = ds_test._preprocess_dsel_scores() + dsel_scores = ds_test._predict_proba_base(X) expected = np.array([[0.5, 0.5], [1.0, 0.0], [0.33, 0.67]]) expected = np.tile(expected, (15, 1, 1)) assert np.array_equal(dsel_scores, expected) @@ -212,15 +212,14 @@ def test_input_IH_rate(IH_rate): def test_predict_proba_all_agree(example_estimate_competence, - create_pool_classifiers): + create_pool_all_agree): X, y, _, _, _, dsel_scores = example_estimate_competence query = np.atleast_2d([1, 1]) - ds_test = BaseDS(create_pool_classifiers) + ds_test = BaseDS(create_pool_all_agree) ds_test.fit(X, y) ds_test.DSEL_scores = dsel_scores backup_all_agree = BaseDS._all_classifier_agree - BaseDS._all_classifier_agree = MagicMock(return_value=np.array([True])) proba = ds_test.predict_proba(query) BaseDS._all_classifier_agree 
= backup_all_agree
diff --git a/deslib/util/__init__.py b/deslib/util/__init__.py
index 43f0e319..dd73f4bf 100644
--- a/deslib/util/__init__.py
+++ b/deslib/util/__init__.py
@@ -23,12 +23,22 @@
 deslib.util.knne - Implementation of the K-Nearest Neighbors Equality
 technique
 
+
+deslib.util.dfp - General Dynamic Frienemy Pruning (DFP) implementation,
+which allows applying the DFP method to any ensemble model, not only
+dynamic ones.
+
+deslib.util.bpso - V- and S-shaped Binary Particle Swarm Optimization,
+used for feature selection.
 """
 
 from .aggregation import *
+from .bpso import BPSO
+from .datasets import *
+from .dfp import frienemy_pruning
+from .dfp import frienemy_pruning_preprocessed
 from .diversity import *
+from .faiss_knn_wrapper import FaissKNNClassifier
 from .instance_hardness import *
-from .prob_functions import *
-from .datasets import *
 from .knne import KNNE
-from .faiss_knn_wrapper import FaissKNNClassifier
+from .prob_functions import *
diff --git a/deslib/util/bpso.py b/deslib/util/bpso.py
new file mode 100644
index 00000000..49ab1af9
--- /dev/null
+++ b/deslib/util/bpso.py
@@ -0,0 +1,311 @@
+# coding=utf-8

+# Author: Rafael Menelau Oliveira e Cruz
+#
+# License: BSD 3 clause
+
+import copy
+
+import numpy as np
+
+# Limits for the velocity and the (continuous) position values
+X_MAX = 10
+X_MIN = -X_MAX
+MI = 100
+POS_MAX = 100
+POS_MIN = -100
+
+
+def s_shaped_transfer(X):
+    # S-shaped (sigmoid) transfer function mapping velocities to [0, 1].
+    result = 1.0 / (1.0 + np.power(np.e, -2.0 * X))
+    result[np.isnan(result)] = 1
+    return result
+
+
+def v_shaped_transfer(X):
+    # V-shaped transfer function mapping velocities to [0, 1].
+    return np.abs((2.0 / np.pi) * np.arctan((np.pi / 2.0) * X))
+
+
+class Particle:
+    """
+    Class representing a particle in a swarm.
+
+    Parameters
+    ----------
+    position : array-like
+        Initial (binary) position of the particle.
+
+    inertia : float
+        Initial inertia of the swarm
+
+    c1 : float
+        Self coefficient
+
+    c2 : float
+        Group coefficient
+
+    Attributes
+    ----------
+    n_dimensions : int
+        Particle dimensionality
+    pbest : array-like
+        Particle best position
+    best_fitness : float
+        Best fitness value obtained by the particle
+    fitness : float
+        Current fitness value of the particle
+    velocity : array-like
+        Velocity vector. Each element corresponds to the velocity in the
+        corresponding dimension.
+    phi : float
+        Coefficient used to self-adapt the inertia and the acceleration
+        coefficients.
+    history : List[float]
+        Fitness evolution of the given particle.
+    """
+
+    def __init__(self, position, inertia, c1, c2):
+        self.position = np.asarray(position)
+        self.c1 = c1
+        self.c2 = c2
+        self.inertia = inertia
+
+        # class variables
+        self.n_dimensions = position.size
+        self.best_fitness = None
+        self.fitness = None
+        self.phi = 0
+        self.pbest = np.copy(self.position)
+        self.velocity = np.zeros(self.n_dimensions)
+        self.history = []
+
+
+class BPSO:
+    """
+    Binary Particle Swarm Optimization (BPSO) with a self-updating mechanism.
+    Conversion from the continuous to the binary representation is conducted
+    using either the V-shaped or the S-shaped transfer function.
+
+    Parameters
+    ----------
+    max_iter : int
+        Number of iterations in the optimization.
+    n_particles : int
+        Number of particles used in the optimization.
+    n_dim : int
+        Dimensionality of the particles (e.g., the number of features when
+        used for feature selection).
+    init_inertia : float
+        Initial inertia of the swarm
+    final_inertia : float
+        Final inertia of the swarm
+    c1 : float
+        Self coefficient
+    c2 : float
+        Group coefficient
+    transfer_function : str, default 'v-shaped'
+        Transfer function used to map velocities to probabilities, either
+        'v-shaped' or 's-shaped'.
+    max_iter_no_change : int, default None
+        Maximum number of iterations without improvement before the
+        optimization stops early.
+
+    Attributes
+    ----------
+    particles_ : List[Particle]
+        List of particles in the swarm.
+    gbest_ : Particle
+        Particle containing the best fitness in the swarm history.
+
+    References
+    ----------
+    Kennedy, James, and Russell Eberhart. "Particle swarm optimization."
+    In Proceedings of IJCNN'95-International Conference on Neural Networks,
+    vol. 4, pp. 1942-1948. IEEE, 1995.
+
+    Mirjalili, Seyedali, and Andrew Lewis. "S-shaped versus V-shaped transfer
+    functions for binary particle swarm optimization." Swarm and Evolutionary
+    Computation 9 (2013): 1-14.
+
+    Zhang, Ying Chao, Xiong Xiong, and QiDong Zhang. "An improved self-adaptive
+    PSO algorithm with detection function for multimodal function optimization
+    problems." Mathematical Problems in Engineering 2013 (2013).
+    """
+    def __init__(self,
+                 max_iter,
+                 n_particles,
+                 n_dim,
+                 init_inertia,
+                 final_inertia,
+                 c1,
+                 c2,
+                 transfer_function='v-shaped',
+                 max_iter_no_change=None,
+                 random_state=None,
+                 ):
+        self.max_iter = max_iter
+        self.n_particles = n_particles
+        self.n_dim = n_dim
+        self.init_inertia = init_inertia
+        self.final_inertia = final_inertia
+        self.initial_c1 = c1
+        self.initial_c2 = c2
+        self.transfer_function = transfer_function
+        self.max_iter_no_change = max_iter_no_change
+        self.random_state = random_state
+
+    def _create_swarm(self):
+        # Initialize the swarm with random binary positions.
+        self.particles_ = []
+        self.gbest_ = None
+        positions = np.random.uniform(0, 1, (self.n_particles, self.n_dim))
+        positions = (positions > 0.5).astype(int)
+        for idx in range(self.n_particles):
+            particle = Particle(positions[idx],
+                                inertia=self.init_inertia,
+                                c1=self.initial_c1,
+                                c2=self.initial_c2)
+            self.particles_.append(particle)
+
+    def _update_velocity(self):
+        for p in self.particles_:
+            tmp_c1 = p.pbest - p.position
+            tmp_c2 = self.gbest_.position - p.position
+            inertia = p.inertia * p.velocity
+            cognitive = p.c1 * np.random.rand(p.n_dimensions) * tmp_c1
+            social = p.c2 * np.random.rand(p.n_dimensions) * tmp_c2
+            p.velocity = inertia + cognitive + social
+            p.velocity = p.velocity.clip(X_MIN, X_MAX)
+
+    def _update_particles(self):
+        # Continuous position update. Not used by the binary variant, which
+        # updates positions through _update_binary_particles instead.
+        for particle in self.particles_:
+            particle.position = particle.position + particle.velocity
+            particle.position = particle.position.clip(POS_MIN, POS_MAX)
+
+    def _update_binary_particles(self):
+        # Flip each bit with a probability given by the transfer function
+        # applied to the corresponding velocity.
+        for particle in self.particles_:
+            velocity = self._transfer_function(particle.velocity)
+            pos = (np.random.rand(self.n_dim) < velocity).astype(int)
+            particle.position[pos == 1] = particle.position[pos == 1] ^ 1
+
+    def _transfer_function(self, velocity):
+        if self.transfer_function == 's-shaped':
+            velocity = s_shaped_transfer(velocity)
+        else:
+            velocity = v_shaped_transfer(velocity)
+        return velocity
+
+    def _self_update(self):
+        # Compute phi for each particle and self-adapt the inertia and the
+        # acceleration coefficients (Zhang et al., 2013).
+        for particle in self.particles_:
+            tmp1 = np.sum(self.gbest_.position - particle.position)
+            tmp2 = np.sum(particle.pbest - particle.position)
+            if tmp1 == 0:
+                tmp1 = 1
+            if tmp2 == 0:
+                tmp2 = 1
+            particle.phi = abs(tmp1 / tmp2)
+            ln = np.log(particle.phi)
+            tmp = particle.phi * (self.iter_ - ((1 + ln) * self.max_iter) / MI)
+            particle.inertia = ((self.init_inertia - self.final_inertia) / (
+                    1 + np.exp(tmp))) + self.final_inertia
+            particle.c1 = self.initial_c1 * (particle.phi ** (-1))
+            particle.c2 = self.initial_c2 * particle.phi
+
+    def _update_pbest(self):
+        """
+        Update the personal best position of each particle.
+        """
+        for particle in self.particles_:
+            if (particle.best_fitness is None or
+                    particle.best_fitness >= particle.fitness):
+                # Copy the position so that later in-place position updates
+                # do not overwrite the stored personal best.
+                particle.pbest = np.copy(particle.position)
+                particle.best_fitness = particle.fitness
+
+    def _update_gbest(self):
+        """
+        Update the best particle in the swarm.
+        """
+        for particle in self.particles_:
+            if self.gbest_ is None or particle.fitness < self.gbest_.fitness:
+                self.gbest_ = copy.deepcopy(particle)
+                self.n_iter_no_change_ = 0
+
+    def optimize(self, fitness_function):
+        """
+        Run the BPSO algorithm.
+
+        Parameters
+        ----------
+        fitness_function : function
+            Function used to estimate the fitness of a binary particle.
+
+        Returns
+        -------
+        gbest_ : Particle
+            Global best solution from the whole swarm.
+        """
+        self._create_swarm()
+        self.n_iter_no_change_ = 0
+        self.iter_ = 0
+
+        while not self._stop():
+            # Compute the fitness of each particle
+            for particle in self.particles_:
+                particle.fitness = fitness_function(particle.position)
+
+            self._update_gbest()
+            self._update_pbest()
+            self._update_velocity()
+            self._self_update()
+            self._update_binary_particles()
+            self.iter_ = self.iter_ + 1
+            self.n_iter_no_change_ += 1
+        return self.gbest_
+
+    def _stop(self):
+        """
+        Check whether the optimization should stop.
+        """
+        # Early stopping
+        if (self.max_iter_no_change is not None
+                and self.n_iter_no_change_ >= self.max_iter_no_change):
+            return True
+        # Reached the maximum number of iterations
+        return self.iter_ >= self.max_iter
+
+
+def main():
+    # Small usage example (illustrative parameter values): select features
+    # with BPSO by minimizing the validation error of a KNN classifier
+    # trained on the features selected by each binary particle.
+    from sklearn.datasets import make_classification
+    from sklearn.model_selection import train_test_split
+    from sklearn.neighbors import KNeighborsClassifier
+
+    X, y = make_classification(n_samples=2000, n_features=100, n_redundant=50,
+                               n_informative=20)
+    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.5)
+
+    def fitness(position):
+        mask = position.astype(bool)
+        if not mask.any():
+            return 1.0  # Penalize particles that select no feature.
+        knn = KNeighborsClassifier().fit(X_train[:, mask], y_train)
+        return 1.0 - knn.score(X_val[:, mask], y_val)
+
+    swarm = BPSO(max_iter=100, n_particles=10, n_dim=X.shape[1],
+                 init_inertia=1.0, final_inertia=0.3, c1=2, c2=2,
+                 max_iter_no_change=50)
+    print('Best fitness:', swarm.optimize(fitness).fitness)
+
+
+if __name__ == '__main__':
+    main()
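
Usage sketch: with the `deslib.util` exports added above, the DFP mask can also be computed for a static ensemble, as the new package docstring suggests. This is a minimal sketch that assumes only what this diff shows about `frienemy_pruning_preprocessed` (it takes the neighbor indices over DSEL, the DSEL labels, and the per-classifier hit matrix, and returns one competence-mask row per query); the pool, neighborhood size, and dataset below are illustrative.

    import numpy as np
    from sklearn.datasets import make_classification
    from sklearn.ensemble import BaggingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import NearestNeighbors

    from deslib.util import frienemy_pruning_preprocessed

    X, y = make_classification(random_state=0)
    X_train, X_dsel, y_train, y_dsel = train_test_split(X, y, random_state=0)
    pool = BaggingClassifier(n_estimators=10, random_state=0)
    pool.fit(X_train, y_train)

    # Hit matrix: True where a base classifier classifies a DSEL sample
    # correctly (the same information BaseDS stores in DSEL_processed_).
    dsel_processed = np.array(
        [clf.predict(X_dsel) == y_dsel for clf in pool.estimators_]).T

    # Region of competence: indices of the 7 nearest DSEL neighbors per query.
    nn = NearestNeighbors(n_neighbors=7).fit(X_dsel)
    neighbors = nn.kneighbors(X_dsel[:5], return_distance=False)

    # One row per query, one column per classifier; zeros mark pruned ones.
    dfp_mask = frienemy_pruning_preprocessed(neighbors, y_dsel, dsel_processed)
    print(dfp_mask.shape)  # (5, 10)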