diff --git a/src/osekit/audio_backend/audio_backend.py b/src/osekit/audio_backend/audio_backend.py
index f130ba24..9eadc890 100644
--- a/src/osekit/audio_backend/audio_backend.py
+++ b/src/osekit/audio_backend/audio_backend.py
@@ -47,3 +47,7 @@ def read(self, path: PathLike | str, start: int, stop: int) -> np.ndarray:
     def close(self) -> None:
         """Close the currently opened file."""
         ...
+
+    def seek(self, path: PathLike, frame: int) -> None: ...
+
+    def stream(self, path: PathLike, chunk_size: int) -> np.ndarray: ...
diff --git a/src/osekit/audio_backend/audio_file_manager.py b/src/osekit/audio_backend/audio_file_manager.py
index 7ff42de6..da0bbdbb 100644
--- a/src/osekit/audio_backend/audio_file_manager.py
+++ b/src/osekit/audio_backend/audio_file_manager.py
@@ -102,3 +102,9 @@ def read(
             raise ValueError(msg)

         return self._backend(path).read(path=path, start=start, stop=stop)
+
+    def seek(self, path: Path, frame: int) -> None:
+        self._backend(path=path).seek(path=path, frame=frame)
+
+    def stream(self, path: Path, chunk_size: int) -> np.ndarray:
+        return self._backend(path=path).stream(path=path, chunk_size=chunk_size)
diff --git a/src/osekit/audio_backend/mseed_backend.py b/src/osekit/audio_backend/mseed_backend.py
index d9e86e4e..e15ac5e0 100644
--- a/src/osekit/audio_backend/mseed_backend.py
+++ b/src/osekit/audio_backend/mseed_backend.py
@@ -19,6 +19,7 @@ class MSeedBackend:
     def __init__(self) -> None:
         """Initialize the MSEED backend."""
         _require_obspy()
+        self.seeked_frame = 0

     def close(self) -> None:
         """Close the currently opened file. No use in MSEED files."""
@@ -83,3 +84,37 @@ def read(
         data = np.concatenate([trace.data for trace in file_content])

         return data[start:stop]
+
+    def seek(self, path: PathLike, frame: int) -> None:
+        """Set the seeked_frame of the backend.
+
+        Streamed data will be streamed from this frame.
+
+        Parameters
+        ----------
+        path: PathLike | str
+            No effect.
+        frame: int
+            Frame to seek.
+
+        """
+        self.seeked_frame = frame
+
+    def stream(self, path: PathLike, chunk_size: int) -> np.ndarray:
+        """Stream the content of the MSEED file from the seeked frame.
+
+        Parameters
+        ----------
+        path: PathLike
+            Path to the mseed file.
+        chunk_size: int
+            Number of frames to stream.
+
+        Returns
+        -------
+        np.ndarray:
+            Streamed data of length ``chunk_size`` from ``self.seeked_frame``.
+        """
+        return self.read(
+            path=path, start=self.seeked_frame, stop=self.seeked_frame + chunk_size
+        )
diff --git a/src/osekit/audio_backend/soundfile_backend.py b/src/osekit/audio_backend/soundfile_backend.py
index 11cc36ea..7bce1119 100644
--- a/src/osekit/audio_backend/soundfile_backend.py
+++ b/src/osekit/audio_backend/soundfile_backend.py
@@ -65,10 +65,17 @@ def read(
             A ``(channel * frames)`` array containing the audio data.

         """
-        self._switch(path)
-        self._file.seek(start)
+        self.seek(path=path, frame=start)
         return self._file.read(stop - start)

+    def seek(self, path: PathLike, frame: int) -> None:
+        self._switch(path=path)
+        self._file.seek(frame)
+
+    def stream(self, path: PathLike, chunk_size: int) -> np.ndarray:
+        self._switch(path=path)
+        return self._file.read(frames=chunk_size)
+
    def _close(self) -> None:
        if self._file is None:
            return
diff --git a/src/osekit/core_api/audio_data.py b/src/osekit/core_api/audio_data.py
index 081d08b2..98210010 100644
--- a/src/osekit/core_api/audio_data.py
+++ b/src/osekit/core_api/audio_data.py
@@ -6,18 +6,21 @@

 from __future__ import annotations

+from collections.abc import Generator
 from math import ceil
 from typing import TYPE_CHECKING, Self

 import numpy as np
 import soundfile as sf
+import soxr
 from pandas import Timedelta, Timestamp

+from osekit.config import resample_quality_settings
 from osekit.core_api.audio_file import AudioFile
 from osekit.core_api.audio_item import AudioItem
 from osekit.core_api.base_data import BaseData
 from osekit.core_api.instrument import Instrument
-from osekit.utils.audio_utils import Normalization, normalize, resample
+from osekit.utils.audio_utils import Normalization, normalize

 if TYPE_CHECKING:
     from pathlib import Path
@@ -216,14 +219,92 @@ def get_raw_value(self) -> np.ndarray:
             The value of the audio data.

         """
-        data = np.empty(shape=self.shape)
-        idx = 0
+        return np.vstack(list(self.stream()))
+
+    @staticmethod
+    def _flush(
+        resampler: soxr.ResampleStream,
+        remaining_samples: int,
+    ) -> np.ndarray:
+        flush = resampler.resample_chunk(np.array([]), last=True)
+        if len(flush) == 0:
+            return np.array([])
+        if not remaining_samples:
+            return np.array([])
+        flush = flush[:remaining_samples]
+        return flush[:, None] if flush.ndim == 1 else flush
+
+    def stream(self, chunk_size: int = 8192) -> Generator[np.ndarray, None, None]:
+        """Stream the audio data in chunks.
+
+        Parameters
+        ----------
+        chunk_size: int
+            Size of the chunks of audio yielded by the generator.
+
+        Returns
+        -------
+        Generator[np.ndarray, None, None]:
+            Generated ``np.ndarray`` of dimensions (``chunk_size``*``self.nb_channels``)
+            of the streamed audio data.
+
+        """
+        resampler = None
+        input_sr = None
+        produced_samples = 0
+        total_samples = self.length
+
         for item in self.items:
-            item_data = self._get_item_value(item)
-            item_data = item_data[: min(item_data.shape[0], data.shape[0] - idx)]
-            data[idx : idx + len(item_data)] = item_data
-            idx += len(item_data)
-        return data
+            if item.is_empty:
+                silence_length = round(item.duration.total_seconds() * self.sample_rate)
+                yield item.get_value().repeat(
+                    silence_length,
+                    axis=0,
+                )
+                produced_samples += silence_length
+                continue
+
+            if (resampler is None) or (input_sr != item.sample_rate):
+                if resampler:
+                    flush = self._flush(
+                        resampler=resampler,
+                        remaining_samples=total_samples - produced_samples,
+                    )
+                    yield flush
+                    produced_samples += len(flush)
+                input_sr = item.sample_rate
+                quality = resample_quality_settings[
+                    "downsample" if input_sr > self.sample_rate else "upsample"
+                ]
+                resampler = soxr.ResampleStream(
+                    in_rate=input_sr,
+                    out_rate=self.sample_rate,
+                    num_channels=self.nb_channels,
+                    quality=quality,
+                    dtype=np.float64,
+                )
+
+            for chunk in item.stream(chunk_size=chunk_size):
+                y = chunk
+                if item.sample_rate != self.sample_rate:
+                    y = resampler.resample_chunk(x=chunk)
+
+                remaining = total_samples - produced_samples
+                y = y[:remaining]
+                produced_samples += len(y)
+
+                yield y
+
+            if produced_samples >= total_samples:
+                return
+
+        if resampler is None:
+            return
+
+        yield self._flush(
+            resampler=resampler,
+            remaining_samples=total_samples - produced_samples,
+        )

     def get_value(self) -> np.ndarray:
         """Return the value of the audio data.
@@ -313,18 +394,6 @@ def link(self, folder: Path) -> None:
         )
         self.items = AudioData.from_files([file]).items

-    def _get_item_value(self, item: AudioItem) -> np.ndarray:
-        """Return the resampled (if needed) data from the audio item."""
-        item_data = item.get_value()
-        if item.is_empty:
-            return item_data.repeat(
-                round(item.duration.total_seconds() * self.sample_rate),
-                axis=0,
-            )
-        if item.sample_rate != self.sample_rate:
-            return resample(item_data, item.sample_rate, self.sample_rate)
-        return item_data
-
     def split(
         self,
         nb_subdata: int = 2,
diff --git a/src/osekit/core_api/audio_file.py b/src/osekit/core_api/audio_file.py
index f825d376..f52c15c6 100644
--- a/src/osekit/core_api/audio_file.py
+++ b/src/osekit/core_api/audio_file.py
@@ -89,11 +89,8 @@ def read(self, start: Timestamp, stop: Timestamp) -> np.ndarray:
         """
         start_sample, stop_sample = self.frames_indexes(start, stop)
         data = afm.read(self.path, start=start_sample, stop=stop_sample)
-        if len(data.shape) == 1:
-            return data.reshape(
-                data.shape[0],
-                1,
-            )  # 2D array to match the format of multichannel audio
+        if data.ndim == 1:
+            return data[:, None]  # 2D array to match the format of multichannel audio
         return data

     def frames_indexes(self, start: Timestamp, stop: Timestamp) -> tuple[int, int]:
@@ -132,3 +129,33 @@ def move(self, folder: Path) -> None:
         """
         afm.close()
         super().move(folder)
+
+    def seek(self, frame: int) -> None:
+        """Seek the requested frame in the file.
+
+        Parameters
+        ----------
+        frame: int
+            Index of the frame to be seeked.
+
+        """
+        afm.seek(path=self.path, frame=frame)
+
+    def stream(self, chunk_size: int) -> np.ndarray:
+        """Stream ``chunk_size`` frames from the audio file.
+
+        Parameters
+        ----------
+        chunk_size: int
+            Number of frames to stream from the audio file.
+
+        Returns
+        -------
+        np.ndarray:
+            A (``chunk_size``*``self.channels``) array of frames.
+
+        """
+        data = afm.stream(path=self.path, chunk_size=chunk_size)
+        if data.ndim == 1:
+            return data[:, None]  # 2D array to match the format of multichannel audio
+        return data
diff --git a/src/osekit/core_api/audio_item.py b/src/osekit/core_api/audio_item.py
index 20fdacea..ef5bfcbb 100644
--- a/src/osekit/core_api/audio_item.py
+++ b/src/osekit/core_api/audio_item.py
@@ -2,6 +2,7 @@

 from __future__ import annotations

+from collections.abc import Generator
 from typing import TYPE_CHECKING

 import numpy as np
@@ -64,3 +65,18 @@ def get_value(self) -> np.ndarray:
         if self.is_empty:
             return np.zeros((1, self.nb_channels))
         return super().get_value()
+
+    def stream(self, chunk_size: int) -> Generator[np.ndarray, None, None]:
+        start_frame, stop_frame = self.file.frames_indexes(
+            start=self.begin,
+            stop=self.end,
+        )
+
+        remaining = stop_frame - start_frame
+
+        self.file.seek(frame=start_frame)
+
+        while remaining > 0:
+            frames_to_read = min(chunk_size, remaining)
+            yield self.file.stream(chunk_size=frames_to_read)
+            remaining -= frames_to_read
diff --git a/tests/test_audio.py b/tests/test_audio.py
index 12e4f049..71659d05 100644
--- a/tests/test_audio.py
+++ b/tests/test_audio.py
@@ -222,6 +222,34 @@ def test_audio_file_read(
     assert np.allclose(files[0].read(start, stop)[:, 0], expected, atol=1e-7)


+@pytest.mark.parametrize(
+    ("mocked_data", "expected_shape"),
+    [
+        pytest.param(np.array([0, 1, 2, 3]), (4, 1), id="1d-to-2d"),
+        pytest.param(np.array([[0, 1], [2, 3], [4, 5], [6, 7]]), (4, 2), id="2d-to-2d"),
+    ],
+)
+def test_audio_file_stream_is_always_2d(
+    monkeypatch: pytest.MonkeyPatch,
+    mocked_data: np.ndarray,
+    expected_shape: tuple,
+) -> None:
+    def mocked_stream(*args: None, **kwargs: None) -> np.ndarray:
+        return mocked_data
+
+    monkeypatch.setattr("osekit.core_api.audio_file.afm.stream", mocked_stream)
+
+    def mocked_init(self: AudioFile, *args: None, **kwargs: None) -> None:
+        self.path = Path()
+        self.begin = Timestamp("1996-04-15 00:00:00")
+
+    monkeypatch.setattr(AudioFile, "__init__", mocked_init)
+
+    af = AudioFile()
+
+    assert af.stream(1024).shape == expected_shape
+
+
 def test_multichannel_audio_file_read(monkeypatch: pytest.MonkeyPatch) -> None:
     full_file = np.array([[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4], [5, 5, 5]])

@@ -716,6 +744,27 @@ def test_audio_resample_sample_count(
     assert data.get_value().shape[0] == expected_nb_samples


+def test_audio_resample_different_samplerates(tmp_path: Path) -> None:
+    fs1 = 20
+    fs2 = 10
+    d1 = 1
+    d2 = 1
+    s1 = np.linspace(-1.0, 0.0, fs1 * d1)
+    s2 = np.linspace(0.0, 1.0, fs2 * d2)
+
+    p1 = tmp_path / "s1.wav"
+    p2 = tmp_path / "s2.wav"
+
+    sf.write(file=p1, data=s1, samplerate=fs1)
+    sf.write(file=p2, data=s2, samplerate=fs2)
+
+    af1 = AudioFile(path=p1, begin=Timestamp("2020-01-01 00:00:00"))
+    af2 = AudioFile(path=p2, begin=Timestamp("2020-01-01 00:00:01"))
+
+    ad = AudioData.from_files(files=[af1, af2], sample_rate=10)
+    assert ad.get_value().shape == (20, 1)
+
+
 @pytest.mark.parametrize(
     ("audio_files", "downsampling_quality", "upsampling_quality"),
     [