"""Allow to choose a format and codecs.
The information collected here concerns the encoding and not the decoding.
"""
import contextlib
import io
import itertools
import multiprocessing
import os
import tempfile
from fractions import Fraction
import av
import numpy as np
import tqdm
from cutcutcodec.core.classes.encoder import AllEncoders, Encoder
from cutcutcodec.core.classes.muxer import AllMuxers
from cutcutcodec.core.exceptions import DecodeError, EncodeError, IncompatibleSettings
from cutcutcodec.core.opti.cache.singleton import MetaSingleton
def _decode(file: str | io.BytesIO, muxer: str | None = None) -> dict[str, object]:
    """Read and extract the main properties of the stream 0.

    Only the first stream is inspected. It uses pyav in background.
    The whole stream is decoded once to make sure that the content is readable.

    Parameters
    ----------
    file : str or io.BytesIO
        The full path of the file to read or the io.BytesIO filelike, already seeked to 0.
    muxer : str, optional
        The muxer name used for encoding, forwarded to ``av.open`` as the container format.

    Returns
    -------
    properties : dict[str, object]
        * `codec`: str  # The name of the codec found.
        * `layout`: str or None  # The audio channel organisation.
        * `muxer`: str  # The decoded muxer name.
        * `profile`: str  # The name of the binary format of the data.
        * `rate`: int or Fraction  # The framerate or the samplerate.
        * `shape`: tuple[int, int] or None  # The shape (height, width) of the frames.
        * `type`: str  # The type of stream.

    Raises
    ------
    DecodeError
        If reading failed.

    Notes
    -----
    No verifications are performed because it is a subfunction.
    """
    properties = {}
    try:
        with av.open(file, mode="r", format=muxer) as container:
            stream = container.streams[0]
            properties["codec"] = stream.codec_context.codec.name
            properties["layout"] = None
            properties["muxer"] = container.format.name
            properties["profile"] = stream.codec_context.format.name
            properties["rate"] = None
            properties["shape"] = None
            properties["type"] = stream.type
            if properties["type"] == "audio":
                properties["layout"] = stream.codec_context.layout.name
                properties["rate"] = stream.rate
            elif properties["type"] == "video":
                properties["rate"] = stream.average_rate
                properties["shape"] = (stream.height, stream.width)
            # try to decode, each frame is dropped at once to avoid storing the full stream
            for _ in container.decode(stream):
                pass
    except (
        av.error.FFmpegError,
        IndexError,
        AttributeError,
        UnicodeDecodeError,  # for the ascii codec
    ) as err:
        raise DecodeError(f"failed to decode {file}") from err
    return properties
def _encode_audio(
    file: str | io.BytesIO,
    encodec: str,
    muxer: str,
    **kwargs,
) -> None:
    """Encode one short silent audio frame to test the encoder/muxer capability.

    Parameters
    ----------
    file : str or io.BytesIO
        The full path of the file to write or the io.BytesIO filelike.
    encodec : str
        The codec or the encoder name.
    muxer : str
        The format name.
    layout : str
        The name of the audio layout, ex "stereo". This named argument is required.
    rate : int, optional
        The samplerate of the audio in all channels.
    profile : str, optional
        The name of the binary format of the data ex "flt".

    Raises
    ------
    EncodeError
        If encoding failed.

    Notes
    -----
    No verifications are performed because it is a subfunction.
    """
    layout, rate, profile = kwargs["layout"], kwargs.get("rate"), kwargs.get("profile")
    os.environ["PYAV_LOGGING"] = "off"  # presumably silences the pyav logs
    try:
        with av.open(file, mode="w", format=muxer) as container:
            stream = container.add_stream(encodec, rate=rate, layout=layout)
            stream.options = {"strict": "experimental"}
            if profile is not None:
                stream.format = profile
            # fall back on "flt" when the stream format has no known numpy equivalent
            format_to_use = (
                stream.format.name
                if stream.format.name in av.audio.frame.format_dtypes
                else "flt"
            )
            frame = av.audio.frame.AudioFrame.from_ndarray(
                np.zeros(  # zeros encoding time faster than empty
                    (len(av.audio.layout.AudioLayout(layout).channels), 4800),
                    dtype=av.audio.frame.format_dtypes[format_to_use],
                ),
                format=format_to_use,
                layout=layout,
            )  # raise IndexError for 7.1 layout, error in pyav
            frame.rate = rate or min(av.Codec(encodec, "w").audio_rates or {48000})
            frame.time_base = Fraction(1, frame.rate)
            frame.pts = 0
            container.mux(stream.encode(frame))
            container.mux(stream.encode(None))  # flush buffer
    except (
        av.error.FFmpegError,
        ValueError,
        AttributeError,
        IndexError,  # the pyav error on the 7.1 layout has to be reported as a failure as well
    ) as err:
        raise EncodeError(f"failed to encode {file}") from err
