import time
import numpy as np
import pandas as pd
from pandas.api.types import is_categorical_dtype
from pegasusio import MultimodalData
from natsort import natsorted
from threadpoolctl import threadpool_limits
from sklearn.cluster import KMeans
from typing import List, Optional, Union
from pegasus.tools import eff_n_jobs, construct_graph, calc_stat_per_batch
import logging
logger = logging.getLogger(__name__)
from pegasusio import timer
def _calc_trans_distor(X: np.ndarray, labels: np.ndarray, Y: float) -> float:
    """Return the transformed distortion d^(-Y) used by the jump method.

    The distortion is the mean squared distance from each cell to its cluster
    center, averaged over both cells and features; raising it to the power -Y
    yields the "transformed distortion" of Sugar & James (2003).
    """
    # calc_stat_per_batch returns per-cluster means with shape (n_features, n_clusters)
    _, means, _ = calc_stat_per_batch(X, labels)
    centers_per_cell = means.T[labels, :]
    # Keep the same division order as the classic formula for bitwise reproducibility.
    distortion = ((X - centers_per_cell) ** 2).sum() / X.shape[0] / X.shape[1]
    return distortion ** (-Y)
@timer(logger=logger)
def jump_method(
    data: MultimodalData,
    rep: str = "pca",
    K_max: int = 40,
    Y: Optional[float] = None,
    n_jobs: int = -1,
    random_state: int = 0,
) -> None:
    """ Determine the optimal number of clusters using the Jump Method. [Sugar and James, 2003]_

    Runs KMeans for K = 1 .. K_max and records the jump in transformed
    distortion between consecutive K; the K with the largest jump is reported
    as optimal.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
    K_max: ``int``, optional, default: 40
        The maximum number of clusters to try.
    Y: ``float``, optional, default: ``None``
        The transformation power used. If None, use min(data.shape[1] / 3.0, 3.0).
    n_jobs : `int`, optional (default: -1)
        Number of threads to use. -1 refers to using all physical CPU cores.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.

    Returns
    -------
    ``None``

    Update ``data.uns``:
        * ``data.uns[rep + '_jump_values']``: Jump values (difference of adjacent transformed distortion values).
        * ``data.uns[rep + '_optimal_k']``: The optimal K, i.e. the K (1-based) with the largest jump value.

    Examples
    --------
    >>> pg.jump_method(data)
    """
    X = data.obsm[f"X_{rep}"]
    # Default transformation power: effective dimension / 3, capped at 3.
    Y = min(data.shape[1] / 3.0, 3.0) if Y is None else Y
    logger.info(f"Jump method: Y = {Y:.3f}.")
    n_jobs = eff_n_jobs(n_jobs)
    jump_values = np.zeros(K_max, dtype = np.float64)
    v_old = v = 0.0  # transformed distortion at K-1 and K; d^(-Y) at K=0 is taken as 0
    for k in range(1, K_max + 1):
        # Limit BLAS/OpenMP threads only around the KMeans fit.
        with threadpool_limits(limits = n_jobs):
            kmeans = KMeans(n_clusters = k, random_state = random_state).fit(X)
        v = _calc_trans_distor(X, kmeans.labels_, Y)
        jump_values[k - 1] = v - v_old
        v_old = v
        logger.info(f"K = {k} is finished, jump_value = {jump_values[k - 1]:.6f}.")
    optimal_k = np.argmax(jump_values) + 1  # +1: jump_values is 0-indexed, K is 1-based
    data.uns[f"{rep}_jump_values"] = jump_values
    data.uns[f"{rep}_optimal_k"] = optimal_k
    logger.info(f"Jump method finished. Optimal K = {optimal_k}.")
