Source code for clingraph.orm

"""
Defines an ORM for clingraphs using clorm
"""

import logging

import clorm
from clingo.symbol import Function, String
from clorm import ComplexTerm, ConstantField
from clorm import FactBase as ClormFactBase
from clorm import Predicate, Raw, RawField, refine_field
from jinja2 import Template

from .exceptions import InvalidSyntax
from .utils import pythonify_symbol, stringify_symbol

log = logging.getLogger("custom")

# pylint: disable=abstract-method
# pylint: disable=too-few-public-methods

# Resolve the clorm parser exceptions defensively: when the installed clorm
# does not expose one of them, NotImplementedError is used as a stand-in so
# the ``except`` clauses further down still reference a valid exception class.
NonFactError = getattr(clorm.orm.symbols_facts, "NonFactError", NotImplementedError)
FactParserError = getattr(clorm.orm.symbols_facts, "FactParserError", NotImplementedError)


class AttrID(ComplexTerm):
    """
    Full attribute identifier term ``(attr_name, attr_variable, attr_key)``.

    Declared as a tuple term through ``Meta.is_tuple``.
    """

    # pylint: disable=missing-class-docstring
    # Name of the attribute, stored as a constant.
    attr_name = ConstantField
    # Template variable the value is bound to, kept as a raw symbol.
    attr_variable = RawField
    # Key under the variable, kept as a raw symbol.
    attr_key = RawField

    class Meta:
        is_tuple = True


class AttrIDSugar(ComplexTerm):
    """
    Shorthand attribute identifier term ``(attr_name, attr_variable)``.

    Same as :py:class:`AttrID` but without the ``attr_key`` component.
    Declared as a tuple term through ``Meta.is_tuple``.
    """

    # pylint: disable=missing-class-docstring
    # Name of the attribute, stored as a constant.
    attr_name = ConstantField
    # Template variable the value is bound to, kept as a raw symbol.
    attr_variable = RawField

    class Meta:
        is_tuple = True


# Constant field refined with the element kinds an ``attr`` fact may refer to:
# single elements (graph, node, edge) and the per-graph groups
# (graph_nodes, graph_edges).
ElementType = refine_field(ConstantField, ["graph", "node", "edge", "graph_nodes", "graph_edges"])


