"""General node of the assembly graph."""
import abc
import importlib
import typing
from cutcutcodec.core.classes.stream import Stream
[docs]
class Node(abc.ABC):
"""General node of the assembly graph.
Parameters
----------
copy : boolean, default=True
If False, try to avoid realocation as much as possible.
By default, the modifications are not inplace (readonly).
in_streams : tuple[Stream, ...]
The streams arriving on the node (readonly).
out_streams : tuple[Stream, ...]
The streams coming back on the node (readonly).
"""
def __init__(
self,
in_streams: list[Stream] | tuple[Stream],
out_streams: list[Stream] | tuple[Stream],
):
assert isinstance(in_streams, (list, tuple)), in_streams.__class__.__name__
in_streams = tuple(in_streams)
assert all(isinstance(stream, Stream) for stream in in_streams), \
[stream.__class__.__name__ for stream in in_streams]
assert isinstance(out_streams, (list, tuple)), out_streams.__class__.__name__
out_streams = tuple(out_streams)
assert all(isinstance(stream, Stream) for stream in out_streams), \
[stream.__class__.__name__ for stream in out_streams]
self._in_streams = in_streams
self._out_streams = out_streams
self._copy = getattr(self, "_copy", True)
def __eq__(self, other) -> bool:
"""Check that 2 nodes are equivalent."""
if self.__class__ != other.__class__:
return False
if self.getstate() != other.getstate():
return False
if self.in_streams != other.in_streams:
return False # not compare out_streams to avoide infinite loop
return True
def __enter__(self) -> typing.Self:
"""To allow context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager."""
for stream in self.in_streams:
stream.node.__exit__(exc_type, exc_val, exc_tb)
def __getattr__(self, name: str) -> object:
"""Automates filter import to simplify graph creation syntax."""
class FilterCreator: # pylint: disable=R0903
"""Make the attribute callable."""
def __init__(self, streams, Filter):
"""Memorize the filter and the streams."""
self.streams = streams
self.Filter = Filter
def __call__(self, *args, **kwargs):
"""Create the filter, see help(self.Filter) for the documentation."""
return self.Filter(self.streams, *args, **kwargs)
if name.startswith("apply_"): # apply a filter
parts = name.split("_")[1:]
try:
mod = importlib.import_module(".".join(["cutcutcodec", "core", "filter"] + parts))
except ModuleNotFoundError as err:
raise AttributeError(f"failed to import the filter module {name}") from err
try:
Filter = getattr(mod, f"Filter{''.join(p.title() for p in parts)}")
except AttributeError as err:
raise AttributeError(f"failed to import the filter class {name}") from err
return FilterCreator(self.out_streams, Filter)
raise AttributeError
def __getstate__(self) -> tuple[typing.Iterable[Stream], dict]:
"""Allow ``pickle`` to serialize efficiently."""
return self.in_streams, self.getstate()
def __or__(self, other):
"""Create a FilterIdentity containing the streams of self and other."""
from cutcutcodec.core.filter.identity import FilterIdentity
if isinstance(other, Node):
return FilterIdentity(self.out_streams + other.out_streams)
if isinstance(other, Stream):
return FilterIdentity(self.out_streams + (other,))
return NotImplemented
def __setstate__(self, streams_state) -> None:
"""Allow ``pickle`` to recreate the object identically.
Parameters
----------
streams_state : tuple[typing.Iterable[cutcutcodec.core.classes.stream.Stream], dict]
These are the input streams and the other arguments.
"""
assert isinstance(streams_state, tuple), streams_state.__class__.__name__
assert len(streams_state) == 2, streams_state
in_streams, state = streams_state
self.setstate(in_streams, state)
@abc.abstractmethod
def _getstate(self) -> dict:
"""Help for ``cutcutcodec.core.classes.node.Node.getstate``."""
raise NotImplementedError
@abc.abstractmethod
def _setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None:
"""Help for ``cutcutcodec.core.classes.node.Node.setstate``."""
raise NotImplementedError
@property
def copy(self) -> bool:
"""Try to avoid realocation as much as possible if False."""
return self._copy
[docs]
def getstate(self) -> dict:
"""Retrieve the internal state of the object.
Returns
-------
state : dict
An explicit dictionary containing the different attributes of the object.
Even if this is not optimal in terms of memory and speed,
the objects must be explicit so that they can be easily manipulated
when the assembly graph is optimized.
The keys must be of type str and the dictionary must to be jsonisable.
"""
state = self._getstate()
if self._copy is False:
state.update({"copy": False})
return state
[docs]
def in_index(self, stream: Stream) -> int:
"""Return the input index of the stream comming on this node."""
indexs = [index for index, stream_ in enumerate(self._in_streams) if stream_ is stream]
if len(indexs) != 1:
raise AttributeError(f"the stream {stream} doesn't comme into the node {self}")
return indexs.pop()
[docs]
def in_select(self, type_: str) -> tuple[Stream, ...]:
"""Select the incoming streams of a specific type."""
assert isinstance(type_, str), type_.__class__.__name__
return tuple(s for s in self._in_streams if s.type == type_)
@property
def in_streams(self) -> tuple[Stream, ...]:
"""Return the incoming streams on the node."""
return self._in_streams
[docs]
def out_index(self, stream: Stream) -> int:
"""Return the output index of the stream escaping this node."""
indexs = [index for index, stream_ in enumerate(self._out_streams) if stream_ is stream]
if len(indexs) != 1:
raise AttributeError(f"the stream {stream} doesn't leave from the node {self}")
return indexs.pop()
[docs]
def out_select(self, type_: str) -> tuple[Stream, ...]:
"""Select the outgoing streams of a specific type."""
assert isinstance(type_, str), type_.__class__.__name__
return tuple(s for s in self._out_streams if s.type == type_)
@property
def out_streams(self) -> tuple[Stream, ...]:
"""Return the outgoing streams on the node."""
return self._out_streams
[docs]
def setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None:
"""Allow to completely change the internal state of the node.
Parameters
----------
in_streams
Same as ``cutcutcodec.core.classes.stream.Stream.in_streams``.
state : dict
The internal state returned by the ``getstate`` method of the same class.
"""
state = state.copy()
if (copy := state.pop("copy", None)) is not None:
self._copy = copy
self._setstate(in_streams, state)