def _run_community_detection(algo, module, G, resolution, random_state, n_iter = -1):
    """Run one round of Louvain/Leiden community detection on graph G.

    ``algo`` selects the API flavor ("louvain" uses the explicit Optimiser
    workflow; anything else uses leidenalg's one-shot find_partition).
    Returns the membership list (cluster id per vertex, 0-based).
    """
    partition_type = module.RBConfigurationVertexPartition
    if algo != "louvain":
        # leidenalg: single call handles partition construction and optimisation.
        partition = module.find_partition(
            G,
            partition_type,
            seed=random_state,
            weights="weight",
            resolution_parameter=resolution,
            n_iterations=n_iter,
        )
        return partition.membership
    # louvain: build the partition first, then optimise it with a seeded optimiser.
    partition = partition_type(G, resolution_parameter=resolution, weights="weight")
    optimiser = module.Optimiser()
    optimiser.set_rng_seed(random_state)
    optimiser.optimise_partition(partition)
    return partition.membership
def _find_optimal_resolution(algo, module, optimal_k, resol_max, G, random_state, n_iter = -1):
    """Binary-search a resolution in (0.01, resol_max] whose clustering reaches ``optimal_k`` clusters.

    Searches for the smallest resolution (within a 0.05 tolerance) such that the
    community detection yields at least ``optimal_k`` clusters. If no probed
    resolution reaches ``optimal_k``, falls back to the right endpoint.

    Returns
    -------
    (resolution, membership): the chosen resolution and its membership list.
    """
    resol = None
    membership = None
    resol_l = 0.01
    resol_r = resol_max
    while resol_r - resol_l > 0.05:
        resol_mid = (resol_l + resol_r) / 2.0
        membership_mid = _run_community_detection(algo, module, G, resol_mid, random_state, n_iter)
        k = max(membership_mid) + 1  # membership ids are 0-based
        logger.info(f"_find_optimal_resolution: resol = {resol_mid:.4f}, k = {k}, optimal_k = {optimal_k}.")
        if k >= optimal_k:
            # Enough clusters: remember this candidate and search lower resolutions.
            resol_r = resol_mid
            resol = resol_mid
            membership = membership_mid
        else:
            resol_l = resol_mid
    if resol is None:
        # No probed resolution reached optimal_k (or the loop never ran): use the right endpoint.
        resol = resol_r
        membership = _run_community_detection(algo, module, G, resol, random_state, n_iter)
        # BUGFIX: was `max(membership_mid) + 1`, which logged the stale mid-search result
        # and raised NameError when the while loop body never executed.
        k = max(membership) + 1
        logger.info(f"_find_optimal_resolution: resol = {resol:.4f}, k = {k}, optimal_k = {optimal_k}.")
    return resol, membership
@timer(logger=logger)
def louvain(
    data: MultimodalData,
    rep: str = "pca",
    resolution: float = 1.3,
    n_clust: Optional[int] = None,
    random_state: int = 0,
    class_label: str = "louvain_labels",
) -> None:
    """Cluster the cells using Louvain algorithm. [Blondel08]_

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm`` and nearest neighbors must be calculated so that affinity matrix ``'W_' + rep`` exists in ``data.obsp``. By default, use PCA coordinates.
    resolution: ``float``, optional, default: ``1.3``
        Resolution factor. Higher resolution tends to find more clusters with smaller sizes.
    n_clust: ``int``, optional, default: ``None``
        This option only takes effect if 'resolution = None'. Try to find an appropriate resolution by binary search such that the total number of clusters matches 'n_clust'. The range of resolution to search is (0.01, 2.0].
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.
    class_label: ``str``, optional, default: ``"louvain_labels"``
        Key name for storing cluster labels in ``data.obs``.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[class_label]``: Cluster labels of cells as categorical data.

    Update ``data.uns`` (only when ``resolution`` is None):
        * ``data.uns['louvain_resolution']``: The resolution found by binary search.

    Examples
    --------
    >>> pg.louvain(data)
    """
    try:
        import louvain as louvain_module
    except ImportError:
        import sys
        logger.error("Need louvain! Try 'pip install louvain' or 'conda install -c conda-forge louvain'.")
        sys.exit(-1)

    rep_key = "W_" + rep
    if rep_key not in data.obsp:
        raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
    W = data.obsp[rep_key]
    G = construct_graph(W)

    if resolution is not None:
        membership = _run_community_detection("louvain", louvain_module, G, resolution, random_state)
    else:
        # Resolution not given: binary-search one that yields n_clust clusters.
        assert isinstance(n_clust, int)
        resolution, membership = _find_optimal_resolution("louvain", louvain_module, n_clust, 2.0, G, random_state)
        data.uns["louvain_resolution"] = resolution

    # Convert 0-based membership ids to 1-based string labels, naturally sorted.
    labels = np.array([str(x + 1) for x in membership])
    categories = natsorted(np.unique(labels))
    data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
    data.register_attr(class_label, "cluster")

    n_clusters = data.obs[class_label].cat.categories.size
    logger.info(f"Louvain clustering is done. Get {n_clusters} clusters.")
