Source code for cutcutcodec.core.analysis.video.complexity.dct

"""Compute a differentiable batched torch spatial DCT complexity."""

import functools
import numbers

import torch

from cutcutcodec.core.opti.parallel.threading import TorchThreads

from .utils import batched_frames


@functools.cache
def dct_matrix(size: numbers.Integral, dtype: torch.dtype) -> torch.Tensor:
    r"""Build the unnormalised DCT-II matrix, constant (average) row included.

    The square matrix :math:`\boldsymbol{D} \in \mathcal M_{n,n}(\mathbb R)` has the coefficients
    :math:`d_{ij} = \cos\left(\frac{\pi}{n}\left(i-1\right)\left(j-\frac{1}{2}\right)\right)`
    (with :math:`i` and :math:`j` starting at 1).

    Given a "temporal" column vector :math:`\boldsymbol{x} \in \mathcal M_{n,1}(\mathbb R)`,
    the matching "spatial" column vector
    :math:`\boldsymbol{\hat{x}} \in \mathcal M_{n,1}(\mathbb R)`
    is :math:`\boldsymbol{\hat{x}} = \boldsymbol{D}\boldsymbol{x}`.

    The result is memoised per ``(size, dtype)`` couple,
    so the very same tensor object is returned on every identical call.

    Parameters
    ----------
    size : int
        The matrix size :math:`n`, it has to be >= 1.
    dtype : torch.dtype
        The torch dtype of the matrix, float16, float32 or float64.

    Returns
    -------
    dct_matrix : torch.Tensor
        The 2d square matrix :math:`\boldsymbol{D}` of the DCT-II coefficients.

    Examples
    --------
    >>> import torch
    >>> from cutcutcodec.core.analysis.video.complexity.dct import dct_matrix
    >>> dct_matrix(8, torch.float32)
    tensor([[ 1.0000,  1.0000,  1.0000,  1.0000,  1.0000,  1.0000,  1.0000,  1.0000],
            [ 0.9808,  0.8315,  0.5556,  0.1951, -0.1951, -0.5556, -0.8315, -0.9808],
            [ 0.9239,  0.3827, -0.3827, -0.9239, -0.9239, -0.3827,  0.3827,  0.9239],
            [ 0.8315, -0.1951, -0.9808, -0.5556,  0.5556,  0.9808,  0.1951, -0.8315],
            [ 0.7071, -0.7071, -0.7071,  0.7071,  0.7071, -0.7071, -0.7071,  0.7071],
            [ 0.5556, -0.9808,  0.1951,  0.8315, -0.8315, -0.1951,  0.9808, -0.5556],
            [ 0.3827, -0.9239,  0.9239, -0.3827, -0.3827,  0.9239, -0.9239,  0.3827],
            [ 0.1951, -0.5556,  0.8315, -0.9808,  0.9808, -0.8315,  0.5556, -0.1951]])
    >>> _ @ torch.sin(0.5 * torch.pi * torch.arange(8))[:, None]
    tensor([[ 0.0000e+00],
            [ 1.0616e+00],
            [ 2.6822e-07],
            [ 2.1727e+00],
            [-2.8284e+00],
            [-1.4518e+00],
            [-2.9802e-07],
            [-2.1116e-01]])
    >>>
    """
    assert isinstance(size, numbers.Integral), size.__class__.__name__
    assert size >= 1, size
    assert isinstance(dtype, torch.dtype), dtype.__class__.__name__

    indices = torch.arange(size, dtype=dtype)
    row_phase = indices * (torch.pi / size)  # i * pi / n, one value per output frequency
    col_offset = indices + 0.5  # j + 1/2, one value per input sample
    # the broadcast product builds the full (n, n) grid of angles in one go
    return torch.cos(row_phase[:, None] * col_offset[None, :])
def compute_dct(tensor: torch.Tensor, dim: int) -> torch.Tensor:
    r"""Compute the DCT-II on the given axis.

    The output vector :math:`\hat x_k` is defined as
    :math:`\hat x_k = \sum\limits_{l=0}^{n-1}
    x_l \cos\left(\frac{\pi}{n}\left(l+\frac{1}{2}\right)k\right)`.

    It is calculated by a matrix product, the matrix is computed by :py:func:`dct_matrix`.

    Parameters
    ----------
    tensor : torch.Tensor
        A n-dimensional tensor of real.
    dim : int
        The axis along which the DCT is computed.
        The other axes are treated as batch dimensions.

    Returns
    -------
    output : torch.Tensor
        The DCT-II of the input tensor along the axis ``dim``.
        The input and output have the same shape.

    Examples
    --------
    >>> import torch
    >>> from cutcutcodec.core.analysis.video.complexity.dct import compute_dct
    >>> src = torch.randn((128, 16, 16))
    >>> dct_2d = compute_dct(compute_dct(src, -1), -2)  # compute the 2d dct
    >>> dct_2d.shape
    torch.Size([128, 16, 16])
    >>>
    """
    # dct_matrix is cached and always built on the default device,
    # so it is moved next to the data (no-op if it is already there)
    matrix = dct_matrix(tensor.shape[dim], tensor.dtype).to(tensor.device)
    vertical_temp = tensor.movedim(dim, -1).unsqueeze(-1)  # (..., n, 1)
    vertical_freq = matrix @ vertical_temp  # batched matrix-vector product, (..., n, 1)
    return vertical_freq.squeeze(-1).movedim(-1, dim)