def _encode_video(
    file: str | io.BytesIO,
    encodec: str,
    muxer: str,
    **kwargs,
) -> None:
    """Encode 3 black frames to test the encoder/muxer capability.

    Parameters
    ----------
    file : str or io.BytesIO
        The full path of the file to write or the io.BytesIO filelike.
    encodec : str
        The codec or the encoder name.
    muxer : str
        The format name.
    shape : tuple[int, int]
        The shape (height, width) of the frames of the encoded video.
        This named argument is required.
    rate : Fraction, optional
        The frame rate.
    pix_fmt : str, optional
        The name of the pixel format used. By default, the format supported by the encoder
        that has the largest number of bits per pixel is selected.

    Raises
    ------
    EncodeError
        If the encoding failed.

    Notes
    -----
    No verifications are performed because it is a subfunction.
    """
    shape = kwargs["shape"]
    rate = kwargs.get("rate")
    pix_fmt = kwargs.get("pix_fmt")
    os.environ["PYAV_LOGGING"] = "off"
    try:
        with av.open(file, mode="w", format=muxer) as container:
            height, width = shape[0], shape[1]
            stream = container.add_stream(encodec, rate=rate, height=height, width=width)
            stream.options = {
                "strict": "experimental",  # to allow new codecs
                "x265-params": "log_level=none",  # to make libx265 quiet
            }
            if encodec == "libsvtav1":
                os.environ["SVT_LOG"] = "1"  # to make libsvtav1 quiet

            # select the pixel format
            if not pix_fmt:
                candidates = (
                    av.Codec(encodec, "w").video_formats or (av.VideoFormat("yuv420p"),)
                )
                pix_fmt = max(candidates, key=lambda f: f.bits_per_pixel).name
            stream.pix_fmt = pix_fmt

            # encode the frames
            for index in range(3):  # nbr frames
                blank = np.zeros((*shape, 3), dtype=np.uint8)
                frame = av.video.frame.VideoFrame.from_ndarray(blank, format="rgb24")
                frame.time_base = Fraction(1, 300300)  # lcm of 1001, 1000, 25, 30, 60
                fps = rate or stream.average_rate or Fraction(30000, 1001)
                frame.pts = round((index / fps) / frame.time_base)
                container.mux(stream.encode(frame))
            container.mux(stream.encode(None))  # flush buffer
    except (av.error.FFmpegError, ValueError, AttributeError) as err:
        raise EncodeError(f"failed to encode {file}") from err
