Code source de cutcutcodec.core.nn.dataaug.video

#!/usr/bin/env python3

"""Video Data Augmentations."""

import numbers
import pathlib
import random
import tempfile
import typing
import uuid

import av
import torch

from cutcutcodec.core.classes.colorspace import Colorspace
from cutcutcodec.core.io.framecaster import from_yuv, to_yuv
from cutcutcodec.core.filter.video.resize import resize


[docs] class Transcoder: """Encode and Decode the video with lossly compression. Attributes ---------- encoders : list[str] The encoders list (readonly). """ def __init__( self, encoders: typing.Iterable[str] | str = None, quality: tuple[numbers.Real, numbers.Real] | numbers.Real = (0.5, 0.9), ): """Initialise a random transcoder. Parameters ---------- encoders : list[str] or str, optional The encoders used, By default ['libx264', 'libx265', 'libvx-vp9', 'libsvtav1']. Only these encoders are supported. quality : tuple[float, float] of float The qualities bounds 0 lossless, 1 worse. """ if encoders is None: encoders = ["libx264", "libx265", "libvpx-vp9", "libsvtav1"] else: if isinstance(encoders, str): encoders = [encoders] assert hasattr(encoders, "__iter__"), encoders.__class__.__name__ encoders = list(encoders) assert all(isinstance(e, str) for e in encoders), encoders assert set(encoders).issubset({"libx264", "libx265", "libvpx-vp9", "libsvtav1"}) if isinstance(quality, numbers.Real): quality = (quality, quality) else: quality = tuple(quality) assert len(quality) == 2, quality assert isinstance(quality[0], numbers.Real) and isinstance(quality[1], numbers.Real) assert 0 <= quality[0] <= 1 and 0 <= quality[1] <= 1, quality self._encoders = encoders self.quality = quality self.fromlin = ( Colorspace.from_default_working() .to_function(Colorspace.from_default_target()) ) self.tolin = ( Colorspace.from_default_target() .to_function(Colorspace.from_default_working()) ) def __call__(self, video: torch.Tensor) -> torch.Tensor: """Transcode the images. Examples -------- >>> import torch >>> from cutcutcodec.core.nn.dataaug.video import Transcoder >>> video = torch.rand(2, 3, 1080, 1920, 3*5) >>> transcoder = Transcoder("libx264") >>> transcoder(video).shape torch.Size([2, 3, 1080, 1920, 15]) >>> """ assert isinstance(video, torch.Tensor), video.__class__.__name__ assert video.ndim >= 3, video.shape assert video.shape[2] % 3 == 0, video.shape # case recursive if video.ndim > 3: return torch.cat( [self(v) for v in video.reshape(-1, *video.shape[-3:])], dim=0 ).reshape(*video.shape) # resize for even dimension, required by some encoders buff = resize(video, (2*(video.shape[0]//2), 2*(video.shape[1]//2))) # preparation file = pathlib.Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4" yuv = self.fromlin(r=buff[:, :, 0::3], g=buff[:, :, 1::3], b=buff[:, :, 2::3]) # encode with av.open(file, mode="w", format="mp4") as container: stream = container.add_stream(random.choice(self._encoders), rate=30) quality = random.random() quality = self.quality[0] * quality + self.quality[1] * (1.0 - quality) quality *= { "libx264": 51.0, "libx265": 51.0, "libvpx-vp9": 63.0, "libsvtav1": 63.0 }[stream.name] quality = round(quality) stream.options = {"crf": str(quality)} stream.height, stream.width, _ = buff.shape for i in range(buff.shape[2]//3): container.mux(stream.encode( av.video.frame.VideoFrame.from_ndarray( to_yuv( torch.cat([ yuv[0][:, :, i, None], yuv[1][:, :, i, None], yuv[2][:, :, i, None] ], dim=2) .numpy(force=True), ), format="yuv444p16le", # tv range ).reformat(format="yuv420p") )) container.mux(stream.encode(None)) # flush buffer # decode with av.open(file, mode="r", format="mp4") as container: buff = torch.cat( [ torch.from_numpy( from_yuv(f.to_ndarray(channel_last=True, format="yuv444p"), True) ).to(dtype=video.dtype, device=video.device) for packet in container.demux(video=0) for f in packet.decode() ], dim=2, ) file.unlink() buff = resize(buff, video.shape, copy=False) # back to initial shape # convert colors buff[:, :, 0::3], buff[:, :, 1::3], buff[:, :, 2::3] = self.tolin( y=buff[:, :, 0::3], u=buff[:, :, 1::3], v=buff[:, :, 2::3] ) return buff @property def encoders(self) -> list[str]: """Return the encoders useds.""" return self._encoders
[docs] def interlace(video: torch.Tensor) -> torch.Tensor: """Simulate an interlaced video. Examples -------- >>> import torch >>> from cutcutcodec.core.nn.dataaug.video import interlace >>> video = torch.empty(5, 5, 12) >>> video[:, :, 0:3] = 1.0 >>> video[:, :, 3:6] = 2.0 >>> video[:, :, 6:9] = 3.0 >>> video[:, :, 9:12] = 4.0 >>> video = interlace(video) >>> video[:, :, 6] tensor([[3., 3., 3., 3., 3.], [2., 2., 2., 2., 2.], [3., 3., 3., 3., 3.], [2., 2., 2., 2., 2.], [3., 3., 3., 3., 3.]]) >>> """ assert isinstance(video, torch.Tensor), video.__class__.__name__ assert video.ndim >= 3, video.shape assert video.shape[2] % 3 == 0, video.shape interlaced = torch.empty_like(video) interlaced[..., ::2, :, :3] = video[..., ::2, :, :3] interlaced[..., 1::2, :, :3] = interlaced[..., 0:-1:2, :, :3] for i in range(1, video.shape[2]//3): if i % 2 == 0: # even interlaced[..., ::2, :, 3*i:3*i+3] = video[..., ::2, :, 3*i:3*i+3] interlaced[..., 1::2, :, 3*i:3*i+3] = video[..., 1::2, :, 3*i-3:3*i] else: # odd interlaced[..., ::2, :, 3*i:3*i+3] = video[..., ::2, :, 3*i-3:3*i] interlaced[..., 1::2, :, 3*i:3*i+3] = video[..., 1::2, :, 3*i:3*i+3] return interlaced