@timer(logger=logger)
def leiden(
    data: MultimodalData,
    rep: str = "pca",
    resolution: float = 1.3,
    n_clust: Optional[int] = None,
    n_iter: int = -1,
    random_state: int = 0,
    class_label: str = "leiden_labels",
) -> None:
    """Cluster the data using Leiden algorithm. [Traag19]_

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm`` and nearest neighbors must be calculated so that affinity matrix ``'W_' + rep`` exists in ``data.obsp``. By default, use PCA coordinates.
    resolution: ``float``, optional, default: ``1.3``
        Resolution factor. Higher resolution tends to find more clusters.
    n_clust: ``int``, optional, default: ``None``
        This option only takes effect if 'resolution = None'. Try to find an appropriate resolution by binary search such that the total number of clusters matches 'n_clust'. The range of resolution to search is (0.01, 2.0].
    n_iter: ``int``, optional, default: ``-1``
        Number of iterations that Leiden algorithm runs. If ``-1``, run the algorithm until reaching its optimal clustering.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.
    class_label: ``str``, optional, default: ``"leiden_labels"``
        Key name for storing cluster labels in ``data.obs``.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[class_label]``: Cluster labels of cells as categorical data.

    Update ``data.uns`` (only when ``resolution`` is None):
        * ``data.uns['leiden_resolution']``: The resolution found by binary search.

    Examples
    --------
    >>> pg.leiden(data)
    """
    try:
        import leidenalg
    except ImportError:
        import sys
        logger.error("Need leidenalg! Try 'pip install leidenalg'.")
        sys.exit(-1)

    rep_key = "W_" + rep
    if rep_key not in data.obsp:
        raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
    W = data.obsp[rep_key]
    G = construct_graph(W)

    if resolution is not None:
        membership = _run_community_detection("leiden", leidenalg, G, resolution, random_state, n_iter)
    else:
        # Resolution not given: binary-search one that yields n_clust clusters.
        assert isinstance(n_clust, int)
        resolution, membership = _find_optimal_resolution("leiden", leidenalg, n_clust, 2.0, G, random_state, n_iter)
        data.uns["leiden_resolution"] = resolution

    # Convert 0-based membership ids to 1-based string labels, naturally sorted.
    labels = np.array([str(x + 1) for x in membership])
    categories = natsorted(np.unique(labels))
    data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
    data.register_attr(class_label, "cluster")

    n_clusters = data.obs[class_label].cat.categories.size
    logger.info(f"Leiden clustering is done. Get {n_clusters} clusters.")
