"""Extract the properties of different streams of a multimedia file."""
import functools
import json
import numbers
import pathlib
import re
import subprocess
from fractions import Fraction
import numpy as np
import tqdm
from cutcutcodec.core.exceptions import MissingInformation, MissingStreamError
def _decode_duration_ffmpeg(filename: str, index: int, accurate: bool) -> Fraction:
    """Extract the duration by the complete decoding of the stream.

    Slow but 100% accurate method. The duration of the last frame is taken in consideration.

    Equivalent to:
    ``ffmpeg -threads 0 -loglevel quiet -stats -i file -map 0:v:0 -c:v rawvideo -f null /dev/null``

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.
    accurate : bool
        If True, the last "frame" slice is inspected, otherwise the last "packet" slice.

    Returns
    -------
    duration : Fraction
        The timestamp of the last slice, plus the duration of this slice when it is available.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If none of the timestamp fields of the last slice is filled.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _decode_duration_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> audio = str(get_project_root() / "media" / "audio" / "narration_5_1.oga")
    >>> _decode_duration_ffmpeg(audio, 0, accurate=True)
    Fraction(8, 1)
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _decode_duration_ffmpeg(video, 0, accurate=True)
    Fraction(9809, 1000)
    >>>
    """
    # get context
    if not (data := get_metadata(filename).get("streams", [])) or index >= len(data):
        raise MissingStreamError(f"only {len(data)} streams in '{filename}', no {index}")
    time_base = Fraction(data[index]["time_base"])
    headers, infos = get_slices_metadata(filename, slice_type=("frame" if accurate else "packet"))
    last = dict(zip(headers[index], infos[index][-1, :]))
    del headers, infos  # only the last row is needed from here

    # extract the timestamp of the last slice: integer ticks first, then the "*_time" fields
    for time_key in ("best_effort_timestamp", "pts", "pkt_pts", "dts", "pkt_dts"):
        if (value := last.get(time_key, "N/A")) != "N/A":
            time = int(value) * time_base
            break
    else:  # if time not found
        for time_key in (
            "pts_time", "pkt_pts_time", "dts_time", "pkt_dts_time", "best_effort_timestamp_time",
        ):
            if (value := last.get(time_key, "N/A")) != "N/A":
                time = Fraction(value)
                break
        else:
            raise MissingInformation(
                "impossible to extract time of the last packet "
                f"of the stream {index} of '{filename}'",
            )

    # extract the duration of the last slice
    # The lookup goes through ``value`` so that ``duration`` keeps its default of 0
    # when every field is "N/A" (binding it directly would leave the string "N/A" in it).
    duration = Fraction(0)
    for duration_key in ("duration", "pkt_duration"):
        if (value := last.get(duration_key, "N/A")) != "N/A":
            duration = int(value) * time_base
            break
    else:  # if duration not found
        for duration_key in ("duration_time", "pkt_duration_time"):
            if (value := last.get(duration_key, "N/A")) != "N/A":
                duration = Fraction(value)
                break
    return time + duration
