Source code for idpet.dimensionality_reduction

from abc import ABC, abstractmethod
from typing import List, Tuple
import inspect
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA, KernelPCA
from sklearn.manifold import TSNE
import numpy as np
from sklearn.metrics import silhouette_score
from umap import UMAP
from idpet.utils import logger


class DimensionalityReduction(ABC):
    """Abstract base class for dimensionality reduction algorithms."""

    @abstractmethod
    def fit(self, data: np.ndarray):
        """
        Fit the dimensionality reduction model to the data.

        Parameters
        ----------
        data : np.ndarray
            The input data array of shape (n_samples, n_features).
        """
        raise NotImplementedError("Method 'fit' must be implemented in subclasses.")

    @abstractmethod
    def transform(self, data: np.ndarray) -> np.ndarray:
        """
        Transform the input data using the fitted dimensionality reduction model.

        Parameters
        ----------
        data : np.ndarray
            The input data array of shape (n_samples, n_features) to be transformed.

        Returns
        -------
        np.ndarray
            The transformed data array of shape (n_samples, n_components).
        """
        raise NotImplementedError("Method 'transform' must be implemented in subclasses.")

    @abstractmethod
    def fit_transform(self, data: np.ndarray) -> np.ndarray:
        """
        Fit the dimensionality reduction model to the data and then transform it.

        Parameters
        ----------
        data : np.ndarray
            The input data array of shape (n_samples, n_features).

        Returns
        -------
        np.ndarray
            The transformed data array of shape (n_samples, n_components).
        """
        raise NotImplementedError("Method 'fit_transform' must be implemented in subclasses.")
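
# Illustrative sketch of the interface contract: a hypothetical subclass must
# implement all three abstract methods. The identity behavior below is an
# assumption chosen only for demonstration.
#
#     class IdentityReduction(DimensionalityReduction):
#         def fit(self, data: np.ndarray):
#             return self
#
#         def transform(self, data: np.ndarray) -> np.ndarray:
#             return data
#
#         def fit_transform(self, data: np.ndarray) -> np.ndarray:
#             return self.fit(data).transform(data)
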
class PCAReduction(DimensionalityReduction):
    """
    Principal Component Analysis (PCA) for dimensionality reduction.

    Parameters
    ----------
    n_components : int, optional
        Number of components to keep. Default is 10.
    """

    def __init__(self, n_components: int = 10):
        self.n_components = n_components

    def fit(self, data: np.ndarray):
        self.pca = PCA(n_components=self.n_components)
        self.pca.fit(data)
        return self.pca

    def transform(self, data: np.ndarray) -> np.ndarray:
        reduce_dim_data = self.pca.transform(data)
        return reduce_dim_data

    def fit_transform(self, data: np.ndarray) -> np.ndarray:
        self.pca = PCA(n_components=self.n_components)
        transformed = self.pca.fit_transform(data)
        return transformed
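
# Illustrative usage, a sketch only: the data shape and values below are
# hypothetical, chosen for demonstration.
#
#     import numpy as np
#     data = np.random.rand(100, 30)            # hypothetical (n_samples, n_features)
#     reducer = PCAReduction(n_components=5)
#     reduced = reducer.fit_transform(data)     # -> shape (100, 5)
#     projected = reducer.transform(data[:3])   # reuses the fitted model
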
class TSNEReduction(DimensionalityReduction):
    """
    Class for performing dimensionality reduction using the t-SNE algorithm.

    Parameters
    ----------
    perplexity_vals : List[float], optional
        List of perplexity values to try. Default is [30].
    metric : str, optional
        Metric to use. Default is "euclidean".
    circular : bool, optional
        Whether to use circular metrics. If True, `metric` is replaced by the
        unit-vector distance for angular data. Default is False.
    n_components : int, optional
        Number of dimensions of the embedded space. Default is 2.
    learning_rate : float, optional
        Learning rate. Default is 'auto'.
    range_n_clusters : List[int], optional
        Range of cluster numbers to evaluate. Default is range(2, 10, 1).
    random_state : int, optional
        Random seed for sklearn.
    """

    def __init__(
        self,
        perplexity_vals: List[float] = [30],
        metric: str = "euclidean",
        circular: bool = False,
        n_components: int = 2,
        learning_rate: float = 'auto',
        range_n_clusters: List[int] = range(2, 10, 1),
        random_state: int = None,
    ):
        self.perplexity_vals = perplexity_vals
        self.metric = unit_vector_distance if circular else metric
        self.n_components = n_components
        self.learning_rate = learning_rate
        self.results = []
        self.range_n_clusters = range_n_clusters
        self.random_state = random_state
        # For compatibility with different sklearn versions.
        sig = inspect.signature(TSNE.__init__)
        if "n_iter" in sig.parameters:
            self.n_iter_args = {"n_iter": 3500}
        elif "max_iter" in sig.parameters:
            self.n_iter_args = {"max_iter": 3500}
        else:
            raise TypeError(
                "Found no 'n_iter' or 'max_iter' argument in this version of"
                " sklearn's TSNE."
            )
    def fit(self, data: np.ndarray):
        self.best_tsne = self.fit_transform(data)
        return self

    def transform(self, data: np.ndarray) -> np.ndarray:
        # t-SNE cannot embed new samples into an existing map: defer to the
        # base class, which raises NotImplementedError.
        return super().transform(data)
    def fit_transform(self, data: np.ndarray) -> np.ndarray:
        self.data = data
        logger.info("t-SNE is running...")
        for perplexity in self.perplexity_vals:
            tsneObject = TSNE(
                n_components=self.n_components,
                perplexity=perplexity,
                early_exaggeration=10.0,
                learning_rate=self.learning_rate,
                metric=self.metric,
                n_iter_without_progress=300,
                min_grad_norm=1e-7,
                init="random",
                method="barnes_hut",
                angle=0.5,
                random_state=self.random_state,
                **self.n_iter_args,
            )
            tsne = tsneObject.fit_transform(data)
            self._cluster(tsne, perplexity)
        # Select the (perplexity, n_clusters) pair with the highest product
        # of low- and high-dimensional silhouette scores.
        best_result = max(self.results, key=lambda x: x['silhouette_product'])
        self.bestP = best_result['perplexity']
        self.bestK = best_result['n_clusters']
        self.best_tsne = best_result['tsne_features']
        self.best_kmeans = best_result['kmeans_model']
        logger.info(f"Best Perplexity: {self.bestP}")
        logger.info(f"Best Number of Clusters: {self.bestK}")
        logger.info(f"Silhouette Score Low Dimensional: {best_result['silhouette_ld']}")
        logger.info(f"Silhouette Score High Dimensional: {best_result['silhouette_hd']}")
        logger.info(f"Silhouette Score Product: {best_result['silhouette_product']}")
        return self.best_tsne
    def _cluster(self, tsne: np.ndarray, perplexity: float):
        # Cluster the t-SNE embedding with KMeans for each candidate number
        # of clusters, scoring each result in both the embedded (low
        # dimensional) and original (high dimensional) space.
        for n_clusters in self.range_n_clusters:
            kmeans = KMeans(n_clusters=n_clusters, n_init='auto').fit(tsne)
            silhouette_ld = silhouette_score(tsne, kmeans.labels_)
            silhouette_hd = silhouette_score(self.data, kmeans.labels_)
            result = {
                'perplexity': perplexity,
                'n_clusters': n_clusters,
                'silhouette_ld': silhouette_ld,
                'silhouette_hd': silhouette_hd,
                'silhouette_product': silhouette_ld * silhouette_hd,
                'tsne_features': tsne,
                'kmeans_model': kmeans,
            }
            self.results.append(result)
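
# Illustrative usage, a sketch only: the perplexity values and data shape are
# hypothetical. fit_transform() embeds the data once per perplexity, clusters
# each embedding, and keeps the run maximizing silhouette_ld * silhouette_hd.
#
#     import numpy as np
#     data = np.random.rand(200, 30)                 # hypothetical input features
#     reducer = TSNEReduction(perplexity_vals=[20, 40], random_state=0)
#     embedding = reducer.fit_transform(data)        # -> shape (200, 2)
#     print(reducer.bestP, reducer.bestK)            # selected perplexity / clusters
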
class UMAPReduction(DimensionalityReduction):
    """
    Class for performing dimensionality reduction using the Uniform Manifold
    Approximation and Projection (UMAP) algorithm.

    Parameters
    ----------
    n_components : int, optional
        Number of dimensions for the reduced space. Default is 2.
    n_neighbors : List[int], optional
        Numbers of neighbors to consider for each point in the input data.
        Default is [15].
    circular : bool, optional
        Whether to use circular metrics. If True, `metric` is replaced by the
        unit-vector distance for angular data. Default is False.
    min_dist : float, optional
        The minimum distance between embedded points. Default is 0.1.
    metric : str, optional
        The metric to use for distance calculation. Default is 'euclidean'.
    range_n_clusters : range or List, optional
        Range of cluster values to consider for silhouette scoring.
        Default is range(2, 10, 1).
    random_state : int, optional
        Random state of the UMAP implementation.
    """

    def __init__(
        self,
        n_components: int = 2,
        n_neighbors: List[int] = [15],
        circular: bool = False,
        min_dist: float = 0.1,
        metric: str = 'euclidean',
        range_n_clusters: List[int] = range(2, 10, 1),
        random_state: int = None,
    ):
        self.n_components = n_components
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.metric = unit_vector_distance if circular else metric
        self.range_n_clusters = range_n_clusters
        self.sil_scores = []
        self.best_embedding = None
        self.best_score = -1
        self.best_n_clusters = None
        self.best_n_neighbors = None
        self.random_state = random_state
    def fit(self, data):
        self.best_embedding = self.fit_transform(data)
        return self

    def transform(self, data) -> np.ndarray:
        # The fitted UMAP model is not stored, so out-of-sample transforms are
        # not supported: defer to the base class, which raises
        # NotImplementedError.
        return super().transform(data)

    def fit_transform(self, data) -> np.ndarray:
        logger.info('UMAP is running...')
        for n_neighbor in self.n_neighbors:
            umap_model = UMAP(
                n_neighbors=n_neighbor,
                min_dist=self.min_dist,
                n_components=self.n_components,
                metric=self.metric,
                random_state=self.random_state,
            )
            embedding = umap_model.fit_transform(data)
            scores = self.cluster(embedding, n_neighbor)
            # Keep the embedding with the highest silhouette score seen so far.
            best_score_for_n_neighbor = max(scores, key=lambda x: x[2])
            if best_score_for_n_neighbor[2] > self.best_score:
                self.best_score = best_score_for_n_neighbor[2]
                self.best_embedding = embedding
                self.best_n_clusters = best_score_for_n_neighbor[1]
                self.best_n_neighbors = n_neighbor
        logger.info(f'Best number of neighbors: {self.best_n_neighbors}')
        logger.info(f'Best number of clusters : {self.best_n_clusters}')
        return self.best_embedding
    def cluster(self, embedding, n_neighbor) -> List[Tuple]:
        """
        Perform clustering using the KMeans algorithm for each number of
        clusters in the specified range.

        Returns
        -------
        List[Tuple]
            A list of (n_neighbors, n_clusters, silhouette_score) tuples, one
            per clustering result accumulated so far.
        """
        for n_clusters in self.range_n_clusters:
            clusterer = KMeans(n_clusters=n_clusters, n_init="auto", random_state=10)
            cluster_labels = clusterer.fit_predict(embedding)
            silhouette_avg = silhouette_score(embedding, cluster_labels)
            self.sil_scores.append((n_neighbor, n_clusters, silhouette_avg))
        return self.sil_scores
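
# Illustrative usage, a sketch only: the neighborhood sizes and data shape are
# hypothetical. fit_transform() keeps the embedding whose best KMeans
# clustering attains the highest silhouette score.
#
#     import numpy as np
#     data = np.random.rand(200, 30)                   # hypothetical input features
#     reducer = UMAPReduction(n_neighbors=[15, 50], random_state=0)
#     embedding = reducer.fit_transform(data)          # -> shape (200, 2)
#     print(reducer.best_n_neighbors, reducer.best_n_clusters)
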
class KPCAReduction(DimensionalityReduction):
    """
    Class for performing dimensionality reduction using the Kernel PCA (KPCA)
    algorithm.

    Parameters
    ----------
    circular : bool, optional
        Whether to use circular metrics for angular features. Default is
        False. If True, it will override the `kernel` argument.
    n_components : int, optional
        Number of dimensions for the reduced space. Default is 10.
    kernel : str, optional
        Kernel used for PCA, as in the scikit-learn implementation:
        https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.KernelPCA.html
    gamma : float, optional
        Kernel coefficient. Default is None.
    """

    def __init__(
        self,
        circular: bool = False,
        n_components: int = 10,
        kernel: str = "poly",
        gamma: float = None,
    ) -> None:
        self.circular = circular
        self.n_components = n_components
        self.gamma = gamma
        self.kernel = kernel
    def fit(self, data):
        # Use angular features and a custom similarity function.
        if self.circular:
            self.gamma = 1 / data.shape[1] if self.gamma is None else self.gamma
            kernel = lambda a1, a2: unit_vector_kernel(a1, a2, gamma=self.gamma)
            pca_in = data
        # Use raw features.
        else:
            kernel = self.kernel
            pca_in = data
        self.pca = KernelPCA(
            n_components=self.n_components,
            kernel=kernel,
            gamma=self.gamma  # Ignored if using circular.
        )
        self.pca.fit(pca_in)
        return self.pca

    def transform(self, data) -> np.ndarray:
        reduce_dim_data = self.pca.transform(data)
        return reduce_dim_data
    def fit_transform(self, data) -> np.ndarray:
        # Fit the kernel PCA model, then project the same data.
        self.fit(data)
        return self.transform(data)
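
# Illustrative usage, a sketch only: the angular data below are hypothetical.
# With circular=True, the reducer applies a Gaussian kernel over unit-vector
# distances instead of the string kernel passed via `kernel`.
#
#     import numpy as np
#     angles = np.random.uniform(-np.pi, np.pi, size=(100, 20))  # hypothetical angles
#     reducer = KPCAReduction(circular=True, n_components=5)
#     reducer.fit(angles)
#     reduced = reducer.transform(angles)        # -> shape (100, 5)
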
class DimensionalityReductionFactory:
    """
    Factory class for creating instances of various dimensionality reduction
    algorithms.

    Methods
    -------
    get_reducer(method, \*args, \*\*kwargs)
        Get an instance of the specified dimensionality reduction algorithm.
    """

    @staticmethod
    def get_reducer(method, *args, **kwargs) -> DimensionalityReduction:
        """
        Get an instance of the specified dimensionality reduction algorithm.

        Parameters
        ----------
        method : str
            Name of the dimensionality reduction method: "pca", "tsne",
            "kpca" or "umap".
        \*args
            Positional arguments to pass to the constructor of the selected
            method.
        \*\*kwargs
            Keyword arguments to pass to the constructor of the selected
            method.

        Returns
        -------
        DimensionalityReduction
            Instance of the specified dimensionality reduction algorithm.
        """
        if method == "pca":
            return PCAReduction(*args, **kwargs)
        elif method == "tsne":
            return TSNEReduction(*args, **kwargs)
        elif method == "kpca":
            return KPCAReduction(*args, **kwargs)
        elif method == "umap":
            return UMAPReduction(*args, **kwargs)
        else:
            raise NotImplementedError(
                f"Unsupported dimensionality reduction method: {method!r}"
            )
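
# Illustrative usage, a sketch only: requesting a reducer by name and
# forwarding constructor arguments through the factory.
#
#     reducer = DimensionalityReductionFactory.get_reducer("pca", n_components=5)
#     # reduced = reducer.fit_transform(data)   # data: (n_samples, n_features)
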
#----------------------------------------------------------------------
# Functions for performing dimensionality reduction on circular data. -
#----------------------------------------------------------------------
def unit_vectorize(a: np.ndarray) -> np.ndarray:
    """
    Convert an array with (\*, N) angles into an array with (\*, N, 2) cosine
    and sine values for the N angles.
    """
    v = np.concatenate([np.cos(a)[..., None], np.sin(a)[..., None]], axis=-1)
    return v
def unit_vector_distance(a0: np.ndarray, a1: np.ndarray, sqrt: bool = True):
    """
    Compute the sum of distances between the unit-vector encodings of two
    (\*, N) arrays storing the values of N angles. If `sqrt` is False, squared
    distances are summed instead.
    """
    v0 = unit_vectorize(a0)
    v1 = unit_vectorize(a1)
    # Distance between N pairs of angles.
    if sqrt:
        dist = np.sqrt(np.square(v0 - v1).sum(axis=-1))
    else:
        dist = np.square(v0 - v1).sum(axis=-1)
    # Sum over the N angles.
    dist = dist.sum(axis=-1)
    return dist
def unit_vector_kernel(a1, a2, gamma):
    """
    Compute a Gaussian kernel exp(-gamma * d) over the summed squared
    unit-vector distance d between two arrays of angles.
    """
    dist = unit_vector_distance(a1, a2, sqrt=False)
    sim = np.exp(-gamma * dist)
    return sim
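
# Illustrative check, a sketch only: for a single pair of angles the
# unit-vector distance equals the chord length 2*|sin((a0 - a1) / 2)| on the
# unit circle, so diametrically opposite angles give 2.0.
#
#     import numpy as np
#     a0 = np.array([0.0])
#     a1 = np.array([np.pi])
#     unit_vectorize(a0)                      # -> [[1., 0.]]
#     unit_vector_distance(a0, a1)            # -> 2.0
#     unit_vector_kernel(a0, a1, gamma=1.0)   # -> exp(-4.0); squared distance used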