Source code for pegasus.tools.diff_expr

import time
import numpy as np
import pandas as pd

from pandas.api.types import is_categorical_dtype
from scipy.sparse import csr_matrix
from statsmodels.stats.multitest import fdrcorrection as fdr
from joblib import Parallel, delayed, parallel_backend

from typing import List, Tuple, Dict, Union, Optional

import logging
logger = logging.getLogger(__name__)

from anndata import AnnData
from pegasusio import timer, MultimodalData, UnimodalData
from pegasus.tools import eff_n_jobs



def _calc_qvals(
    nclust: int,
    pvals: np.ndarray,
    first_j: int,
    second_j: int,
) -> np.ndarray:
    """ Calculate FDR
    """
    qvals = np.zeros(pvals.shape, dtype = np.float32)
    if second_j > 0:
        _, qval = fdr(pvals[:, first_j])
        qvals[:, first_j] = qvals[:, second_j] = qval
    else:
        for j in range(nclust):
            _, qvals[:, j] = fdr(pvals[:, j])
    return qvals


def _de_test(
    X: csr_matrix,
    cluster_labels: pd.Categorical,
    gene_names: List[str],
    n_jobs: int,
    t: Optional[bool] = False,
    fisher: Optional[bool] = False,
    temp_folder: Optional[str] = None,
    verbose: Optional[bool] = True,
) -> pd.DataFrame:
    """ Collect sufficient statistics, run Mann-Whitney U test, calculate auroc (triggering diff_expr_utils.calc_mwu in parallel), optionally run Welch's T test and Fisher's Exact test (in parallel).
    """
    from pegasus.cylib.de_utils import csr_to_csc, calc_mwu, calc_stat

    start = time.perf_counter()

    ords = np.argsort(cluster_labels.codes)
    data, indices, indptr = csr_to_csc(X.data, X.indices, X.indptr, X.shape[0], X.shape[1], ords)
    cluster_cnts = cluster_labels.value_counts()
    n1arr = cluster_cnts.values
    n2arr = X.shape[0] - n1arr
    cluster_cumsum = cluster_cnts.cumsum().values
    nclust = n1arr.size

    first_j = second_j = -1
    posvec = np.where(n1arr > 0)[0]
    if len(posvec) == 2:
        first_j = posvec[0]
        second_j = posvec[1]


    if verbose:
        end = time.perf_counter()
        logger.info(f"CSR matrix is converted to CSC matrix. Time spent = {end - start:.4f}s.")
        start = end
        # logger.info(f"Preparation (including converting X to csc_matrix format) for MWU test is finished. Time spent = {time.perf_counter() - start:.2f}s.")


    ngene = X.shape[1]
    quotient = ngene // n_jobs
    residue = ngene % n_jobs
    intervals = []
    start_pos = end_pos = 0
    for i in range(n_jobs):
        end_pos = start_pos + quotient + (i < residue)
        if end_pos == start_pos:
            break
        intervals.append((start_pos, end_pos))
        start_pos = end_pos

    with parallel_backend("loky", inner_max_num_threads=1):
        result_list = Parallel(n_jobs=len(intervals), temp_folder=temp_folder)(
            delayed(calc_mwu)(
                start_pos,
                end_pos,
                data,
                indices,
                indptr,
                n1arr,
                n2arr,
                cluster_cumsum,
                first_j,
                second_j,
                verbose,
            )
            for start_pos, end_pos in intervals
        )

    Ulist = []
    plist = []
    alist = []
    for U_stats, pvals, aurocs in result_list:
        Ulist.append(U_stats)
        plist.append(pvals)
        alist.append(aurocs)

    U_stats = np.concatenate(Ulist, axis = 0)
    pvals = np.concatenate(plist, axis = 0)
    aurocs = np.concatenate(alist, axis = 0)
    qvals = _calc_qvals(nclust, pvals, first_j, second_j)

    dfU = pd.DataFrame(U_stats, index = gene_names, columns = [f"{x}:mwu_U" for x in cluster_labels.categories])
    dfUp = pd.DataFrame(pvals, index = gene_names, columns = [f"{x}:mwu_pval" for x in cluster_labels.categories])
    dfUq = pd.DataFrame(qvals, index = gene_names, columns = [f"{x}:mwu_qval" for x in cluster_labels.categories])
    dfUa = pd.DataFrame(aurocs, index = gene_names, columns = [f"{x}:auroc" for x in cluster_labels.categories])

    if verbose:
        end = time.perf_counter()
        logger.info(f"MWU test and AUROC calculation are finished. Time spent = {end - start:.4f}s.")
        start = end

    # basic statistics and optional t test and fisher test
    results = calc_stat(data, indices, indptr, n1arr, n2arr, cluster_cumsum, first_j, second_j, t, fisher, verbose)
    dfl2M = pd.DataFrame(results[0][0], index = gene_names, columns = [f"{x}:log2Mean" for x in cluster_labels.categories])
    dfl2Mo = pd.DataFrame(results[0][1], index = gene_names, columns = [f"{x}:log2Mean_other" for x in cluster_labels.categories])
    dfl2FC = pd.DataFrame(results[0][2], index = gene_names, columns = [f"{x}:log2FC" for x in cluster_labels.categories])
    dfpct = pd.DataFrame(results[0][3], index = gene_names, columns = [f"{x}:percentage" for x in cluster_labels.categories])
    dfpcto = pd.DataFrame(results[0][4], index = gene_names, columns = [f"{x}:percentage_other" for x in cluster_labels.categories])
    dfpfc = pd.DataFrame(results[0][5], index = gene_names, columns = [f"{x}:percentage_fold_change" for x in cluster_labels.categories])

    df_list = [dfl2M, dfl2Mo, dfl2FC, dfpct, dfpcto, dfpfc, dfUa, dfU, dfUp, dfUq]

    if verbose:
        end = time.perf_counter()
        logger.info(f"Sufficient statistics are collected. Time spent = {end - start:.4f}s.")
        start = end

    if t:
        qvals = _calc_qvals(nclust, results[1][1], first_j, second_j)
        dft = pd.DataFrame(results[1][0], index = gene_names, columns = [f"{x}:t_tstat" for x in cluster_labels.categories])
        dftp = pd.DataFrame(results[1][1], index = gene_names, columns = [f"{x}:t_pval" for x in cluster_labels.categories])
        dftq = pd.DataFrame(qvals, index = gene_names, columns = [f"{x}:t_qval" for x in cluster_labels.categories])
        df_list.extend([dft, dftp, dftq])

        if verbose:
            end = time.perf_counter()
            logger.info(f"Welch's t-test is finished. Time spent = {end - start:.4f}s.")
            start = end

    if fisher:
        from pegasus.cylib.cfisher import fisher_exact
        a_true, a_false, b_true, b_false = results[1 if not t else 2]

        oddsratios = np.zeros((ngene, n1arr.size), dtype = np.float32)
        pvals = np.ones((ngene, n1arr.size), dtype = np.float32)

        if second_j > 0:
            oddsratio, pval = fisher_exact(a_true[first_j], a_false[first_j], b_true[first_j], b_false[first_j])
            oddsratios[:, first_j] = oddsratio
            idx1 = oddsratio > 0.0
            idx2 = oddsratio < 1e30
            oddsratios[idx1 & idx2, second_j] = 1.0 / oddsratio[idx1 & idx2]
            oddsratios[~idx1] = 1e30
            pvals[:, first_j] = pvals[:, second_j] = pval
        else:
            with parallel_backend("loky", inner_max_num_threads=1):
                result_list = Parallel(n_jobs=n_jobs, temp_folder=temp_folder)(
                    delayed(fisher_exact)(
                        a_true[i],
                        a_false[i],
                        b_true[i],
                        b_false[i],
                    )
                    for i in posvec
                )

            for i in range(posvec.size):
                oddsratios[:, posvec[i]] = result_list[i][0]
                pvals[:, posvec[i]] = result_list[i][1]

        qvals = _calc_qvals(nclust, pvals, first_j, second_j)
        dff = pd.DataFrame(oddsratios, index = gene_names, columns = [f"{x}:fisher_oddsratio" for x in cluster_labels.categories])
        dffp = pd.DataFrame(pvals, index = gene_names, columns = [f"{x}:fisher_pval" for x in cluster_labels.categories])
        dffq = pd.DataFrame(qvals, index = gene_names, columns = [f"{x}:fisher_qval" for x in cluster_labels.categories])
        df_list.extend([dff, dffp, dffq])

        if verbose:
            end = time.perf_counter()
            logger.info(f"Fisher's exact test is finished. Time spent = {end - start:.4f}s.")

    df = pd.concat(df_list, axis = 1)

    return df



