# Source code for pegasus.tools.visualization

import time
import numpy as np
import scipy
import umap as umap_module
import forceatlas2 as fa2
import uuid

from pegasusio import MultimodalData
from joblib import effective_n_jobs
try:
    from MulticoreTSNE import MulticoreTSNE as TSNE
except ImportError:
    print("Need Multicore-TSNE!")

from pegasus.tools import (
    update_rep,
    X_from_rep,
    W_from_rep,
    knn_is_cached,
    neighbors,
    net_train_and_predict,
    calculate_nearest_neighbors,
    calculate_affinity_matrix,
    construct_graph,
)

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer



def calc_tsne(
    X,
    n_jobs,
    n_components,
    perplexity,
    early_exaggeration,
    learning_rate,
    random_state,
    init="random",
    n_iter=1000,
    n_iter_early_exag=250,
):
    """
    TODO: Typing
    """
    tsne = TSNE(
        n_jobs=n_jobs,
        n_components=n_components,
        perplexity=perplexity,
        early_exaggeration=early_exaggeration,
        learning_rate=learning_rate,
        random_state=random_state,
        verbose=1,
        init=init,
        n_iter=n_iter,
        n_iter_early_exag=n_iter_early_exag,
    )
    X_tsne = tsne.fit_transform(X)
    logger.info("Final error = {}".format(tsne.kl_divergence_))
    return X_tsne
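
# Example (hypothetical data; assumes Multicore-TSNE is installed): embed a
# small random matrix in 2D. A minimal sketch, not part of the pegasus API.
#
#   X = np.random.RandomState(0).rand(500, 50).astype(np.float64)
#   X_tsne = calc_tsne(
#       X, n_jobs=4, n_components=2, perplexity=30,
#       early_exaggeration=12, learning_rate=1000, random_state=0,
#   )
#   assert X_tsne.shape == (500, 2)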


def calc_fitsne(
    X,
    nthreads,
    no_dims,
    perplexity,
    early_exag_coeff,
    learning_rate,
    rand_seed,
    initialization=None,
    max_iter=1000,
    stop_early_exag_iter=250,
    mom_switch_iter=250,
):
    """
    TODO: Typing
    """
    # FItSNE will change X content

    # Check if fftw3 is installed.
    import ctypes.util

    fftw3_loc = ctypes.util.find_library("fftw3")
    if fftw3_loc is None:
        raise Exception("Please install 'fftw3' first to use the FIt-SNE feature!")

    from fitsne import FItSNE

    return FItSNE(
        X.astype("float64"),
        nthreads=nthreads,
        no_dims=no_dims,
        perplexity=perplexity,
        early_exag_coeff=early_exag_coeff,
        learning_rate=learning_rate,
        rand_seed=rand_seed,
        initialization=initialization,
        max_iter=max_iter,
        stop_early_exag_iter=stop_early_exag_iter,
        mom_switch_iter=mom_switch_iter,
    )
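
# Example (hypothetical data; assumes the 'fitsne' package and the fftw3
# library are installed). A minimal sketch mirroring the wrapper above:
#
#   X = np.random.RandomState(0).rand(500, 50)
#   X_fitsne = calc_fitsne(
#       X, nthreads=4, no_dims=2, perplexity=30,
#       early_exag_coeff=12, learning_rate=1000, rand_seed=0,
#   )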


# Running umap using our own kNN indices
def calc_umap(
    X,
    n_components,
    n_neighbors,
    min_dist,
    spread,
    random_state,
    init="spectral",
    n_epochs=None,
    learning_rate=1.0,
    knn_indices=None,
    knn_dists=None,
):
    """
    TODO: Typing
    """
    umap_obj = umap_module.UMAP(
        n_components=n_components,
        n_neighbors=n_neighbors,
        min_dist=min_dist,
        spread=spread,
        random_state=random_state,
        init=init,
        n_epochs=n_epochs,
        learning_rate=learning_rate,
        verbose=True,
    )

    embedding = None
    if X.shape[0] < 4096 or knn_indices is None:
        embedding = umap_obj.fit_transform(X)
        logger.info("using umap kNN graph {}".format(X.shape[0]))
    else:
        assert knn_dists is not None
        # preprocessing code adapted from the fit function in UMAP's umap_.py, so that our own kNN graphs can be used
        from sklearn.utils import check_random_state, check_array

        X = check_array(X, dtype=np.float32, accept_sparse="csr")
        umap_obj._raw_data = X
        if umap_obj.a is None or umap_obj.b is None:
            umap_obj._a, umap_obj._b = umap_module.umap_.find_ab_params(
                umap_obj.spread, umap_obj.min_dist
            )
        else:
            umap_obj._a = umap_obj.a
            umap_obj._b = umap_obj.b
        umap_obj._metric_kwds = (
            umap_obj.metric_kwds if umap_obj.metric_kwds is not None else {}
        )
        umap_obj._target_metric_kwds = {}
        _init = (
            check_array(umap_obj.init, dtype=np.float32, accept_sparse=False)
            if isinstance(umap_obj.init, np.ndarray)
            else umap_obj.init
        )
        umap_obj._initial_alpha = umap_obj.learning_rate
        umap_obj._validate_parameters()

        if umap_obj.verbose:
            logger.info(str(umap_obj))

        if scipy.sparse.isspmatrix_csr(X):
            if not X.has_sorted_indices:
                X.sort_indices()
            umap_obj._sparse_data = True
        else:
            umap_obj._sparse_data = False

        _random_state = check_random_state(umap_obj.random_state)

        if umap_obj.verbose:
            logger.info("Construct fuzzy simplicial set")

        umap_obj._small_data = False
        umap_obj.graph_, umap_obj._sigmas, umap_obj._rhos = umap_module.umap_.fuzzy_simplicial_set(
            X=X,
            n_neighbors=umap_obj.n_neighbors,
            random_state=_random_state,
            metric=umap_obj.metric,
            metric_kwds=umap_obj._metric_kwds,
            knn_indices=knn_indices,
            knn_dists=knn_dists,
            angular=umap_obj.angular_rp_forest,
            set_op_mix_ratio=umap_obj.set_op_mix_ratio,
            local_connectivity=umap_obj.local_connectivity,
            verbose=umap_obj.verbose,
        )

        _n_epochs = umap_obj.n_epochs if umap_obj.n_epochs is not None else 0
        if umap_obj.verbose:
            logger.info("Construct embedding")
        embedding = umap_module.umap_.simplicial_set_embedding(
            data=X,
            graph=umap_obj.graph_,
            n_components=umap_obj.n_components,
            initial_alpha=umap_obj._initial_alpha,
            a=umap_obj._a,
            b=umap_obj._b,
            gamma=umap_obj.repulsion_strength,
            negative_sample_rate=umap_obj.negative_sample_rate,
            n_epochs=_n_epochs,
            init=_init,
            random_state=_random_state,
            metric=umap_obj.metric,
            metric_kwds=umap_obj._metric_kwds,
            verbose=umap_obj.verbose,
        )

    return embedding
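
# Example (hypothetical data): for >= 4096 cells, precomputed kNN results can
# be passed in. Note the convention used by the callers below: each row's own
# index is prepended as its nearest neighbor with distance 0. A sketch only,
# where `indices`/`distances` are (n_cells, n_neighbors - 1) arrays from a
# kNN search such as calculate_nearest_neighbors:
#
#   n_neighbors = 15
#   knn_indices = np.insert(indices[:, : n_neighbors - 1], 0, range(X.shape[0]), axis=1)
#   knn_dists = np.insert(distances[:, : n_neighbors - 1], 0, 0.0, axis=1)
#   embedding = calc_umap(X, 2, n_neighbors, 0.5, 1.0, 0,
#                         knn_indices=knn_indices, knn_dists=knn_dists)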


def calc_force_directed_layout(
    W,
    file_name,
    n_jobs,
    target_change_per_node,
    target_steps,
    is3d,
    memory,
    random_state,
    init=None,
):
    """
    TODO: Typing
    """
    G = construct_graph(W)
    return fa2.forceatlas2(
        file_name,
        graph=G,
        n_jobs=n_jobs,
        target_change_per_node=target_change_per_node,
        target_steps=target_steps,
        is3d=is3d,
        memory=memory,
        random_state=random_state,
        init=init,
    )
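
# Example (assumes a symmetric affinity matrix W in CSR format and a writable
# temporary path; parameter values mirror the fle() defaults below). A sketch
# only:
#
#   import tempfile
#   _, tmp_name = tempfile.mkstemp()
#   coords = calc_force_directed_layout(
#       W, tmp_name, n_jobs=4, target_change_per_node=2.0,
#       target_steps=5000, is3d=False, memory=8, random_state=0,
#   )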


@timer(logger=logger)
def tsne(
    data: MultimodalData,
    rep: str = "pca",
    n_jobs: int = -1,
    n_components: int = 2,
    perplexity: float = 30,
    early_exaggeration: int = 12,
    learning_rate: float = 1000,
    random_state: int = 0,
    out_basis: str = "tsne",
) -> None:
    """Calculate tSNE embedding of cells.

    This function uses MulticoreTSNE_ package. See [Maaten08]_ for details on tSNE.

    .. _MulticoreTSNE: https://github.com/DmitryUlyanov/Multicore-TSNE

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    rep: ``str``, optional, default: ``"pca"``
        Representation of data used for the calculation. By default, use PCA coordinates. If ``None``, use the count matrix ``data.X``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    n_components: ``int``, optional, default: ``2``
        Dimension of calculated tSNE coordinates. By default, generate 2-dimensional data for 2D visualization.

    perplexity: ``float``, optional, default: ``30``
        The perplexity is related to the number of nearest neighbors used in other manifold learning algorithms. Larger datasets usually require a larger perplexity.

    early_exaggeration: ``int``, optional, default: ``12``
        Controls how tight natural clusters in the original space are in the embedded space, and how much space will be between them.

    learning_rate: ``float``, optional, default: ``1000``
        The learning rate can be a critical parameter, which should be between 100 and 1000.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    out_basis: ``str``, optional, default: ``"tsne"``
        Key name for calculated tSNE coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: tSNE coordinates of the data.

    Examples
    --------
    >>> pg.tsne(data)
    """
    rep = update_rep(rep)
    n_jobs = effective_n_jobs(n_jobs)

    data.obsm["X_" + out_basis] = calc_tsne(
        X_from_rep(data, rep),
        n_jobs,
        n_components,
        perplexity,
        early_exaggeration,
        learning_rate,
        random_state,
    )


@timer(logger=logger)
def fitsne(
    data: MultimodalData,
    rep: str = "pca",
    n_jobs: int = -1,
    n_components: int = 2,
    perplexity: float = 30,
    early_exaggeration: int = 12,
    learning_rate: float = 1000,
    random_state: int = 0,
    out_basis: str = "fitsne",
) -> None:
    """Calculate FIt-SNE embedding of cells.

    This function uses fitsne_ package. See [Linderman19]_ for details on FIt-SNE.

    .. _fitsne: https://github.com/KlugerLab/FIt-SNE

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    rep: ``str``, optional, default: ``"pca"``
        Representation of data used for the calculation. By default, use PCA coordinates. If ``None``, use the count matrix ``data.X``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    n_components: ``int``, optional, default: ``2``
        Dimension of calculated FIt-SNE coordinates. By default, generate 2-dimensional data for 2D visualization.

    perplexity: ``float``, optional, default: ``30``
        The perplexity is related to the number of nearest neighbors used in other manifold learning algorithms. Larger datasets usually require a larger perplexity.

    early_exaggeration: ``int``, optional, default: ``12``
        Controls how tight natural clusters in the original space are in the embedded space, and how much space will be between them.

    learning_rate: ``float``, optional, default: ``1000``
        The learning rate can be a critical parameter, which should be between 100 and 1000.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    out_basis: ``str``, optional, default: ``"fitsne"``
        Key name for calculated FIt-SNE coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: FIt-SNE coordinates of the data.

    Examples
    --------
    >>> pg.fitsne(data)
    """
    rep = update_rep(rep)
    n_jobs = effective_n_jobs(n_jobs)

    data.obsm["X_" + out_basis] = calc_fitsne(
        X_from_rep(data, rep),
        n_jobs,
        n_components,
        perplexity,
        early_exaggeration,
        learning_rate,
        random_state,
    )


@timer(logger=logger)
def umap(
    data: MultimodalData,
    rep: str = "pca",
    n_components: int = 2,
    n_neighbors: int = 15,
    min_dist: float = 0.5,
    spread: float = 1.0,
    random_state: int = 0,
    out_basis: str = "umap",
) -> None:
    """Calculate UMAP embedding of cells.

    This function uses umap-learn_ package. See [McInnes18]_ for details on UMAP.

    .. _umap-learn: https://github.com/lmcinnes/umap

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    rep: ``str``, optional, default: ``"pca"``
        Representation of data used for the calculation. By default, use PCA coordinates. If ``None``, use the count matrix ``data.X``.

    n_components: ``int``, optional, default: ``2``
        Dimension of calculated UMAP coordinates. By default, generate 2-dimensional data for 2D visualization.

    n_neighbors: ``int``, optional, default: ``15``
        Number of nearest neighbors considered during the computation.

    min_dist: ``float``, optional, default: ``0.5``
        The effective minimum distance between embedded data points.

    spread: ``float``, optional, default: ``1.0``
        The effective scale of embedded data points.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    out_basis: ``str``, optional, default: ``"umap"``
        Key name for calculated UMAP coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: UMAP coordinates of the data.

    Examples
    --------
    >>> pg.umap(data)
    """
    start = time.time()
    rep = update_rep(rep)
    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    X = X_from_rep(data, rep)
    if not knn_is_cached(data, indices_key, distances_key, n_neighbors):
        raise ValueError("Please run neighbors first!")

    knn_indices = np.insert(
        data.uns[indices_key][:, 0 : n_neighbors - 1], 0, range(data.shape[0]), axis=1
    )
    knn_dists = np.insert(
        data.uns[distances_key][:, 0 : n_neighbors - 1], 0, 0.0, axis=1
    )
    data.obsm["X_" + out_basis] = calc_umap(
        X,
        n_components,
        n_neighbors,
        min_dist,
        spread,
        random_state,
        knn_indices=knn_indices,
        knn_dists=knn_dists,
    )
    end = time.time()
    logger.info("UMAP is calculated. Time spent = {:.2f}s.".format(end - start))


@timer(logger=logger)
def fle(
    data: MultimodalData,
    file_name: str = None,
    n_jobs: int = -1,
    rep: str = "diffmap",
    K: int = 50,
    full_speed: bool = False,
    target_change_per_node: float = 2.0,
    target_steps: int = 5000,
    is3d: bool = False,
    memory: int = 8,
    random_state: int = 0,
    out_basis: str = "fle",
) -> None:
    """Construct the Force-directed (FLE) graph.

    This implementation uses forceatlas2-python_ package, which is a Python wrapper of ForceAtlas2_.

    See [Jacomy14]_ for details on FLE.

    .. _forceatlas2-python: https://github.com/klarman-cell-observatory/forceatlas2-python
    .. _ForceAtlas2: https://github.com/klarman-cell-observatory/forceatlas2

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    file_name: ``str``, optional, default: ``None``
        Temporary file to store the coordinates as the input to forceatlas2. If ``None``, use ``tempfile.mkstemp`` to generate file name.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    rep: ``str``, optional, default: ``"diffmap"``
        Representation of data used for the calculation. By default, use Diffusion Map coordinates. If ``None``, use the count matrix ``data.X``.

    K: ``int``, optional, default: ``50``
        Number of nearest neighbors to be considered during the computation.

    full_speed: ``bool``, optional, default: ``False``
        * If ``True``, use multiple threads in constructing ``hnsw`` index. However, the kNN results are not reproducible.
        * Otherwise, use only one thread to make sure results are reproducible.

    target_change_per_node: ``float``, optional, default: ``2.0``
        Target change per node to stop ForceAtlas2.

    target_steps: ``int``, optional, default: ``5000``
        Maximum number of iterations before stopping the ForceAtlas2 algorithm.

    is3d: ``bool``, optional, default: ``False``
        If ``True``, calculate 3D force-directed layout.

    memory: ``int``, optional, default: ``8``
        Memory size in GB for the Java FA2 component. By default, use 8GB memory.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    out_basis: ``str``, optional, default: ``"fle"``
        Key name for calculated FLE coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: FLE coordinates of the data.

    Examples
    --------
    >>> pg.fle(data)
    """
    if file_name is None:
        import tempfile

        _, file_name = tempfile.mkstemp()

    n_jobs = effective_n_jobs(n_jobs)
    rep = update_rep(rep)

    if ("W_" + rep) not in data.uns:
        neighbors(
            data,
            K=K,
            rep=rep,
            n_jobs=n_jobs,
            random_state=random_state,
            full_speed=full_speed,
        )

    data.obsm["X_" + out_basis] = calc_force_directed_layout(
        W_from_rep(data, rep),
        file_name,
        n_jobs,
        target_change_per_node,
        target_steps,
        is3d,
        memory,
        random_state,
    )


@timer(logger=logger)
def select_cells(distances, frac, K=25, alpha=1.0, random_state=0):
    """
    TODO: documentation (not user API)
    """
    nsample = distances.shape[0]

    if K > distances.shape[1]:
        logger.info(
            "Warning: in select_cells, K = {} > the number of calculated nearest neighbors!\nSet K to {}".format(
                K, distances.shape[1]
            )
        )
        K = distances.shape[1]

    probs = np.zeros(nsample)
    if alpha == 0.0:
        probs[:] = 1.0  # uniform
    elif alpha == 1.0:
        probs[:] = distances[:, K - 1]
    else:
        probs[:] = distances[:, K - 1] ** alpha
    probs /= probs.sum()

    np.random.seed(random_state)
    selected = np.zeros(nsample, dtype=bool)
    selected[
        np.random.choice(nsample, size=int(nsample * frac), replace=False, p=probs)
    ] = True

    return selected


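# Example (hypothetical distances array): sample 10% of cells, weighting each
# cell by the distance to its 25th nearest neighbor (alpha = 1.0), so that
# cells in sparse regions are kept preferentially. A sketch only:
#
#   distances = np.random.RandomState(0).rand(1000, 50)  # mock kNN distances
#   selected = select_cells(distances, frac=0.1, K=25, alpha=1.0, random_state=0)
#   assert selected.sum() == 100

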
@timer(logger=logger)
def net_tsne(
    data: MultimodalData,
    rep: str = "pca",
    n_jobs: int = -1,
    n_components: int = 2,
    perplexity: float = 30,
    early_exaggeration: int = 12,
    learning_rate: float = 1000,
    random_state: int = 0,
    select_frac: float = 0.1,
    select_K: int = 25,
    select_alpha: float = 1.0,
    net_alpha: float = 0.1,
    polish_learning_frac: float = 0.33,
    polish_n_iter: int = 150,
    out_basis: str = "net_tsne",
) -> None:
    """Calculate Net-tSNE embedding of cells.

    Net-tSNE is an approximated tSNE embedding that uses a deep learning model to speed up the calculation. Specifically, the deep model used is MLPRegressor_, the *scikit-learn* implementation of the Multi-layer Perceptron regressor.

    See [Li20]_ for details.

    .. _MLPRegressor: https://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPRegressor.html

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells (``n_obs``) and columns for genes (``n_feature``).

    rep: ``str``, optional, default: ``"pca"``
        Representation of data used for the calculation. By default, use PCA coordinates. If ``None``, use the count matrix ``data.X``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    n_components: ``int``, optional, default: ``2``
        Dimension of calculated tSNE coordinates. By default, generate 2-dimensional data for 2D visualization.

    perplexity: ``float``, optional, default: ``30``
        The perplexity is related to the number of nearest neighbors used in other manifold learning algorithms. Larger datasets usually require a larger perplexity.

    early_exaggeration: ``int``, optional, default: ``12``
        Controls how tight natural clusters in the original space are in the embedded space, and how much space will be between them.

    learning_rate: ``float``, optional, default: ``1000``
        The learning rate can be a critical parameter, which should be between 100 and 1000.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    select_frac: ``float``, optional, default: ``0.1``
        Fraction of cells to down-sample.

    select_K: ``int``, optional, default: ``25``
        Number of neighbors to be used to estimate local density for each data point for down sampling.

    select_alpha: ``float``, optional, default: ``1.0``
        Weight the down sample to be proportional to ``radius ** select_alpha``.

    net_alpha: ``float``, optional, default: ``0.1``
        L2 penalty (regularization term) parameter of the deep regressor.

    polish_learning_frac: ``float``, optional, default: ``0.33``
        After running the deep regressor to predict new coordinates, use ``polish_learning_frac`` * ``n_obs`` as the learning rate to polish the coordinates.

    polish_n_iter: ``int``, optional, default: ``150``
        Number of iterations for polishing tSNE run.

    out_basis: ``str``, optional, default: ``"net_tsne"``
        Key name for the approximated tSNE coordinates calculated.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: Net tSNE coordinates of the data.

    Update ``data.obs``:
        * ``data.obs['ds_selected']``: Boolean array to indicate which cells are selected during the down sampling phase.

    Examples
    --------
    >>> pg.net_tsne(data)
    """
    rep = update_rep(rep)
    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    if not knn_is_cached(data, indices_key, distances_key, select_K):
        raise ValueError("Please run neighbors first!")

    n_jobs = effective_n_jobs(n_jobs)

    selected = select_cells(
        data.uns[distances_key],
        select_frac,
        K=select_K,
        alpha=select_alpha,
        random_state=random_state,
    )

    X_full = X_from_rep(data, rep)
    X = X_full[selected, :]
    X_tsne = calc_tsne(
        X,
        n_jobs,
        n_components,
        perplexity,
        early_exaggeration,
        learning_rate,
        random_state,
    )

    data.uns["X_" + out_basis + "_small"] = X_tsne
    data.obs["ds_selected"] = selected

    Y_init = np.zeros((data.shape[0], n_components), dtype=np.float64)
    Y_init[selected, :] = X_tsne
    Y_init[~selected, :] = net_train_and_predict(
        X, X_tsne, X_full[~selected, :], net_alpha, random_state, verbose=True
    )

    data.obsm["X_" + out_basis + "_pred"] = Y_init

    polish_learning_rate = polish_learning_frac * data.shape[0]
    data.obsm["X_" + out_basis] = calc_tsne(
        X_full,
        n_jobs,
        n_components,
        perplexity,
        early_exaggeration,
        polish_learning_rate,
        random_state,
        init=Y_init,
        n_iter=polish_n_iter,
        n_iter_early_exag=0,
    )


@timer(logger=logger)
def net_umap(
    data: MultimodalData,
    rep: str = "pca",
    n_jobs: int = -1,
    n_components: int = 2,
    n_neighbors: int = 15,
    min_dist: float = 0.5,
    spread: float = 1.0,
    random_state: int = 0,
    select_frac: float = 0.1,
    select_K: int = 25,
    select_alpha: float = 1.0,
    full_speed: bool = False,
    net_alpha: float = 0.1,
    polish_learning_rate: float = 10.0,
    polish_n_epochs: int = 30,
    out_basis: str = "net_umap",
) -> None:
    """Calculate Net-UMAP embedding of cells.

    Net-UMAP is an approximated UMAP embedding that uses a deep learning model to speed up the calculation. Specifically, the deep model used is MLPRegressor_, the *scikit-learn* implementation of the Multi-layer Perceptron regressor.

    See [Li20]_ for details.

    .. _MLPRegressor: https://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPRegressor.html

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    rep: ``str``, optional, default: ``"pca"``
        Representation of data used for the calculation. By default, use PCA coordinates. If ``None``, use the count matrix ``data.X``.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    n_components: ``int``, optional, default: ``2``
        Dimension of calculated UMAP coordinates. By default, generate 2-dimensional data for 2D visualization.

    n_neighbors: ``int``, optional, default: ``15``
        Number of nearest neighbors considered during the computation.

    min_dist: ``float``, optional, default: ``0.5``
        The effective minimum distance between embedded data points.

    spread: ``float``, optional, default: ``1.0``
        The effective scale of embedded data points.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    select_frac: ``float``, optional, default: ``0.1``
        Fraction of cells to down-sample.

    select_K: ``int``, optional, default: ``25``
        Number of neighbors to be used to estimate local density for each data point for down sampling.

    select_alpha: ``float``, optional, default: ``1.0``
        Weight the down sample to be proportional to ``radius ** select_alpha``.

    full_speed: ``bool``, optional, default: ``False``
        * If ``True``, use multiple threads in constructing ``hnsw`` index. However, the kNN results are not reproducible.
        * Otherwise, use only one thread to make sure results are reproducible.

    net_alpha: ``float``, optional, default: ``0.1``
        L2 penalty (regularization term) parameter of the deep regressor.

    polish_learning_rate: ``float``, optional, default: ``10.0``
        After running the deep regressor to predict new coordinates, use this learning rate to polish the coordinates.

    polish_n_epochs: ``int``, optional, default: ``30``
        Number of epochs for polishing UMAP run.

    out_basis: ``str``, optional, default: ``"net_umap"``
        Key name for calculated UMAP coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: Net UMAP coordinates of the data.

    Update ``data.obs``:
        * ``data.obs['ds_selected']``: Boolean array to indicate which cells are selected during the down sampling phase.

    Examples
    --------
    >>> pg.net_umap(data)
    """
    rep = update_rep(rep)
    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    if not knn_is_cached(data, indices_key, distances_key, select_K):
        raise ValueError("Please run neighbors first!")

    n_jobs = effective_n_jobs(n_jobs)

    selected = select_cells(
        data.uns[distances_key],
        select_frac,
        K=select_K,
        alpha=select_alpha,
        random_state=random_state,
    )

    X_full = X_from_rep(data, rep)
    X = X_full[selected, :]

    ds_indices_key = "ds_" + rep + "_knn_indices"  # ds refers to down-sampling
    ds_distances_key = "ds_" + rep + "_knn_distances"
    indices, distances = calculate_nearest_neighbors(
        X,
        K=n_neighbors,
        n_jobs=n_jobs,
        random_state=random_state,
        full_speed=full_speed,
    )
    data.uns[ds_indices_key] = indices
    data.uns[ds_distances_key] = distances

    knn_indices = np.insert(
        data.uns[ds_indices_key][:, 0 : n_neighbors - 1], 0, range(X.shape[0]), axis=1
    )
    knn_dists = np.insert(
        data.uns[ds_distances_key][:, 0 : n_neighbors - 1], 0, 0.0, axis=1
    )

    X_umap = calc_umap(
        X,
        n_components,
        n_neighbors,
        min_dist,
        spread,
        random_state,
        knn_indices=knn_indices,
        knn_dists=knn_dists,
    )

    data.uns["X_" + out_basis + "_small"] = X_umap
    data.obs["ds_selected"] = selected

    Y_init = np.zeros((data.shape[0], n_components), dtype=np.float64)
    Y_init[selected, :] = X_umap
    Y_init[~selected, :] = net_train_and_predict(
        X, X_umap, X_full[~selected, :], net_alpha, random_state, verbose=True
    )

    data.obsm["X_" + out_basis + "_pred"] = Y_init

    knn_indices = np.insert(
        data.uns[indices_key][:, 0 : n_neighbors - 1], 0, range(data.shape[0]), axis=1
    )
    knn_dists = np.insert(
        data.uns[distances_key][:, 0 : n_neighbors - 1], 0, 0.0, axis=1
    )

    data.obsm["X_" + out_basis] = calc_umap(
        X_full,
        n_components,
        n_neighbors,
        min_dist,
        spread,
        random_state,
        init=Y_init,
        n_epochs=polish_n_epochs,
        learning_rate=polish_learning_rate,
        knn_indices=knn_indices,
        knn_dists=knn_dists,
    )


@timer(logger=logger)
def net_fle(
    data: MultimodalData,
    file_name: str = None,
    n_jobs: int = -1,
    rep: str = "diffmap",
    K: int = 50,
    full_speed: bool = False,
    target_change_per_node: float = 2.0,
    target_steps: int = 5000,
    is3d: bool = False,
    memory: int = 8,
    random_state: int = 0,
    select_frac: float = 0.1,
    select_K: int = 25,
    select_alpha: float = 1.0,
    net_alpha: float = 0.1,
    polish_target_steps: int = 1500,
    out_basis: str = "net_fle",
) -> None:
    """Construct Net-Force-directed (FLE) graph.

    Net-FLE is an approximated FLE graph that uses a deep learning model to speed up the calculation. Specifically, the deep model used is MLPRegressor_, the *scikit-learn* implementation of the Multi-layer Perceptron regressor.

    See [Li20]_ for details.

    .. _MLPRegressor: https://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPRegressor.html

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.

    file_name: ``str``, optional, default: ``None``
        Temporary file to store the coordinates as the input to forceatlas2. If ``None``, use ``tempfile.mkstemp`` to generate file name.

    n_jobs: ``int``, optional, default: ``-1``
        Number of threads to use. If ``-1``, use all available threads.

    rep: ``str``, optional, default: ``"diffmap"``
        Representation of data used for the calculation. By default, use Diffusion Map coordinates. If ``None``, use the count matrix ``data.X``.

    K: ``int``, optional, default: ``50``
        Number of nearest neighbors to be considered during the computation.

    full_speed: ``bool``, optional, default: ``False``
        * If ``True``, use multiple threads in constructing ``hnsw`` index. However, the kNN results are not reproducible.
        * Otherwise, use only one thread to make sure results are reproducible.

    target_change_per_node: ``float``, optional, default: ``2.0``
        Target change per node to stop ForceAtlas2.

    target_steps: ``int``, optional, default: ``5000``
        Maximum number of iterations before stopping the ForceAtlas2 algorithm.

    is3d: ``bool``, optional, default: ``False``
        If ``True``, calculate 3D force-directed layout.

    memory: ``int``, optional, default: ``8``
        Memory size in GB for the Java FA2 component. By default, use 8GB memory.

    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    select_frac: ``float``, optional, default: ``0.1``
        Fraction of cells to down-sample.

    select_K: ``int``, optional, default: ``25``
        Number of neighbors to be used to estimate local density for each data point for down sampling.

    select_alpha: ``float``, optional, default: ``1.0``
        Weight the down sample to be proportional to ``radius ** select_alpha``.

    net_alpha: ``float``, optional, default: ``0.1``
        L2 penalty (regularization term) parameter of the deep regressor.

    polish_target_steps: ``int``, optional, default: ``1500``
        Number of ForceAtlas2 iterations used to polish the coordinates predicted by the deep regressor.

    out_basis: ``str``, optional, default: ``"net_fle"``
        Key name for calculated FLE coordinates to store.

    Returns
    -------
    ``None``

    Update ``data.obsm``:
        * ``data.obsm['X_' + out_basis]``: Net FLE coordinates of the data.

    Update ``data.obs``:
        * ``data.obs['ds_diffmap_selected']``: Boolean array to indicate which cells are selected during the down sampling phase.

    Examples
    --------
    >>> pg.net_fle(data)
    """
    if file_name is None:
        import tempfile

        _, file_name = tempfile.mkstemp()

    n_jobs = effective_n_jobs(n_jobs)
    rep = update_rep(rep)

    if ("W_" + rep) not in data.uns:
        neighbors(
            data,
            K=K,
            rep=rep,
            n_jobs=n_jobs,
            random_state=random_state,
            full_speed=full_speed,
        )

    indices_key = rep + "_knn_indices"
    distances_key = rep + "_knn_distances"

    if not knn_is_cached(data, indices_key, distances_key, select_K):
        raise ValueError("Please run neighbors first!")

    selected = select_cells(
        data.uns[distances_key],
        select_frac,
        K=select_K,
        alpha=select_alpha,
        random_state=random_state,
    )

    X_full = X_from_rep(data, rep)
    X = X_full[selected, :]

    ds_indices_key = "ds_" + rep + "_knn_indices"
    ds_distances_key = "ds_" + rep + "_knn_distances"
    indices, distances = calculate_nearest_neighbors(
        X, K=K, n_jobs=n_jobs, random_state=random_state, full_speed=full_speed
    )
    data.uns[ds_indices_key] = indices
    data.uns[ds_distances_key] = distances

    W = calculate_affinity_matrix(indices, distances)

    X_fle = calc_force_directed_layout(
        W,
        file_name + ".small",
        n_jobs,
        target_change_per_node,
        target_steps,
        is3d,
        memory,
        random_state,
    )

    data.uns["X_" + out_basis + "_small"] = X_fle
    data.obs["ds_diffmap_selected"] = selected

    n_components = 2 if not is3d else 3
    Y_init = np.zeros((data.shape[0], n_components), dtype=np.float64)
    Y_init[selected, :] = X_fle
    Y_init[~selected, :] = net_train_and_predict(
        X, X_fle, X_full[~selected, :], net_alpha, random_state, verbose=True
    )

    data.obsm["X_" + out_basis + "_pred"] = Y_init

    data.obsm["X_" + out_basis] = calc_force_directed_layout(
        W_from_rep(data, rep),
        file_name,
        n_jobs,
        target_change_per_node,
        polish_target_steps,
        is3d,
        memory,
        random_state,
        init=Y_init,
    )