"""Smartly choose the framerate of an audio stream."""
import math
from fractions import Fraction
import numpy as np
from sympy.core.add import Add
from sympy.core.function import diff
from sympy.core.mul import Mul
from sympy.core.power import Pow
from sympy.core.symbol import Symbol
from sympy.functions.elementary.trigonometric import cos, sin
from sympy.functions.special.delta_functions import DiracDelta
from sympy.utilities import lambdify
from cutcutcodec.core.analysis.stream.time_backprop import time_backprop
from cutcutcodec.core.classes.stream_audio import StreamAudio
from cutcutcodec.core.compilation.sympy_to_torch.preprocess import evalf
from cutcutcodec.core.generation.audio.equation import GeneratorAudioEquation
from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
RATE_ESTIMATORS = {} # to each node stream class, associate the func to find the optimal rate
def _add_estimator(node_cls: type) -> callable:
def _add_func(func) -> callable:
RATE_ESTIMATORS[node_cls] = func
return func
return _add_func
def _eq_estimate_freq_max(
signal, t_symb: Symbol, t_min: Fraction, t_max: Fraction | float,
) -> float:
"""Recursive delegation (parallelization is slower)."""
if t_symb not in signal.free_symbols: # facultative, optimisation
return 0.0
if isinstance(signal, Add): # fourier transform is linear
return max(_eq_estimate_freq_max(s, t_symb, t_min, t_max) for s in signal.args)
if isinstance(signal, Mul): # fourier transform of prod is convolution, max is sum
return sum(_eq_estimate_freq_max(s, t_symb, t_min, t_max) for s in signal.args)
if isinstance(signal, (cos, sin)):
return _eq_abs_max_expr(
diff(signal.args[0], t_symb), t_symb, float(t_min), float(t_max),
) / (2*math.pi)
if isinstance(signal, Pow) and signal.exp.is_integer and signal.exp > 0: # recursive Mul
return float(signal.exp) * _eq_estimate_freq_max(signal.base, t_symb, t_min, t_max)
return _eq_estimate_freq_max_by_diff(signal, t_symb, float(t_min), float(t_max))
def _eq_abs_max_expr(expr, t_symb: Symbol, t_min: float, t_max: float) -> float:
"""Search an aproximation of the max with a pseudo dichotomie gradient."""
if expr.is_number:
return abs(float(expr))
if expr.atoms(DiracDelta):
return math.inf
func_max_and_diff = lambdify( # sign(abs(s)') = sign((s**2)') = sign(2*s'*s) = sign(s'*s)
[t_symb],
[expr, Mul(diff(expr, t_symb), expr, evaluate=False)],
modules="numpy",
cse=True,
)
# the initials positions
t_max = 3600.0 if t_max == math.inf else t_max
t_0 = np.logspace(0, 3, 512, base=10) # [1, 1000]
t_0 *= (t_max-t_min) / 999.0 # [(tmax-t_min)/999, 1000/999*(tmax-t_min)]
t_0 += (1000.0*t_min - t_max) / 999.0 # [t_min, t_max]
t_0 = np.concatenate([t_0, np.linspace(t_min, t_max, 1024), (t_min+t_max)-t_0], axis=0)
t_0.sort(axis=0, kind="quicksort")
# search the maximums
old_max = 0
for i in range(30): # f > 19200 Hz for delta_t <= 3600 s
new_max, direction = func_max_and_diff(t_0)
if not np.isfinite(new_max).any():
return old_max
new_max = np.absolute(new_max, out=new_max)
new_max = np.nanmax(new_max)
if old_max > new_max:
return old_max
old_max = new_max
direction = np.sign(direction).astype(int)
direction[0], direction[-1] = max(0, direction[0]), min(0, direction[-1])
direction += np.arange(len(direction), dtype=int)
t_0 = (1 - 2**(-1-i))*t_0 + (2**(-1-i))*t_0[direction]
return old_max
def _eq_estimate_freq_max_by_diff(signal, t_symb: Symbol, t_min: float, t_max: float) -> float:
"""Match the max freq sin by differencials.
Hypotetical model : signal = s = a*sin(2*pi*f*t)
<=> s' = 2*pi*a*f*cos(2*pi*f*t) and s''' = -8*pi**3*a*f**3*cos(2*pi*f*t)
<=> f_max = sqrt(abs(s'**3 * s'''))/(2 * pi * s'**2)
<=> f_max**2 = abs(-s'''/(4*pi**2*s'))
<=> 4*pi**2 * f_max**2 = abs(s'''/s')
"""
# compute the successives differentials
if signal.is_Atom: # average 8% faster this test
return 0
signal = evalf(signal) # average 60% faster with this simplification
max_freq = diff(signal, t_symb)
max_freq = diff(max_freq, t_symb, 2) / max_freq
return math.sqrt(_eq_abs_max_expr(max_freq, t_symb, t_min, t_max) / (4*math.pi**2))
@_add_estimator(ContainerInputFFMPEG)
def _optimal_rate_container_input_ffmpeg(stream: StreamAudio, *_) -> int:
"""Detect the rate of a ContainerInputFFMPEG stream.
Examples
--------
>>> from cutcutcodec.core.analysis.stream.rate_audio import optimal_rate_audio
>>> from cutcutcodec.core.io.read_ffmpeg import ContainerInputFFMPEG
>>> from cutcutcodec.utils import get_project_root
>>> audio = get_project_root() / "media" / "audio" / "narration_5_1.oga"
>>> (stream,) = ContainerInputFFMPEG(audio).out_streams
>>> optimal_rate_audio(stream)
16000
>>>
"""
assert isinstance(stream.node, ContainerInputFFMPEG), stream.node.__class__.__name__
return stream.rate
@_add_estimator(GeneratorAudioEquation)
def _optimal_rate_generator_audio_equation(
stream: StreamAudio, t_min: Fraction, t_max: Fraction | float,
) -> int:
"""Detect the rate of a GeneratorAudioEquation stream.
Examples
--------
>>> from cutcutcodec.core.analysis.stream.rate_audio import optimal_rate_audio
>>> from cutcutcodec.core.generation.audio.equation import GeneratorAudioEquation
>>>
>>> optimal_rate_audio(GeneratorAudioEquation(0).out_streams[0])
0
>>> (stream,) = GeneratorAudioEquation("sin(2*pi*440*t)").out_streams
>>> optimal_rate_audio(stream)
881
>>> (stream,) = GeneratorAudioEquation("1/2*cos(2*pi*440*t)").out_streams
>>> optimal_rate_audio(stream)
881
>>> (stream,) = GeneratorAudioEquation("cos(2*pi*440*t)+cos(2*pi*880*t)").out_streams
>>> optimal_rate_audio(stream)
1761
>>> (stream,) = GeneratorAudioEquation("cos(2*pi*440*t)**4").out_streams
>>> optimal_rate_audio(stream)
3521
>>> (stream,) = GeneratorAudioEquation("cos(2*pi*440*t)*cos(2*pi*880*t)").out_streams
>>> optimal_rate_audio(stream)
2641
>>> (stream,) = GeneratorAudioEquation("sin(2*pi*440*exp(-t))").out_streams
>>> optimal_rate_audio(stream)
881
>>> (stream,) = GeneratorAudioEquation("exp(-t**2/(2*1/1000))").out_streams
>>> optimal_rate_audio(stream)
192000
>>> (stream,) = GeneratorAudioEquation("max(-1, min(1, 100*t))").out_streams
>>> optimal_rate_audio(stream)
192000
>>>
"""
assert isinstance(stream.node, GeneratorAudioEquation), stream.node.__class__.__name__
t_symb = Symbol("t", real=True, positive=True)
freq = max(
_eq_estimate_freq_max(signal, t_symb, t_min, t_max) for signal in stream.node.signals
)
if math.isfinite(freq):
shannon = math.floor(2*freq) + 1 if freq else 0 # shannon strict superior to 2 f_max
return min(192000, shannon) # the max supported by some sound cards
return 192000
@_add_estimator(GeneratorAudioNoise)
def _optimal_rate_generator_audio_noise(stream: StreamAudio, *_) -> int:
"""Detect the rate of a GeneratorAudioNoise stream.
Examples
--------
>>> from cutcutcodec.core.analysis.stream.rate_audio import optimal_rate_audio
>>> from cutcutcodec.core.generation.audio.noise import GeneratorAudioNoise
>>> (stream,) = GeneratorAudioNoise(0).out_streams
>>> optimal_rate_audio(stream)
48000
>>>
"""
assert isinstance(stream.node, GeneratorAudioNoise), stream.node.__class__.__name__
return 48000
[docs]
def optimal_rate_audio(
stream: StreamAudio,
t_min: Fraction | None = None,
t_max: Fraction | float | None = None,
) -> int:
"""Find the optimal sampling rate for a given audio stream.
Parameters
----------
stream : cutcutcodec.core.classes.stream_audio.StreamAudio
The audio stream that we want to find the optimal rate.
t_min : float, optional
The lower bound of the time slice estimation.
t_max : float, optional
The higher bound of the time slice estimation.
Returns
-------
framerate : int
The minimum samplerate that respects the Nyquist–Shannon theorem.
The special value 0 is returned if not special rate is suggested.
"""
# verifications
assert isinstance(stream, StreamAudio), stream.__class__.__name__
assert t_min is None or isinstance(t_min, Fraction), t_min.__class__.__name__
assert t_max is None or t_max == math.inf or isinstance(t_max, Fraction), t_max
# estimation of the best rate
t_min, t_max = t_min or stream.beginning, t_max or stream.beginning + stream.duration
if (estimator := RATE_ESTIMATORS.get(stream.node.__class__, None)) is not None:
return estimator(stream, t_min, t_max)
return max(
(
optimal_rate_audio(s, *t)
for s, *t in time_backprop(stream, t_min, t_max)
if s.type == "audio"
),
default=0,
)