Source code for pegasus.tools.scvitools

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix

import numba
from numba import njit

from typing import List, Optional, Union
from pegasusio import UnimodalData, MultimodalData
from pegasus.tools import eff_n_jobs

import anndata

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer

def _gen_anndata(
    data: Union[MultimodalData, UnimodalData],
    features: str = "highly_variable_features",
    obs_columns: Optional[List[str]] = None,
    matkey: str = "counts",
) -> anndata.AnnData:
    """ Generate a new Anndata object for scvitools

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    features: ``str``, optional, default: ``"highly_variable_features"``
        Keyword in ``data.var``, which refers to a boolean array. If ``None``, all features will be selected.
    obs_columns: ``List[str]``
        A list of obs keys that should be included in the new anndata.
    matkey: ``str``, optional, default: ``"counts"``
        Matrix key for the raw count

    Returns
    -------
    An AnnData object.
    """
    mat = data.get_matrix(matkey)
    if obs_columns is not None and obs_columns:
        obs_field = data.obs[obs_columns]
    else:
        obs_field = data.obs
    if features is not None:
        assert features in data.var
        idx = data.var[features].values
        X = mat[:, idx]
        var_field = data.var.loc[idx, []]
    else:
        X = mat
        var_field = data.var[[]]

    return anndata.AnnData(X = X, obs = obs_field, var = var_field)


@njit(fastmath=True, cache=True)
def _select_csr(data, indices, indptr, indexer, new_size):
    data_new = np.zeros_like(data[0:new_size])
    indices_new = np.zeros_like(indices[0:new_size])
    indptr_new = np.zeros_like(indptr)

    cnt = 0
    for i in range(indptr.size - 1):
        indptr_new[i] = cnt
        for j in range(indptr[i], indptr[i + 1]):
            new_idx = indexer[indices[j]]
            if new_idx >= 0:
                data_new[cnt] = data[j]
                indices_new[cnt] = new_idx
                cnt += 1
    indptr_new[indptr.size - 1] = cnt

    return data_new, indices_new, indptr_new


def _gen_query_anndata(
    data: Union[MultimodalData, UnimodalData],
    ref_features: pd.Index,
    obs_columns: Optional[List[str]] = None,
    matkey: str = "counts",
) -> anndata.AnnData:
    """ Generate a new query Anndata object for scvitools

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    ref_features: ``pd.Index``
        A pandas index of reference feature names
    obs_columns: ``List[str]``
        A list of obs keys that should be included in the new anndata.
    matkey: ``str``, optional, default: ``"counts"``
        Matrix key for the raw count

    Returns
    -------
    An AnnData object.
    """
    mat = data.get_matrix(matkey)
    if obs_columns is not None and obs_columns:
        obs_field = data.obs[obs_columns]
    else:
        obs_field = data.obs
    var_field = pd.DataFrame(index = ref_features)

    indexer = ref_features.get_indexer(data.var_names)
    new_size = (indexer[mat.indices]>=0).sum()
    data_new, indices_new, indptr_new = _select_csr(mat.data, mat.indices, mat.indptr, indexer, new_size)
    X = csr_matrix((data_new, indices_new, indptr_new), shape = (mat.shape[0], ref_features.size))
    X.sort_indices()

    return anndata.AnnData(X = X, obs = obs_field, var = var_field)


