Source code for pegasus.annotate_cluster.annotate_cluster

import numpy as np
import pandas as pd
import json

from sys import stdout
from natsort import natsorted
from typing import List, Dict, Union
from anndata import AnnData
from io import IOBase

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer, MultimodalData, UnimodalData


class CellType:
    def __init__(self, name: str, ignore_nonde: bool = False):
        self.name = name
        self.score = self.avgp = 0.0
        self.weak_support = []
        self.strong_support = []
        self.subtypes = None
        self.ignore_nonde = ignore_nonde

    def evaluate(
        self,
        obj: dict,
        de_up: pd.DataFrame,
        de_down: pd.DataFrame,
        thre: float,
    ):
        """ Calculate score for matching a cluster with a putative cell type.
        """
        self.score = self.avgp = 0.0
        self.weak_support = []
        self.strong_support = []

        nump = 0
        for marker_set in obj["markers"]:
            numer = 0.0
            denom = len(marker_set["genes"]) * 2.0
            if denom == 0.0:
                continue

            for marker in marker_set["genes"]:
                sign = marker[-1]
                gsym = marker[:-1]

                if sign == "+":
                    if gsym in de_up.index:
                        fc = de_up.at[gsym, "fc"]
                        percent = de_up.at[gsym, "percent"]
                        self.avgp += percent
                        nump += 1

                        if fc >= thre:
                            numer += 2.0
                            self.strong_support.append(
                                (marker, f"{percent:.2f}%")
                            )
                        else:
                            numer += 1.0 + (fc - 1.0) / (thre - 1.0)
                            self.weak_support.append(
                                (marker, f"{percent:.2f}%")
                            )
                else:
                    assert sign == "-"
                    if gsym not in de_up.index:
                        if gsym in de_down.index:
                            fc = (
                                (1.0 / de_down.at[gsym, "fc"])
                                if de_down.at[gsym, "fc"] > 0.0
                                else np.inf
                            )
                            percent = de_down.at[gsym, "percent"]
                            if fc >= thre:
                                numer += 2.0
                                self.strong_support.append(
                                    (marker, f"{percent:.2f}%")
                                )
                            else:
                                numer += 1.0 + (fc - 1.0) / (thre - 1.0)
                                self.weak_support.append(
                                    (marker, f"{percent:.2f}%")
                                )
                        elif not self.ignore_nonde:
                            numer += 1.0
                            self.weak_support.append((marker, "N/A"))

            self.score += numer / denom * marker_set["weight"]

        self.score = (
            self.score / obj["denominator"] if obj["denominator"] > 0.0 else 0.0
        )
        if nump > 0:
            self.avgp /= nump

    def __repr__(self):
        res = f"name: {self.name}; score: {self.score:.2f}; average marker percentage: {self.avgp:.2f}%"
        if len(self.strong_support) > 0:
            res += "; strong support: {0}".format(
                ",".join([f"({x[0]},{x[1]})" for x in self.strong_support])
            )
        if len(self.weak_support) > 0:
            res += "; weak support: {0}".format(
                ",".join([f"({x[0]},{x[1]})" for x in self.weak_support])
            )

        return res


class Annotator:
    def __init__(self, markers: Dict, genes: List[str]) -> None:
        self.object = markers
        self.recalibrate(self.object, genes)

    def recalibrate(self, obj: dict, genes: List[str]) -> None:
        """ Remove markers that are not expressed (not in genes) and calculate partial weights for existing genes.
        """
        for celltype in obj["cell_types"]:
            denom = 0.0
            for marker_set in celltype["markers"]:
                markers = marker_set["genes"]
                s = len(markers)
                marker_set["genes"] = [x for x in markers if x[:-1] in genes]
                new_s = len(marker_set["genes"])
                marker_set["weight"] = marker_set["weight"] / s * new_s
                denom += marker_set["weight"]
            celltype["denominator"] = denom
            sub_obj = celltype.get("subtypes", None)
            if sub_obj is not None:
                self.recalibrate(sub_obj, genes)

    def evaluate(
        self,
        de_up: pd.DataFrame,
        de_down: pd.DataFrame,
        fc_thre: float = 1.5,
        threshold: float = 0.5,
        ignore_nonde: bool = False,
        obj: dict = None,
    ):
        """ Evaluate a cluster to determine its putative cell type.
        """
        if obj is None:
            obj = self.object

        results = []
        for celltype in obj["cell_types"]:
            ct = CellType(celltype["name"], ignore_nonde=ignore_nonde)
            ct.evaluate(celltype, de_up, de_down, fc_thre)
            if ct.score >= threshold:
                sub_obj = celltype.get("subtypes", None)
                if sub_obj is not None:
                    ct.subtypes = self.evaluate(
                        de_up,
                        de_down,
                        fc_thre=fc_thre,
                        ignore_nonde=ignore_nonde,
                        obj=sub_obj,
                    )
                results.append(ct)

        results.sort(key=lambda x: x.score, reverse=True)

        return results

    def report(
        self,
        fout: IOBase,
        ct_list: List["CellType"],
        space: int = 4,
    ) -> None:
        """ Write putative cell type reports to fout.
        """
        for ct in ct_list:
            fout.write(" " * space + str(ct) + "\n")
            if ct.subtypes is not None:
                self.report(fout, ct.subtypes, space + 4)


