Source code for pegasus.tools.pseudobulk

import numpy as np
import pandas as pd
import logging
logger = logging.getLogger(__name__)

from pegasusio import MultimodalData, UnimodalData, timer
from pandas.api.types import is_categorical_dtype, is_numeric_dtype
from typing import Union, Optional, List, Tuple



def set_bulk_value(col):
    if is_numeric_dtype(col):
        return col.mean()
    else:
        # Categorical
        return col.value_counts().idxmax()


def get_pseudobulk_count(X, df, attr, bulk_list):
    M = []
    for bulk in bulk_list:
        df_bulk = df.loc[df[attr] == bulk]
        bulk_vec = np.sum(X[df_bulk.index, :], axis=0).A1
        M.append(bulk_vec)

    return np.array(M, dtype=np.int32)


[docs]@timer(logger=logger) def pseudobulk( data: MultimodalData, sample: str, attrs: Optional[Union[List[str], str]] = None, mat_key: Optional[str] = "counts", cluster: Optional[str] = None, ) -> UnimodalData: """Generate Pseudo-bulk count matrices. Parameters ----------- data: ``MultimodalData`` or ``UnimodalData`` object Annotated data matrix with rows for cells and columns for genes. sample: ``str`` Specify the cell attribute used for aggregating pseudo-bulk data. Key must exist in ``data.obs``. attrs: ``str`` or ``List[str]``, optional, default: ``None`` Specify additional cell attributes to remain in the pseudo bulk data. If set, all attributes' keys must exist in ``data.obs``. Notice that for a categorical attribute, each pseudo-bulk's value is the one of highest frequency among its cells, and for a numeric attribute, each pseudo-bulk's value is the mean among its cells. mat_key: ``str``, optional, default: ``counts`` Specify the single-cell count matrix used for aggregating pseudo-bulk counts: If specified, use the count matrix with key ``mat_key`` from matrices of ``data``; otherwise, default is ``counts``. cluster: ``str``, optional, default: ``None`` If set, additionally generate pseudo-bulk matrices per cluster specified in ``data.obs[cluster]``. Returns ------- A UnimodalData object ``udata`` containing pseudo-bulk information: * It has the following count matrices: * ``X``: The pseudo-bulk count matrix over all cells. * If ``cluster`` is set, a number of pseudo-bulk count matrices of cells belonging to the clusters, respectively. * ``udata.obs``: It contains pseudo-bulk attributes aggregated from the corresponding single-cell attributes. * ``udata.var``: Gene names and Ensembl IDs are maintained. Update ``data``: * Add the returned UnimodalData object above to ``data`` with key ``<sample>-pseudobulk``, where ``<sample>`` is replaced by the actual value of ``sample`` argument. Examples -------- >>> pg.pseudobulk(data, sample="Channel") """ X = data.get_matrix(mat_key) assert sample in data.obs.columns, f"Sample key '{sample}' must exist in data.obs!" sample_vec = ( data.obs[sample] if is_categorical_dtype(data.obs[sample]) else data.obs[sample].astype("category") ) bulk_list = sample_vec.cat.categories df_barcode = data.obs.reset_index() mat_dict = {"counts": get_pseudobulk_count(X, df_barcode, sample, bulk_list)} # Generate pseudo-bulk attributes if specified bulk_attr_list = [] if attrs is not None: if isinstance(attrs, str): attrs = [attrs] for attr in attrs: assert ( attr in data.obs.columns ), f"Cell attribute key '{attr}' must exist in data.obs!" for bulk in bulk_list: df_bulk = df_barcode.loc[df_barcode[sample] == bulk] if attrs is not None: bulk_attr = df_bulk[attrs].apply(set_bulk_value, axis=0) bulk_attr["barcodekey"] = bulk else: bulk_attr = pd.Series({"barcodekey": bulk}) bulk_attr_list.append(bulk_attr) df_pseudobulk = pd.DataFrame(bulk_attr_list) df_feature = pd.DataFrame(index=data.var_names) if "featureid" in data.var.columns: df_feature["featureid"] = data.var["featureid"] if cluster is not None: assert ( cluster in data.obs.columns ), f"Cluster key '{attr}' must exist in data.obs!" cluster_list = data.obs[cluster].astype("category").cat.categories for cls in cluster_list: mat_dict[f"{cluster}_{cls}.X"] = get_pseudobulk_count( X, df_barcode.loc[df_barcode[cluster] == cls], sample, bulk_list ) udata = UnimodalData( barcode_metadata=df_pseudobulk, feature_metadata=df_feature, matrices=mat_dict, genome=sample, modality="pseudobulk", cur_matrix="counts", ) data.add_data(udata) return udata
[docs]@timer(logger=logger) def deseq2( pseudobulk: UnimodalData, design: str, contrast: Tuple[str, str, str], de_key: str = "deseq2", replaceOutliers: bool = True, ) -> None: """Perform Differential Expression (DE) Analysis using DESeq2 on pseduobulk data. This function calls R package DESeq2, requiring DESeq2 in R installed. DE analysis will be performed on all pseudo-bulk matrices in pseudobulk. Parameters ---------- pseudobulk: ``UnimodalData`` Pseudobulk data with rows for samples and columns for genes. If pseudobulk contains multiple matrices, DESeq2 will apply to all matrices. design: ``str`` Design formula that will be passed to DESeq2 contrast: ``Tuple[str, str, str]`` A tuple of three elements passing to DESeq2: a factor in design formula, a level in the factor as numeritor of fold change, and a level as denominator of fold change. de_key: ``str``, optional, default: ``"deseq2"`` Key name of DE analysis results stored. For cluster.X, stored key will be cluster.de_key replaceOutliers: ``bool``, optional, default: ``True`` If execute DESeq2's replaceOutliers step. If set to ``False``, we will set minReplicatesForReplace=Inf in ``DESeq`` function and set cooksCutoff=False in ``results`` function. Returns ------- ``None`` Update ``pseudobulk.varm``: ``pseudobulk.varm[de_key]``: DE analysis result for pseudo-bulk count matrix. ``pseudobulk.varm[cluster.de_key]``: DE results for cluster-specific pseudo-bulk count matrices. Examples -------- >>> pg.deseq2(pseudobulk, '~gender', ('gender', 'female', 'male')) """ try: import rpy2.robjects as ro from rpy2.robjects import pandas2ri, numpy2ri, Formula from rpy2.robjects.packages import importr from rpy2.robjects.conversion import localconverter except ModuleNotFoundError as e: import sys logger.error(f"{e}\nNeed rpy2! Try 'pip install rpy2'.") sys.exit(-1) try: deseq2 = importr('DESeq2') except ModuleNotFoundError: import sys text = """Please install DESeq2 in order to run this function.\n To install this package, start R and enter:\n if (!require("BiocManager", quietly = TRUE)) install.packages("BiocManager") BiocManager::install("DESeq2")""" logger.error(text) sys.exit(-1) import math to_dataframe = ro.r('function(x) data.frame(x)') for mat_key in pseudobulk.list_keys(): with localconverter(ro.default_converter + numpy2ri.converter + pandas2ri.converter): dds = deseq2.DESeqDataSetFromMatrix(countData = pseudobulk.get_matrix(mat_key).T, colData = pseudobulk.obs, design = Formula(design)) if replaceOutliers: dds = deseq2.DESeq(dds) res= deseq2.results(dds, contrast=ro.StrVector(contrast)) else: dds = deseq2.DESeq(dds, minReplicatesForReplace=math.inf) res= deseq2.results(dds, contrast=ro.StrVector(contrast), cooksCutoff=False) with localconverter(ro.default_converter + pandas2ri.converter): res_df = ro.conversion.rpy2py(to_dataframe(res)) res_df.fillna({'log2FoldChange': 0.0, 'lfcSE': 0.0, 'stat': 0.0, 'pvalue': 1.0, 'padj': 1.0}, inplace=True) de_res_key = de_key if mat_key.find('.') < 0 else f"{mat_key.partition('.')[0]}.{de_key}" pseudobulk.varm[de_res_key] = res_df.to_records(index=False)