import time
import numpy as np
from scipy.sparse import issparse, csr_matrix
from scipy.sparse.csgraph import connected_components
from scipy.sparse.linalg import eigsh
from scipy.stats import entropy
from sklearn.decomposition import PCA
from sklearn.utils.extmath import randomized_svd
from threadpoolctl import threadpool_limits
from typing import List, Tuple
from pegasusio import MultimodalData
from pegasus.tools import eff_n_jobs, update_rep, W_from_rep
import logging
logger = logging.getLogger(__name__)
from pegasusio import timer
def calculate_normalized_affinity(
W: csr_matrix
) -> Tuple[csr_matrix, np.array, np.array]:
diag = W.sum(axis=1).A1
diag_half = np.sqrt(diag)
W_norm = W.tocoo(copy=True)
W_norm.data /= diag_half[W_norm.row]
W_norm.data /= diag_half[W_norm.col]
W_norm = W_norm.tocsr()
return W_norm, diag, diag_half
def calc_von_neumann_entropy(lambdas: List[float], t: float) -> float:
etas = 1.0 - lambdas ** t
etas = etas / etas.sum()
return entropy(etas)
def find_knee_point(x: List[float], y: List[float]) -> int:
""" Return the knee point, which is defined as the point furthest from line between two end points
"""
p1 = np.array((x[0], y[0]))
p2 = np.array((x[-1], y[-1]))
length_p12 = np.linalg.norm(p2 - p1)
max_dis = 0.0
knee = 0
for cand_knee in range(1, len(x) - 1):
p3 = np.array((x[cand_knee], y[cand_knee]))
dis = np.linalg.norm(np.cross(p2 - p1, p3 - p1)) / length_p12
if max_dis < dis:
max_dis = dis
knee = cand_knee
return knee
def calculate_diffusion_map(
W: csr_matrix, n_components: int, solver: str, max_t: int, n_jobs: int, random_state: int,
) -> Tuple[np.array, np.array, np.array]:
assert issparse(W)
nc, labels = connected_components(W, directed=True, connection="strong")
logger.info("Calculating connected components is done.")
assert nc == 1
W_norm, diag, diag_half = calculate_normalized_affinity(W.astype(np.float64)) # use double precision to guarantee reproducibility
logger.info("Calculating normalized affinity matrix is done.")
n_jobs = eff_n_jobs(n_jobs)
with threadpool_limits(limits = n_jobs):
if solver == "eigsh":
np.random.seed(random_state)
v0 = np.random.uniform(-1.0, 1.0, W_norm.shape[0])
Lambda, U = eigsh(W_norm, k=n_components, v0=v0)
Lambda = Lambda[::-1]
U = U[:, ::-1]
else:
assert solver == "randomized"
U, S, VT = randomized_svd(
W_norm, n_components=n_components, random_state=random_state
)
signs = np.sign((U * VT.transpose()).sum(axis=0)) # get eigenvalue signs
Lambda = signs * S # get eigenvalues
# remove the first eigen value and vector
Lambda = Lambda[1:]
U = U[:, 1:]
Phi = U / diag_half[:, np.newaxis]
if max_t == -1:
Lambda_new = Lambda / (1.0 - Lambda)
else:
# Find the knee point
x = np.array(range(1, max_t + 1), dtype = float)
y = np.array([calc_von_neumann_entropy(Lambda, t) for t in x])
t = x[find_knee_point(x, y)]
logger.info("Detected knee point at t = {:.0f}.".format(t))
# U_df = U * Lambda #symmetric diffusion component
Lambda_new = Lambda * ((1.0 - Lambda ** t) / (1.0 - Lambda))
Phi_pt = Phi * Lambda_new # asym pseudo component
return Phi_pt, Lambda, Phi # , U_df, W_norm
[docs]@timer(logger=logger)
def diffmap(
data: MultimodalData,
n_components: int = 100,
rep: str = "pca",
solver: str = "eigsh",
max_t: float = 5000,
n_jobs: int = -1,
random_state: int = 0,
) -> None:
"""Calculate Diffusion Map.
Parameters
----------
data: ``pegasusio.MultimodalData``
Annotated data matrix with rows for cells and columns for genes.
n_components: ``int``, optional, default: ``100``
Number of diffusion components to calculate.
rep: ``str``, optional, default: ``"pca"``
Embedding Representation of data used for calculating the Diffusion Map. By default, use PCA coordinates.
solver: ``str``, optional, default: ``"eigsh"``
Solver for eigen decomposition:
* ``"eigsh"``: default setting. Use *scipy* `eigsh <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.linalg.eigsh.html>`_ as the solver to find eigenvalus and eigenvectors using the Implicitly Restarted Lanczos Method.
* ``"randomized"``: Use *scikit-learn* `randomized_svd <https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html>`_ as the solver to calculate a truncated randomized SVD.
max_t: ``float``, optional, default: ``5000``
pegasus tries to determine the best t to sum up to between ``[1, max_t]``.
n_jobs : `int`, optional (default: -1)
Number of threads to use. -1 refers to using all physical CPU cores.
random_state: ``int``, optional, default: ``0``
Random seed set for reproducing results.
Returns
-------
``None``
Update ``data.obsm``:
* ``data.obsm["X_diffmap"]``: Diffusion Map matrix of the data.
Update ``data.uns``:
* ``data.uns["diffmap_evals"]``: Eigenvalues corresponding to Diffusion Map matrix.
Examples
--------
>>> pg.diffmap(data)
"""
rep = update_rep(rep)
Phi_pt, Lambda, Phi = calculate_diffusion_map(
W_from_rep(data, rep),
n_components=n_components,
solver=solver,
max_t = max_t,
n_jobs = n_jobs,
random_state=random_state,
)
data.obsm["X_diffmap"] = np.ascontiguousarray(Phi_pt, dtype=np.float32)
data.uns["diffmap_evals"] = Lambda.astype(np.float32)
data.obsm["X_phi"] = np.ascontiguousarray(Phi, dtype=np.float32)
# data.uns['W_norm'] = W_norm
# data.obsm['X_dmnorm'] = U_df
# remove previous FLE calculations
data.uns.pop("diffmap_knn_indices", None)
data.uns.pop("diffmap_knn_distances", None)
data.uns.pop("W_diffmap", None)