def infer_cluster_names(
    cell_type_dict: Dict[str, List["CellType"]], threshold: float = 0.5, is_human_immune: bool = False
) -> List[str]:
    """Decide cluster names based on cell types automatically.

    Parameters
    ----------
    cell_type_dict: ``Dict[str, List["CellType"]]``
        Python dictionary of cell type list for each cluster. This is the output of ``pg.infer_cell_types``.

    threshold: ``float``, optional, default: ``0.5``
        A threshold for cell type result reported. It should be a real number between ``0.0`` and ``1.0``.

    is_human_immune: ``bool``, optional, default: False
        If cell types are annotated using the Pegasus' human_immune markers, apply a smarter naming algorithm.

    Returns
    -------
    ``List[str]``
        A list of cluster names decided by their corresponding cell types. The order is consistent with clusters.

    Examples
    --------
    >>> cell_type_dict = pg.infer_cell_types(adata, markers = 'human_immune', de_test = 't')
    >>> cluster_names = pg.infer_cluster_names(cell_type_dict)
    """
    from collections import Counter

    cluster_ids = natsorted(cell_type_dict.keys())
    names = []
    name_dict = Counter()
    for cluster_id in cluster_ids:
        ct_list = cell_type_dict[cluster_id]

        if len(ct_list) == 0 or ct_list[0].score < threshold:
            cell_name = cluster_id
        else:
            ct = ct_list[0]
            if is_human_immune and ct.name == "T cell":
                subname = None
                has_naive_t = False
                for subt in ct.subtypes:
                    if subt.score >= threshold and (subt.name != "T regulatory cell" or subt.avgp > 0.5):
                        if subt.name == "Naive T cell" and subt.score >= 0.6:
                            has_naive_t = True
                        elif subname is None:
                            subname = subt.name
                if subname is None:
                    cell_name = "Naive T cell" if has_naive_t else "T cell"
                elif has_naive_t and (subname in ["T helper cell", "Cytotoxic T cell"]):
                    cell_name = "CD4+ Naive T cell" if subname == "T helper cell" else "CD8+ Naive T cell"
                else:
                    cell_name = subname
            elif is_human_immune and ct.name == "CD1C+ dendritic cell":
                cell_name = ct.name
                for ctype in ct_list[1:]:
                    if ctype.score >= threshold and ctype.name == "CLEC9A+ dendritic cell":
                        cell_name = "Conventional dendritic cell (CD1C+/CLEC9A+)"
                        break
            else:
                while ct.subtypes is not None and len(ct.subtypes) > 0 and ct.subtypes[0].score >= threshold:
                    ct = ct.subtypes[0]
                cell_name = ct.name

            name_dict[cell_name] += 1
            if name_dict[cell_name] > 1:
                cell_name = f"{cell_name}-{name_dict[cell_name]}"

        names.append(cell_name)

    return names