def _decode_timestamps_ffmpeg(
    filename: str, index: int,
) -> np.ndarray[None | Fraction]:
    """Retrieve the exact position of the frames in a stream.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.

    Returns
    -------
    timestamps : np.ndarray
        A 1d array of dtype object, with one item per decoded frame.
        Each item is a Fraction, or None when no time field is filled for the frame.

    Raises
    ------
    MissingStreamError
        If the frames table contains no stream at position ``index``.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _decode_timestamps_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> audio = str(get_project_root() / "media" / "audio" / "narration_5_1.oga")
    >>> _decode_timestamps_ffmpeg(audio, 0)  # doctest: +ELLIPSIS
    array([Fraction(0, 1), Fraction(4, 125), Fraction(8, 125),
           Fraction(12, 125), Fraction(16, 125), Fraction(4, 25),
           Fraction(24, 125), Fraction(28, 125), Fraction(32, 125),
           ...
           Fraction(972, 125), Fraction(976, 125), Fraction(196, 25),
           Fraction(984, 125), Fraction(988, 125), Fraction(992, 125),
           Fraction(996, 125)], dtype=object)
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _decode_timestamps_ffmpeg(video, 0)  # doctest: +ELLIPSIS
    array([Fraction(0, 1), Fraction(33, 1000), Fraction(67, 1000),
           Fraction(1, 10), Fraction(133, 1000), Fraction(167, 1000),
           Fraction(1, 5), Fraction(117, 500), Fraction(267, 1000),
           ...
           Fraction(951, 100), Fraction(9543, 1000), Fraction(1197, 125),
           Fraction(961, 100), Fraction(9643, 1000), Fraction(2419, 250),
           Fraction(971, 100), Fraction(9743, 1000), Fraction(1222, 125)],
          dtype=object)
    >>>
    """
    # time base of the stream, 0 standing for "unknown"
    streams_meta = get_metadata(filename).get("streams", [])
    time_base = 0
    if streams_meta and index < len(streams_meta) and "time_base" in streams_meta[index]:
        time_base = Fraction(streams_meta[index]["time_base"])

    # decode frames
    all_fields, all_rows = get_slices_metadata(filename, slice_type="frame")
    if index >= len(all_fields):
        raise MissingStreamError(f"only {len(all_fields)} streams in '{filename}', no {index}")
    fields, rows = all_fields[index], all_rows[index]

    # the integer tick fields are usable only if the time base is known
    tick_keys = ("best_effort_timestamp", "pts", "pkt_pts", "dts", "pkt_dts") if time_base else ()
    second_keys = (
        "best_effort_timestamp_time", "pts_time", "pkt_pts_time", "dts_time", "pkt_dts_time",
    )

    positions = []
    for row in rows:
        frame = dict(zip(fields, row))
        # first filled field of each family, "N/A" meaning that nothing is filled
        ticks = next((v for k in tick_keys if (v := frame.get(k, "N/A")) != "N/A"), "N/A")
        if ticks != "N/A":
            positions.append(int(ticks) * time_base)
            continue
        seconds = next((v for k in second_keys if (v := frame.get(k, "N/A")) != "N/A"), "N/A")
        positions.append(Fraction(seconds) if seconds != "N/A" else None)

    # cast to ndarray
    return np.array(positions, dtype=object)
def _estimate_codec_ffmpeg(filename: str, index: int) -> str:
    """Retrieve via ffmpeg, the metadata concerning the codec name.

    This function is fast because it reads only the header of the file.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.

    Returns
    -------
    codec : str
        The value of the field "codec_name" if present, otherwise of "codec_long_name".

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If the stream metadata has neither "codec_name" nor "codec_long_name".

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_codec_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_codec_ffmpeg(video, 0)
    'vp9'
    >>>
    """
    metadata = get_metadata(filename)
    if not ((stream_meta := metadata.get("streams", [])) and len(stream_meta) > index):
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream_meta = stream_meta[index]
    for key in ("codec_name", "codec_long_name"):
        if key in stream_meta:
            return stream_meta[key]
    # the message names the codec (it used to mention the resolution by copy-paste)
    raise MissingInformation(
        f"'ffprobe' did not get a correct codec name in '{filename}' stream {index}",
    )
