Source code for cutcutcodec.core.classes.node

#!/usr/bin/env python3

"""General node of the assembly graph."""

import abc
import typing

from cutcutcodec.core.classes.stream import Stream


[docs] class Node(abc.ABC): """General node of the assembly graph. Parameters ---------- copy : boolean, default=True If False, try to avoid realocation as much as possible. By default, the modifications are not inplace (readonly). in_streams : tuple[Stream, ...] The streams arriving on the node (readonly). out_streams : tuple[Stream, ...] The streams coming back on the node (readonly). """ def __init__( self, in_streams: typing.Union[list[Stream], tuple[Stream]], out_streams: typing.Union[list[Stream], tuple[Stream]], ): assert isinstance(in_streams, (list, tuple)), in_streams.__class__.__name__ in_streams = tuple(in_streams) assert all(isinstance(stream, Stream) for stream in in_streams), \ [stream.__class__.__name__ for stream in in_streams] assert isinstance(out_streams, (list, tuple)), out_streams.__class__.__name__ out_streams = tuple(out_streams) assert all(isinstance(stream, Stream) for stream in out_streams), \ [stream.__class__.__name__ for stream in out_streams] self._in_streams = in_streams self._out_streams = out_streams self._copy = getattr(self, "_copy", True) def __eq__(self, other) -> bool: """Check that 2 nodes are equivalent.""" if self.__class__ != other.__class__: return False if self.getstate() != other.getstate(): return False if self.in_streams != other.in_streams: return False # not compare out_streams to avoide infinite loop return True def __getstate__(self) -> tuple[typing.Iterable[Stream], dict]: """Allow ``pickle`` to serialize efficiently.""" return self.in_streams, self.getstate() def __setstate__(self, streams_state) -> None: """Allow ``pickle`` to recreate the object identically. Parameters ---------- streams_state : tuple[typing.Iterable[cutcutcodec.core.classes.stream.Stream], dict] These are the input streams and the other arguments. """ assert isinstance(streams_state, tuple), streams_state.__class__.__name__ assert len(streams_state) == 2, streams_state in_streams, state = streams_state self.setstate(in_streams, state) @abc.abstractmethod def _getstate(self) -> dict: """Help for ``cutcutcodec.core.classes.node.Node.getstate``.""" raise NotImplementedError @abc.abstractmethod def _setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None: """Help for ``cutcutcodec.core.classes.node.Node.setstate``.""" raise NotImplementedError @property def copy(self) -> bool: """Try to avoid realocation as much as possible if False.""" return self._copy
[docs] @classmethod @abc.abstractmethod def default(cls): """Provide an example of an instance of this node. Returns ------- example : Node An example of an instance of this class. Notes ----- It is a ``classmethod`` rather than a ``staticmethod`` in order to simplify inheritance. """ raise NotImplementedError
[docs] def getstate(self) -> dict: """Retrieve the internal state of the object. Returns ------- state : dict An explicit dictionary containing the different attributes of the object. Even if this is not optimal in terms of memory and speed, the objects must be explicit so that they can be easily manipulated when the assembly graph is optimized. The keys must be of type str and the dictionary must to be jsonisable. """ state = self._getstate() if self._copy is False: state.update({"copy": False}) return state
[docs] def in_index(self, stream: Stream) -> int: """Return the input index of the stream comming on this node.""" indexs = [index for index, stream_ in enumerate(self._in_streams) if stream_ is stream] if len(indexs) != 1: raise AttributeError(f"the stream {stream} doesn't comme into the node {self}") return indexs.pop()
[docs] def in_select(self, type_: str) -> tuple[Stream, ...]: """Select the incoming streams of a specific type.""" assert isinstance(type_, str), type_.__class__.__name__ return tuple(s for s in self._in_streams if s.type == type_)
@property def in_streams(self) -> tuple[Stream, ...]: """Return the incoming streams on the node.""" return self._in_streams
[docs] def out_index(self, stream: Stream) -> int: """Return the output index of the stream escaping this node.""" indexs = [index for index, stream_ in enumerate(self._out_streams) if stream_ is stream] if len(indexs) != 1: raise AttributeError(f"the stream {stream} doesn't leave from the node {self}") return indexs.pop()
[docs] def out_select(self, type_: str) -> tuple[Stream, ...]: """Select the outgoing streams of a specific type.""" assert isinstance(type_, str), type_.__class__.__name__ return tuple(s for s in self._out_streams if s.type == type_)
@property def out_streams(self) -> tuple[Stream, ...]: """Return the outgoing streams on the node.""" return self._out_streams
[docs] def setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None: """Allow to completely change the internal state of the node. Parameters ---------- in_streams Same as ``cutcutcodec.core.classes.stream.Stream.in_streams``. state : dict The internal state returned by the ``getstate`` method of the same class. """ state = state.copy() if (copy := state.pop("copy", None)) is not None: self._copy = copy self._setstate(in_streams, state)