#!/usr/bin/env python3
"""General node of the assembly graph."""
import abc
import typing
from cutcutcodec.core.classes.stream import Stream
[docs]
class Node(abc.ABC):
"""General node of the assembly graph.
Parameters
----------
copy : boolean, default=True
If False, try to avoid realocation as much as possible.
By default, the modifications are not inplace (readonly).
in_streams : tuple[Stream, ...]
The streams arriving on the node (readonly).
out_streams : tuple[Stream, ...]
The streams coming back on the node (readonly).
"""
def __init__(
self,
in_streams: typing.Union[list[Stream], tuple[Stream]],
out_streams: typing.Union[list[Stream], tuple[Stream]],
):
assert isinstance(in_streams, (list, tuple)), in_streams.__class__.__name__
in_streams = tuple(in_streams)
assert all(isinstance(stream, Stream) for stream in in_streams), \
[stream.__class__.__name__ for stream in in_streams]
assert isinstance(out_streams, (list, tuple)), out_streams.__class__.__name__
out_streams = tuple(out_streams)
assert all(isinstance(stream, Stream) for stream in out_streams), \
[stream.__class__.__name__ for stream in out_streams]
self._in_streams = in_streams
self._out_streams = out_streams
self._copy = getattr(self, "_copy", True)
def __eq__(self, other) -> bool:
"""Check that 2 nodes are equivalent."""
if self.__class__ != other.__class__:
return False
if self.getstate() != other.getstate():
return False
if self.in_streams != other.in_streams:
return False # not compare out_streams to avoide infinite loop
return True
def __getstate__(self) -> tuple[typing.Iterable[Stream], dict]:
"""Allow ``pickle`` to serialize efficiently."""
return self.in_streams, self.getstate()
def __setstate__(self, streams_state) -> None:
"""Allow ``pickle`` to recreate the object identically.
Parameters
----------
streams_state : tuple[typing.Iterable[cutcutcodec.core.classes.stream.Stream], dict]
These are the input streams and the other arguments.
"""
assert isinstance(streams_state, tuple), streams_state.__class__.__name__
assert len(streams_state) == 2, streams_state
in_streams, state = streams_state
self.setstate(in_streams, state)
@abc.abstractmethod
def _getstate(self) -> dict:
"""Help for ``cutcutcodec.core.classes.node.Node.getstate``."""
raise NotImplementedError
@abc.abstractmethod
def _setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None:
"""Help for ``cutcutcodec.core.classes.node.Node.setstate``."""
raise NotImplementedError
@property
def copy(self) -> bool:
"""Try to avoid realocation as much as possible if False."""
return self._copy
[docs]
def getstate(self) -> dict:
"""Retrieve the internal state of the object.
Returns
-------
state : dict
An explicit dictionary containing the different attributes of the object.
Even if this is not optimal in terms of memory and speed,
the objects must be explicit so that they can be easily manipulated
when the assembly graph is optimized.
The keys must be of type str and the dictionary must to be jsonisable.
"""
state = self._getstate()
if self._copy is False:
state.update({"copy": False})
return state
[docs]
def in_index(self, stream: Stream) -> int:
"""Return the input index of the stream comming on this node."""
indexs = [index for index, stream_ in enumerate(self._in_streams) if stream_ is stream]
if len(indexs) != 1:
raise AttributeError(f"the stream {stream} doesn't comme into the node {self}")
return indexs.pop()
[docs]
def in_select(self, type_: str) -> tuple[Stream, ...]:
"""Select the incoming streams of a specific type."""
assert isinstance(type_, str), type_.__class__.__name__
return tuple(s for s in self._in_streams if s.type == type_)
@property
def in_streams(self) -> tuple[Stream, ...]:
"""Return the incoming streams on the node."""
return self._in_streams
[docs]
def out_index(self, stream: Stream) -> int:
"""Return the output index of the stream escaping this node."""
indexs = [index for index, stream_ in enumerate(self._out_streams) if stream_ is stream]
if len(indexs) != 1:
raise AttributeError(f"the stream {stream} doesn't leave from the node {self}")
return indexs.pop()
[docs]
def out_select(self, type_: str) -> tuple[Stream, ...]:
"""Select the outgoing streams of a specific type."""
assert isinstance(type_, str), type_.__class__.__name__
return tuple(s for s in self._out_streams if s.type == type_)
@property
def out_streams(self) -> tuple[Stream, ...]:
"""Return the outgoing streams on the node."""
return self._out_streams
[docs]
def setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None:
"""Allow to completely change the internal state of the node.
Parameters
----------
in_streams
Same as ``cutcutcodec.core.classes.stream.Stream.in_streams``.
state : dict
The internal state returned by the ``getstate`` method of the same class.
"""
state = state.copy()
if (copy := state.pop("copy", None)) is not None:
self._copy = copy
self._setstate(in_streams, state)