import numpy as np
import pandas as pd
import time
from natsort import natsorted
import multiprocessing
from sklearn.cluster import KMeans
from typing import List
from anndata import AnnData
from pegasus.utils import decorators as pg_deco
def estimate_background_probs(adt: AnnData, random_state: int = 0) -> None:
    """For cell-hashing data, estimate antibody background probability using EM algorithm.

    Parameters
    ----------
    adt: ``anndata.AnnData``
        Annotated data matrix for antibody.
    random_state: ``int``, optional, default: ``0``
        Random seed set for reproducing results.

    Returns
    -------
    ``None``

    Update ``adt.uns``:
        * ``adt.uns["background_probs"]``: estimated antibody background probability.

    Example
    -------
    >>> pg.estimate_background_probs(adt)
    """
    # Total hashtag counts per droplet; .A1 flattens the sparse matrix sum to 1-D.
    adt.obs["counts"] = adt.X.sum(axis=1).A1 if adt.shape[1] > 1 else adt.X

    # Cluster droplets on log10 total counts into two groups; the cluster with the
    # larger center is treated as "signal", the other as ambient "background".
    counts_log10 = np.log10(adt.obs["counts"].values.reshape(-1, 1))
    kmeans = KMeans(n_clusters=2, random_state=random_state).fit(counts_log10)
    signal = 0 if kmeans.cluster_centers_[0] > kmeans.cluster_centers_[1] else 1
    adt.obs["hto_type"] = "background"
    adt.obs.loc[kmeans.labels_ == signal, "hto_type"] = "signal"

    # Background probability vector: normalized per-hashtag totals over the
    # background droplets only.
    idx = np.isin(adt.obs["hto_type"], "background")
    pvec = (
        adt.X[idx,].sum(axis=0).A1 if adt.shape[1] > 1 else np.array(adt.X[idx,].sum())
    )
    pvec /= pvec.sum()
    adt.uns["background_probs"] = pvec
def estimate_probs(arr, pvec, alpha, alpha_noise, tol):
    """Estimate per-sample origin probabilities for one droplet via EM.

    ``arr`` holds the droplet's hashtag counts, ``pvec`` the background
    probabilities; ``alpha`` / ``alpha_noise`` are Generalized Dirichlet prior
    concentrations and ``tol`` the L1 convergence threshold. Returns an array
    of ``pvec.size + 1`` probabilities, the last entry being the noise weight.
    """
    nsample = pvec.size
    noise = nsample  # index of the background/noise component
    probs = np.zeros(nsample + 1)
    prev = np.zeros(nsample + 1)
    z = np.zeros(nsample + 1)

    # Initialize from the MLE computed without the Generalized Dirichlet prior.
    mle = arr / arr.sum()
    probs[noise] = (mle / pvec).min() + 0.01
    probs[:-1] = np.maximum(mle - probs[noise] * pvec, 0.01)
    probs = probs / probs.sum()

    # EM iterations until the L1 change between successive estimates <= tol.
    n_iter = 0
    delta = 1.0
    while delta > tol:
        n_iter += 1
        prev[:] = probs[:]

        # E step: prior pseudo-counts plus expected allocation of each count
        # between its sample component and the noise component.
        z[:-1] = alpha - 1.0
        z[noise] = alpha_noise - 1.0
        for j in range(nsample):
            if arr[j] > 0:
                frac = probs[j] / (probs[noise] * pvec[j] + probs[j])
                z[j] += arr[j] * frac
                z[noise] += arr[j] * (1.0 - frac)

        # M step: renormalize the positive expected counts; clamp the rest to 0.
        pos = z > 0.0
        probs[pos] = z[pos] / z[pos].sum()
        probs[~pos] = 0.0

        delta = np.linalg.norm(probs - prev, ord=1)
        # print ("i = {}, eps = {:.2g}.".format(n_iter, delta))

    return probs
def get_droplet_info(probs, sample_names):
    """Classify one droplet from its normalized sample probabilities.

    Samples with probability >= 0.1 are called present; exactly one such
    sample means "singlet", otherwise "doublet". Returns a tuple of
    (droplet type, comma-joined sample names ordered by decreasing probability).
    """
    hits = np.nonzero(probs >= 0.1)[0]
    hits = hits[np.argsort(probs[hits])[::-1]]
    droplet_type = "singlet" if hits.size == 1 else "doublet"
    droplet_id = ",".join(sample_names[k] for k in hits)
    return (droplet_type, droplet_id)
def calc_demux(data, adt, nsample, min_signal, probs="raw_probs"):
    """Write ``demux_type`` and ``assignment`` columns into ``data.obs``.

    Droplets whose signal strength (total hashtag counts scaled by the
    non-noise probability) falls below ``min_signal`` stay "unknown".
    """
    n_obs = data.shape[0]
    demux_type = np.full(n_obs, "unknown", dtype="object")
    assignments = np.full(n_obs, "", dtype="object")

    # Signal = per-droplet hashtag counts * (1 - estimated noise fraction);
    # droplets absent from adt contribute 0 counts via reindex fill.
    counts = adt.obs["counts"].reindex(data.obs_names, fill_value=0.0).values
    signals = counts * (1.0 - data.obsm[probs][:, nsample])
    idx = signals >= min_signal

    # Drop the noise column and renormalize the remaining sample probabilities.
    sub = data.obsm[probs][idx,]
    norm_probs = sub[:, 0:nsample] / (1.0 - sub[:, nsample])[:, None]

    types = []
    ids = []
    for row in norm_probs:
        droplet_type, droplet_id = get_droplet_info(row, adt.var_names)
        types.append(droplet_type)
        ids.append(droplet_id)

    demux_type[idx] = types
    data.obs["demux_type"] = pd.Categorical(
        demux_type, categories=["singlet", "doublet", "unknown"]
    )
    assignments[idx] = ids
    data.obs["assignment"] = pd.Categorical(
        assignments, categories=natsorted(np.unique(assignments))
    )
