import time
import numpy as np
import pandas as pd
from anndata import AnnData
from scipy.sparse import csr_matrix, csc_matrix
from joblib import Parallel, delayed, effective_n_jobs
from statsmodels.stats.multitest import fdrcorrection as fdr
from collections import defaultdict
from typing import List, Tuple, Dict
import logging
logger = logging.getLogger("pegasus")
from pegasus.utils import decorators as pg_deco
def calc_basic_stat(
clust_id: str,
data: List[float],
indices: List[int],
indptr: List[int],
shape: Tuple[int, int],
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
sum_vec: List[float],
cnt_vec: List[int],
verbose: bool,
) -> pd.DataFrame:
""" Calcualte basic statistics for one cluster
"""
zero_vec = np.zeros(shape[1], dtype=np.float32)
# recover sparse matrix
mat = csr_matrix((data, indices, indptr), shape=shape)
mask = cluster_labels == clust_id
mat_clust = mat[mask]
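    # Two comparison modes: with no condition, compare cells in this cluster against all
    # remaining cells; with a condition, compare the two condition groups within this cluster.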
if cond_labels is None:
n1 = mat_clust.shape[0]
n2 = shape[0] - n1
sum_clu = mat_clust.sum(axis=0).A1
mean1 = sum_clu / n1 if n1 > 0 else zero_vec
mean2 = (sum_vec - sum_clu) / n2 if n2 > 0 else zero_vec
mean2[mean2 < 0.0] = 0.0
nonzeros = mat_clust.getnnz(axis=0)
percents = (nonzeros / n1 * 100.0).astype(np.float32) if n1 > 0 else zero_vec
percents_other = (
((cnt_vec - nonzeros) / n2 * 100.0).astype(np.float32)
if n2 > 0
else zero_vec
)
else:
cond1 = cond_labels.categories[0]
cond_labs = cond_labels[mask]
mask2 = cond_labs == cond1
mat_cond1 = mat_clust[mask2]
mat_cond2 = mat_clust[~mask2]
n1 = mat_cond1.shape[0]
n2 = mat_cond2.shape[0]
mean1 = mat_cond1.mean(axis=0).A1
mean2 = mat_cond2.mean(axis=0).A1
percents = (
(mat_cond1.getnnz(axis=0) / n1 * 100.0).astype(np.float32)
if n1 > 0
else zero_vec
)
percents_other = (
(mat_cond2.getnnz(axis=0) / n2 * 100.0).astype(np.float32)
if n2 > 0
else zero_vec
)
# calculate log_fold_change and WAD, Weighted Average Difference, https://almob.biomedcentral.com/articles/10.1186/1748-7188-3-8
log_fold_change = mean1 - mean2
x_avg = (mean1 + mean2) / 2
x_max = x_avg.max()
x_min = x_avg.min() - 0.001 # to avoid divide by zero
weights = (x_avg - x_min) / (x_max - x_min)
WADs = log_fold_change * weights
# calculate percent fold change
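    # NaN if neither group expresses the gene, +inf if only this cluster/group expresses it,
    # otherwise the ratio of the two expressing-cell percentages.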
idx = percents > 0.0
idx_other = percents_other > 0.0
percent_fold_change = np.zeros(shape[1], dtype=np.float32)
percent_fold_change[(~idx) & (~idx_other)] = np.nan
percent_fold_change[idx & (~idx_other)] = np.inf
percent_fold_change[idx_other] = percents[idx_other] / percents_other[idx_other]
df = pd.DataFrame(
{
"mean_logExpr:{0}".format(clust_id): mean1,
"mean_logExpr_other:{0}".format(clust_id): mean2,
"log_fold_change:{0}".format(clust_id): log_fold_change,
"fold_change:{0}".format(clust_id): np.exp(log_fold_change),
"percentage:{0}".format(clust_id): percents,
"percentage_other:{0}".format(clust_id): percents_other,
"percentage_fold_change:{0}".format(clust_id): percent_fold_change,
"WAD_score:{0}".format(clust_id): WADs,
},
index=gene_names,
)
if verbose:
logger.info("calc_basic_stat finished for cluster {0}.".format(clust_id))
return df
def collect_basic_statistics(
X: csr_matrix,
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
n_jobs: int,
temp_folder: str,
verbose: bool,
) -> List[pd.DataFrame]:
""" Collect basic statistics, triggering calc_basic_stat in parallel
"""
start = time.perf_counter()
sum_vec = cnt_vec = None
if cond_labels is None:
sum_vec = X.sum(axis=0).A1
cnt_vec = X.getnnz(axis=0)
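    # Pass the raw CSR buffers (data/indices/indptr) rather than the sparse matrix itself, so
    # joblib can memory-map large arrays (max_nbytes) into temp_folder instead of pickling them per worker.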
result_list = Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)(
delayed(calc_basic_stat)(
clust_id,
X.data,
X.indices,
X.indptr,
X.shape,
cluster_labels,
cond_labels,
gene_names,
sum_vec,
cnt_vec,
verbose,
)
for clust_id in cluster_labels.categories
)
end = time.perf_counter()
if verbose:
logger.info(
"Collecting basic statistics is done. Time spent = {:.2f}s.".format(
end - start
)
)
return result_list
def calc_auc(
clust_id: str,
data: List[float],
indices: List[int],
indptr: List[int],
shape: Tuple[int, int],
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
verbose: bool,
) -> pd.DataFrame:
""" Calculate AUROC for one cluster
"""
import sklearn.metrics as sm
csc_mat = csc_matrix((data, indices, indptr), shape=shape)
mask = cluster_labels == clust_id
auroc = np.zeros(shape[1], dtype=np.float32)
# aupr = np.zeros(shape[1], dtype = np.float32)
if cond_labels is None:
exprs = np.zeros(shape[0])
y_true = mask
n1 = mask.sum()
n2 = shape[0] - n1
else:
exprs = None
cond1 = cond_labels.categories[0]
cond_labs = cond_labels[mask]
y_true = cond_labs == cond1
n1 = y_true.sum()
n2 = mask.sum() - n1
if n1 > 0 and n2 > 0:
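        # For each gene, build a dense expression vector and compute the AUROC of using expression
        # alone to separate the two groups (cluster vs. rest, or condition 1 vs. condition 2 within the cluster).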
for i in range(shape[1]):
if cond_labels is None:
exprs[:] = 0.0
exprs[
csc_mat.indices[csc_mat.indptr[i] : csc_mat.indptr[i + 1]]
] = csc_mat.data[csc_mat.indptr[i] : csc_mat.indptr[i + 1]]
else:
exprs = csc_mat[mask, i].toarray()[:, 0]
fpr, tpr, thresholds = sm.roc_curve(y_true, exprs)
auroc[i] = sm.auc(fpr, tpr)
# precision, recall, thresholds = sm.precision_recall_curve(y_true, exprs)
# aupr[i] = sm.auc(recall, precision)
df = pd.DataFrame(
{
"auroc:{0}".format(clust_id): auroc,
# "aupr:{0}".format(clust_id): aupr,
},
index=gene_names,
)
if verbose:
logger.info("calc_auc finished for cluster {0}.".format(clust_id))
return df
def calculate_auc_values(
Xc: csc_matrix,
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
n_jobs: int,
temp_folder: str,
verbose: bool,
) -> List[pd.DataFrame]:
""" Calculate AUROC values, triggering calc_auc in parallel
"""
start = time.perf_counter()
result_list = Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)(
delayed(calc_auc)(
clust_id,
Xc.data,
Xc.indices,
Xc.indptr,
Xc.shape,
cluster_labels,
cond_labels,
gene_names,
verbose,
)
for clust_id in cluster_labels.categories
)
end = time.perf_counter()
if verbose:
logger.info(
"AUROC values are calculated. Time spent = {:.2f}s.".format(end - start)
)
return result_list
def calc_t(
clust_id: str,
data: List[float],
indices: List[int],
indptr: List[int],
shape: Tuple[int, int],
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
sum_vec: List[float],
sum2_vec: List[float],
verbose: bool,
) -> pd.DataFrame:
""" Calcualte Welch's t-test for one cluster
"""
# recover sparse matrix
mat = csr_matrix((data, indices, indptr), shape=shape)
pvals = np.full(shape[1], 1.0)
    tscores = np.full(shape[1], 0.0)  # must be a float array, otherwise assigned t-scores would be truncated to integers
mask = cluster_labels == clust_id
mat_clust = mat[mask]
if cond_labels is None:
n1 = mat_clust.shape[0]
n2 = shape[0] - n1
if n1 > 1 and n2 > 1:
sum_clu = mat_clust.sum(axis=0).A1
mean1 = sum_clu / n1
mean2 = (sum_vec - sum_clu) / n2
mean2[mean2 < 0.0] = 0.0
sum2_clu = mat_clust.power(2).sum(axis=0).A1
s1sqr = (sum2_clu - n1 * (mean1 ** 2)) / (n1 - 1)
s2sqr = ((sum2_vec - sum2_clu) - n2 * (mean2 ** 2)) / (n2 - 1)
s2sqr[s2sqr < 0.0] = 0.0
else:
cond1 = cond_labels.categories[0]
cond_labs = cond_labels[mask]
mask2 = cond_labs == cond1
mat_cond1 = mat_clust[mask2]
mat_cond2 = mat_clust[~mask2]
n1 = mat_cond1.shape[0]
n2 = mat_cond2.shape[0]
if n1 > 1 and n2 > 1:
mean1 = mat_cond1.mean(axis=0).A1
psum1 = mat_cond1.power(2).sum(axis=0).A1
s1sqr = (psum1 - n1 * (mean1 ** 2)) / (n1 - 1)
mean2 = mat_cond2.mean(axis=0).A1
psum2 = mat_cond2.power(2).sum(axis=0).A1
s2sqr = (psum2 - n2 * (mean2 ** 2)) / (n2 - 1)
if n1 > 1 and n2 > 1:
import scipy.stats as ss
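        # Welch's t-statistic, with the Welch-Satterthwaite approximation for the degrees of freedom v.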
var_est = s1sqr / n1 + s2sqr / n2
idx = var_est > 0.0
if idx.sum() > 0:
tscore = (mean1[idx] - mean2[idx]) / np.sqrt(var_est[idx])
v = (var_est[idx] ** 2) / (
(s1sqr[idx] / n1) ** 2 / (n1 - 1) + (s2sqr[idx] / n2) ** 2 / (n2 - 1)
)
pvals[idx] = ss.t.sf(np.fabs(tscore), v) * 2.0 # two-sided
tscores[idx] = tscore
passed, qvals = fdr(pvals)
df = pd.DataFrame(
{
"t_pval:{0}".format(clust_id): pvals.astype(np.float32),
"t_qval:{0}".format(clust_id): qvals.astype(np.float32),
"t_score:{0}".format(clust_id): tscores.astype(np.float32),
},
index=gene_names,
)
if verbose:
logger.info("calc_t finished for cluster {0}.".format(clust_id))
return df
def t_test(
X: csr_matrix,
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
n_jobs: int,
temp_folder: str,
verbose: bool,
) -> List[pd.DataFrame]:
""" Run Welch's t-test, triggering calc_t in parallel
"""
start = time.perf_counter()
sum_vec = sum2_vec = None
if cond_labels is None:
sum_vec = X.sum(axis=0).A1
sum2_vec = X.power(2).sum(axis=0).A1
result_list = Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)(
delayed(calc_t)(
clust_id,
X.data,
X.indices,
X.indptr,
X.shape,
cluster_labels,
cond_labels,
gene_names,
sum_vec,
sum2_vec,
verbose,
)
for clust_id in cluster_labels.categories
)
end = time.perf_counter()
if verbose:
logger.info("Welch's t-test is done. Time spent = {:.2f}s.".format(end - start))
return result_list
def calc_fisher(
clust_id: str,
data: List[float],
indices: List[int],
indptr: List[int],
shape: Tuple[int, int],
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
cnt_vec: List[int],
verbose: bool,
) -> pd.DataFrame:
""" Calcualte Fisher's exact test for one cluster
"""
import fisher
# recover sparse matrix
mat = csr_matrix((data, indices, indptr), shape=shape)
mask = cluster_labels == clust_id
mat_clust = mat[mask]
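    # Build a per-gene 2x2 contingency table: (a_true, a_false) count expressing / non-expressing
    # cells in this cluster (or condition 1); (b_true, b_false) the same for the other cells (or condition 2).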
if cond_labels is None:
n1 = mat_clust.shape[0]
n2 = shape[0] - n1
a_true = mat_clust.getnnz(axis=0).astype(np.uint)
a_false = n1 - a_true
b_true = cnt_vec.astype(np.uint) - a_true
b_false = n2 - b_true
else:
cond1 = cond_labels.categories[0]
cond_labs = cond_labels[mask]
mask2 = cond_labs == cond1
mat_cond1 = mat_clust[mask2]
mat_cond2 = mat_clust[~mask2]
n1 = mat_cond1.shape[0]
n2 = mat_cond2.shape[0]
a_true = mat_cond1.getnnz(axis=0).astype(np.uint)
a_false = n1 - a_true
b_true = mat_cond2.getnnz(axis=0).astype(np.uint)
b_false = n2 - b_true
pvals = fisher.pvalue_npy(a_true, a_false, b_true, b_false)[2]
passed, qvals = fdr(pvals)
df = pd.DataFrame(
{
"fisher_pval:{0}".format(clust_id): pvals.astype(np.float32),
"fisher_qval:{0}".format(clust_id): qvals.astype(np.float32),
},
index=gene_names,
)
if verbose:
logger.info("calc_fisher finished for cluster {0}.".format(clust_id))
return df
def fisher_test(
X: csr_matrix,
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
n_jobs: int,
temp_folder: str,
verbose: bool,
) -> List[pd.DataFrame]:
""" Run Fisher's exact test, triggering calc_fisher in parallel
"""
start = time.perf_counter()
cnt_vec = None
if cond_labels is None:
cnt_vec = X.getnnz(axis=0)
result_list = Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)(
delayed(calc_fisher)(
clust_id,
X.data,
X.indices,
X.indptr,
X.shape,
cluster_labels,
cond_labels,
gene_names,
cnt_vec,
verbose,
)
for clust_id in cluster_labels.categories
)
end = time.perf_counter()
if verbose:
logger.info(
"Fisher's exact test is done. Time spent = {:.2f}s.".format(end - start)
)
return result_list
def calc_mwu(
clust_id: str,
data: List[float],
indices: List[int],
indptr: List[int],
shape: Tuple[int, int],
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
verbose: bool,
) -> pd.DataFrame:
""" Run Mann-Whitney U test for one cluster
"""
csc_mat = csc_matrix((data, indices, indptr), shape=shape)
U_stats = np.zeros(shape[1], dtype=np.float32)
pvals = np.full(shape[1], 1.0)
mask = cluster_labels == clust_id
if cond_labels is None:
exprs = np.zeros(shape[0])
idx_x = mask
idx_y = ~idx_x
else:
exprs = None
cond1 = cond_labels.categories[0]
cond_labs = cond_labels[mask]
idx_x = cond_labs == cond1
idx_y = ~idx_x
n1 = idx_x.sum()
n2 = idx_y.sum()
if n1 > 0 and n2 > 0:
import scipy.stats as ss
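        # Genes with no nonzero entries among the tested cells are skipped; their p-values stay at 1.0.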
for i in range(shape[1]):
if cond_labels is None:
if csc_mat.indptr[i + 1] - csc_mat.indptr[i] > 0:
exprs[:] = 0.0
exprs[
csc_mat.indices[csc_mat.indptr[i] : csc_mat.indptr[i + 1]]
] = csc_mat.data[csc_mat.indptr[i] : csc_mat.indptr[i + 1]]
U_stats[i], pvals[i] = ss.mannwhitneyu(
exprs[idx_x], exprs[idx_y], alternative="two-sided"
)
else:
tmp_mat = csc_mat[mask, i]
if tmp_mat.data.size > 0:
exprs = tmp_mat.toarray()[:, 0]
U_stats[i], pvals[i] = ss.mannwhitneyu(
exprs[idx_x], exprs[idx_y], alternative="two-sided"
)
passed, qvals = fdr(pvals)
df = pd.DataFrame(
{
"mwu_U:{0}".format(clust_id): U_stats.astype(np.float32),
"mwu_pval:{0}".format(clust_id): pvals.astype(np.float32),
"mwu_qval:{0}".format(clust_id): qvals.astype(np.float32),
},
index=gene_names,
)
if verbose:
logger.info("calc_mwu finished for cluster {0}.".format(clust_id))
return df
def mwu_test(
Xc: csc_matrix,
cluster_labels: List[str],
cond_labels: List[str],
gene_names: List[str],
n_jobs: int,
temp_folder: str,
verbose: bool,
) -> List[pd.DataFrame]:
""" Run Mann-Whitney U test, triggering calc_mwu in parallel
"""
start = time.perf_counter()
result_list = Parallel(n_jobs=n_jobs, max_nbytes=1e7, temp_folder=temp_folder)(
delayed(calc_mwu)(
clust_id,
Xc.data,
Xc.indices,
Xc.indptr,
Xc.shape,
cluster_labels,
cond_labels,
gene_names,
verbose,
)
for clust_id in cluster_labels.categories
)
end = time.perf_counter()
if verbose:
logger.info(
"Mann-Whitney U test is done. Time spent = {:.2f}s.".format(end - start)
)
return result_list
def organize_results(results: List[List[pd.DataFrame]]) -> pd.DataFrame:
""" Concatenate resulting dataframes into one big dataframe
"""
m = len(results)
n = len(results[0])
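    # results[j][i] holds the dataframe of test j for cluster i; iterate clusters in the outer loop
    # so that all columns belonging to the same cluster end up adjacent in the concatenated dataframe.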
reslist = []
for i in range(n):
for j in range(m):
reslist.append(results[j][i])
df = pd.concat(reslist, axis=1)
return df
def de_analysis(
data: AnnData,
cluster: str,
condition: str = None,
subset: str = None,
result_key: str = "de_res",
n_jobs: int = -1,
auc: bool = True,
t: bool = True,
fisher: bool = False,
mwu: bool = False,
temp_folder: str = None,
verbose: bool = True,
) -> None:
"""Perform Differential Expression (DE) Analysis on data.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
cluster: ``str``
Cluster labels used in DE analysis. Must exist in ``data.obs``.
condition: ``str``, optional, default: ``None``
Sample attribute used as condition in DE analysis. If ``None``, no condition is considered; otherwise, must exist in ``data.obs``.
subset: ``str``, optional, default: ``None``
Perform DE analysis on only a subset of cluster IDs. Cluster ID subset is specified as ``"clust_id1,clust_id2,...,clust_idn"``, where all IDs must exist in ``data.obs[cluster]``.
result_key: ``str``, optional, default: ``"de_res"``
Key name of DE analysis result stored.
n_jobs: ``int``, optional, default: ``-1``
Number of threads to use. If ``-1``, use all available threads.
auc: ``bool``, optional, default: ``True``
        If ``True``, calculate the area under the ROC curve (AUROC).
t: ``bool``, optional, default: ``True``
If ``True``, calculate Welch's t test.
fisher: ``bool``, optional, default: ``False``
If ``True``, calculate Fisher's exact test.
mwu: ``bool``, optional, default: ``False``
If ``True``, calculate Mann-Whitney U test.
temp_folder: ``str``, optional, default: ``None``
Joblib temporary folder for memmapping numpy arrays.
verbose: ``bool``, optional, default: ``True``
If ``True``, show detailed intermediate output.
Returns
-------
``None``
Update ``data.varm``:
``data.varm[result_key]``: DE analysis result.
Examples
--------
>>> pg.de_analysis(adata, cluster = 'spectral_leiden_labels')
"""
start = time.perf_counter()
if cluster not in data.obs:
raise ValueError("Cannot find cluster label!")
cluster_labels = data.obs[cluster].values
if not isinstance(cluster_labels, pd.Categorical):
cluster_labels = pd.Categorical(cluster_labels)
cond_labels = None
if condition is not None:
if condition not in data.obs:
raise ValueError("Cannot find condition!")
cond_labels = data.obs[condition].values
if not isinstance(cond_labels, pd.Categorical):
cond_labels = pd.Categorical(cond_labels)
if cond_labels.categories.size != 2:
raise ValueError(
"Number of distinct values in Condition is not equal to 2!"
)
X = data.X if isinstance(data.X, csr_matrix) else data.X[:]
X.eliminate_zeros() # In case there is any extra zeros
if subset is not None:
# subset data for de analysis
subset = np.array(subset.split(","))
idx_s = np.isin(subset, cluster_labels.categories.values)
if idx_s.sum() < subset.size:
raise ValueError(
"These cluster labels do not exist: " + ",".join(subset[~idx_s]) + "!"
)
idx = np.isin(cluster_labels, subset)
cluster_labels = cluster_labels[idx]
if cond_labels is not None:
cond_labels = cond_labels[idx]
X = X[idx]
n_jobs = effective_n_jobs(n_jobs)
gene_names = data.var_names
results = []
results.append(
collect_basic_statistics(
X, cluster_labels, cond_labels, gene_names, n_jobs, temp_folder, verbose
)
)
Xc = None
if auc or mwu:
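        # CSC layout allows fast per-gene (column) slicing in the AUROC and Mann-Whitney U loops.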
t1 = time.perf_counter()
Xc = X.tocsc()
if verbose:
logger.info(
"Converting X to csc_matrix is done. Time spent = {:.2f}s.".format(
time.perf_counter() - t1
)
)
if auc:
results.append(
calculate_auc_values(
Xc,
cluster_labels,
cond_labels,
gene_names,
n_jobs,
temp_folder,
verbose,
)
)
if t:
results.append(
t_test(
X, cluster_labels, cond_labels, gene_names, n_jobs, temp_folder, verbose
)
)
if fisher:
results.append(
fisher_test(
X, cluster_labels, cond_labels, gene_names, n_jobs, temp_folder, verbose
)
)
if mwu:
results.append(
mwu_test(
Xc,
cluster_labels,
cond_labels,
gene_names,
n_jobs,
temp_folder,
verbose,
)
)
df = organize_results(results)
data.varm[result_key] = df.to_records(index=False)
end = time.perf_counter()
logger.info(
"Differential expression analysis is finished. Time spent = {:.2f}s.".format(
end - start
)
)
def get_valid_gene_index(n: int, df: pd.DataFrame, alpha: float) -> List[bool]:
""" get genes that are DE for at least one test. If no DE tests, all genes are valid.
"""
idx = np.zeros(n, dtype=bool)
has_test = False
for qval in ["t_qval", "fisher_qval", "mwu_qval"]:
if qval in df.columns:
idx = idx | (df[qval].values <= alpha)
has_test = True
if not has_test:
idx = np.ones(n, dtype=bool)
return idx
def get_sort_key(sort_by: List[str], col_names: List[str], direction: str):
""" direction is either up or down; if down, do not use aupr
"""
for key in sort_by:
if key in col_names:
return key
raise ValueError("No valid key!")
def markers(
data: AnnData,
head: int = None,
de_key: str = "de_res",
sort_by: str = "auroc,WAD_score",
alpha: float = 0.05,
) -> Dict[str, Dict[str, pd.DataFrame]]:
"""
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
head: ``int``, optional, default: ``None``
        List only the top ``head`` genes for each cluster. If ``None``, show all DE genes.
    de_key: ``str``, optional, default: ``"de_res"``
Keyword of DE result stored in ``data.varm``.
sort_by: ``str``, optional, default: ``"auroc,WAD_score"``
        Comma-separated list of keys used to sort markers; the first key that exists in the DE result is used (by default ``auroc``, falling back to ``WAD_score``).
alpha: ``float``, optional, default: ``0.05``
        q-value threshold for selecting DE genes. Only genes with a q-value at or below ``alpha`` in at least one test are considered significant DE genes.
Returns
-------
results: ``Dict[str, Dict[str, pd.DataFrame]]``
        A Python dictionary of markers: ``results[cluster_id]["up"]`` and ``results[cluster_id]["down"]`` each map to a ``pd.DataFrame`` of up-regulated and down-regulated marker genes, respectively.
Examples
--------
>>> marker_dict = pg.markers(adata)
"""
if de_key not in data.varm.keys():
raise ValueError("Please run de_analysis first!")
sort_by = sort_by.split(",")
clust2cols = defaultdict(list)
rec_array = data.varm[de_key]
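    # Column names in the DE record array follow the pattern "<stat>:<cluster_id>"; group them by cluster.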
for name in rec_array.dtype.names:
col_name, sep, clust_id = name.partition(":")
clust2cols[clust_id].append(col_name)
results = defaultdict(dict)
for clust_id, col_names in clust2cols.items():
rec_names = [x + ":" + clust_id for x in col_names]
df = pd.DataFrame(data=rec_array[rec_names], index=data.var_names)
df.columns = col_names
df.index.name = "feature"
idx = get_valid_gene_index(data.shape[1], df, alpha)
idx_up = idx & (df["log_fold_change"].values > 0)
df_up = df.loc[idx_up].sort_values(
by=get_sort_key(sort_by, col_names, "up"), ascending=False, inplace=False
)
results[clust_id]["up"] = pd.DataFrame(
df_up if head is None else df_up.iloc[0:head]
)
idx_down = idx & (df["log_fold_change"].values < 0)
df_down = df.loc[idx_down].sort_values(
by=get_sort_key(sort_by, col_names, "down"), ascending=True, inplace=False
)
results[clust_id]["down"] = pd.DataFrame(
df_down if head is None else df_down.iloc[0:head]
)
return results
def write_results_to_excel(
results: Dict[str, Dict[str, pd.DataFrame]], output_file: str, ndigits: int = 3
) -> None:
""" Write results into Excel workbook.
Parameters
----------
results: ``Dict[str, Dict[str, pd.DataFrame]]``
DE marker dictionary generated by ``pg.markers``.
output_file: ``str``
File name to which the marker dictionary is written.
ndigits: ``int``, optional, default: ``3``
        Round all numeric columns, except p-values and q-values, to ``ndigits`` decimal places in the Excel output.
Returns
-------
``None``
Marker information is written to file with name ``output_file``.
Examples
--------
>>> pg.write_results_to_excel(marker_dict, "result.de.xlsx")
"""
start = time.perf_counter()
import xlsxwriter
from natsort import natsorted
def format_short_output_cols(
df_orig: pd.DataFrame, ndigits: int = 3
) -> pd.DataFrame:
""" Round related float columns to ndigits decimal points.
"""
df = pd.DataFrame(df_orig)
cols = []
for name in df.columns:
if (not name.endswith("pval")) and (not name.endswith("qval")):
cols.append(name)
df.loc[:, cols] = df.loc[:, cols].round(ndigits)
return df
def add_worksheet(
workbook: "workbook", df_orig: pd.DataFrame, sheet_name: str
) -> None:
""" Add one worksheet with content as df
"""
df = format_short_output_cols(df_orig)
df.reset_index(inplace=True)
worksheet = workbook.add_worksheet(name=sheet_name)
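        # Write a formatted Excel table when there are rows; otherwise only write the header row.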
if df.shape[0] > 0:
worksheet.add_table(
0,
0,
df.index.size,
df.columns.size - 1,
{
"data": df.to_numpy(),
"style": "Table Style Light 1",
"first_column": True,
"header_row": True,
"columns": [{"header": x} for x in df.columns.values],
},
)
else:
worksheet.write_row(0, 0, df.columns.values)
workbook = xlsxwriter.Workbook(output_file, {"nan_inf_to_errors": True})
workbook.formats[0].set_font_size(9)
for clust_id in natsorted(results.keys()):
add_worksheet(workbook, results[clust_id]["up"], "up " + clust_id)
add_worksheet(workbook, results[clust_id]["down"], "down " + clust_id)
workbook.close()
end = time.perf_counter()
logger.info(
"Excel spreadsheet is written. Time spent = {:.2f}s.".format(end - start)
)
@pg_deco.TimeLogger()
def run_de_analysis(
input_file: str,
output_excel_file: str,
cluster: str,
result_key: str = "de_res",
n_jobs: int = -1,
auc: bool = True,
t: bool = True,
fisher: bool = False,
mwu: bool = False,
temp_folder: str = None,
verbose: bool = True,
alpha: float = 0.05,
ndigits: int = 3,
) -> None:
""" For command line only
"""
from pegasus.io import read_input, write_output
data = read_input(input_file, h5ad_mode="r+")
de_analysis(
data,
cluster,
result_key=result_key,
n_jobs=n_jobs,
auc=auc,
t=t,
fisher=fisher,
mwu=mwu,
temp_folder=temp_folder,
verbose=verbose,
)
write_output(data, input_file, whitelist=["varm/{}".format(result_key)])
logger.info(
"Differential expression results are written to varm/{} in h5ad file.".format(
result_key
)
)
results = markers(data, de_key=result_key, alpha=alpha)
write_results_to_excel(results, output_excel_file, ndigits=ndigits)