def _compute_vca_blocs(tensor: torch.Tensor, patch: int) -> torch.Tensor:
    r"""Compute the weighted DCT energy of each patch of the first channel.

    Parameters
    ----------
    tensor : torch.Tensor
        The 5d images, indexed as (batch, fps, height, width, channels).
        Only the channel of index 0 is used.
    patch : int
        The side :math:`s` of the square non-overlapping patches.

    Returns
    -------
    e_dct : torch.Tensor
        The sum over each patch of the absolute 2d DCT-II coefficients,
        weighted by :math:`e^{\left(\frac{ij}{s^2}\right)^2-1}`, the mean component excluded.
        The shape is (batch, fps, height/patch, width/patch).
        It is not divided by :math:`s^2`, this is left to the caller.
    """
    # crop the image (centered) to get a multiple of patch
    if crop := tensor.shape[2] % patch:
        tensor = tensor[:, :, crop//2:crop//2-crop, :, :]
    if crop := tensor.shape[3] % patch:
        tensor = tensor[:, :, :, crop//2:crop//2-crop, :]

    # reorganise into shape (batch, fps, height/patch, width/patch, patch, patch)
    # such as tensor[:, :, k*patch:(k+1)*patch, l*patch:(l+1)*patch, 0] = tensor_patched[:, :, k, l]
    tensor_patched = tensor[..., 0].unfold(2, patch, patch).unfold(3, patch, patch)
    dct = compute_dct(compute_dct(tensor_patched, 4), 5)  # shape (..., patch, patch)

    # apply paper formula, the weights are created on the same device as the dct
    norm = torch.arange(1, patch+1, dtype=dct.dtype, device=dct.device)
    norm_i, norm_j = torch.meshgrid(norm, norm, indexing="ij")
    norm = torch.exp(((norm_i*norm_j)/(patch*patch))**2 - 1.0)
    norm[0, 0] = 0.0  # remove mean component, keep only non 0 freq
    e_dct = (dct.abs() * norm).sum((4, 5))  # shape (batch, fps, height/patch, width/patch)
    return e_dct
@batched_frames  # to shape (batch, 1, height, width, 3)
def spatial_dct(
    img: torch.Tensor, threads: int = 0, patch: numbers.Integral = 32
) -> torch.Tensor:
    r"""Compute the spatial dct complexity for the image.

    The dct spatial complexity :math:`C_{\text{dct}} \in \mathbb{R}^+` is defined as follow:

    .. math::

        \begin{cases}
            C_{\text{dct}} = \frac{1}{n_{\text{blocs}}}
                \sum\limits_{m=1}^{n_{\text{blocs}}} H_m \\
            H_m = \frac{1}{s^2} \sum\limits_{i=1}^s \sum\limits_{j=1}^s
                e^{\left(\frac{ij}{s^2}\right)^2-1} \left|\mathscr{D}_m(i,j)\right| \\
            \mathscr{D}_m(i,j) = \begin{cases}
                0 & \text{if } i + j = 2 \\
                \mathscr{F}_m(i,j) & \text{otherwise} \\
            \end{cases} \\
        \end{cases}

    With :math:`\mathscr{F}_m(i,j)` the DCT-II applied to the patch :math:`m` of the image,
    calculated by the function :py:func:`compute_dct`.
    The patches cover the full image and are not overlapping.

    Parameters
    ----------
    img : arraylike
        The Y[UV] images, of shape ([*batch], [1], height, width, [channels]).
        Only the Y component is used. It has to be in range [0, 1].
        The image is sliced in non-overlapping squares of size :math:`s \times s`.
        If the height or width of the image is not a multiple of :math:`s`,
        edges will be cropped.
    threads : int, optional
        Defines the number of threads.
        The value -1 means that the function uses as many calculation threads as there are cores.
        The default value (0) allows the same behavior as (-1) if the function
        is called in the main thread, otherwise (1) to avoid nested threads.
        Any other positive value corresponds to the number of threads used.
    patch : int, default = 32
        The patch size :math:`s`. It has to be >= 1.
        Both the height and the width of the image have to be >= :math:`s`.
        The default value of 32 is the one proposed in the VCA paper.

    Returns
    -------
    spatial_dct : arraylike
        The :math:`C_{\text{dct}}` scalar for each image (of shape batch).

    Notes
    -----
    * It comes from the paper ``A NEW ENERGY FUNCTION FOR SEGMENTATION AND COMPRESSION``.
    * The `VCA <https://github.com/cd-athena/VCA/tree/stable/source>`_ tool
      offers an optimized version of this metric.
      The result is close to the ``E`` column of the .csv file generated with
      ``ffmpeg -i video.mp4 -f yuv4mpegpipe - |
      vca --y4m --input stdin --no-lowpass --complexity-csv result.csv``.
    * This function can be called by
      ``cutcutcodec metric video.mp4 --spatial-dct -o result.json``.

    Examples
    --------
    >>> import numpy as np
    >>> from cutcutcodec.core.analysis.video.complexity import spatial_dct
    >>> np.random.seed(0)
    >>> img = np.random.random((720, 1080, 3))  # It could also be a torch array list...
    >>> spatial_dct(img).round(2)
    array([1.59])
    >>>
    """
    assert isinstance(patch, numbers.Integral), patch.__class__.__name__
    assert patch >= 1, patch
    _, _, height, width, _ = img.shape
    # each dimension is tested on its own, a tuple comparison would be lexicographic
    # and would accept a tall image that is narrower than the patch
    assert height >= patch and width >= patch, \
        f"the image {img.shape} is to small for a patch of {patch}"

    with TorchThreads(threads):
        e_dct = _compute_vca_blocs(img, int(patch))
        # average over the blocs, then apply the 1/s**2 factor of H_m
        return e_dct.mean(dim=(2, 3)) / (patch*patch)
@batched_frames  # to shape (batch, 2, height, width, 3)
def temporal_dct(
    imgs: torch.Tensor, threads: int = 0, patch: numbers.Integral = 32
) -> torch.Tensor:
    r"""Compute the temporal dct complexity between 2 images.

    The dct temporal complexity :math:`H_{\text{dct}} \in \mathbb{R}^+` is defined as follow:

    .. math::

        \begin{cases}
            H_{\text{dct}} = \frac{1}{n_{\text{blocs}}}
                \sum\limits_{m=1}^{n_{\text{blocs}}} \left| H_{m,t} - H_{m,t-1} \right| \\
            H_{m,t} = \frac{1}{s^2} \sum\limits_{i=1}^s \sum\limits_{j=1}^s
                e^{\left(\frac{ij}{s^2}\right)^2-1} \left|\mathscr{D}_{m,t}(i,j)\right| \\
            \mathscr{D}_{m,t}(i,j) = \begin{cases}
                0 & \text{if } i + j = 2 \\
                \mathscr{F}_{m,t}(i,j) & \text{otherwise} \\
            \end{cases} \\
        \end{cases}

    With :math:`\mathscr{F}_{m,t}(i,j)` the DCT-II applied to the patch :math:`m`
    of the image :math:`t`, calculated by the function :py:func:`compute_dct`.
    The patches cover the full image and are not overlapping.

    Parameters
    ----------
    imgs : arraylike
        The Y[UV] images, of shape ([*batch], 2, height, width, [channels]).
        Only the Y component is used. It has to be in range [0, 1].
    threads, patch:
        Same as :py:func:`spatial_dct`.

    Returns
    -------
    temporal_dct : arraylike
        The :math:`H_{\text{dct}} \in \mathbb{R}^+` scalar
        for each couple of image (of shape batch).

    Notes
    -----
    * It is inspired by the paper ``A NEW ENERGY FUNCTION FOR SEGMENTATION AND COMPRESSION``.
    * The `VCA <https://github.com/cd-athena/VCA/tree/stable/source>`_ tool
      offers an optimized version of a similar metric.
      The result is close to the ``h`` column of the .csv file generated with
      ``ffmpeg -i video.mp4 -f yuv4mpegpipe - |
      vca --y4m --input stdin --no-lowpass --complexity-csv result.csv``.
    * This function can be called by
      ``cutcutcodec metric video.mp4 --temporal-dct -o result.json``.

    Examples
    --------
    >>> import numpy as np
    >>> from cutcutcodec.core.analysis.video.complexity import temporal_dct
    >>> np.random.seed(0)
    >>> imgs = np.random.random((2, 720, 1080, 3))  # It could also be a torch array list...
    >>> temporal_dct(imgs).round(2)
    array([0.03])
    >>>
    """
    assert isinstance(patch, numbers.Integral), patch.__class__.__name__
    assert patch >= 1, patch
    _, nbr, height, width, _ = imgs.shape
    # each dimension is tested on its own, a tuple comparison would be lexicographic
    # and would accept a tall image that is narrower than the patch
    assert height >= patch and width >= patch, \
        f"the image {imgs.shape} is to small for a patch of {patch}"
    assert nbr == 2, f"this temporal metric requires 2 images, {imgs.shape} is wrong"

    with TorchThreads(threads):
        e_dct = _compute_vca_blocs(imgs, int(patch))
        # absolute difference of the bloc energies of the two frames, frame axis kept
        e_dct = (e_dct[:, 0, None, :, :] - e_dct[:, 1, None, :, :]).abs()
        # average over the blocs, then apply the 1/s**2 factor of H_{m,t}
        return e_dct.mean(dim=(2, 3)) / (patch*patch)