Source code for pegasus.tools.nearest_neighbors

import time
import numpy as np
import pandas as pd

from scipy.sparse import issparse, csr_matrix
from scipy.stats import chi2
from sklearn.neighbors import NearestNeighbors
from pegasusio import MultimodalData
from joblib import effective_n_jobs
from typing import List, Tuple

from pegasus.tools import update_rep, X_from_rep, knn_is_cached

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer



def calculate_nearest_neighbors(
    X: np.array,
    K: int = 100,
    n_jobs: int = -1,
    method: str = "hnsw",
    M: int = 20,
    efC: int = 200,
    efS: int = 200,
    random_state: int = 0,
    full_speed: int = False,
):
    """Calculate nearest neighbors
    X is the sample by feature matrix
    Return K -1 neighbors, the first one is the point itself and thus omitted.
    TODO: Documentation
    """

    nsample = X.shape[0]

    if nsample <= 1000:
        method = "sklearn"

    if nsample < K:
        logger.warning(
            "Warning: in calculate_nearest_neighbors, number of samples = {} < K = {}!\n Set K to {}.".format(
                nsample, K, nsample
            )
        )
        K = nsample

    n_jobs = effective_n_jobs(n_jobs)

    if method == "hnsw":
        import hnswlib

        assert not issparse(X)
        # Build hnsw index
        knn_index = hnswlib.Index(space="l2", dim=X.shape[1])
        knn_index.init_index(
            max_elements=nsample, ef_construction=efC, M=M, random_seed=random_state
        )
        knn_index.set_num_threads(n_jobs if full_speed else 1)
        knn_index.add_items(X)

        # KNN query
        knn_index.set_ef(efS)
        knn_index.set_num_threads(n_jobs)
        indices, distances = knn_index.knn_query(X, k=K)
        # eliminate the first neighbor, which is the node itself
        for i in range(nsample):
            if indices[i, 0] != i:
                indices[i, 1:] = indices[i, 0:-1]
                distances[i, 1:] = distances[i, 0:-1]
        indices = indices[:, 1:].astype(int)
        distances = np.sqrt(distances[:, 1:])
    else:
        assert method == "sklearn"
        knn = NearestNeighbors(
            n_neighbors=K - 1, n_jobs=n_jobs
        )  # eliminate the first neighbor, which is the node itself
        knn.fit(X)
        distances, indices = knn.kneighbors()

    return indices, distances


@timer(logger=logger)
def get_neighbors(
    data: MultimodalData,
    K: int = 100,
    rep: str = "pca",
    n_jobs: int = -1,
    random_state: int = 0,
    full_speed: bool = False,
) -> Tuple[List[int], List[float]]:
    """Find K nearest neighbors for each data point and return the indices and distances arrays.

    Parameters
    ----------

    data : `pegasusio.MultimodalData`
        An AnnData object.
    K : `int`, optional (default: 100)
        Number of neighbors, including the data point itself.
    rep : `str`, optional (default: 'pca')
        Representation used to calculate kNN. If `None` use data.X
    n_jobs : `int`, optional (default: -1)
        Number of threads to use. -1 refers to all available threads
    random_state: `int`, optional (default: 0)
        Random seed for random number generator.
    full_speed: `bool`, optional (default: False)
        If full_speed, use multiple threads in constructing hnsw index. However, the kNN results are not reproducible. If not full_speed, use only one thread to make sure results are reproducible.

    Returns
    -------

    kNN indices and distances arrays.

    Examples
    --------
    >>> indices, distances = tools.get_neighbors(data)
    """

    rep = update_rep(rep)
    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    if knn_is_cached(data, indices_key, distances_key, K):
        indices = data.uns[indices_key]
        distances = data.uns[distances_key]
        logger.info("Found cached kNN results, no calculation is required.")
    else:
        indices, distances = calculate_nearest_neighbors(
            X_from_rep(data, rep),
            K=K,
            n_jobs=effective_n_jobs(n_jobs),
            random_state=random_state,
            full_speed=full_speed,
        )
        data.uns[indices_key] = indices
        data.uns[distances_key] = distances

    return indices, distances


def get_symmetric_matrix(csr_mat: "csr_matrix") -> "csr_matrix":
    tp_mat = csr_mat.transpose().tocsr()
    sym_mat = csr_mat + tp_mat
    sym_mat.sort_indices()

    idx_mat = (csr_mat != 0).astype(int) + (tp_mat != 0).astype(int)
    idx_mat.sort_indices()
    idx = idx_mat.data == 2

    sym_mat.data[idx] /= 2.0
    return sym_mat


