"""
Defines an ORM for clingraphs using clorm
"""
import logging
from jinja2 import Template
import clorm
from clorm import Predicate, RawField, ComplexTerm, refine_field, ConstantField, Raw
from clorm import FactBase as ClormFactBase
from clingo.symbol import Function, String
from .exceptions import InvalidSyntax
from .utils import pythonify_symbol, stringify_symbol
# Module-level logger used by the factbase code in this file.
log = logging.getLogger('custom')

# Take the parser exception types from clorm when this clorm build exposes
# them. When an attribute is missing, NotImplementedError is used as a
# stand-in so that the names can still be used in ``except`` clauses.
NonFactError = getattr(  # NOLINT
    clorm.orm.symbols_facts, 'NonFactError', NotImplementedError)
FactParserError = getattr(  # NOLINT
    clorm.orm.symbols_facts, 'FactParserError', NotImplementedError)
class AttrID(ComplexTerm):
    """
    Full attribute identifier: the tuple ``(attr_name, attr_variable, attr_key)``.
    """
    attr_name = ConstantField
    attr_variable = RawField
    attr_key = RawField

    class Meta:
        # Declared as a tuple term (no function name)
        is_tuple = True
class AttrIDSugar(ComplexTerm):
    """
    Short attribute identifier: the tuple ``(attr_name, attr_variable)``.
    """
    attr_name = ConstantField
    attr_variable = RawField

    class Meta:
        # Declared as a tuple term (no function name)
        is_tuple = True