def _estimate_duration_ffmpeg(filename: str, index: int, *, _indirect=True) -> Fraction:
    """Extract the duration from the metadata.

    Very fast method but inaccurate. It doesn't work all the time.

    The sources are tried in this order: the stream fields "duration_ts" with "time_base",
    the stream field "duration", the stream tags whose name contains "duration",
    the container field "duration", and finally the ratio length / rate.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.
    _indirect : bool, default=True
        If True, allows falling back on the estimated length divided by the estimated rate.
        The nested estimators are called with ``_indirect=False``.

    Returns
    -------
    duration : Fraction
        The estimated duration.

    Raises
    ------
    MissingInformation
        If no source gives the duration and ``_indirect`` is False.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_duration_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> audio = str(get_project_root() / "media" / "audio" / "narration_5_1.oga")
    >>> _estimate_duration_ffmpeg(audio, 0)
    Fraction(8, 1)
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_duration_ffmpeg(video, 0)
    Fraction(9809, 1000)
    >>>
    """
    probe = get_metadata(filename)

    # information attached to the stream itself
    streams = probe.get("streams", [])
    if streams and len(streams) > index:
        stream = streams[index]
        if "duration_ts" in stream and "time_base" in stream:
            return stream["duration_ts"] * Fraction(stream["time_base"])
        if "duration" in stream:
            return Fraction(stream["duration"])
        for tag_name, tag_value in stream.get("tags", {}).items():
            if "duration" not in tag_name.lower():
                continue
            parsed = parse_duration(tag_value)
            if parsed is not None:
                return parsed

    # information attached to the container
    container = probe.get("format", None)
    if container is not None and "duration" in container:
        return Fraction(container["duration"])

    # last chance: deduce it from the length and the rate
    if not _indirect:
        raise MissingInformation(
            f"'ffprobe' did not get a correct duration in '{filename}' stream {index}",
        )
    return (
        _estimate_len_ffmpeg(filename, index, _indirect=False)
        / _estimate_rate_ffmpeg(filename, index, _indirect=False)
    )
def _estimate_len_ffmpeg(filename: str, index: int, *, _indirect=True) -> int:
    """Extract the number of frames or samples from the metadata.

    Very fast method but inaccurate. It doesn't work all the time.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.
    _indirect : bool, default=True
        If True and if the field "nb_frames" is missing, the length is deduced from
        the rounded product of the estimated duration by the estimated rate.

    Returns
    -------
    length : int
        The estimated number of frames or samples.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If the field "nb_frames" is missing and ``_indirect`` is False.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_len_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> audio = str(get_project_root() / "media" / "audio" / "narration_5_1.oga")
    >>> _estimate_len_ffmpeg(audio, 0)
    128000
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_len_ffmpeg(video, 0)
    294
    >>>
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams or len(streams) <= index:
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream = streams[index]

    # direct information
    if "nb_frames" in stream:
        return int(stream["nb_frames"])

    # deduction from the other estimators
    if not _indirect:
        raise MissingInformation(
            f"'ffprobe' did not get a correct frames number in '{filename}' stream {index}",
        )
    return round(
        _estimate_duration_ffmpeg(filename, index, _indirect=False)
        * _estimate_rate_ffmpeg(filename, index, _indirect=False),
    )
def _estimate_pix_fmt_ffmpeg(filename: str, index: int) -> str:
    """Retrieve via ffmpeg, the metadata concerning the pixel format.

    This function is fast because it reads only the header of the file.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.

    Returns
    -------
    pix_fmt : str
        The content of the field "pix_fmt" of the stream metadata.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If the stream metadata has no field "pix_fmt".

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_pix_fmt_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_pix_fmt_ffmpeg(video, 0)
    'yuv420p'
    >>>
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams or len(streams) <= index:
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream = streams[index]
    if "pix_fmt" in stream:
        return stream["pix_fmt"]
    raise MissingInformation(
        f"'ffprobe' did not find the pixel format in '{filename}' stream {index}",
    )
