Source code for pegasus.tools.gsea

import logging

logger = logging.getLogger(__name__)

import pandas as pd
import numpy as np

from pegasus.tools import predefined_pathways, load_signatures_from_file, eff_n_jobs
from pegasusio import MultimodalData, UnimodalData, timer
from typing import Union, Tuple


[docs]@timer(logger=logger) def gsea( data: Union[MultimodalData, UnimodalData], rank_key: str, pathways: str, method: str = "gseapy", gsea_key: str = "gsea_out", min_size: int = 15, max_size: int = 500, n_jobs: int = 4, seed: int = 0, verbose: bool = True, **kwargs, ) -> None: """Perform Gene Set Enrichment Analysis (GSEA). Parameters ---------- data: ``MultimodalData`` or ``UnimodalData`` Single-cell or pseudo-bulk data. rank_key: ``str`` Key in pre-computed DE results representing gene ranks. The string format is ``de_key:attr``, where ``de_key`` is the key of DE results in ``data.varm``, and ``attr`` is the column name in ``data.varm['de_key']`` used as the gene signatures for GSEA analysis. pathways: ``str`` Either a keyword or a path to the gene set file in GMT format. If keyword, choosing from "hallmark" and "canonical_pathways" (MSigDB H and C2/CP). method: ``str``, optional, default: ``gseapy`` Specify which package to use as the backend for GSEA. By default ``gseapy``, use GSEAPY's prerank method. Notice that ``permutation_num=1000`` is set by default. If you want to change these parameters, please reset in ``kwargs``. Alternatively, if specify ``fgsea``, then use R package ``fgsea``, which requires ``rpy2`` and R installation. gsea_key: ``str``, optional, default: ``"gsea_out"`` Key to use to store GSEA results as a data frame. min_size: ``int``, optional, default: ``15`` Minimum allowed number of genes from gene set also the data set. max_size: ``int``, optional, default: ``500`` Maximum allowed number of genes from gene set also the data set. n_jobs: ``int``, optional, default: ``4`` Numbr of threads used for parallel computation. seed: ``int``, optional, default: ``0`` Random seed to make sure GSEA results are reproducible. verbose: ``bool``, optional, default: ``True`` If printing out progress of the job. Only works when ``method="gseapy"``. kwargs If ``method="gseapy"``, pass other keyword arguments to ``gseapy.prerank`` function. Details about GSEAPY prerank function's optional parameters are `here <https://gseapy.readthedocs.io/en/latest/run.html#gseapy.prerank>`_. Returns ------- ``None`` Update ``data.uns``: ``data.uns[gsea_key]``: GSEA outputs sorted by q-values (adjusted p-values) in ascending order, and if q-values are equal, sort by NES scores in descending order. Examples -------- >>> pg.gsea(data, "deseq2:stat", "canonical_pathways") >>> pg.gsea(data, "de_res:1:mwu_U", "canonical_pathways", method="fgsea") """ if method == "gseapy": _run_gseapy( data=data, rank_key=rank_key, pathways=pathways, gsea_key=gsea_key, min_size=min_size, max_size=max_size, n_jobs=n_jobs, seed=seed, verbose=verbose, **kwargs, ) else: _run_fgsea( data=data, rank_key=rank_key, pathways=pathways, gsea_key=gsea_key, min_size=min_size, max_size=max_size, n_jobs=n_jobs, seed=seed, )
def _decide_qval_str(rank_key): qstr = "padj" if ":" in rank_key: # pegasus.de_analysis result prefix = rank_key.split("_")[0] qstr = f"{prefix}_qval" return qstr def _validate_keys(data, de_key, rank_key): if de_key not in data.varm: import sys logger.error(f"Key '{de_key}' not in data.varm! Wrong key name or need to run DE analysis first!") sys.exit(-1) if rank_key not in data.varm[de_key].dtype.names: import sys logger.error(f"Key '{rank_key}' not in DE result! Wrong key name specified!") sys.exit(-1) def _split_rank_key(rank_key: str) -> Tuple[str, str]: if ":" not in rank_key: import sys logger.error(f"Rank key '{rank_key}' does not satisfy format 'de_key:attr'!") sys.exit(-1) keys = rank_key.split(":") de_key = keys[0] attr_key = ":".join(keys[1:]) return (de_key, attr_key) def _run_gseapy( data: Union[MultimodalData, UnimodalData], rank_key: str, pathways: str, gsea_key: str, min_size: int, max_size: int, n_jobs: int, seed: int, verbose: bool, **kwargs, ) -> None: try: import gseapy as gp except ModuleNotFoundError as e: import sys logger.error(f"{e}\nNeed gseapy! Try 'pip install gseapy'.") sys.exit(-1) de_key, attr_key = _split_rank_key(rank_key) _validate_keys(data, de_key, attr_key) rank_df = pd.DataFrame(data.varm[de_key], index=data.var_names) rank_df.dropna(axis=0, how='any', inplace=True) # Remove genes with NaN values (especially NaN q-values) rank_df = rank_df.reset_index().rename(columns={rank_df.index.name: "gene"})[["gene", attr_key]] gene_sets = load_signatures_from_file(predefined_pathways.get(pathways, pathways)) n_jobs = eff_n_jobs(n_jobs) res = gp.prerank( rnk=rank_df, gene_sets=gene_sets, min_size=min_size, max_size=max_size, threads=n_jobs, seed=seed, verbose=verbose, **kwargs, ) res_df = res.res2d res_df.rename(columns={"FDR q-val": "padj", "Term": "pathway"}, inplace=True) res_df["ES"] = res_df["ES"].astype(np.float64) res_df["NES"] = res_df["NES"].astype(np.float64) res_df["NES abs"] = np.abs(res_df["NES"]) res_df["padj"] = res_df["padj"].astype(np.float64) res_df.sort_values(["padj", "NES abs", "pathway"], ascending=[True, False, True], inplace=True) data.uns[gsea_key] = res_df.drop(columns={"Name", "NES abs"}) # Remove redundant columns def _run_fgsea( data: MultimodalData, rank_key: str, pathways: str, gsea_key: str, min_size: int, max_size: int, n_jobs: int, seed: int, ) -> None: try: import rpy2.robjects as ro from rpy2.robjects import pandas2ri from rpy2.robjects.packages import importr from rpy2.robjects.conversion import localconverter except ModuleNotFoundError as e: import sys logger.error(f"{e}\nNeed rpy2! Try 'pip install rpy2'.") sys.exit(-1) try: fgsea = importr("fgsea") except ModuleNotFoundError: import sys text = """Please install fgsea in order to run this function.\n To install this package, start R and enter:\n if (!require("BiocManager", quietly = TRUE)) install.packages("BiocManager") BiocManager::install("fgsea")""" logger.error(text) sys.exit(-1) nproc = eff_n_jobs(n_jobs) if nproc == 1: nproc = 0 # Avoid setting BPPARAM when only using 1 thread ro.r(f"set.seed({seed})") pwdict = load_signatures_from_file(predefined_pathways.get(pathways, pathways)) pathways_r = ro.ListVector(pwdict) de_key, attr_key = _split_rank_key(rank_key) _validate_keys(data, de_key, attr_key) rank_df = pd.DataFrame(data.varm[de_key], index=data.var_names) rank_df.dropna(axis=0, how='any', inplace=True) # Remove genes with NaN values (especially NaN q-values) rank_vec = ro.FloatVector(rank_df[attr_key].values) rank_vec.names = ro.StrVector(rank_df.index.values) res = fgsea.fgsea(pathways_r, rank_vec, minSize=min_size, maxSize=max_size, nproc=nproc) unlist = ro.r( """ function(df) { df$leadingEdge <- sapply(df$leadingEdge, function(x) {paste(unlist(x), collapse=',')}) return(df) } """ ) with localconverter(ro.default_converter + pandas2ri.converter): res_df = ro.conversion.rpy2py(unlist(res)) res_df["NES abs"] = np.abs(res_df["NES"]) res_df.sort_values(["padj", "NES", "pathway"], ascending=[True, False, True], inplace=True) data.uns[gsea_key] = res_df.drop(columns={"NES abs"}) # Remove redundant columns
[docs]@timer(logger=logger) def write_gsea_results_to_excel( data: Union[MultimodalData, UnimodalData], output_file: str, gsea_key: str = "gsea_out", ndigits: int = 3, ) -> None: """Write Gene Set Enrichment Analysis (GSEA) results into Excel workbook. Parameters ---------- data: ``MultomodalData`` or ``UnimodalData`` Single-cell or pseudo-bulk data. output_file: ``str`` File name for the output. gsea_key: ``str``, optinoal, default: ``gsea_out`` Key name of GSEA results stored in ``data.uns`` field. ndigits: ``int``, optional, default: ``3`` Round non p-values and q-values to ``ndigits`` after decimal point in the excel. Returns ------- ``None`` Pathway information is written to file with name ``output_file``. In the generated Excel workbook, there are two tabs: * ``UP``: Pathways with positive NES scores (``NES`` column), which are sorted by q-values (``padj`` column). * ``DOWN``: Pathways with negative NES scores (``NES`` column), which are sorted by q-values (``padj`` column). Examples -------- >>> pg.write_gsea_results_to_excel(data, "gsea.xlsx") """ if gsea_key not in data.uns: import sys logger.error(f"Key '{gsea_key}' does not exist in data.uns!") sys.exit(-1) import xlsxwriter def format_short_output_cols( df_orig: pd.DataFrame, ndigits: int ) -> pd.DataFrame: """ Round related float columns to ndigits decimal points. """ df = pd.DataFrame(df_orig, copy = True) # copy must be true, otherwise the passed df_orig will be modified. cols = ["ES", "NES"] df.loc[:, cols] = df.loc[:, cols].round(ndigits) return df def add_worksheet( workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str ) -> None: """ Add one worksheet with content as df """ df = format_short_output_cols(df_orig, ndigits) df.reset_index(inplace=True) worksheet = workbook.add_worksheet(name=sheet_name) if df.shape[0] > 0: worksheet.add_table( 0, 0, df.index.size, df.columns.size - 1, { "data": df.to_numpy(), "style": "Table Style Light 1", "first_column": True, "header_row": True, "columns": [{"header": x} for x in df.columns.values], }, ) else: worksheet.write_row(0, 0, df.columns.values) try: workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True}) workbook.formats[0].set_font_size(9) df_gsea = pd.DataFrame(data.uns[gsea_key]) add_worksheet(workbook, df_gsea.loc[df_gsea['NES']>0].set_index("pathway"), "UP") add_worksheet(workbook, df_gsea.loc[df_gsea['NES']<0].set_index("pathway"), "DOWN") logger.info("Excel spreadsheet is written.") finally: workbook.close()