"""Compute a differenciable batched torch spatial dtc complexity."""
import functools
import numbers
import torch
from cutcutcodec.core.opti.parallel.threading import TorchThreads
from .utils import batched_frames
[docs]
@functools.cache
def dct_matrix(size: numbers.Integral, dtype: torch.dtype) -> torch.Tensor:
r"""Return the DCT-II matrix, including average coefficient.
The square matrix :math:`\boldsymbol{D} \in \mathcal M_{n,n}(\mathbb R)` is defined as
:math:`d_{ij} = \cos\left(\frac{\pi}{n}\left(i-1\right)\left(j-\frac{1}{2}\right)\right)`.
For a given "temporal" column vector :math:`\boldsymbol{x} \in \mathcal M_{n,1}(\mathbb R)`,
the "spatial" column vector :math:`\boldsymbol{\hat{x}} \in \mathcal M_{n,1}(\mathbb R)`
is obtained with :math:`\boldsymbol{\hat{x}} = \boldsymbol{D}\boldsymbol{x}`.
Parameters
----------
size : int
The matrix size :math:`n`.
dtype : torch.dtype
The torch dtype of the matrix, float16, float32 or float64.
Returns
-------
dtc_matrix : torch.Tensor
The 2d square matrix :math:`\boldsymbol{D}` of the DCT-II coefficients.
Examples
--------
>>> import torch
>>> from cutcutcodec.core.analysis.video.complexity.dct import dct_matrix
>>> dct_matrix(8, torch.float32)
tensor([[ 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000],
[ 0.9808, 0.8315, 0.5556, 0.1951, -0.1951, -0.5556, -0.8315, -0.9808],
[ 0.9239, 0.3827, -0.3827, -0.9239, -0.9239, -0.3827, 0.3827, 0.9239],
[ 0.8315, -0.1951, -0.9808, -0.5556, 0.5556, 0.9808, 0.1951, -0.8315],
[ 0.7071, -0.7071, -0.7071, 0.7071, 0.7071, -0.7071, -0.7071, 0.7071],
[ 0.5556, -0.9808, 0.1951, 0.8315, -0.8315, -0.1951, 0.9808, -0.5556],
[ 0.3827, -0.9239, 0.9239, -0.3827, -0.3827, 0.9239, -0.9239, 0.3827],
[ 0.1951, -0.5556, 0.8315, -0.9808, 0.9808, -0.8315, 0.5556, -0.1951]])
>>> _ @ torch.sin(0.5 * torch.pi * torch.arange(8))[:, None]
tensor([[ 0.0000e+00],
[ 1.0616e+00],
[ 2.6822e-07],
[ 2.1727e+00],
[-2.8284e+00],
[-1.4518e+00],
[-2.9802e-07],
[-2.1116e-01]])
>>>
"""
assert isinstance(size, numbers.Integral), size.__class__.__name__
assert size >= 1, size
assert isinstance(dtype, torch.dtype), dtype.__class__.__name__
lin = torch.arange(size, dtype=dtype)
i_lin, j_lin = torch.meshgrid(lin*(torch.pi/size), lin+0.5, indexing="ij")
return torch.cos(i_lin * j_lin)
[docs]
def compute_dct(tensor: torch.Tensor, dim: int) -> torch.Tensor:
r"""Compute the DCT-II on the given axis.
The output vector :math:`\hat x_k` is defined as
:math:`\hat x_k = \sum\limits_{l=0}^{n-1} x_l \cos\left(\frac{\pi}{n}\left(l+\frac{1}{2}\right)k\right)`.
It is calculated by a matrix product, computed by :py:func:`dct_matrix`.
Parameters
----------
input : torch.Tensor
A n-dimensional tensor of real.
dim : int
The axis along which the DCT is computed. The other axes are treated as batch dimensions.
Returns
-------
output : torch.Tensor
The dimension of the input tensor. The input and output have the same size.
Examples
--------
>>> import torch
>>> from cutcutcodec.core.analysis.video.complexity.dct import compute_dct
>>> src = torch.randn((128, 16, 16))
>>> 2d_dct = compute_dct(compute_dct(src, -1), -2) # compute the 2d dct
>>>
"""
vertical_temp = tensor.movedim(dim, -1).unsqueeze(-1) # (..., n, 1)
vertical_freq = dct_matrix(tensor.shape[dim], tensor.dtype) @ vertical_temp
return vertical_freq.squeeze(-1).movedim(-1, dim)
def _compute_vca_blocs(tensor: torch.Tensor, patch: int) -> torch.Tensor:
# crop the image to get a multiple of patch
if crop := tensor.shape[2] % patch:
tensor = tensor[:, :, crop//2:crop//2-crop, :, :]
if crop := tensor.shape[3] % patch:
tensor = tensor[:, :, :, crop//2:crop//2-crop, :]
# reorganise into shape (batch, fps, height/patch, width/patch, patch, patch)
# such as tensor[:, :, k*patch:(k+1)*patch, l*patch:(l+1)*patch, 0] = tensor_patched[:, :, k, l]
tensor_patched = tensor[..., 0].unfold(2, patch, patch).unfold(3, patch, patch)
dct = compute_dct(compute_dct(tensor_patched, 4), 5) # shape (..., patch, patch)
# apply paper formula
norm = torch.arange(1, patch+1, dtype=dct.dtype)
norm_i, norm_j = torch.meshgrid(norm, norm, indexing="ij")
norm = torch.exp(((norm_i*norm_j)/(patch*patch))**2 - 1.0)
norm[0, 0] = 0.0 # remove mean componant, keep only non 0 freq
e_dct = (dct.abs() * norm).sum((4, 5)) # shape (batch, fps, height/patch, width/patch)
return e_dct
[docs]
@batched_frames # to shape (batch, 1, height, width, 3)
def spatial_dct(img: torch.Tensor, threads: int = 0, patch: numbers.Integral = 32) -> torch.Tensor:
r"""Compute the spatial dct complexity for the image.
The dct spatial complexity :math:`C_{\text{dct}} \in \mathbb{R}^+` is defined as follow:
.. math::
\begin{cases}
C_{\text{dct}} = \frac{1}{n_{\text{blocs}}} \sum\limits_{m=1}^{n_{\text{blocs}}} H_m \\
H_m = \frac{1}{s^2} \sum\limits_{i=1}^s \sum\limits_{j=1}^s
e^{\left(\frac{ij}{s^2}\right)^2-1} \left|\mathscr{D}_m(i,j)\right| \\
\mathscr{D}_m(i,j) =
\begin{cases}
0 & \text{if } i + j = 2 \\
\mathscr{F}_m(i,j) & \text{otherwise} \\
\end{cases} \\
\end{cases}
With :math:`\mathscr{F}_m(i,j)` the DCT-II applied to the patch :math:`m` of the image,
calculated by the function :py:func:`compute_dct`.
The patches cover the full image and are not overlapping.
Parameters
----------
img : arraylike
The Y[UV] images, of shape ([*batch], [1], height, width, [channels]).
Only the Y component is used. It has to be in range [0, 1].
The image is sliced in non-overlapping squares of size :math:`s \times s`.
If the height or width of the image is not a multiple of :math:`s`, edges will be cropped.
threads : int, optional
Defines the number of threads.
The value -1 means that the function uses as many calculation threads as there are cores.
The default value (0) allows the same behavior as (-1) if the function
is called in the main thread, otherwise (1) to avoid nested threads.
Any other positive value corresponds to the number of threads used.
patch : int, default = 32
The patch size :math:`s`. It has to be >= 1.
The default value of 32 is the one proposed in the VCA paper.
Returns
-------
spatial_dct : arraylike
The :math:`C_{\text{dct}}` scalar for each image (of shape batch).
Notes
-----
* It comes from the paper ``A NEW ENERGY FUNCTION FOR SEGMENTATION AND COMPRESSION``.
* The `VCA <https://github.com/cd-athena/VCA/tree/stable/source>`_ tool
offers an optimized version of this metric. The result is close to the ``E`` column
of the .csv file generated with ``ffmpeg -i video.mp4 -f yuv4mpegpipe -
| vca --y4m --input stdin --no-lowpass --complexity-csv result.csv``.
* This function can be called by ``cutcutcodec metric video.mp4 --spatial-dct -o result.json``.
Examples
--------
>>> import numpy as np
>>> from cutcutcodec.core.analysis.video.complexity import spatial_dct
>>> np.random.seed(0)
>>> img = np.random.random((720, 1080, 3)) # It could also be a torch array list...
>>> spatial_dct(img).round(2)
array([1.59])
>>>
"""
assert isinstance(patch, numbers.Integral), patch.__class__.__name__
assert patch >= 1, patch
_, _, height, width, _ = img.shape
assert (height, width) >= (patch, patch), \
f"the image {img.shape} is to small for a patch of {patch}"
with TorchThreads(threads):
e_dct = _compute_vca_blocs(img, int(patch))
return e_dct.mean(dim=(2, 3)) / (patch*patch)
[docs]
@batched_frames # to shape (batch, 2, height, width, 3)
def temporal_dct(imgs: torch.Tensor, threads: int = 0, patch: numbers.Integral = 32) -> torch.Tensor:
r"""Compute the temporal dct complexity between 2 images.
The dct temporal complexity :math:`H_{\text{dct}} \in \mathbb{R}^+` is defined as follow:
.. math::
\begin{cases}
H_{\text{dct}} = \frac{1}{n_{\text{blocs}}} \sum\limits_{m=1}^{n_{\text{blocs}}}
\left| H_{m,t} - H_{m,t-1} \right| \\
H_{m,t} = \frac{1}{s^2} \sum\limits_{i=1}^s \sum\limits_{j=1}^s
e^{\left(\frac{ij}{s^2}\right)^2-1} \left|\mathscr{D}_{m,t}(i,j)\right| \\
\mathscr{D}_{m,t}(i,j) =
\begin{cases}
0 & \text{if } i + j = 2 \\
\mathscr{F}_{m,t}(i,j) & \text{otherwise} \\
\end{cases} \\
\end{cases}
With :math:`\mathscr{F}_{m,t}(i,j)` the DCT-II applied to the patch :math:`m`
of the image :math:`t`, calculated by the function :py:func:`compute_dct`.
The patches cover the full image and are not overlapping.
Parameters
----------
imgs : arraylike
The Y[UV] images, of shape ([*batch], 2, height, width, [channels]).
Only the Y component is used. It has to be in range [0, 1].
threads, patch:
Same as :py:func:`spatial_dct`.
Returns
-------
temporal_dct : arraylike
The :math:`H_{dct} \in \mathbb{R}^+` scalar for each couple of image (of shape batch).
Notes
-----
* It is inspired by the paper ``A NEW ENERGY FUNCTION FOR SEGMENTATION AND COMPRESSION``.
* The `VCA <https://github.com/cd-athena/VCA/tree/stable/source>`_ tool
offers an optimized version of a similar metric. The result is close to the ``h`` column
of the .csv file generated with ``ffmpeg -i video.mp4 -f yuv4mpegpipe -
| vca --y4m --input stdin --no-lowpass --complexity-csv result.csv``.
* This function can be called by ``cutcutcodec metric video.mp4 --temporal-dct -o result.json``.
Examples
--------
>>> import numpy as np
>>> from cutcutcodec.core.analysis.video.complexity import temporal_dct
>>> np.random.seed(0)
>>> imgs = np.random.random((2, 720, 1080, 3)) # It could also be a torch array list...
>>> temporal_dct(imgs).round(2)
array([0.03])
>>>
"""
assert isinstance(patch, numbers.Integral), patch.__class__.__name__
assert patch >= 1, patch
_, nbr, height, width, _ = imgs.shape
assert (height, width) >= (patch, patch), \
f"the image {imgs.shape} is to small for a patch of {patch}"
assert nbr == 2, f"this temporal metric requires 2 images, {imgs.shape} is wrong"
with TorchThreads(threads):
e_dct = _compute_vca_blocs(imgs, int(patch))
e_dct = (e_dct[:, 0, None, :, :] - e_dct[:, 1, None, :, :]).abs()
return e_dct.mean(dim=(2, 3)) / (patch*patch)