Source code for pegasus.tools.nearest_neighbors

import time
import numpy as np
import pandas as pd
import numba

from scipy.sparse import issparse, csr_matrix
from scipy.stats import chi2
from sklearn.neighbors import NearestNeighbors
from pegasusio import MultimodalData
from typing import List, Tuple

from pegasus.tools import eff_n_jobs, update_rep, X_from_rep

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer



@numba.njit(cache=True)
def _reorg_knn(indices, distances):
    for i in range(indices.shape[0]):
        if indices[i, 0] != i:
            for j in range(indices.shape[1]-1, 0, -1):
                indices[i, j] = indices[i, j-1]
                distances[i, j] = distances[i, j-1]
            indices[i, 0] = i
            distances[i, 0] = 0.0
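

# A minimal sketch (editorial addition, not part of the original module) of
# what _reorg_knn does: hnswlib's approximate search does not always return
# a query point as its own first neighbor, so each offending row is shifted
# right by one and the self index is forced into column 0 with distance 0.
# Column 0 is dropped by the caller afterwards.
def _demo_reorg_knn():  # hypothetical helper, for illustration only
    indices = np.array([[1, 0, 2], [1, 0, 2]], dtype=np.int64)
    distances = np.array([[0.1, 0.5, 0.9], [0.0, 0.3, 0.7]], dtype=np.float32)
    _reorg_knn(indices, distances)
    # Row 0 (query 0) did not list itself first and is shifted; row 1
    # (query 1) already did and is untouched. The duplicated trailing entry
    # in row 0 is harmless because column 0 is discarded later.
    assert indices.tolist() == [[0, 1, 0], [1, 0, 2]]
    assert np.allclose(distances[0], [0.0, 0.1, 0.5])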


def calculate_nearest_neighbors(
    X: np.array,
    K: int = 100,
    n_jobs: int = -1,
    method: str = "hnsw",
    exact_k: bool = False,
    M: int = 20,
    efC: int = 200,
    efS: int = 200,
    random_state: int = 0,
    full_speed: bool = False,
    dist: str = 'l2',
) -> Tuple[List[int], List[float], int]:
    """Find K nearest neighbors for each data point in the matrix and return the indices and distances arrays.

    K is determined by min(K, int(sqrt(X.shape[0]))) if exact_k == False.

    Parameters
    ----------

    X : `np.array`
        An array of n_samples by n_features.
    K : `int`, optional (default: 100)
        Number of neighbors, including the data point itself. If K is None, determine K by sqrt(X.shape[0]).
    n_jobs : `int`, optional (default: -1)
        Number of threads to use. -1 refers to using all physical CPU cores.
    method: `str`, optional (default: 'hnsw')
        Choosing from 'hnsw' for approximate nearest neighbor search or 'sklearn' for exact nearest neighbor search. If X.shape[0] <= 1000, method will be automatically set to "sklearn" for exact KNN search
    exact_k: `bool`, optional (default: 'False')
        If True, use exactly the K passed to the function; otherwise K is determined as min(K, sqrt(X.shape[0])).
    M, efC, efS: `int`, optional (20, 200, 200)
        HNSW algorithm parameters.
    random_state: `int`, optional (default: 0)
        Random seed for random number generator.
    full_speed: `bool`, optional (default: False)
        If full_speed, use multiple threads in constructing hnsw index. However, the kNN results are not reproducible. If not full_speed, use only one thread to make sure results are reproducible.
    dist: `str`, optional (default: 'l2')
        Distance metric to use. By default, use squared L2 distance. Available options: 'l2' (squared L2), 'ip' (inner product), or 'cosine' (cosine similarity).

    Returns
    -------

    kNN indices array, distances array and adjusted K.

    Examples
    --------
    >>> indices, distances, K = calculate_nearest_neighbors(X)
    """
    nsample = X.shape[0]

    if nsample <= 1000:
        method = "sklearn"

    k_rot = int(nsample ** 0.5) # rot, rule of thumb
    if (K is None) or (K > k_rot and (not exact_k)):
        K = k_rot
        logger.info(f"in calculate_nearest_neighbors, K is adjusted to {K}.")

    if K == 1:
        # only the point itself would be returned; give back empty neighbor arrays
        return np.zeros((nsample, 0), dtype=int), np.zeros((nsample, 0), dtype=np.float32), K

    n_jobs = eff_n_jobs(n_jobs)

    if method == "hnsw":
        try:
            import hnswlib
        except ImportError:
            raise ImportError("Need hnswlib! Try 'pip install hnswlib' or 'conda install -c conda-forge hnswlib'.")

        assert not issparse(X)
        # Build hnsw index
        knn_index = hnswlib.Index(space=dist, dim=X.shape[1])
        knn_index.init_index(
            max_elements=nsample, ef_construction=efC, M=M, random_seed=random_state
        )
        knn_index.set_num_threads(n_jobs if full_speed else 1)
        knn_index.add_items(X)

        # KNN query
        knn_index.set_ef(efS)
        knn_index.set_num_threads(n_jobs)
        indices, distances = knn_index.knn_query(X, k=K)
        # ensure each point's first neighbor is the point itself, then eliminate it
        _reorg_knn(indices, distances)
        indices = indices[:, 1:]
        indices.dtype = np.int64  # reinterpret hnswlib's uint64 labels as int64 in place
        distances = distances[:, 1:]
        distances = np.sqrt(distances, out=distances)  # hnswlib 'l2' space returns squared distances
    else:
        assert method == "sklearn"
        knn = NearestNeighbors(
            n_neighbors=K - 1, n_jobs=n_jobs
        )  # eliminate the first neighbor, which is the node itself
        knn.fit(X)
        distances, indices = knn.kneighbors()

    return indices, distances, K
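

# A short usage sketch (editorial addition; the random input is assumed, not
# from the original module). With nsample <= 1000 the exact sklearn backend
# is selected automatically, and K is clipped to int(sqrt(nsample)) because
# exact_k defaults to False.
def _demo_calculate_nearest_neighbors():  # hypothetical, illustration only
    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 10)).astype(np.float32)
    indices, distances, K = calculate_nearest_neighbors(X, K=100)
    assert K == 22                         # int(500 ** 0.5) == 22
    assert indices.shape == (500, K - 1)   # the point itself is excluded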


def knn_is_cached(
    data: MultimodalData, indices_key: str, distances_key: str, K: int
) -> bool:
    # the "+ 1" accounts for the cached matrices excluding each point itself
    return (
        (indices_key in data.obsm)
        and (distances_key in data.obsm)
        and data.obsm[indices_key].shape[0] == data.shape[0]
        and (K <= data.obsm[indices_key].shape[1] + 1)
    )


@timer(logger=logger)
def get_neighbors(
    data: MultimodalData,
    K: int = 100,
    rep: str = "pca",
    n_comps: int = None,
    n_jobs: int = -1,
    random_state: int = 0,
    full_speed: bool = False,
    use_cache: bool = False,
    dist: str = "l2",
    method: str = "hnsw",
    exact_k: bool = False,
) -> Tuple[List[int], List[float], int]:
    """Find K nearest neighbors for each data point and return the indices and distances arrays.

    K is determined by min(K, int(sqrt(data.shape[0]))) if exact_k == False.

    Parameters
    ----------

    data : `pegasusio.MultimodalData`
        A MultimodalData object.
    K : `int`, optional (default: 100)
        Number of neighbors, including the data point itself.
    rep : `str`, optional (default: 'pca')
        Representation used to calculate kNN. If `None`, use data.X.
    n_comps: `int`, optional (default: None)
        Number of components to be used in `rep`. If n_comps is None, use all components; otherwise, use the minimum of n_comps and rep's dimensions.
    n_jobs : `int`, optional (default: -1)
        Number of threads to use. -1 refers to using all physical CPU cores.
    random_state: `int`, optional (default: 0)
        Random seed for random number generator.
    full_speed: `bool`, optional (default: False)
        If full_speed, use multiple threads in constructing hnsw index. However, the kNN results are not reproducible. If not full_speed, use only one thread to make sure results are reproducible.
    use_cache: `bool`, optional (default: False)
        If use_cache is True and cached kNN results are found, do not recompute.
    dist: `str`, optional (default: 'l2')
        Distance metric to use. By default, use squared L2 distance. Available options: 'l2' (squared L2), 'ip' (inner product), or 'cosine' (cosine similarity).
    method: `str`, optional (default: 'hnsw')
        Choose from 'hnsw' for approximate nearest neighbor search or 'sklearn' for exact nearest neighbor search.
    exact_k: `bool`, optional (default: False)
        If True, use exactly the K passed to the function; otherwise K is determined as min(K, int(sqrt(data.shape[0]))).

    Returns
    -------

    kNN indices array, distances array, and adjusted K.

    Examples
    --------
    >>> indices, distances, K = tools.get_neighbors(data)
    """
    rep = update_rep(rep)
    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    k_rot = int(data.shape[0] ** 0.5)  # rot, rule of thumb
    if (K is None) or (K > k_rot and (not exact_k)):
        K = k_rot
        logger.info(f"in get_neighbors, K is adjusted to {K}.")

    if use_cache and knn_is_cached(data, indices_key, distances_key, K):
        indices = data.obsm[indices_key]
        distances = data.obsm[distances_key]
        logger.info("Found cached kNN results, no calculation is required.")
    else:
        indices, distances, _ = calculate_nearest_neighbors(
            X_from_rep(data, rep, n_comps),
            K=K,
            n_jobs=eff_n_jobs(n_jobs),
            method=method,
            exact_k=exact_k,
            random_state=random_state,
            full_speed=full_speed,
            dist=dist,
        )
        data.obsm[indices_key] = indices
        data.register_attr(indices_key, "knn")
        data.obsm[distances_key] = distances
        data.register_attr(distances_key, "knn")

    return indices, distances, K
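

# A cache-behavior sketch (editorial addition; `data` is assumed to be a
# MultimodalData that already carries an "X_pca" embedding). get_neighbors
# stores its results under rep-specific obsm keys, so a second call with
# use_cache=True returns the stored arrays without recomputation.
def _demo_get_neighbors_cache(data: MultimodalData):  # hypothetical sketch
    indices, distances, K = get_neighbors(data, K=100, rep="pca")
    assert "pca_knn_indices" in data.obsm
    assert "pca_knn_distances" in data.obsm
    # knn_is_cached accepts any request with K <= stored columns + 1:
    ind2, _, _ = get_neighbors(data, K=K, rep="pca", use_cache=True)
    assert ind2 is indices  # same stored array object, no recomputation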


def get_symmetric_matrix(csr_mat: "csr_matrix") -> "csr_matrix":
    tp_mat = csr_mat.transpose().tocsr()
    sym_mat = csr_mat + tp_mat
    sym_mat.sort_indices()

    idx_mat = (csr_mat != 0).astype(int) + (tp_mat != 0).astype(int)
    idx_mat.sort_indices()
    idx = idx_mat.data == 2

    sym_mat.data[idx] /= 2.0
    return sym_mat


# We should not modify distances array!
@timer(logger=logger)
def calculate_affinity_matrix(
    indices: List[int], distances: List[float]
) -> "csr_matrix":
    nsample = indices.shape[0]
    K = indices.shape[1]
    # calculate sigma, important to use median here!
    sigmas = np.median(distances, axis=1)
    sigmas_sq = np.square(sigmas)

    # calculate local-scaled kernel
    normed_dist = np.zeros((nsample, K), dtype=np.float32)
    for i in range(nsample):
        numers = 2.0 * sigmas[i] * sigmas[indices[i, :]]
        denoms = sigmas_sq[i] + sigmas_sq[indices[i, :]]
        normed_dist[i, :] = np.sqrt(numers / denoms) * np.exp(
            -np.square(distances[i, :]) / denoms
        )

    W = csr_matrix(
        (normed_dist.ravel(), (np.repeat(range(nsample), K), indices.ravel())),
        shape=(nsample, nsample),
    )
    W = get_symmetric_matrix(W)

    # density normalization
    z = W.sum(axis=1).A1
    W = W.tocoo()
    W.data /= z[W.row]
    W.data /= z[W.col]
    W = W.tocsr()
    W.eliminate_zeros()

    return W
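

# For reference (editorial note, not in the original module): the loop in
# calculate_affinity_matrix computes the locally-scaled Gaussian kernel
#     w_ij = sqrt(2 * s_i * s_j / (s_i^2 + s_j^2))
#            * exp(-d_ij^2 / (s_i^2 + s_j^2)),
# where s_i is the median kNN distance of cell i. A tiny runnable check of
# the resulting matrix's shape and symmetry on assumed toy kNN output:
def _demo_affinity_matrix():  # hypothetical, illustration only
    indices = np.array([[1, 2], [0, 2], [0, 1]])
    distances = np.array([[1.0, 2.0], [1.0, 1.5], [2.0, 1.5]], dtype=np.float32)
    W = calculate_affinity_matrix(indices, distances)
    assert W.shape == (3, 3)
    # symmetrization and density normalization preserve symmetry
    assert np.allclose(W.toarray(), W.toarray().T)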


def neighbors(
    data: MultimodalData,
    K: int = 100,
    rep: "str" = "pca",
    n_comps: int = None,
    n_jobs: int = -1,
    random_state: int = 0,
    full_speed: bool = False,
    use_cache: bool = False,
    dist: str = "l2",
    method: str = "hnsw",
    exact_k: bool = False,
) -> None:
    """Compute k nearest neighbors and affinity matrix, which will be used for diffmap and graph-based community detection algorithms.

    The kNN calculation uses `hnswlib <https://github.com/nmslib/hnswlib>`_ introduced by [Malkov16]_.

    K is determined by min(K, int(sqrt(data.shape[0]))) if exact_k == False.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    K: ``int``, optional, default: ``100``
        Number of neighbors, including the data point itself.

    rep: ``str``, optional, default: ``"pca"``
        Embedding representation used to calculate kNN. If ``None``, use ``data.X``; otherwise, keyword ``'X_' + rep`` must exist in ``data.obsm``.

    n_comps: ``int``, optional, default: ``None``
        Number of components to be used in ``rep``. If n_comps is None, use all components; otherwise, use the minimum of n_comps and rep's dimensions.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all physical CPU cores.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    full_speed: ``bool``, optional, default: ``False``
        * If ``True``, use multiple threads in constructing ``hnsw`` index. However, the kNN results are not reproducible.
        * Otherwise, use only one thread to make sure results are reproducible.

    use_cache: ``bool``, optional, default: ``False``
        * If ``True`` and cached kNN results are found, Pegasus will use them instead of recomputing.
        * Otherwise, compute kNN irrespective of caching status.

    dist: ``str``, optional (default: ``"l2"``)
        Distance metric to use. By default, use squared L2 distance. Available options: ``"l2"`` (squared L2), ``"ip"`` (inner product), or ``"cosine"`` (cosine similarity).

    method: ``str``, optional (default: ``"hnsw"``)
        Choose from ``"hnsw"`` or ``"sklearn"``. ``"hnsw"`` uses the HNSW algorithm for approximate nearest neighbor search; ``"sklearn"`` uses the sklearn package for exact nearest neighbor search.

    exact_k: ``bool``, optional (default: ``False``)
        If True, use exactly the K passed to the function; otherwise K is determined as min(K, int(sqrt(data.shape[0]))).

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm[rep + "_knn_indices"]``: kNN index matrix. Row i is the index list of kNN of cell i (excluding itself), sorted from nearest to farthest.
        * ``data.obsm[rep + "_knn_distances"]``: kNN distance matrix. Row i is the distance list of kNN of cell i (excluding itself), sorted from smallest to largest.

    Update ``data.obsp``:
        * ``data.obsp["W_" + rep]``: kNN graph of the data in terms of affinity matrix.

    Examples
    --------
    >>> pg.neighbors(data)
    """
    # calculate kNN
    rep = update_rep(rep)
    indices, distances, K = get_neighbors(
        data,
        K=K,
        rep=rep,
        n_comps=n_comps,
        n_jobs=n_jobs,
        random_state=random_state,
        full_speed=full_speed,
        use_cache=use_cache,
        dist=dist,
        method=method,
        exact_k=exact_k,
    )

    # calculate affinity matrix
    W = calculate_affinity_matrix(indices[:, 0 : K - 1], distances[:, 0 : K - 1])
    data.obsp["W_" + rep] = W
    # pop out jump method values
    data.uns.pop(f"{rep}_jump_values", None)
    data.uns.pop(f"{rep}_optimal_k", None)
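

# Downstream usage sketch (editorial addition; assumes `data` already has an
# "X_pca" embedding, e.g. from pg.pca(data)). After neighbors(), the affinity
# matrix is available to graph-based steps:
#
#     pg.neighbors(data, rep="pca")
#     W = data.obsp["W_pca"]   # symmetric, density-normalized affinities
#     pg.diffmap(data)         # consumes the kNN/affinity results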


def calc_kBET_for_one_chunk(knn_indices, attr_values, ideal_dist, K):
    dof = ideal_dist.size - 1

    ns = knn_indices.shape[0]
    results = np.zeros((ns, 2))
    for i in range(ns):
        observed_counts = (
            pd.Series(attr_values[knn_indices[i, :]]).value_counts(sort=False).values
        )
        expected_counts = ideal_dist * K
        stat = np.sum(
            np.divide(
                np.square(np.subtract(observed_counts, expected_counts)),
                expected_counts,
            )
        )
        p_value = 1 - chi2.cdf(stat, dof)
        results[i, 0] = stat
        results[i, 1] = p_value

    return results
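

# A worked example (editorial addition; the data are assumed): with two
# batches at ideal proportions [0.5, 0.5] and K = 4 (query plus three
# neighbors), observing batch counts [3, 1] yields
#     stat = (3-2)^2/2 + (1-2)^2/2 = 1.0, dof = 1,
# hence p-value = 1 - chi2.cdf(1.0, 1) ~ 0.317.
def _demo_kbet_one_chunk():  # hypothetical, illustration only
    attr_values = pd.Categorical([0, 0, 0, 1, 1, 1], categories=[0, 1])
    knn_indices = np.array([[0, 1, 2, 3]])  # one cell: itself + 3 neighbors
    ideal_dist = np.array([0.5, 0.5])
    res = calc_kBET_for_one_chunk(knn_indices, attr_values, ideal_dist, K=4)
    assert np.isclose(res[0, 0], 1.0)                # chi-square statistic
    assert np.isclose(res[0, 1], 0.3173, atol=1e-4)  # p-value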


def calc_kBET(
    data: MultimodalData,
    attr: str,
    rep: str = "pca",
    K: int = 25,
    alpha: float = 0.05,
    n_jobs: int = -1,
    random_state: int = 0,
    temp_folder: str = None,
    use_cache: bool = True,
) -> Tuple[float, float, float]:
    """Calculate the kBET metric of the data regarding a specific sample attribute and embedding.

    The kBET metric is defined in [Büttner18]_, which measures if cells from different samples mix well in their local neighborhood.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    attr: ``str``
        The sample attribute to consider. Must exist in ``data.obs``.

    rep: ``str``, optional, default: ``"pca"``
        The embedding representation to be used. The key ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.

    K: ``int``, optional, default: ``25``
        Number of nearest neighbors, using L2 metric.

    alpha: ``float``, optional, default: ``0.05``
        Acceptance rate threshold. A cell is accepted if its kBET p-value is greater than or equal to ``alpha``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads used. If ``-1``, use all physical CPU cores.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    temp_folder: ``str``, optional, default: ``None``
        Temporary folder for joblib execution.

    use_cache: ``bool``, optional, default: ``True``
        If ``True``, use cached kNN results if found.

    Returns
    -------
    stat_mean: ``float``
        Mean kBET chi-square statistic over all cells.

    pvalue_mean: ``float``
        Mean kBET p-value over all cells.

    accept_rate: ``float``
        kBET acceptance rate of the sample.

    Examples
    --------
    >>> pg.calc_kBET(data, attr = 'Channel')

    >>> pg.calc_kBET(data, attr = 'Channel', rep = 'umap')
    """
    assert attr in data.obs
    if data.obs[attr].dtype.name != "category":
        data.obs[attr] = pd.Categorical(data.obs[attr])

    ideal_dist = (
        data.obs[attr].value_counts(normalize=True, sort=False).values
    )  # ideal no batch effect distribution
    nsample = data.shape[0]
    nbatch = ideal_dist.size

    attr_values = data.obs[attr].values.copy()
    attr_values.categories = range(nbatch)

    indices, distances, K = get_neighbors(
        data,
        K=K,
        rep=rep,
        n_jobs=n_jobs,
        random_state=random_state,
        use_cache=use_cache,
    )
    knn_indices = np.concatenate(
        (np.arange(nsample).reshape(-1, 1), indices[:, 0 : K - 1]), axis=1
    )  # add query as 1-nn

    # partition into chunks
    n_jobs = min(eff_n_jobs(n_jobs), nsample)
    starts = np.zeros(n_jobs + 1, dtype=int)
    quotient = nsample // n_jobs
    remainder = nsample % n_jobs
    for i in range(n_jobs):
        starts[i + 1] = starts[i] + quotient + (1 if i < remainder else 0)

    from joblib import Parallel, delayed, parallel_backend

    with parallel_backend("loky", inner_max_num_threads=1):
        kBET_arr = np.concatenate(
            Parallel(n_jobs=n_jobs, temp_folder=temp_folder)(
                delayed(calc_kBET_for_one_chunk)(
                    knn_indices[starts[i] : starts[i + 1], :], attr_values, ideal_dist, K
                )
                for i in range(n_jobs)
            )
        )

    res = kBET_arr.mean(axis=0)
    stat_mean = res[0]
    pvalue_mean = res[1]
    accept_rate = (kBET_arr[:, 1] >= alpha).sum() / nsample

    return (stat_mean, pvalue_mean, accept_rate)


def calc_kSIM(
    data: MultimodalData,
    attr: str,
    rep: str = "pca",
    K: int = 25,
    min_rate: float = 0.9,
    n_jobs: int = -1,
    random_state: int = 0,
    use_cache: bool = True,
) -> Tuple[float, float]:
    """Calculate the kSIM metric of the data regarding a specific sample attribute and embedding.

    The kSIM metric is defined in [Li20]_, which measures if a sample attribute is not diffused too much in each cell's local neighborhood.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    attr: ``str``
        The sample attribute to consider. Must exist in ``data.obs``.

    rep: ``str``, optional, default: ``"pca"``
        The embedding representation to consider. The key ``'X_' + rep`` must exist in ``data.obsm``.

    K: ``int``, optional, default: ``25``
        The number of nearest neighbors to be considered.

    min_rate: ``float``, optional, default: ``0.9``
        Acceptance rate threshold. A cell is accepted if its kSIM rate is larger than or equal to ``min_rate``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads used. If ``-1``, use all physical CPU cores.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    use_cache: ``bool``, optional, default: ``True``
        If ``True``, use cached kNN results if found.

    Returns
    -------
    kSIM_mean: ``float``
        Mean kSIM rate over all the cells.

    kSIM_accept_rate: ``float``
        kSIM acceptance rate of the sample.

    Examples
    --------
    >>> pg.calc_kSIM(data, attr = 'cell_type')

    >>> pg.calc_kSIM(data, attr = 'cell_type', rep = 'umap')
    """
    assert attr in data.obs
    nsample = data.shape[0]

    indices, distances, K = get_neighbors(
        data,
        K=K,
        rep=rep,
        n_jobs=n_jobs,
        random_state=random_state,
        use_cache=use_cache,
    )
    knn_indices = np.concatenate(
        (np.arange(nsample).reshape(-1, 1), indices[:, 0 : K - 1]), axis=1
    )  # add query as 1-nn

    labels = np.reshape(data.obs[attr].values[knn_indices.ravel()], (-1, K))
    same_labs = labels == labels[:, 0].reshape(-1, 1)
    correct_rates = same_labs.sum(axis=1) / K

    kSIM_mean = correct_rates.mean()
    kSIM_accept_rate = (correct_rates >= min_rate).sum() / nsample

    return (kSIM_mean, kSIM_accept_rate)
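

# A worked example (editorial addition; the labels are assumed): the kSIM
# rate of a cell is the fraction of its K-neighborhood (query included,
# listed first) sharing the query's label. This mirrors the same_labs
# computation above on a single row.
def _demo_ksim_rate():  # hypothetical, illustration only
    labels = np.array([["T", "T", "B", "T"]])  # query label first, K = 4
    same_labs = labels == labels[:, 0].reshape(-1, 1)
    correct_rates = same_labs.sum(axis=1) / labels.shape[1]
    assert np.isclose(correct_rates[0], 0.75)  # below min_rate = 0.9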