Code source de cutcutcodec.core.analysis.video.metric

#!/usr/bin/env python3

"""Image metrics."""

from fractions import Fraction
import math
import numbers
import pathlib
import typing

import numpy as np
import torch
import tqdm

from cutcutcodec.core.analysis.stream.rate_video import optimal_rate_video
from cutcutcodec.core.analysis.stream.shape import optimal_shape_video
from cutcutcodec.core.classes.colorspace import Colorspace
from cutcutcodec.core.io import read
from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
from cutcutcodec.core.opti.parallel import map as threaded_map, starmap
from .utils import batched_comparative_frames, batched_single_frames
from .vmaf import vmaf


__all__ = ["psnr", "ssim", "uvq", "vmaf"]


def _batch_frames(frames: typing.Iterable[tuple]) -> tuple:
    """Gather frames in 256 MB batches."""
    nb_pix = 0  # the number of pixel in one frame
    batch_ref, batch_dis = [], []
    for frame_ref, frame_dis in frames:
        if not nb_pix:
            nb_pix = frame_ref.shape[0] * frame_ref.shape[1]
        batch_ref.append(frame_ref.unsqueeze(0))
        batch_dis.append(frame_dis.unsqueeze(0))
        # 256e6 (MB) / 3 (channels) / 4 (bytes per float32) / 2 (batches)
        if len(batch_ref) >= math.ceil(1.067e7 / nb_pix):
            yield torch.cat(batch_ref, dim=0), torch.cat(batch_dis, dim=0)
            batch_ref, batch_dis = [], []
    if batch_ref:
        yield torch.cat(batch_ref, dim=0), torch.cat(batch_dis, dim=0)


def _compare(batch_ref: torch.Tensor, batch_dis: torch.Tensor, kwargs: dict) -> dict:
    """Compare the 2 batches with the different metrics."""
    # the factors comes from https://github.com/fraunhoferhhi/vvenc/wiki/Encoder-Performance
    res = {}

    def to4(data: list[float]) -> list[float]:
        return [round(e, 4) for e in data]

    if kwargs.get("psnr", False):
        res["psnr"] = to4(psnr(batch_ref, batch_dis, weights=(6, 1, 1)).tolist())
    if kwargs.get("ssim", False):
        res["ssim"] = to4(ssim(batch_ref, batch_dis, weights=(6, 1, 1), data_range=1.0).tolist())
    if kwargs.get("vmaf", False):
        res["vmaf"] = to4(vmaf(batch_ref, batch_dis).tolist())
    return res


def _yield_frames(ref: pathlib.Path, dis: pathlib.Path) -> tuple:
    """Read frames 2 by 2."""
    # find colorspace
    with ContainerInputFFMPEG(ref) as cont_ref:
        stream_ref = cont_ref.out_select("video")[0]
        colorspace = stream_ref.colorspace
    colorspace = Colorspace("y'pbpr", colorspace.primaries, colorspace.transfer)
    with (
        read(ref, colorspace=colorspace) as cont_ref,
        read(dis, colorspace=colorspace) as cont_dis,
    ):
        stream_ref = cont_ref.out_select("video")[0]
        stream_dis = cont_dis.out_select("video")[0]
        rate = optimal_rate_video(stream_ref) or Fraction(3000, 1001)
        shape = optimal_shape_video(stream_ref) or (720, 1080)
        duration = min(stream_ref.duration, stream_dis.duration)
        times = (
            [0] if math.isinf(duration) else
            np.arange(0.5/rate, float(duration), 1.0/rate).tolist()
        )
        if len(times) == 1:
            yield stream_ref.snapshot(times[0], shape), stream_dis.snapshot(times[0], shape)
        else:
            yield from tqdm.tqdm(
                threaded_map(
                    lambda t: (stream_ref.snapshot(t, shape), stream_dis.snapshot(t, shape)),
                    times,
                ),
                desc="compare",
                total=len(times),
                unit="img",
            )