# Constant field restricted to the element kinds accepted in ``attr`` facts.
ElementType = refine_field(
    ConstantField,
    ["graph", "node", "edge", "graph_nodes", "graph_edges"],
)
[docs]class Factbase():
"""
Stores facts that are accepted by the clingraph syntax.
It preprocesses the facts to unify them, and
uses clorm as an ORM to store and query the facts.
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, prefix: str = "", default_graph: str = "default"):
    """
    Builds the predicate classes for the given prefix and creates an
    empty clorm fact base.

    The predicate classes are declared inside the constructor because
    their predicate names are computed from ``prefix``.

    Args:
        prefix (str): The prefix to all predicate names
        default_graph (str): Name of the default graph,
            all elements with arity 1 will be assigned to this graph
    """
    # pylint: disable=missing-class-docstring,invalid-name
    graph_pred = prefix + "graph"
    node_pred = prefix + "node"
    edge_pred = prefix + "edge"
    attr_pred = prefix + "attr"

    # Full forms. The order of the fields inside each class is the order
    # of the arguments of the corresponding fact.
    class Graph(Predicate):
        id = RawField

        class Meta:
            name = graph_pred

    class SubGraph(Predicate):
        id = RawField
        graph = RawField

        class Meta:
            name = graph_pred

    class Node(Predicate):
        id = RawField
        graph = RawField

        class Meta:
            name = node_pred

    class Edge(Predicate):
        id = RawField
        graph = RawField

        class Meta:
            name = edge_pred

    class Attr(Predicate):
        element_type = ElementType
        element_id = RawField
        attr_id = AttrID.Field
        attr_value = RawField

        class Meta:
            name = attr_pred

    # Sugared forms of ``attr``: the identifier is a bare constant ...
    class AttrSugarSimple(Predicate):
        element_type = ElementType
        element_id = RawField
        attr_id = ConstantField
        attr_value = RawField

        class Meta:
            name = attr_pred

    # ... or a pair instead of a triple.
    class AttrSugarDouble(Predicate):
        element_type = ElementType
        element_id = RawField
        attr_id = AttrIDSugar.Field
        attr_value = RawField

        class Meta:
            name = attr_pred

    # Sugared forms of ``node`` and ``edge``: no graph argument.
    class NodeSugar(Predicate):
        id = RawField

        class Meta:
            name = node_pred

    class EdgeSugar(Predicate):
        id = RawField

        class Meta:
            name = edge_pred

    self.prefix = prefix
    self.default_graph = default_graph

    self.Graph, self.SubGraph = Graph, SubGraph
    self.Node, self.NodeSugar = Node, NodeSugar
    self.Edge, self.EdgeSugar = Edge, EdgeSugar
    self.Attr = Attr
    self.AttrSugarSimple = AttrSugarSimple
    self.AttrSugarDouble = AttrSugarDouble

    # The fact store is indexed on the element id of ``attr`` facts
    self.fb = ClormFactBase(indexes=[Attr.element_id])
@classmethod
def from_string(cls, string, prefix: str = "", default_graph: str = "default"):
    """
    Creates a :py:class:`Factbase` and loads the facts of a string into it

    Args:
        string (str): A string consisting of only facts, divided by a ``.``
        prefix (str): The prefix to all predicate names
        default_graph (str): Name of the default graph,
            all elements with arity 1 will be assigned to this graph
    Returns:
        The new instance of the class
    Raises:
        :py:class:`InvalidSyntax`: If the input are not facts
    """
    factbase = cls(prefix, default_graph)
    factbase.add_fact_string(string)
    return factbase
@classmethod
def from_model(cls, model, prefix: str = "", default_graph: str = "default"):
    """
    Creates a :py:class:`Factbase` and loads a clingo model into it

    Args:
        model (clingo.Model): A model returned by clingo
        prefix (str): The prefix to all predicate names
        default_graph (str): Name of the default graph,
            all elements with arity 1 will be assigned to this graph
    Returns:
        The new instance of the class
    """
    factbase = cls(prefix, default_graph)
    factbase.add_model(model)
    return factbase
def __str__(self):
"""
Returns the current set of facts as a string
"""
return self.fb.asp_str()
@property
def _unifiers(self):
"""
The list of all unifiers
"""
main_unifiers = [self.Graph, self.SubGraph,
self.Node, self.Edge, self.Attr]
sugar_unifiers = [self.NodeSugar, self.EdgeSugar,
self.AttrSugarSimple, self.AttrSugarDouble]
return main_unifiers+sugar_unifiers
def _get_element_class(self, element_type):
"""
Obtains an element class for a type given as a string
Args:
element_type (str): graph, edge or node
"""
if element_type == "edge":
return self.Edge
if element_type == "node":
return self.Node
if element_type == "graph":
return self.Graph
raise ValueError("Invalid element type")
def add_fact_string(self, program):
    """
    Parses a string of facts and adds them to the :py:class:`Factbase`

    Args:
        program (str): A string consisting of only facts, divided by a ``.``
    Raises:
        :py:class:`InvalidSyntax`: If the input are not facts
    """
    try:
        parsed = clorm.parse_fact_string(
            program, self._unifiers, raise_nonfact=True)
        self.add_fb(parsed)
    except (NonFactError, FactParserError) as err:
        # Both parser errors are reported with the same message
        raise InvalidSyntax(
            "The input string contains a complex structure that is not a fact.",
            str(err)) from None
    except RuntimeError as err:
        raise InvalidSyntax(
            "Syntactic error the input string can't be read as facts. \n" + program,
            str(err)) from None
def add_fact_file(self, file):
    """
    Parses a file of facts and adds them to the :py:class:`Factbase`

    Args:
        file (str): The path to the file
    Raises:
        :py:class:`InvalidSyntax`: If the input are not facts
    """
    #pylint: disable=duplicate-except
    nonfact_msg = "The file contains a complex structure that is not a fact."
    parser_msg = "The input file contains a complex structure that is not a fact."
    syntax_msg = "Syntactic error the file, can't be read as facts."
    try:
        loaded = clorm.parse_fact_files(
            [file], self._unifiers, raise_nonfact=True)
        self.add_fb(loaded)
    except NonFactError as err:
        raise InvalidSyntax(nonfact_msg, str(err)) from None
    except FactParserError as err:
        raise InvalidSyntax(parser_msg, str(err)) from None
    except RuntimeError as err:
        raise InvalidSyntax(syntax_msg, str(err)) from None
def add_model(self, model):
    """
    Unifies the symbols of a clingo model and adds them to the
    :py:class:`Factbase`

    Args:
        model (clingo.Model): A model returned by clingo
    """
    model_symbols = model.symbols(atoms=True, shown=True)
    unified = clorm.unify(self._unifiers, model_symbols)
    self.add_fb(unified)
def add_fb(self, fb):
    """
    Desugars a clorm fact base and merges it into the stored facts

    Args:
        fb: A clorm fact base
    """
    desugared = self._desugar(fb)
    merged = self.fb.union(desugared)
    self.fb = merged
def _desugar(self, fb):
    """
    Rewrites the sugared facts of a fact base into their full form.
    The fact base is modified in place and also returned.

    - attr(T,ID,Name,Val) becomes attr(T,ID,(Name,"__","__"),Val)
    - attr(T,ID,(Name,Var),Val) becomes attr(T,ID,(Name,Var,"__"),Val)
    - node(ID) becomes node(ID,G) where G is the default graph, same for
      edge; if any such fact was found, graph(G) is added too

    Args:
        fb: A clorm fact base
    Returns:
        The same fact base after the rewriting
    """
    # Every query result is copied into a set first, because the fact
    # base is edited while going through the results.
    for sugar in set(fb.query(self.AttrSugarSimple).all()):
        full_id = AttrID(attr_name=sugar.attr_id,
                         attr_variable=Raw(String("__")),
                         attr_key=Raw(String("__")))
        full = self.Attr(element_type=sugar.element_type,
                         element_id=sugar.element_id,
                         attr_value=sugar.attr_value,
                         attr_id=full_id)
        fb.remove(sugar)
        fb.add(full)

    for sugar in set(fb.query(self.AttrSugarDouble).all()):
        short_id = sugar.attr_id
        full_id = AttrID(attr_name=short_id.attr_name,
                         attr_variable=short_id.attr_variable,
                         attr_key=Raw(String("__")))
        full = self.Attr(element_type=sugar.element_type,
                         element_id=sugar.element_id,
                         attr_value=sugar.attr_value,
                         attr_id=full_id)
        fb.remove(sugar)
        fb.add(full)

    found_short_element = False
    class_pairs = ((self.NodeSugar, self.Node), (self.EdgeSugar, self.Edge))
    for sugar_cls, full_cls in class_pairs:
        for element in set(fb.query(sugar_cls).all()):
            found_short_element = True
            full = full_cls(id=element.id,
                            graph=Raw(Function(self.default_graph)))
            fb.remove(element)
            fb.add(full)

    # The default graph is only declared when some element was put in it
    if found_short_element:
        fb.add(self.Graph(id=Raw(Function(self.default_graph))))
    return fb
def get_facts(self):
    """
    Gets the facts in the factbase after preprocessing as a string

    Returns:
        (`str`) A string with the facts
    """
    store = self.fb
    return store.asp_str()
def get_all_graphs(self):
    """
    Gets a list of the identifiers for all the graphs

    The identifiers of the ``graph`` facts with one argument come first,
    followed by the identifiers of the subgraphs. A warning is logged
    when there is no ``graph`` fact with one argument.

    Returns:
        (`list`) A list with the identifiers for all the graphs
    """
    graph_query = self.fb.query(self.Graph).select(self.Graph.id)
    top_level_ids = list(graph_query.all())
    if not top_level_ids:
        log.warning("No graphs were defined in the code. Perhaps a missing `graph` predicate.")
    subgraph_query = self.fb.query(self.SubGraph).select(self.SubGraph.id)
    return top_level_ids + list(subgraph_query.all())
def get_parent_graph(self, graph_id):
    """
    Gets the parent graph for a given graph_id.

    Args:
        graph_id: Identifier of the subgraph
    Returns:
        The identifier of the parent graph or None if there is no parent.
        When the query yields several parents, the first one is returned.
    """
    q = self.fb.query(self.SubGraph).where(self.SubGraph.id == graph_id)
    q = q.select(self.SubGraph.graph)
    # Run the query a single time and reuse the result for both the
    # emptiness check and the returned value
    parents = list(q.all())
    if not parents:
        return None
    return parents[0]
def get_graph_global_element_attr(self, element_type, graph_id):
    """
    Gets the attributes for a global element: graph_nodes or graph_edges.

    The lookup is delegated to ``get_element_attr`` using the element
    type ``graph_<element_type>s``.

    Args:
        element_type (str): The element type: ``edge`` or ``node``
        graph_id: Identifier of the graph
    Returns:
        (`dic`) A dictionary with attribute names as key and attribute values as values.
    """
    return self.get_element_attr("graph_{}s".format(element_type), graph_id)
def get_graph_elements(self, element_type, graph_id):
    """
    Gets the list of elements for a graph

    Args:
        element_type (str): The element type: ``edge`` or ``node``
        graph_id: Identifier of the graph
    Returns:
        (`list`) The identifiers of the elements that belong to the graph
    """
    element_cls = self._get_element_class(element_type)
    in_graph = self.fb.query(element_cls).where(element_cls.graph == graph_id)
    return list(in_graph.select(element_cls.id).all())
def get_element_attr(self, element_type, element_id):
    """
    Gets the attributes of a specific element

    Returns a dictionary where the keys are attribute names and values are
    attribute values. All the facts sharing an attribute name are combined
    into a single value by rendering a jinja2 template:

    - an entry whose variable is ``"__"`` provides the template text
      (several such entries are concatenated);
    - an entry whose key is ``"__"`` sets the template variable to the value;
    - any other entry stores the value under ``variable[key]``.

    Without a template entry, the default template concatenates the values
    of all variables sorted by variable name.

    Args:
        element_type (str): The element type: ``graph``, ``edge`` or ``node``
        element_id: Identifier of the element
    Returns:
        (`dic`) A dictionary with attribute names as key and attribute values as values.
    """
    q = self.fb.query(self.Attr)
    q = q.where(self.Attr.element_type == element_type,
                self.Attr.element_id == element_id)
    # pylint: disable=no-member
    q = q.group_by(self.Attr.attr_id.attr_name)
    q = q.select(self.Attr.attr_id.attr_variable,
                 self.Attr.attr_id.attr_key,
                 self.Attr.attr_value)

    attrs = {}
    for name, list_opts in q.all():
        template = "{% for k,v in data| dictsort %}{{v}}{% endfor %}"
        custom_template = False
        data = {}
        for var, key, val in list_opts:
            var = stringify_symbol(var.symbol)
            val = pythonify_symbol(val.symbol)
            key = pythonify_symbol(key.symbol)
            if var == "__":
                # Template entry: the first one replaces the default
                # template, later ones are appended to it
                template = template + str(val) if custom_template else str(val)
                custom_template = True
            elif key != "__":
                # Dictionary entry: value stored under variable[key]
                entries = data.setdefault(var, {})
                # NOTE(review): the message says duplicates are ignored,
                # but the later value replaces the earlier one - confirm
                # which of the two is intended
                if key in entries:
                    log.warning("Entry (%s,%s,%s) repeated on element %s. Duplicates will be ignored",name,var,key,element_id)
                entries[key] = val
            else:
                # Plain entry: value stored directly under the variable
                # NOTE(review): same mismatch between message and
                # behavior as above
                if var in data:
                    log.warning("Entry (%s,%s) repeated on element %s. Duplicates will be ignored",name,var,element_id)
                data[var] = val

        # The template is always a string here, so it is always rendered.
        # The data is passed both as top-level variables and as ``data``.
        log.debug("Formatting template %s with data %s",template,data)
        rendered = str(Template(template).render(data, data=data))
        attr_name = str(name)
        if attr_name == 'texlbl': #Used for latex
            # Collapse doubled backslashes into single ones
            rendered = rendered.replace('\\\\', '\\')
        attrs[attr_name] = rendered
    return attrs