"""Compile an assembly graph into an evaluable tree."""
import typing
import networkx
from cutcutcodec.core.classes.container import ContainerOutput
from cutcutcodec.core.classes.node import Node
from cutcutcodec.core.classes.stream import Stream
from cutcutcodec.core.opti.cache.clean.graph import clean_graph
def _tree_from_node(node_name: str, graph: networkx.MultiDiGraph) -> None:
"""Recursively retrieve the node corresponding to the node of the graph.
Complete the ``tree`` attribute for this node and for the out streams of this node.
This function is recursive, so all ancestors are also completed.
Parameters
----------
node_name : str
The name of the node that allows to determine the corresponding subgraph.
graph : networkx.MultiDiGraph
The complete assembly graph.
Notes
-----
If the node is a terminal node, it is the complete dynamic tree.
"""
assert isinstance(graph, networkx.MultiDiGraph), graph.__class__.__name__
assert isinstance(node_name, str), node_name.__class__.__name__
assert node_name in graph.nodes, sorted(graph.nodes)
# create node and recursively the parent nodes
node = graph.nodes[node_name]
if "tree" not in node["cache"][1]:
for pred in graph.predecessors(node_name):
_tree_from_node(pred, graph) # editing by pointer
in_edges = graph.in_edges(node_name, keys=True)
if sorted(int(k.split("->")[1]) for _, _, k in in_edges) != list(range(len(in_edges))):
raise IndexError(
f"the streams ({in_edges}) arriving on {node_name} are not correctly incremented",
)
in_streams = [
graph.edges[edge_name]["cache"][1]["tree"] for edge_name in
sorted(in_edges, key=lambda src_dst_key: int(src_dst_key[2].split("->")[1]))
] # the streams that arrive on the current node
node["cache"][1]["tree"] = new_node(node["class"], in_streams, node["state"])
assert node["cache"][1]["tree"].in_streams == tuple(in_streams), \
f"the node {node_name} does not have the specified input streams"
# complete out streams
for out_edge_name in graph.out_edges(node_name, keys=True):
out_edge = graph.edges[out_edge_name]
if "tree" not in out_edge["cache"][1]:
index = int(out_edge_name[2].split("->")[0])
assert index < len(node["cache"][1]["tree"].out_streams), (
f"the {out_edge_name[0]} node has only {len(node['cache'][1]['tree'].out_streams)} "
f"output streams, impossible to access stream index {index}"
)
out_edge["cache"][1]["tree"] = node["cache"][1]["tree"].out_streams[index]
[docs]
def graph_to_tree(graph: networkx.MultiDiGraph) -> ContainerOutput:
"""Create the dynamic tree from the assembly graph.
The abstract dynamic tree alows the evaluation of the complete pipeline.
Parameters
----------
graph : networkx.MultiDiGraph
The assembly graph.
Returns
-------
container_out : cutcutcodec.core.classes.container.ContainerOutput
An evaluable multimedia muxer.
Examples
--------
>>> from cutcutcodec.core.classes.container import ContainerOutput
>>> from cutcutcodec.core.compilation.graph_to_tree import graph_to_tree
>>> from cutcutcodec.core.compilation.tree_to_graph import tree_to_graph
>>> from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
>>> tree = tree_to_graph(ContainerOutput(GeneratorAudioNoise(0).out_streams))
>>> graph_to_tree(tree) # doctest: +ELLIPSIS
<cutcutcodec.core.classes.container.ContainerOutput object at ...>
>>>
"""
# verification and extraction of the termination node
assert isinstance(graph, networkx.MultiDiGraph), graph.__class__.__name__
out_nodes = [n for n in graph.nodes if issubclass(graph.nodes[n]["class"], ContainerOutput)]
assert len(out_nodes) == 1, f"only one output node is possible, not {len(out_nodes)}"
out_node = out_nodes.pop()
assert issubclass(graph.nodes[out_node]["class"], ContainerOutput), \
graph.nodes[out_node]["class"].__name__
# fill
update_trees(graph) # compute and add missing "tree" nodes
container_out = graph.nodes[out_node]["cache"][1]["tree"]
return container_out
[docs]
def new_node(node_class: type, in_streams: typing.Iterable[Stream], state: dict) -> Node:
"""Instantiate and initialize a new node.
Parameters
----------
node_class : type
The uninstantiated class describing the node to be created.
This class must be inherited from the ``cutcutcodec.core.classes.node.Node`` class.
in_streams : typing.Iterable[Stream]
See ``cutcutcodec.core.classes.node.Node.setstate``.
state : dict
See ``cutcutcodec.core.classes.node.Node.setstate``.
Returns
-------
node : Node
A new instantiated and initialized node.
"""
assert isinstance(node_class, type), f"{node_class} must be a class, not an object"
assert issubclass(node_class, Node), f"{node_class.__name__} class does not inherit from Node"
node = node_class.__new__(node_class)
node.setstate(in_streams, state)
return node
[docs]
def update_trees(graph: networkx.MultiDiGraph) -> None:
"""Update on each node the ``tree`` attribute.
From the assembly graph, this function reconstructs the dynamic instances
and is able to perform the calculations.
By adding to each node the attribute ``tree``,
it allows not only to keep the graph structure but also
to recalculate only the parts that need to be changed.
The operation are applies in-place.
Parameters
----------
graph : networkx.MultiDiGraph
The assembly graph who is going to have the updated ``tree`` attributes.
Examples
--------
>>> from pprint import pprint
>>> from cutcutcodec.core.classes.container import ContainerOutput
>>> from cutcutcodec.core.compilation.graph_to_tree import update_trees
>>> from cutcutcodec.core.compilation.tree_to_graph import tree_to_graph
>>> from cutcutcodec.core.filter.audio.subclip import FilterAudioSubclip
>>> from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
>>> container_out = ContainerOutput(
... FilterAudioSubclip(GeneratorAudioNoise(0).out_streams, 1, 2).out_streams
... )
>>> graph = tree_to_graph(container_out)
>>> update_trees(graph)
>>> pprint(dict(graph.nodes("cache"))) # doctest: +ELLIPSIS
{'container_output_1': ('b1c67b749185174850a2d5cdce2bcb82',
{'tree': <cutcutcodec.core.classes.container.ContainerOutput...>}),
'filter_audio_subclip_1': ('ab86b53fe424b58e755eac50d87a77f0',
{'tree': <...FilterAudioSubclip...>}),
'generator_audio_noise_1': ('4dc3de85c734bfd23024c381599bfe3f',
{'tree': <...GeneratorAudioNoise...>})}
>>> pprint(list(graph.edges(keys=True, data=True))) # doctest: +ELLIPSIS
[('filter_audio_subclip_1',
'container_output_1',
'0->0',
{'cache': ('ab86b53fe424b58e755eac50d87a77f0|0',
{'tree': <cutcutcodec.core.filter.audio.cut._StreamAudioCut...>})}),
('generator_audio_noise_1',
'filter_audio_subclip_1',
'0->0',
{'cache': ('4dc3de85c734bfd23024c381599bfe3f|0',
{'tree': <cutcutcodec.core.generation.audio.noise._StreamAudioNoiseUniform...>})})]
>>>
"""
assert isinstance(graph, networkx.MultiDiGraph), graph.__class__.__name__
# delete obsolete cache and create field ["cache"][1]
graph = clean_graph(graph)
# complete graph
out_nodes = [node for node in graph.nodes if graph.out_degree(node) == 0]
assert out_nodes, "the graph is empty or contains cycle"
for node_name in out_nodes:
_tree_from_node(node_name, graph)