Source code for cutcutcodec.core.io.read_image
#!/usr/bin/env python3
"""Read an image with opencv."""
from fractions import Fraction
import math
import pathlib
import typing
import cv2
import numpy as np
import torch
from cutcutcodec.core.classes.container import ContainerInput
from cutcutcodec.core.classes.frame_video import FrameVideo
from cutcutcodec.core.classes.stream import Stream
from cutcutcodec.core.classes.stream_video import StreamVideo
from cutcutcodec.core.exceptions import DecodeError, OutOfTimeRange
from cutcutcodec.core.filter.video.resize import resize_keep_ratio
[docs]
def read_image(filename: typing.Union[str, bytes, pathlib.Path]) -> torch.Tensor:
"""Read the image and make it compatible with Video Frame.
Parameters
----------
filename : pathlike
The pathlike of the image file.
Returns
-------
image : torch.Tensor
The image in uint8 or float32 of shape (height, width, channels).
Raises
------
cutcutcodec.core.exceptions.DecodeError
If it fails to read the image.
"""
filename = pathlib.Path(filename).expanduser().resolve()
assert filename.is_file(), filename
if (img := cv2.imread(str(filename), cv2.IMREAD_UNCHANGED)) is None:
raise DecodeError(f"failed to read the image {filename} with cv2")
if img.ndim == 2:
img = np.expand_dims(img, 2)
if img.dtype != np.uint8 and np.issubdtype(img.dtype, np.integer):
iinfo = np.iinfo(img.dtype)
img = img.astype(np.float32)
img -= float(iinfo.min)
img *= 1.0 / float(iinfo.max - iinfo.min)
elif img.dtype != np.float32 and np.issubdtype(img.dtype, np.floating):
img = img.astype(np.float32)
torch_img = torch.from_numpy(img)
return torch_img
[docs]
class ContainerInputImage(ContainerInput):
"""Decode an image.
Attributes
----------
filename : pathlib.Path
The path to the physical file that contains the extracted image stream (readonly).
Examples
--------
>>> from cutcutcodec.core.io.read_image import ContainerInputImage
>>> (stream,) = ContainerInputImage.default().out_streams
>>> stream.snapshot(0, (12, 12))[..., 3]
tensor([[ 0, 0, 9, 113, 204, 247, 247, 203, 113, 10, 0, 0],
[ 0, 30, 208, 255, 255, 255, 255, 255, 255, 208, 30, 0],
[ 9, 208, 255, 255, 255, 255, 255, 255, 255, 255, 208, 9],
[113, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 113],
[204, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 203],
[247, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 246],
[247, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 246],
[204, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 203],
[113, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 113],
[ 9, 208, 255, 255, 255, 255, 255, 255, 255, 255, 208, 9],
[ 0, 30, 208, 255, 255, 255, 255, 255, 255, 208, 30, 0],
[ 0, 0, 9, 113, 203, 246, 246, 203, 113, 9, 0, 0]],
dtype=torch.uint8)
>>>
"""
def __init__(self, filename: typing.Union[str, bytes, pathlib.Path]):
"""Initialise and create the class.
Parameters
----------
filename : pathlike
Path to the file to be decoded.
Raises
------
cutcutcodec.core.exceptions.DecodeError
If it fail to extract any multimedia stream from the provided file.
"""
filename = pathlib.Path(filename).expanduser().resolve()
assert filename.is_file(), filename
self._filename = filename
super().__init__([_StreamVideoImage(self)])
def __enter__(self):
"""Make the object compatible with a context manager."""
return self
def __exit__(self, *_):
"""Exit the context manager."""
def _getstate(self) -> dict:
return {"filename": str(self.filename)}
def _setstate(self, in_streams: typing.Iterable[Stream], state: dict) -> None:
keys = {"filename"}
assert state.keys() == keys, set(state)-keys
ContainerInputImage.__init__(self, state["filename"])
[docs]
@classmethod
def default(cls):
"""Provide a minimalist example of an instance of this node."""
return cls("cutcutcodec/examples/logo.png")
@property
def filename(self) -> pathlib.Path:
"""Return the path to the physical file that contains the extracted image stream."""
return self._filename
class _StreamVideoImage(StreamVideo):
"""Read an image as a video stream.
Parameters
----------
height : int
The dimension i (vertical) of the encoded frames in pxl (readonly).
width : int
The dimension j (horizontal) of the encoded frames in pxl (readonly).
"""
is_space_continuous = False
is_time_continuous = False
def __init__(self, node: ContainerInputImage):
assert isinstance(node, ContainerInputImage), node.__class__.__name__
super().__init__(node)
self._img = read_image(node.filename)
self._height, self._width, *_ = self._img.shape
self._resized_img = FrameVideo(0, self._img) # not from_numpy for casting shape and type
def _snapshot(self, timestamp: Fraction, mask: torch.Tensor) -> torch.Tensor:
if timestamp < 0:
raise OutOfTimeRange(f"there is no image frame at timestamp {timestamp} (need >= 0)")
# reshape if needed
if self._resized_img.shape[:2] != mask.shape:
self._resized_img = resize_keep_ratio(FrameVideo(0, self._img), mask.shape)
return FrameVideo(timestamp, self._resized_img.clone())
@property
def beginning(self) -> Fraction:
return Fraction(0)
@property
def duration(self) -> typing.Union[Fraction, float]:
return math.inf
@property
def height(self) -> int:
"""Return the preconised dimension i (vertical) of the picture in pxl."""
return self._height
@property
def width(self) -> int:
"""Return the preconised dimension j (horizontal) of the picture in pxl."""
return self._width