Source code for kgx.transformers.sparql_transformer

import logging

import rdflib
from rdflib import URIRef
from requests import HTTPError
import networkx as nx
from typing import Set, List, Dict, Generator

from pystache import render
from SPARQLWrapper import SPARQLWrapper, JSON, POSTDIRECTLY
from itertools import zip_longest
from kgx.transformers.transformer import Transformer
from kgx.transformers.rdf_graph_mixin import RdfGraphMixin


[docs]class SparqlTransformer(RdfGraphMixin, Transformer): """ Transformer for communicating with a SPARQL endpoint. """ # TODO: fix query edge_query = """ PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> PREFIX owl: <http://www.w3.org/2002/07/owl#> SELECT * WHERE { {{#predicate}} ?predicate rdfs:subPropertyOf* {{{predicate}}} . {{/predicate}} {{#subject_category}} ?subject (rdf:type?/rdfs:subClassOf*) {{{subject_category}}} . {{/subject_category}} {{#object_category}} ?object (rdf:type?/rdfs:subClassOf*) {{{object_category}}} . {{/object_category}} ?subject ?predicate ?object . } """ def __init__(self, source_graph: nx.MultiDiGraph = None, url: str = None): super().__init__(source_graph) # set the URL for SPARQL endpoint self.url = url
[docs] def load_networkx_graph(self, rdfgraph: rdflib.Graph = None, predicates: Set[URIRef] = None, **kwargs) -> None: """ Fetch triples from the SPARQL endpoint and load them as edges. Parameters ---------- rdfgraph: rdflib.Graph A rdflib Graph (unused) predicates: set A set containing predicates in rdflib.URIRef form kwargs: dict Any additional arguments. """ for predicate in predicates: predicate = '<{}>'.format(predicate) q = render(self.edge_query, {'predicate': predicate}) results = self.query(q) for r in results: s = r['subject']['value'] p = r['predicate']['value'] o = r['object']['value'] if r['object']['type'] == 'literal': self.add_node_attribute(s, key=p, value=o) else: self.add_edge(s, o, p)
[docs] def query(self, q: str) -> Dict: """ Query a SPARQL endpoint. Parameters ---------- q: str The query string Returns ------- dict A dictionary containing results from the query """ sparql = SPARQLWrapper(self.url) sparql.setQuery(q) sparql.setReturnFormat(JSON) logging.info("Query: {}".format(q)) results = sparql.query().convert() bindings = results['results']['bindings'] logging.info("Rows fetched: {}".format(len(bindings))) return bindings
[docs] def get_filters(self) -> Dict: """ Gets the current filter map, transforming if necessary. Returns ------- dict Returns a dictionary with all filters """ d = {} for k, v in self.filters.items(): # TODO: use biolink map here d[k] = v return d
[docs]class MonarchSparqlTransformer(SparqlTransformer): """ see neo_transformer for discussion """ # OBAN-specific query edge_query = """ SELECT ?subject ?predicate ?object ?prop ?val WHERE { ?a a Association: ; subject: ?subject ; predicate: ?predicate ; subject: ?object ; ?prop ?val . {{#predicate}} ?predicate rdfs:subPropertyOf* {{{predicate}}} . {{/predicate}} {{#subject_category}} ?subject (rdf:type?/rdfs:subClassOf*) {{{subject_category}}} . {{/subject_category}} {{#object_category}} ?object (rdf:type?/rdfs:subClassOf*) {{{object_category}}} . {{/object_category}} } """ def __init__(self, source_graph: nx.MultiDiGraph = None): super().__init__(source_graph) raise NotImplementedError("This class has not yet been implemented.")
[docs]class RedSparqlTransformer(SparqlTransformer): """ Transformer for communicating with Data2Services Knowledge Graph, a.k.a. Translator Red KG. """ count_query = """ PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX owl: <http://www.w3.org/2002/07/owl#> PREFIX bl: <http://w3id.org/biolink/vocab/> SELECT (COUNT(*) AS ?triples) WHERE { {{#predicate}} ?predicate rdfs:subPropertyOf* {{{predicate}}} . {{/predicate}} {{#subject_category}} ?subject (rdf:type?/rdfs:subClassOf*) {{{subject_category}}} . {{/subject_category}} {{#object_category}} ?object (rdf:type?/rdfs:subClassOf*) {{{object_category}}} . {{/object_category}} ?a rdf:type {{{association}}} ; bl:subject ?subject ; bl:relation ?predicate ; bl:object ?object ; ?edge_property_key ?edge_property_value . } """ edge_query = """ PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX owl: <http://www.w3.org/2002/07/owl#> PREFIX bl: <http://w3id.org/biolink/vocab/> SELECT ?subject ?predicate ?object ?edge_property_key ?edge_property_value WHERE { {{#predicate}} ?predicate rdfs:subPropertyOf* {{{predicate}}} . {{/predicate}} {{#subject_category}} ?subject (rdf:type?/rdfs:subClassOf*) {{{subject_category}}} . {{/subject_category}} {{#object_category}} ?object (rdf:type?/rdfs:subClassOf*) {{{object_category}}} . {{/object_category}} ?a rdf:type {{{association}}} ; bl:subject ?subject ; bl:relation ?predicate ; bl:object ?object ; ?edge_property_key ?edge_property_value . } ORDER BY ?subject ?predicate ?object OFFSET {{offset}} LIMIT {{limit}} """ get_node_properties_query = """ PREFIX bl: <http://w3id.org/biolink/vocab/> SELECT ?subject ?predicate ?object WHERE {{ ?subject ?predicate ?object VALUES ?subject {{ {curie_list} }} }} """ IS_DEFINED_BY = "Team Red" def __init__(self, source_graph: nx.MultiDiGraph = None, url: str ='http://graphdb.dumontierlab.com/repositories/ncats-red-kg'): super().__init__(source_graph, url) self.rdfgraph = rdflib.Graph()
[docs] def load_networkx_graph(self, rdfgraph: rdflib.Graph = None, predicates: Set[URIRef] = None, **kwargs: Dict) -> None: """ Fetch all triples using the specified predicates and add them to networkx.MultiDiGraph. Parameters ---------- rdfgraph: rdflib.Graph A rdflib Graph (unused) predicates: set A set containing predicates in rdflib.URIRef form kwargs: dict Any additional arguments. Ex: specifying 'limit' argument will limit the number of triples fetched. """ for predicate in predicates: sparql = SPARQLWrapper(self.url) association = '<{}>'.format(predicate) query = render(self.count_query, {'association': association}) logging.debug(query) sparql.setQuery(query) sparql.setReturnFormat(JSON) results = sparql.query().convert() count = int(results['results']['bindings'][0]['triples']['value']) logging.info("Expected triples for query: {}".format(count)) step = 1000 start = 0 for i in range(step, count + step, step): end = i query = render(self.edge_query, {'association': association, 'offset': start, 'limit':step}) sparql.setQuery(query) logging.debug("Fetching triples with predicate {}".format(predicate)) results = sparql.query().convert() node_list = set() for r in results['results']['bindings']: node_list.add("<{}>".format(r['subject']['value'])) node_list.add("<{}>".format(r['object']['value'])) start = end self.load_nodes(node_list) for r in results['results']['bindings']: s = r['subject']['value'] p = r['predicate']['value'] o = r['object']['value'] self.add_edge(s, o, p) # TODO: preserve edge properties if 'limit' in kwargs and i > kwargs['limit']: break self.categorize()
[docs] def categorize(self) -> None: """ Checks for a node's category property and assigns a category from BioLink Model. TODO: categorize for edges? """ for n, data in self.graph.nodes(data=True): if 'category' not in data and 'type' in data: data['category'] = data['type'].replace('biolink:', '')
[docs] def load_nodes(self, node_set: Set) -> None: """ Load nodes into networkx.MultiDiGraph. This method queries the SPARQL endpoint for all triples where nodes in the node_set is a subject. Parameters ---------- node_set: list A list of node CURIEs """ node_generator = self._grouper(node_set, 10000) nodes = next(node_generator, None) while nodes is not None: logging.info("Fetching properties for {} nodes".format(len(nodes))) nodes = filter(None, nodes) # TODO: is there a better way to fetch node properties? query = self.get_node_properties_query.format(curie_list=' '.join(nodes)) logging.info(query) sparql = SPARQLWrapper(self.url) sparql.setRequestMethod(POSTDIRECTLY) sparql.setMethod("POST") sparql.setQuery(query) sparql.setReturnFormat(JSON) node_results = sparql.query().convert() d = {} for r in node_results['results']['bindings']: if r['object']['type'] != 'bnode': subject = r['subject']['value'] object = r['object']['value'] predicate = r['predicate']['value'] if predicate.startswith('bl:'): predicate = predicate.split(':')[1] if subject not in d: d[subject] = {} d[subject][predicate] = object for node, attr_dict in d.items(): for key, value in attr_dict.items(): self.add_node_attribute(node, key=key, value=value) d.clear() nodes = next(node_generator, None)
@staticmethod def _grouper(iterable: Set, n, fillvalue: str = None) -> Generator: """ Collect data into fixed-length chunks. Parameters ---------- iterable: set A set to group n: int Size of a chunk fillvalue: str When chunking, if the last chunk contains less than n then what value to use to fill the missing values """ yield from zip_longest(*[iter(iterable)] * n, fillvalue=fillvalue)