# Source code for cutcutcodec.core.io.scheduler

"""Combine several streams to yield frames in monotonic order.

It is helpful for writing and muxing.
"""

import functools
import itertools
import logging
import math
import typing
from fractions import Fraction

import numpy as np
import torch

from cutcutcodec.core.classes.frame import Frame
from cutcutcodec.core.classes.frame_audio import FrameAudio
from cutcutcodec.core.classes.frame_video import FrameVideo
from cutcutcodec.core.classes.stream import Stream
from cutcutcodec.core.classes.stream_audio import StreamAudio
from cutcutcodec.core.classes.stream_video import StreamVideo
from cutcutcodec.core.exceptions import MissingStreamError, OutOfTimeRange
from cutcutcodec.core.opti.parallel.buffer import map as threaded_map
from cutcutcodec.core.opti.parallel.buffer import starmap


def _raise_missing_stream_error(gen: typing.Iterable) -> typing.Iterable:
    """Decorate for throwing MissingStreamError if the gen yields nothing."""
    @functools.wraps(gen)
    def generator(*args, **kwargs):
        empty = True
        for res in gen(*args, **kwargs):
            empty = False
            yield res
        if empty:
            raise MissingStreamError("no frame is present in any of the streams")
    return generator


def _snapshot_audio_frames_async(arg):
    """Help audio_frames_async.

    Picklable version
    """
    stream, timestamp, rate, samples = arg
    # extraction
    try:
        frame = stream.snapshot(timestamp, rate, samples)
    except OutOfTimeRange as err:  # end is near
        end = stream.beginning + stream.duration
        samples_frac = (end - timestamp) * rate
        if (samples := math.floor(samples_frac)) <= 0:
            raise err
        frame = stream.snapshot(timestamp, rate, samples)

    # verification
    min_val, max_val = torch.aminmax(frame)
    if min_val.item() < -1 or max_val.item() > 1:
        logging.warning(
            "saturated samples detected min %f max %f", min_val.item(), max_val.item(),
        )
    return frame


def audio_frames_async(
    stream: StreamAudio,
    rate: Fraction | int,
    start_time: Fraction,
    samples: int | None = None,
    **_,
) -> typing.Iterable[FrameAudio]:
    """Decode the audio frames in another thread for performance.

    Parameters
    ----------
    stream : cutcutcodec.core.classes.stream_audio.StreamAudio
        The audio stream to extract the frames from.
    rate : int or fractions.Fraction
        The sample rate, i.e. the number of samples per second to catch.
        A Fraction is accepted only if it is an integer.
    start_time : fractions.Fraction
        The frames with a start time < start_time are ignored.
        It is clamped to the stream beginning, then rounded up to the next multiple of ``1/rate``.
    samples : int, optional
        The maximum number of samples in a frame.
        The last frame may contain fewer samples in order to reach exactly the end of the stream.
        The special default value None means to choose the number of samples automatically:
        the power of 2 whose duration is the closest (in log scale) to 100 ms.

    Yields
    ------
    cutcutcodec.core.classes.frame_audio.FrameAudio
        Consecutive audio frames. Concatenating them gives the complete signal.

    Examples
    --------
    >>> from fractions import Fraction
    >>> from cutcutcodec.core.filter.audio.subclip import FilterAudioSubclip
    >>> from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
    >>> from cutcutcodec.core.io.scheduler import audio_frames_async
    >>>
    >>> (stream,) = FilterAudioSubclip(GeneratorAudioNoise(0).out_streams, 0, 1).out_streams
    >>> for frame in audio_frames_async(stream, 1000, Fraction(10, 1001), samples=200):
    ...     frame.time, frame.samples
    ...
    (Fraction(1, 100), 200)
    (Fraction(21, 100), 200)
    (Fraction(41, 100), 200)
    (Fraction(61, 100), 200)
    (Fraction(81, 100), 190)
    >>>
    """
    assert isinstance(stream, StreamAudio), stream.__class__.__name__
    if isinstance(rate, Fraction):
        assert rate.denominator == 1, f"{rate} has to be integer"
        rate = int(rate)
    assert isinstance(rate, int), rate.__class__.__name__
    assert rate > 0, rate  # avoid a stationary 0 step and going backward
    assert isinstance(start_time, Fraction), start_time.__class__.__name__
    if samples is None:
        samples = 2**round(math.log(max(1, rate*100e-3), 2))  # approx 100 ms
    assert isinstance(samples, int), samples.__class__.__name__
    assert samples >= 1, samples

    start_time = max(start_time, stream.beginning)  # avoid OutOfTimeRange for t < beginning
    if rest := start_time % Fraction(1, rate):
        start_time += Fraction(1, rate) - rest  # start_time = k*(1/sr), k integer
    interval = Fraction(samples, rate)

    try:
        yield from threaded_map(
            _snapshot_audio_frames_async,
            ((stream, start_time + i*interval, rate, samples) for i in itertools.count()),
        )
    except OutOfTimeRange:  # the end of the stream is reached
        pass
