r"""Sequence prediction tools.
Terminology
-----------
* :math:`n_o` is the number of observations used by the predictor.
* :math:`o_i` are the past observation, with :math:`i \in [\![1,n_o]\!]`.
* :math:`\hat{o}_{n+1}` is the prediction of the next observation.
"""
import abc
import copy
import math
import numbers
import typing
import numpy as np
[docs]
class Predictor(abc.ABC):
"""Basic structuring class for predicting a sequence of reels.
Attributes
----------
memory : int
The number of memorized samples (readonly).
obs : list[float]
Past observations, more recent first (readonly).
"""
def __init__(self, memory: numbers.Integral | float = math.inf):
"""Initialize the memory.
Parameters
----------
memory : int, default=inf
The number of samples to be memorised.
"""
assert isinstance(memory, numbers.Integral | float), memory.__class__.__name__
if isinstance(memory, float):
assert math.isinf(memory), memory
self._memory: int | float = math.inf
self._obs: list[float] = []
else:
assert memory >= 2, memory
self._memory: int | float = int(memory)
self._obs: list[float] = [0.0 for _ in range(self._memory)]
@abc.abstractmethod
def _update(self):
"""Update the internal step, to be ready for the next prediction."""
raise NotImplementedError
@property
def memory(self) -> int | float:
"""Return the number of memorized samples."""
return self._memory
@property
def obs(self) -> list[float]:
"""Return the past observations."""
return self._obs.copy() # copy to be user safe
[docs]
def predict(self, nbr: numbers.Integral = 1) -> list[float]:
r"""Predict the next samples :math:`\hat{o}_{n+1}`.
Parameters
----------
nbr : int, default=1
The number of sample to predict.
Returns
-------
samples : list[float]
The predicted sample, the last predicted in first position.
"""
assert isinstance(nbr, numbers.Integral), nbr.__class__.__name__
assert nbr >= 1, nbr
predicted: list[float] = []
self_ = self.__class__.__new__(self.__class__)
self_.__dict__ = copy.deepcopy(self.__dict__)
for _ in range(nbr-1):
predicted.insert(0, self_.predict_next())
self_.update(predicted[0])
predicted.insert(0, self_.predict_next())
return predicted
[docs]
@abc.abstractmethod
def predict_next(self) -> float:
r"""Predict the next sample :math:`\hat{o}_{n+1}`."""
raise NotImplementedError
[docs]
def update(self, obs: numbers.Real | typing.Iterable[numbers.Real]):
"""Update the state of the predictor using the new samples.
Parameters
----------
obs : float or list[float]
The new observation(s).
"""
if isinstance(obs, numbers.Real):
self.update([obs])
else:
assert hasattr(obs, "__iter__"), obs.__class__.__name__
obs = list(obs)
assert all(isinstance(o, numbers.Real) for o in obs), obs
for i in range(min(self._memory, len(obs))-1, -1, -1):
self._obs.insert(0, float(obs[i]))
if not math.isinf(self._memory):
del self._obs[self._memory:]
self._update()
[docs]
class LinearPredictor(Predictor):
r"""Prediction by linear combination of observations.
The prediction of the next observation is defined as follow:
.. math::
\hat{o}_{n+1} = \sum\limits_{k=1}^{n_\alpha} \alpha_k o_{n+1-k}
Notation:
.. math::
\begin{align}
\begin{pmatrix} \hat{o}_n \\ \vdots \\ \hat{o}_{n-h} \end{pmatrix}
&= \begin{pmatrix}
o_{n-1} & \ldots & o_{n-1-n_\alpha} \\
\vdots & \ddots & \vdots \\
o_{n-1-h} & \ldots & o_{n-1-h-n_\alpha} \\
\end{pmatrix}
\begin{pmatrix} \alpha_1 \\ \vdots \\ \alpha_{n_\alpha} \end{pmatrix} \\
\Leftrightarrow \boldsymbol{\hat{o}} &= \boldsymbol{X} \boldsymbol{\alpha}
\end{align}
The :math:`\alpha_k` scalars are founded to minimise the mse:
.. math::
\begin{align}
&\boldsymbol{\alpha} = \underset{\boldsymbol{\alpha}}{\operatorname{argmin}}\left(
\left\|
\begin{pmatrix} \hat{o}_n \\ \vdots \\ \hat{o}_{n-h} \end{pmatrix}
- \begin{pmatrix} o_n \\ \vdots \\ o_{n-h} \end{pmatrix}
\right\|^2
\right) \\
\Leftrightarrow &\boldsymbol{\alpha}
= \underset{\boldsymbol{\alpha}}{\operatorname{argmin}}\left(
\left\| \boldsymbol{\hat{o}} - \boldsymbol{o} \right\|^2
\right) \\
\Leftrightarrow &\boldsymbol{\alpha}
= \underset{\boldsymbol{\alpha}}{\operatorname{argmin}}\left(
\left( \boldsymbol{X} \boldsymbol{\alpha} - \boldsymbol{o} \right)^\intercal
\left( \boldsymbol{X} \boldsymbol{\alpha} - \boldsymbol{o} \right)
\right) \\
\Leftrightarrow &\frac{
\partial \left(
\left( \boldsymbol{X} \boldsymbol{\alpha} - \boldsymbol{o} \right)^\intercal
\left( \boldsymbol{X} \boldsymbol{\alpha} - \boldsymbol{o} \right)
\right)
}{\partial \boldsymbol{\alpha}} = \boldsymbol{0} \\
\Leftrightarrow &\boldsymbol{\alpha}
= \left( \boldsymbol{X}^\intercal \boldsymbol{X} \right)^{-1}
\boldsymbol{X}^\intercal \boldsymbol{o} \\
\end{align}
Examples
--------
>>> from cutcutcodec.core.signal.predict import LinearPredictor
>>> # simple case
>>> predictor = LinearPredictor(memory=4)
>>> predictor.update([4.0, 3.0, 2.0, 1.0]) # arithmetic sequence (more recent first)
>>> round(predictor.predict_next(), 6)
5.0
>>> # complicated case
>>> predictor = LinearPredictor(memory=6)
>>> predictor.update([4.0, 2.0, 3.0, 1.0, 2.0, 0.0]) # less trivial sequence
>>> [round(p, 6) for p in predictor.predict(4)]
[6.0, 4.0, 5.0, 3.0]
>>>
"""
def __init__(self, *args, n_alpha: numbers.Integral | None = None, **kwargs):
r"""Initialise the predictor.
Parameters
----------
*args, **kwargs
Transmitted to :py:class:`Predictor`
n_alpha : int, optional
The number of internal coefficients :math:`n_\alpha`.
By default :math:`n_\alpha = \lfloor \frac{n_{obs}+1}{2} \rfloor`.
"""
super().__init__(*args, **kwargs)
if n_alpha is None:
if math.isinf(self._memory):
n_alpha = 3 # default value
else:
n_alpha = (self._memory + 1) // 2
else:
assert isinstance(n_alpha, numbers.Integral), n_alpha.__class__.__name__
assert n_alpha >= 1, n_alpha
if not math.isinf(self._memory):
assert n_alpha <= (self._memory + 1) // 2, (
f"with an history of size {self._memory}, "
f"alpha has to be <= {(self._memory + 1) // 2}"
)
self._alpha = [1.0] + [0.0 for _ in range(n_alpha-1)]
def _update(self):
"""Recompute the best alpha."""
for n_alpha in range(len(self._alpha), 0, -1):
# first check
if (h_size := len(self._obs) - n_alpha) <= 0:
if n_alpha == 1:
raise RuntimeError("please provide more observations")
continue
# creation of the vector and matrix
o_vec = np.asarray(self._obs[:h_size], dtype=np.float64)[:, None]
x_mat = np.asarray(
[self._obs[i+1:i+1+n_alpha] for i in range(h_size)],
dtype=np.float64,
)
# try to resolve
xtx_inv = np.linalg.pinv(x_mat.mT @ x_mat, hermitian=True)
# set result
alpha = xtx_inv @ x_mat.mT @ o_vec
alpha = alpha[:, 0].tolist()
alpha.extend([0.0 for _ in range(len(self._alpha) - n_alpha)])
self._alpha = alpha
break
[docs]
def predict_next(self) -> float:
r"""Compute :math:`\sum\limits_{k=1}^{n_\alpha} \alpha_k o_{n+1-k}`."""
return sum(a*o for a, o in zip(self._alpha, self._obs))