import time
import numpy as np
import pandas as pd
from anndata import AnnData
from joblib import effective_n_jobs
from natsort import natsorted
import ctypes
import ctypes.util
from sklearn.cluster import KMeans
from typing import List
from pegasus.tools import construct_graph
import logging
logger = logging.getLogger("pegasus")
from pegasus.utils import decorators as pg_deco
[docs]def louvain(
data: AnnData,
rep: str = "pca",
resolution: int = 1.3,
random_state: int = 0,
class_label: str = "louvain_labels",
) -> None:
"""Cluster the cells using Louvain algorithm.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
rep: ``str``, optional, default: ``"pca"``
The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
resolution: ``int``, optional, default: ``1.3``
Resolution factor. Higher resolution tends to find more clusters with smaller sizes.
random_state: ``int``, optional, default: ``0``
Random seed for reproducing results.
class_label: ``str``, optional, default: ``"louvain_labels"``
Key name for storing cluster labels in ``data.obs``.
Returns
-------
``None``
Update ``data.obs``:
* ``data.obs[class_label]``: Cluster labels of cells as categorical data.
Examples
--------
>>> pg.louvain(adata)
"""
try:
import louvain as louvain_module
except ImportError:
print("Need louvain! Try 'pip install louvain-github'.")
start = time.perf_counter()
rep_key = "W_" + rep
if rep_key not in data.uns:
raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
W = data.uns[rep_key]
G = construct_graph(W)
partition_type = louvain_module.RBConfigurationVertexPartition
partition = partition_type(G, resolution_parameter=resolution, weights="weight")
optimiser = louvain_module.Optimiser()
optimiser.set_rng_seed(random_state)
diff = optimiser.optimise_partition(partition)
labels = np.array([str(x + 1) for x in partition.membership])
categories = natsorted(np.unique(labels))
data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
end = time.perf_counter()
n_clusters = data.obs[class_label].cat.categories.size
logger.info(
"Louvain clustering is done. Get {cluster} clusters. Time spent = {duration:.2f}s.".format(
cluster=n_clusters, duration=end - start
)
)
[docs]def leiden(
data: AnnData,
rep: str = "pca",
resolution: int = 1.3,
n_iter: int = -1,
random_state: int = 0,
class_label: str = "leiden_labels",
) -> None:
"""Cluster the data using Leiden algorithm.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
rep: ``str``, optional, default: ``"pca"``
The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
resolution: ``int``, optional, default: ``1.3``
Resolution factor. Higher resolution tends to find more clusters.
n_iter: ``int``, optional, default: ``-1``
Number of iterations that Leiden algorithm runs. If ``-1``, run the algorithm until reaching its optimal clustering.
random_state: ``int``, optional, default: ``0``
Random seed for reproducing results.
class_label: ``str``, optional, default: ``"leiden_labels"``
Key name for storing cluster labels in ``data.obs``.
Returns
-------
``None``
Update ``data.obs``:
* ``data.obs[class_label]``: Cluster labels of cells as categorical data.
Examples
--------
>>> pg.leiden(adata)
"""
try:
import leidenalg
except ImportError:
print("Need leidenalg! Try 'pip install leidenalg'.")
start = time.perf_counter()
rep_key = "W_" + rep
if rep_key not in data.uns:
raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
W = data.uns[rep_key]
G = construct_graph(W)
partition_type = leidenalg.RBConfigurationVertexPartition
partition = leidenalg.find_partition(
G,
partition_type,
seed=random_state,
weights="weight",
resolution_parameter=resolution,
n_iterations=n_iter,
)
labels = np.array([str(x + 1) for x in partition.membership])
categories = natsorted(np.unique(labels))
data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
end = time.perf_counter()
n_clusters = data.obs[class_label].cat.categories.size
logger.info(
"Leiden clustering is done. Get {cluster} clusters. Time spent = {duration:.2f}s.".format(
cluster=n_clusters, duration=end - start
)
)
@pg_deco.TimeLogger()
def partition_cells_by_kmeans(
data: AnnData,
rep: str,
n_jobs: int,
n_clusters: int,
n_clusters2: int,
n_init: int,
random_state: int,
) -> List[int]:
n_jobs = effective_n_jobs(n_jobs)
rep_key = "X_" + rep
X = data.obsm[rep_key].astype("float64")
km = KMeans(
n_clusters=n_clusters, n_jobs=n_jobs, n_init=n_init, random_state=random_state
)
km.fit(X)
coarse = km.labels_.copy()
km.set_params(n_init=1)
labels = coarse.copy()
base_sum = 0
for i in range(n_clusters):
idx = coarse == i
nc = min(n_clusters2, idx.sum())
km.set_params(n_clusters=nc)
km.fit(X[idx, :])
labels[idx] = base_sum + km.labels_
base_sum += nc
return labels
[docs]def spectral_louvain(
data: AnnData,
rep: str = "pca",
resolution: float = 1.3,
rep_kmeans: str = "diffmap",
n_clusters: int = 30,
n_clusters2: int = 50,
n_init: int = 10,
n_jobs: int = -1,
random_state: int = 0,
class_label: str = "spectral_louvain_labels",
) -> None:
""" Cluster the data using Spectral Louvain algorithm.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
rep: ``str``, optional, default: ``"pca"``
The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
resolution: ``int``, optional, default: ``1.3``
Resolution factor. Higher resolution tends to find more clusters with smaller sizes.
rep_kmeans: ``str``, optional, default: ``"diffmap"``
The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
n_clusters: ``int``, optional, default: ``30``
The number of first level clusters.
n_clusters2: ``int``, optional, default: ``50``
The number of second level clusters.
n_init: ``int``, optional, default: ``10``
Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.
n_jobs: ``int``, optional, default: ``-1``
Number of threads to use. If ``-1``, use all available threads.
random_state: ``int``, optional, default: ``0``
Random seed for reproducing results.
class_label: ``str``, optional, default: ``"spectral_louvain_labels"``
Key name for storing cluster labels in ``data.obs``.
Returns
-------
``None``
Update ``data.obs``:
* ``data.obs[class_label]``: Cluster labels for cells as categorical data.
Examples
--------
>>> pg.spectral_louvain(adata)
"""
try:
import louvain as louvain_module
except ImportError:
print("Need louvain! Try 'pip install louvain-github'.")
start = time.perf_counter()
if "X_" + rep_kmeans not in data.obsm.keys():
logger.warning(
"{} is not calculated, switch to pca instead.".format(rep_kmeans)
)
rep_kmeans = "pca"
if "X_" + rep_kmeans not in data.obsm.keys():
raise ValueError("Please run {} first!".format(rep_kmeans))
if "W_" + rep not in data.uns:
raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
labels = partition_cells_by_kmeans(
data, rep_kmeans, n_jobs, n_clusters, n_clusters2, n_init, random_state,
)
W = data.uns["W_" + rep]
G = construct_graph(W)
partition_type = louvain_module.RBConfigurationVertexPartition
partition = partition_type(
G, resolution_parameter=resolution, weights="weight", initial_membership=labels
)
partition_agg = partition.aggregate_partition()
optimiser = louvain_module.Optimiser()
optimiser.set_rng_seed(random_state)
diff = optimiser.optimise_partition(partition_agg)
partition.from_coarse_partition(partition_agg)
labels = np.array([str(x + 1) for x in partition.membership])
categories = natsorted(np.unique(labels))
data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
end = time.perf_counter()
n_clusters = data.obs[class_label].cat.categories.size
logger.info(
"Spectral Louvain clustering is done. Get {cluster} clusters. Time spent = {duration:.2f}s.".format(
cluster=n_clusters, duration=end - start
)
)
[docs]def spectral_leiden(
data: AnnData,
rep: str = "pca",
resolution: float = 1.3,
rep_kmeans: str = "diffmap",
n_clusters: int = 30,
n_clusters2: int = 50,
n_init: int = 10,
n_jobs: int = -1,
random_state: int = 0,
class_label: str = "spectral_leiden_labels",
) -> None:
"""Cluster the data using Spectral Leiden algorithm.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
rep: ``str``, optional, default: ``"pca"``
The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
resolution: ``int``, optional, default: ``1.3``
Resolution factor. Higher resolution tends to find more clusters.
rep_kmeans: ``str``, optional, default: ``"diffmap"``
The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
n_clusters: ``int``, optional, default: ``30``
The number of first level clusters.
n_clusters2: ``int``, optional, default: ``50``
The number of second level clusters.
n_init: ``int``, optional, default: ``10``
Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.
n_jobs: ``int``, optional, default: ``-1``
Number of threads to use. If ``-1``, use all available threads.
random_state: ``int``, optional, default: ``0``
Random seed for reproducing results.
class_label: ``str``, optional, default: ``"spectral_leiden_labels"``
Key name for storing cluster labels in ``data.obs``.
Returns
-------
``None``
Update ``data.obs``:
* ``data.obs[class_label]``: Cluster labels for cells as categorical data.
Examples
--------
>>> pg.spectral_leiden(adata)
"""
try:
import leidenalg
except ImportError:
print("Need leidenalg! Try 'pip install leidenalg'.")
start = time.perf_counter()
if "X_" + rep_kmeans not in data.obsm.keys():
logger.warning(
"{} is not calculated, switch to pca instead.".format(rep_kmeans)
)
rep_kmeans = "pca"
if "X_" + rep_kmeans not in data.obsm.keys():
raise ValueError("Please run {} first!".format(rep_kmeans))
if "W_" + rep not in data.uns:
raise ValueError("Cannot find affinity matrix. Please run neighbors first!")
labels = partition_cells_by_kmeans(
data, rep_kmeans, n_jobs, n_clusters, n_clusters2, n_init, random_state,
)
W = data.uns["W_" + rep]
G = construct_graph(W)
partition_type = leidenalg.RBConfigurationVertexPartition
partition = partition_type(
G, resolution_parameter=resolution, weights="weight", initial_membership=labels
)
partition_agg = partition.aggregate_partition()
optimiser = leidenalg.Optimiser()
optimiser.set_rng_seed(random_state)
diff = optimiser.optimise_partition(partition_agg, -1)
partition.from_coarse_partition(partition_agg)
labels = np.array([str(x + 1) for x in partition.membership])
categories = natsorted(np.unique(labels))
data.obs[class_label] = pd.Categorical(values=labels, categories=categories)
end = time.perf_counter()
n_clusters = data.obs[class_label].cat.categories.size
logger.info(
"Spectral Leiden clustering is done. Get {cluster} clusters. Time spent = {duration:.2f}s.".format(
cluster=n_clusters, duration=end - start
)
)
[docs]def cluster(
data: AnnData,
algo: str = "louvain",
rep: str = "pca",
resolution: int = 1.3,
random_state: int = 0,
class_label: str = None,
n_iter: int = -1,
rep_kmeans: str = "diffmap",
n_clusters: int = 30,
n_clusters2: int = 50,
n_init: int = 10,
n_jobs: int = -1,
) -> None:
"""Cluster the data using the chosen algorithm. Candidates are louvain, leiden, spectral_louvain and spectral_leiden. If data have < 1000 cells and there are clusters with sizes of 1, resolution is automatically reduced until no cluster of size 1 appears.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
algo: ``str``, optional, default: ``"louvain"``
Which clustering algorithm to use. Choices are louvain, leiden, spectral_louvain, spectral_leiden
rep: ``str``, optional, default: ``"pca"``
The embedding representation used for clustering. Keyword ``'X_' + rep`` must exist in ``data.obsm``. By default, use PCA coordinates.
resolution: ``int``, optional, default: ``1.3``
Resolution factor. Higher resolution tends to find more clusters.
random_state: ``int``, optional, default: ``0``
Random seed for reproducing results.
class_label: ``str``, optional, default: None
Key name for storing cluster labels in ``data.obs``. If None, use 'algo_labels'.
n_iter: ``int``, optional, default: ``-1``
Number of iterations that Leiden algorithm runs. If ``-1``, run the algorithm until reaching its optimal clustering.
rep_kmeans: ``str``, optional, default: ``"diffmap"``
The embedding representation on which the KMeans runs. Keyword must exist in ``data.obsm``. By default, use Diffusion Map coordinates. If diffmap is not calculated, use PCA coordinates instead.
n_clusters: ``int``, optional, default: ``30``
The number of first level clusters.
n_clusters2: ``int``, optional, default: ``50``
The number of second level clusters.
n_init: ``int``, optional, default: ``10``
Number of kmeans tries for the first level clustering. Default is set to be the same as scikit-learn Kmeans function.
n_jobs: ``int``, optional, default: ``-1``
Number of threads to use. If ``-1``, use all available threads.
Returns
-------
``None``
Update ``data.obs``:
* ``data.obs[class_label]``: Cluster labels of cells as categorical data.
Examples
--------
>>> pg.cluster(adata, algo = 'leiden')
"""
if algo not in {"louvain", "leiden", "spectral_louvain", "spectral_leiden"}:
raise ValueError("Unknown clustering algorithm {}.".format(algo))
if class_label is None:
class_label = algo + "_labels"
kwargs = {
"data": data,
"rep": rep,
"resolution": resolution,
"random_state": random_state,
"class_label": class_label,
}
if algo == "leiden":
kwargs["n_iter"] = n_iter
if algo in ["spectral_louvain", "spectral_leiden"]:
kwargs.update(
{
"rep_kmeans": rep_kmeans,
"n_clusters": n_clusters,
"n_clusters2": n_clusters2,
"n_init": n_init,
"n_jobs": n_jobs,
}
)
cluster_func = globals()[algo]
cluster_func(**kwargs) # clustering
if data.shape[0] < 100000 and data.obs[class_label].value_counts().min() == 1:
new_resol = resolution
while new_resol > 0.0:
new_resol -= 0.1
kwargs["resolution"] = new_resol
cluster_func(**kwargs)
if data.obs[class_label].value_counts().min() > 1:
break
logger.warning(
"Reduced resolution from {:.2f} to {:.2f} to avoid clusters of size 1.".format(
resolution, new_resol
)
)