"""Video Data Augmentations."""
import numbers
import pathlib
import random
import tempfile
import typing
import uuid
import av
import torch
from cutcutcodec.core.classes.colorspace import Colorspace
from cutcutcodec.core.filter.video.resize import resize
from cutcutcodec.core.io.framecaster import from_yuv, to_yuv
# [docs]  (link marker presumably left over from the rendered documentation page; not code)
class Transcoder:
"""Encode and Decode the video with lossly compression.
Attributes
----------
encoders : list[str]
The encoders list (readonly).
"""
def __init__(
self,
encoders: typing.Iterable[str] | str = None,
quality: tuple[numbers.Real, numbers.Real] | numbers.Real = (0.5, 0.9),
):
"""Initialise a random transcoder.
Parameters
----------
encoders : list[str] or str, optional
The encoders used, By default ['libx264', 'libx265', 'libvx-vp9', 'libsvtav1'].
Only these encoders are supported.
quality : tuple[float, float] of float
The qualities bounds 0 lossless, 1 worse.
"""
if encoders is None:
encoders = ["libx264", "libx265", "libvpx-vp9", "libsvtav1"]
else:
if isinstance(encoders, str):
encoders = [encoders]
assert hasattr(encoders, "__iter__"), encoders.__class__.__name__
encoders = list(encoders)
assert all(isinstance(e, str) for e in encoders), encoders
assert set(encoders).issubset({"libx264", "libx265", "libvpx-vp9", "libsvtav1"})
if isinstance(quality, numbers.Real):
quality = (quality, quality)
else:
quality = tuple(quality)
assert len(quality) == 2, quality
assert isinstance(quality[0], numbers.Real) and isinstance(quality[1], numbers.Real)
assert 0 <= quality[0] <= 1 and 0 <= quality[1] <= 1, quality
self._encoders = encoders
self.quality = quality
self.fromlin = (
Colorspace.from_default_working()
.to_function(Colorspace.from_default_target())
)
self.tolin = (
Colorspace.from_default_target()
.to_function(Colorspace.from_default_working())
)
def __call__(self, video: torch.Tensor) -> torch.Tensor:
"""Transcode the images.
Examples
--------
>>> import torch
>>> from cutcutcodec.core.nn.dataaug.video import Transcoder
>>> video = torch.rand(2, 3, 1080, 1920, 3*5)
>>> transcoder = Transcoder("libx264")
>>> transcoder(video).shape
torch.Size([2, 3, 1080, 1920, 15])
>>>
"""
assert isinstance(video, torch.Tensor), video.__class__.__name__
assert video.ndim >= 3, video.shape
assert video.shape[2] % 3 == 0, video.shape
# case recursive
if video.ndim > 3:
return torch.cat(
[self(v) for v in video.reshape(-1, *video.shape[-3:])], dim=0,
).reshape(*video.shape)
# resize for even dimension, required by some encoders
buff = resize(video, (2*(video.shape[0]//2), 2*(video.shape[1]//2)))
# preparation
file = pathlib.Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4"
yuv = self.fromlin(r=buff[:, :, 0::3], g=buff[:, :, 1::3], b=buff[:, :, 2::3])
# encode
with av.open(file, mode="w", format="mp4") as container:
stream = container.add_stream(random.choice(self._encoders), rate=30)
quality = random.random()
quality = self.quality[0] * quality + self.quality[1] * (1.0 - quality)
quality *= {
"libx264": 51.0, "libx265": 51.0, "libvpx-vp9": 63.0, "libsvtav1": 63.0,
}[stream.name]
quality = round(quality)
stream.options = {"crf": str(quality)}
stream.height, stream.width, _ = buff.shape
for i in range(buff.shape[2]//3):
container.mux(stream.encode(
av.video.frame.VideoFrame.from_ndarray(
to_yuv(
torch.cat([
yuv[0][:, :, i, None], yuv[1][:, :, i, None], yuv[2][:, :, i, None],
], dim=2)
.numpy(force=True),
),
format="yuv444p16le", # tv range
).reformat(format="yuv420p"),
))
container.mux(stream.encode(None)) # flush buffer
# decode
with av.open(file, mode="r", format="mp4") as container:
buff = torch.cat(
[
torch.from_numpy(
from_yuv(f.to_ndarray(channel_last=True, format="yuv444p"), True),
).to(dtype=video.dtype, device=video.device)
for packet in container.demux(video=0) for f in packet.decode()
],
dim=2,
)
file.unlink()
buff = resize(buff, video.shape, copy=False) # back to initial shape
# convert colors
buff[:, :, 0::3], buff[:, :, 1::3], buff[:, :, 2::3] = self.tolin(
y=buff[:, :, 0::3], u=buff[:, :, 1::3], v=buff[:, :, 2::3],
)
return buff
@property
def encoders(self) -> list[str]:
"""Return the encoders useds."""
return self._encoders
# [docs]  (link marker presumably left over from the rendered documentation page; not code)
def interlace(video: torch.Tensor) -> torch.Tensor:
    """Simulate an interlaced video.

    In each output frame, one row out of two comes from the current frame and the
    other rows come from the previous frame. The even rows are the fresh ones for
    the even frames, the odd rows for the odd frames. The first frame has no
    previous frame, its odd rows are a copy of the even row just above.

    Parameters
    ----------
    video : torch.Tensor
        The frames, of shape (..., height, width, 3*n). The last axis contains
        the 3 channels of the n successive frames, one frame after the other.

    Returns
    -------
    torch.Tensor
        A new tensor of the same shape, dtype and device, the input is not modified.

    Examples
    --------
    >>> import torch
    >>> from cutcutcodec.core.nn.dataaug.video import interlace
    >>> video = torch.empty(5, 5, 12)
    >>> video[:, :, 0:3] = 1.0
    >>> video[:, :, 3:6] = 2.0
    >>> video[:, :, 6:9] = 3.0
    >>> video[:, :, 9:12] = 4.0
    >>> video = interlace(video)
    >>> video[:, :, 6]
    tensor([[3., 3., 3., 3., 3.],
            [2., 2., 2., 2., 2.],
            [3., 3., 3., 3., 3.],
            [2., 2., 2., 2., 2.],
            [3., 3., 3., 3., 3.]])
    >>>
    """
    assert isinstance(video, torch.Tensor), video.__class__.__name__
    assert video.ndim >= 3, video.shape
    # the channels are on the last axis, whatever the number of leading dimensions
    assert video.shape[-1] % 3 == 0, video.shape

    interlaced = torch.empty_like(video)

    # first frame: no previous frame, the odd rows duplicate the even row above
    interlaced[..., ::2, :, :3] = video[..., ::2, :, :3]
    interlaced[..., 1::2, :, :3] = video[..., 0:-1:2, :, :3]

    for i in range(1, video.shape[-1]//3):
        cur = slice(3*i, 3*i+3)  # channels of the frame i
        prev = slice(3*i-3, 3*i)  # channels of the frame i-1
        if i % 2 == 0:  # even
            interlaced[..., ::2, :, cur] = video[..., ::2, :, cur]
            interlaced[..., 1::2, :, cur] = video[..., 1::2, :, prev]
        else:  # odd
            interlaced[..., ::2, :, cur] = video[..., ::2, :, prev]
            interlaced[..., 1::2, :, cur] = video[..., 1::2, :, cur]
    return interlaced