def _perform_de_cond(
    clust_ids: List[str],
    cond_labels: pd.Categorical,
    gene_names: List[str],
    cond_n1arr_list: List[List[int]],
    cond_n2arr_list: List[List[int]],
    cond_cumsum_list: List[List[int]],
    data_list: List[List[float]],
    indices_list: List[List[int]],
    indptr_list: List[List[int]],
    t: bool,
    fisher: bool,
    verbose: bool,
) -> List[pd.DataFrame]:
    """ Run DE test for clusters specified. In each cluster, perform one vs. rest for the condition
    """
    df_res_list = []

    from pegasus.cylib.de_utils import calc_mwu, calc_stat

    ngene = indptr_list[0].size - 1
    for i, clust_id in enumerate(clust_ids):
        nclust = cond_n1arr_list[i].size

        first_j = second_j = -1
        posvec = np.where(cond_n1arr_list[i] > 0)[0]
        if len(posvec) == 2:
            first_j = posvec[0]
            second_j = posvec[1]

        U_stats, pvals, aurocs = calc_mwu(0, ngene, data_list[i], indices_list[i], indptr_list[i], cond_n1arr_list[i], cond_n2arr_list[i], cond_cumsum_list[i], first_j, second_j, False)
        qvals = _calc_qvals(nclust, pvals, first_j, second_j)

        dfU = pd.DataFrame(U_stats, index = gene_names, columns = [f"{clust_id}:{x}:mwu_U" for x in cond_labels.categories])
        dfUp = pd.DataFrame(pvals, index = gene_names, columns = [f"{clust_id}:{x}:mwu_pval" for x in cond_labels.categories])
        dfUq = pd.DataFrame(qvals, index = gene_names, columns = [f"{clust_id}:{x}:mwu_qval" for x in cond_labels.categories])
        dfUa = pd.DataFrame(aurocs, index = gene_names, columns = [f"{clust_id}:{x}:auroc" for x in cond_labels.categories])

        results = calc_stat(data_list[i], indices_list[i], indptr_list[i], cond_n1arr_list[i], cond_n2arr_list[i], cond_cumsum_list[i], first_j, second_j, t, fisher, False)
        dfl2M = pd.DataFrame(results[0][0], index = gene_names, columns = [f"{clust_id}:{x}:log2Mean" for x in cond_labels.categories])
        dfl2Mo = pd.DataFrame(results[0][1], index = gene_names, columns = [f"{clust_id}:{x}:log2Mean_other" for x in cond_labels.categories])
        dfl2FC = pd.DataFrame(results[0][2], index = gene_names, columns = [f"{clust_id}:{x}:log2FC" for x in cond_labels.categories])
        dfpct = pd.DataFrame(results[0][3], index = gene_names, columns = [f"{clust_id}:{x}:percentage" for x in cond_labels.categories])
        dfpcto = pd.DataFrame(results[0][4], index = gene_names, columns = [f"{clust_id}:{x}:percentage_other" for x in cond_labels.categories])
        dfpfc = pd.DataFrame(results[0][5], index = gene_names, columns = [f"{clust_id}:{x}:percentage_fold_change" for x in cond_labels.categories])

        df_list = [dfl2M, dfl2Mo, dfl2FC, dfpct, dfpcto, dfpfc, dfUa, dfU, dfUp, dfUq]

        if t:
            qvals = _calc_qvals(nclust, results[1][1], first_j, second_j)
            dft = pd.DataFrame(results[1][0], index = gene_names, columns = [f"{clust_id}:{x}:t_tstat" for x in cond_labels.categories])
            dftp = pd.DataFrame(results[1][1], index = gene_names, columns = [f"{clust_id}:{x}:t_pval" for x in cond_labels.categories])
            dftq = pd.DataFrame(qvals, index = gene_names, columns = [f"{clust_id}:{x}:t_qval" for x in cond_labels.categories])
            df_list.extend([dft, dftp, dftq])

        if fisher:
            a_true, a_false, b_true, b_false = results[1 if not t else 2]

            from pegasus.cylib.cfisher import fisher_exact

            oddsratios = np.zeros((ngene, nclust), dtype = np.float32)
            pvals = np.ones((ngene, nclust), dtype = np.float32)

            if second_j > 0:
                oddsratio, pval = fisher_exact(a_true[first_j], a_false[first_j], b_true[first_j], b_false[first_j])
                oddsratios[:, first_j] = oddsratio
                idx1 = oddsratio > 0.0
                idx2 = oddsratio < 1e30
                oddsratios[idx1 & idx2, second_j] = 1.0 / oddsratio[idx1 & idx2]
                oddsratios[~idx1] = 1e30
                pvals[:, first_j] = pvals[:, second_j] = pval
            else:
                for j in posvec:
                    oddsratios[:, j], pvals[:, j] = fisher_exact(a_true[j], a_false[j], b_true[j], b_false[j])

            qvals = _calc_qvals(nclust, pvals, first_j, second_j)
            dff = pd.DataFrame(oddsratios, index = gene_names, columns = [f"{clust_id}:{x}:fisher_oddsratio" for x in cond_labels.categories])
            dffp = pd.DataFrame(pvals, index = gene_names, columns = [f"{clust_id}:{x}:fisher_pval" for x in cond_labels.categories])
            dffq = pd.DataFrame(qvals, index = gene_names, columns = [f"{clust_id}:{x}:fisher_qval" for x in cond_labels.categories])
            df_list.extend([dff, dffp, dffq])

        df_res_list.append(pd.concat(df_list, axis = 1))

    if verbose:
        clust_ids_str = ','.join([str(x) for x in clust_ids])
        print(f"_perform_de_cond finished for clusters {clust_ids_str}.")

    return df_res_list