# We should not modify distances array!
@timer(logger=logger)
def calculate_affinity_matrix(
    indices: List[int], distances: List[float]
) -> "csr_matrix":

    nsample = indices.shape[0]
    K = indices.shape[1]
    # calculate sigma, important to use median here!
    sigmas = np.median(distances, axis=1)
    sigmas_sq = np.square(sigmas)

    # calculate local-scaled kernel
    normed_dist = np.zeros((nsample, K), dtype=float)
    for i in range(nsample):
        numers = 2.0 * sigmas[i] * sigmas[indices[i, :]]
        denoms = sigmas_sq[i] + sigmas_sq[indices[i, :]]
        normed_dist[i, :] = np.sqrt(numers / denoms) * np.exp(
            -np.square(distances[i, :]) / denoms
        )

    W = csr_matrix(
        (normed_dist.ravel(), (np.repeat(range(nsample), K), indices.ravel())),
        shape=(nsample, nsample),
    )
    W = get_symmetric_matrix(W)

    # density normalization
    z = W.sum(axis=1).A1
    W = W.tocoo()
    W.data /= z[W.row]
    W.data /= z[W.col]
    W = W.tocsr()
    W.eliminate_zeros()

    return W


[docs]def neighbors( data: MultimodalData, K: int = 100, rep: "str" = "pca", n_jobs: int = -1, random_state: int = 0, full_speed: bool = False, ) -> None: """Compute k nearest neighbors and affinity matrix, which will be used for diffmap and graph-based community detection algorithms. The kNN calculation uses `hnswlib <https://github.com/nmslib/hnswlib>`_ introduced by [Malkov16]_. Parameters ---------- data: ``pegasusio.MultimodalData`` Annotated data matrix with rows for cells and columns for genes. K: ``int``, optional, default: ``100`` Number of neighbors, including the data point itself. rep: ``str``, optional, default: ``"pca"`` Embedding representation used to calculate kNN. If ``None``, use ``data.X``; otherwise, keyword ``'X_' + rep`` must exist in ``data.obsm``. n_jobs: ``int``, optional, default: ``-1`` Number of threads to use. If ``-1``, use all available threads. random_state: ``int``, optional, default: ``0`` Random seed set for reproducing results. full_speed: ``bool``, optional, default: ``False`` * If ``True``, use multiple threads in constructing ``hnsw`` index. However, the kNN results are not reproducible. * Otherwise, use only one thread to make sure results are reproducible. Returns ------- ``None`` Update ``data.uns``: * ``data.uns[rep + "_knn_indices"]``: kNN index matrix. Row i is the index list of kNN of cell i (excluding itself), sorted from nearest to farthest. * ``data.uns[rep + "_knn_distances"]``: kNN distance matrix. Row i is the distance list of kNN of cell i (excluding itselt), sorted from smallest to largest. * ``data.uns["W_" + rep]``: kNN graph of the data in terms of affinity matrix. Examples -------- >>> pg.neighbors(data) """ # calculate kNN rep = update_rep(rep) indices, distances = get_neighbors( data, K=K, rep=rep, n_jobs=n_jobs, random_state=random_state, full_speed=full_speed, ) # calculate affinity matrix W = calculate_affinity_matrix(indices[:, 0 : K - 1], distances[:, 0 : K - 1]) data.uns["W_" + rep] = W
def calc_kBET_for_one_chunk(knn_indices, attr_values, ideal_dist, K): dof = ideal_dist.size - 1 ns = knn_indices.shape[0] results = np.zeros((ns, 2)) for i in range(ns): observed_counts = ( pd.Series(attr_values[knn_indices[i, :]]).value_counts(sort=False).values ) expected_counts = ideal_dist * K stat = np.sum( np.divide( np.square(np.subtract(observed_counts, expected_counts)), expected_counts, ) ) p_value = 1 - chi2.cdf(stat, dof) results[i, 0] = stat results[i, 1] = p_value return results
[docs]def calc_kBET( data: MultimodalData, attr: str, rep: str = "pca", K: int = 25, alpha: float = 0.05, n_jobs: int = -1, random_state: int = 0, temp_folder: str = None, ) -> Tuple[float, float, float]: """Calculate the kBET metric of the data regarding a specific sample attribute and embedding. The kBET metric is defined in [Büttner18]_, which measures if cells from different samples mix well in their local neighborhood. Parameters ---------- data: ``pegasusio.MultimodalData`` Annotated data matrix with rows for cells and columns for genes. attr: ``str`` The sample attribute to consider. Must exist in ``data.obs``. rep: ``str``, optional, default: ``"pca"`` The embedding representation to be used. The key ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates. K: ``int``, optional, default: ``25`` Number of nearest neighbors, using L2 metric. alpha: ``float``, optional, default: ``0.05`` Acceptance rate threshold. A cell is accepted if its kBET p-value is greater than or equal to ``alpha``. n_jobs: ``int``, optional, default: ``-1`` Number of threads used. If ``-1``, use all available threads. random_state: ``int``, optional, default: ``0`` Random seed set for reproducing results. temp_folder: ``str``, optional, default: ``None`` Temporary folder for joblib execution. Returns ------- stat_mean: ``float`` Mean kBET chi-square statistic over all cells. pvalue_mean: ``float`` Mean kBET p-value over all cells. accept_rate: ``float`` kBET Acceptance rate of the sample. Examples -------- >>> pg.calc_kBET(data, attr = 'Channel') >>> pg.calc_kBET(data, attr = 'Channel', rep = 'umap') """ assert attr in data.obs if data.obs[attr].dtype.name != "category": data.obs[attr] = pd.Categorical(data.obs[attr]) from joblib import Parallel, delayed ideal_dist = ( data.obs[attr].value_counts(normalize=True, sort=False).values ) # ideal no batch effect distribution nsample = data.shape[0] nbatch = ideal_dist.size attr_values = data.obs[attr].values.copy() attr_values.categories = range(nbatch) indices, distances = get_neighbors( data, K=K, rep=rep, n_jobs=n_jobs, random_state=random_state ) knn_indices = np.concatenate( (np.arange(nsample).reshape(-1, 1), indices[:, 0 : K - 1]), axis=1 ) # add query as 1-nn # partition into chunks n_jobs = min(effective_n_jobs(n_jobs), nsample) starts = np.zeros(n_jobs + 1, dtype=int) quotient = nsample // n_jobs remainder = nsample % n_jobs for i in range(n_jobs): starts[i + 1] = starts[i] + quotient + (1 if i < remainder else 0) kBET_arr = np.concatenate( Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)( delayed(calc_kBET_for_one_chunk)( knn_indices[starts[i] : starts[i + 1], :], attr_values, ideal_dist, K ) for i in range(n_jobs) ) ) res = kBET_arr.mean(axis=0) stat_mean = res[0] pvalue_mean = res[1] accept_rate = (kBET_arr[:, 1] >= alpha).sum() / nsample return (stat_mean, pvalue_mean, accept_rate)
[docs]def calc_kSIM( data: MultimodalData, attr: str, rep: str = "pca", K: int = 25, min_rate: float = 0.9, n_jobs: int = -1, random_state: int = 0, ) -> Tuple[float, float]: """Calculate the kSIM metric of the data regarding a specific sample attribute and embedding. The kSIM metric is defined in [Li20]_, which measures if a sample attribute is not diffused too much in each cell's local neighborhood. Parameters ---------- data: ``pegasusio.MultimodalData`` Annotated data matrix with rows for cells and columns for genes. attr: ``str`` The sample attribute to consider. Must exist in ``data.obs``. rep: ``str``, optional, default: ``"pca"`` The embedding representation to consider. The key ``'X_' + rep`` must exist in ``data.obsm``. K: ``int``, optional, default: ``25`` The number of nearest neighbors to be considered. min_rate: ``float``, optional, default: ``0.9`` Acceptance rate threshold. A cell is accepted if its kSIM rate is larger than or equal to ``min_rate``. n_jobs: ``int``, optional, default: ``-1`` Number of threads used. If ``-1``, use all available threads. random_state: ``int``, optional, default: ``0`` Random seed set for reproducing results. Returns ------- kSIM_mean: ``float`` Mean kSIM rate over all the cells. kSIM_accept_rate: ``float`` kSIM Acceptance rate of the sample. Examples -------- >>> pg.calc_kSIM(data, attr = 'cell_type') >>> pg.calc_kSIM(data, attr = 'cell_type', rep = 'umap') """ assert attr in data.obs nsample = data.shape[0] indices, distances = get_neighbors( data, K=K, rep=rep, n_jobs=n_jobs, random_state=random_state ) knn_indices = np.concatenate( (np.arange(nsample).reshape(-1, 1), indices[:, 0 : K - 1]), axis=1 ) # add query as 1-nn labels = np.reshape(data.obs[attr].values[knn_indices.ravel()], (-1, K)) same_labs = labels == labels[:, 0].reshape(-1, 1) correct_rates = same_labs.sum(axis=1) / K kSIM_mean = correct_rates.mean() kSIM_accept_rate = (correct_rates >= min_rate).sum() / nsample return (kSIM_mean, kSIM_accept_rate)
# def calc_JSD(P, Q): # M = (P + Q) / 2 # return (entropy(P, M, base = 2) + entropy(Q, M, base = 2)) / 2.0 # def calc_kBJSD_for_one_datapoint(pos, attr_values, knn_indices, ideal_dist): # idx = np.append(knn_indices[pos], [pos]) # empirical_dist = pd.Series(attr_values[idx]).value_counts(normalize = True, sort = False).values # return calc_JSD(ideal_dist, empirical_dist) # def calc_kBJSD(data, attr, rep = 'pca', K = 25, n_jobs = 1, random_state = 0, temp_folder = None): # assert attr in data.obs # if data.obs[attr].dtype.name != 'category': # data.obs[attr] = pd.Categorical(data.obs[attr]) # from joblib import Parallel, delayed # ideal_dist = data.obs[attr].value_counts(normalize = True, sort = False).values # ideal no batch effect distribution # nsample = data.shape[0] # nbatch = ideal_dist.size # attr_values = data.obs[attr].values.copy() # attr_values.categories = range(nbatch) # indices, distances = get_neighbors(data, K = K, rep = rep, n_jobs = n_jobs, random_state = random_state) # knn_indices = indices[:, 0 : K - 1] # kBJSD_arr = np.array(Parallel(n_jobs = 1, max_nbytes = 1e7, temp_folder = temp_folder)(delayed(calc_kBJSD_for_one_datapoint)(i, attr_values, knn_indices, ideal_dist) for i in range(nsample))) # return kBJSD_arr.mean()