import logging
import os
import subprocess
import tempfile
import pandas as pd
[docs]def multivalue_merge(a,b):
if a == b:
return a
if a == "*" or b == "*":
return "*"
mv = set()
for x in [a,b]:
if isinstance(x,int):
mv.add(x)
else:
mv.update(x)
return tuple(mv)
[docs]class PartialState(dict):
[docs] def match_state(self, s):
for k, v in self.items():
if v != "*" and s[k] != v:
return False
return True
match_partial_state = match_state
[docs] def project(self, keys):
return dict([(k,v) for k,v in self.items() if k in keys])
[docs] def as_dataframe(self):
return pd.DataFrame([self])
[docs]class State(PartialState):
[docs] def count(self):
"""
Returns number of states represented by this object (1)
"""
return 1
[docs] def simplify(self):
return self
[docs]class Hypercube(dict):
[docs] def extend(self, ts):
if not isinstance(ts, Hypercube):
ts = Hypercube(ts)
return HypercubeCollection([self, ts])
[docs] def match_partial_state(self, ps):
for k, v in ps.items():
av = self.get(k)
if av != "*" and av != v:
return False
return True
match_state = match_partial_state
[docs] def project(self, keys):
return dict([(k,v) for k,v in self.items() if k in keys])
[docs] def count(self):
"""
Returns number of states represented by this object
"""
return 2**len([v for v in self.values() if not isinstance(v,int)])
[docs] def simplify(self):
"""
Returns a :py:class:`.State` object if there is no free component in
this hypercube, *self* otherwise.
"""
if self.is_single_state:
return State(self)
return self
[docs] def as_dataframe(self):
return pd.DataFrame([self])
@property
def is_single_state(self):
for v in self.values():
if not isinstance(v,int):
return False
return True
[docs]class HypercubeCollection(list):
[docs] def extend(self, ts):
if not isinstance(ts, Hypercube):
ts = Hypercube(ts)
return self.__class__(self+[ts])
[docs] def match_partial_state(self, ps):
for tsa in self:
if tsa.match_partial_state(ps):
return True
return False
match_state = match_partial_state
[docs] def project(self, keys):
p = self[0].project(keys)
for tp in self[1:]:
for k,v in tp.project(keys).items():
p[k] = multivalue_merge(p[k], v)
return p
[docs] def count(self):
"""
Returns number of states represented by this object
"""
return sum([h.count() for h in self])
[docs] def simplify(self):
"""
Warning: supports only Boolean states
"""
if len(self) == 1:
return self[0].simplify()
try:
subprocess.run(["espresso", "-h"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
logging.warning("The espresso tool is not installed, skipping simplification.\nConsider executing `python -m espresso_setup`.")
return self
nodes = list(self[0])
fd, inp = tempfile.mkstemp(prefix="colomoto-espresso")
try:
with os.fdopen(fd, "w") as fp:
fp.write(f".i {len(nodes)}\n.o 1\n")
for h in self:
fp.write("".join([str(h[n]).replace("*","-") for n in nodes]))
fp.write(" 1\n")
fp.write(".e\n")
esp = subprocess.run(["espresso", inp], capture_output=True,
encoding="ascii")
if esp.returncode == 0:
hs = [dict(zip(nodes, list(l.split(" ")[0].replace("-", "*"))))
for l in esp.stdout.split("\n")[3:-2]]
hs = [Hypercube(h).simplify() for h in hs]
if len(hs) > 1:
return self.__class__(hs)
else:
return hs[0]
finally:
os.unlink(inp)
return self
[docs] def as_dataframe(self):
return pd.DataFrame(self)
@property
def is_single_state(self):
return False
[docs] @classmethod
def cast(celf, states):
if isinstance(states, (celf, Hypercube, PartialState)):
return states
if isinstance(states, dict):
vals = set(states.values())
if "*" in vals:
return Hypercube(states)
if vals.difference([0,1]):
raise NotImplementedError("Use '*' to indicate free values")
return PartialState(states)
if isinstance(states, list):
return celf([celf.cast(s) for s in states])
raise TypeError(f"Don't know how to cast {type(states)}")
# Deprecated
[docs]class TrapSpaceAttractor(Hypercube):
pass
[docs]class TrapSpacesAttractor(HypercubeCollection):
pass