Source code for pegasus.pseudo.convenient

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import logging
logger = logging.getLogger(__name__)

from pegasusio import UnimodalData, timer
from typing import Union, Dict, Optional, Tuple



[docs]def markers( pseudobulk: UnimodalData, head: int = None, de_key: str = "deseq2", alpha: float = 0.05, ) -> Dict[str, pd.DataFrame]: """ Extract pseudobulk DE results into a human readable structure. This function extracts information from ``data.varm[de_key]``, and return as a human readible dictionary of pandas DataFrame objects. Parameters ---------- pseudobulk: ``UnimodalData`` Pseudobulk data matrix with rows for cells and columns for genes. head: ``int``, optional, default: ``None`` List only top ``head`` genes. If ``None``, show any DE genes. de_key: ``str``, optional, default, ``deseq2`` Keyword of DE result stored in ``data.varm``. alpha: ``float``, optional, default: ``0.05`` q-value threshold for getting significant DE genes. Only those with q-value of MWU test no less than ``alpha`` are significant, and thus considered as DE genes. Returns ------- results: ``Dict[str, pd.DataFrame]`` A Python dictionary containing DE results. This dictionary contains two keywords: 'up' and 'down'. 'up' refers to up-regulated genes, which should have 'log2FoldChange' > 0.5. 'down' refers to down-regulated genes, which should have 'log2FoldChange' < 0.5. Examples -------- >>> markers = pg.pseudo.markers(pseudobulk) """ if de_key not in pseudobulk.varm.keys(): raise ValueError("Please run DE analysis first") res_dict = {} df = pd.DataFrame(data=pseudobulk.varm[de_key], index=pseudobulk.var_names) idx = df["padj"] <= alpha idx_up = idx & (df["log2FoldChange"].values > 0.0) df_up = df.loc[idx_up].sort_values(by="log2FoldChange", ascending=False, inplace=False) res_dict["up"] = pd.DataFrame(df_up if head is None else df_up.iloc[0:head]) idx_down = idx & (df["log2FoldChange"].values < 0.0) df_down = df.loc[idx_down].sort_values(by="log2FoldChange", ascending=True, inplace=False) res_dict["down"] = pd.DataFrame(df_down if head is None else df_down.iloc[0:head]) return res_dict
[docs]@timer(logger=logger) def write_results_to_excel( results: Dict[str, pd.DataFrame], output_file: str, ndigits: int = 3, ) -> None: """ Write pseudo-bulk DE analysis results into a Excel workbook. Parameters ---------- results: ``Dict[str, pd.DataFrame]`` DE marker dictionary generated by ``pg.pseudo.markers``. output_file: ``str`` File name to which the marker dictionary is written. ndigits: ``int``, optional, default: ``3`` Round non p-values and q-values to ``ndigits`` after decimal point in the excel. Returns ------- ``None`` Marker information is written to file with name ``output_file``. In the generated Excel workbook, each tab stores DE result of up (**up**) / down (**down**)-regulated genes. Examples -------- >>> pg.pseudo.write_results_to_excel(marker_dict, "result.de.xlsx") """ import xlsxwriter def format_short_output_cols( df_orig: pd.DataFrame, ndigits: int = 3 ) -> pd.DataFrame: """ Round related float columns to ndigits decimal points. """ df = pd.DataFrame(df_orig, copy = True) # copy must be true, otherwise the passed df_orig will be modified. cols = [] for name in df.columns: if (not name.endswith("pval")) and (not name.endswith("qval")): cols.append(name) df.loc[:, cols] = df.loc[:, cols].round(ndigits) return df def add_worksheet( workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str ) -> None: """ Add one worksheet with content as df """ df = format_short_output_cols(df_orig) df.reset_index(inplace=True) worksheet = workbook.add_worksheet(name=sheet_name) if df.shape[0] > 0: worksheet.add_table( 0, 0, df.index.size, df.columns.size - 1, { "data": df.to_numpy(), "style": "Table Style Light 1", "first_column": True, "header_row": True, "columns": [{"header": x} for x in df.columns.values], }, ) else: worksheet.write_row(0, 0, df.columns.values) workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True}) workbook.formats[0].set_font_size(9) add_worksheet(workbook, results["up"], "up") add_worksheet(workbook, results["down"], "dn") workbook.close() logger.info("Excel spreadsheet is written.")
[docs]def volcano( pseudobulk: UnimodalData, de_key: str = "deseq2", qval_threshold: float = 0.05, log2fc_threshold: float = 1.0, top_n: int = 20, panel_size: Optional[Tuple[float, float]] = (6, 4), return_fig: Optional[bool] = False, dpi: Optional[float] = 300.0, ) -> Union[plt.Figure, None]: """ Generate Volcano plots (-log10 p value vs. log2 fold change) for visualizing DE results. Parameters ----------- pseudobulk: ``UnimodalData`` object. Pseudobulk data matrix. de_key: ``str``, optional, default: ``deseq2`` The varm keyword for DE results. data.varm[de_key] should store the full DE result table. qval_threshold: ``float``, optional, default: 0.05. Selected FDR rate. A horizontal line indicating this rate will be shown in the figure. log2fc_threshold: ``float``, optional, default: 1.0 Log2 fold change threshold to highlight biologically interesting genes. Two vertical lines representing negative and positive log2 fold change will be shown. top_n: ``int``, optional, default: ``20`` Number of top DE genes to show names. Genes are ranked by Log2 fold change. panel_size: ``Tuple[float, float]``, optional, default: ``(6, 4)`` The size (width, height) in inches of figure. return_fig: ``bool``, optional, default: ``False`` Return a ``Figure`` object if ``True``; return ``None`` otherwise. dpi: ``float``, optional, default: ``300.0`` The resolution in dots per inch. Returns -------- ``Figure`` object A ``matplotlib.figure.Figure`` object containing the dot plot if ``return_fig == True`` Examples --------- >>> pg.pseudo.volcano(pseudobulk, dpi=200) """ if de_key not in pseudobulk.varm: logger.warning(f"Cannot find DE results '{de_key}'. Please conduct DE analysis first!") return None de_res = pseudobulk.varm[de_key] fcstr = "log2FoldChange" pstr = "pvalue" qstr = "padj" columns = de_res.dtype.names if (fcstr not in columns) or (pstr not in columns) or (qstr not in columns): logger.warning("Please conduct DE test first!") return None log2fc = de_res[fcstr] pvals = de_res[pstr] pvals[pvals == 0.0] = 1e-45 # very small pvalue to avoid log10 0 neglog10p = -np.log10(pvals) yconst = min(neglog10p[de_res[qstr] <= qval_threshold]) from pegasus.plotting.plot_utils import _get_subplot_layouts fig, ax = _get_subplot_layouts(panel_size=panel_size, dpi=dpi) idxsig = neglog10p >= yconst idxnsig = neglog10p < yconst idxfc = (log2fc <= -log2fc_threshold) | (log2fc >= log2fc_threshold) idxnfc = ~idxfc idx = idxnsig & idxnfc ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='k', marker='o', linewidths=0.5, alpha=0.5, label="NS") idx = idxnsig & idxfc ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='g', marker='o', linewidths=0.5, alpha=0.5, label=r"Log$_2$ FC") idx = idxsig & idxnfc ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='b', marker='o', linewidths=0.5, alpha=0.5, label=r"q-value") idx = idxsig & idxfc ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='r', marker='o', linewidths=0.5, alpha=0.5, label=r"q-value and log$_2$ FC") ax.set_xlabel(r"Log$_2$ fold change") ax.set_ylabel(r"$-$Log$_{10}$ $P$") legend = ax.legend( loc="center", bbox_to_anchor=(0.5, 1.1), frameon=False, fontsize=8, ncol=4, ) for handle in legend.legendHandles: # adjust legend size handle.set_sizes([50.0]) ax.axhline(y = yconst, c = 'k', lw = 0.5, ls = '--') ax.axvline(x = -log2fc_threshold, c = 'k', lw = 0.5, ls = '--') ax.axvline(x = log2fc_threshold, c = 'k', lw = 0.5, ls = '--') texts = [] idx = np.where(idxsig & (log2fc >= log2fc_threshold))[0] posvec = np.argsort(log2fc[idx])[::-1][0:top_n] for pos in posvec: gid = idx[pos] texts.append(ax.text(log2fc[gid], neglog10p[gid], pseudobulk.var_names[gid], fontsize=5)) idx = np.where(idxsig & (log2fc <= -log2fc_threshold))[0] posvec = np.argsort(log2fc[idx])[0:top_n] for pos in posvec: gid = idx[pos] texts.append(ax.text(log2fc[gid], neglog10p[gid], pseudobulk.var_names[gid], fontsize=5)) from adjustText import adjust_text adjust_text(texts, arrowprops=dict(arrowstyle='-', color='k', lw=0.5)) return fig if return_fig else None