import numpy as np
import pandas as pd
from typing import List, Union
from pegasusio import UnimodalData, MultimodalData
import logging
logger = logging.getLogger("pegasus")
[docs]def search_genes(
data: Union[MultimodalData, UnimodalData],
gene_list: List[str],
rec_key: str = "de_res",
measure: str = "percentage",
) -> pd.DataFrame:
"""Extract and display gene expressions for each cluster.
This function helps to see marker expressions in clusters via the interactive python environment.
Parameters
----------
data: ``MultimodalData`` or ``UnimodalData`` object
Annotated data matrix containing the expression matrix and differential expression results.
gene_list: ``List[str]``
A list of gene symbols.
rec_key: ``str``, optional, default: ``"de_res"``
Keyword of DE analysis result stored in ``data.varm``.
measure : ``str``, optional, default: ``"percentage"``
Can be either ``"percentage"`` or ``"mean_logExpr"``:
* ``percentage`` shows the percentage of cells expressed the genes;
* ``mean_logExpr`` shows the mean log expression.
Returns
-------
``pandas.DataFrame``
A data frame containing marker expressions in each cluster.
Examples
--------
>>> results = pg.search_genes(adata, ['CD3E', 'CD4', 'CD8'])
"""
columns = [x for x in data.varm[rec_key].dtype.names if x.endswith(f":{measure}")]
df = pd.DataFrame(data=data.varm[rec_key][columns], index=data.var_names)
return df.reindex(index=gene_list)
[docs]def search_de_genes(
data: Union[MultimodalData, UnimodalData],
gene_list: List[str],
rec_key: str = "de_res",
de_test: str = "fisher",
de_alpha: float = 0.05,
thre: float = 1.5,
) -> pd.DataFrame:
"""Extract and display differential expression analysis results of markers for each cluster.
This function helps to see if markers are up or down regulated in each cluster via the interactive python environment:
* ``++`` indicates up-regulated and fold change >= threshold;
* ``+`` indicates up-regulated but fold change < threshold;
* ``--`` indicates down-regulated and fold change <= 1 / threshold;
* ``-`` indicates down-regulated but fold change > 1 / threshold;
* ``?`` indicates not differentially expressed.
Parameters
----------
data: ``MultimodalData`` or ``UnimodalData`` object
Annotated data matrix containing the expression matrix and differential expression results.
gene_list: ``List[str]``
A list of gene symbols.
rec_key: ``str``, optional, default: ``"de_res"``
Keyword of DE analysis result stored in ``data.varm``.
de_test : ``str``, optional, default: ``"fisher"``
Differential expression test to look at, could be either ``t``, ``fisher`` or ``mwu``.
de_alpha : ``float``, optional, default: ``0.05``
False discovery rate.
thre : ``float``, optional, default: ``1.5``
Fold change threshold to determine if the marker is a strong DE (``++`` or ``--``) or weak DE (``+`` or ``-``).
Returns
-------
``pandas.DataFrame``
A data frame containing marker differential expression results for each cluster.
Examples
--------
>>> df = pegasus.misc.search_de_genes(adata, ['CD3E', 'CD4', 'CD8'], thre = 2.0)
"""
columns = [
x for x in data.varm[rec_key].dtype.names if x.endswith(f":{de_test}_qval")
]
df_de = pd.DataFrame(data.varm[rec_key][columns], index=data.var_names)
df_de = df_de.reindex(index=gene_list)
columns = [
x
for x in data.varm[rec_key].dtype.names
if (
x.endswith(":percentage_fold_change")
if de_test == "fisher"
else x.endswith(":log2FC")
)
]
df_fc = pd.DataFrame(data.varm[rec_key][columns], index=data.var_names)
df_fc = df_fc.reindex(index=gene_list)
if de_test != "fisher":
df_fc = np.exp(df_fc)
results = np.zeros((len(gene_list), len(columns)), dtype=np.dtype("U4"))
results[:] = "?"
results[np.isnan(df_de)] = "NaN"
results[(df_de <= de_alpha).values & (df_fc > 1.0).values] = "+"
results[(df_de <= de_alpha).values & (df_fc >= thre).values] = "++"
results[(df_de <= de_alpha).values & (df_fc < 1.0).values] = "-"
results[(df_de <= de_alpha).values & (df_fc <= 1.0 / thre).values] = "--"
clusts = [x.rpartition(":")[2] for x in columns]
df = pd.DataFrame(data=results, index=gene_list, columns=clusts)
return df
def show_attributes(
input_file: str,
show_attributes: bool,
show_gene_attributes: bool,
show_values_for_attributes: str,
) -> None:
""" Show data attributes. For command line use.
"""
# data = read_input(input_file, mode="r")
# if show_attributes:
# print(
# "Available sample attributes in input dataset: {0}".format(
# ", ".join(data.obs.columns.values)
# )
# )
# if show_gene_attributes:
# print(
# "Available gene attributes in input dataset: {0}".format(
# ", ".join(data.var.columns.values)
# )
# )
# if not show_values_for_attributes is None:
# for attr in show_values_for_attributes.split(","):
# print(
# "Available values for attribute {0}: {1}.".format(
# attr, ", ".join(np.unique(data.obs[attr]))
# )
# )
def perform_oneway_anova(
data: Union[MultimodalData, UnimodalData],
glist: List[str],
restriction_vec: List[str],
group_str: str,
fdr_alpha: float = 0.05,
res_key: str = None,
) -> pd.DataFrame:
"""Perform one way ANOVA on a subset of cells (restricted by restriction_vec) grouped by group_str and control FDR at fdr_alpha.
Parameters
----------
data : `anndata` object
An `anndata` object containing the expression matrix.
glist : `list[str]`
A list of gene symbols.
restriction_vec : `list[str]`
A vector of restrictions for selecting cells. Each restriction takes the format of attr:value,value,value
group_str : `str`
How to group selected cells for ANOVA analysis. If group_str is for pseudotime, it has two formats. 1) 'pseudotime:time:n', which divides cells by equal pseudotime invertal; 2) 'pseudotime:size:n' divides cells by equal number of cells.
fdr_alpha : `float`, optional (default: 0.05)
False discovery rate.
res_key : `str`, optional (default: None)
Store results into data using res_key, the grouping information is stored in obs and the results is stored in uns.
Returns
-------
`pandas.DataFrame`
Results for genes that pass FDR control.
Examples
--------
>>> results = misc.perform_oneway_anova(data, ['CD3E', 'CD4', 'CD8'], [], 'pseudotime:size:10')
"""
from scipy.stats import f_oneway
from statsmodels.stats.multitest import fdrcorrection as fdr
selected = np.ones(data.shape[0], dtype=bool)
for rest_str in restriction_vec:
attr, value_str = rest_str.split(":")
values = value_str.split(",")
selected = selected & np.isin(data.obs[attr], values)
gene_list = np.array(glist)
gene_list = gene_list[np.isin(gene_list, data.var_names)]
ngene = gene_list.size
newdat = data[selected, :][:, gene_list].copy()
newdat.X = newdat.X.toarray()
group_values = group_str.split(":")
group_names = []
col_names = []
ngr = 0
group_idx = None
if group_values[0] == "pseudotime":
assert len(group_values) == 3
div_by = group_values[1]
ngr = int(group_values[2])
group_idx = np.zeros((ngr, newdat.shape[0]), dtype=bool)
pseudotimes = newdat.obs["pseudotime"].values
min_t = pseudotimes.min()
max_t = pseudotimes.max()
if div_by == "time":
interval = (max_t - min_t) / ngr
left = min_t - 1e-5
for i in range(ngr):
right = min_t + interval * (i + 1)
name = "({:.2f}, {:.2f}]".format(left if left >= 0 else 0.0, right)
group_names.append(name)
group_idx[i] = (pseudotimes > left) & (pseudotimes <= right)
left = right
else:
assert div_by == "size"
ords = np.argsort(pseudotimes)
quotient = ords.size // ngr
residule = ords.size % ngr
fr = 0
for i in range(ngr):
to = fr + quotient + (i < residule)
name = "[{:.2f}, {:.2f}]".format(
pseudotimes[ords[fr]], pseudotimes[ords[to - 1]]
)
group_names.append(name)
group_idx[i][ords[fr:to]] = True
fr = to
else:
assert len(group_values) == 2
group_attr = group_values[0]
tmp_str = group_values[1]
groups_str = tmp_str.split(";")
ngr = len(groups_str)
group_idx = np.zeros((ngr, newdat.shape[0]), dtype=bool)
for i, gstr in enumerate(groups_str):
name, values = gstr.split("~")
group_names.append(name)
group_idx[i] = np.isin(newdat.obs[group_attr], values.split(","))
for i in range(ngr):
print("Group {} has {} cells.".format(group_names[i], group_idx[i].sum()))
np.warnings.filterwarnings("ignore")
stats = np.zeros((ngene, 3 + ngr * 2))
for i in range(ngene):
arr_list = []
for j in range(ngr):
arr = newdat.X[group_idx[j], i]
stats[i, 3 + j * 2] = arr.mean()
stats[i, 3 + j * 2 + 1] = (arr > 0).sum() * 100.0 / arr.size
arr_list.append(arr)
stats[i, 0], stats[i, 1] = f_oneway(*arr_list)
if np.isnan(stats[i, 0]):
stats[i, 0] = 0.0
stats[i, 1] = 1.0
passed, stats[:, 2] = fdr(stats[:, 1])
cols = ["fstat", "pval", "qval"]
for i in range(ngr):
cols.extend([group_names[i] + "_mean", group_names[i] + "_percent"])
raw_results = pd.DataFrame(stats, columns=cols, index=gene_list)
results = raw_results[raw_results["qval"] <= fdr_alpha]
results = results.sort_values("qval")
if res_key is not None:
data.uns[res_key] = raw_results
data.obs[res_key] = "background"
for i in range(ngr):
idx = np.zeros(data.shape[0], dtype=bool)
idx[selected] = group_idx[i]
data.obs.loc[idx, res_key] = group_names[i]
return results
[docs]def find_outlier_clusters(data: Union[MultimodalData, UnimodalData], cluster_attr: str, qc_attr: str) -> pd.DataFrame:
""" Using MWU test to detect if any cluster is an outlier regarding one of the qc attributes: n_genes, n_counts, percent_mito.
Parameters
----------
data: ``MultimodalData`` or ``UnimodalData`` object
Annotated data matrix with rows for cells and columns for genes.
cluster_attr: ``str``
Attribute in data.obs representing cluster assignment, e.g. 'louvain_labels'
qc_attr: ``str``
One of the QC attribute, choosing from 'n_genes', 'n_counts' and 'percent_mito'.
"""
from scipy.sparse import csr_matrix, csc_matrix
from pegasus.cylib.de_utils import csr_to_csc, calc_mwu
cluster_labels = data.obs[cluster_attr].values
values = data.obs[qc_attr].values
report = 'upper' if qc_attr == 'percent_mito' else 'lower'
nsample = cluster_labels.size
ords = np.argsort(cluster_labels.codes)
mat_csr = csr_matrix(values.astype(np.float32).reshape(-1, 1))
data, indices, indptr = csr_to_csc(mat_csr.data, mat_csr.indices, mat_csr.indptr, nsample, 1, ords)
start_pos = 0
end_pos = 1
cluster_cnts = cluster_labels.value_counts()
n1arr = cluster_cnts.values
n2arr = nsample - n1arr
cluster_cumsum = cluster_cnts.cumsum().values
first_j = second_j = -1
posvec = np.where(n1arr > 0)[0]
if len(posvec) == 2:
first_j = posvec[0]
second_j = posvec[1]
U_stats, pvals, aurocs = calc_mwu(start_pos, end_pos, data, indices, indptr, n1arr, n2arr, cluster_cumsum, first_j, second_j, False)
index = []
pval_vec = []
auroc_vec = []
for i in range(U_stats.shape[1]):
if pvals[0, i] < 1e-6 and ((report == 'lower' and aurocs[0, i] < 0.25) or (report == 'upper' and aurocs[0, i] > 0.75)):
index.append(cluster_labels.categories[i])
pval_vec.append(pvals[0, i])
auroc_vec.append(aurocs[0, i])
res_df = pd.DataFrame(data = {'auroc': auroc_vec, 'pval': pval_vec}, index = index)
res_df.sort_values(by='auroc', ascending=(report == 'lower'), inplace=True)
return res_df