def audio_encodec_compatibility(
    encodec: str,
    muxer: str,
    layout: str = "mono",
    rate: int | None = None,
    profile: str | None = None,
) -> str:
    """Test through av the compatibility of the audio encoding parameters.

    A short audio stream is encoded in memory, then decoded again,
    and the decoded properties are compared with the requested ones.

    Parameters
    ----------
    encodec : str
        The codec or the encoder name.
    muxer : str
        The format name.
    layout : str, default="mono"
        The name of the audio layout, ex "stereo"
    rate : int, optional
        The samplerate of the audio in all channels.
    profile : str, optional
        The name of the binary format of the data ex "flt".

    Returns
    -------
    codec : str
        The name of the codec found.

    Raises
    ------
    IncompatibleSettings
        If it fails to encode or if the decoded parameters don't match.

    Examples
    --------
    >>> from cutcutcodec.core.compilation.export.compatibility import audio_encodec_compatibility
    >>> audio_encodec_compatibility("libvorbis", "ogg")
    'vorbis'
    >>>
    """
    assert isinstance(muxer, str), muxer.__class__.__name__
    assert isinstance(encodec, str), encodec.__class__.__name__
    assert isinstance(layout, str), layout.__class__.__name__
    assert rate is None or isinstance(rate, int), rate.__class__.__name__
    assert profile is None or isinstance(profile, str), profile.__class__.__name__

    # theoretical verifications, based on what the encoder declares
    codec_av = av.Codec(encodec, "w")
    if codec_av.type != "audio":
        raise IncompatibleSettings(f"the codec {encodec} is {codec_av.type}, not audio")
    if rate is not None:
        supported_rates = codec_av.audio_rates
        if supported_rates is not None and rate not in supported_rates:
            raise IncompatibleSettings(
                f"the codec {encodec} dose not support {rate} Hz, only {codec_av.audio_rates}",
            )
    if profile is not None:
        supported_formats = codec_av.audio_formats
        if (
            supported_formats is not None
            and profile not in {p.name for p in supported_formats}
        ):
            raise IncompatibleSettings(f"the codec {encodec} dose not support {profile} profile")

    # practical verification, encode then decode in a virtual file
    try:
        with io.BytesIO() as file:
            file.name = os.devnull
            _encode_audio(file, encodec, muxer, layout=layout, rate=rate, profile=profile)
            file.seek(0)
            properties = _decode(file, muxer)
    except (EncodeError, DecodeError) as err:
        raise IncompatibleSettings("failed to encode or decode") from err

    # compare what was asked with what was found, the unspecified values are free
    expected = {
        "type": "audio",
        "muxer": muxer,
        "layout": layout,
        "rate": rate,
        "profile": profile,
    }
    for ref_name, ref_val in expected.items():
        if ref_val is None:
            continue
        if ref_val != properties[ref_name]:
            raise IncompatibleSettings(
                f"encoded {ref_name} {ref_val} but decoded with {properties[ref_name]}",
            )
    return properties["codec"]
def video_encodec_compatibility(
    encodec: str,
    muxer: str,
    shape: tuple[int, int] | None = None,
    rate: Fraction | None = None,
    pix_fmt: str | None = None,
) -> str:
    """Test through av the compatibility of the video encoding parameters.

    A few frames are encoded in a temporary file, then decoded again,
    and the decoded properties are compared with the requested ones.

    Parameters
    ----------
    encodec : str
        The codec or the encoder name.
    muxer : str
        The format name.
    shape : tuple[int, int], optional
        The shape (height, width) of the frames of the encoded video, (64, 64) by default.
    rate : Fraction, optional
        The frame rate.
    pix_fmt : str, optional
        The name of the pixel format used.

    Returns
    -------
    codec : str
        The name of the codec found.

    Raises
    ------
    IncompatibleSettings
        If it fails to encode or if the decoded parameters don't match.

    Examples
    --------
    >>> from cutcutcodec.core.compilation.export.compatibility import video_encodec_compatibility
    >>> video_encodec_compatibility("libx264", "mp4")
    'h264'
    >>>
    """
    assert isinstance(muxer, str), muxer.__class__.__name__
    assert isinstance(encodec, str), encodec.__class__.__name__
    shape = shape or (64, 64)
    assert isinstance(shape, tuple), shape.__class__.__name__
    assert len(shape) == 2, shape
    assert isinstance(shape[0], int), shape[0].__class__.__name__
    assert isinstance(shape[1], int), shape[1].__class__.__name__
    assert rate is None or isinstance(rate, Fraction), rate.__class__.__name__
    assert pix_fmt is None or isinstance(pix_fmt, str), pix_fmt.__class__.__name__

    # theoretical verifications, based on what the encoder declares
    codec_av = av.Codec(encodec, "w")
    if codec_av.type != "video":
        raise IncompatibleSettings(f"the codec {encodec} is {codec_av.type}, not video")
    if rate is not None:
        try:
            frame_rates = codec_av.frame_rates
        except AttributeError:  # failed sometimes in ``codec_av.frame_rates``
            frame_rates = None  # unknown, the practical test below decides
        if frame_rates is not None and rate not in frame_rates:
            raise IncompatibleSettings(
                f"the codec {encodec} dose not support {rate} fps, only {frame_rates}",
            )
    if (
        pix_fmt is not None and codec_av.video_formats is not None
        and pix_fmt not in {f.name for f in codec_av.video_formats}
    ):
        raise IncompatibleSettings(f"the codec {encodec} dose not support {pix_fmt} pixel format")

    # practical verification, encode then decode
    try:
        # The mode has to be a valid ``open`` mode: it is only used when the content exceeds
        # ``max_size`` and is moved to a real file, an invalid mode raises ValueError there.
        with tempfile.SpooledTemporaryFile(max_size=10_000_000, mode="w+b") as file:  # max 10Mo
            _encode_video(file, encodec, muxer, shape=shape, rate=rate, pix_fmt=pix_fmt)
            file.seek(0)
            properties = _decode(file, muxer)
    except (EncodeError, DecodeError) as err:
        raise IncompatibleSettings("failed to encode or decode") from err

    # compare what was asked with what was found, the unspecified values are free
    for ref_name, ref_val in [
        ("type", "video"),
        ("muxer", muxer),
        ("profile", pix_fmt),
        ("rate", rate),
        ("shape", shape),
    ]:
        if ref_val is not None and ref_val != properties[ref_name]:
            raise IncompatibleSettings(
                f"encoded {ref_name} {ref_val} but decoded with {properties[ref_name]}",
            )
    return properties["codec"]