def has_duplicate_names(names: List[str]) -> bool:
    """Return True if any name carries the ".#~" marker used to disambiguate
    duplicate sample names (one sample mapped to multiple feature barcodes)."""
    return any(".#~" in name for name in names)
def remove_suffix(assigns: List[str]) -> pd.Categorical:
    """Strip the ".#~<id>" de-duplication suffix from each assignment string.

    Parameters
    ----------
    assigns: array-like of ``str``
        Assignment strings; a multi-sample (doublet) assignment is a
        comma-separated list of sample names.

    Returns
    -------
    ``pd.Categorical``
        Cleaned assignments with naturally sorted categories.

    Note: the original annotation claimed a ``List[str]`` return, but the
    function has always produced a ``pd.Categorical``; the annotation is fixed
    here. ``np.asarray`` also lets plain Python lists be passed, backward
    compatibly with the previous array-only behavior.
    """
    results = []
    for value in np.asarray(assigns).astype(str):
        # Each comma-separated field may carry a ".#~" suffix; keep only the
        # part before the first occurrence (no-op when the marker is absent).
        cleaned = [field.split(".#~", 1)[0] for field in value.split(",")]
        results.append(",".join(cleaned))
    results = np.array(results)
    return pd.Categorical(results, categories=natsorted(np.unique(results)))
@pg_deco.TimeLogger()
def demultiplex(
    data: AnnData,
    adt: AnnData,
    min_signal: float = 10.0,
    alpha: float = 0.0,
    alpha_noise: float = 1.0,
    tol: float = 1e-6,
    n_threads: int = 1,
) -> None:
    """Demultiplexing cell-hashing data, using the estimated antibody background probability calculated in ``pg.estimate_background_probs``.

    Parameters
    ----------
    data: ``anndata.AnnData``
        Annotated data matrix for gene expression matrix.
    adt: ``anndata.AnnData``
        Annotated data matrix for antibody count matrix.
    min_signal: ``float``, optional, default: ``10.0``
        Any cell/nucleus with less than ``min_signal`` hashtags from the signal will be marked as ``unknown``.
    alpha: ``float``, optional, default: ``0.0``
        The Dirichlet prior concentration parameter (alpha) on samples. An alpha value < 1.0 will make the prior sparse.
    alpha_noise: ``float``, optional, default: ``1.0``
        The Dirichlet prior concentration parameter on the noise component.
    tol: ``float``, optional, default: ``1e-6``
        Tolerance threshold when judging equivalence of two floating point values.
    n_threads: ``int``, optional, default: ``1``
        Number of threads to use. Must be a positive integer.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs["demux_type"]``: Demultiplexed types of the cells. Either ``singlet``, ``doublet``, or ``unknown``.
        * ``data.obs["assignment"]``: Assigned samples of origin for each cell barcode.
        * ``data.obs["assignment.dedup"]``: Only exist if one sample name can correspond to multiple feature barcodes. In this array, each feature barcode is assigned a unique sample name.

    Examples
    --------
    >>> pg.demultiplex(adata, adt)
    """
    nsample = adt.shape[1]
    data.uns["background_probs"] = adt.uns["background_probs"]

    # Flag ADT barcodes that also appear in the RNA matrix as real cells ("signal").
    idx_df = data.obs_names.isin(adt.obs_names)
    adt.obs["rna_type"] = "background"
    adt.obs.loc[data.obs_names[idx_df], "rna_type"] = "signal"

    if nsample == 1:
        # Degenerate case: a single hashtag means every droplet is trivially a singlet.
        print("Warning: detect only one barcode, no need to demultiplex!")
        data.obsm["raw_probs"] = np.zeros((data.shape[0], nsample + 1))
        data.obsm["raw_probs"][:, 0] = 1.0
        data.obsm["raw_probs"][:, 1] = 0.0
        data.obs["demux_type"] = "singlet"
        data.obs["assignment"] = adt.var_names[0]
    else:
        if nsample == 2:
            print(
                "Warning: detect only two barcodes, demultiplexing accuracy might be affected!"
            )

        ncalc = idx_df.sum()
        if ncalc < data.shape[0]:
            nzero = data.shape[0] - ncalc
            print(
                "Warning: {} cells do not have ADTs, percentage = {:.2f}%.".format(
                    nzero, nzero * 100.0 / data.shape[0]
                )
            )
        adt_small = adt[data.obs_names[idx_df],].X.toarray()

        # Initialize every droplet as pure noise; rows with ADT counts are
        # overwritten by the EM estimates below.
        data.obsm["raw_probs"] = np.zeros((data.shape[0], nsample + 1))
        data.obsm["raw_probs"][:, nsample] = 1.0

        # Run the per-droplet EM in parallel across n_threads worker processes.
        iter_array = [
            (adt_small[i,], adt.uns["background_probs"], alpha, alpha_noise, tol)
            for i in range(ncalc)
        ]
        with multiprocessing.Pool(n_threads) as pool:
            data.obsm["raw_probs"][idx_df, :] = pool.starmap(estimate_probs, iter_array)

    calc_demux(data, adt, nsample, min_signal)

    # If one sample maps to multiple feature barcodes, keep the raw assignment
    # under "assignment.dedup" and expose suffix-free names as "assignment".
    if has_duplicate_names(adt.var_names):
        data.obs["assignment.dedup"] = data.obs["assignment"]
        data.obs["assignment"] = remove_suffix(data.obs["assignment"].values)