import time
import numpy as np
import pandas as pd
from typing import Union, List
from pegasusio import UnimodalData, MultimodalData
from pegasus.tools import select_features, X_from_rep, check_batch_key
import logging
logger = logging.getLogger(__name__)
from pegasusio import timer
[docs]@timer(logger=logger)
def run_harmony(
data: Union[MultimodalData, UnimodalData],
batch: Union[str, List[str]] = "Channel",
rep: str = "pca",
n_comps: int = None,
n_jobs: int = -1,
n_clusters: int = None,
random_state: int = 0,
use_gpu: bool = False,
max_iter_harmony: int = 10,
) -> str:
"""Batch correction on PCs using Harmony.
This is a wrapper of `harmony-pytorch <https://github.com/lilab-bcb/harmony-pytorch>`_ package, which is a Pytorch implementation of Harmony algorithm [Korsunsky19]_.
Parameters
----------
data: ``MultimodalData``.
Annotated data matrix with rows for cells and columns for genes.
batch: ``str`` or ``List[str]``, optional, default: ``"Channel"``.
Which attribute in data.obs field represents batches, default is "Channel". If using multiple attributes, specify their names in a list.
rep: ``str``, optional, default: ``"pca"``.
Which representation to use as input of Harmony, default is PCA.
n_comps: `int`, optional (default: None)
Number of components to be used in the `rep`. If n_comps == None, use all components; otherwise, use the minimum of n_comps and rep's dimensions.
n_jobs : ``int``, optional, default: ``-1``.
Number of threads to use in Harmony. ``-1`` refers to using all physical CPU cores.
n_clusters: ``int``, optional, default: ``None``.
Number of Harmony clusters. Default is ``None``, which asks Harmony to estimate this number from the data.
random_state: ``int``, optional, default: ``0``.
Seed for random number generator
use_gpu: ``bool``, optional, default: ``False``.
If ``True``, use GPU if available. Otherwise, use CPU only.
max_iter_harmony: ``int``, optional, default: ``10``.
Maximum iterations on running Harmony if not converged.
Returns
-------
out_rep: ``str``
The keyword in ``data.obsm`` referring to the embedding calculated by Harmony algorithm.
This keyword is ``rep + '_harmony'``, where ``rep`` is the input parameter above.
Update ``data.obsm``:
* ``data.obsm['X_' + out_rep]``: The embedding calculated by Harmony algorithm.
Examples
--------
>>> pg.run_harmony(data, rep = "pca", n_jobs = 10, random_state = 25)
"""
if not check_batch_key(data, batch, "Cannot apply Harmony!"):
return rep
try:
from harmony import harmonize
except ImportError as e:
import sys
logger.error(f"{e}\nNeed Harmony! Try 'pip install harmony-pytorch'.")
sys.exit(-1)
logger.info("Start integration using Harmony.")
out_rep = rep + "_harmony"
data.obsm["X_" + out_rep] = harmonize(
X_from_rep(data, rep, n_comps),
data.obs,
batch,
n_clusters = n_clusters,
n_jobs = n_jobs,
random_state = random_state,
use_gpu = use_gpu,
max_iter_harmony = max_iter_harmony,
)
return out_rep
[docs]@timer(logger=logger)
def run_scanorama(
data: Union[MultimodalData, UnimodalData],
batch: str = "Channel",
n_components: int = 50,
features: str = "highly_variable_features",
standardize: bool = True,
max_value: float = 10.0,
random_state: int = 0,
) -> str:
"""Batch correction using Scanorama.
This is a wrapper of `Scanorama <https://github.com/brianhie/scanorama>`_ package. See [Hie19]_ for details on the algorithm.
Parameters
----------
data: ``MultimodalData``.
Annotated data matrix with rows for cells and columns for genes.
batch: ``str``, optional, default: ``"Channel"``.
Which attribute in data.obs field represents batches, default is "Channel".
n_components: ``int``, optional default: ``50``.
Number of integrated embedding components to keep. This sets Scanorama's dimred parameter.
features: ``str``, optional, default: ``"highly_variable_features"``.
Keyword in ``data.var`` to specify features used for Scanorama.
standardize: ``bool``, optional, default: ``True``.
Whether to scale the data to unit variance and zero mean.
max_value: ``float``, optional, default: ``10``.
The threshold to truncate data after scaling. If ``None``, do not truncate.
random_state: ``int``, optional, default: ``0``.
Seed for random number generator.
Returns
-------
out_rep: ``str``
The keyword in ``data.obsm`` referring to the embedding calculated by Scanorama algorithm. out_rep is always equal to "scanorama"
Update ``data.obsm``:
* ``data.obsm['X_scanorama']``: The embedding calculated by Scanorama algorithm.
Examples
--------
>>> pg.run_scanorama(data, random_state = 25)
"""
if not check_batch_key(data, batch, "Cannot apply Scanorama!"):
return "pca"
try:
from scanorama import integrate
except ImportError as e:
import sys
logger.error(f"{e}\nNeed Scanorama! Try 'pip install scanorama'.")
sys.exit(-1)
logger.info("Start integration using Scanorama.")
rep = "scanorama"
keyword = select_features(data, features=features, standardize=standardize, max_value=max_value)
X = data.uns[keyword]
datasets = []
for channel in data.obs[batch].cat.categories:
idx = (data.obs[batch] == channel).values
assert idx.sum() > 0
datasets.append(X[idx, :])
genes_list = [[str(i) for i in range(X.shape[1])]] * data.obs[batch].cat.categories.size
integrated, genes = integrate(datasets, genes_list, dimred = n_components, seed = random_state)
data.obsm[f"X_{rep}"] = np.concatenate(integrated, axis = 0)
return rep