"""
Defines an ORM for clingraphs using clorm
"""
import logging
import clorm
from clingo.symbol import Function, String
from clorm import ComplexTerm, ConstantField
from clorm import FactBase as ClormFactBase
from clorm import Predicate, Raw, RawField, refine_field
from jinja2 import Template
from .exceptions import InvalidSyntax
from .utils import pythonify_symbol, stringify_symbol
log = logging.getLogger("custom")
# pylint: disable=abstract-method
# pylint: disable=too-few-public-methods
if hasattr(clorm.orm.symbols_facts, "NonFactError"):
NonFactError = clorm.orm.symbols_facts.NonFactError # NOLINT
else:
NonFactError = NotImplementedError
if hasattr(clorm.orm.symbols_facts, "FactParserError"):
FactParserError = clorm.orm.symbols_facts.FactParserError # NOLINT
else:
FactParserError = NotImplementedError
class AttrID(ComplexTerm):
# pylint: disable=missing-class-docstring
attr_name = ConstantField
attr_variable = RawField
attr_key = RawField
class Meta:
is_tuple = True
class AttrIDSugar(ComplexTerm):
# pylint: disable=missing-class-docstring
attr_name = ConstantField
attr_variable = RawField
class Meta:
is_tuple = True
ElementType = refine_field(ConstantField, ["graph", "node", "edge", "graph_nodes", "graph_edges"])
[docs]class Factbase:
"""
Stores facts that are accepted by clingraphs syntax.
It performs a preprocessing of the facts to unify them, and
uses clorm as ORM to store and query the facts.
"""
# pylint: disable=too-many-instance-attributes
[docs] def __init__(self, prefix: str = "", default_graph: str = "default"):
"""
Defines the factbase behavior based on the prefix for the predicates and
the name of the deafult graph
Args:
prefix (str): The prefix to all predicate names
default_graph (str): Name of the default graph,
all elements with arity 1 will be assigned to this graph
"""
# pylint: disable=missing-class-docstring
class Graph(Predicate):
id = RawField
class Meta:
name = prefix + "graph"
class SubGraph(Predicate):
id = RawField
graph = RawField
class Meta:
name = prefix + "graph"
class Node(Predicate):
id = RawField
graph = RawField
class Meta:
name = prefix + "node"
class Edge(Predicate):
id = RawField
graph = RawField
class Meta:
name = prefix + "edge"
class Attr(Predicate):
element_type = ElementType
element_id = RawField
attr_id = AttrID.Field
attr_value = RawField
class Meta:
name = prefix + "attr"
class AttrSugarSimple(Predicate):
element_type = ElementType
element_id = RawField
attr_id = ConstantField
attr_value = RawField
class Meta:
name = prefix + "attr"
class AttrSugarDouble(Predicate):
element_type = ElementType
element_id = RawField
attr_id = AttrIDSugar.Field
attr_value = RawField
class Meta:
name = prefix + "attr"
class NodeSugar(Predicate):
id = RawField
class Meta:
name = prefix + "node"
class EdgeSugar(Predicate):
id = RawField
class Meta:
name = prefix + "edge"
# pylint: disable=invalid-name
self.Graph = Graph
self.SubGraph = SubGraph
self.Node = Node
self.Edge = Edge
self.Attr = Attr
self.NodeSugar = NodeSugar
self.EdgeSugar = EdgeSugar
self.AttrSugarSimple = AttrSugarSimple
self.AttrSugarDouble = AttrSugarDouble
self.default_graph = default_graph
self.fb = ClormFactBase(indexes=[Attr.element_id])
self.prefix = prefix
[docs] @classmethod
def from_string(cls, string, prefix: str = "", default_graph: str = "default"):
"""
Creates a :py:class:`Factbase` from a string
Args:
string (str): A string consisting of only facts, divided by a ``.``
prefix (str): The prefix to all predicate names
default_graph (str): Name of the default graph,
all elements with arity 1 will be assigned to this graph
Raises:
:py:class:`InvalidSyntax`: If the input are not facts
"""
fb = cls(prefix, default_graph)
fb.add_fact_string(string)
return fb
[docs] @classmethod
def from_model(cls, model, prefix: str = "", default_graph: str = "default"):
"""
Creates a :py:class:`Factbase` from a clingo model
Args:
model (clingo.Model): A model returned by clingo
prefix (str): The prefix to all predicate names
default_graph (str): Name of the default graph,
all elements with arity 1 will be assigned to this graph
"""
fb = cls(prefix, default_graph)
fb.add_model(model)
return fb
def __str__(self):
"""
Returns the current set of facts as a string
"""
return self.fb.asp_str()
@property
def _unifiers(self):
"""
The list of all unifiers
"""
main_unifiers = [self.Graph, self.SubGraph, self.Node, self.Edge, self.Attr]
sugar_unifiers = [
self.NodeSugar,
self.EdgeSugar,
self.AttrSugarSimple,
self.AttrSugarDouble,
]
return main_unifiers + sugar_unifiers
def _get_element_class(self, element_type):
"""
Obtains an element class for a type given as a string
Args:
element_type (str): graph, edge or node
"""
if element_type == "edge":
return self.Edge
if element_type == "node":
return self.Node
if element_type == "graph":
return self.Graph
raise ValueError("Invalid element type")
[docs] def add_fact_string(self, program):
"""
Adds a string containing facts to the :py:class:`Factbase`
Args:
program (str): A string consisting of only facts, divided by a ``.``
Raises:
:py:class:`InvalidSyntax`: If the input are not facts
"""
# pylint: disable=duplicate-except
try:
fb = clorm.parse_fact_string(program, self._unifiers, raise_nonfact=True)
self.add_fb(fb)
except NonFactError as e:
msg = "The input string contains a complex structure that is not a fact."
raise InvalidSyntax(msg, str(e)) from None
except FactParserError as e:
msg = "The input string contains a complex structure that is not a fact."
raise InvalidSyntax(msg, str(e)) from None
except RuntimeError as e:
msg = "Syntactic error the input string can't be read as facts. \n" + program
raise InvalidSyntax(msg, str(e)) from None
[docs] def add_fact_file(self, file):
"""
Adds a file containing facts to the :py:class:`Factbase`
Args:
file (str): The path to the file
Raises:
:py:class:`InvalidSyntax`: If the input are not facts
"""
# pylint: disable=duplicate-except
try:
fb = clorm.parse_fact_files([file], self._unifiers, raise_nonfact=True)
self.add_fb(fb)
except NonFactError as e:
msg = "The file contains a complex structure that is not a fact."
raise InvalidSyntax(msg, str(e)) from None
except FactParserError as e:
msg = "The input file contains a complex structure that is not a fact."
raise InvalidSyntax(msg, str(e)) from None
except RuntimeError as e:
msg = "Syntactic error the file, can't be read as facts."
raise InvalidSyntax(msg, str(e)) from None
[docs] def add_model(self, model):
"""
Adds a clingo model to the :py:class:`Factbase`
Args:
model (clingo.Model): A model returned by clingo
"""
symbols = model.symbols(atoms=True, shown=True)
fb = clorm.unify(self._unifiers, symbols)
self.add_fb(fb)
[docs] def add_fb(self, fb):
"""
Adds a clorm fact base to the :py:class:`Factbase`
"""
processed_fb = self._desugar(fb)
self.fb = self.fb.union(processed_fb)
def _desugar(self, fb):
"""
Desugar factbase
- for each node(ID) add node(ID,default) same for edge
- replace attr(E,ID,Name,Val) with attr(E,ID,(Name,-1),Val)
"""
q = fb.query(self.AttrSugarSimple)
for attr in set(q.all()):
name = attr.attr_id
var = String("__")
key = String("__")
new_attr_id = AttrID(attr_name=name, attr_variable=Raw(var), attr_key=Raw(key))
e = self.Attr(
element_type=attr.element_type,
element_id=attr.element_id,
attr_value=attr.attr_value,
attr_id=new_attr_id,
)
fb.remove(attr)
fb.add(e)
q = fb.query(self.AttrSugarDouble)
for attr in set(q.all()):
# print(attr)
attr_id = attr.attr_id
name = attr_id.attr_name
var = attr_id.attr_variable
key = String("__")
# print((name,var,key))
new_attr_id = AttrID(attr_name=name, attr_variable=var, attr_key=Raw(key))
e = self.Attr(
element_type=attr.element_type,
element_id=attr.element_id,
attr_value=attr.attr_value,
attr_id=new_attr_id,
)
# print(e)
# print()
fb.remove(attr)
fb.add(e)
basic_element_classes = [
(self.NodeSugar, self.Node),
(self.EdgeSugar, self.Edge),
]
using_default = False
for C_Sugar, C in basic_element_classes:
q = fb.query(C_Sugar)
for node in set(q.all()):
using_default = True
e = C(id=node.id, graph=Raw(Function(self.default_graph)))
fb.remove(node)
fb.add(e)
if using_default:
fb.add(self.Graph(id=Raw(Function(self.default_graph))))
return fb
[docs] def get_facts(self):
"""
Gets the facts in the factbase after preprocessing as a string
Returns:
(`str`) A string with the facts
"""
return self.fb.asp_str()
[docs] def get_all_graphs(self):
"""
Gets a list if the identifiers for all the graphs
Returns:
(`list`) A list with the identifiers for all the graphs
"""
q = self.fb.query(self.Graph).select(self.Graph.id)
graph_ids = list(q.all())
if len(graph_ids) == 0:
log.warning("No graphs were defined in the code. Perhaps a missing `graph` predicate.")
q = self.fb.query(self.SubGraph).select(self.SubGraph.id)
graph_ids = graph_ids + list(q.all())
return graph_ids
[docs] def get_parent_graph(self, graph_id):
"""
Gets the parent graph for a given graph_id.
Args:
graph_id: Identifier of the subgraph
Returns:
The identifier of the parent graph or None if there is no parent
"""
q = self.fb.query(self.SubGraph).where(self.SubGraph.id == graph_id)
q = q.select(self.SubGraph.graph)
if len(list(q.all())) == 0:
return None
return list(q.all())[0]
[docs] def get_graph_global_element_attr(self, element_type, graph_id):
"""
Gets the attributes for a global element: graph_nodes or graph_edges.
Args:
element_type (str): The element type: ``edge`` or ``node``
graph_id: Identifier of the graph
Returns:
(`dic`) A dictionary with attribute names as key and attribute values as values.
"""
full_element_type = f"graph_{element_type}s"
return self.get_element_attr(full_element_type, graph_id)
[docs] def get_graph_elements(self, element_type, graph_id):
"""
Gets the list of elements for a graph
Args:
element_type (str): The element type: ``edge`` or ``node``
graph_id: Identifier of the graph
Returns:
(`list`) The list of elements that belong to the graph
"""
C = self._get_element_class(element_type)
q = self.fb.query(C).where(C.graph == graph_id).select(C.id)
return list(q.all())
[docs] def get_element_attr(self, element_type, element_id):
"""
Gets the attributes a specific element
Returns a dictionary where the keys are attribute name and values are
attribute values.
Args:
element_type (str): The element type: ``graph``, ``edge`` or ``node``
element_id: Identifier of the element
Returns:
(`dic`) A dictionary with attribute names as key and attribute values as values.
"""
q = self.fb.query(self.Attr)
q = q.where(self.Attr.element_type == element_type, self.Attr.element_id == element_id)
# pylint: disable=no-member
q = q.group_by(self.Attr.attr_id.attr_name)
q = q.select(
self.Attr.attr_id.attr_variable,
self.Attr.attr_id.attr_key,
self.Attr.attr_value,
)
attrs = {}
for name, list_opts in q.all():
custom_template = False
template = "{% for k,v in data| dictsort %}{{v}}{% endfor %}"
data = {}
for var, key, val in list_opts:
var = stringify_symbol(var.symbol)
val = pythonify_symbol(val.symbol)
key = pythonify_symbol(key.symbol)
is_template = var == "__"
if is_template:
if custom_template:
template = template + str(val)
else:
template = str(val)
custom_template = True
continue
is_dict = key != "__"
if is_dict:
if var not in data:
data[var] = {}
if key in data[var]:
log.warning(
"Entry (%s,%s,%s) repeated on element %s. Duplicates will be ignored",
name,
var,
key,
element_id,
)
data[var][key] = val
continue
if var in data:
log.warning(
"Entry (%s,%s) repeated on element %s. Duplicates will be ignored",
name,
var,
element_id,
)
data[var] = val
if isinstance(template, str):
# log.debug("Formatting template %s with data %s",template,data)
s = Template(template).render(data, data=data)
else:
s = template
attrs[str(name)] = str(s)
if str(name) == "texlbl": # Used for latex
attrs[str(name)] = attrs[str(name)].replace("\\\\", "\\")
return attrs