# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# Author: Nikolaus C. Awtrey
#
"""
Diagram Generation
=========================================================================
The :mod:`~kda.diagrams` module contains code to generate partial diagrams
(undirected spanning trees), directional diagrams, and flux diagrams using the
diagram method developed by King and Altman :footcite:`king_schematic_1956`
and Hill :footcite:`hill_studies_1966`.
.. autofunction:: enumerate_partial_diagrams
.. autofunction:: generate_partial_diagrams
.. autofunction:: generate_directional_diagrams
.. autofunction:: generate_flux_diagrams
.. autofunction:: generate_all_flux_diagrams
References
==========
.. footbibliography::
"""
import functools
import itertools
import copy
import numpy as np
import networkx as nx
from kda import graph_utils
def _find_unique_edges(G):
"""
Creates list of unique edges for input diagram ``G``. Effectively removes
duplicate edges such as ``(1, 0)`` from ``[(0, 1), (1, 0)]``.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
Input diagram
"""
# since non-directional graphs cannot contain forward/reverse edges,
# simply create a simple graph and collect its (unique) set of edges
G_temp = nx.Graph()
G_temp.add_edges_from(G.edges())
return list(G_temp.edges())
def _combine(x, y):
"""
Used to reduce list of dictionaries into single dictionary where the keys
are the indices of states, and the values are lists of neighbors for each
state.
"""
x.update(y)
return x
def _get_neighbor_dict(target, unique_edges):
"""
Recursively constructs dictionary containing neighbor connectivity
information for a set of target states in a diagram.
Parameters
----------
target : int
Index of target state
unique_edges : ndarray
Array of edges (made from 2-tuples) that are unique
to the diagram, (e.g. ``[[0, 1], [1, 2], ...]``).
Returns
-------
Dictionary of directional connections, where node
indices are mapped to a list of their respective
neighbor node indices (e.g. ``{0: [1, 5], 1: [2], ...}``).
"""
# get the indices for each edge pair that contains the target state
adj_idx = np.nonzero(unique_edges == target)[0]
# collect the edges that contain the target state
adj_edges = unique_edges[adj_idx]
# from the adjacent edges, get the neighbors of the target state
neighbors = adj_edges[np.nonzero(adj_edges != target)]
# if there are neighbors, continue
if neighbors.size:
# get the list of all possible indices
all_idx = np.arange(unique_edges.shape[0])
# get the indices for the edges that are
# not connected to the target state
nonadj_mask = np.ones(all_idx.size, dtype=bool)
nonadj_mask[adj_idx] = False
nonadj_idx = all_idx[nonadj_mask]
# collect the edges that do not contain the target state
nonadj_edges = unique_edges[nonadj_idx]
# get the unique neighbors
neighbors = np.unique(neighbors)
# recursively generate a dictionary of all connections
con_dict = functools.reduce(
_combine,
[{target: neighbors}]
+ [_get_neighbor_dict(i, nonadj_edges) for i in neighbors],
)
return con_dict
else:
# if there are no neighbors, return empty dictionary
return {}
def _get_flux_path_edges(target, unique_edges):
"""
Constructs edges for all paths leading to a target
state using input collection of unique edge tuples.
Parameters
----------
target : int
Target state.
unique_edges : array
Array of edges (made from 2-tuples) that are unique to the
diagram (e.g. ``(1, 2)`` and ``(2, 1)`` are considered the same).
Returns
-------
path_edges : list
List of edge tuples (e.g. ``[(0, 1, 0), (1, 2, 0), ...]``).
"""
neighbors = _get_neighbor_dict(target, unique_edges)
path_edges = [(nbr, tgt, 0) for tgt in neighbors for nbr in neighbors[tgt]]
return list(set(path_edges))
def _construct_cycle_edges(cycle):
"""
Constucts edge tuples in a cycle using the node indices in the cycle. It
is important for the cycle to be in the correct order and not sorted, as
a sorted cycle list will return incorrect edges.
Parameters
----------
cycle : list of int
List of node indices for cycle of interest, index zero.
Returns
-------
cycle_edges : list of tuples
List of edge tuples corresponding to the input cycle.
"""
# slice cycle to generate edge tuples using consecutive nodes
cycle_edges = list(zip(cycle[:-1], cycle[1:], np.zeros(len(cycle), dtype=int)))
# append tuple connecting first/last nodes in input cycle
cycle_edges.append((cycle[-1], cycle[0], 0))
return cycle_edges
def _find_unique_uncommon_edges(G, cycle_edges):
"""
Collects the set of unique non-cycle edges for a cycle in the input diagram.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
cycle_edges : list of tuples
List of edge tuples for a cycle of interest. Both forward and
reverse edges should be included (e.g. ``(1, 0)`` and ``(0, 1)``).
Returns
-------
edges : list of tuples
List of uncommon edges between ``G`` and ``cycle_edges``.
Since these should be unique edges (no reverse edges), these are the
unique uncommon edges between two diagrams (normal use case).
"""
G_temp = nx.MultiDiGraph()
G_temp.add_edges_from(G.edges())
G_temp.remove_edges_from(cycle_edges)
edges = _find_unique_edges(G_temp)
return edges
def _flux_edge_conditions(edge_list, n_flux_edges):
"""
Conditions that need to be true for flux edges to be valid.
Parameters
----------
edge_list : list of tuples
List of edges (tuples) to be checked. Common cases that need to be
filtered out are where half the edges are the same but reversed, or
there are simply too many edges to be a flux diagram.
n_flux_edges : int
Number of unique edges in given flux diagram. Defined as the difference
between the number of nodes in the diagram of interest and the number of
unique cycle edges in the cycle of interest.
"""
# count the edges
n_input_edges = len(edge_list)
# check if the number of input edges are the
# correct number of flux diagram edges
if n_input_edges == n_flux_edges:
# sort the list of edges into arrays of increasing order
sorted_edges = np.sort(edge_list)[:, 1:]
# count how many of the sorted edges are unique
n_unique_edges = len(set(map(tuple, sorted_edges)))
if n_unique_edges == n_input_edges:
# if there are no redundant edges, return True
return True
return False
def _append_reverse_edges(edge_list):
"""
Returns a list that contains original edges and reverse edges.
Parameters
----------
edge_list : list of edge tuples
List of unidirectional edges to have reverse edges appended to.
Returns
-------
new_edge_list : list of edge tuples
List of edge tuples with both forward and reverse edges.
"""
new_edge_list = [(e[1], e[0], e[2]) for e in edge_list]
new_edge_list.extend(edge_list)
return new_edge_list
def _get_cofactor_matrix(K_laplace):
"""
Helper function for :func:`~kda.diagrams.enumerate_partial_diagrams()`.
Uses singular value decomposition to get the cofactor matrix for
the input Laplacian matrix.
Parameters
----------
K_laplace : ndarray
``NxN`` Laplacian matrix, where ``N`` is the number of nodes.
Returns
-------
K_cof : ndarray
Cofactor matrix for the input Laplacian matrix.
"""
U, w, Vt = np.linalg.svd(K_laplace)
N = len(w)
g = np.tile(w, N)
g[:: (N + 1)] = 1
G = np.diag(-((-1) ** N) * np.prod(np.reshape(g, (N, N)), 1))
K_cof = U @ G @ Vt
K_cof = np.asarray(np.round(K_cof, decimals=0), dtype=int)
return K_cof
[docs]
def enumerate_partial_diagrams(G):
"""
Quantifies the number of partial diagrams (undirected spanning
trees) that can be generated from a kinetic diagram.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
Returns
-------
n_partials : int
The number of unique partial diagrams (spanning trees) that can be
generated from a graph represented by the input rate matrix.
Notes
-----
This implements Kirchhoff's matrix theroem
:footcite:`chakraborty_algorithms_2019` by generating the adjacency
matrix from the input diagram, generating the Laplacian matrix
from the adjacency matrix, then getting the cofactor matrix of
the Laplacian matrix. All cofactors are equal and equal to the
number of undirected spanning trees.
A more sophistocated version of this function is available in
the ``NetworkX`` library :footcite:`hagberg_exploring_2008`
(see `here <https://networkx.org/documentation/stable/reference/
algorithms/generated/networkx.algorithms.tree.mst.
number_of_spanning_trees.html>`_).
"""
# get the adjacency matrix for K
K_adj = nx.to_numpy_array(G)
# use the adjacency matrix to generate the Laplacian matrix
# multiply through by -1
K_laplace = -1 * K_adj
# now assign the degree of the ith node to the ith diagonal value
# NOTE: the degree of each node should be the sum of each row or column
# in the adjacency matrix
for i in range(len(K_adj)):
K_laplace[i, i] = np.sum(K_adj[i])
# get the cofactor matrix
K_cof = _get_cofactor_matrix(K_laplace=K_laplace)
# check that all values in the cofactor matrix are the same by
# checking if minimum and maximum values are equal
assert K_cof.min() == K_cof.max()
# just take the first value from the cofactor matrix since they are all
# the same value
n_partials = np.abs(K_cof[0][0])
return n_partials
[docs]
def generate_partial_diagrams(G, return_edges=False):
"""
Generates all partial diagrams (undirected spanning trees)
for a kinetic diagram.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
return_edges : bool
Binary used for determining whether to return ``NetworkX``
diagram objects (primarily for plotting) or the edge
tuples (generally for calculations).
Returns
-------
partials : ndarray of ``NetworkX.Graph``
Array of ``NetworkX.Graph`` where each graph is a unique
partial diagram with no loops (``return_edges=False``), or
a nested array of unique edges for valid partial diagrams
(``return_edges=True``).
"""
# calculate number of edges needed for each partial diagram
n_edges = G.number_of_nodes() - 1
# collect the nodes for the partial diagrams
base_nodes = G.nodes()
# get the unique edges in G
unique_edges = _find_unique_edges(G)
# calculate the number of expected partial diagrams
n_partials = enumerate_partial_diagrams(G)
# preallocate arrays for storing partial diagrams edges/graphs
# and initialize a counter
i = 0
if return_edges:
partials = np.empty((n_partials, n_edges, 2), dtype=np.int32)
else:
partials = np.empty(n_partials, dtype=object)
# get list of possible combinations of unique edges (N choose N-1)
# and iterate over each unique combination
for edge_list in itertools.combinations(unique_edges, n_edges):
# partial diagrams must span all nodes in the kinetic diagram
# check if edges include all nodes in the base diagram
# before continuing
nodes_in_edges = set([item for edge in edge_list for item in edge])
nodes_span_edges = set(base_nodes) == nodes_in_edges
# filter out cases where edges don't span all nodes
if nodes_span_edges:
# make a base partial graph
partial = nx.Graph()
partial.add_nodes_from(base_nodes)
partial.add_edges_from(edge_list)
# spanning trees must have `N-1` edges that span all nodes
# and are connected. We already have `N-1` edges that
# span all nodes in `G`, just check if they are connected
if nx.is_connected(partial):
if return_edges:
partials[i, :] = edge_list
else:
partials[i] = partial
i += 1
return partials
[docs]
def generate_directional_diagrams(G, return_edges=False):
"""
Generates all directional diagrams for a kinetic diagram
using depth-first-search algorithm.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
return_edges : bool
Binary used for determining whether to return ``NetworkX`` graph
objects (primarily for plotting) or the edge tuples (generally
for calculations).
Returns
-------
directional_diagrams : ndarray or ndarray of ``NetworkX.DiGraph``
Array of all directional diagram edges made from 3-tuples
(``return_edges=True``) or array of all directional
diagrams (``return_edges=False``) for ``G``.
"""
partial_diagrams = generate_partial_diagrams(G, return_edges=False)
n_states = G.number_of_nodes()
n_partials = len(partial_diagrams)
n_dir_diags = n_states * n_partials
if return_edges:
directional_diagrams = np.empty((n_dir_diags, n_states - 1, 3), dtype=np.int32)
else:
directional_diagrams = np.empty((n_dir_diags,), dtype=object)
# get the set of target nodes in ascending order
# so all directional diagrams for each state are
# generated in order
targets = np.sort(list(G.nodes))
for i, target in enumerate(targets):
for j, G_partial in enumerate(partial_diagrams):
# apply depth-first-search to partial diagram to create
# a directed spanning tree where the edges are directed
# from the target node to the leaf nodes
G_dfs = nx.dfs_tree(G_partial, source=target)
if return_edges:
# collect the edges from the directed spanning tree
# and reverse the direction of the edges to get the correct
# edges for a directional diagram
dir_edges = np.fliplr(np.asarray(G_dfs.edges(), dtype=np.int32))
# add in the zero column for now
# TODO: change downstream functions so we
# don't have to keep these unnecessary zeros
dir_edges = np.column_stack((dir_edges, np.zeros(dir_edges.shape[0])))
directional_diagrams[j + i*n_partials] = dir_edges
else:
# make a copy of the `nx.DiGraph` with reversed
# edges to get the directional diagram
G_directional = G_dfs.reverse(copy=True)
# set "is_target" to False for all nodes
nx.set_node_attributes(G_directional, False, "is_target")
# set target node to True
G_directional.nodes[target]["is_target"] = True
# add to array of directional diagrams
directional_diagrams[j + i*n_partials] = G_directional
return directional_diagrams
[docs]
def generate_flux_diagrams(G, cycle):
"""
Generates all flux diagrams for a specific cycle in the kinetic diagram.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
cycle : list of int
List of node indices for cycle of interest, index zero. Order of node
indices does not matter.
Returns
-------
flux_diagrams : list of ``NetworkX.MultiDiGraph``
List of flux diagrams. Diagrams contain the input cycle
where remaining edges follow path pointing to ``cycle``.
Cycle nodes are labeled by attribute ``'is_target'``.
"""
if sorted(cycle) == sorted(G.nodes):
print(
f"Cycle {cycle} contains all nodes in G. No flux diagrams generated."
)
return None
# get the edge tuples created by the input cycle
cycle_edges = _construct_cycle_edges(cycle)
cycle_edges = _append_reverse_edges(cycle_edges)
# get edges that are uncommon between cycle and G
non_cycle_edges = _find_unique_uncommon_edges(G, cycle_edges)
# number of non-cycle edges in flux diagram
n_non_cycle_edges = G.number_of_nodes() - len(cycle)
# generate all combinations of valid edges
# TODO: generates too many edge lists: some create cycles, some
# use both forward and reverse edges
flux_diagrams = []
for edge_list in itertools.combinations(non_cycle_edges, r=n_non_cycle_edges):
# convert each edge list into numpy array
edge_list = np.asarray(edge_list, dtype=int)
# collect the directional edges
dir_edges = []
for target in cycle:
path_edges = _get_flux_path_edges(target, edge_list)
dir_edges.extend(path_edges)
if _flux_edge_conditions(dir_edges, n_non_cycle_edges):
# initialize a graph object
flux_diag = nx.MultiDiGraph()
# add cycle edges to list of directional edges
dir_edges.extend(cycle_edges)
# add all edges to flux diagram
flux_diag.add_edges_from(dir_edges)
# set "is_target" to False for all nodes
nx.set_node_attributes(flux_diag, False, "is_target")
for target in cycle:
# for nodes in cycle, mark True
flux_diag.nodes[target]["is_target"] = True
# append valid flux diagram to list
flux_diagrams.append(flux_diag)
return flux_diagrams
[docs]
def generate_all_flux_diagrams(G):
"""
Generates all flux diagrams for a kinetic diagram.
Parameters
----------
G : ``NetworkX.MultiDiGraph``
A kinetic diagram
Returns
-------
all_flux_diagrams : list of lists of ``NetworkX.MultiDiGraph``
List of lists of flux diagrams, where each list
is for a different cycle in ``G``.
"""
all_cycles = graph_utils.find_all_unique_cycles(G)
n_nodes = G.number_of_nodes()
all_flux_diagrams = []
for cycle in all_cycles:
if len(cycle) == n_nodes:
# for all-node cycles just append None since
# they cannot have any flux diagrams
all_flux_diagrams.append(None)
continue
flux_diagrams = generate_flux_diagrams(G, cycle)
all_flux_diagrams.append(flux_diagrams)
return all_flux_diagrams