def video_frames_async(
    stream: StreamVideo,
    out_fps: Fraction | int,
    start_time: Fraction,
    shape: tuple[int, int],
    **_,
) -> typing.Iterable[FrameVideo]:
    """Decode the video frames in another thread for performance.

    Parameters
    ----------
    stream : cutcutcodec.core.classes.stream_video.StreamVideo
        The video stream to extract the frames from.
    out_fps : fractions.Fraction or int
        The frame rate: frames are requested every ``1/out_fps`` seconds.
        An int is converted into a Fraction so that timestamps stay exact.
    start_time : fractions.Fraction
        The frames with timestamp < start_time are ignored.
        It is clamped to the stream beginning, then rounded up to the next multiple of
        ``1/out_fps``.
    shape : tuple[int, int]
        Transmitted to ``cutcutcodec.core.classes.stream_video.StreamVideo.snapshot``.

    Yields
    ------
    cutcutcodec.core.classes.frame_video.FrameVideo
        The video frames in monotonic (non-strict) order.

    Examples
    --------
    >>> from fractions import Fraction
    >>> from cutcutcodec.core.filter.video.subclip import FilterVideoSubclip
    >>> from cutcutcodec.core.generation.video.noise import GeneratorVideoNoise
    >>> from cutcutcodec.core.io.scheduler import video_frames_async
    >>>
    >>> (stream,) = FilterVideoSubclip(GeneratorVideoNoise(0).out_streams, 0, 1).out_streams
    >>> for frame in video_frames_async(stream, Fraction(10, 1), Fraction(99, 200), shape=(1, 1)):
    ...     frame.time
    ...
    Fraction(1, 2)
    Fraction(3, 5)
    Fraction(7, 10)
    Fraction(4, 5)
    Fraction(9, 10)
    >>>
    """
    assert isinstance(stream, StreamVideo), stream.__class__.__name__
    if isinstance(out_fps, int):
        out_fps = Fraction(out_fps)  # ``1/int`` would be a float, keep exact rational arithmetic
    assert isinstance(out_fps, Fraction), out_fps.__class__.__name__
    assert out_fps > 0, out_fps  # avoid division by 0 and going backward
    assert isinstance(start_time, Fraction), start_time.__class__.__name__
    assert isinstance(shape, tuple), shape.__class__.__name__
    assert len(shape) == 2, len(shape)
    assert all(isinstance(s, int) and s >= 1 for s in shape), shape

    period = 1 / out_fps  # exact Fraction
    start_time = max(start_time, stream.beginning)  # avoid OutOfTimeRange for t < beginning
    if rest := start_time % period:
        start_time += period - rest  # start_time = k*(1/fps), k integer

    try:
        yield from starmap(
            stream.snapshot,
            ((start_time + i*period, shape) for i in itertools.count()),
        )
    except OutOfTimeRange:  # the end of the stream is reached
        pass
