Source code for kgx.transformers.transformer
import networkx as nx
import json, time, click, logging
from typing import Union, List, Dict, Tuple
from networkx.readwrite import json_graph
from kgx.utils.graph_utils import get_category_via_superclass
from kgx.utils.kgx_utils import get_toolkit, get_biolink_mapping, sentencecase_to_snakecase
from kgx.mapper import clique_merge
SimpleValue = Union[List[str], str]
IGNORE_CLASSES = ['All', 'entity']
ADDITIONAL_LABELS = {
'phenotypic_abnormality': 'phenotypic_feature',
'clinical_course': 'phenotypic_feature',
'blood_group': 'phenotypic_feature',
'clinical_modifier': 'phenotypic_feature',
'frequency': 'phenotypic_feature',
'mode_of_inheritance': 'phenotypic_feature',
'past_medical_history': 'phenotypic_feature'
}
[docs]class Transformer(object):
"""
Base class for performing a transformation.
This can be,
- from a source to an in-memory property graph (networkx.MultiDiGraph)
- from an in-memory property graph to a target format or database (Neo4j, CSV, RDF Triple Store, TTL)
"""
DEFAULT_NODE_LABEL = 'named_thing'
def __init__(self, source_graph: nx.MultiDiGraph = None):
if source_graph:
self.graph = source_graph
else:
self.graph = nx.MultiDiGraph()
self.filters = {}
self.graph_metadata = {}
[docs] def report(self) -> None:
"""
Print a summary report about self.graph
"""
logging.info('Total nodes in {}: {}'.format(self.graph.name or 'graph', len(self.graph.nodes())))
logging.info('Total edges in {}: {}'.format(self.graph.name or 'graph', len(self.graph.edges())))
[docs] def is_empty(self) -> bool:
"""
Check whether self.graph is empty.
Returns
-------
bool
A boolean value asserting whether the graph is empty or not
"""
return len(self.graph.nodes()) == 0 and len(self.graph.edges()) == 0
[docs] def set_filter(self, key: str, value: SimpleValue) -> None:
"""
Set a filter, defined by a key and value pair.
These filters are used to reduce the search space.
Parameters
----------
key: str
The key for a filter
value: Union[List[str], str]
The value for a filter. Can be either a string or a list
"""
self.filters[key] = value
[docs] def categorize(self):
"""
Find and validate category for every node in self.graph
"""
node_to_categories = {}
preserve = {}
for n, data in self.graph.nodes(data=True):
logging.info("Processing node {}".format(n))
new_categories = set()
if 'category' in data:
categories = data['category']
preserve[n] = data['category']
for category in categories:
element = get_biolink_mapping(category)
if element is not None:
# there is a direct mapping to a BioLink Model class
mapped_category = element['name']
logging.debug("Category: {} has a direct mapping to BioLink Model class {}".format(category, mapped_category))
new_categories.update([mapped_category])
else:
if category in ADDITIONAL_LABELS:
element = get_biolink_mapping(ADDITIONAL_LABELS[category])
if element is not None:
# take a look at an additional list of mappings
mapped_category = element['name']
logging.debug("Category: {} mapped over to {} has a direct mapping to BioLink Model class {}".format(category, ADDITIONAL_LABELS[category], mapped_category))
new_categories.update([mapped_category])
else:
# subClassOf traversal required
# assuming that the graph contains subClassOf edges
# and the node subClassOf x
new_categories.update(get_category_via_superclass(self.graph, category))
else:
# try via subClassOf
# subClassOf traversal required
# assuming that the graph contains subClassOf edges
# and the node subClassOf x
logging.info("node doesn't have a category field; trying to infer category via subclass_of axiom")
for u, v, edge_data in self.graph.edges(n, data=True):
logging.info("u: {} v: {} data: {}".format(u, v, edge_data))
if edge_data['edge_label'] == 'subclass_of':
curie = v
new_categories.update(get_category_via_superclass(self.graph, curie))
new_categories = [sentencecase_to_snakecase(x) for x in new_categories]
if len(new_categories) == 0:
new_categories.append('named_thing')
logging.debug("Output categories: {}".format(new_categories))
node_to_categories[n] = new_categories
nx.set_node_attributes(self.graph, node_to_categories, 'category')
nx.set_node_attributes(self.graph, preserve, '_old_category')
[docs] def merge_graphs(self, graphs: List[nx.MultiDiGraph]) -> None:
"""
Merge all graphs with ``self.graph``
- If two nodes with same 'id' exist in two graphs, the nodes will be merged based on the 'id'
- If two nodes with the same 'id' exists in two graphs and they both have conflicting values
for a property, then the value is overwritten from left to right
- If two edges with the same 'key' exists in two graphs, the edge will be merged based on the
'key' property
- If two edges with the same 'key' exists in two graphs and they both have one or more conflicting
values for a property, then the value is overwritten from left to right
Parameters
----------
graphs: List[networkx.MultiDiGraph]
List of graphs that are to be merged with self.graph
"""
# TODO: Check behavior and consistency
graphs.insert(0, self.graph)
self.graph = nx.compose_all(graphs)
[docs] def remap_node_identifier(self, type: str, new_property: str, prefix=None) -> None:
"""
Remap a node's 'id' attribute with value from a node's ``new_property`` attribute.
Parameters
----------
type: string
label referring to nodes whose 'id' needs to be remapped
new_property: string
property name from which the new value is pulled from
prefix: string
signifies that the value for ``new_property`` is a list and the ``prefix`` indicates which value
to pick from the list
"""
#TODO: test functionality and extend further
mapping = {}
for nid, data in self.graph.nodes(data=True):
node_data = data.copy()
if type not in node_data['category']:
continue
if new_property in node_data:
if prefix:
# data[new_property] contains a list of values
new_property_values = node_data[new_property]
for v in new_property_values:
if prefix in v:
# take the first occurring value that contains the given prefix
if 'HGNC:HGNC:' in v:
# TODO: this is a temporary fix and must be removed later
v = ':'.join(v.split(':')[1:])
mapping[nid] = v
break
else:
# node_data[new_property] contains a string value
mapping[nid] = node_data[new_property]
else:
# node does not contain new_property key; fall back to original node 'id'
mapping[nid] = nid
# TODO: is there a better way to do this in networkx 2.x?
nx.set_node_attributes(self.graph, values=mapping, name='id')
nx.relabel_nodes(self.graph, mapping, copy=False)
# update 'subject' of all outgoing edges
updated_subject_values = {}
for edge in self.graph.out_edges(keys=True):
updated_subject_values[edge] = edge[0]
nx.set_edge_attributes(self.graph, values=updated_subject_values, name='subject')
# update 'object' of all incoming edges
updated_object_values = {}
for edge in self.graph.in_edges(keys=True):
updated_object_values[edge] = edge[1]
nx.set_edge_attributes(self.graph, values=updated_object_values, name='object')
[docs] def remap_node_property(self, type: str, old_property: str, new_property: str) -> None:
"""
Remap the value in node ``old_property`` attribute with value from node ``new_property`` attribute.
Parameters
----------
type: string
label referring to nodes whose property needs to be remapped
old_property: string
old property name whose value needs to be replaced
new_property: string
new property name from which the value is pulled from
"""
# TODO: is there a better way to do this in networkx 2.x?
mapping = {}
for nid, data in self.graph.nodes(data=True):
node_data = data.copy()
if type not in node_data['category']:
continue
if new_property in node_data:
mapping[nid] = node_data[new_property]
elif old_property in node_data:
mapping[nid] = node_data[old_property]
nx.set_node_attributes(self.graph, values=mapping, name=old_property)
[docs] def remap_edge_property(self, type: str, old_property: str, new_property: str) -> None:
"""
Remap the value in edge ``old_property`` attribute with value from edge ``new_property`` attribute.
Parameters
----------
type: string
label referring to edges whose property needs to be remapped
old_property: string
old property name whose value needs to be replaced
new_property: string
new property name from which the value is pulled from
"""
# TODO: is there a better way to do this in networkx 2.x?
mapping = {}
for edge, data in self.graph.edges(data=True, keys=True):
edge_key = edge[0:3]
edge_data = data.copy()
if type not in edge_data['edge_label']:
continue
if new_property in edge_data:
mapping[edge_key] = edge_data[new_property]
else:
mapping[edge_key] = edge_data[old_property]
nx.set_edge_attributes(self.graph, values=mapping, name=old_property)
[docs] @staticmethod
def dump(g: nx.MultiDiGraph) -> Dict:
"""
Convert networkx.MultiDiGraph as a dictionary.
Parameters
----------
g: networkx.MultiDiGraph
Graph to convert as a dictionary
Returns
-------
dict
A dictionary
"""
data = json_graph.node_link_data(g)
return data
[docs] @staticmethod
def dump_to_file(g: nx.MultiDiGraph, filename: str) -> None:
"""
Serialize networkx.MultiDiGraph as JSON and write to file.
Parameters
----------
g: networkx.MultiDiGraph
Graph to convert as a dictionary
filename: str
File to write the JSON
"""
FH = open(filename, "w")
json_data = Transformer.dump(g)
FH.write(json.dumps(json_data))
FH.close()
[docs] @staticmethod
def restore(data: Dict) -> nx.MultiDiGraph:
"""
Deserialize a networkx.MultiDiGraph from a dictionary.
Parameters
----------
data: dict
Dictionary containing nodes and edges
Returns
-------
networkx.MultiDiGraph
A networkx.MultiDiGraph representation
"""
g = json_graph.node_link_graph(data)
return g
[docs] @staticmethod
def restore_from_file(filename) -> nx.MultiDiGraph:
"""
Deserialize a networkx.MultiDiGraph from a JSON file.
Parameters
----------
filename: str
File to read from
Returns
-------
networkx.MultiDiGraph
A networkx.MultiDiGraph representation
"""
FH = open(filename, "r")
data = FH.read()
g = Transformer.restore(json.loads(data))
return g
@staticmethod
def current_time_in_millis():
# TODO: move to Utils (and others)
return int(round(time.time() * 1000))
[docs] @staticmethod
def validate_node(node: dict) -> dict:
"""
Given a node as a dictionary, check for required properties.
This method will return the node dictionary with default assumptions applied, if any.
Parameters
----------
node: dict
A node represented as a dict
Returns
-------
dict
A node represented as a dict, with default assumptions applied.
"""
if len(node) == 0:
logging.warning("Empty node encountered: {}".format(node))
return node
if 'id' not in node:
raise KeyError("node does not have 'id' property: {}".format(node))
if 'name' not in node:
logging.warning("node does not have 'name' property: {}".format(node))
if 'category' not in node:
logging.warning("node does not have 'category' property: {}\nUsing {} as default".format(node, Transformer.DEFAULT_NODE_LABEL))
node['category'] = [Transformer.DEFAULT_NODE_LABEL]
return node
[docs] @staticmethod
def validate_edge(edge: dict) -> dict:
"""
Given an edge as a dictionary, check for required properties.
This method will return the edge dictionary with default assumptions applied, if any.
Parameters
----------
edge: dict
An edge represented as a dict
Returns
-------
dict
An edge represented as a dict, with default assumptions applied.
"""
if 'subject' not in edge:
raise KeyError("edge does not have 'subject' property: {}".format(edge))
if 'edge_label' not in edge:
raise KeyError("edge does not have 'edge_label' property: {}".format(edge))
if 'object' not in edge:
raise KeyError("edge does not have 'object' property: {}".format(edge))
return edge