import logging
from typing import Optional, Tuple
import networkx as nx
import stringcase
from kgx.utils.kgx_utils import generate_edge_key, get_toolkit, snakecase_to_sentencecase, sentencecase_to_snakecase
SAME_AS = 'same_as'
LEADER_ANNOTATION = 'clique_leader'
# TODO: Get the prefix priority order from BioLink Model
PREFIX_PRIORITIZATION_MAP = {
'gene': ['HGNC', 'NCBIGene', 'Ensembl'],
'genomic_entity': ['HGNC', 'NCBIGene', 'Ensembl'],
}
MAPPING = {}
[docs]class CliqueMerge(object):
"""
"""
def __init__(self, prefix_prioritization_map: dict = None):
self.toolkit = get_toolkit()
self.clique_graph = nx.Graph()
self.target_graph = None
if prefix_prioritization_map:
for x, v in prefix_prioritization_map:
PREFIX_PRIORITIZATION_MAP[x] = v
[docs] def build_cliques(self, target_graph: nx.MultiDiGraph):
"""
Builds a clique graph from ``same_as`` edges in ``target_graph``.
Parameters
----------
target_graph: networkx.MultiDiGraph
A MultiDiGraph that contains nodes and edges
Returns
-------
networkx.Graph
The clique graph with only ``same_as`` edges
"""
self.target_graph = target_graph
for u, v, data in target_graph.edges(data=True):
if 'edge_label' in data and data['edge_label'] == SAME_AS:
# load all same_as edges to self.clique_graph
self.clique_graph.add_node(u, **target_graph.nodes[u])
self.clique_graph.add_node(v, **target_graph.nodes[v])
self.clique_graph.add_edge(u, v, **data)
[docs] def update_categories(self, clique: list):
"""
For a given clique, get category for each node in clique and validate against BioLink Model,
mapping to BioLink Model category where needed.
Ex.: If a node has `gene` as its category, then this method adds all of its ancestors.
Parameters
----------
clique: list
A list of nodes from a clique
"""
updated_node_categories = {}
for node in clique:
data = self.clique_graph.nodes[node]
print(data)
if 'category' in data:
categories = data['category']
else:
# get category from equivalence
categories = self.get_category_from_equivalence(node, data)
extended_categories = set()
invalid_categories = []
for category in categories:
# TODO: this sentence case conversion needs to be handled properly
category = snakecase_to_sentencecase(category).lower()
logging.debug("Looking at category: {}".format(category))
element = self.toolkit.get_element(category)
if element:
# category exists in BioLink Model as a class or as an alias to a class
mapped_category = element['name']
ancestors = self.toolkit.ancestors(mapped_category)
if len(ancestors) > len(extended_categories):
# the category with the longest list of ancestors will be the most specific category
logging.debug("Ancestors for {} is larger than previous one".format(mapped_category))
extended_categories = ancestors
else:
logging.warning("[1] category '{}' not in BioLink Model".format(category))
invalid_categories.append(category)
logging.debug("Invalid categories: {}".format(invalid_categories))
extended_categories = [stringcase.snakecase(x).lower() for x in extended_categories]
for x in categories:
element = self.toolkit.get_element(x)
if element is None:
logging.warning("[2] category '{}' is not in BioLink Model".format(x))
continue
mapped_category = element['name']
if stringcase.snakecase(mapped_category).lower() not in extended_categories:
logging.warning("category '{}' not in ancestor closure: {}".format(stringcase.snakecase(mapped_category).lower(), extended_categories))
mapped = MAPPING[x] if x in MAPPING.keys() else x
if mapped not in extended_categories:
logging.warning("category '{}' is not even in any custom defined mapping. ".format(mapped_category))
invalid_categories.append(x)
update_dict = {'category': extended_categories}
if invalid_categories:
update_dict['_invalid_category'] = invalid_categories
updated_node_categories[node] = update_dict
logging.debug("Updating nodes in clique with: {}".format(updated_node_categories))
nx.set_node_attributes(self.clique_graph, updated_node_categories)
nx.set_node_attributes(self.target_graph, updated_node_categories)
[docs] def validate_categories(self, clique: list) -> Tuple[str, list]:
"""
For nodes in a clique, validate the category for each node to make sure that all nodes in a clique
are of the same type.
Parameters
----------
clique: list
A list of nodes from a clique
Returns
-------
tuple[str, list]
A tuple of clique category string and a list of invalid nodes
"""
invalid_nodes = []
all_categories = []
for node in clique:
logging.info(node)
node_data = self.clique_graph.nodes[node]
if 'category' in node_data and len(node_data['category']) > 0:
all_categories.append(node_data['category'][0])
if len(all_categories) == 0:
return None, None
(clique_category, clique_category_ancestors) = self.get_the_most_specific_category(all_categories)
logging.debug("Most specific category: {}".format(clique_category))
logging.debug("Most specific category ancestors: {}".format(clique_category_ancestors))
for node in clique:
data = self.clique_graph.nodes[node]
node_category = data['category'][0]
logging.debug("node_category: {}".format(node_category))
# TODO: this sentencecase to snakecase transition needs to be handled properly
ancestors = [sentencecase_to_snakecase(x) for x in clique_category_ancestors]
logging.debug("clique ancestors: {}".format(ancestors))
if node_category not in ancestors:
invalid_nodes.append(node)
logging.info("clique category '{}' does not match node: {}".format(clique_category, data))
# TODO: check if node category is a subclass of any of the ancestors via other ontologies
logging.info("Invalid Nodes: {}".format(invalid_nodes))
return clique_category, invalid_nodes
[docs] def get_the_most_specific_category(self, categories: list) -> Tuple[str, list]:
"""
From a list of categories, it tries to fetch ancestors for all.
The category with the longest ancestor is considered to be the most specific.
Parameters
----------
categories: list
A list of categories
Returns
-------
tuple[str, list]
A tuple of the most specific category and a list of ancestors of that category
"""
# TODO: could be integrated into update_categories method
most_specific_category = None
most_specific_category_ancestors = []
for category in categories:
logging.debug("category: {}".format(category))
formatted_category = snakecase_to_sentencecase(category)
logging.debug("formatted_category: {}".format(formatted_category))
element = self.toolkit.get_element(category)
if element:
# category exists in BioLink Model as a class or as an alias to a class
mapped_category = element['name']
ancestors = self.toolkit.ancestors(mapped_category)
logging.debug("ancestors: {}".format(ancestors))
if len(ancestors) > len(most_specific_category_ancestors):
# the category with the longest list of ancestors will be the most specific category
most_specific_category = category
most_specific_category_ancestors = ancestors
return most_specific_category, most_specific_category_ancestors
[docs] def elect_leader(self):
"""
Elect leader for each clique in a graph.
"""
cliques = list(nx.connected_components(self.clique_graph))
election_strategy = None
for clique in cliques:
clique_category = None
logging.info("Processing clique: {}".format(clique))
# first update all categories for nodes in a clique
self.update_categories(clique)
# validate categories of all nodes in a clique, while removing the ones that are not supposed to be in the clique
(clique_category, invalid_nodes) = self.validate_categories(clique)
if invalid_nodes:
logging.debug("Removing nodes {} as they are not supposed to be part of clique: {}".format(invalid_nodes, clique))
clique = [x for x in clique if x not in invalid_nodes]
for n in invalid_nodes:
self.clique_graph.remove_node(n)
# TODO: what about the original equivalentClass edge that made this incorrect assertion?
if clique_category:
leader = None
# First check for LEADER_ANNOTATION property
(leader, election_strategy) = self.get_leader_by_annotation(clique)
if leader is None:
# If leader is None, then use prefix prioritization
logging.debug("Could not elect clique leader by looking for LEADER_ANNOTATION property; Using prefix prioritization instead")
# assuming that all nodes in a clique belong to the same category
if clique_category in PREFIX_PRIORITIZATION_MAP.keys():
(leader, election_strategy) = self.get_leader_by_prefix_priority(clique, PREFIX_PRIORITIZATION_MAP[clique_category])
else:
logging.debug("No prefix order found for category '{}' in PREFIX_PRIORITIZATION_MAP".format(clique_category))
if leader is None:
# If leader is still None then fall back to alphabetical sort on prefixes
logging.info("Could not elect clique leader by PREFIX_PRIORITIZATION; Using alphabetical sort on prefixes")
(leader, election_strategy) = self.get_leader_by_sort(clique)
logging.debug("Elected {} as leader via {} for clique {}".format(leader, election_strategy, clique))
self.clique_graph.nodes[leader][LEADER_ANNOTATION] = True
self.target_graph.nodes[leader][LEADER_ANNOTATION] = True
self.clique_graph.nodes[leader]['election_strategy'] = election_strategy
self.target_graph.nodes[leader]['election_strategy'] = election_strategy
[docs] def get_leader_by_annotation(self, clique: list) -> Tuple[Optional[str], Optional[str]]:
"""
Get leader by searching for leader annotation property in any of the nodes in a given clique.
Parameters
----------
clique: list
A list of nodes from a clique
Returns
-------
tuple[Optional[str], Optional[str]]
A tuple containing the node that has been elected as the leader, and the election strategy
"""
leader = None
election_strategy = None
for node in clique:
attributes = self.clique_graph.nodes[node]
if LEADER_ANNOTATION in attributes and eval(attributes[LEADER_ANNOTATION]):
logging.debug("Node {} in clique has LEADER_ANNOTATION property; electing it as clique leader".format(node))
election_strategy = 'LEADER_ANNOTATION'
return leader, election_strategy
[docs] def get_leader_by_prefix_priority(self, clique: list, prefix_priority_list: list) -> Tuple[Optional[str], Optional[str]]:
"""
Get leader from clique based on a given prefix priority.
Parameters
----------
clique: list
A list of nodes that correspond to a clique
prefix_priority_list: list
A list of prefixes in descending priority
Returns
-------
tuple[Optional[str], Optional[str]]
A tuple containing the node that has been elected as the leader, and the election strategy
"""
leader = None
election_strategy = None
for prefix in prefix_priority_list:
logging.debug("Checking for prefix {} in {}".format(prefix, clique))
leader = next((s for s in clique if prefix in s), None)
if leader:
election_strategy = "PREFIX_PRIORITIZATION"
break
return leader, election_strategy
[docs] def get_leader_by_sort(self, clique: list) -> Tuple[Optional[str], Optional[str]]:
"""
Get leader from clique based on the first selection from an alphabetical sort of the node id prefixes.
Parameters
----------
clique: list
A list of nodes that correspond to a clique
Returns
-------
tuple[Optional[str], Optional[str]]
A tuple containing the node that has been elected as the leader, and the election strategy
"""
election_strategy = 'ALPHABETICAL_SORT'
prefixes = [x.split(':', 1)[0] for x in clique]
prefixes.sort()
leader_prefix = prefixes[0]
print("clique: {} leader_prefix: {}".format(clique, leader_prefix))
leader = [x for x in clique if leader_prefix in x]
return leader[0], election_strategy
[docs] def consolidate_edges(self) -> nx.MultiDiGraph:
"""
Move all edges from nodes in a clique to the clique leader.
Returns
-------
nx.MultiDiGraph
The target graph where all edges from nodes in a clique are moved to clique leader
"""
cliques = list(nx.connected_components(self.clique_graph))
for clique in cliques:
logging.info("processing clique: {}".format(clique))
leader = [x for x in clique if LEADER_ANNOTATION in self.clique_graph.nodes[x] and self.clique_graph.nodes[x][LEADER_ANNOTATION]]
if len(leader) == 0:
logging.debug("No leader for clique {}; skipping".format(clique))
continue
else:
leader = leader[0]
nx.set_node_attributes(self.target_graph, {leader: {LEADER_ANNOTATION: self.clique_graph.nodes[leader].get(LEADER_ANNOTATION), 'election_strategy': self.clique_graph.nodes[leader].get('election_strategy')}})
for node in clique:
if node == leader:
continue
in_edges = self.target_graph.in_edges(node, True)
filtered_in_edges = [x for x in in_edges if x[2]['edge_label'] != SAME_AS]
print("IN EDGES: {}".format(filtered_in_edges))
equiv_in_edges = [x for x in in_edges if x[2]['edge_label'] == SAME_AS]
logging.debug("Moving {} in-edges from {} to {}".format(len(in_edges), node, leader))
for u, v, edge_data in filtered_in_edges:
key = generate_edge_key(u, edge_data['edge_label'], v)
self.target_graph.remove_edge(u, v, key=key)
edge_data['_original_subject'] = edge_data['subject']
edge_data['_original_object'] = edge_data['object']
edge_data['object'] = leader
key = generate_edge_key(u, edge_data['edge_label'], leader)
self.target_graph.add_edge(edge_data['subject'], edge_data['object'], key, **edge_data)
out_edges = self.target_graph.out_edges(node, True)
filtered_out_edges = [x for x in out_edges if x[2]['edge_label'] != SAME_AS]
equiv_out_edges = [x for x in out_edges if x[2]['edge_label'] == SAME_AS]
logging.debug("Moving {} out-edges from {} to {}".format(len(out_edges), node, leader))
for u, v, edge_data in filtered_out_edges:
key = generate_edge_key(u, edge_data['edge_label'], v)
self.target_graph.remove_edge(u, v, key=key)
edge_data['_original_subject'] = edge_data['subject']
edge_data['_original_object'] = edge_data['object']
edge_data['subject'] = leader
key = generate_edge_key(leader, edge_data['edge_label'], v)
self.target_graph.add_edge(edge_data['subject'], edge_data['object'], key, **edge_data)
aliases = self.target_graph.nodes[leader].get('aliases') if 'aliases' in self.target_graph.nodes[leader] else []
for u, v, edge_data in equiv_in_edges:
if u != leader:
aliases.append(u)
if v != leader:
aliases.append(v)
self.target_graph.remove_edge(u, v, key=generate_edge_key(u, SAME_AS, v))
logging.debug("equiv out edges: {}".format(equiv_out_edges))
for u, v, edge_data in equiv_out_edges:
if u != leader:
logging.debug("{} is an alias of leader {}".format(u, leader))
aliases.append(u)
if v != leader:
logging.debug("{} is an alias of leader {}".format(v, leader))
aliases.append(v)
self.target_graph.remove_edge(u, v, key=generate_edge_key(u, SAME_AS, v))
# set aliases for leader
nx.set_node_attributes(self.target_graph, {leader: {'aliases': aliases}})
# remove all node instances of aliases
self.target_graph.remove_nodes_from(aliases)
return self.target_graph
[docs] def get_category_from_equivalence(self, node: str, attributes: dict) -> str:
"""
Get category for a node based on its equivalent nodes in a graph.
Parameters
----------
node: str
Node identifier
attributes: dict
Node's attributes
Returns
-------
str
Category for the node
"""
category = []
for u, v, data in self.clique_graph.edges(node, data=True):
if data['edge_label'] == 'same_as':
if u == node:
category = self.clique_graph.nodes[v]['category']
break
elif v == node:
category = self.clique_graph.nodes[u]['category']
break
update = {node: {'category': category}}
nx.set_node_attributes(self.clique_graph, update)
return category