def _estimate_range_ffmpeg(filename: str, index: int) -> str:
    """Search, via ffmpeg, the metadata concerning the encoding range.

    This function is fast because it reads only the header of the file.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.

    Returns
    -------
    range : str or None
        "tv" for a limited range, "pc" for a full range,
        or None if the field "color_range" is explicitly unknown or unspecified.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If the field "color_range" is missing, or if its value is not a known alias.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_range_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_range_ffmpeg(video, 0)
    'tv'
    >>>
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams or len(streams) <= index:
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream = streams[index]
    if "color_range" not in stream:
        raise MissingInformation(
            f"'ffprobe' did not find the color range in '{filename}' stream {index}",
        )
    range_ = stream["color_range"]

    # to each canonical name, all the accepted aliases, https://trac.ffmpeg.org/wiki/colorspace
    aliases = {
        None: (None, 0, "unknown", "unspecified"),
        "tv": (1, "tv", "mpeg", "limited"),
        "pc": (2, "pc", "jpeg", "full"),
    }
    canonical = {alias: name for name, group in aliases.items() for alias in group}

    if range_ in canonical:
        return canonical[range_]
    raise MissingInformation(
        f"'ffprobe' give the range {range_!r} in '{filename}' stream {index}, "
        "which is unparsable, please update the source code",
    )
def _estimate_rate_ffmpeg(filename: str, index: int, *, _indirect=True) -> Fraction:
    """Retrieve via ffmpeg, the metadata concerning the fps or the framerate.

    This function is fast because it reads only the header of the file.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.
    _indirect : bool, default=True
        If True and if no rate field is usable, the rate is deduced from
        the estimated length divided by the estimated duration.

    Returns
    -------
    rate : Fraction
        The first usable value among the fields
        "r_frame_rate", "sample_rate" and "avg_frame_rate", or the deduced rate.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If no rate field is usable and ``_indirect`` is False.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_rate_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> audio = str(get_project_root() / "media" / "audio" / "narration_5_1.oga")
    >>> _estimate_rate_ffmpeg(audio, 0)
    Fraction(16000, 1)
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_rate_ffmpeg(video, 0)
    Fraction(30000, 1001)
    >>>
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams or len(streams) <= index:
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream = streams[index]

    # direct information
    for rate_key in ("r_frame_rate", "sample_rate", "avg_frame_rate"):
        if rate_key not in stream:
            continue
        try:
            rate = Fraction(stream[rate_key])
        except ZeroDivisionError:  # a value with a zero denominator, like "0/0", is skipped
            continue
        return rate

    # deduction from the other estimators
    if not _indirect:
        raise MissingInformation(
            f"'ffprobe' did not get a correct framerate in '{filename}' stream {index}",
        )
    return (
        _estimate_len_ffmpeg(filename, index, _indirect=False)
        / _estimate_duration_ffmpeg(filename, index, _indirect=False)
    )