class Compatibilities(metaclass=MetaSingleton):
    """Link the muxers and the encoders.

    The result of each tested (encoder, muxer, parameters) combination is kept in memory,
    so that a combination is never tested twice by the same instance.
    """

    def __init__(self):
        # to each (encoder, muxer, signature), associate the decoded codec name, "" if it failed
        self._compatibilites = {}

    @staticmethod
    def _check_mono(enc_mux_spec: tuple[str, str, tuple[tuple[str, ...], tuple]]) -> str:
        """Test if the encodec is compatible with the given specifications.

        It is process and thread safe.

        Parameters
        ----------
        enc_mux_spec : tuple
            The encoder name, the muxer name and the pair (names, values)
            of the named parameters forwarded to the compatibility function.

        Returns
        -------
        codec : str
            The name of the decoded codec, or the empty string if the settings are incompatible.

        Raises
        ------
        NotImplementedError
            If the encoder is neither an audio nor a video encoder.
        """
        encoder, muxer, specifications = enc_mux_spec
        kwargs = dict(zip(*specifications))
        kind: str = Encoder(encoder).type
        if (checker := {
            "audio": audio_encodec_compatibility,
            "video": video_encodec_compatibility,
        }.get(kind)) is None:
            raise NotImplementedError(f"only available for audio and video, not for {kind}")
        with contextlib.redirect_stdout(None), contextlib.redirect_stderr(None):  # quiet
            try:
                codec = checker(encoder, muxer, **kwargs)
            except IncompatibleSettings:
                return ""
        return codec

    @staticmethod
    def _resolve_muxers(muxers: list[str] | None) -> list[str]:
        """Return the given muxers after verification, or all the known muxers if None."""
        if muxers is None:
            return sorted(AllMuxers().set)  # sorted, not list because dict is sorted
        assert isinstance(muxers, list), muxers.__class__.__name__
        assert all(isinstance(m, str) for m in muxers), muxers
        return muxers

    def _codecs(
        self, encoders: list[str], muxers: list[str] | None, /, **kwargs,
    ) -> dict[str, list[tuple[str, str]]]:
        """Group the working encoder/muxer pairs by decoded codec name."""
        muxers = self._resolve_muxers(muxers)
        comp = self.check(encoders, muxers, **kwargs)
        codecs = {}
        for encoder, decoded_codecs in zip(encoders, comp):
            for muxer, codec in zip(muxers, decoded_codecs):
                if codec_str := str(codec):  # the empty string means incompatible
                    codecs.setdefault(codec_str, []).append((encoder, muxer))
        return codecs

    def _encoders(
        self, codec: str, encoders: list[str], muxers: list[str] | None, /, **kwargs,
    ) -> dict[str, set[str]]:
        """To each encoder that yields the given codec, associate the working muxers."""
        assert isinstance(codec, str), codec.__class__.__name__
        muxers = self._resolve_muxers(muxers)
        comp = self.check(encoders, muxers, **kwargs)
        compatible_encoders = {}
        for encoder, decoded_codecs in zip(encoders, comp):
            for muxer, decoded_codec in zip(muxers, decoded_codecs):
                if decoded_codec == codec:
                    compatible_encoders.setdefault(encoder, set()).add(muxer)
        return compatible_encoders

    def check(self, encoders: list[str], muxers: list[str], **kwargs) -> np.ndarray[str]:
        """Check all the couples encoder/muxer (cartesian product).

        Parameters
        ----------
        encoders : list[str]
            The encoder names.
        muxers : list[str]
            The muxer (format) names.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.audio_encodec_compatibility``
            and ``cutcutcodec.core.compilation.export.compatibility.video_encodec_compatibility``.

        Returns
        -------
        compatibility_matrix : np.ndarray[str]
            The 2d compatibility matrix.
            Item (i, j) contains the codec name of the encoder[i] with the muxer[j],
            or the empty string if they are not compatible.

        Examples
        --------
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> Compatibilities().check(["libx264", "libaom-av1", "libvorbis"], ["mp4", "ogg"])
        array([['h264', ''],
               ['libdav1d', ''],
               ['vorbis', 'vorbis']], dtype='<U8')
        >>> Compatibilities().check([], [])
        array([], shape=(1, 0), dtype='<U1')
        >>> Compatibilities().check(["libx264"], [])
        array([], shape=(1, 0), dtype='<U1')
        >>> Compatibilities().check([], ["mp4"])
        array([], shape=(1, 0), dtype='<U1')
        >>>
        """
        assert isinstance(encoders, list), encoders.__class__.__name__
        assert all(isinstance(ec, str) for ec in encoders), encoders
        assert set(encoders).issubset(AllEncoders().set), set(encoders)-AllEncoders().set
        assert isinstance(muxers, list), muxers.__class__.__name__
        assert all(isinstance(f, str) for f in muxers), muxers
        assert set(muxers).issubset(AllMuxers().set), set(muxers)-AllMuxers().set

        # case empty:
        if not encoders or not muxers:
            return np.asarray([[]], dtype=str)  # required to keep a homogeneous 2d array

        # make the checks
        names = tuple(sorted(kwargs))
        signature = (names, tuple(kwargs[k] for k in names))  # hashable version of kwargs
        # iter over encoder first, muxer second (same aim as shuffle)
        # the cached combinations are skipped to reduce the number of tests to the minimum
        all_enc_mux_spec = [
            (encoder, muxer, signature)
            for muxer, encoder in itertools.product(muxers, encoders)
            if (encoder, muxer, signature) not in self._compatibilites
        ]
        if all_enc_mux_spec:
            # maxtasksperchild != None for memory leak and != 1 for fork efficiency (granularity)
            with multiprocessing.get_context("spawn").Pool(maxtasksperchild=128) as pool:
                # for debug only, ``pool.imap(...)`` can be replaced by
                # ``map(Compatibilities._check_mono, all_enc_mux_spec)``
                for enc_mux_spec, codec in tqdm.tqdm(
                    zip(
                        all_enc_mux_spec,
                        pool.imap(
                            Compatibilities._check_mono,
                            all_enc_mux_spec,
                            chunksize=16,
                        ),
                    ),
                    total=len(all_enc_mux_spec),
                    desc="Testing encoder/muxer",
                    dynamic_ncols=True,
                    # os.cpu_count() returns None when the number of cpus is undetermined
                    disable=(len(all_enc_mux_spec) <= 4*(os.cpu_count() or 1)),
                    smoothing=1e-6,
                    unit="comb",
                ):
                    self._compatibilites[enc_mux_spec] = codec

        # create matrix
        return np.asarray(
            [
                [self._compatibilites[(encoder, muxer, signature)] for muxer in muxers]
                for encoder in encoders
            ],
            dtype=str,
        )

    def codecs_audio(
        self, muxers: list[str] | None = None, **kwargs,
    ) -> dict[str, list[tuple[str, str]]]:
        """Search all the compatibles audio codecs.

        Parameters
        ----------
        muxers : list[str], optional
            The muxer (format) names, all the known muxers by default.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.audio_encodec_compatibility``.

        Returns
        -------
        codecs : dict[str, list[tuple[str, str]]]
            For all audio codec, associate the encoder/muxer pairs.

        Examples
        --------
        >>> from pprint import pprint
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> pprint(Compatibilities().codecs_audio(layout="5.1"))  # doctest: +ELLIPSIS
        {'aac': [('aac', '3g2'),
                 ('aac', '3gp'),
                 ('aac', 'adts'),
        ...
                 ('aac', 'w64'),
                 ('aac', 'wav'),
                 ('aac', 'wtv')],
        ...
         'wavpack': [('wavpack', 'matroska'), ('wavpack', 'nut'), ('wavpack', 'wv')]}
        >>>
        """
        encoders = sorted(AllEncoders().audio)  # sorted, not list because dict is sorted
        return self._codecs(encoders, muxers, **kwargs)

    def codecs_video(
        self, muxers: list[str] | None = None, **kwargs,
    ) -> dict[str, list[tuple[str, str]]]:
        """Search all the compatibles video codecs.

        Parameters
        ----------
        muxers : list[str], optional
            The muxer (format) names, all the known muxers by default.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.video_encodec_compatibility``.

        Returns
        -------
        codecs : dict[str, list[tuple[str, str]]]
            For all video codec, associate the encoder/muxer pairs.

        Examples
        --------
        >>> from fractions import Fraction
        >>> from pprint import pprint
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> comp = Compatibilities().codecs_video(pix_fmt="yuv444p12le", rate=Fraction(120))
        >>> pprint(comp["hevc"])  # doctest: +ELLIPSIS
        [('libx265', 'flv'),
        ...
         ('libx265', 'vob')]
        >>>
        """
        encoders = sorted(AllEncoders().video)  # sorted, not list because dict is sorted
        return self._codecs(encoders, muxers, **kwargs)

    def encoders_audio(
        self, codec: str, muxers: list[str] | None = None, **kwargs,
    ) -> dict[str, set[str]]:
        """Search all the compatible audio encoders.

        Parameters
        ----------
        codec : str
            The audio codec name.
        muxers : list[str], optional
            The muxer (format) names, all the known muxers by default.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.audio_encodec_compatibility``.

        Returns
        -------
        encoders : dict[str, set[str]]
            For all audio encoder, associate the available muxers.

        Examples
        --------
        >>> from pprint import pprint
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> pprint(Compatibilities().encoders_audio("vorbis", layout="5.1"))  # doctest: +ELLIPSIS
        {'libvorbis': {'asf',
        ...
                       'wtv'}}
        >>>
        """
        encoders = sorted(AllEncoders().audio)  # sorted, not list because dict is sorted
        return self._encoders(codec, encoders, muxers, **kwargs)

    def encoders_video(
        self, codec: str, muxers: list[str] | None = None, **kwargs,
    ) -> dict[str, set[str]]:
        """Search all the compatible video encoders.

        Parameters
        ----------
        codec : str
            The video codec name.
        muxers : list[str], optional
            The muxer (format) names, all the known muxers by default.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.video_encodec_compatibility``.

        Returns
        -------
        encoders : dict[str, set[str]]
            For all video encoder, associate the available muxers.

        Examples
        --------
        >>> from pprint import pprint
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> pprint(
        ...     Compatibilities().encoders_video("h264", pix_fmt="yuv420p")
        ... )  # doctest: +ELLIPSIS
        {'libopenh264': {'3g2',
                         '3gp',
        ...
                         'vob',
                         'wtv'},
         'libx264': {'3g2',
                     '3gp',
        ...
                     'vob',
                     'wtv'}}
        >>>
        """
        encoders = sorted(AllEncoders().video)  # sorted, not list because dict is sorted
        return self._encoders(codec, encoders, muxers, **kwargs)

    def muxers(self, encoder: str, **kwargs) -> frozenset[str]:
        """Search all the compatibles muxers.

        Parameters
        ----------
        encoder : str
            The encoder name.
        **kwargs : dict
            The optionals named parameters of
            ``cutcutcodec.core.compilation.export.compatibility.Compatibilities.check``.

        Returns
        -------
        available_muxers : frozenset[str]
            All the available muxers for the given encoder.

        Examples
        --------
        >>> from pprint import pprint
        >>> from cutcutcodec.core.compilation.export.compatibility import Compatibilities
        >>> pprint(Compatibilities().muxers("libx264"))  # doctest: +ELLIPSIS
        frozenset({'3g2',
                   '3gp',
        ...
                   'vob',
                   'wtv'})
        >>>
        """
        assert isinstance(encoder, str), encoder.__class__.__name__
        muxers = sorted(AllMuxers().set)  # sorted for repeatability
        comp = self.check([encoder], muxers, **kwargs)
        # a non empty codec name means that the muxer works with this encoder
        return frozenset(m for m, a in zip(muxers, comp[0]) if a)