Source code for cutcutcodec.core.collections.sa

"""Real interval non overlappig key dictionary."""

import numbers
import typing
from fractions import Fraction


[docs] class SliceAccessor: """A dictionary like collection with intervals as key. * Segments are semi inclusive: [start, start+duration[ * Guarenty no overlapp. * All segments have strictly positive duration. """ def __init__(self): """Initialise an empty accessor.""" self._starts: list[Fraction] = [] # lower bound of each intervals (ascending order) self._durations: list[Fraction] = [] # the duration of each slice (>0) self._values: list = [] # the associated value to each slice key @staticmethod def _get_idx(point: Fraction, points: list[Fraction], *, point_after: bool = True) -> int: """Return idx such as points[idx] <= point and points[idx+1] > point. Parameters ---------- point : Fraction The given position. points : list[Fraction] The boundaries. point_after : boolean, default=True If True, ensure points[idx+1] exists and points[idx+1] > point. If False, idx can be the last point. Examples -------- >>> from fractions import Fraction >>> import numpy as np >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> points = list(map(Fraction, np.sin(np.linspace(-np.pi/2, np.pi/2, 10000)))) >>> SliceAccessor._get_idx(Fraction(-1), points) 0 >>> SliceAccessor._get_idx(Fraction(-np.sqrt(3)/2), points) 1666 >>> SliceAccessor._get_idx(Fraction(0), points) 4999 >>> """ if len(points) == 0: msg = f"there is no point, so {point} can not be after any of them" raise KeyError(msg) idx_min, idx_max = 0, len(points) - 1 if point >= points[idx_max]: if point_after: msg = f"the point {point} is >= than the last point {points[idx_max]}" raise KeyError(msg) return idx_max if point < points[idx_min]: msg = f"the point {point} is before the first segment [{points[idx_min]}, ...[" raise KeyError(msg) while idx_max - idx_min > 1: idx = round( # tangante methode, dichothomie is idx = (idx_min + idx_max) // 2 idx_min + ( (idx_max - idx_min) * (point - points[idx_min]) / (points[idx_max] - points[idx_min]) ), ) idx = max(min(idx, idx_max - 1), idx_min + 1) # avoid infinite loop if points[idx] <= point: idx_min = idx else: idx_max = idx return idx_min @staticmethod def _parse_start_duration(start_duration: tuple) -> tuple[Fraction, Fraction]: """Check and format the input.""" assert isinstance(start_duration, tuple), start_duration.__class__.__name__ assert len(start_duration) == 2, start_duration start, duration = Fraction(start_duration[0]), Fraction(start_duration[1]) assert duration > 0, duration return start, duration def _select_overlapping( self, start: Fraction, duration: Fraction, ) -> tuple[int, tuple[Fraction, Fraction]]: """Yield the indices of all segments that collision with [start, start+duration[. Examples -------- >>> from fractions import Fraction >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> sa = SliceAccessor() >>> sa._starts = list(map(Fraction, range(10))) >>> sa._durations = [Fraction(3, 4) for _ in range(10)] >>> sa._values = [None for _ in range(10)] >>> list(sa._select_overlapping(Fraction(-1), Fraction(1))) # before [] >>> list(sa._select_overlapping(Fraction(9.74), Fraction(1))) # after [] >>> list(sa._select_overlapping(Fraction(-1), Fraction(2.5))) # left overlap [(0, []), (1, [(Fraction(3, 2), Fraction(1, 4))])] >>> list(sa._select_overlapping(Fraction(8.5), Fraction(2))) # right overlap [(8, [(Fraction(8, 1), Fraction(1, 2))]), (9, [])] >>> list(sa._select_overlapping(Fraction(2.5), Fraction(2))) # middle [(2, [(Fraction(2, 1), Fraction(1, 2))]), (3, []), (4, [(Fraction(9, 2), Fraction(1, 4))])] >>> list(sa._select_overlapping(Fraction(9.5), Fraction(1))) # right last [(9, [(Fraction(9, 1), Fraction(1, 2))])] >>> """ # below, is an equivalent but faster algo than the next 3 lines # for idx, old in enumerate(zip(self._starts, self._durations, strict=True)): # if (segments := self._split_segment(*old, start, duration)) != [old]: # yield idx, segments try: idx = self._get_idx(start, self._starts) except KeyError: # case out of bounds or no bound # TODO, not optimal when out of bound, we should just test one rather all for idx, old in enumerate(zip(self._starts, self._durations, strict=True)): if (segments := self._split_segment(*old, start, duration)) != [old]: yield idx, segments else: # case self._starts[idx] <= start and self._starts[idx+1] > start while idx <= len(self._starts) - 1 and self._starts[idx] < start + duration: old = (self._starts[idx], self._durations[idx]) if (segments := self._split_segment(*old, start, duration)) != [old]: yield idx, segments idx += 1 @staticmethod def _split_segment( start_old: Fraction, duration_old: Fraction, start_new: Fraction, duration_new: Fraction, ) -> list[tuple[Fraction, Fraction]]: """Split an old segment to ensure no overlapp with the new. Examples -------- >>> from fractions import Fraction >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> old = (Fraction(0, 1), Fraction(1, 1)) # [0, 1[ >>> # cases no overlap >>> SliceAccessor._split_segment(*old, Fraction(-1), Fraction(1)) # [-1, 0[ [(Fraction(0, 1), Fraction(1, 1))] >>> SliceAccessor._split_segment(*old, Fraction(1), Fraction(1)) # [1, 2[ [(Fraction(0, 1), Fraction(1, 1))] >>> # case erase >>> SliceAccessor._split_segment(*old, *old) # [0, 1[ [] >>> SliceAccessor._split_segment(*old, Fraction(-1), Fraction(3)) # [-1, 2[ [] >>> # case inside >>> SliceAccessor._split_segment(*old, Fraction(0), Fraction(1, 2)) # [0, 1/2[ [(Fraction(1, 2), Fraction(1, 2))] >>> SliceAccessor._split_segment(*old, Fraction(1, 4), Fraction(1, 2)) # [1/4, 3/4[ [(Fraction(0, 1), Fraction(1, 4)), (Fraction(3, 4), Fraction(1, 4))] >>> SliceAccessor._split_segment(*old, Fraction(1, 2), Fraction(1, 2)) # [1/2, 1[ [(Fraction(0, 1), Fraction(1, 2))] >>> # case juxtaposition >>> SliceAccessor._split_segment(*old, Fraction(-1, 2), Fraction(1)) # [-1/2, 1/2[ [(Fraction(1, 2), Fraction(1, 2))] >>> SliceAccessor._split_segment(*old, Fraction(1, 2), Fraction(1)) # [1/2, 3/2[ [(Fraction(0, 1), Fraction(1, 2))] >>> """ segments = [] if start_old < start_new: # segment before end = min(start_old + duration_old, start_new) segments.append((start_old, end - start_old)) if start_old + duration_old > start_new + duration_new: # segment after start = max(start_old, start_new + duration_new) segments.append((start, start_old + duration_old - start)) return segments
[docs] def copy(self) -> typing.Self: """Return a copy of self.""" new = self.__class__.__new__(self.__class__) new._starts = self._starts.copy() # pylint: disable=W0212 new._durations = self._durations.copy() # pylint: disable=W0212 new._values = self._values.copy() # pylint: disable=W0212 return new
def __contains__(self, position: numbers.Real) -> bool: """Test if position in self. Returns ------- boolean True if position somewhere, False if no intervall contains position. Examples -------- >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> sa = SliceAccessor() >>> sa[0, 1] = None >>> 0 in sa True >>> 1 in sa False >>> """ try: self.__getitem__(position) except KeyError: return False return True def __delitem__(self, start_duration: tuple) -> int: """Make shure there is nothing in the interval. It creates a gap and return position of the gap. Examples -------- >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> starts = list(map(Fraction, range(10))) >>> durations = [Fraction(1, 2) for _ in range(10)] >>> values = [None for _ in range(10)] >>> sa = SliceAccessor() >>> sa._starts, sa._durations, sa._values = starts, durations, values >>> sa.copy().__delitem__((-1, 1)) # del nothing on left 0 >>> sa.copy().__delitem__((10, 1)) # del nothing on right 10 >>> sa.copy().__delitem__((-1, 1.5)) # del on left a bit 0 >>> sa.copy().__delitem__((-1, 3.5)) # del on left a lot 0 >>> sa.copy().__delitem__((9.5, 1.5)) # del on right a bit 10 >>> sa.copy().__delitem__((7.5, 1.5)) # del on right a bit 8 >>> sa.copy().__delitem__((2.5, 3)) # del on the middle 3 >>> """ start, duration = self._parse_start_duration(start_duration) # rememove everything if (list_idx_segments := list(self._select_overlapping(start, duration))): splited_segments: dict[tuple[Fraction, Fraction], object] = {} for i, (idx, segments) in enumerate(list_idx_segments): idx = idx - i splited_segments |= dict.fromkeys(segments, self._values[idx]) del self._starts[idx] del self._durations[idx] del self._values[idx] # add splited segments first_idx = list_idx_segments[0][0] for start_, duration_ in sorted(splited_segments, reverse=True): self._starts.insert(first_idx, start_) self._durations.insert(first_idx, duration_) self._values.insert(first_idx, splited_segments[(start_, duration_)]) # indice of insertion if not self._starts: return 0 if start < self._starts[0]: return 0 if start > self._starts[-1]: return len(self._starts) return self._get_idx(start, self._starts) + 1 def __getitem__(self, position: numbers.Real) -> object: """Get the object in the segment self[position]. Raises ------ KeyError If ``position`` is not in a segment. Examples -------- >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> sa = SliceAccessor() >>> sa[0, 1] = "a" >>> sa[1, 1] = "b" >>> sa[0.25, 0.5] = "c" >>> sa[1.5, 1] = "d" >>> print(sa) [0, 1/4[: a [1/4, 3/4[: c [3/4, 1[: a [1, 3/2[: b [3/2, 5/2[: d >>> sa[0] 'a' >>> sa[0.25] 'c' >>> sa[2] 'd' >>> """ position = Fraction(position) # find idx sush as self._starts[idx] <= position idx = self._get_idx(position, self._starts, point_after=False) end = self._starts[idx] + self._durations[idx] if position >= end: msg = f"the point {position} is after [..., {end}[" if idx + 1 <= len(self._starts) - 1: msg += f" and before [{self._starts[idx+1]}, ...[" raise KeyError(msg) return self._values[idx] def __iter__(self) -> tuple[Fraction, Fraction]: """Iterate on the key intervals. Yields ------ start : Fraction The lower bound of each intervals. duration: Fraction The duration of each intervals. """ yield from zip(self._starts, self._durations, strict=True) def __len__(self) -> int: """Return the number of slices.""" return len(self._starts) def __setitem__(self, start_duration: tuple, value: object): """Add an item self[start, duration] = object. Examples -------- >>> from cutcutcodec.core.collections.sa import SliceAccessor >>> sa = SliceAccessor() >>> # add no otherlapp >>> sa[0, 1] = 0 >>> sa[2, 2] = 2 >>> sa[1, 0.5] = 1 >>> # add with overlapp >>> sa[0, 0.5] = 3 >>> sa[3, 0.5] = 4 >>> print(sa) [0, 1/2[: 3 [1/2, 1[: 0 [1, 3/2[: 1 [2, 3[: 2 [3, 7/2[: 4 [7/2, 4[: 2 >>> """ idx = self.__delitem__(start_duration) start, duration = self._parse_start_duration(start_duration) self._starts.insert(idx, start) self._durations.insert(idx, duration) self._values.insert(idx, value) def __str__(self) -> str: """Display it.""" # get common denominator # den = 1 # for frac in itertools.chain(self._starts, self._durations): # frac.denominator return "\n".join( f"[{s}, {s+d}[: {v}" for s, d, v in zip(self._starts, self._durations, self._values, strict=True) )