def partition_cells_by_kmeans(
    X: np.ndarray,
    n_clusters: int,
    n_clusters2: int,
    n_init: int,
    n_jobs: int,
    random_state: int,
    min_avg_cells_per_final_cluster: int = 10,
) -> np.ndarray:
    """Two-level KMeans partitioning of cells.

    First runs KMeans with up to ``n_clusters`` centers on all cells, then
    splits each first-level cluster into up to ``n_clusters2`` sub-clusters.
    Both levels cap the cluster count so that each final cluster keeps at
    least ``min_avg_cells_per_final_cluster`` cells on average.

    Parameters
    ----------
    X: coordinate matrix (cells x features) to cluster on.
    n_clusters: requested number of first-level clusters.
    n_clusters2: requested number of second-level clusters per first-level cluster.
    n_init: number of KMeans restarts for the first-level fit.
    n_jobs: number of threads; -1 uses all physical cores (via eff_n_jobs).
    random_state: random seed for KMeans.
    min_avg_cells_per_final_cluster: minimum average cells per final cluster.

    Returns
    -------
    Integer array of length X.shape[0] with flat (0-based) final cluster labels.
    """
    # Shrink the first-level cluster count if there are too few cells.
    n_clusters = min(n_clusters, max(X.shape[0] // min_avg_cells_per_final_cluster, 1))
    if n_clusters == 1:
        # Degenerate case: everything in one cluster.
        return np.zeros(X.shape[0], dtype = np.int32)

    n_jobs = eff_n_jobs(n_jobs)

    kmeans_params = {
        'n_clusters': n_clusters,
        'n_init': n_init,
        'random_state': random_state,
    }

    km = KMeans(**kmeans_params)
    with threadpool_limits(limits = n_jobs):
        # First-level clustering over all cells.
        km.fit(X)
        coarse = km.labels_.copy()

        # Reuse the estimator for second-level fits; a single init suffices there.
        km.set_params(n_init=1)
        labels = coarse.copy()
        base_sum = 0  # running offset so final labels are globally unique
        for i in range(n_clusters):
            idx = coarse == i
            # Cap sub-cluster count by this cluster's size as well.
            nc = min(n_clusters2, max(idx.sum() // min_avg_cells_per_final_cluster, 1))
            if nc == 1:
                labels[idx] = base_sum
            else:
                km.set_params(n_clusters=nc)
                km.fit(X[idx, :])
                labels[idx] = base_sum + km.labels_
            base_sum += nc

    return labels
@timer(logger=logger)
def spectral_louvain(
    data: MultimodalData,
    rep: str = "pca",
    resolution: float = 1.3,
    rep_kmeans: str = "diffmap",
    n_clusters: int = 30,
    n_clusters2: int = 50,
    n_init: int = 10,
    n_jobs: int = -1,
    random_state: int = 0,
    class_label: str = "spectral_louvain_labels",
) -> None:
    """ Cluster the data using Spectral Louvain algorithm. [Li20]_

    Seeds Louvain with a two-level KMeans partition computed on ``rep_kmeans``
    coordinates, then refines it on the affinity graph of ``rep``.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
    resolution: ``float``, optional, default: ``1.3``
        Resolution factor. Higher resolution tends to find more clusters with smaller sizes.
    rep_kmeans: ``str``, optional, default: ``"diffmap"``
        The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
    n_clusters: ``int``, optional, default: ``30``
        The number of first level clusters.
    n_clusters2: ``int``, optional, default: ``50``
        The number of second level clusters.
    n_init: ``int``, optional, default: ``10``
        Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.
    n_jobs : `int`, optional (default: -1)
        Number of threads to use for the KMeans step. -1 refers to using all physical CPU cores.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.
    class_label: ``str``, optional, default: ``"spectral_louvain_labels"``
        Key name for storing cluster labels in ``data.obs``.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[class_label]``: Cluster labels for cells as categorical data.

    Examples
    --------
    >>> pg.spectral_louvain(data)
    """
    try:
        import louvain as louvain_module
    except ImportError:
        import sys
        logger.error("Need louvain! Try 'pip install louvain' or 'conda install -c conda-forge louvain'.")
        sys.exit(-1)

    if f"X_{rep_kmeans}" not in data.obsm.keys():
        logger.warning(f"{rep_kmeans} is not calculated, switch to pca instead.")
        rep_kmeans = "pca"
        if f"X_{rep_kmeans}" not in data.obsm.keys():
            raise ValueError(f"Please run {rep_kmeans} first!")
    if f"W_{rep}" not in data.obsp:
        raise ValueError("Cannot find affinity matrix. Please run neighbors first!")

    # Initial membership from two-level KMeans on the rep_kmeans embedding.
    labels = partition_cells_by_kmeans(
        data.obsm[f"X_{rep_kmeans}"], n_clusters, n_clusters2, n_init, n_jobs, random_state,
    )

    W = data.obsp[f"W_{rep}"]
    G = construct_graph(W)
    partition_type = louvain_module.RBConfigurationVertexPartition
    partition = partition_type(
        G, resolution_parameter=resolution, weights="weight", initial_membership=labels
    )
    # Optimise on the aggregated (coarse) graph, then project back to cells.
    partition_agg = partition.aggregate_partition()
    optimiser = louvain_module.Optimiser()
    optimiser.set_rng_seed(random_state)
    optimiser.optimise_partition(partition_agg)
    partition.from_coarse_partition(partition_agg)

    # Convert 0-based membership ids to 1-based string labels, naturally sorted.
    labels = np.array([str(x + 1) for x in partition.membership])
    categories = natsorted(np.unique(labels))
    data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
    data.register_attr(class_label, "cluster")

    n_found = data.obs[class_label].cat.categories.size  # avoid shadowing the n_clusters parameter
    logger.info(f"Spectral Louvain clustering is done. Get {n_found} clusters.")
@timer(logger=logger)
def spectral_leiden(
    data: MultimodalData,
    rep: str = "pca",
    resolution: float = 1.3,
    rep_kmeans: str = "diffmap",
    n_clusters: int = 30,
    n_clusters2: int = 50,
    n_init: int = 10,
    n_jobs: int = -1,
    random_state: int = 0,
    class_label: str = "spectral_leiden_labels",
) -> None:
    """Cluster the data using Spectral Leiden algorithm. [Li20]_

    Seeds Leiden with a two-level KMeans partition computed on ``rep_kmeans``
    coordinates, then refines it on the affinity graph of ``rep``.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
    resolution: ``float``, optional, default: ``1.3``
        Resolution factor. Higher resolution tends to find more clusters.
    rep_kmeans: ``str``, optional, default: ``"diffmap"``
        The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
    n_clusters: ``int``, optional, default: ``30``
        The number of first level clusters.
    n_clusters2: ``int``, optional, default: ``50``
        The number of second level clusters.
    n_init: ``int``, optional, default: ``10``
        Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.
    n_jobs : `int`, optional (default: -1)
        Number of threads to use for the KMeans step. -1 refers to using all physical CPU cores.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.
    class_label: ``str``, optional, default: ``"spectral_leiden_labels"``
        Key name for storing cluster labels in ``data.obs``.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[class_label]``: Cluster labels for cells as categorical data.

    Examples
    --------
    >>> pg.spectral_leiden(data)
    """
    try:
        import leidenalg
    except ImportError:
        import sys
        logger.error("Need leidenalg! Try 'pip install leidenalg'.")
        sys.exit(-1)

    if f"X_{rep_kmeans}" not in data.obsm.keys():
        logger.warning(f"{rep_kmeans} is not calculated, switch to pca instead.")
        rep_kmeans = "pca"
        if f"X_{rep_kmeans}" not in data.obsm.keys():
            raise ValueError(f"Please run {rep_kmeans} first!")
    if f"W_{rep}" not in data.obsp:
        raise ValueError("Cannot find affinity matrix. Please run neighbors first!")

    # Initial membership from two-level KMeans on the rep_kmeans embedding.
    labels = partition_cells_by_kmeans(
        data.obsm[f"X_{rep_kmeans}"], n_clusters, n_clusters2, n_init, n_jobs, random_state,
    )

    W = data.obsp[f"W_{rep}"]
    G = construct_graph(W)
    partition_type = leidenalg.RBConfigurationVertexPartition
    partition = partition_type(
        G, resolution_parameter=resolution, weights="weight", initial_membership=labels
    )
    # Optimise on the aggregated (coarse) graph, then project back to cells.
    # The second argument (-1) asks the optimiser to run until convergence.
    partition_agg = partition.aggregate_partition()
    optimiser = leidenalg.Optimiser()
    optimiser.set_rng_seed(random_state)
    optimiser.optimise_partition(partition_agg, -1)
    partition.from_coarse_partition(partition_agg)

    # Convert 0-based membership ids to 1-based string labels, naturally sorted.
    labels = np.array([str(x + 1) for x in partition.membership])
    categories = natsorted(np.unique(labels))
    data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
    data.register_attr(class_label, "cluster")

    n_found = data.obs[class_label].cat.categories.size  # avoid shadowing the n_clusters parameter
    logger.info(f"Spectral Leiden clustering is done. Get {n_found} clusters.")
def cluster(
    data: MultimodalData,
    algo: str = "louvain",
    rep: str = "pca",
    resolution: float = 1.3,
    n_jobs: int = -1,
    random_state: int = 0,
    class_label: Optional[str] = None,
    n_iter: int = -1,
    rep_kmeans: str = "diffmap",
    n_clusters: int = 30,
    n_clusters2: int = 50,
    n_init: int = 10,
) -> None:
    """Cluster the data using the chosen algorithm.

    Candidates are *louvain*, *leiden*, *spectral_louvain* and *spectral_leiden*.
    If data have < 100,000 cells and there are clusters with sizes of 1, resolution is automatically reduced until no cluster of size 1 appears.

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    algo: ``str``, optional, default: ``"louvain"``
        Which clustering algorithm to use. Choices are louvain, leiden, spectral_louvain, spectral_leiden
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
    resolution: ``float``, optional, default: ``1.3``
        Resolution factor. Higher resolution tends to find more clusters.
    n_jobs : `int`, optional (default: -1)
        Number of threads to use for the KMeans step in 'spectral_louvain' and 'spectral_leiden'. -1 refers to using all physical CPU cores.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.
    class_label: ``str``, optional, default: None
        Key name for storing cluster labels in ``data.obs``. If None, use 'algo_labels'.
    n_iter: ``int``, optional, default: ``-1``
        Number of iterations that Leiden algorithm runs. If ``-1``, run the algorithm until reaching its optimal clustering.
    rep_kmeans: ``str``, optional, default: ``"diffmap"``
        The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
    n_clusters: ``int``, optional, default: ``30``
        The number of first level clusters.
    n_clusters2: ``int``, optional, default: ``50``
        The number of second level clusters.
    n_init: ``int``, optional, default: ``10``
        Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[class_label]``: Cluster labels of cells as categorical data.

    Examples
    --------
    >>> pg.cluster(data, algo = 'leiden')
    """
    if algo not in {"louvain", "leiden", "spectral_louvain", "spectral_leiden"}:
        raise ValueError("Unknown clustering algorithm {}.".format(algo))

    if class_label is None:
        class_label = algo + "_labels"

    # Build the keyword set shared by all algorithms, then add algorithm-specific ones.
    kwargs = {
        "data": data,
        "rep": rep,
        "resolution": resolution,
        "random_state": random_state,
        "class_label": class_label,
    }
    if algo == "leiden":
        kwargs["n_iter"] = n_iter
    if algo in ["spectral_louvain", "spectral_leiden"]:
        kwargs.update(
            {
                "rep_kmeans": rep_kmeans,
                "n_clusters": n_clusters,
                "n_clusters2": n_clusters2,
                "n_init": n_init,
                "n_jobs": n_jobs,
            }
        )

    cluster_func = globals()[algo]
    cluster_func(**kwargs)  # clustering

    # For moderately sized data, re-run with lowered resolution until no singleton cluster remains.
    if data.shape[0] < 100000 and data.obs[class_label].value_counts().min() == 1:
        new_resol = resolution
        while new_resol > 0.0:
            new_resol -= 0.1
            kwargs["resolution"] = new_resol
            cluster_func(**kwargs)
            if data.obs[class_label].value_counts().min() > 1:
                break
        # NOTE(review): warns even if the search exhausts all resolutions without
        # removing singletons — the last attempted clustering is kept either way.
        logger.warning(
            "Reduced resolution from {:.2f} to {:.2f} to avoid clusters of size 1.".format(
                resolution, new_resol
            )
        )
def split_one_cluster(
    data: MultimodalData,
    clust_label: str,
    clust_id: str,
    n_clust: int,
    res_label: str,
    rep: str = "pca",
    n_comps: Optional[int] = None,
    random_state: int = 0,
) -> None:
    """
    Use Leiden algorithm to split 'clust_id' in 'clust_label' into 'n_clust' sub-clusters and write the new clustering results to 'res_label'. The sub-cluster names are the concatenation of original cluster name and the subcluster id (e.g. 'T' -> 'T-1', 'T-2').

    Parameters
    ----------
    data: ``pegasusio.MultimodalData``
        Annotated data matrix with rows for cells and columns for genes.
    clust_label: `str`
        Use existing clustering stored in data.obs['clust_label'].
    clust_id: `str`
        Cluster ID in data.obs['clust_label'].
    n_clust: `int`
        Split 'clust_id' into `n_clust' subclusters.
    res_label: `str`,
        Write new clustering in data.obs['res_label']. The sub-cluster names are the concatenation of original cluster name and the subcluster id (e.g. 'T' -> 'T-1', 'T-2').
    rep: ``str``, optional, default: ``"pca"``
        The embedding representation used for Kmeans clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
    n_comps: `int`, optional (default: None)
        Number of components to be used in the `rep`. If n_comps == None, use all components; otherwise, use the minimum of n_comps and rep's dimensions.
    random_state: ``int``, optional, default: ``0``
        Random seed for reproducing results.

    Returns
    -------
    ``None``

    Update ``data.obs``:
        * ``data.obs[res_label]``: New cluster labels of cells as categorical data.

    Examples
    --------
    >>> pg.split_one_cluster(data, 'leiden_labels', '15', 2, 'leiden_labels_split')
    """
    # Extract category names as strings, whether or not the column is already categorical.
    if is_categorical_dtype(data.obs[clust_label]):
        cats = data.obs[clust_label].cat.categories.values
    else:
        cats = pd.Categorical(data.obs[clust_label]).categories.values
    if cats.dtype.kind not in {'S', 'U'}:
        cats = cats.astype(str)

    # Locate the position of clust_id among the categories; it must appear exactly once.
    idx_cat = np.nonzero(cats == clust_id)[0]
    if idx_cat.size == 0:
        raise ValueError(f"{clust_id} is not in {clust_label}!")
    elif idx_cat.size > 1:
        raise ValueError(f"Detected more than one categories in {clust_label} with name {clust_id}!")
    else:
        idx_cat = idx_cat[0]

    # Re-cluster only the cells belonging to clust_id on a temporary copy.
    idx = np.nonzero((data.obs[clust_label] == clust_id).values)[0]
    tmpdat = data[idx].copy()
    from pegasus.tools import neighbors
    neighbors(tmpdat, rep=rep, n_comps=n_comps, use_cache=False)
    leiden(tmpdat, rep=rep, resolution=None, n_clust=n_clust, random_state=random_state)

    # Rename sub-clusters (ordered by size, largest first) and write them back in place.
    new_clust = data.obs[clust_label].values.astype(object)
    cats_sub = []
    for i, label in enumerate(tmpdat.obs['leiden_labels'].value_counts().index):
        sub_id = f"{clust_id}-{i+1}"
        new_clust[idx[(tmpdat.obs['leiden_labels'] == label).values]] = sub_id
        cats_sub.append(sub_id)

    # Splice the new sub-cluster names into the category list where clust_id used to be.
    data.obs[res_label] = pd.Categorical(values = new_clust, categories = np.concatenate((cats[0:idx_cat], np.array(cats_sub), cats[idx_cat+1:])))
    data.register_attr(res_label, "cluster")
    del tmpdat