[docs]@timer(logger=logger) def run_scvi( data: Union[MultimodalData, UnimodalData], features: str = "highly_variable_features", matkey: str = "counts", n_jobs: int = -1, random_state: int = 0, max_epochs: Union[int, None] = None, batch: Optional[str] = None, categorical_covariate_keys: Optional[List[str]] = None, continuous_covariate_keys: Optional[List[str]] = None, use_gpu: Union[str, int, bool, None] = None, ) -> str: """Run scVI embedding. This is a wrapper of `scvitools <https://github.com/scverse/scvi-tools>`_ package. Parameters ---------- data: ``MultimodalData``. Annotated data matrix with rows for cells and columns for genes. features: ``str``, optional, default: ``"highly_variable_features"`` Keyword in ``data.var``, which refers to a boolean array. If ``None``, all features will be selected. matkey: ``str``, optional, default: ``"counts"`` Matrix key for the raw count n_jobs : ``int``, optional, default: ``-1``. Number of threads to use. ``-1`` refers to using all physical CPU cores. random_state: ``int``, optional, default: ``0``. Seed for random number generator max_epochs: ``int | None``, optional, default: ``None``. Maximum number of training epochs. Defaults to np.min([round((20000 / n_cells) * 400), 400]) batch: ``str``, optional, default: ``None``. If only one categorical covariate, the obs key representing batches that should be corrected for, default is ``None``. categorical_covariate_keys: ``List[str]`` If multiple categorical covariates, a list of obs keys listing categorical covariates that should be corrected for, default is ``None``. continuous_covariate_keys: ``List[str]`` A list of obs keys listing continuous covariates that should be corrected for, default is ``None``. use_gpu: ``str | int | bool | None`` Use default GPU if available (if None or True), or index of GPU to use (if int), or name of GPU (if str, e.g., ``cuda:0``), or use CPU (if False). Returns ------- out_rep: ``str`` The keyword in ``data.obsm`` referring to the embedding calculated by integrative NMF algorithm. out_rep is always equal to "scVI" Update ``data.obsm``: * ``data.obsm['X_scVI']``: The embedding calculated by scVI. Examples -------- >>> pg.run_scvi(data, batch="Channel") >>> pg.run_scvi(data, categorical_covariate_keys=["cell_source", "donor"], continuous_covariate_keys=["percent_mito", "percent_ribo"]) """ try: import scvi except ImportError as e: import sys logger.error(f"{e}\nscvi-tools needed! Try 'pip install scvi-tools'.") sys.exit(-1) logger.info("Start embedding using scVI.") obs_columns = [] if batch is not None and batch: obs_columns.append(batch) if categorical_covariate_keys is not None and categorical_covariate_keys: obs_columns.extend(categorical_covariate_keys) if continuous_covariate_keys is not None and continuous_covariate_keys: obs_columns.extend(continuous_covariate_keys) adata = _gen_anndata(data, features, obs_columns, matkey) # gen AnnData scvi.settings.num_threads = eff_n_jobs(n_jobs) # set n_jobs scvi.settings.seed = random_state # set random_state, see [here](https://docs.scvi-tools.org/en/stable/_modules/scvi/_settings.html) for more details. print(max_epochs) if max_epochs is None: max_epochs = np.min([round((20000 / len(adata.obs)) * 400), 400]) print(type(max_epochs)) print(max_epochs) scvi.model.SCVI.setup_anndata(adata, batch_key=batch, categorical_covariate_keys=categorical_covariate_keys, continuous_covariate_keys=continuous_covariate_keys, ) # register anndata model = scvi.model.SCVI(adata) # obtain scvi model model.train(max_epochs=max_epochs, use_gpu=use_gpu) # train model data.obsm["X_scVI"] = model.get_latent_representation() # Get embedding data.register_attr("X_scVI", "embedding") # Register X_scVI as an embedding return "scVI"
[docs]@timer(logger=logger) def train_scarches_scanvi( data: Union[MultimodalData, UnimodalData], dir_path: str, label: str, unlabeled_category: str = "Unknown", features: str = "highly_variable_features", matkey: str = "counts", n_jobs: int = -1, random_state: int = 0, max_epochs: Union[int, None] = None, batch: Optional[str] = None, categorical_covariate_keys: Optional[List[str]] = None, continuous_covariate_keys: Optional[List[str]] = None, semisupervised_max_epochs: Union[int, None] = None, n_samples_per_label: Optional[int] = None, use_gpu: Union[str, int, bool, None] = None, arches_params: dict = dict( use_layer_norm="both", use_batch_norm="none", encode_covariates=True, dropout_rate=0.2, n_layers=2, ), ) -> None: """Run scArches training. This is a wrapper of `scvitools <https://github.com/scverse/scvi-tools>`_ package. Parameters ---------- data: ``MultimodalData``. Annotated data matrix with rows for cells and columns for genes. dir_path: ``str``. Save the model to this directory. label: ``str``. The obs key representing labels. unlabeled_category: ``str``, default: ``"Unknown"`` Value used for unlabeled cells in ``label``. features: ``str``, optional, default: ``"highly_variable_features"`` Keyword in ``data.var``, which refers to a boolean array. If ``None``, all features will be selected. matkey: ``str``, optional, default: ``"counts"`` Matrix key for the raw count n_jobs : ``int``, optional, default: ``-1``. Number of threads to use. ``-1`` refers to using all physical CPU cores. random_state: ``int``, optional, default: ``0``. Seed for random number generator max_epochs: ``int | None``, optional, default: ``None``. Maximum number of unsupervised training epochs. Defaults to np.min([round((20000 / n_cells) * 400), 400]) batch: ``str``, optional, default: ``None``. If only one categorical covariate, the obs key representing batches that should be corrected for, default is ``None``. categorical_covariate_keys: ``List[str]`` If multiple categorical covariates, a list of obs keys listing categorical covariates that should be corrected for, default is ``None``. continuous_covariate_keys: ``List[str]`` A list of obs keys listing continuous covariates that should be corrected for, default is ``None``. semisupervised_max_epochs: ``int | None``, optional, default: ``None``. Maximum number of semisupervised training epochs. Defaults to np.min([round(np.sqrt(``max_epochs``)), 20]) n_samples_per_label : ``int``, optional, default: ``None``. Number of subsamples for each label class to sample per epoch. By default, there is no label subsampling. use_gpu: ``str | int | bool | None`` Use default GPU if available (if None or True), or index of GPU to use (if int), or name of GPU (if str, e.g., ‘cuda:0’), or use CPU (if False). arches_params: ``dict``. Hyperparameters for VAE. See https://docs.scvi-tools.org/en/stable/api/reference/scvi.module.VAE.html#scvi.module.VAE for more details Returns ------- Update ``data.obsm``: * ``data.obsm['X_scVI']``: The embedding calculated by scVI. * ``data.obsm['X_scanVI']``: The embedding calculated by scanVI. Examples -------- >>> pg.train_scarches_scanvi(data, dir_path="scanvi_model/", label="celltype", matkey="counts", batch="tech", n_samples_per_label=100) """ try: import scvi except ImportError as e: import sys logger.error(f"{e}\nscvi-tools needed! Try 'pip install scvi-tools'.") sys.exit(-1) logger.info("Start training with scArches method.") obs_columns = [label] if batch is not None and batch: obs_columns.append(batch) if categorical_covariate_keys is not None and categorical_covariate_keys: obs_columns.extend(categorical_covariate_keys) if continuous_covariate_keys is not None and continuous_covariate_keys: obs_columns.extend(continuous_covariate_keys) adata = _gen_anndata(data, features, obs_columns, matkey) # gen AnnData scvi.settings.num_threads = eff_n_jobs(n_jobs) # set n_jobs scvi.settings.seed = random_state # set random_state, see [here](https://docs.scvi-tools.org/en/stable/_modules/scvi/_settings.html) for more details. # unsupervised if max_epochs is None: max_epochs = np.min([round((20000 / len(adata.obs)) * 400), 400]) scvi.model.SCVI.setup_anndata(adata, batch_key=batch, categorical_covariate_keys=categorical_covariate_keys, continuous_covariate_keys=continuous_covariate_keys, ) # register anndata vae_ref = scvi.model.SCVI(adata, **arches_params) # obtain scvi model vae_ref.train(max_epochs=max_epochs, use_gpu=use_gpu) # train model data.obsm["X_scVI"] = vae_ref.get_latent_representation() # Get embedding data.register_attr("X_scVI", "embedding") # Register X_scVI as an embedding # semisupervised if semisupervised_max_epochs is None: semisupervised_max_epochs = np.min([round(np.sqrt(max_epochs)), 20]) vae_ref_scan = scvi.model.SCANVI.from_scvi_model( vae_ref, unlabeled_category=unlabeled_category, labels_key=label, ) vae_ref_scan.train( max_epochs=semisupervised_max_epochs, n_samples_per_label=n_samples_per_label, use_gpu=use_gpu, ) vae_ref_scan.save(dir_path, overwrite=True) data.obsm["X_scANVI"] = vae_ref_scan.get_latent_representation() data.register_attr("X_scANVI", "embedding")
[docs]@timer(logger=logger) def predict_scarches_scanvi( data: Union[MultimodalData, UnimodalData], dir_path: str, label: str, predictions: str = "predictions", matkey: str = "counts", n_jobs: int = -1, random_state: int = 0, max_epochs: Union[int, None] = None, batch: Optional[str] = None, categorical_covariate_keys: Optional[List[str]] = None, continuous_covariate_keys: Optional[List[str]] = None, use_gpu: Union[str, int, bool, None] = None, ) -> None: """Run scArches training. This is a wrapper of `scvitools <https://github.com/scverse/scvi-tools>`_ package. Parameters ---------- data: ``MultimodalData``. Annotated data matrix with rows for cells and columns for genes. dir_path: ``str``. Save the model to this directory. label: ``str``. The obs key representing labels. predictions: ``str``, , optional, default: ``"predictions"`` The obs key to store predicted labels. matkey: ``str``, optional, default: ``"counts"`` Matrix key for the raw count n_jobs : ``int``, optional, default: ``-1``. Number of threads to use. ``-1`` refers to using all physical CPU cores. random_state: ``int``, optional, default: ``0``. Seed for random number generator max_epochs: ``int | None``, optional, default: ``None``. Maximum number of training epochs. Defaults to np.min([round((20000 / n_cells) * 100), 100]) batch: ``str``, optional, default: ``None``. If only one categorical covariate, the obs key representing batches that should be corrected for, default is ``None``. categorical_covariate_keys: ``List[str]`` If multiple categorical covariates, a list of obs keys listing categorical covariates that should be corrected for, default is ``None``. continuous_covariate_keys: ``List[str]`` A list of obs keys listing continuous covariates that should be corrected for, default is ``None``. use_gpu: ``str | int | bool | None`` Use default GPU if available (if None or True), or index of GPU to use (if int), or name of GPU (if str, e.g., ‘cuda:0’), or use CPU (if False). Returns ------- Update ``data.obsm``: * ``data.obsm['X_scanVI']``: The embedding calculated by scanVI. * ``data.obsm[predictions]``: The predicted labels by scanVI. Examples -------- >>> pg.predict_scarches_scanvi(data, dir_path="scanvi_model/", label="celltype", matkey="counts", batch="tech") """ try: import scvi except ImportError as e: import sys logger.error(f"{e}\nscvi-tools needed! Try 'pip install scvi-tools'.") sys.exit(-1) logger.info("Start prediction with scArches method.") obs_columns = [label] if batch is not None and batch: obs_columns.append(batch) if categorical_covariate_keys is not None and categorical_covariate_keys: obs_columns.extend(categorical_covariate_keys) if continuous_covariate_keys is not None and continuous_covariate_keys: obs_columns.extend(continuous_covariate_keys) features = scvi.model.SCANVI.prepare_query_anndata(None, dir_path, return_reference_var_names=True) adata = _gen_query_anndata(data, features, obs_columns, matkey) # gen AnnData scvi.settings.num_threads = eff_n_jobs(n_jobs) # set n_jobs scvi.settings.seed = random_state # set random_state, see [here](https://docs.scvi-tools.org/en/stable/_modules/scvi/_settings.html) for more details. if max_epochs is None: max_epochs = np.min([round((20000 / len(adata.obs)) * 100), 100]) vae_q = scvi.model.SCANVI.load_query_data(adata, dir_path) vae_q.train( max_epochs=max_epochs, plan_kwargs=dict(weight_decay=0.0), check_val_every_n_epoch=10, ) data.obsm["X_scANVI"] = vae_q.get_latent_representation() data.register_attr("X_scANVI", "embedding") data.obs[predictions] = vae_q.predict()