# Source code for cutcutcodec.core.nn.dataaug.video

"""Video Data Augmentations."""

import numbers
import pathlib
import random
import tempfile
import typing
import uuid

import av
import torch

from cutcutcodec.core.classes.colorspace import Colorspace
from cutcutcodec.core.filter.video.resize import resize
from cutcutcodec.core.io.framecaster import from_yuv, to_yuv


[docs] class Transcoder: """Encode and Decode the video with lossly compression. Attributes ---------- encoders : list[str] The encoders list (readonly). """ def __init__( self, encoders: typing.Iterable[str] | str = None, quality: tuple[numbers.Real, numbers.Real] | numbers.Real = (0.5, 0.9), ): """Initialise a random transcoder. Parameters ---------- encoders : list[str] or str, optional The encoders used, By default ['libx264', 'libx265', 'libvx-vp9', 'libsvtav1']. Only these encoders are supported. quality : tuple[float, float] of float The qualities bounds 0 lossless, 1 worse. """ if encoders is None: encoders = ["libx264", "libx265", "libvpx-vp9", "libsvtav1"] else: if isinstance(encoders, str): encoders = [encoders] assert hasattr(encoders, "__iter__"), encoders.__class__.__name__ encoders = list(encoders) assert all(isinstance(e, str) for e in encoders), encoders assert set(encoders).issubset({"libx264", "libx265", "libvpx-vp9", "libsvtav1"}) if isinstance(quality, numbers.Real): quality = (quality, quality) else: quality = tuple(quality) assert len(quality) == 2, quality assert isinstance(quality[0], numbers.Real) and isinstance(quality[1], numbers.Real) assert 0 <= quality[0] <= 1 and 0 <= quality[1] <= 1, quality self._encoders = encoders self.quality = quality self.fromlin = ( Colorspace.from_default_working() .to_function(Colorspace.from_default_target()) ) self.tolin = ( Colorspace.from_default_target() .to_function(Colorspace.from_default_working()) ) def __call__(self, video: torch.Tensor) -> torch.Tensor: """Transcode the images. 
Examples -------- >>> import torch >>> from cutcutcodec.core.nn.dataaug.video import Transcoder >>> video = torch.rand(2, 3, 1080, 1920, 3*5) >>> transcoder = Transcoder("libx264") >>> transcoder(video).shape torch.Size([2, 3, 1080, 1920, 15]) >>> """ assert isinstance(video, torch.Tensor), video.__class__.__name__ assert video.ndim >= 3, video.shape assert video.shape[2] % 3 == 0, video.shape # case recursive if video.ndim > 3: return torch.cat( [self(v) for v in video.reshape(-1, *video.shape[-3:])], dim=0, ).reshape(*video.shape) # resize for even dimension, required by some encoders buff = resize(video, (2*(video.shape[0]//2), 2*(video.shape[1]//2))) # preparation file = pathlib.Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4" yuv = self.fromlin(r=buff[:, :, 0::3], g=buff[:, :, 1::3], b=buff[:, :, 2::3]) # encode with av.open(file, mode="w", format="mp4") as container: stream = container.add_stream(random.choice(self._encoders), rate=30) quality = random.random() quality = self.quality[0] * quality + self.quality[1] * (1.0 - quality) quality *= { "libx264": 51.0, "libx265": 51.0, "libvpx-vp9": 63.0, "libsvtav1": 63.0, }[stream.name] quality = round(quality) stream.options = {"crf": str(quality)} stream.height, stream.width, _ = buff.shape for i in range(buff.shape[2]//3): container.mux(stream.encode( av.video.frame.VideoFrame.from_ndarray( to_yuv( torch.cat([ yuv[0][:, :, i, None], yuv[1][:, :, i, None], yuv[2][:, :, i, None], ], dim=2) .numpy(force=True), ), format="yuv444p16le", # tv range ).reformat(format="yuv420p"), )) container.mux(stream.encode(None)) # flush buffer # decode with av.open(file, mode="r", format="mp4") as container: buff = torch.cat( [ torch.from_numpy( from_yuv(f.to_ndarray(channel_last=True, format="yuv444p"), True), ).to(dtype=video.dtype, device=video.device) for packet in container.demux(video=0) for f in packet.decode() ], dim=2, ) file.unlink() buff = resize(buff, video.shape, copy=False) # back to initial shape # convert colors 
buff[:, :, 0::3], buff[:, :, 1::3], buff[:, :, 2::3] = self.tolin( y=buff[:, :, 0::3], u=buff[:, :, 1::3], v=buff[:, :, 2::3], ) return buff @property def encoders(self) -> list[str]: """Return the encoders useds.""" return self._encoders
def interlace(video: torch.Tensor) -> torch.Tensor:
    """Simulate an interlaced video.

    Each output frame mixes the lines of two consecutive input frames:
    even frames take their odd lines from the previous frame,
    odd frames take their even lines from the previous frame.
    The first frame has no predecessor, its even lines are duplicated.

    Parameters
    ----------
    video : torch.Tensor
        The frames, of shape (..., height, width, 3*n).
        The last dimension holds the 3 channels of the n frames.

    Returns
    -------
    torch.Tensor
        A new tensor of the same shape, dtype and device as ``video``.

    Examples
    --------
    >>> import torch
    >>> from cutcutcodec.core.nn.dataaug.video import interlace
    >>> video = torch.empty(5, 5, 12)
    >>> video[:, :, 0:3] = 1.0
    >>> video[:, :, 3:6] = 2.0
    >>> video[:, :, 6:9] = 3.0
    >>> video[:, :, 9:12] = 4.0
    >>> video = interlace(video)
    >>> video[:, :, 6]
    tensor([[3., 3., 3., 3., 3.],
            [2., 2., 2., 2., 2.],
            [3., 3., 3., 3., 3.],
            [2., 2., 2., 2., 2.],
            [3., 3., 3., 3., 3.]])
    >>>
    """
    assert isinstance(video, torch.Tensor), video.__class__.__name__
    assert video.ndim >= 3, video.shape
    # the channels are on the last axis, whatever the number of leading dims
    assert video.shape[-1] % 3 == 0, video.shape

    interlaced = torch.empty_like(video)

    # first frame: the odd lines are a copy of the even line just above
    interlaced[..., ::2, :, :3] = video[..., ::2, :, :3]
    interlaced[..., 1::2, :, :3] = video[..., 0:-1:2, :, :3]

    for i in range(1, video.shape[-1]//3):
        current = slice(3*i, 3*i+3)
        previous = slice(3*i-3, 3*i)
        if i % 2 == 0:  # even frame: odd lines come from the previous frame
            even_src, odd_src = current, previous
        else:  # odd frame: even lines come from the previous frame
            even_src, odd_src = previous, current
        interlaced[..., ::2, :, current] = video[..., ::2, :, even_src]
        interlaced[..., 1::2, :, current] = video[..., 1::2, :, odd_src]
    return interlaced