import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import logging
logger = logging.getLogger(__name__)
from pegasusio import UnimodalData, timer
from typing import Union, Dict, Optional, Tuple
[docs]def markers(
pseudobulk: UnimodalData,
head: int = None,
de_key: str = "deseq2",
alpha: float = 0.05,
) -> Dict[str, pd.DataFrame]:
""" Extract pseudobulk DE results into a human readable structure.
This function extracts information from ``data.varm[de_key]``, and return as
a human readible dictionary of pandas DataFrame objects.
Parameters
----------
pseudobulk: ``UnimodalData``
Pseudobulk data matrix with rows for cells and columns for genes.
head: ``int``, optional, default: ``None``
List only top ``head`` genes. If ``None``, show any DE genes.
de_key: ``str``, optional, default, ``deseq2``
Keyword of DE result stored in ``data.varm``.
alpha: ``float``, optional, default: ``0.05``
q-value threshold for getting significant DE genes. Only those with q-value of MWU test no less than ``alpha`` are significant, and thus considered as DE genes.
Returns
-------
results: ``Dict[str, pd.DataFrame]``
A Python dictionary containing DE results. This dictionary contains two keywords: 'up' and 'down'.
'up' refers to up-regulated genes, which should have 'log2FoldChange' > 0.5.
'down' refers to down-regulated genes, which should have 'log2FoldChange' < 0.5.
Examples
--------
>>> markers = pg.pseudo.markers(pseudobulk)
"""
if de_key not in pseudobulk.varm.keys():
raise ValueError("Please run DE analysis first")
res_dict = {}
df = pd.DataFrame(data=pseudobulk.varm[de_key], index=pseudobulk.var_names)
idx = df["padj"] <= alpha
idx_up = idx & (df["log2FoldChange"].values > 0.0)
df_up = df.loc[idx_up].sort_values(by="log2FoldChange", ascending=False, inplace=False)
res_dict["up"] = pd.DataFrame(df_up if head is None else df_up.iloc[0:head])
idx_down = idx & (df["log2FoldChange"].values < 0.0)
df_down = df.loc[idx_down].sort_values(by="log2FoldChange", ascending=True, inplace=False)
res_dict["down"] = pd.DataFrame(df_down if head is None else df_down.iloc[0:head])
return res_dict
[docs]@timer(logger=logger)
def write_results_to_excel(
results: Dict[str, pd.DataFrame],
output_file: str,
ndigits: int = 3,
) -> None:
""" Write pseudo-bulk DE analysis results into a Excel workbook.
Parameters
----------
results: ``Dict[str, pd.DataFrame]``
DE marker dictionary generated by ``pg.pseudo.markers``.
output_file: ``str``
File name to which the marker dictionary is written.
ndigits: ``int``, optional, default: ``3``
Round non p-values and q-values to ``ndigits`` after decimal point in the excel.
Returns
-------
``None``
Marker information is written to file with name ``output_file``. In the generated Excel workbook, each tab stores DE result of up (**up**) / down (**down**)-regulated genes.
Examples
--------
>>> pg.pseudo.write_results_to_excel(marker_dict, "result.de.xlsx")
"""
import xlsxwriter
def format_short_output_cols(
df_orig: pd.DataFrame, ndigits: int = 3
) -> pd.DataFrame:
""" Round related float columns to ndigits decimal points.
"""
df = pd.DataFrame(df_orig, copy = True) # copy must be true, otherwise the passed df_orig will be modified.
cols = []
for name in df.columns:
if (not name.endswith("pval")) and (not name.endswith("qval")):
cols.append(name)
df.loc[:, cols] = df.loc[:, cols].round(ndigits)
return df
def add_worksheet(
workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str
) -> None:
""" Add one worksheet with content as df
"""
df = format_short_output_cols(df_orig)
df.reset_index(inplace=True)
worksheet = workbook.add_worksheet(name=sheet_name)
if df.shape[0] > 0:
worksheet.add_table(
0,
0,
df.index.size,
df.columns.size - 1,
{
"data": df.to_numpy(),
"style": "Table Style Light 1",
"first_column": True,
"header_row": True,
"columns": [{"header": x} for x in df.columns.values],
},
)
else:
worksheet.write_row(0, 0, df.columns.values)
workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True})
workbook.formats[0].set_font_size(9)
add_worksheet(workbook, results["up"], "up")
add_worksheet(workbook, results["down"], "dn")
workbook.close()
logger.info("Excel spreadsheet is written.")
[docs]def volcano(
pseudobulk: UnimodalData,
de_key: str = "deseq2",
qval_threshold: float = 0.05,
log2fc_threshold: float = 1.0,
top_n: int = 20,
panel_size: Optional[Tuple[float, float]] = (6, 4),
return_fig: Optional[bool] = False,
dpi: Optional[float] = 300.0,
) -> Union[plt.Figure, None]:
"""
Generate Volcano plots (-log10 p value vs. log2 fold change) for visualizing DE results.
Parameters
-----------
pseudobulk: ``UnimodalData`` object.
Pseudobulk data matrix.
de_key: ``str``, optional, default: ``deseq2``
The varm keyword for DE results. data.varm[de_key] should store the full DE result table.
qval_threshold: ``float``, optional, default: 0.05.
Selected FDR rate. A horizontal line indicating this rate will be shown in the figure.
log2fc_threshold: ``float``, optional, default: 1.0
Log2 fold change threshold to highlight biologically interesting genes. Two vertical lines representing negative and positive log2 fold change will be shown.
top_n: ``int``, optional, default: ``20``
Number of top DE genes to show names. Genes are ranked by Log2 fold change.
panel_size: ``Tuple[float, float]``, optional, default: ``(6, 4)``
The size (width, height) in inches of figure.
return_fig: ``bool``, optional, default: ``False``
Return a ``Figure`` object if ``True``; return ``None`` otherwise.
dpi: ``float``, optional, default: ``300.0``
The resolution in dots per inch.
Returns
--------
``Figure`` object
A ``matplotlib.figure.Figure`` object containing the dot plot if ``return_fig == True``
Examples
---------
>>> pg.pseudo.volcano(pseudobulk, dpi=200)
"""
if de_key not in pseudobulk.varm:
logger.warning(f"Cannot find DE results '{de_key}'. Please conduct DE analysis first!")
return None
de_res = pseudobulk.varm[de_key]
fcstr = "log2FoldChange"
pstr = "pvalue"
qstr = "padj"
columns = de_res.dtype.names
if (fcstr not in columns) or (pstr not in columns) or (qstr not in columns):
logger.warning("Please conduct DE test first!")
return None
log2fc = de_res[fcstr]
pvals = de_res[pstr]
pvals[pvals == 0.0] = 1e-45 # very small pvalue to avoid log10 0
neglog10p = -np.log10(pvals)
yconst = min(neglog10p[de_res[qstr] <= qval_threshold])
from pegasus.plotting.plot_utils import _get_subplot_layouts
fig, ax = _get_subplot_layouts(panel_size=panel_size, dpi=dpi)
idxsig = neglog10p >= yconst
idxnsig = neglog10p < yconst
idxfc = (log2fc <= -log2fc_threshold) | (log2fc >= log2fc_threshold)
idxnfc = ~idxfc
idx = idxnsig & idxnfc
ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='k', marker='o', linewidths=0.5, alpha=0.5, label="NS")
idx = idxnsig & idxfc
ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='g', marker='o', linewidths=0.5, alpha=0.5, label=r"Log$_2$ FC")
idx = idxsig & idxnfc
ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='b', marker='o', linewidths=0.5, alpha=0.5, label=r"q-value")
idx = idxsig & idxfc
ax.scatter(log2fc[idx], neglog10p[idx], s=5, c='r', marker='o', linewidths=0.5, alpha=0.5, label=r"q-value and log$_2$ FC")
ax.set_xlabel(r"Log$_2$ fold change")
ax.set_ylabel(r"$-$Log$_{10}$ $P$")
legend = ax.legend(
loc="center",
bbox_to_anchor=(0.5, 1.1),
frameon=False,
fontsize=8,
ncol=4,
)
for handle in legend.legendHandles: # adjust legend size
handle.set_sizes([50.0])
ax.axhline(y = yconst, c = 'k', lw = 0.5, ls = '--')
ax.axvline(x = -log2fc_threshold, c = 'k', lw = 0.5, ls = '--')
ax.axvline(x = log2fc_threshold, c = 'k', lw = 0.5, ls = '--')
texts = []
idx = np.where(idxsig & (log2fc >= log2fc_threshold))[0]
posvec = np.argsort(log2fc[idx])[::-1][0:top_n]
for pos in posvec:
gid = idx[pos]
texts.append(ax.text(log2fc[gid], neglog10p[gid], pseudobulk.var_names[gid], fontsize=5))
idx = np.where(idxsig & (log2fc <= -log2fc_threshold))[0]
posvec = np.argsort(log2fc[idx])[0:top_n]
for pos in posvec:
gid = idx[pos]
texts.append(ax.text(log2fc[gid], neglog10p[gid], pseudobulk.var_names[gid], fontsize=5))
from adjustText import adjust_text
adjust_text(texts, arrowprops=dict(arrowstyle='-', color='k', lw=0.5))
return fig if return_fig else None