def _yield_frames_uvq(video: pathlib.Path) -> torch.Tensor:
    """Read the video at 5 fps in sRGB of shape 1280x720."""
    rate = 5
    with read(video, colorspace=Colorspace("r'g'b'", "srgb", "srgb")) as cont:
        stream = cont.out_select("video")[0]
        duration = stream.duration
        times = (
            [0] if math.isinf(duration) else
            np.arange(0.0, float(duration), 1.0/rate).tolist()
        )
        batch = []
        for frame in tqdm.tqdm(
            threaded_map(lambda t: stream.snapshot(t, (720, 1280)).convert(3), times),
            desc="uvq",
            total=len(times),
            unit="img",
        ):
            batch.append(frame)
            if len(batch) == rate:
                yield torch.cat([torch.Tensor(f)[None, :, :, :] for f in batch], dim=0)
                batch = []
        if batch:
            while len(batch) < rate:
                batch.append(batch[-1])
            yield torch.cat([torch.Tensor(f)[None, :, :, :] for f in batch], dim=0)


[docs] def compare( ref: pathlib.Path | str | bytes, dis: pathlib.Path | str | bytes, **kwargs ) -> dict[str, list[float]]: """Compare 2 video files with differents metrics. Parameters ---------- ref : pathlike The reference video file. dis : pathlike The distorted video. psnr : boolean, dafault=False If True, compute the psnr (very fast). ssim : boolean, default=False If True, compute the ssim (slow). uvq : boolean, default=False If True, compute the uvq on the `dis` video (very slow). It returns only one value per second. vmaf : boolean, default=False If True, compute the vmaf (medium). Returns ------- metrics : dict[str, list[float]] Each metric name is associated with the scalar value of each frame. All the numbers are rounded to 4 decimals number. Notes ----- Frames are converted to yuv if not already converted, then the distorted video is converted to the color space of the reference video. Examples -------- >>> import pprint >>> from cutcutcodec.core.analysis.video.metric import compare >>> res = compare("media/video/intro.webm", "media/video/intro.webm", psnr=True, ssim=True) >>> pprint.pprint(res) # doctest: +ELLIPSIS {'psnr': [100.0, 100.0, ..., 100.0, 100.0], 'ssim': [1.0, 1.0, ..., 1.0, 1.0]} >>> compare(None, "media/video/intro.webm", uvq=True) {'uvq': [2.8496, 2.8577, 2.9328, 3.3655, 3.8789, 3.4904, 3.1649, 3.1805, 3.1579, 3.2773]} >>> """ dis = pathlib.Path(dis).expanduser() metrics = {} if kwargs.get("psnr", False) or kwargs.get("ssim", False) or kwargs.get("vmaf", False): ref = pathlib.Path(ref).expanduser() for batch_metrics in starmap( _compare, ((r, d, kwargs) for r, d in _batch_frames(_yield_frames(ref, dis))), ): if not metrics: metrics = batch_metrics else: for key, metric in metrics.items(): metric.extend(batch_metrics[key]) if kwargs.get("uvq", False): from .uvq_google.inference import UVQInference model = UVQInference() metrics["uvq"] = [] for batch in _yield_frames_uvq(dis): metrics["uvq"].append(round(float(uvq(batch, _model=model)), 4)) return metrics
[docs] @batched_comparative_frames def psnr(ref: torch.Tensor, dis: torch.Tensor, *args, **kwargs) -> torch.Tensor: """Compute the peak signal to noise ratio of 2 images. Parameters ---------- ref, dis : arraylike The 2 images to be compared, of shape ([*batch], height, width, channels). Supported types are float32 and float64. weights : iterable[float], optional The relative weight of each channel. By default, all channels have the same weight. threads : int, optional Defines the number of threads. The value -1 means that the function uses as many calculation threads as there are cores. The default value (0) allows the same behavior as (-1) if the function is called in the main thread, otherwise (1) to avoid nested threads. Any other positive value corresponds to the number of threads used. Returns ------- psnr : arraylike The global peak signal to noise ratio, as a ponderation of the mean square error of each channel. It is batched and clamped in [0, 100] db. Notes ----- * It is optimized for C contiguous tensors. * If device is cpu and gradient is not required, a fast C code is used instead of torch code. Examples -------- >>> import numpy as np >>> from cutcutcodec.core.analysis.video.metric import psnr >>> np.random.seed(0) >>> ref = np.random.random((720, 1080, 3)) # It could also be a torch array list... >>> dis = 0.8 * ref + 0.2 * np.random.random((720, 1080, 3)) >>> psnr(ref, dis).round(1) np.float64(21.8) >>> """ if ( ref.requires_grad or dis.requires_grad or ref.device.type != "cpu" or dis.device.type != "cpu" ): from .psnr_torch import psnr_torch return psnr_torch(ref, dis, *args, **kwargs) from .metric import psnr as psnr_c return torch.asarray( [psnr_c(r, d, *args, **kwargs) for r, d in zip(ref.numpy(), dis.numpy())], dtype=ref.dtype, )
[docs] @batched_comparative_frames def ssim(ref: torch.Tensor, dis: torch.Tensor, *args, stride: int = 1, **kwargs) -> torch.Tensor: """Compute the Structural similarity index measure of 2 images. Parameters ---------- ref, dis : arraylike The 2 images to be compared, of shape ([*batch], height, width, channels). Supported types are float32 and float64. data_range : float, default=1.0 The data range of the input image (difference between maximum and minimum possible values). weights : iterable[float], optional The relative weight of each channel. By default, all channels have the same weight. sigma : float, default=1.5 The standard deviation of the gaussian. It has to be strictely positive. stride : int, default=1 The stride of the convolving kernel. threads : int, optional Defines the number of threads. The value -1 means that the function uses as many calculation threads as there are cores. The default value (0) allows the same behavior as (-1) if the function is called in the main thread, otherwise (1) to avoid nested threads. Any other positive value corresponds to the number of threads used. Returns ------- ssim : arraylike The ponderated structural similarity index measure of each layers. Notes ----- * It is optimized for C contiguous tensors. * If device is cpu, gradient is not required and stride != 1, a fast C code is used. Examples -------- >>> import numpy as np >>> from cutcutcodec.core.analysis.video.metric import ssim >>> np.random.seed(0) >>> ref = np.random.random((720, 1080, 3)) # It could also be a torch array list... >>> dis = 0.8 * ref + 0.2 * np.random.random((720, 1080, 3)) >>> ssim(ref, dis).round(2) np.float64(0.95) >>> """ assert isinstance(stride, numbers.Integral), stride.__class__.__name__ if stride == 1: from .ssim_torch import ssim_fft_torch return ssim_fft_torch(ref, dis, *args, **kwargs) if ( ref.requires_grad or dis.requires_grad or ref.device.type != "cpu" or dis.device.type != "cpu" ): from .ssim_torch import ssim_conv_torch return ssim_conv_torch(ref, dis, *args, stride=stride, **kwargs) from .metric import ssim as ssim_c return torch.asarray( [ssim_c(r, d, *args, stride=stride, **kwargs) for r, d in zip(ref.numpy(), dis.numpy())], dtype=ref.dtype, )
[docs] @batched_single_frames def uvq(dis: torch.Tensor, _model=None) -> torch.Tensor: """Compute the Perceptual Video Quality. Parameters ---------- dis : arraylike The frames to be evaluated, of shape ([*batch], fps=5, height, width, channels=3). The framerate is assumed to be 5 Hz. The frames are assumed to be in RGB in range [0, 1]. Gamut and eotf must be standard rgb. Returns ------- uvq : arraylike The perceptual video quality measure for each group of 5 images. Examples -------- >>> import numpy as np >>> from cutcutcodec.core.analysis.video.metric import uvq >>> np.random.seed(0) >>> dis = np.random.random((5, 720, 1080, 3)) # It could also be a torch array list... >>> uvq(dis).round(1) np.float32(3.3) >>> """ if _model is None: from .uvq_google.inference import UVQInference _model = UVQInference() return _model.forward(dis)