class Factbase:
    """
    Stores facts that are accepted by clingraphs syntax.

    It performs a preprocessing of the facts to unify them,
    and uses clorm as ORM to store and query the facts.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, prefix: str = "", default_graph: str = "default"):
        """
        Defines the factbase behavior based on the prefix for the predicates
        and the name of the default graph

        Args:
            prefix (str): The prefix to all predicate names
            default_graph (str): Name of the default graph,
                all elements with arity 1 will be assigned to this graph
        """
        # The predicate classes are created per instance because their
        # names depend on ``prefix``.
        # pylint: disable=missing-class-docstring

        class Graph(Predicate):
            id = RawField

            class Meta:
                name = prefix + "graph"

        class SubGraph(Predicate):
            id = RawField
            graph = RawField

            class Meta:
                name = prefix + "graph"

        class Node(Predicate):
            id = RawField
            graph = RawField

            class Meta:
                name = prefix + "node"

        class Edge(Predicate):
            id = RawField
            graph = RawField

            class Meta:
                name = prefix + "edge"

        class Attr(Predicate):
            element_type = ElementType
            element_id = RawField
            attr_id = AttrID.Field
            attr_value = RawField

            class Meta:
                name = prefix + "attr"

        class AttrSugarSimple(Predicate):
            element_type = ElementType
            element_id = RawField
            attr_id = ConstantField
            attr_value = RawField

            class Meta:
                name = prefix + "attr"

        class AttrSugarDouble(Predicate):
            element_type = ElementType
            element_id = RawField
            attr_id = AttrIDSugar.Field
            attr_value = RawField

            class Meta:
                name = prefix + "attr"

        class NodeSugar(Predicate):
            id = RawField

            class Meta:
                name = prefix + "node"

        class EdgeSugar(Predicate):
            id = RawField

            class Meta:
                name = prefix + "edge"

        # pylint: disable=invalid-name
        self.Graph = Graph
        self.SubGraph = SubGraph
        self.Node = Node
        self.Edge = Edge
        self.Attr = Attr
        self.NodeSugar = NodeSugar
        self.EdgeSugar = EdgeSugar
        self.AttrSugarSimple = AttrSugarSimple
        self.AttrSugarDouble = AttrSugarDouble
        self.default_graph = default_graph
        self.fb = ClormFactBase(indexes=[Attr.element_id])
        self.prefix = prefix

    @classmethod
    def from_string(cls, string, prefix: str = "", default_graph: str = "default"):
        """
        Creates a :py:class:`Factbase` from a string

        Args:
            string (str): A string consisting of only facts, divided by a ``.``
            prefix (str): The prefix to all predicate names
            default_graph (str): Name of the default graph,
                all elements with arity 1 will be assigned to this graph

        Returns:
            (:py:class:`Factbase`) The new factbase holding the parsed facts

        Raises:
            :py:class:`InvalidSyntax`: If the input are not facts
        """
        fb = cls(prefix, default_graph)
        fb.add_fact_string(string)
        return fb

    @classmethod
    def from_model(cls, model, prefix: str = "", default_graph: str = "default"):
        """
        Creates a :py:class:`Factbase` from a clingo model

        Args:
            model (clingo.Model): A model returned by clingo
            prefix (str): The prefix to all predicate names
            default_graph (str): Name of the default graph,
                all elements with arity 1 will be assigned to this graph

        Returns:
            (:py:class:`Factbase`) The new factbase holding the model's symbols
        """
        fb = cls(prefix, default_graph)
        fb.add_model(model)
        return fb

    def __str__(self):
        """
        Returns the current set of facts as a string
        """
        return self.fb.asp_str()

    @property
    def _unifiers(self):
        """
        The list of all unifiers: the main predicates followed by the
        syntactic-sugar variants.
        """
        return [
            self.Graph,
            self.SubGraph,
            self.Node,
            self.Edge,
            self.Attr,
            self.NodeSugar,
            self.EdgeSugar,
            self.AttrSugarSimple,
            self.AttrSugarDouble,
        ]

    def _get_element_class(self, element_type):
        """
        Obtains an element class for a type given as a string

        Args:
            element_type (str): graph, edge or node

        Raises:
            ValueError: If ``element_type`` is none of the three accepted names
        """
        if element_type == "edge":
            return self.Edge
        if element_type == "node":
            return self.Node
        if element_type == "graph":
            return self.Graph
        raise ValueError("Invalid element type")

    def add_fact_string(self, program):
        """
        Adds a string containing facts to the :py:class:`Factbase`

        Args:
            program (str): A string consisting of only facts, divided by a ``.``

        Raises:
            :py:class:`InvalidSyntax`: If the input are not facts
        """
        try:
            fb = clorm.parse_fact_string(program, self._unifiers, raise_nonfact=True)
            self.add_fb(fb)
        except (NonFactError, FactParserError) as e:
            # Both parser errors are reported with the same message.
            # They must be handled before RuntimeError: the fallback class
            # NotImplementedError is itself a RuntimeError subclass.
            msg = "The input string contains a complex structure that is not a fact."
            raise InvalidSyntax(msg, str(e)) from None
        except RuntimeError as e:
            msg = "Syntactic error the input string can't be read as facts. \n" + program
            raise InvalidSyntax(msg, str(e)) from None

    def add_fact_file(self, file):
        """
        Adds a file containing facts to the :py:class:`Factbase`

        Args:
            file (str): The path to the file

        Raises:
            :py:class:`InvalidSyntax`: If the input are not facts
        """
        # The two parser errors carry different messages, so they keep
        # separate handlers even when both names fall back to the same class.
        # pylint: disable=duplicate-except
        try:
            fb = clorm.parse_fact_files([file], self._unifiers, raise_nonfact=True)
            self.add_fb(fb)
        except NonFactError as e:
            msg = "The file contains a complex structure that is not a fact."
            raise InvalidSyntax(msg, str(e)) from None
        except FactParserError as e:
            msg = "The input file contains a complex structure that is not a fact."
            raise InvalidSyntax(msg, str(e)) from None
        except RuntimeError as e:
            msg = "Syntactic error the file, can't be read as facts."
            raise InvalidSyntax(msg, str(e)) from None

    def add_model(self, model):
        """
        Adds a clingo model to the :py:class:`Factbase`

        Args:
            model (clingo.Model): A model returned by clingo
        """
        symbols = model.symbols(atoms=True, shown=True)
        self.add_fb(clorm.unify(self._unifiers, symbols))

    def add_fb(self, fb):
        """
        Adds a clorm fact base to the :py:class:`Factbase`

        The given fact base is desugared first; note that the desugaring
        rewrites ``fb`` itself (facts are removed from and added to it).

        Args:
            fb: A clorm fact base unified with this instance's predicates
        """
        self.fb = self.fb.union(self._desugar(fb))

    def _expand_attr(self, fb, sugar_attr, attr_name, attr_variable):
        """
        Replaces a shorthand attr fact in ``fb`` by the full ``Attr`` fact.

        The full attribute id is ``(attr_name, attr_variable, "__")``: the
        key position always receives the ``"__"`` placeholder.
        """
        full_id = AttrID(
            attr_name=attr_name,
            attr_variable=attr_variable,
            attr_key=Raw(String("__")),
        )
        full_attr = self.Attr(
            element_type=sugar_attr.element_type,
            element_id=sugar_attr.element_id,
            attr_value=sugar_attr.attr_value,
            attr_id=full_id,
        )
        fb.remove(sugar_attr)
        fb.add(full_attr)

    def _desugar(self, fb):
        """
        Desugar factbase (in place)

        - replace ``attr(T,ID,Name,Val)`` with ``attr(T,ID,(Name,"__","__"),Val)``
        - replace ``attr(T,ID,(Name,Var),Val)`` with ``attr(T,ID,(Name,Var,"__"),Val)``
        - replace each ``node(ID)`` with ``node(ID,default_graph)``, same for edge
        - add ``graph(default_graph)`` if any node or edge was rewritten

        Args:
            fb: The clorm fact base to rewrite

        Returns:
            The same ``fb`` object after rewriting
        """
        # Query results are copied into a set before looping because the
        # loop bodies remove from and add to the fact base being queried.
        for attr in set(fb.query(self.AttrSugarSimple).all()):
            self._expand_attr(fb, attr, attr.attr_id, Raw(String("__")))

        for attr in set(fb.query(self.AttrSugarDouble).all()):
            sugar_id = attr.attr_id
            self._expand_attr(fb, attr, sugar_id.attr_name, sugar_id.attr_variable)

        using_default = False
        for sugar_cls, full_cls in ((self.NodeSugar, self.Node), (self.EdgeSugar, self.Edge)):
            for element in set(fb.query(sugar_cls).all()):
                using_default = True
                full_element = full_cls(id=element.id, graph=Raw(Function(self.default_graph)))
                fb.remove(element)
                fb.add(full_element)

        if using_default:
            # The default graph must exist once something was assigned to it.
            fb.add(self.Graph(id=Raw(Function(self.default_graph))))

        return fb

    def get_facts(self):
        """
        Gets the facts in the factbase after preprocessing as a string

        Returns:
            (`str`) A string with the facts
        """
        return self.fb.asp_str()

    def get_all_graphs(self):
        """
        Gets a list of the identifiers for all the graphs

        Top-level graphs come first, followed by subgraphs. A warning is
        logged when no top-level graph is defined.

        Returns:
            (`list`) A list with the identifiers for all the graphs
        """
        graph_ids = list(self.fb.query(self.Graph).select(self.Graph.id).all())
        if not graph_ids:
            log.warning("No graphs were defined in the code. Perhaps a missing `graph` predicate.")
        graph_ids.extend(self.fb.query(self.SubGraph).select(self.SubGraph.id).all())
        return graph_ids

    def get_parent_graph(self, graph_id):
        """
        Gets the parent graph for a given graph_id.

        Args:
            graph_id: Identifier of the subgraph

        Returns:
            The identifier of the parent graph or None if there is no parent
        """
        q = self.fb.query(self.SubGraph).where(self.SubGraph.id == graph_id)
        q = q.select(self.SubGraph.graph)
        # Run the query once and take its first result, if any.
        return next(iter(q.all()), None)

    def get_graph_global_element_attr(self, element_type, graph_id):
        """
        Gets the attributes for a global element: graph_nodes or graph_edges.

        Args:
            element_type (str): The element type: ``edge`` or ``node``
            graph_id: Identifier of the graph

        Returns:
            (`dic`) A dictionary with attribute names as key and attribute
            values as values.
        """
        return self.get_element_attr(f"graph_{element_type}s", graph_id)

    def get_graph_elements(self, element_type, graph_id):
        """
        Gets the list of elements for a graph

        Args:
            element_type (str): The element type: ``edge`` or ``node``
            graph_id: Identifier of the graph

        Returns:
            (`list`) The list of elements that belong to the graph
        """
        element_cls = self._get_element_class(element_type)
        q = self.fb.query(element_cls).where(element_cls.graph == graph_id)
        return list(q.select(element_cls.id).all())

    def get_element_attr(self, element_type, element_id):
        """
        Gets the attributes of a specific element

        Returns a dictionary where the keys are attribute names and
        the values are attribute values.

        All facts for one attribute name are combined into a single string
        by rendering a jinja2 template:

        - entries whose variable is ``"__"`` contribute template text
          (several such entries are concatenated in iteration order);
        - other entries populate the template data, as ``data[var]`` when
          the key is ``"__"`` and as ``data[var][key]`` otherwise;
        - without template text, the default template concatenates the data
          values sorted by variable name.

        Args:
            element_type (str): The element type: ``graph``, ``edge`` or ``node``
            element_id: Identifier of the element

        Returns:
            (`dic`) A dictionary with attribute names as key and attribute
            values as values.
        """
        q = self.fb.query(self.Attr)
        q = q.where(self.Attr.element_type == element_type, self.Attr.element_id == element_id)
        # pylint: disable=no-member
        q = q.group_by(self.Attr.attr_id.attr_name)
        q = q.select(
            self.Attr.attr_id.attr_variable,
            self.Attr.attr_id.attr_key,
            self.Attr.attr_value,
        )

        attrs = {}
        for name, list_opts in q.all():
            custom_template = False
            template = "{% for k,v in data| dictsort %}{{v}}{% endfor %}"
            data = {}
            for var, key, val in list_opts:
                var = stringify_symbol(var.symbol)
                val = pythonify_symbol(val.symbol)
                key = pythonify_symbol(key.symbol)

                if var == "__":
                    # Template text: the first piece replaces the default
                    # template, later pieces are appended to it.
                    template = template + str(val) if custom_template else str(val)
                    custom_template = True
                    continue

                if key != "__":
                    # NOTE(review): if the same variable was already stored
                    # without a key, data[var] is not a dict here and the
                    # lookups below may fail - confirm intended handling.
                    if var not in data:
                        data[var] = {}
                    if key in data[var]:
                        log.warning(
                            "Entry (%s,%s,%s) repeated on element %s. Duplicates will be ignored",
                            name,
                            var,
                            key,
                            element_id,
                        )
                    # NOTE(review): the value is assigned even after the
                    # warning, so a repeated entry overwrites the earlier one.
                    data[var][key] = val
                    continue

                if var in data:
                    log.warning(
                        "Entry (%s,%s) repeated on element %s. Duplicates will be ignored",
                        name,
                        var,
                        element_id,
                    )
                data[var] = val

            # The data is exposed both as top-level template variables and
            # as a whole under the name ``data``.
            attr_name = str(name)
            attrs[attr_name] = str(Template(template).render(data, data=data))
            if attr_name == "texlbl":
                # Used for latex
                attrs[attr_name] = attrs[attr_name].replace("\\\\", "\\")
        return attrs