import logging
logger = logging.getLogger(__name__)
import pandas as pd
from pegasus.tools import predefined_pathways, load_signatures_from_file
from pegasusio import MultimodalData, UnimodalData, timer
from typing import Union, Optional
[docs]@timer(logger=logger)
def fgsea(
data: Union[MultimodalData, UnimodalData],
log2fc_key: str,
pathways: str,
de_key: Optional[str] = "de_res",
minSize: Optional[int] = 15,
maxSize: Optional[int] = 500,
nproc: Optional[int] = 0,
seed: Optional[int] = 0,
fgsea_key: Optional[str] = "fgsea_out",
) -> None:
"""Perform Gene Set Enrichment Analysis using fGSEA. This function calls R package fGSEA, requiring fGSEA in R installed.
Parameters
----------
data: Union[``MultimodalData``, ``UnimodalData``]
Single-cell or pseudo-bulk data.
log2fc_key: ``str``
Key in pre-computed DE results representing log2 fold change.
pathways: ``str``
Either a string or a path to the gene set file in GMT format. If string, choosing from "hallmark" and "canonical_pathways" (MSigDB H and C2/CP).
de_key: ``str``, optional, default: ``"de_res"``
Key name of DE analysis results stored. data.varm[de_key] should contain a record array of DE results.
minSize: ``int``, optional, default: ``15``
Minimal size of a gene set to consider.
maxSize: ``int``, optional, default: ``500``
Maximal size of a gene set to consider.
nproc: ``int``, optional, default: ``0``
Numbr of processes for parallel computation. If nproc > 0, set BPPARAM.
seed: ``int``, optional, default: ``0``
Random seed to make sure fGSEA results are reproducible.
fgsea_key: ``str``, optional, default: ``"fgsea_out"``
Key to use to store fGSEA results as a data frame.
Returns
-------
``None``
Update ``data.uns``:
``data.uns[fgsea_key]``: fGSEA outputs sorted by padj.
Examples
--------
>>> pg.fgsea(data, '3:log2FC', hallmark', fgsea_key='fgsea_res')
"""
try:
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter
except ModuleNotFoundError as e:
import sys
logger.error(f"{e}\nNeed rpy2! Try 'pip install rpy2'.")
sys.exit(-1)
try:
fgsea = importr("fgsea")
except ModuleNotFoundError:
import sys
text = """Please install fgsea in order to run this function.\n
To install this package, start R and enter:\n
if (!require("BiocManager", quietly = TRUE))
install.packages("BiocManager")
BiocManager::install("fgsea")"""
logger.error(text)
sys.exit(-1)
ro.r(f"set.seed({seed})")
pwdict = load_signatures_from_file(predefined_pathways.get(pathways, pathways))
pathways_r = ro.ListVector(pwdict)
log2fc = ro.FloatVector(data.varm[de_key][log2fc_key])
log2fc.names = ro.StrVector(data.var_names)
res = fgsea.fgsea(pathways_r, log2fc, minSize=minSize, maxSize=maxSize, nproc=nproc)
unlist = ro.r(
"""
function(df) {
df$leadingEdge <- sapply(df$leadingEdge, function(x) {paste(unlist(x), collapse=',')})
return(df)
}
"""
)
with localconverter(ro.default_converter + pandas2ri.converter):
res_df = ro.conversion.rpy2py(unlist(res))
res_df.sort_values("padj", inplace=True)
data.uns[fgsea_key] = res_df
[docs]@timer(logger=logger)
def write_fgsea_results_to_excel(
data: Union[MultimodalData, UnimodalData],
output_file: str,
fgsea_key: Optional[str] = "fgsea_out",
ndigits: Optional[int] = 3,
) -> None:
"""Write Gene Set Enrichment Analysis (GSEA) results generated by fgsea function into Excel workbook.
Parameters
----------
data: Union[``MultomodalData``, ``UnimodalData``]
Single-cell or pseudo-bulk data.
output_file: ``str``
File name for the output.
fgsea_key: ``str``, optinoal, default: ``"fgsea_out"``
Key name of GSEA results stored in ``data.uns`` field.
ndigits: ``int``, optional, default: ``3``
Round non p-values and q-values to ``ndigits`` after decimal point in the excel.
"""
assert fgsea_key in data.uns.keys(), f"Key '{fgsea_key}' does not exist in data.uns!"
import xlsxwriter
def format_short_output_cols(
df_orig: pd.DataFrame, ndigits: int
) -> pd.DataFrame:
""" Round related float columns to ndigits decimal points.
"""
df = pd.DataFrame(df_orig, copy = True) # copy must be true, otherwise the passed df_orig will be modified.
cols = []
for name in df.columns:
if (not name.endswith("pval")) and (not name.endswith("qval")):
cols.append(name)
df.loc[:, cols] = df.loc[:, cols].round(ndigits)
return df
def add_worksheet(
workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str
) -> None:
""" Add one worksheet with content as df
"""
df = format_short_output_cols(df_orig, ndigits)
df.reset_index(inplace=True)
worksheet = workbook.add_worksheet(name=sheet_name)
if df.shape[0] > 0:
worksheet.add_table(
0,
0,
df.index.size,
df.columns.size - 1,
{
"data": df.to_numpy(),
"style": "Table Style Light 1",
"first_column": True,
"header_row": True,
"columns": [{"header": x} for x in df.columns.values],
},
)
else:
worksheet.write_row(0, 0, df.columns.values)
try:
workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True})
workbook.formats[0].set_font_size(9)
df_fgsea = pd.DataFrame(data.uns[fgsea_key])
add_worksheet(workbook, df_fgsea.loc[df_fgsea['NES']>0].set_index("pathway"), "UP")
add_worksheet(workbook, df_fgsea.loc[df_fgsea['NES']<0].set_index("pathway"), "DOWN")
logger.info("Excel spreadsheet is written.")
finally:
workbook.close()