Source code for cutcutcodec.core.analysis.stream.shape

"""Smartly choose the shape of a video stream."""

import math

from cutcutcodec.core.classes.stream_video import StreamVideo
from cutcutcodec.core.filter.video.resize import FilterVideoResize
from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
from cutcutcodec.core.io.read_image import ContainerInputImage
from cutcutcodec.core.io.read_svg import ContainerInputSVG

SHAPE_ESTIMATORS = {}  # to each node stream class, associate the func to find the optimal rate


def _add_estimator(node_cls: type) -> callable:
    def _add_func(func) -> callable:
        SHAPE_ESTIMATORS[node_cls] = func
        return func
    return _add_func


@_add_estimator(FilterVideoResize)
def _optimal_shape_filter_video_resize(stream: StreamVideo) -> tuple[int, int]:
    """Detect the best shape of a FilterVideoReshape.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.stream.shape import optimal_shape_video
    >>> from cutcutcodec.core.filter.video.resize import FilterVideoResize
    >>> from cutcutcodec.core.generation.video.noise import GeneratorVideoNoise
    >>> (stream,) = FilterVideoResize(GeneratorVideoNoise(0).out_streams, (360, 640)).out_streams
    >>> optimal_shape_video(stream)
    (360, 640)
    >>>

    """
    assert isinstance(stream.node, FilterVideoResize), stream.node.__class__.__name__
    return stream.node.shape


@_add_estimator(ContainerInputFFMPEG)
def _optimal_shape_container_input_ffmpeg(stream: StreamVideo) -> tuple[int, int]:
    """Detect the shape of a ContainerInputFFMPEG.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.stream.shape import optimal_shape_video
    >>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
    >>> from cutcutcodec.utils import get_project_root
    >>> video = get_project_root() / "media" / "video" / "intro.webm"
    >>> stream = ContainerInputFFMPEG(video).out_select("video")[0]
    >>> optimal_shape_video(stream)
    (720, 1280)
    >>>

    """
    assert isinstance(stream.node, ContainerInputFFMPEG), stream.node.__class__.__name__
    return (stream.height, stream.width)


@_add_estimator(ContainerInputImage)
def _optimal_shape_container_input_image(stream: StreamVideo) -> tuple[int, int]:
    """Detect the shape of a ContainerInputFFMPEG.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.stream.shape import optimal_shape_video
    >>> from cutcutcodec.core.io.read_image import ContainerInputImage
    >>> from cutcutcodec.utils import get_project_root
    >>> image = get_project_root() / "media" / "image" / "logo.png"
    >>> (stream,) = ContainerInputImage(image).out_streams
    >>> optimal_shape_video(stream)
    (64, 64)
    >>>

    """
    assert isinstance(stream.node, ContainerInputImage), stream.node.__class__.__name__
    return (stream.height, stream.width)


@_add_estimator(ContainerInputSVG)
def _optimal_shape_container_input_svg(stream: StreamVideo) -> tuple[int, int]:
    """Detect the shape of a ContainerInputFFMPEG.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.stream.shape import optimal_shape_video
    >>> from cutcutcodec.core.io.read_svg import ContainerInputSVG
    >>> from cutcutcodec.utils import get_project_root
    >>> image = get_project_root() / "media" / "image" / "logo.svg"
    >>> (stream,) = ContainerInputSVG(image).out_streams
    >>> optimal_shape_video(stream)
    (64, 64)
    >>>

    """
    assert isinstance(stream.node, ContainerInputSVG), stream.node.__class__.__name__
    return (stream.height, stream.width)


[docs] def optimal_shape_video( stream: StreamVideo, choices: set[tuple[int, int]] | None = None, ) -> None | tuple[int, int]: """Find the optimal frame shape for a given video stream. Parameters ---------- stream : cutcutcodec.core.classes.stream_video.StreamVideo The video stream that we want to find the optimal fps. choices : set[tuple[int, int]], optional The possible shape. If provide, returns the most appropriate shape of this set. The convention adopted is the numpy convention (height, width) in pixels. Returns ------- shape : tuple[int, int] The shape that limit the reshape informations and the information loss. """ # verifications assert isinstance(stream, StreamVideo), stream.__class__.__name__ if choices is not None: assert isinstance(choices, set), choices.__class__.__name__ assert all(isinstance(s, tuple) for s in choices), choices assert all(len(s) == 2 for s in choices), choices assert all(isinstance(s[0], int) and isinstance(s[1], int) for s in choices), choices assert all(min(s) >= 1 for s in choices), choices def shape_key(shape): """Favorise little surface and square surface.""" height, width = shape area = height * width deformation = 2*math.sqrt(area) / (height + width) # 1 -> square, 0 -> flat return area - deformation # optimisation if choices and len(choices) == 1: # case not nescessary to do computing return choices.pop() # estimation of the best shape if (estimator := SHAPE_ESTIMATORS.get(stream.node.__class__, None)) is not None: shape = estimator(stream) else: shapes = (optimal_shape_video(s) for s in stream.node.in_streams if s.type == "video") shapes = [s for s in shapes if s is not None] shape = (max(s[0] for s in shapes), max(s[1] for s in shapes)) if shapes else None # select the most appropriate rate among the choices if not choices or shape is None: return min(choices, key=shape_key) if choices else shape choices = sorted(choices, key=shape_key) for choice in choices: if shape[0] <= choice[0] and shape[1] <= choice[1]: return choice return choices[-1]