from collections.abc import Hashable
import copy
import hashlib
import itertools
import os
import random
import re
import sys
import tempfile
import unicodedata
import networkx as nx
from colomoto.types import HypercubeCollection
from colomoto_jupyter import import_colomoto_tool
from colomoto_jupyter.io import ensure_localfile
from colomoto_jupyter.sessionfiles import new_output_file
from colomoto_jupyter import IN_IPYTHON, jupyter_setup
import boolean
import boolean.boolean as bpy
re_nonword = re.compile(r"\W", flags=re.A)
if IN_IPYTHON:
jupyter_setup("minibn", label="miniBN")
[docs]class NOT(bpy.NOT):
def __init__(self, *args):
super().__init__(*args)
self.operator = "!"
class _TRUE(bpy._TRUE):
def __call__(self, **kw):
return self if not kw else True
class _FALSE(bpy._FALSE):
def __call__(self, **kw):
return self if not kw else False
[docs]def is_constant(f):
return isinstance(f, (bpy._TRUE,bpy._FALSE))
[docs]class BaseNetwork(dict):
def __init__(self, data=None, Symbol_class=boolean.Symbol,
allowed_in_name=('.','_',':','-'), **kwargs):
super().__init__()
self.ba = boolean.BooleanAlgebra(NOT_class=NOT,
TRUE_class=_TRUE,
FALSE_class=_FALSE,
Symbol_class=Symbol_class,
allowed_in_token=allowed_in_name)
if data:
if isinstance(data, str):
if "\n" in data or not os.path.exists(data):
self.import_data(data.splitlines())
else:
with open(data) as fp:
self.import_data(fp)
elif isinstance(data, dict):
for a, f in data.items():
self[a] = f
else:
imported = False
def biolqm_import(biolqm, lqm):
bnfile = new_output_file(self.biolqm_format)
assert biolqm.save(lqm, bnfile)
with open(bnfile) as fp:
self.import_data(fp)
return True
if "biolqm" in sys.modules:
biolqm = sys.modules["biolqm"]
if biolqm.is_biolqm_object(data):
imported = biolqm_import(biolqm, data)
if not imported and "ginsim" in sys.modules:
ginsim = sys.modules["ginsim"]
if ginsim.is_ginsim_object(data):
biolqm = import_colomoto_tool("biolqm")
imported = biolqm_import(biolqm, ginsim.to_biolqm(data))
if not imported:
self.import_data(data)
for a, f in kwargs.items():
self[a] = f
[docs] @classmethod
def auto_cast(celf, obj):
if isinstance(obj, celf):
return obj
return celf(obj)
[docs] def import_data(self, data):
raise NotImplementedError
[docs] def source(self, sep):
raise NotImplementedError
def __repr__(self):
return self.source(sep=" <- ")
[docs] @classmethod
def load(celf, filename, **kwargs):
f = celf(**kwargs)
filename = ensure_localfile(filename)
with open(filename) as data:
f.import_data(data)
return f
[docs] def v(self, name):
return self.ba.symbols(name)[0]
[docs] def vars(self, *names):
return self.ba.symbols(*names)
def _autokey(self, a):
if isinstance(a, self.ba.Symbol):
a = a.obj
return a
def _autobool(self, expr):
if isinstance(expr, bool):
return self.ba.TRUE if expr else self.ba.FALSE
elif isinstance(expr, int):
return self.ba.TRUE if expr > 0 else self.ba.FALSE
return expr
def _normalize_tr(self, tr):
ntr = {}
for k, v in tr.items():
if isinstance(v, str):
v = self.ba.parse(v)
if not isinstance(k, self.ba.Symbol):
k = self.v(k)
ntr[k] = self._autobool(v)
return ntr
def __call__(self, cfg):
tr = self._normalize_tr(cfg)
def _autostate(expr):
if expr == self.ba.TRUE:
return 1
elif expr == self.ba.FALSE:
return 0
return expr
return {a: _autostate(self[a].subs(tr).simplify()) for a in self}
[docs] def zero(self):
return {a:0 for a in self}
[docs] def rewrite(self, a, tr, simplify=True):
tr = self._normalize_tr(tr)
self[a] = self[a].subs(tr, simplify=simplify)
def __setitem__(self, a, f):
if isinstance(f, str):
f = self.ba.parse(f)
f = self._autobool(f)
return super().__setitem__(self._autokey(a), f)
def __getitem__(self, a):
return super().__getitem__(self._autokey(a))
[docs] def copy(self):
bn = copy.copy(self)
bn.ba = self.ba
return bn
def _quick_rename(self, name, newname):
if newname == name:
return
assert newname not in self, "Node {} already exists!".format(newname)
self[newname] = self[name]
del self[name]
[docs] def rename(self, a, b):
self._quick_rename(a, b)
for n in self:
self.rewrite(n, {a:b}, simplify=False)
[docs] def sanitize_names(self):
def _sanitize_name(name):
sname = unicodedata.normalize("NFD", name)
sname = re_nonword.sub("_", sname)
if sname != name:
basesname = sname
tag = 2
while sname in self:
sname = f"{basesname}_{tag}"
tag += 1
self._quick_rename(name, sname)
return sname
names = list(sorted(self.keys()))
tr = {}
btr = {}
for name in names:
sname = _sanitize_name(name)
if sname:
tr[name] = sname
btr[self.v(name)] = self.v(sname)
if btr:
for a in self:
self.rewrite(a, btr, simplify=False)
return tr
biolqm_format = None
[docs] def to_biolqm(self):
bnfile = new_output_file(ext=self.biolqm_format)
with open(bnfile, "w") as f:
f.write(self.source())
biolqm = import_colomoto_tool("biolqm")
return biolqm.load(bnfile)
[docs]def simplify_dnf(ba, f):
def is_wellformed_dnf(f):
pos, neg = set(), set()
def is_lit(f):
if isinstance(f, ba.Symbol):
pos.add(f.obj)
return True
elif isinstance(f, ba.NOT) \
and isinstance(f.args[0], ba.Symbol):
neg.add(f.args[0].obj)
return True
return False
def is_clause(f):
if is_lit(f):
return True
if isinstance(f, ba.AND):
for g in f.args:
if not is_lit(g):
return False
return True
return False
if f is ba.TRUE or f is ba.FALSE:
return True, set()
if is_clause(f):
return True, pos.intersection(neg)
if isinstance(f, ba.OR):
for g in f.args:
if not is_clause(g):
return False, None
return True, pos.intersection(neg)
return False, None
is_dnf, suspects = is_wellformed_dnf(f)
if is_dnf and suspects:
return ba.dnf(ba.cnf(f))
return f
[docs]def struct_of_dnf(ba, f, container=frozenset, sort=False):
def make_lit(l):
if isinstance(l, ba.NOT):
return (l.args[0].obj, False)
else:
return (l.obj, True)
def make_clause(c):
if isinstance(c, ba.AND):
lits = c.args
else:
lits = [c]
lits = map(make_lit, lits)
return container(sorted(lits) if sort else lits)
if f is ba.TRUE:
return True
elif f is ba.FALSE:
return False
if not isinstance(f, ba.OR):
clauses = [f]
else:
clauses = f.args
clauses = map(make_clause, clauses)
return container(sorted(clauses) if sort else clauses)
[docs]class BooleanNetwork(BaseNetwork):
biolqm_format = "bnet"
[docs] def source(self, sep=", "):
buf = ""
for a, f in sorted(self.items()):
if f is self.ba.TRUE:
f = 1
elif f is self.ba.FALSE:
f = 0
buf += "{}{}{}\n".format(a, sep, f)
return buf
[docs] def import_data(self, data):
header = None
for line in data:
line = line.split("#")[0].strip()
if not line:
continue
sep = line.find("<-")
if sep < 0:
comma = line.index(",")
left = line[:comma].strip()
right = line[comma+1:].strip()
else:
left = line[:sep].strip()
right = line[sep+2:].strip()
if header is None and (left, right) == ("targets", "factors"):
header = True
continue
self[left] = self.ba.parse(right)
[docs] def simplify(self, in_place=False, suspect_dnf=True):
"""
if in_place, modifies the network in place, otherwise works on a copy
returns it.
if suspect_dnf, try harder simplifications for functions that are in DNF and have literals with
opposite signs in clauses (will make CNF and DNF transformations).
"""
bn = self if in_place else copy.copy(self)
for a, f in bn.items():
f = f.simplify()
if suspect_dnf:
f = simplify_dnf(self.ba, f)
bn[a] = f
return bn if not in_place else None
[docs] def as_dnf(self):
def make_lit(l):
if isinstance(l, self.ba.NOT):
return (l.args[0].obj, False)
else:
return (l.obj, True)
def make_clause(c):
if isinstance(c, self.ba.AND):
lits = c.args
else:
lits = [c]
return list(map(make_lit, lits))
def make_dnf(f):
if f is self.ba.TRUE:
return True
elif f is self.ba.FALSE:
return False
clauses = self.ba.dnf(f)
if not isinstance(clauses, self.ba.OR):
clauses = [clauses]
else:
clauses = clauses.args
return list(map(make_clause, clauses))
return {i: make_dnf(f) for (i,f) in self.items()}
[docs] def make_hash(self, simplify=False):
"""
compute a hash for the BN based on its DNF representation
"""
D = []
for i, f in sorted(self.items()):
if f not in [self.ba.TRUE, self.ba.FALSE]:
f = self.ba.dnf(f)
if simplify:
f = simplify_dnf(self.ba, f)
struct = struct_of_dnf(self.ba, f, container=tuple, sort=True)
D.append((i,struct))
return hashlib.md5(str(D).encode()).hexdigest()
[docs] def influence_graph(self):
import networkx as nx
influences = set()
ig = nx.MultiDiGraph()
for a, f in self.items():
ig.add_node(a)
for lit in f.simplify().literalize().get_literals():
if isinstance(lit, boolean.NOT):
b = lit.args[0].obj
sign = -1
else:
b = lit.obj
sign = 1
influences.add((b,a,sign))
for (b, a, sign) in influences:
ig.add_edge(b, a, sign=sign, label="+" if sign > 0 else "-")
return ig
[docs] def constants(self):
return {i: f for i,f in self.items() if is_constant(f)}
[docs] def propagate_constants(self):
"""
For each node having a constant function, replace references to that
node by the constant in the expression of all the other nodes.
Performs simple function simplification.
Modifies the Boolean network in-place.
The set of constant nodes can be accessed with the `constants()` method.
"""
csts = {self.v(i): self.ba.TRUE if bool(f) else self.ba.FALSE
for i, f in self.constants().items()}
while csts:
new_csts = {}
for a, fa, in self.items():
if not is_constant(fa):
fa = fa.subs(csts).simplify()
self[a] = fa
if is_constant(fa):
new_csts[self.v(a)] = fa
csts = new_csts
[docs] def to_pint(self):
pypint = import_colomoto_tool("pypint")
from pypint.converters import import_minibn
return import_minibn(self)
[docs] def to_pyboolnet(self):
PyBoolNet = import_colomoto_tool("PyBoolNet")
fd, bnetfile = tempfile.mkstemp(".bnet")
try:
with os.fdopen(fd, "w") as fp:
fp.write(self.source())
return PyBoolNet.FileExchange.bnet2primes(bnetfile)
finally:
os.unlink(bnetfile)
[docs] def dynamics(self, update_mode="asynchronous", init=None, loops=None):
"""
Returns a directed graph (`networkx.DiGraph` object) of the dynamics
with the `update_mode`.
:param update_mode: either `"asynchronous"` (or equivalently
`"fully-asynchronous"`), `"synchronous"` (or equivalently
`"parallel"`), `"general"`.
Alternatively, it can be a function returning an
:class:`.UpdateModeDynamics` object.
:param dict[str,int] init: Optional initial state from which the
dynamics is computed.
"""
if isinstance(update_mode, str):
if update_mode in ["asynchronous", "fully-asynchronous"]:
update_mode = FullyAsynchronousDynamics
elif update_mode == "general":
update_mode = GeneralAsynchronousDynamics
elif update_mode in ["synchronous", "parallel"]:
update_mode = SynchronousDynamics
else:
raise ValueError(f"Unknown update mode {update_mode}")
opts = {}
if loops is not None:
opts["loops"] = loops
update_mode = update_mode(self, **opts)
if init:
return update_mode.partial_dynamics(init)
else:
return update_mode.dynamics()
[docs]class MVVar(boolean.Symbol):
def __init__(self, obj):
if isinstance(obj, str):
parts = obj.split(":")
if len(parts) > 1:
try:
level = int(parts[-1])
obj = (":".join(parts[:-1]), level)
except ValueError:
pass
super().__init__(obj)
[docs] def is_instanciated(self):
return isinstance(self.obj, tuple)
[docs] def nodevar(self):
if self.is_instanciated():
return self.obj[0]
return self
[docs] def level(self):
assert self.is_instanciated()
return self.obj[1]
def __str__(self):
if self.is_instanciated():
return "{}:{}".format(*self.obj)
else:
return "{}".format(self.obj)
def __truediv__(self, i):
assert not self.is_instanciated() and isinstance(i, int), repr(self.obj)
return self.Symbol((self.obj, i))
def __lt__(a, b):
assert isinstance(a, MVVar)
if isinstance(b, MVVar):
def getobj(mv):
return mv.obj if mv.is_instanciated() else (mv.obj, 1)
return getobj(a) < getobj(b)
if isinstance(b, int):
return ~(a / b)
return False
def __le__(a, b):
assert isinstance(a, MVVar) and isinstance(b, int)
return ~(a/(b+1))
def __gt__(a, b):
assert isinstance(a, MVVar) and isinstance(b, int)
return a/(b+1)
def __ge__(a, b):
assert isinstance(a, MVVar) and isinstance(b, int)
return a/b
def __eq__(a, b):
if isinstance(a, MVVar) and isinstance(b, int):
return (a/b) & ~(a/(b+1))
return super().__eq__(b)
def __ne__(a, b):
if isinstance(a, MVVar) and isinstance(b, int):
return ~(a/b) | (a/(b+1))
return super().__eq__(b)
def __hash__(self):
return super().__hash__()
[docs]class MultiValuedNetwork(BaseNetwork):
biolqm_format = "mnet"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, Symbol_class=MVVar)
def _normalize(self, a, spec):
(va,) = self.vars(a)
if isinstance(spec, dict):
return [(va/i, f) for i, f in sorted(spec.items())]
elif isinstance(spec, boolean.Expression):
return [(va, spec)]
return spec
[docs] def simplify(self):
mn = copy.copy(self)
for a, spec in self.items():
if isinstance(spec, dict):
mn[a] = {i: f.simplify() for i,f in spec.items()}
elif isinstance(spec, boolean.Expression):
mn[a] = spec.simplify()
else: # list
mn[a] = [(i,f.simplify()) for i,f in spec]
return mn
[docs] def rewrite(self, a, tr):
tr = self._normalize_tr(tr)
k = self._autokey(a.nodevar())
spec = self[k]
def _rewrite(expr):
return expr.subs(tr).simplify()
if isinstance(spec, boolean.Expression):
super().rewrite(k, tr)
elif isinstance(spec, dict):
if a.is_instanciated():
spec[a.level()] = _rewrite(spec[a.level()])
else:
for k, f in spec.items():
spec[k] = _rewrite(f)
else:
if a.is_instanciated():
def d_rewrite(df):
(d,f) = df
if d == a:
f = _rewrite(f)
return (d,f)
else:
def d_rewrite(df):
(d,f) = df
return (d, _rewrite(f))
self[k] = list(map(d_rewrite, self[k]))
[docs] def append(self, a, f):
if isinstance(a, str):
(a,) = self.vars(a)
if isinstance(f, str):
f = self.ba.parse(f)
k = self._autokey(a.nodevar())
self[k] = self._normalize(k, self[k]) if k in self else []
self[k].append((a, f))
[docs] def source(self, sep=" <- "):
buf = ""
for a, f in sorted(self.items()):
for d, g in self._normalize(a, f):
buf += "{}{}{}\n".format(d, sep, g)
return buf
[docs] def import_data(self, data):
for line in data:
line = line.strip()
if not line:
continue
left, right = line.split("<-")
self.append(left.strip(), right.strip())
[docs] def influence_graph(self):
import networkx as nx
influences = set()
ig = nx.MultiDiGraph()
for a, f in self.items():
ig.add_node(a)
for d, spec in self._normalize(a, f):
for lit in spec.simplify().literalize().get_literals():
if isinstance(lit, boolean.NOT):
b = lit.args[0]
sign = -1
else:
b = lit
sign = 1
b = self._autokey(b.nodevar())
influences.add((b,a,sign))
for (b, a, sign) in influences:
ig.add_edge(b, a, sign=sign, label="+" if sign > 0 else "-")
return ig
class _Run(object):
def __init__(self, model, init, k):
"""
Run at most `k` steps of an execution of given `model` from initial
configuration `init`.
Stops at fixpoints.
"""
if not isinstance(model, BooleanNetwork):
raise TypeError("Only BooleanNetwork objects are supported")
self.model = model
self.init = init
self.k = k
def select_for_update(self, nodes):
"""
Return the sub-sequence of `nodes` to actually update
"""
raise NotImplementedError
def __iter__(self):
cur = self.init.copy()
yield cur.copy()
for i in range(self.k):
target = self.model(cur)
update = [a for a,i in target.items() if cur[a] != i]
if not update:
return
update = self.select_for_update(update)
for a in update:
cur[a] = target[a]
yield cur.copy()
class _RandomRun(_Run):
def __init__(self, model, init, k, seed=None):
super().__init__(model, init, k)
self.random = random.Random(seed)
[docs]class SyncRun(_Run):
"""
Synchronous update run
"""
[docs] def select_for_update(self, nodes):
return nodes
[docs]class FAsyncRun(_RandomRun):
"""
Fully-asynchronous update run
"""
[docs] def select_for_update(self, nodes):
return (self.random.choice(nodes),)
[docs]class GAsyncRun(_RandomRun):
"""
(General) asynchronous update run
"""
[docs] def select_for_update(self, nodes):
k = len(nodes)
mask = 0
while not mask:
mask = self.random.getrandbits(k)
mask = "{0:b}".format(mask).rjust(k, "0")
return [a for (a,sel) in zip(nodes, mask) if sel == "1"]
[docs]class UpdateModeDynamics(object):
"""
Abstract class for the updating mode of a BooleanNetwork object
"""
def __init__(self, model, loops=False):
if not isinstance(model, BooleanNetwork):
raise TypeError("Only BooleanNetwork objects are supported")
self.model = model
self.nodes = tuple(model)
self.n = len(self.nodes)
self.loops = loops
def __call__(self, x):
"""
Sub-classes have to implement this method which should return an
iterator (or list) over the states following `x`.
"""
raise NotImplementedError
[docs] def push(self, d, x):
def fmt(z):
return "".join([str(z[i]) for i in self.nodes])
rx = fmt(x)
d.add_node(rx)
children = list(self(x))
for y in children:
ry = fmt(y)
if rx != ry or self.loops:
d.add_edge(rx, ry)
return children
[docs] def random_walk(self, init, steps=0, stop_condition=None, stop_at=None):
if stop_at:
stop_at = HypercubeCollection.cast(stop_at)
user_stop = stop_condition
def stop_condition(x):
if user_stop is not None and user_stop(x):
return True
return stop_at.match_state(x)
x = init
yield x
i = 0
while True:
i += 1
nexts = list(self(x))
if not nexts or (len(nexts) == 1 and nexts[0] == x):
return
x = random.choice(nexts)
yield x
if i == steps:
break
if stop_condition is not None and stop_condition(x):
break
[docs] def dynamics(self):
d = nx.DiGraph()
x = {i: 0 for i in self.nodes}
self.push(d, x)
even = True
right = -1
for m in range(1,2**self.n):
if even:
x[self.nodes[0]] = 1-x[self.nodes[0]]
for i in range(self.n):
if x[self.nodes[i]]:
right = i
break
else:
x[self.nodes[right+1]] = 1-x[self.nodes[right+1]]
if x[self.nodes[right+1]]:
right = right+1
even = not even
self.push(d, x)
return d
[docs] def partial_dynamics(self, init):
d = nx.DiGraph()
def m(x):
return tuple([x[i] for i in self.nodes])
init = {i: int(init[i]) for i in self.nodes}
todo = {m(init)}
done = set()
while todo:
mx = todo.pop()
x = dict(zip(self.nodes, mx))
done.add(mx)
for y in self.push(d, x):
my = m(y)
if my not in done:
todo.add(my)
return d
[docs]class ElementaryUpdateModeDynamics(UpdateModeDynamics):
def __init__(self, model, min_u, max_u, **opts):
super().__init__(model, **opts)
self.min_u = min_u
self.max_u = max_u
def __call__(self, x):
z = self.model(x)
for k in range(self.min_u, self.max_u+1):
for I in itertools.combinations(self.nodes, k):
y = x.copy()
mod = False or self.loops
for i in I:
if z[i] != y[i]:
mod = True
y[i] = z[i]
if mod:
yield y
[docs]class FullyAsynchronousDynamics(ElementaryUpdateModeDynamics):
def __init__(self, model, **opts):
super().__init__(model, 1, 1, **opts)
[docs]class GeneralAsynchronousDynamics(ElementaryUpdateModeDynamics):
def __init__(self, model, **opts):
n = len(model)
super().__init__(model, 1, n, **opts)
[docs]class SynchronousDynamics(ElementaryUpdateModeDynamics):
def __init__(self, model, loops=True, **opts):
n = len(model)
super().__init__(model, n, n, loops=loops, **opts)
ParallelDynamics = SynchronousDynamics
[docs]class PeriodicDynamics(UpdateModeDynamics):
"""
Periodic (deterministic) updating mode.
It is parameterized by a sequence of sets of nodes to update simultaneously
in order to compute the next configuration.
"""
def __init__(self, sequence, model, loops=True):
super().__init__(model, loops=loops)
# allow for elements of the sequence to be directly a node
def magic(I):
if isinstance(I, Hashable) and I in model:
return {I}
return I
self.sequence = tuple(map(magic, sequence))
def __call__(self, x):
y = x.copy()
for I in self.sequence:
z = self.model(y)
for i in I:
y[i] = z[i]
yield y
[docs]class BlockSequentialDynamics(PeriodicDynamics):
pass
[docs]class SequentialDynamics(BlockSequentialDynamics):
def __init__(self, sequence, model, **opts):
super().__init__([(i,) for i in sequence], model, **opts)
[docs]class BlockParallelDynamics(BlockSequentialDynamics):
def __init__(self, spec, model, **opts):
def magic(I):
if isinstance(I, Hashable) and I in model:
return (I,)
return I
spec = [magic(I) for I in spec]
l = max(map(len,spec))
def m(seq):
ls = len(seq)
return l//ls + (1 if l%ls else 0)
spec = [seq*m(seq) for seq in spec]
sequence = [{seq[i] for seq in spec if seq[i] is not None} \
for i in range(l)]
super().__init__(sequence, model, **opts)