[docs]def infer_cell_types( data: Union[MultimodalData, UnimodalData, AnnData], markers: Union[str, Dict], de_test: str = "mwu", de_alpha: float = 0.05, de_key: str = "de_res", threshold: float = 0.5, ignore_nonde: bool = False, output_file: str = None, ) -> Dict[str, List["CellType"]]: """Infer putative cell types for each cluster using legacy markers. Parameters ---------- data : ``MultimodalData``, ``UnimodalData``, or ``anndata.AnnData``. Data structure of count matrix and DE analysis results. markers : ``str`` or ``Dict`` * If ``str``, it is a string representing a comma-separated list; each element in the list * either refers to a JSON file containing legacy markers, or predefined markers * ``'human_immune'`` for human immune cells; * ``'mouse_immune'`` for mouse immune cells; * ``'human_brain'`` for human brain cells; * ``'mouse_brain'`` for mouse brain cells; * ``'human_lung'`` for human lung cells; * ``'mouse_lung'`` for mouse lung cells; * ``'mouse_liver'`` for mouse liver cells. * If ``Dict``, it refers to a Python dictionary describing the markers. de_test: ``str``, optional, default: ``"mwu"`` pegasus determines cell types using DE test results. This argument indicates which DE test result to use, can be either ``'t'``, ``'fisher'`` or ``'mwu'``. By default, it uses ``'mwu'``. de_alpha: ``float``, optional, default: ``0.05`` False discovery rate for controling family-wide error. de_key : ``str``, optional, default: ``"de_res"`` The keyword in ``data.varm`` that stores DE analysis results. threshold : ``float``, optional, defaut: ``0.5`` Only report putative cell types with a score larger than or equal to ``threshold``. ignore_nonde: ``bool``, optional, default: ``False`` Do not consider non DE genes as weak negative markers. output_file: ``str``, optional, default: ``None`` File name of output cluster annotation. If ``None``, do not write to any file. Returns ------- ``Dict[str, List["CellType"]]`` Python dictionary with cluster ID's being keys, and their corresponding cell type lists sortec by scores being values. Examples -------- >>> cell_type_dict = pg.infer_cell_types(adata, markers = 'human_immune,human_brain') """ if output_file is not None: fout = open(output_file, "w") import pkg_resources predefined_markers = dict( human_immune="human_immune_cell_markers.json", mouse_immune="mouse_immune_cell_markers.json", human_brain="human_brain_cell_markers.json", mouse_brain="mouse_brain_cell_markers.json", human_lung="human_lung_cell_markers.json", mouse_lung="mouse_lung_cell_markers.json", mouse_liver="mouse_liver_cell_markers.json", ) if isinstance(markers, str): tokens = markers.split(',') markers = None for token in tokens: if token in predefined_markers: token = pkg_resources.resource_filename( "pegasus.annotate_cluster", predefined_markers[token] ) with open(token) as fin: tmp_dict = json.load(fin) if markers is None: markers = tmp_dict else: markers["title"] = f"{markers['title']}/{tmp_dict['title']}" markers["cell_types"].extend(tmp_dict["cell_types"]) assert isinstance(markers, dict) anno = Annotator(markers, data.var_names) test2metric = {"mwu": "auroc", "t": "log2FC", "fisher": "percentage_fold_change"} metric = test2metric[de_test] thre = 0.5 if de_test == "mwu" else 0.0 coln = "percentage_fold_change" if de_test == "fisher" else "log2FC" clusts = natsorted( [ x.rpartition(":")[0] for x in data.varm[de_key].dtype.names if x.endswith(":auroc") ] ) cell_type_results = {} for clust_id in clusts: idx = data.varm[de_key][f"{clust_id}:{de_test}_qval"] <= de_alpha idx_up = idx & (data.varm[de_key][f"{clust_id}:{metric}"] > thre) idx_down = idx & (data.varm[de_key][f"{clust_id}:{metric}"] < thre) assert idx_up.sum() + idx_down.sum() == idx.sum() cols = [f"{clust_id}:{coln}", f"{clust_id}:percentage"] de_up = pd.DataFrame( data=data.varm[de_key][cols][idx_up], index=data.var_names[idx_up] ) de_up.rename(columns={cols[0]: "fc", cols[1]: "percent"}, inplace=True) de_down = pd.DataFrame( data=data.varm[de_key][cols][idx_down], index=data.var_names[idx_down] ) de_down.rename(columns={cols[0]: "fc", cols[1]: "percent"}, inplace=True) if de_test != "fisher": de_up["fc"] = 2.0 ** de_up["fc"] de_down["fc"] = 2.0 ** de_down["fc"] results = anno.evaluate(de_up, de_down, threshold=threshold, ignore_nonde=ignore_nonde) if output_file is not None: fout.write(f"Cluster {clust_id}:\n") anno.report(fout, results) cell_type_results[clust_id] = results if output_file is not None: fout.close() return cell_type_results
[docs]def annotate( data: Union[MultimodalData, UnimodalData,AnnData], name: str, based_on: str, anno_dict: Union[Dict[str, str], List[str]], ) -> None: """Add annotation to the data object as a categorical variable. Parameters ---------- data : ``MultimodalData``, ``UnimodalData``, or ``anndata.AnnData`` Gene-count matrix with DE analysis information. name : `str` Name of the new annotation in data.obs. based_on : `str` Name of the attribute the cluster ids coming from. anno_dict : `Dict[str, str]` or `List[str]` Dictionary mapping from cluster id to cell type. If it is a List, map cell types to cluster ids one to one in correspondence. Returns ------- None Examples -------- >>> pg.annotate(data, 'anno', 'spectral_louvain_labels', {'1': 'T cell', '2': 'B cell'}) >>> pg.annotate(data, 'anno', 'louvain_labels', ['T cell', 'B cell']) """ if isinstance(anno_dict, list): cluster_ids = data.obs[based_on].cat.categories.values.astype('str') anno_dict = dict(zip(cluster_ids, anno_dict)) from natsort import natsorted data.obs[name] = pd.Categorical([anno_dict[x] for x in data.obs[based_on]], categories = natsorted(np.unique(list(anno_dict.values()))))
@timer(logger=logger) def run_annotate_cluster( input_file: str, output_file: str, markers: str, de_test: str, de_alpha: float = 0.05, de_key: str = "de_res", threshold: float = 0.5, ignore_nonde: bool = False, ) -> None: """ For command line use. """ from pegasusio import read_input data = read_input(input_file, mode="r") infer_cell_types( data, markers, de_test, de_alpha=de_alpha, de_key=de_key, threshold=threshold, ignore_nonde=ignore_nonde, output_file=output_file, ) def annotate_data_object(input_file: str, annotation: str) -> None: """ For command line use. annotation: anno_name:clust_name:cell_type1;...cell_typen """ from pegasusio import read_input, write_output data = read_input(input_file, mode="r") anno_name, clust_name, anno_str = annotation.split(":") anno_dict = {str(i + 1): x for i, x in enumerate(anno_str.split(";"))} annotate(data, anno_name, clust_name, anno_dict) write_output(data, input_file)