# Source code for pegasus.tools.preprocessing

import time
import numpy as np
import pandas as pd

from scipy.sparse import issparse

from sklearn.decomposition import PCA

from typing import List, Tuple, Union
from pegasusio import UnimodalData, MultimodalData, calc_qc_filters, DictWithDefault

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer, run_gc



def qc_metrics(
    data: MultimodalData,
    select_singlets: bool = False,
    remap_string: str = None,
    subset_string: str = None,
    min_genes: int = 500,
    max_genes: int = 6000,
    min_umis: int = None,
    max_umis: int = None,
    mito_prefix: str = "MT-",
    percent_mito: float = 20.0,
) -> None:
    """Generate Quality Control (QC) metrics regarding cell barcodes on the dataset.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Use current selected modality in data, which should contain one RNA expression matrix.
    select_singlets: ``bool``, optional, default ``False``
        If ``True``, select only singlets.
    remap_string: ``str``, optional, default ``None``
        Remap singlet names using <remap_string>, where <remap_string> takes the format
        "new_name_i:old_name_1,old_name_2;new_name_ii:old_name_3;...". For example, if we hashed
        5 libraries from 3 samples, sample1_lib1, sample1_lib2, sample2_lib1, sample2_lib2 and
        sample3, we can remap them to 3 samples using this string:
        "sample1:sample1_lib1,sample1_lib2;sample2:sample2_lib1,sample2_lib2". In this way, the
        new singlet names will be in the metadata field with key 'assignment', while the old names
        will be kept in the metadata field with key 'assignment.orig'.
    subset_string: ``str``, optional, default ``None``
        If selecting singlets, only select singlets in <subset_string>, which takes the format
        "name1,name2,...". Note that if --remap-singlets is specified, subsetting happens after
        remapping. For example, we can select singlets from only samples 1 and 3 using
        "sample1,sample3".
    min_genes: ``int``, optional, default: ``500``
        Only keep cells with at least ``min_genes`` genes.
    max_genes: ``int``, optional, default: ``6000``
        Only keep cells with fewer than ``max_genes`` genes.
    min_umis: ``int``, optional, default: ``None``
        Only keep cells with at least ``min_umis`` UMIs.
    max_umis: ``int``, optional, default: ``None``
        Only keep cells with fewer than ``max_umis`` UMIs.
    mito_prefix: ``str``, optional, default: ``MT-``
        Prefix for mitochondrial genes.
    percent_mito: ``float``, optional, default: ``20.0``
        Only keep cells whose mitochondrial gene percentage is less than ``percent_mito`` % of total counts.

    Returns
    -------
    ``None``

    Update ``data.obs``:

        * ``n_genes``: Total number of genes for each cell.
        * ``n_counts``: Total number of counts for each cell.
        * ``percent_mito``: Percent of mitochondrial genes for each cell.
        * ``passed_qc``: Boolean type indicating if a cell passes the QC process based on the QC metrics.
        * ``demux_type``: this column might be deleted if select_singlets is on.

    Examples
    --------
    >>> pg.qc_metrics(data)
    """
    if isinstance(data, MultimodalData):
        data = data.current_data()

    # Make sure that n_genes and n_counts statistics are calculated by setting min_genes = 1 and min_umis = 1
    if min_genes is None:
        min_genes = 1
    if min_umis is None:
        min_umis = 1

    calc_qc_filters(
        data,
        select_singlets = select_singlets,
        remap_string = remap_string,
        subset_string = subset_string,
        min_genes = min_genes,
        max_genes = max_genes,
        min_umis = min_umis,
        max_umis = max_umis,
        mito_prefix = mito_prefix,
        percent_mito = percent_mito,
    )
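
# A minimal, self-contained sketch (not used by the module) of the <remap_string> grammar
# documented above. The real parsing happens inside pegasusio's calc_qc_filters; this only
# illustrates the "new_name:old_1,old_2;..." format with hypothetical sample names.
def _demo_remap_string_format():
    remap_string = "sample1:sample1_lib1,sample1_lib2;sample2:sample2_lib1,sample2_lib2"
    remap = {}
    for token in remap_string.split(";"):
        new_name, old_names = token.split(":")
        for old_name in old_names.split(","):
            remap[old_name] = new_name  # each old singlet name maps to its new sample name
    assert remap["sample1_lib2"] == "sample1"
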

def get_filter_stats(data: MultimodalData, min_genes_before_filt: int = 100) -> pd.DataFrame:
    """Calculate filtration stats on cell barcodes.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Use current selected modality in data, which should contain one RNA expression matrix.
    min_genes_before_filt: ``int``, optional, default ``100``
        If a raw data matrix is the input, empty barcodes will dominate pre-filtration statistics.
        To avoid this, for raw matrices, only consider barcodes with at least <min_genes_before_filt>
        genes for the pre-filtration condition.

    Returns
    -------
    df_cells: ``pandas.DataFrame``
        Data frame of stats on cell filtration.

    Examples
    --------
    >>> df = pg.get_filter_stats(data)
    """
    # cell stats
    if isinstance(data, MultimodalData):
        data = data.current_data()

    if "Channel" not in data.obs:
        data.obs["Channel"] = pd.Categorical([""] * data.shape[0])

    df = data.obs[data.obs["n_genes"] >= min_genes_before_filt] if data.obs["n_genes"].min() == 0 else data.obs
    gb1 = df.groupby("Channel")
    df_before = gb1.median()
    df_before = df_before.assign(total=gb1.size())
    df_before.rename(
        columns={
            "n_genes": "median_n_genes_before",
            "n_counts": "median_n_umis_before",
            "percent_mito": "median_percent_mito_before",
        },
        inplace=True,
    )

    # focusing only on filtered cells
    gb2 = data.obs[data.obs["passed_qc"]].groupby("Channel")
    df_after = gb2.median()
    df_after = df_after.assign(kept=gb2.size())
    df_after.rename(
        columns={
            "n_genes": "median_n_genes",
            "n_counts": "median_n_umis",
            "percent_mito": "median_percent_mito",
        },
        inplace=True,
    )

    df_cells = pd.concat((df_before, df_after), axis=1, sort=False)
    df_cells.fillna(0, inplace=True)
    df_cells["kept"] = df_cells["kept"].astype(int)
    df_cells["filt"] = df_cells["total"] - df_cells["kept"]

    target_cols = np.array(
        ["kept", "median_n_genes", "median_n_umis", "median_percent_mito",
         "filt", "total", "median_n_genes_before", "median_n_umis_before",
         "median_percent_mito_before"]
    )
    target_cols = target_cols[np.isin(target_cols, df_cells.columns)]
    df_cells = df_cells[target_cols]

    df_cells.sort_values("kept", inplace=True)

    return df_cells
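
# A minimal, self-contained sketch (not used by the module) of the per-channel before/after
# summary built in get_filter_stats: group barcodes by channel, take medians before and after
# QC, then join the two tables side by side. The toy obs frame below is made up.
def _demo_filter_stats_pattern():
    obs = pd.DataFrame({
        "Channel": pd.Categorical(["c1", "c1", "c2", "c2"]),
        "n_genes": [800, 200, 900, 1000],
        "passed_qc": [True, False, True, True],
    })
    gb1 = obs.groupby("Channel", observed=True)
    before = gb1["n_genes"].median().to_frame("median_n_genes_before").assign(total=gb1.size())
    gb2 = obs[obs["passed_qc"]].groupby("Channel", observed=True)
    after = gb2["n_genes"].median().to_frame("median_n_genes").assign(kept=gb2.size())
    df = pd.concat((before, after), axis=1, sort=False)  # one row per channel
    assert df.loc["c1", "kept"] == 1 and df.loc["c2", "total"] == 2
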

def filter_data(data: MultimodalData, focus_list: List[str] = None) -> None:
    """ Filter data based on qc_metrics calculated in ``pg.qc_metrics``.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Use current selected modality in data, which should contain one RNA expression matrix.
    focus_list: ``List[str]``, optional, default None
        UnimodalData objects with keys in ``focus_list`` have QC metrics marked. Filter them and
        make sure the other modalities' barcodes are consistent with the filtered barcodes. If
        ``focus_list`` is ``None`` and the currently selected modality is "rna",
        ``focus_list = [data.current_key()]``.

    Returns
    -------
    ``None``

    Update ``data`` with cells after filtration.

    Examples
    --------
    >>> pg.filter_data(data)
    """
    data.filter_data(focus_list = focus_list, cache_passqc = True)

def identify_robust_genes(data: MultimodalData, percent_cells: float = 0.05) -> None:
    """ Identify robust genes as candidates for HVG selection and remove genes that are not expressed in any cells.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Use current selected modality in data, which should contain one RNA expression matrix.
    percent_cells: ``float``, optional, default: ``0.05``
        Only mark genes as ``robust`` if they are expressed in at least ``percent_cells`` % of cells.

    Returns
    -------
    ``None``

    Update ``data.var``:

        * ``n_cells``: Total number of cells in which each gene is measured.
        * ``percent_cells``: Percent of cells in which each gene is measured.
        * ``robust``: Boolean type indicating if a gene is robust based on the QC metrics.
        * ``highly_variable_features``: Boolean type indicating if a gene is a highly variable feature. By default, set all robust genes as highly variable features.

    Examples
    --------
    >>> pg.identify_robust_genes(data, percent_cells = 0.05)
    """
    if isinstance(data, MultimodalData):
        data = data.current_data()

    data.var["n_cells"] = data.X.getnnz(axis=0)
    data.var["percent_cells"] = (data.var["n_cells"] / data.shape[0]) * 100
    data.var["robust"] = data.var["percent_cells"] >= percent_cells
    data.var["highly_variable_features"] = data.var["robust"]  # by default, all robust genes are "highly" variable

    prior_n = data.shape[1]
    data._inplace_subset_var(data.var["n_cells"] > 0)
    logger.info(f"After filtration, {data.shape[1]}/{prior_n} genes are kept. Among {data.shape[1]} genes, {data.var['robust'].sum()} genes are robust.")
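
# A minimal, self-contained sketch (not used by the module) of the robustness rule above:
# a gene's percent_cells is the fraction of cells with a nonzero count, obtained from the
# CSR matrix's nonzero structure via getnnz(axis=0). Counts and threshold are made up.
def _demo_robust_gene_rule():
    from scipy.sparse import csr_matrix
    counts = csr_matrix(np.array([
        [0, 3, 1],   # cell 1
        [0, 2, 0],   # cell 2
        [0, 5, 1],   # cell 3
        [1, 0, 0],   # cell 4
    ]))
    n_cells = counts.getnnz(axis=0)                  # cells expressing each gene: [1, 3, 2]
    percent_cells = n_cells / counts.shape[0] * 100  # [25., 75., 50.]
    robust = percent_cells >= 50.0                   # threshold chosen only for illustration
    assert robust.tolist() == [False, True, True]
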

def _generate_filter_plots(
    unidata: UnimodalData, plot_filt: str, plot_filt_figsize: str = None, min_genes_before_filt: int = 100
) -> None:
    """ This function generates filtration plots, only used in command line.
    """
    group_key = unidata.get_uid()

    from pegasus.plotting import qcviolin

    kwargs = {"return_fig": True, "dpi": 500}
    if plot_filt_figsize is not None:
        width, height = plot_filt_figsize.split(",")
        kwargs["panel_size"] = (int(width), int(height))

    fig = qcviolin(unidata, "count", **kwargs)
    fig.savefig(f"{plot_filt}.{group_key}.filt.UMI.pdf")

    fig = qcviolin(unidata, "gene", **kwargs)
    fig.savefig(f"{plot_filt}.{group_key}.filt.gene.pdf")

    fig = qcviolin(unidata, "mito", **kwargs)
    if fig is not None:
        fig.savefig(f"{plot_filt}.{group_key}.filt.mito.pdf")

    logger.info("Filtration plots are generated.")


@timer(logger=logger)
def _run_filter_data(
    data: MultimodalData,
    focus_list: List[str] = None,
    output_filt: str = None,
    plot_filt: str = None,
    plot_filt_figsize: Tuple[int, int] = None,
    min_genes_before_filt: int = 100,
    select_singlets: bool = False,
    remap_string: str = None,
    subset_string: str = None,
    min_genes: int = 500,
    max_genes: int = 6000,
    min_umis: int = None,
    max_umis: int = None,
    mito_prefix: str = "MT-",
    percent_mito: float = 20.0,
    percent_cells: float = 0.05,
) -> None:
    """ This function is for command line use.
    """
    if focus_list is None:
        focus_list = [data.current_key()]

    mito_dict = DictWithDefault(mito_prefix)
    for key in focus_list:
        unidata = data.get_data(key)

        qc_metrics(
            unidata,
            select_singlets,
            remap_string,
            subset_string,
            min_genes,
            max_genes,
            min_umis,
            max_umis,
            mito_dict.get(unidata.get_genome()),
            percent_mito,
        )

        if output_filt is not None:
            group_key = unidata.get_uid()
            writer = pd.ExcelWriter(f"{output_filt}.{group_key}.filt.xlsx", engine="xlsxwriter")
            df_cells = get_filter_stats(unidata, min_genes_before_filt = min_genes_before_filt)
            df_cells.to_excel(writer, sheet_name="Cell filtration stats")
            writer.save()
            logger.info(f"Filtration results for {group_key} are written.")

        if plot_filt is not None:
            _generate_filter_plots(unidata, plot_filt, plot_filt_figsize = plot_filt_figsize, min_genes_before_filt = min_genes_before_filt)

    filter_data(data, focus_list = focus_list)

    for key in focus_list:
        unidata = data.get_data(key)
        identify_robust_genes(unidata, percent_cells = percent_cells)

@timer(logger=logger)
@run_gc
def log_norm(data: MultimodalData, norm_count: float = 1e5, backup_matrix: str = "raw.X") -> None:
    """Normalize each cell to a total of ``norm_count``, then apply the natural logarithm to the data.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Use current selected modality in data, which should contain one RNA expression matrix.
    norm_count: ``float``, optional, default: ``1e5``
        Total count of each cell after normalization.
    backup_matrix: ``str``, optional, default: ``raw.X``
        Where to back up the original count matrix.

    Returns
    -------
    ``None``

    Update ``data.X`` with the count matrix after log-normalization. In addition, back up the original count matrix as ``backup_matrix``.

    Examples
    --------
    >>> pg.log_norm(data)
    """
    if isinstance(data, MultimodalData):
        data = data.current_data()
    assert data.get_modality() == "rna"

    data.add_matrix(backup_matrix, data.X)
    data.X = data.get_matrix("X").astype(np.float32)

    mat = data.X[:, data.var["robust"].values]  # scale factors are computed from robust genes only
    scale = norm_count / mat.sum(axis=1).A1
    data.X.data *= np.repeat(scale, np.diff(data.X.indptr))  # in-place per-row (per-cell) scaling of the CSR matrix
    data.X.data = np.log1p(data.X.data)  # faster than data.X.log1p()

    data.obs["scale"] = scale
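
# A minimal, self-contained sketch (not used by the module) of the in-place CSR row
# scaling used in log_norm: row i owns the value slice data[indptr[i]:indptr[i+1]], so
# repeating each cell's scale factor by its row length scales every row at once. The
# matrix and target count here are made up; log_norm additionally computes the scale
# factors from robust genes only.
def _demo_csr_row_scaling():
    from scipy.sparse import csr_matrix
    X = csr_matrix(np.array([[1.0, 0.0, 3.0],
                             [0.0, 2.0, 2.0]]))
    scale = 100.0 / X.sum(axis=1).A1               # per-cell factors: [25., 25.]
    X.data *= np.repeat(scale, np.diff(X.indptr))  # row nonzero lengths: [2, 2]
    assert np.allclose(X.sum(axis=1).A1, 100.0)    # every cell now sums to 100
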

@run_gc
def select_features(data: MultimodalData, features: str = "highly_variable_features", standardize: bool = True, max_value: float = 10.0) -> str:
    """ Subset the features and store the resulting matrix in dense format in ``data.uns`` with the ``'fmat_'`` prefix, with the option of standardization and truncation based on max_value. ``'fmat_*'`` matrices will be removed before writing out to disk.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    features: ``str``, optional, default: ``"highly_variable_features"``
        A keyword in ``data.var``, which refers to a boolean array. If ``None``, all features will be selected.
    standardize: ``bool``, optional, default: ``True``
        Whether to scale the data to unit variance and zero mean.
    max_value: ``float``, optional, default: ``10.0``
        The threshold to truncate data after scaling. If ``None``, do not truncate.

    Returns
    -------
    keyword: ``str``
        The keyword in ``data.uns`` referring to the features selected.

    Update ``data.uns``:

        * ``data.uns[keyword]``: A submatrix of the data containing features selected.

    Examples
    --------
    >>> pg.select_features(data)
    """
    keyword = "fmat_" + str(features)  # fmat: feature matrix

    if features is not None:
        assert features in data.var
        X = data.X[:, data.var[features].values]
    else:
        X = data.X
    data.uns[keyword] = X.toarray() if issparse(X) else X.copy()

    if standardize or (max_value is not None):
        X = data.uns[keyword]

        if standardize:
            # one-pass unbiased sample std: sum(x**2) - n * mean**2 is the centered sum of squares
            m1 = X.mean(axis=0)
            psum = np.multiply(X, X).sum(axis=0)
            std = ((psum - X.shape[0] * (m1 ** 2)) / (X.shape[0] - 1.0)) ** 0.5
            std[std == 0] = 1
            X -= m1
            X /= std

        if max_value is not None:
            X[X > max_value] = max_value
            X[X < -max_value] = -max_value

        data.uns[keyword] = X

    return keyword
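
# A minimal, self-contained sketch (not used by the module) checking the one-pass
# standardization formula in select_features: sum(x**2) - n * mean**2 equals the centered
# sum of squares, so dividing by (n - 1) reproduces the unbiased sample variance.
def _demo_one_pass_std():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 5))
    m1 = X.mean(axis=0)
    psum = np.multiply(X, X).sum(axis=0)
    std = ((psum - X.shape[0] * (m1 ** 2)) / (X.shape[0] - 1.0)) ** 0.5
    assert np.allclose(std, X.std(axis=0, ddof=1))  # matches NumPy's ddof=1 sample std
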

@timer(logger=logger)
def pca(
    data: MultimodalData,
    n_components: int = 50,
    features: str = "highly_variable_features",
    standardize: bool = True,
    max_value: float = 10,
    robust: bool = False,
    random_state: int = 0,
) -> None:
    """Perform Principal Component Analysis (PCA) on the data.

    The calculation uses the *scikit-learn* implementation.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    n_components: ``int``, optional, default: ``50``
        Number of Principal Components to get.
    features: ``str``, optional, default: ``"highly_variable_features"``
        Keyword in ``data.var`` to specify features used for PCA.
    standardize: ``bool``, optional, default: ``True``
        Whether to scale the data to unit variance and zero mean.
    max_value: ``float``, optional, default: ``10``
        The threshold to truncate data after scaling. If ``None``, do not truncate.
    robust: ``bool``, optional, default: ``False``
        If ``True``, use the 'arpack' solver instead of 'randomized' for large matrices (i.e. max(X.shape) > 500 and n_components < 0.8 * min(X.shape)); otherwise, fall back to the 'full' solver.
    random_state: ``int``, optional, default: ``0``
        Random seed to be set for reproducing results.

    Returns
    -------
    ``None``

    Update ``data.obsm``:

        * ``data.obsm["X_pca"]``: PCA matrix of the data.

    Update ``data.uns``:

        * ``data.uns["PCs"]``: The principal components containing the loadings.
        * ``data.uns["pca"]["variance"]``: Explained variance, i.e. the eigenvalues of the covariance matrix.
        * ``data.uns["pca"]["variance_ratio"]``: Ratio of explained variance.

    Examples
    --------
    >>> pg.pca(data)
    """
    keyword = select_features(data, features = features, standardize = standardize, max_value = max_value)

    X = data.uns[keyword]

    pca = PCA(n_components=n_components, random_state=random_state)
    if robust:
        svd_solver = "arpack" if max(X.shape) > 500 and n_components < 0.8 * min(X.shape) else "full"
        pca = PCA(n_components=n_components, random_state=random_state, svd_solver=svd_solver)

    X_pca = pca.fit_transform(X)

    data.obsm["X_pca"] = np.ascontiguousarray(X_pca)
    data.uns["PCs"] = pca.components_.T  # cannot be varm because the number of features differs from data.var
    data.uns["pca"] = {}
    data.uns["pca"]["variance"] = pca.explained_variance_
    data.uns["pca"]["variance_ratio"] = pca.explained_variance_ratio_
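
# A minimal, self-contained sketch (not used by the module) of the solver rule in pca:
# with robust=True, the exact iterative 'arpack' solver replaces scikit-learn's default
# 'randomized' choice whenever the matrix is large enough and n_components stays within
# ARPACK's requirement n_components < min(X.shape). Data and sizes below are made up.
def _demo_robust_solver_choice():
    X = np.random.default_rng(0).normal(size=(600, 40))
    n_components = 10
    svd_solver = "arpack" if max(X.shape) > 500 and n_components < 0.8 * min(X.shape) else "full"
    X_pca = PCA(n_components=n_components, svd_solver=svd_solver, random_state=0).fit_transform(X)
    assert svd_solver == "arpack" and X_pca.shape == (600, 10)
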

@timer(logger=logger)
def pc_regress_out(
    data: Union[MultimodalData, UnimodalData],
    attrs: List[str],
    rep: str = 'pca',
) -> str:
    """Regress out effects due to specific observational attributes at the Principal Component level.

    Parameters
    ----------
    data: ``MultimodalData`` or ``UnimodalData`` object
        Annotated data matrix with rows for cells and columns for genes.
    attrs: ``List[str]``
        List of numeric cell attributes to be regressed out. They must exist in the ``data.obs`` field.
    rep: ``str``, optional, default: ``pca``
        Which embedding to use for regressing out. The key ``'X_'+rep`` must exist in the ``data.obsm`` field. By default, use the PCA embedding.

    Returns
    -------
    pca_key: ``str``
        The key to the resulting new embedding matrix in ``data.obsm``. It's ``'X_'+rep+'_regressed'``.

    Update ``data.obsm``:

        * ``data.obsm[pca_key]``: The PCA matrix with the effects of the specified attributes regressed out.

    Examples
    --------
    >>> pg.pc_regress_out(data, attrs=['G1/S', 'G2/M'])
    """
    n_components = data.obsm[f'X_{rep}'].shape[1]

    from pandas.api.types import is_numeric_dtype
    for attr in attrs:
        if not is_numeric_dtype(data.obs[attr]):
            raise TypeError(f"Cell attribute '{attr}' is not numeric. For regressing out, all attributes used must be numeric!")
    X = data.obs[attrs]

    from sklearn.linear_model import LinearRegression
    response_list = []
    for i in range(n_components):
        pc = data.obsm[f'X_{rep}'][:, i]
        model = LinearRegression().fit(X, pc)
        y_pred = model.predict(X)
        resid = pc - y_pred  # residual after removing the linear effect of attrs on this component
        response_list.append(resid)

    pca_key = f'{rep}_regressed'
    data.obsm[f'X_{pca_key}'] = np.vstack(response_list).T.astype(data.obsm[f'X_{rep}'].dtype)

    return pca_key
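
# A minimal, self-contained sketch (not used by the module) of the per-component
# regression in pc_regress_out: the residual of an ordinary least squares fit is, by
# construction, uncorrelated with the regressed-out covariate. Data are synthetic; the
# covariate stands in for e.g. a 'G1/S' cell-cycle score.
def _demo_regress_out_one_component():
    from sklearn.linear_model import LinearRegression
    rng = np.random.default_rng(0)
    cycle_score = rng.normal(size=(1000, 1))
    pc = 2.0 * cycle_score[:, 0] + rng.normal(size=1000)  # component confounded by the score
    model = LinearRegression().fit(cycle_score, pc)
    resid = pc - model.predict(cycle_score)
    assert abs(np.corrcoef(resid, cycle_score[:, 0])[0, 1]) < 1e-10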