def _de_test_cond(
    X: csr_matrix,
    cluster_labels: pd.Categorical,
    cond_labels: pd.Categorical,
    gene_names: List[str],
    n_jobs: int,
    t: Optional[bool] = False,
    fisher: Optional[bool] = False,
    temp_folder: Optional[str] = None,
    verbose: Optional[bool] = True,
) -> List[pd.DataFrame]:
    """ Collect sufficient statistics, run Mann-Whitney U test, calculate auroc, optionally run Welch's T test and Fisher's Exact test on each cluster (DE analysis is based on cond_labels)
    """
    start = time.perf_counter()

    clust_cond = np.array(list(zip(cluster_labels.codes, cond_labels.codes)), dtype = [("clust", "<i4"), ("cond", "<i4")])
    ords = np.argsort(clust_cond)
    df_cross = pd.crosstab(cluster_labels.codes, cond_labels.codes)
    cluster_cnts = df_cross.values.sum(axis = 1)
    cluster_cumsum = cluster_cnts.cumsum()

    from pegasus.cylib.de_utils import csr_to_csc_cond

    data_list, indices_list, indptrs = csr_to_csc_cond(X.data, X.indices, X.indptr, X.shape[1], ords, cluster_cumsum)

    if verbose:
        end = time.perf_counter()
        logger.info(f"CSR matrix is converted to CSC matrix. Time spent = {end - start:.2f}s.")

    # assign from 0 to n and then n to 0; cluster ID is sorted in descending order with respect to number of cells
    ords = np.argsort(cluster_cnts)[::-1]
    nclust = cluster_cumsum.size
    neff = min(n_jobs, nclust)

    intervals = []
    datalists = []
    indiceslists = []
    for i in range(neff):
        intervals.append([])
        datalists.append([])
        indiceslists.append([])

    pos = 0
    sign = 1
    for i in range(nclust):
        intervals[pos].append(ords[i])
        datalists[pos].append(data_list[ords[i]])
        indiceslists[pos].append(indices_list[ords[i]])
        if pos + sign == neff:
            sign = -1
        elif pos + sign == -1:
            sign = 1
        else:
            pos += sign

    n1arr = df_cross.values
    n2arr = cluster_cnts.reshape(-1, 1) - n1arr
    cumsum = df_cross.cumsum(axis = 1).values

    with parallel_backend("loky", inner_max_num_threads=1):
        result_list = Parallel(n_jobs=neff, temp_folder=temp_folder)(
            delayed(_perform_de_cond)(
                cluster_labels.categories[intervals[i]],
                cond_labels,
                gene_names,
                n1arr[intervals[i]],
                n2arr[intervals[i]],
                cumsum[intervals[i]],
                datalists[i],
                indiceslists[i],
                indptrs[intervals[i]],
                t,
                fisher,
                verbose,
            )
            for i in range(neff)
        )

    df = pd.concat([x for y in result_list for x in y], axis = 1)

    return df