def _estimate_resolution_ffmpeg(filename: str, index: int) -> tuple[int, int]:
    """Retrieve via ffmpeg, the metadata concerning the image resolution.

    This function is fast because it reads only the header of the file.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The absolute index of the stream to consider.

    Returns
    -------
    height, width : tuple[int, int]
        The fields ("height", "width") if both are present,
        otherwise the fields ("coded_height", "coded_width").

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream at position ``index``.
    MissingInformation
        If none of the two pairs of fields is complete.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _estimate_resolution_ffmpeg
    >>> from cutcutcodec.utils import get_project_root
    >>> video = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _estimate_resolution_ffmpeg(video, 0)
    (720, 1280)
    >>> _estimate_resolution_ffmpeg(video, 1)
    (360, 640)
    >>>
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams or len(streams) <= index:
        raise MissingStreamError(f"no stream {index} metadata detected in '{filename}'")
    stream = streams[index]
    for shape_keys in (("height", "width"), ("coded_height", "coded_width")):
        if all(key in stream for key in shape_keys):
            return tuple(stream[key] for key in shape_keys)
    raise MissingInformation(
        f"'ffprobe' did not get a correct resolution in '{filename}' stream {index}",
    )
@functools.lru_cache(maxsize=1024)
def _get_streams_type(filename: pathlib.Path) -> list[str]:
    """Help ``get_streams_type``.

    Parameters
    ----------
    filename : pathlib.Path
        The path of the file to probe.

    Returns
    -------
    streams_type : list[str]
        The type "audio", "subtitle" or "video" of each stream, sorted by stream index.
        The result is memorised by ``functools.lru_cache``,
        so the very same list object is given back for the same path.

    Raises
    ------
    MissingStreamError
        If ``ffprobe`` fails, prints no stream at all, gives two different types
        for the same index, or if the indexes are not exactly 0, 1, ..., n-1.
    ValueError
        If a stream type is not "audio", "subtitle" or "video".
    """
    # optional shortcut
    from cutcutcodec.core.io.cst import AUDIO_SUFFIXES, IMAGE_SUFFIXES
    suffix = filename.suffix.lower()
    if suffix in IMAGE_SUFFIXES:
        return ["video"]
    if suffix in AUDIO_SUFFIXES:
        return ["audio"]

    # rigorous way
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=index,codec_type",
        "-of", "csv=p=0", str(filename),
    ]
    try:
        result = subprocess.run(
            cmd,
            check=True,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
        ).stdout.decode()
    except subprocess.CalledProcessError as err:
        raise MissingStreamError(f"'ffprobe' can not open '{filename}'") from err

    # The blank lines are removed BEFORE testing the emptiness:
    # "".split("\n") gives [""], which is truthy, so an empty output was never detected.
    # ``splitlines`` also copes with the "\r\n" line endings.
    indexs_streams = [line.strip() for line in result.splitlines() if line.strip()]
    if not indexs_streams:
        raise MissingStreamError(f"'ffprobe' did not find any stream info in '{filename}'")

    streams = {}
    for index_stream in indexs_streams:
        index, stream, *_ = index_stream.split(",")  # extra csv columns are ignored
        index = int(index)
        if streams.get(index) is not None and stream != streams[index]:
            raise MissingStreamError(f"index {index} appears twice in '{filename}'")
        if stream not in {"audio", "subtitle", "video"}:
            raise ValueError(
                f"the stream {index} ({stream}) in '{filename}' "
                "not in 'audio', 'video' or 'subtitle'",
            )
        streams[index] = stream
    if streams.keys() != set(range(len(streams))):
        raise MissingStreamError(f"missing stream index in '{filename}', {streams}")
    return [streams[i] for i in range(len(streams))]
def _help_slices_metadata_context(filename: pathlib.Path) -> tuple[list[Fraction], Fraction]:
    """Help ``get_slices_metadata``.

    Parameters
    ----------
    filename : pathlib.Path
        The path of the multimedia file.

    Returns
    -------
    times_base : list[Fraction]
        The field "time_base" of each stream, in the order of the metadata.
    duration : Fraction
        The largest estimated duration among all the streams,
        0 if the duration of no stream can be estimated.

    Raises
    ------
    MissingStreamError
        If the metadata contains no stream.
    MissingInformation
        If the field "time_base" is missing for at least one stream.
    """
    streams = get_metadata(filename).get("streams", [])
    if not streams:
        raise MissingStreamError(f"no stream metadata detected in '{filename}'")

    times_base = [Fraction(stream["time_base"]) for stream in streams if "time_base" in stream]
    if len(times_base) != len(streams):
        raise MissingInformation(
            f"the field 'time_base' is not founded for all streams metadata of '{filename}'",
        )

    # the streams for which the duration is unknown are simply ignored
    longest = Fraction(0)
    for stream_index, _ in enumerate(times_base):
        try:
            candidate = _estimate_duration_ffmpeg(filename, stream_index)
        except MissingInformation:
            continue
        longest = max(longest, candidate)
    return times_base, longest
def _help_slices_metadata_parse_data(data, sort_new_h, sort_old_h=None):
    """Help ``get_slices_metadata``.

    Reorder the fields of one record and convert it into an array of strings.

    Parameters
    ----------
    data : dict or iterable
        The record, as a dictionary field -> value,
        or as the values alone, matching the field names of ``sort_old_h``.
    sort_new_h : iterable
        The field names, in the order of the columns of the result.
    sort_old_h : iterable, optional
        The field names matching ``data``, used only if ``data`` is not a dictionary.

    Returns
    -------
    np.ndarray
        An unicode array of shape (1, number of fields of ``sort_new_h``).
        The missing fields are filled with "N/A".
    """
    record = data if isinstance(data, dict) else dict(zip(sort_old_h, data))
    row = [record.get(field, "N/A") for field in sort_new_h]
    return np.array([row], dtype="U")
def _map_index_rel_to_abs(filename: str, index: int, stream_type: str) -> int:
    """Relative stream index to absolute stream index.

    Parameters
    ----------
    filename : str
        The path of the multimedia file.
    index : int
        The position of the stream among the streams of type ``stream_type`` only.
    stream_type : str
        The type of the stream, as returned by ``get_streams_type``.

    Returns
    -------
    abs_index : int
        The position of the same stream among all the streams of the file.

    Raises
    ------
    MissingStreamError
        If the file contains ``index`` or fewer streams of type ``stream_type``.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import _map_index_rel_to_abs
    >>> from cutcutcodec.utils import get_project_root
    >>> media = str(get_project_root() / "media" / "video" / "intro.webm")
    >>> _map_index_rel_to_abs(media, 0, "video")
    0
    >>> _map_index_rel_to_abs(media, 1, "video")
    1
    >>> _map_index_rel_to_abs(media, 0, "audio")
    2
    >>> _map_index_rel_to_abs(media, 1, "audio")
    3
    >>>
    """
    types = get_streams_type(filename)
    # absolute positions of the streams of the requested type
    candidates = [position for position, kind in enumerate(types) if kind == stream_type]
    if index < len(candidates):
        return candidates[index]
    raise MissingStreamError(
        f"relative {stream_type} index {index} of '{filename}' not in {types}",
    )
# [docs]  (link marker left by the HTML documentation export, not part of the code)
def get_streams_type(
    filename: pathlib.Path | str | bytes, ignore_errors=False,
) -> list[str]:
    """Retrieve in order the stream types present in the file.

    Parameters
    ----------
    filename : pathlike
        The pathlike of the file containing streams.
    ignore_errors : boolean, default=False
        If True, returns an empty list
        rather than throwing an exception if no valid stream is detected.

    Returns
    -------
    streams_type : list[str]
        Each item can be "audio", "subtitle" or "video".
        It is a new list at each call, it can be modified freely.

    Raises
    ------
    MissingStreamError
        If ``ignore_errors`` is False and if one of the indexes is missing or redundant.

    Examples
    --------
    >>> from cutcutcodec.core.analysis.ffprobe import get_streams_type
    >>> from cutcutcodec.utils import get_project_root
    >>> media = get_project_root() / "media" / "video" / "intro.webm"
    >>> get_streams_type(media)
    ['video', 'video', 'audio', 'audio']
    >>> get_streams_type(get_project_root() / "__main__.py", ignore_errors=True)
    []
    >>>
    """
    filename = pathlib.Path(filename)
    assert filename.exists(), filename
    assert isinstance(ignore_errors, bool), ignore_errors.__class__.__name__
    try:
        # ``_get_streams_type`` is memorised by ``lru_cache``: a copy is returned,
        # otherwise a caller editing the list would silently corrupt the cache.
        return list(_get_streams_type(filename))
    except MissingStreamError:
        if ignore_errors:
            return []
        raise
# [docs]  (link marker left by the HTML documentation export, not part of the code)
def parse_duration(duration: numbers.Real | str) -> None | Fraction:
"""Try to convert a duration information into a fraction in second.
Parameters
----------
duration : number or str
The duration to cast in integer
Returns
-------
sec_duration : Fraction
The decoded duration in second.
Examples
--------
>>> from cutcutcodec.core.analysis.ffprobe import parse_duration
>>> parse_duration(1.5) # from float
Fraction(3, 2)
>>> parse_duration(2) # from integer
Fraction(2, 1)
>>> parse_duration(".5") # from float rep
Fraction(1, 2)
>>> parse_duration("1.") # from float rep
Fraction(1, 1)
>>> parse_duration("1.5") # from complete float rep
Fraction(3, 2)
>>> parse_duration("1:01:01") # from h:m:s
Fraction(3661, 1)
>>>
"""
assert isinstance(duration, (numbers.Real, str)), duration.__class__.__name__
try:
return Fraction(duration)
except ValueError:
pass
if (match := re.fullmatch(r"(?P<h>\d+):(?P<m>\d\d):(?P<s>\d*\.?\d+)", duration)):
return 3600*int(match["h"]) + 60*int(match["m"]) + Fraction(match["s"])
return None