@_raise_missing_stream_error
def scheduler(
    streams: list[Stream],
    rates: list[Fraction | int],
    start_time: Fraction = Fraction(0),
    shapes: None | tuple[int, int] | list[None | tuple[int, int]] = None,
    **kwargs,
) -> typing.Iterable[tuple[int, Frame]]:
    """Extract in chronological order the frames of all flows.

    Yields frames until the frame iterator of every stream is exhausted
    (each one stops when its stream raises the OutOfTimeRange exception).

    Parameters
    ----------
    streams : list[cutcutcodec.core.classes.stream.Stream]
        Audio or video streams to extract the frames from.
    rates : list[Fraction | int]
        The frame rate for video streams and the sample rate for audio streams.
    start_time : Fraction, default=0
        The position of the first frame yielded. The frames before are ignored.
    shapes : tuple[int, int] or list[tuple[int, int]], optional
        For video only, ignored by audio streams.
        A list gives one shape per stream (None entries are allowed).
        A single tuple is transmitted to every stream.
        A non-None shape takes precedence over a ``shape`` keyword argument,
        which is only used as a fallback for the streams without an explicit shape.
    **kwargs : dict
        Transmitted to the functions ``cutcutcodec.core.io.scheduler.audio_frames_async``
        and ``cutcutcodec.core.io.scheduler.video_frames_async``.

    Yields
    ------
    index : int
        The index of the concerned stream in the order of the provided list.
    frame : cutcutcodec.core.classes.frame.Frame
        The next frame of the considered stream.
        The ``time`` attribute is guaranteed to be monotonic.

    Raises
    ------
    cutcutcodec.core.exceptions.MissingStreamError
        If no frame is yielded.

    Examples
    --------
    >>> from fractions import Fraction
    >>> from cutcutcodec.core.filter.audio.subclip import FilterAudioSubclip
    >>> from cutcutcodec.core.filter.video.subclip import FilterVideoSubclip
    >>> from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
    >>> from cutcutcodec.core.generation.video.noise import GeneratorVideoNoise
    >>> from cutcutcodec.core.io.scheduler import scheduler
    >>>
    >>> s_1 = (
    ...     FilterVideoSubclip(GeneratorVideoNoise(0).out_streams, 0, Fraction(1, 10)).out_streams
    ... )
    >>> s_2 = (
    ...     FilterVideoSubclip(GeneratorVideoNoise(0).out_streams, 0, Fraction(1, 4)).out_streams
    ... )
    >>> streams_video = [s_1[0], s_2[0]]
    >>> rates_video = [Fraction(30), Fraction(24)]
    >>> s_3 = (
    ...     FilterAudioSubclip(GeneratorAudioNoise(0).out_streams, 0, Fraction(1, 10)).out_streams
    ... )
    >>> s_4 = (
    ...     FilterAudioSubclip(GeneratorAudioNoise(0).out_streams, 0, Fraction(1, 4)).out_streams
    ... )
    >>> streams_audio = [s_3[0], s_4[0]]
    >>> rates_audio = [96000, 48000]
    >>>
    >>> # test audio only
    >>> for index, frame in scheduler(streams_audio, rates_audio):
    ...     index, frame.time
    ...
    (0, Fraction(0, 1))
    (1, Fraction(0, 1))
    (0, Fraction(32, 375))
    (1, Fraction(32, 375))
    (1, Fraction(64, 375))
    >>> for index, frame in scheduler(streams_audio, rates_audio, start_time=Fraction(1, 10)):
    ...     index, frame.time
    ...
    (1, Fraction(1, 10))
    (1, Fraction(139, 750))
    >>>
    >>> # test video only
    >>> for index, frame in scheduler(streams_video, rates_video, shape=(1, 1)):
    ...     index, frame.time
    ...
    (0, Fraction(0, 1))
    (1, Fraction(0, 1))
    (0, Fraction(1, 30))
    (1, Fraction(1, 24))
    (0, Fraction(1, 15))
    (1, Fraction(1, 12))
    (1, Fraction(1, 8))
    (1, Fraction(1, 6))
    (1, Fraction(5, 24))
    >>> for index, frame in scheduler(streams_video, rates_video, Fraction(1, 10), shape=(1, 1)):
    ...     index, frame.time
    ...
    (1, Fraction(1, 8))
    (1, Fraction(1, 6))
    (1, Fraction(5, 24))
    >>>
    >>> # test audio and video
    >>> for index, frame in scheduler(
    ...     streams_audio+streams_video, rates_audio+rates_video, shape=(1, 1), samples=4096
    ... ):
    ...     index, frame.time
    ...
    (0, Fraction(0, 1))
    (1, Fraction(0, 1))
    (2, Fraction(0, 1))
    (3, Fraction(0, 1))
    (2, Fraction(1, 30))
    (3, Fraction(1, 24))
    (0, Fraction(16, 375))
    (2, Fraction(1, 15))
    (3, Fraction(1, 12))
    (0, Fraction(32, 375))
    (1, Fraction(32, 375))
    (3, Fraction(1, 8))
    (3, Fraction(1, 6))
    (1, Fraction(64, 375))
    (3, Fraction(5, 24))
    >>>
    """
    assert isinstance(streams, list), streams.__class__.__name__
    assert all(isinstance(s, Stream) for s in streams), streams
    assert isinstance(rates, list), rates.__class__.__name__
    assert len(streams) == len(rates)
    if shapes is None:
        shapes = [None for _ in streams]
    elif isinstance(shapes, tuple):
        shapes = [shapes for _ in streams]
    else:
        assert isinstance(shapes, list), shapes.__class__.__name__
        assert len(streams) == len(shapes)

    decoders = {"audio": audio_frames_async, "video": video_frames_async}
    queues = []  # asynchronous frame iterator of each stream, None once exhausted
    for stream, rate, shape in zip(streams, rates, shapes):
        # an explicit per-stream shape wins, a global ``shape`` kwarg is only a fallback
        if shape is None:
            stream_kwargs = {"shape": shape, **kwargs}
        else:
            stream_kwargs = {**kwargs, "shape": shape}
        queues.append(iter(decoders[stream.type](stream, rate, start_time, **stream_kwargs)))
    buffer = [None for _ in streams]  # next pending frame of each stream, None if not fetched

    try:
        while any(q is not None for q in queues):
            # alive streams whose next frame has not been fetched yet
            missing = [i for i, q in enumerate(queues) if q is not None and buffer[i] is None]
            if missing:
                index = missing[-1]
                try:
                    buffer[index] = next(queues[index])
                except StopIteration:
                    buffer[index] = queues[index] = None
                continue
            # every alive stream has a pending frame: yield the earliest one,
            # ties are resolved in favor of the lowest stream index
            index = min(
                range(len(buffer)),
                key=lambda i: math.inf if buffer[i] is None else buffer[i].time,
            )
            yield index, buffer[index]
            buffer[index] = None
    finally:
        # release the decoders (and their worker threads) deterministically,
        # even if the consumer stops early or an exception is raised
        for queue in queues:
            if (close := getattr(queue, "close", None)) is not None:
                close()