"""Decode the streams of a multimedia file based on ffmpeg."""
import functools
import logging
import math
import numbers
import pathlib
import threading
import typing
from fractions import Fraction
import av
import numpy as np
import torch
from cutcutcodec.core.analysis.audio.properties.duration import get_duration_audio
from cutcutcodec.core.analysis.ffprobe import _estimate_rate_ffmpeg, get_streams_type
from cutcutcodec.core.classes.colorspace import Colorspace
from cutcutcodec.core.classes.container import ContainerInput
from cutcutcodec.core.classes.frame_audio import FrameAudio
from cutcutcodec.core.classes.frame_video import FrameVideo
from cutcutcodec.core.classes.layout import Layout
from cutcutcodec.core.classes.stream import Stream
from cutcutcodec.core.classes.stream_audio import StreamAudio
from cutcutcodec.core.classes.stream_video import StreamVideo
from cutcutcodec.core.colorspace.cst import FFMPEG_PRIMARIES, FFMPEG_RANGE, FFMPEG_TRC
from cutcutcodec.core.colorspace.heuristic import guess_space
from cutcutcodec.core.exceptions import (
DecodeError,
MissingInformation,
MissingStreamError,
OutOfTimeRange,
)
from cutcutcodec.core.filter.video.pad import pad_keep_ratio
from cutcutcodec.core.filter.video.resize import resize
from cutcutcodec.core.opti.cache.basic import method_cache
from .cst import IMAGE_SUFFIXES
from .framecaster import from_rgb, from_yuv
from .pix_map import PIX_MAP
class _StreamFFMPEGBase(Stream):
"""Factorise share methods between audio and video."""
def __init__(self, node: ContainerInputFFMPEG, av_idx: int):
assert isinstance(node, ContainerInputFFMPEG), node.__class__.__name__
assert isinstance(av_idx, int), av_idx.__class__.__name__
super().__init__(node)
self._av_idx = av_idx
self._av_container = None
self._av_stream = None
self._duration = None
self._frame_iter = None
self._prev_frame = self._next_frame = None
self.reset()
def _seek_backward(self, position: Fraction) -> None:
"""Move backwards in the file.
This method guarantees to move before the required position.
If this is not possible, we move to the very beginning of the file.
After, we always have ``self.get_current_range()[0] <= position``.
"""
if self.type == "audio":
dec = Fraction(self._av_stream.frame_size, self.rate)
elif self.type == "video":
dec = 1 / self.rate
else:
dec = 0
for pos in (position, position-10, 0):
stream = self._av_stream # must be define in 'for' because reset
try:
self._av_container.seek(
max(0, math.floor((pos - 2*dec) / stream.time_base)),
backward=True, # if there is not a keyframe at the given offset
stream=stream,
)
except av.error.PermissionError: # happens sometimes
self.reset()
break
self._prev_frame = self._next_frame = None # takes into account the new position
# verification and rough adjustment
try:
if self.get_current_range()[0] <= position:
break
except OutOfTimeRange: # if this exception is throw, reset is just done
continue
else:
self.reset()
def _seek_forward(self, position: Fraction) -> None:
"""Move forwardwards in the file.
The displacement, if some cases, can be very approximate.
"""
stream = self._av_stream
if stream.type == "audio":
dec = Fraction(stream.frame_size, self.rate)
elif stream.type == "video":
dec = 1 / self.rate
else:
dec = 0
self._av_container.seek(
max(0, math.floor((position - dec) / stream.time_base)),
backward=True, # if there is not a keyframe at the given offset
stream=stream,
)
self._prev_frame = self._next_frame = None # takes into account the new position
@property
def beginning(self) -> Fraction:
return Fraction(0)
@property
def frame_iter(self) -> typing.Iterable[av.frame.Frame]:
"""Allow to read the file at the last moment."""
if self._frame_iter is None:
self._frame_iter = iter(self._av_container.decode(self._av_stream))
return self._frame_iter
@property
def next_frame(self) -> None | av.frame.Frame:
"""Return the next frame if exists, None else."""
if self._next_frame is None:
self._prev_frame = self.prec_frame # iter if needed ("=" is for pylint W0104)
try:
self._next_frame = next(self.frame_iter)
except (StopIteration, av.error.EOFError, av.error.InvalidDataError):
self._next_frame = self._frame_iter = None
if self._duration is None: # facultative, it is just optimisation
t_start, t_end = frame_dates(self._prev_frame)
self._duration = t_start + Fraction(1, self.rate) if t_end is None else t_end
return self._next_frame
@property
def prec_frame(self) -> av.frame.Frame:
"""Return the frame at the current position."""
if self._prev_frame is None:
try:
self._prev_frame = next(self.frame_iter)
except (StopIteration, av.error.EOFError, av.error.InvalidDataError) as err:
self.reset()
raise OutOfTimeRange("there is no frame left to read") from err
return self._prev_frame
@property
@method_cache # optimise about 100 ms per call
def rate(self) -> Fraction:
"""Return the theorical image or sample frequency in the metadata."""
return _estimate_rate_ffmpeg(self.node.filename, self.index)
def reset(self) -> None:
"""Reload a new av environement."""
self._prev_frame = self._next_frame = None
self._frame_iter = None
if self._av_container is not None:
self._av_container.close()
self._av_container = av.open(str(self.node.filename), "r", **self.node.av_kwargs)
self._av_stream = self._av_container.streams[self._av_idx]
self._av_stream.thread_type = "AUTO"
def __del__(self):
"""Close the streams."""
if self._av_container is not None:
# calling .close() with no delay lead to segfault in some cases
# self._av_container.close()
self._av_container = None
class _StreamAudioFFMPEG(_StreamFFMPEGBase, StreamAudio):
"""Stream Audio from a file.
Attributes
----------
duration : Fraction
The exact duration of the stream (readonly).
This date corresponds to the end of the last sample.
rate : int
The frequency in Hz of the samples (readonly).
Notes
-----
Should use ``ffmpegio.audio.read(file, sample_fmt='dbl')``.
"""
def __init__(self, node: ContainerInputFFMPEG, idx: int):
_StreamFFMPEGBase.__init__(self, node, idx)
StreamAudio.__init__(self, node)
self._lock = threading.Lock()
def _snapshot(self, timestamp: Fraction, rate: int, samples: int) -> FrameAudio:
if timestamp < 0:
raise OutOfTimeRange(f"there is no audio frame at timestamp {timestamp} (need >= 0)")
# resample if needeed
if samples != 1 and rate != self.rate:
frame = self._snapshot(
timestamp,
rate=self.rate,
samples=math.floor(samples*Fraction(self.rate, rate)),
)
indexs = torch.arange(samples, dtype=torch.int64)
indexs *= self.rate
indexs //= rate
frame = FrameAudio(timestamp, rate, frame.layout, frame[:, indexs])
return frame
# decode concerned frames
frames_and_dates: list[list[np.ndarray, Fraction, Fraction]] = [] # frame, start_t, end_t
end = timestamp + Fraction(samples, rate) # apparition of last sample
with self._lock:
self.seek(timestamp)
while True:
try:
frame = self.prec_frame
except OutOfTimeRange as err:
raise OutOfTimeRange(
f"stream start {self.beginning} and end {self.beginning + self.duration}, "
f"no stream at timestamp {timestamp} to {timestamp} + {samples}/{rate}",
) from err
if frame.is_corrupt:
logging.warning("the frame at %f seconds is corrupted", frame.time)
continue
dates = frame_dates(frame)
frames_and_dates.append(
[ # the reshape is usefull only in some cases for debug in ffmpeg 4
frame.to_ndarray().reshape(-1, frame.samples, order="F"),
dates[0]-timestamp,
dates[1]-timestamp,
],
)
if end <= dates[1]:
break
self._prev_frame, self._next_frame = self.next_frame, None # iter in stream
# correct the drift
drift_max = self._av_stream.time_base
drift_max = 2 if drift_max is None else math.ceil(drift_max*rate)
frames_and_dates = _fix_drift_fill_crop(frames_and_dates, drift_max, rate, samples)
# create the final frame
return FrameAudio(
timestamp,
rate,
self.layout,
_convert_audio_samples(
np.concatenate([f for f, _, _ in frames_and_dates], axis=1)
if len(frames_and_dates) > 1 else frames_and_dates.pop()[0],
),
)
@property
def duration(self) -> Fraction | float:
"""Return the exact duration in seconds.
Examples
--------
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> audio = get_project_root() / "media" / "audio" / "narration_5_1.oga"
>>> with ContainerInputFFMPEG(audio) as container:
... (stream,) = container.out_streams
... stream.duration
...
Fraction(8, 1)
>>>
"""
if self._duration is not None:
return self._duration
# seek approximative
rel_index = len(
[
None for i, s in enumerate(self.node.out_streams)
if i < self.index and s.type == "audio"
],
)
with self._lock:
self.seek(get_duration_audio(self.node.filename, rel_index, accurate=False) - 10)
# decoding until reaching the last frame
while self.next_frame is not None:
self._prev_frame, self._next_frame = self.next_frame, None # iter in stream
# get the time of the last frame + the frame duration
self._duration = frame_dates(self._prev_frame)[1]
return self._duration
def get_current_range(self) -> tuple[Fraction, Fraction]:
"""Return the time interval cover by the current frame."""
if (next_frame := self.next_frame) is None:
return frame_dates(self.prec_frame)
return frame_dates(self.prec_frame)[0], frame_dates(next_frame)[0]
@property
def layout(self) -> Layout:
"""Return the signification of each channels in this audio stream.
Examples
--------
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> audio = get_project_root() / "media" / "audio" / "narration_5_1.oga"
>>> with ContainerInputFFMPEG(audio) as container:
... (stream,) = container.out_streams
... stream.layout
...
Layout('5.1')
>>>
"""
return Layout(self._av_stream.layout.name)
@property
def rate(self) -> int:
"""Return the theorical image or sample frequency in the metadata."""
return int(super().rate)
def seek(self, position: Fraction) -> None:
"""Move into the file until reaching the frame at this position.
If you are already well placed, this has no effect.
Allows backward even a little bit, but only jump forward if the jump is big enough.
Parameters
----------
position : fraction.Fraction
The target position such as ``self.prec_frame.time <= position < self.next_frame.time``.
This position is expressed in seconds.
Raises
------
OutOfTimeRange
If the required position is out of the definition range.
Examples
--------
>>> from fractions import Fraction
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> audio = get_project_root() / "media" / "audio" / "narration_5_1.oga"
>>> with ContainerInputFFMPEG(audio) as container:
... (stream,) = container.out_streams
... stream.seek(Fraction(7))
... stream.get_current_range()
... stream.seek(Fraction(5))
... stream.get_current_range()
...
(Fraction(872, 125), Fraction(876, 125))
(Fraction(624, 125), Fraction(628, 125))
>>>
"""
assert isinstance(position, Fraction), position.__class__.__name__
# case need to seek
if position > self.get_current_range()[1] + 10: # forward if jump more 10 seconds
self._seek_forward(position) # very approximative
if position < self.get_current_range()[0]:
self._seek_backward(position) # guaranteed to be before
# fine adjustment
while self.get_current_range()[1] <= position:
self._prev_frame, self._next_frame = self.next_frame, None # iter in stream
class _StreamVideoFFMPEG(_StreamFFMPEGBase, StreamVideo):
"""Stream Video from a file.
Attributes
----------
height : int
The dimension i (vertical) of the encoded frames in pxl (readonly).
duration : Fraction
The exact duration of the complete stream (readonly).
the time include the duration of the last frame.
width : int
The dimension j (horizontal) of the encoded frames in pxl (readonly).
"""
def __init__(self, node: ContainerInputFFMPEG, idx: int):
_StreamFFMPEGBase.__init__(self, node, idx)
StreamVideo.__init__(self, node)
self._key_times = None
self._lock = threading.Lock()
def _snapshot(self, timestamp: Fraction, mask: torch.Tensor) -> torch.Tensor:
if timestamp < 0:
raise OutOfTimeRange(f"there is no audio frame at timestamp {timestamp} (need >= 0)")
# decode the frame and convert into numpy array
with self._lock:
self.seek(timestamp) # adjust position
frame_av = self.prec_frame
pix_fmt = PIX_MAP.get(frame_av.format.name, "gbrapf32le")
# args given to https://github.com/PyAV-Org/PyAV/blob/main/av/video/reformatter.pxd
# and https://github.com/PyAV-Org/PyAV/blob/main/av/video/reformatter.pyx
# the options dst_colorspace, dst_color_range do not work in pyav 14.0.1
# zscale is slow to init, api is not well compatible with pyav, it fails in threads
# resize and cast are done out of the thread lock to improve multithreading perfs
# we must to give color_range: https://github.com/PyAV-Org/PyAV/issues/1431
frame_np = frame_av.to_ndarray(
channel_last=True,
format=pix_fmt,
# src_color_range=self._av_stream.codec_context.color_range,
# dst_color_range=self._av_stream.codec_context.color_range,
)
# add 1 leading channel to grayscale frame (h x w -> h x w x 1)
# shift bit because libav conversion is only bit shift
# cast into float32
# convert limited range to full range (based on UIT-R)
frame_np = from_yuv(
frame_np,
FFMPEG_RANGE[self._av_stream.codec_context.color_range] in {"tv", None},
) if ("yuv" in pix_fmt or "gray" in pix_fmt) else from_rgb(
frame_np,
FFMPEG_RANGE[self._av_stream.codec_context.color_range] == "tv",
)
# resize and keep proportion, self.height and self.width included non square pixel
dw_sh, dh_sw = mask.shape[1] * self.height, mask.shape[0] * self.width
if dw_sh == dh_sw: # if the proportion is the same
height, width = mask.shape
elif dw_sh > dh_sw: # need horizontal padding
height, width = (mask.shape[0], round(dh_sw/self.height)) # keep height unchanged
else:
height, width = (round(dw_sh/self.width), mask.shape[1]) # keep width unchanged
# reshape is allways required for non-squared pixels
frame_np = resize(frame_np, (height, width), copy=False)
# padding for keeping same aspect ratio
frame_np = pad_keep_ratio(frame_np, mask.shape, copy=False)
# convert in cutcutcodec video frame
return FrameVideo(frame_dates(frame_av)[0], frame_np)
@functools.cached_property
def colorspace(self) -> Colorspace:
"""Return the most probable color space of the stream.
Returns
-------
colorspace : Colorspace
* space : str
The encoding space ``y'pbpr`` or ``r'g'b'``.
* primaries : str
One of FFMPEG_PRIMARIES. If unspecified, it is determined using an heuristics.
* transfer : str
One of FFMPEG_TRC. If unspecified, it is determined using an heuristics.
Examples
--------
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with ContainerInputFFMPEG(video) as container:
... stream = container.out_select("video")[0]
... stream.colorspace
...
Colorspace("y'pbpr", 'bt709', 'bt1361e, bt1361')
>>>
"""
# pylint: disable=W0212
av_stream = self._av_stream
pix = PIX_MAP[av_stream.codec_context.format.name]
space = "y'pbpr" if "yuv" in pix or "gray" in pix else "r'g'b'" # space is Y'CbCr or Y'00
primaries, transfer = guess_space(self.height, self.width, self.node.filename.suffix)
primaries = FFMPEG_PRIMARIES[av_stream.codec_context.color_primaries] or primaries
transfer = FFMPEG_TRC[av_stream.codec_context.color_trc] or transfer
return Colorspace(space, primaries, transfer)
@property
def duration(self) -> Fraction | float:
"""Return the exact duration in seconds.
Examples
--------
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with ContainerInputFFMPEG(video) as container:
... stream = container.out_select("video")[0]
... stream.duration
...
Fraction(294281, 30000)
>>>
"""
if self._duration is not None:
return self._duration
if self.node.filename.suffix.lower() in IMAGE_SUFFIXES:
self._duration = math.inf
return self._duration
with self._lock:
# jump if we can
key_times = self.get_key_times()
key_time = key_times[-2] if len(key_times) >= 2 else Fraction(0)
self.seek(key_time) # sometimes self.reset() corrects the bug
# decoding until reaching the last frame
while self.next_frame is not None:
self._prev_frame, self._next_frame = self.next_frame, None # iter in stream
# get the time of the last frame + the frame duration
self._duration = frame_dates(self._prev_frame)[0] + 1/self.rate
return self._duration
def get_key_times(self) -> np.ndarray[Fraction]:
"""Allow to read the file at the last moment.
Returns
-------
key_times : np.ndarray[Fraction]
The display time of the Intra frames, sorted in ascending order.
Examples
--------
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with ContainerInputFFMPEG(video) as container:
... stream = container.out_select("video")[0]
... stream.get_key_times()
...
array([Fraction(0, 1), Fraction(4271, 1000), Fraction(4271, 500)],
dtype=object)
>>>
"""
if self._key_times is None:
try:
self._key_times = np.fromiter(
(
frame_dates(frame)[0] for frame in _extract_key_frames(
self._av_stream,
)
),
dtype=object,
)
except MissingInformation as err:
raise MissingInformation("the timestamp is not known for all keyframes") from err
if len(self._key_times) == 0:
self._key_times = np.array([Fraction(0)], dtype=object)
else:
self._key_times.sort()
return self._key_times
def get_current_range(self) -> tuple[Fraction, Fraction]:
"""Return the time interval cover by the current frame."""
start_time = frame_dates(self.prec_frame)[0]
if (next_frame := self.next_frame) is None:
return start_time, start_time + 1/self.rate
return start_time, frame_dates(next_frame)[0]
@property
def has_alpha(self) -> bool:
"""Return True if the stream has alpha layer."""
pix = PIX_MAP[self._av_stream.codec_context.format.name]
return len(av.video.format.VideoFormat(pix).components) in {2, 4}
@functools.cached_property
def height(self) -> int:
"""Return the vertical size of the native frame with square pxl."""
if (ratio := Fraction(self._av_stream.sample_aspect_ratio or 1)) < 1:
return int(self._av_stream.height / ratio)
return self._av_stream.height
def seek(self, position: Fraction) -> None:
"""Move into the file until reaching the frame at this accurate position.
If you are already well placed, this has no effect.
Allows backward even a little bit, but only jump forward if the jump is big enough.
Parameters
----------
position : fraction.Fraction
The target position such as ``self.prec_frame.time <= position < self.next_frame.time``.
This position is expressed in seconds.
Raises
------
OutOfTimeRange
If the required position is out of the definition range.
Examples
--------
>>> from fractions import Fraction
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with ContainerInputFFMPEG(video) as container:
... stream = container.out_select("video")[0]
... stream.seek(Fraction(8))
... stream.get_current_range()
... stream.seek(Fraction(2))
... stream.get_current_range()
...
(Fraction(319, 40), Fraction(1001, 125))
(Fraction(1969, 1000), Fraction(1001, 500))
>>>
"""
assert isinstance(position, Fraction), position.__class__.__name__
# case need to seek
if position > self.get_current_range()[1] + 100/self.rate: # if jump more than 100 frames
self._seek_forward(position) # very approximative
if position < self.get_current_range()[0]:
self._seek_backward(position) # guaranteed to be before
# fine adjustment
while position >= self.get_current_range()[1]:
self._prev_frame, self._next_frame = self.next_frame, None # iter in stream
# # check asked seek position is not bigger than the duration
# if position > self.get_current_range()[0]:
# raise OutOfTimeRange(
# f"stream start {self.beginning} and end {self.beginning + self.duration}, "
# f"no frame at timestamp {position}"
# )
@functools.cached_property
def width(self) -> int:
"""Return the horizontal size of the native frame with square pxl."""
if (ratio := Fraction(self._av_stream.sample_aspect_ratio or 1)) > 1:
return int(self._av_stream.width * ratio)
return self._av_stream.width
def _convert_audio_samples(audio_samples: np.ndarray[numbers.Real]) -> torch.Tensor:
"""Convert sound samples into float between -1 and 1.
Minimizes copying and reallocations.
The values are not clamped.
Examples
--------
>>> import numpy as np
>>> from cutcutcodec.core.io.read_ffmpeg import _convert_audio_samples
>>> _convert_audio_samples(np.array([-1.5, -1.0, -.5, .5, 1.0, 1.5], dtype=np.float64))
tensor([-1.5000, -1.0000, -0.5000, 0.5000, 1.0000, 1.5000],
dtype=torch.float64)
>>> _convert_audio_samples(np.array([-1.5, -1.0, -.5, .5, 1.0, 1.5], dtype=np.float32))
tensor([-1.5000, -1.0000, -0.5000, 0.5000, 1.0000, 1.5000])
>>> _convert_audio_samples(np.array([-1.5, -1.0, -.5, .5, 1.0, 1.5], dtype=np.float16))
tensor([-1.5000, -1.0000, -0.5000, 0.5000, 1.0000, 1.5000],
dtype=torch.float16)
>>> _convert_audio_samples(
... np.array([-2147483648, -1073741824, 1073741824, 2147483647], dtype=np.int32)
... )
tensor([-1.0000, -0.5000, 0.5000, 1.0000], dtype=torch.float64)
>>> _convert_audio_samples(np.array([-32768, -16384, 16384, 32767], dtype=np.int16))
tensor([-1.0000, -0.5000, 0.5000, 1.0000], dtype=torch.float64)
>>> _convert_audio_samples(np.array([0, 64, 192, 255], dtype=np.uint8))
tensor([-1.0000, -0.4980, 0.5059, 1.0000], dtype=torch.float64)
>>>
"""
assert isinstance(audio_samples, np.ndarray), audio_samples.__class__.__name__
audio_samples = torch.from_numpy(audio_samples)
if not audio_samples.dtype.is_floating_point:
iinfo = torch.iinfo(audio_samples.dtype)
audio_samples = audio_samples.to(torch.float64)
audio_samples -= .5*float(iinfo.min + iinfo.max)
audio_samples /= .5*float(iinfo.max - iinfo.min)
return audio_samples
def _extract_key_frames(av_stream: av.video.stream.VideoStream):
"""Extract the list of key frames.
Examples
--------
>>> import av
>>> from cutcutcodec.core.io.read_ffmpeg import _extract_key_frames
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with av.open(video) as av_container:
... key_frames = list(_extract_key_frames(av_container.streams.video[0]))
...
>>> sorted(f.time for f in key_frames)
[0.0, 4.271, 8.542]
>>>
"""
assert isinstance(av_stream, av.video.stream.VideoStream), av_stream.__class__.__name__
av_stream.container.seek(0, backward=True, any_frame=False, stream=av_stream)
if av_stream.codec_context.codec.name != "libdav1d": # it fails with this codec
av_stream.codec_context.skip_frame = "NONINTRA"
for frame in av_stream.container.decode(av_stream):
if frame.pict_type == av.video.frame.PictureType.I:
yield frame
av_stream.container.seek(0, backward=True, any_frame=False, stream=av_stream)
av_stream.codec_context.skip_frame = "DEFAULT"
def _fix_drift_fill_crop(
frames_and_dates: list[list[np.ndarray, Fraction, Fraction]],
drift_max: int,
rate: int,
samples: int,
) -> list[list[np.ndarray, Fraction, Fraction]]:
"""Slightly shift the audio frames, fill the gap and crop the end.
After this function, it is possible to concatenate the frames.
The gap are filled with the 'nan' value.
Parameters
----------
frames_and_dates : list[list[np.ndarray, Fraction, Fraction]]
The frame data as a numpy array of shape (channels, samples).
The frames are considered to be all of the same dtype and same
number of channels.
The relative date of start and end of each frames.
The first date of the first sample has to be close to 0.
All the frames are assumed to be in a monotonic order.
drift_max : int
The maximum authorized translation (number of samples).
rate : int
The samplerate.
samples : int
The final index. In the case where there is a hole at the beginning or at the very end,
this makes it possible to translate the entirety of the frames by a maximum value of
`drift_max`/2 in order to fill the hole.
The last frame can be cropped to reach the exact number of requiered samples.
The gaps can be filled to reach the exact number of requiered samples.
Returns
-------
new_frames_and_dates
Same as input but with somme correction and contiguous frames.
Examples
--------
>>> from fractions import Fraction
>>> from pprint import pprint
>>> import numpy as np
>>> from cutcutcodec.core.io.read_ffmpeg import _fix_drift_fill_crop
>>> frames_and_dates = [
... [np.zeros((2, 10)), Fraction(1), Fraction(6)], # start after 0 -> padding
... [np.ones((2, 10)), Fraction(5), Fraction(10)], # light overlap -> shift
... [np.zeros((2, 10)), Fraction(15), Fraction(20)], # big gap -> padding
... [np.ones((2, 10)), Fraction(21), Fraction(26)], # light gap -> shift
... [np.zeros((2, 10)), Fraction(23), Fraction(28)], # big overlap -> crop
... ]
>>> pprint(_fix_drift_fill_crop(frames_and_dates, drift_max=2, rate=2, samples=60))
[(array([[nan, nan],
[nan, nan]]), Fraction(0, 1), Fraction(1, 1)),
[array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]),
Fraction(1, 1),
Fraction(6, 1)],
[array([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]),
Fraction(6, 1),
Fraction(11, 1)],
(array([[nan, nan, nan, nan, nan, nan, nan, nan],
[nan, nan, nan, nan, nan, nan, nan, nan]]),
Fraction(11, 1),
Fraction(15, 1)),
[array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]),
Fraction(15, 1),
Fraction(20, 1)],
[array([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]),
Fraction(20, 1),
Fraction(25, 1)],
[array([[0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0.]]),
Fraction(25, 1),
Fraction(28, 1)],
(array([[nan, nan, nan, nan],
[nan, nan, nan, nan]]),
Fraction(28, 1),
Fraction(30, 1))]
>>>
"""
dtype = frames_and_dates[0][0].dtype.type
nb_channels = frames_and_dates[0][0].shape[0]
# drift each slices for perfect concatenation
for index in range(1, len(frames_and_dates)):
drift = frames_and_dates[index][1] - frames_and_dates[index-1][2]
if rate * abs(drift) <= drift_max:
frames_and_dates[index][1] -= drift
frames_and_dates[index][2] -= drift
# fill the gaps or crop the overlaps
frames_and_dates.insert(0, [None, None, Fraction(0)]) # fake reference frame
new_frames_and_dates = []
for index in range(1, len(frames_and_dates)):
drift = frames_and_dates[index][1] - frames_and_dates[index-1][2]
if drift > 0: # case padding
logging.warning("audio frame drift of %f seconds detected", float(drift))
nb_samples = round(drift * rate)
new_frames_and_dates.append((
np.full((nb_channels, nb_samples), np.nan, dtype=dtype),
frames_and_dates[index][1] - Fraction(nb_samples, rate),
frames_and_dates[index][1],
))
elif drift < 0: # case overlap
logging.warning("audio frame overlap of %f seconds detected", float(-drift))
nb_samples = round(-drift * rate)
frames_and_dates[index][0] = frames_and_dates[index][0][:, nb_samples:]
frames_and_dates[index][1] += Fraction(nb_samples, rate)
new_frames_and_dates.append(frames_and_dates[index])
frames_and_dates = new_frames_and_dates
# reach the exact number of samples
nb_missing_samples = samples - sum(f.shape[1] for f, _, _ in frames_and_dates)
if nb_missing_samples > 0: # case padding at the last end
frames_and_dates.append((
np.full((nb_channels, nb_missing_samples), np.nan, dtype=dtype),
frames_and_dates[-1][2],
frames_and_dates[-1][2] + Fraction(nb_missing_samples, rate),
))
elif nb_missing_samples < 0: # case croping at the last end
nb_samples = frames_and_dates[-1][0].shape[1] + nb_missing_samples
frames_and_dates[-1][0] = frames_and_dates[-1][0][:, :nb_samples]
frames_and_dates[-1][2] += Fraction(nb_missing_samples, rate)
return frames_and_dates
[docs]
def frame_dates(frame: av.frame.Frame) -> tuple[Fraction, None | Fraction]:
"""Return the accurate time interval of the given frame.
Parameters
----------
frame : av.frame.Frame
The audio or video frame witch we extract the timing information.
Returns
-------
t_start : Fraction
The display time of the frame. for audio frame, it corressponds to
the time of the first sample.
t_end : Fraction or None
For audio frame only, the time to switch off the last sample. Return None for video frame.
Examples
--------
>>> import av
>>> from cutcutcodec.core.io.read_ffmpeg import frame_dates
>>> from cutcutcodec.utils import get_project_root
>>> video = get_project_root() / "media" / "video" / "intro.webm"
>>> with av.open(video) as av_container:
... frame_dates(next(av_container.decode(av_container.streams.video[0])))
... frame_dates(next(av_container.decode(av_container.streams.video[0])))
...
(Fraction(0, 1), None)
(Fraction(33, 1000), None)
>>> audio = get_project_root() / "media" / "audio" / "narration_5_1.oga"
>>> with av.open(audio) as av_container:
... frame_dates(next(av_container.decode(av_container.streams.audio[0])))
... frame_dates(next(av_container.decode(av_container.streams.audio[0])))
...
(Fraction(0, 1), Fraction(4, 125))
(Fraction(4, 125), Fraction(8, 125))
>>>
Notes
-----
For audio frame, include the duration of the last sample.
For video frame, the duration of the frame is unknown.
"""
assert isinstance(frame, av.frame.Frame), frame.__class__.__name__
if (time_base := frame.time_base) is None:
start_time = Fraction(frame.time)
elif (pts := frame.pts) is not None:
start_time = pts * time_base
elif (dts := frame.dts) is not None:
start_time = dts * time_base
else:
raise MissingInformation(f"unable to catch the time of the frame {frame}")
if isinstance(frame, av.audio.frame.AudioFrame):
stop_time = start_time + Fraction(frame.samples, frame.rate)
return start_time, stop_time
return start_time, None