[docs]@timer(logger=logger) def de_analysis( data: Union[MultimodalData, UnimodalData, AnnData], cluster: str, condition: Optional[str] = None, subset: Optional[List[str]] = None, de_key: Optional[str] = "de_res", n_jobs: Optional[int] = -1, t: Optional[bool] = False, fisher: Optional[bool] = False, temp_folder: Optional[str] = None, verbose: Optional[bool] = True, ) -> None: """Perform Differential Expression (DE) Analysis on data. The analysis considers one cluster at one time, comparing gene expression levels on cells within the cluster with all the others using a number of statistical tools, and determining up-regulated genes and down-regulated genes of the cluster. Mann-Whitney U test and AUROC are calculated by default. Welch's T test and Fisher's Exact test are optionally. The scalability performance on calculating all the test statistics is improved by the inspiration from `Presto <https://github.com/immunogenomics/presto>`_. Parameters ---------- data: ``MultimodalData``, ``UnimodalData``, or ``anndata.AnnData`` Data matrix with rows for cells and columns for genes. cluster: ``str`` Cluster labels used in DE analysis. Must exist in ``data.obs``. condition: ``str``, optional, default: ``None`` Sample attribute used as condition in DE analysis. If ``None``, no condition is considered; otherwise, must exist in ``data.obs``. If ``condition`` is used, the DE analysis will be performed on cells of each level of ``data.obs[condition]`` respectively, and collect the results after finishing. subset: ``List[str]``, optional, default: ``None`` Perform DE analysis on only a subset of cluster IDs. Cluster ID subset is specified as a list of strings, such as ``[clust_1,clust_3,clust_5]``, where all IDs must exist in ``data.obs[cluster]``. de_key: ``str``, optional, default: ``"de_res"`` Key name of DE analysis results stored. n_jobs: ``int``, optional, default: ``-1`` Number of threads to use. If ``-1``, use all available threads. t: ``bool``, optional, default: ``False`` If ``True``, calculate Welch's t test. fisher: ``bool``, optional, default: ``False`` If ``True``, calculate Fisher's exact test. temp_folder: ``str``, optional, default: ``None`` Joblib temporary folder for memmapping numpy arrays. verbose: ``bool``, optional, default: ``True`` If ``True``, show detailed intermediate output. Returns ------- ``None`` Update ``data.varm``: ``data.varm[de_key]``: DE analysis result. Examples -------- >>> pg.de_analysis(data, cluster='spectral_leiden_labels') >>> pg.de_analysis(data, cluster='louvain_labels', condition='anno') """ if cluster not in data.obs: raise ValueError("Cannot find cluster label!") cluster_labels = data.obs[cluster].values if not is_categorical_dtype(cluster_labels): from natsort import natsorted cluster_labels = pd.Categorical(cluster_labels, natsorted(np.unique(cluster_labels))) cond_labels = None if condition is not None: if condition not in data.obs: raise ValueError("Cannot find condition!") cond_labels = data.obs[condition].values if not is_categorical_dtype(cond_labels): from natsort import natsorted cond_labels = pd.Categorical(cond_labels, natsorted(np.unique(cond_labels))) if cond_labels.categories.size < 2: raise ValueError("Number of conditions must be at least 2!") X = data.X if isinstance(data.X, csr_matrix) else csr_matrix(data.X) # If dense matrix, force it to be a csr_matrix if subset is not None: # subset data for de analysis subset = np.array(subset) idx_s = np.isin(subset, cluster_labels.categories.values) if idx_s.sum() < subset.size: raise ValueError( "These cluster labels do not exist: " + ",".join(subset[~idx_s]) + "!" ) idx = np.isin(cluster_labels, subset) cluster_labels = pd.Categorical(cluster_labels[idx], categories = subset) if cond_labels is not None: cond_labels = cond_labels[idx] X = X[idx] if condition is not None: #Eliminate NaN rows from calculation idx_na = cond_labels.isna() if idx_na.sum() > 0: logger.warning("Detected NaN values in condition. Cells with NaN values are excluded from DE analysis.") idx_not_na = ~idx_na X = X[idx_not_na] cluster_labels = cluster_labels[idx_not_na] cond_labels = cond_labels[idx_not_na] n_jobs = eff_n_jobs(n_jobs) gene_names = data.var_names.values if cond_labels is None: df = _de_test(X, cluster_labels, gene_names, n_jobs, t, fisher, temp_folder, verbose) else: df = _de_test_cond(X, cluster_labels, cond_labels, gene_names, n_jobs, t, fisher, temp_folder, verbose) data.varm[de_key] = df.to_records(index=False) logger.info("Differential expression analysis is finished.")
def _assemble_df(res_dict: dict, rec_array: np.ndarray, prefix: str, col_names: List[str], gene_names: pd.Index, alpha: float, head: int): rec_names = [f"{prefix}:{x}" for x in col_names] df = pd.DataFrame(data=rec_array[rec_names], index=gene_names) df.columns = col_names idx = df["mwu_qval"] <= alpha idx_up = idx & (df["auroc"].values > 0.5) df_up = df.loc[idx_up].sort_values(by="auroc", ascending=False, inplace=False) res_dict["up"] = pd.DataFrame(df_up if head is None else df_up.iloc[0:head]) idx_down = idx & (df["auroc"].values < 0.5) df_down = df.loc[idx_down].sort_values(by="auroc", ascending=True, inplace=False) res_dict["down"] = pd.DataFrame(df_down if head is None else df_down.iloc[0:head])
[docs]def markers( data: Union[MultimodalData, UnimodalData, AnnData], head: int = None, de_key: str = "de_res", alpha: float = 0.05, ) -> Union[Dict[str, Dict[str, pd.DataFrame]], Dict[str, Dict[str, Dict[str, pd.DataFrame]]]]: """ Extract DE results into a human readable structure. This function extracts information from ``data.varm[de_key]``, and return as a human readible dictionary of pandas DataFrame objects. Parameters ---------- data: ``MultimodalData``, ``UnimodalData``, or ``anndata.AnnData`` Data matrix with rows for cells and columns for genes. head: ``int``, optional, default: ``None`` List only top ``head`` genes for each cluster. If ``None``, show any DE genes. de_key: ``str``, optional, default, ``de_res`` Keyword of DE result stored in ``data.varm``. alpha: ``float``, optional, default: ``0.05`` q-value threshold for getting significant DE genes. Only those with q-value of MWU test no less than ``alpha`` are significant, and thus considered as DE genes. Returns ------- results: ``Dict[str, Dict[str, pd.DataFrame]]`` A Python dictionary containing markers. If DE is performed between clusters, the structure is ``dict[cluster_id]['up' or 'down'][dataframe]``. If DE is performed between conditions within each cluster, the structure is ``dict[cluster_id][condition_id]['up' or 'down'][dataframe]``. 'up' refers to up-regulated genes, which should have 'auroc' > 0.5. 'down' refers to down-regulated genes, which should have 'auroc' < 0.5. Examples -------- >>> marker_dict = pg.markers(data) """ if de_key not in data.varm.keys(): raise ValueError("Please run de_analysis first!") rec_array = data.varm[de_key] gene_names = data.var_names de_clust = len(rec_array.dtype.names[0].split(":")) == 2 from collections import defaultdict if de_clust: clust2cols = defaultdict(list) for name in rec_array.dtype.names: clust_id, col_name = name.split(":") clust2cols[clust_id].append(col_name) results = defaultdict(dict) for clust_id, col_names in clust2cols.items(): _assemble_df(res_dict=results[clust_id], rec_array=rec_array, prefix=clust_id, col_names=col_names, gene_names=gene_names, alpha=alpha, head=head, ) else: clust2cond2cols = defaultdict(lambda: defaultdict(list)) for name in rec_array.dtype.names: clust_id, cond_id, col_name = name.split(":") clust2cond2cols[clust_id][cond_id].append(col_name) results = defaultdict(lambda: defaultdict(dict)) for clust_id, cond2cols in clust2cond2cols.items(): for cond_id, col_names in cond2cols.items(): _assemble_df(res_dict=results[clust_id][cond_id], rec_array=rec_array, prefix=":".join([clust_id, cond_id]), col_names=col_names, gene_names=gene_names, alpha=alpha, head=head, ) return results
[docs]@timer(logger=logger) def write_results_to_excel( results: Union[Dict[str, Dict[str, pd.DataFrame]], Dict[str, Dict[str, Dict[str, pd.DataFrame]]]], output_file: str, ndigits: int = 3, ) -> None: """ Write DE analysis results into Excel workbook. Parameters ---------- results: ``Dict[str, Dict[str, pd.DataFrame]]``, or ``Dict[str, Dict[str, Dict[str, pd.DataFrame]]]`` DE marker dictionary generated by ``pg.markers``. output_file: ``str`` File name to which the marker dictionary is written. ndigits: ``int``, optional, default: ``3`` Round non p-values and q-values to ``ndigits`` after decimal point in the excel. Returns ------- ``None`` Marker information is written to file with name ``output_file``. In the generated Excel workbook, * If ``condition`` is ``None`` in ``pg.de_analysis``: Each tab stores DE result of up/down-regulated genes of cells within one cluster, and its name follows the pattern: **"cluster_id|up"** or **"cluster_id|dn"**. * If ``condition`` is not ``None`` in ``pg.de_analysis``: Each tab stores DE result of up/down-regulated genes of cells within one cluster under one condition level. The tab's name follows the pattern: **"cluster_id|cond_level|up"** or **"cluster_id|cond_level|dn"**. * Notice that the tab name in Excel only allows at most 31 characters. Therefore, some of the resulting tab names may be truncated if their names are longer than this threshold. Examples -------- >>> pg.write_results_to_excel(marker_dict, "result.de.xlsx") """ import xlsxwriter from natsort import natsorted # Need to make sure tab characters < 32 see here: https://xlsxwriter.readthedocs.io/workbook.html#:~:text=The%20worksheet%20name%20must%20be,be%20less%20than%2032%20characters. def compute_abbrevs(clust_ids: List[str], clust_de: bool, cond_ids: List[str], clust_abbrevs: List[str], cond_abbrevs: List[str]) -> None: COND_MLEN=7 # condition name should take at most 7 characters TOTAL_LEN=28 # excluding the last 3 |up or |dn clust_mlen = TOTAL_LEN if not clust_de: max_cond_len = max([len(x) for x in cond_ids]) max_cond_len = min(max_cond_len, COND_MLEN) for cond_id in cond_ids: cond_abbrevs.append(cond_id[:max_cond_len]) clust_mlen -= max_cond_len + 1 for clust_id in clust_ids: if len(clust_id) <= clust_mlen: clust_abbrevs.append(clust_id) else: # Extract suffix like '-2', in which Pegasus uses to distinguish clusters with the same cell type. import re suf = "" match = re.search("\-\d+$", clust_id) if match is not None: suf = match.group() clust_id = clust_id[:match.start()] # Split by space fields = re.split("\s+", clust_id) if fields[-1].lower() in ["cell", "cells"]: fields = fields[:-1] abbr = " ".join(fields) + suf if len(abbr) <= clust_mlen: clust_abbrevs.append(abbr) else: fsize = len(fields) len_left = clust_mlen - len(suf) - (fsize - 1) assert len_left >= fsize endpos = [0] * fsize while len_left > 0: for i in range(fsize-1, -1, -1): if endpos[i] < len(fields[i]): endpos[i] += 1 len_left -= 1 if len_left == 0: break clust_abbrevs.append(" ".join([fields[i][:endpos[i]] for i in range(fsize)]) + suf) def format_short_output_cols( df_orig: pd.DataFrame, ndigits: int ) -> pd.DataFrame: """ Round related float columns to ndigits decimal points. """ df = pd.DataFrame(df_orig, copy = True) # copy must be true, otherwise the passed df_orig will be modified. cols = [] for name in df.columns: if (not name.endswith("pval")) and (not name.endswith("qval")): cols.append(name) df.loc[:, cols] = df.loc[:, cols].round(ndigits) return df def add_worksheet( workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str ) -> None: """ Add one worksheet with content as df """ df = format_short_output_cols(df_orig, ndigits) df.reset_index(inplace=True) worksheet = workbook.add_worksheet(name=sheet_name) if df.shape[0] > 0: worksheet.add_table( 0, 0, df.index.size, df.columns.size - 1, { "data": df.to_numpy(), "style": "Table Style Light 1", "first_column": True, "header_row": True, "columns": [{"header": x} for x in df.columns.values], }, ) else: worksheet.write_row(0, 0, df.columns.values) workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True}) workbook.formats[0].set_font_size(9) clust_ids = natsorted(results.keys()) clust_de = isinstance(list(list(results.values())[0].values())[0], pd.DataFrame) cond_ids = natsorted(list(results.values())[0].keys()) if not clust_de else None clust_abbrevs = [] cond_abbrevs = [] if not clust_de else None compute_abbrevs(clust_ids, clust_de, cond_ids, clust_abbrevs, cond_abbrevs) for clust_id, clust_abbr in zip(clust_ids, clust_abbrevs): if clust_de: add_worksheet(workbook, results[clust_id]["up"], f"{clust_abbr}|up") add_worksheet(workbook, results[clust_id]["down"], f"{clust_abbr}|dn") else: cond_dict = results[clust_id] for cond_id, cond_abbr in zip(cond_ids, cond_abbrevs): add_worksheet(workbook, cond_dict[cond_id]["up"], f"{clust_abbr}|{cond_abbr}|up") add_worksheet(workbook, cond_dict[cond_id]["down"], f"{clust_abbr}|{cond_abbr}|dn") workbook.close() logger.info("Excel spreadsheet is written.")
def cluster_specific_markers( markers: Dict[str, Dict[str, pd.DataFrame]], clust_id: str, min_auroc: float = 0.7, expected_pfc: float = 10.0, n_lo: int = 25, n_up: int = 50, ) -> pd.DataFrame: """ Extract cluster-specific markers from DE results ``markers``. This function extracts cluster-specific markers (e.g. with auroc >= min_auroc and high in percentage fold change). The extracted markers can be screened for signatures representing the cluster. The selection procedure is as follows: First, pick genes with AUROC >= min_auroc and pfc (percentage fold change) >= expected_pfc. If the number is between [n_lo, n_up], return the subset of markers containing only these genes. Otherwise, if the number < n_lo, extend the gene set to include up to n_lo genes in descending order of their pfc. If the number > n_up, truncate the set by keeping only n_up genes with highest pfc. Parameters ---------- markers: ``Dict[str, Dict[str, pd.DataFrame]]`` Markers from `de_analysis`. clust_id: ``str`` Cluster ID to tell which cluster to focus on. min_auroc: ``float``, default, ``0.7`` Minimum AUROC for a gene. expected_pfc: ``float``, optional, default: ``10.0`` Expected percentage fold change for a gene. n_lo: ``int``, optional, default: ``25`` Lower bound (inclusive) on the number of genes to return. n_up: ``int``, optional, default: ``50`` Upper bound (inclusive) on the number of genes to return. Returns ------- results: ``pd.DataFrame`` A Python dataframe containing selected markers, ranking in descending order with respect to AUROC. Examples -------- >>> candidates = pg.cluster_specific_markers(markers, 'Mono') """ df = markers[clust_id]['up'] idx_auc = df['auroc'] >= min_auroc idx_epf = df['percentage_fold_change'] >= expected_pfc idx = idx_auc & idx_epf n = idx.sum() if n >= n_lo and n <= n_up: return df[idx] else: res = df[idx_auc].sort_values('percentage_fold_change', ascending=False) res = res.iloc[0:(n_lo if n < n_lo else n_up)].sort_values('auroc', ascending=False) return res @timer(logger=logger) def run_de_analysis( input_file: str, output_excel_file: str, cluster: str, condition: Optional[str] = None, de_key: str = "de_res", n_jobs: int = -1, auc: bool = True, t: bool = True, fisher: bool = False, mwu: bool = False, temp_folder: str = None, verbose: bool = True, alpha: float = 0.05, ndigits: int = 3, ) -> None: """ For command line only """ from pegasusio import read_input, write_output data = read_input(input_file, mode='r') de_analysis( data, cluster, condition=condition, de_key=de_key, n_jobs=n_jobs, t=t, fisher=fisher, temp_folder=temp_folder, verbose=verbose, ) write_output(data, input_file) logger.info(f"Differential expression results are written to varm/{de_key}.") results = markers(data, de_key=de_key, alpha=alpha) write_results_to_excel(results, output_excel_file, ndigits=ndigits)