From 0010b90bfa232456b27942555928655327011596 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 13:30:26 +0200 Subject: [PATCH 01/13] Add comprehensive tests for BlobFsCapture and BlobExtractor functionality - Implement tests for BlobFsCapture including file creation, magic number validation, image writing, and error handling for long filenames and large images. - Add tests for BlobExtractor to ensure correct extraction of valid images, handling of partial blobs, and resilience against corruption. - Introduce tests for pattern resolution in blob file naming and ensure correct behavior of the rotate function in CompositeBlobCapture. - Validate command-line interface behavior for blob extraction, including trigger frame annotations and blob information display. --- .fpm | 2 + .vscode/settings.json | 9 +- common_utility/__init__.py | 1 + common_utility/capture.py | 542 +++++++++++++++++++++++++++++++++ pyproject.toml | 2 + setup.cfg | 14 +- tests/captureTest.py | 605 +++++++++++++++++++++++++++++++++++++ 7 files changed, 1167 insertions(+), 8 deletions(-) create mode 100644 common_utility/capture.py create mode 100644 tests/captureTest.py diff --git a/.fpm b/.fpm index cbec0c9..dd458df 100644 --- a/.fpm +++ b/.fpm @@ -5,3 +5,5 @@ --log warn --python-bin python3 --python-package-name-prefix python3 +--depends python3-opencv +--depends python3-numpy \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index fbf984b..4e488b7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,5 +16,12 @@ "--config=setup.cfg" ], "python.analysis.typeCheckingMode": "standard", - "python.testing.pytestArgs": [] + "python.testing.pytestArgs": [], + "python-envs.pythonProjects": [ + { + "path": ".", + "envManager": "ms-python.python:venv", + "packageManager": "ms-python.python:pip" + } + ] } diff --git a/common_utility/__init__.py b/common_utility/__init__.py index bf395b4..9d59e65 100644 --- a/common_utility/__init__.py +++ b/common_utility/__init__.py @@ -5,3 +5,4 @@ from .configLoader import * from .rateLimiter import * from .interfaceResolver import * +from .capture import * diff --git a/common_utility/capture.py b/common_utility/capture.py new file mode 100644 index 0000000..c7b7e45 --- /dev/null +++ b/common_utility/capture.py @@ -0,0 +1,542 @@ +from __future__ import annotations + +import argparse +import mmap +import re +import struct +import pathlib +import os +import datetime +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Optional + +if TYPE_CHECKING: + import numpy as np + from numpy.typing import NDArray + +from context_logger import get_logger + +log = get_logger("capture") + +# Timezone shift string shared with bird_recog for directory hierarchy and filenames +_offset = datetime.datetime.now().astimezone().utcoffset() +_tzshift = int(((_offset.total_seconds() if _offset is not None else 0) / 60 / 60) * 100) +tzshift_str = f"{'p' if _tzshift >= 0 else 'm'}{abs(_tzshift):04d}" + +MAGIC: int = 0xEFFEC51E +VERSION: int = 1 +MAX_FILENAME_BYTES: int = 1024 + +# Superblock: magic(I) version(I) num_slots(I) write_head(I) max_image_bytes(I) index_entry_size(I) + 40 pad = 64 bytes +SUPERBLOCK_FORMAT = " None: + parser.add_argument( + "--blob-capture-file", + default="capture.blob", + help="Name of the blob ring-buffer file, stored under the capture folder", + ) + parser.add_argument( + "--blob-num-slots", + default=60, + type=int, + help="Number of image slots in the blob ring buffer", + ) + parser.add_argument( + "--blob-max-image-bytes", + default=8 * 1024 * 1024, + type=int, + help="Maximum bytes per image slot in the blob ring buffer", + ) + + +class BlobCompletionHandler(ABC): + """Invoked when a rotated event blob has received all its follow-up writes.""" + + @abstractmethod + def __call__(self, event_id: str, blob: BlobFsCapture) -> None: ... + + +class NoOpBlobCompletionHandler(BlobCompletionHandler): + def __call__(self, event_id: str, blob: BlobFsCapture) -> None: + pass + + +def create_file_path_for_capture( + output_dir: str, + hierarchy_capture: bool, + dir_set: list[pathlib.Path], + capture_ts: datetime.datetime, + filename: str, +) -> pathlib.Path: + if hierarchy_capture: + storage_path = output_dir + capture_ts.strftime("/%Y-%m-%d/%H/%M") + storage_path = storage_path + f"_{tzshift_str}" + if not os.path.exists(storage_path): + os.makedirs(storage_path) + dir_set.append(pathlib.Path(storage_path)) + return pathlib.Path(os.path.join(storage_path, filename)) + return pathlib.Path(os.path.join(output_dir, filename)) + + +def _quality_to_compress_level(quality: int) -> int: + return max(0, min(9, round((100 - quality) * 9 / 100))) + + +_PATTERN_RE = re.compile(r"X+") + + +def _resolve_blob_pattern(pattern: pathlib.Path) -> pathlib.Path: + """Return the next sequential path for a pattern containing a run of 'X' chars. + + Globs the parent directory, finds the highest existing index, and returns a + path with that index + 1 (starting at 1 when no files exist yet). + Example: Path('/tmp/capture-XXXXX.blob') with 'capture-00001.blob' already + present → Path('/tmp/capture-00002.blob'). + If the name contains no 'X' run, the path is returned unchanged. + """ + match = _PATTERN_RE.search(pattern.name) + if match is None: + return pattern + width = len(match.group()) + prefix = pattern.name[: match.start()] + suffix = pattern.name[match.end() :] + parent = pattern.parent + + existing = sorted( + p for p in parent.glob(f"{prefix}{'?' * width}{suffix}") if p.name[len(prefix) : len(prefix) + width].isdigit() + ) + next_idx = int(existing[-1].name[len(prefix) : len(prefix) + width]) + 1 if existing else 1 + return parent / f"{prefix}{next_idx:0{width}d}{suffix}" + + +class ImageCaptureInterface(ABC): + @abstractmethod + def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: ... + + @abstractmethod + def get_last_filepath(self) -> Optional[pathlib.Path]: ... + + @abstractmethod + def set_capture_ts(self, ts: datetime.datetime) -> None: ... + + @abstractmethod + def rotate(self, event_id: str, immediate: bool = False) -> ImageCaptureInterface: ... + + @abstractmethod + def active_blob_path(self) -> Optional[pathlib.Path]: ... + + @abstractmethod + def update_last_flags(self, flags: int) -> None: ... + + +class BlobFsCapture(ImageCaptureInterface): + def __init__( + self, + blob_path: pathlib.Path, + num_slots: int = 60, + max_image_bytes: int = 8 * 1024 * 1024, + ) -> None: + self._blob_path = _resolve_blob_pattern(blob_path) + self._num_slots = num_slots + self._max_image_bytes = max_image_bytes + self._index_base = SUPERBLOCK_SIZE + self._data_base = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE + self._file_size = self._data_base + num_slots * max_image_bytes + self._file: Optional[Any] = None + self._mm: Optional[mmap.mmap] = None + self._write_head = 0 + self._open() + + def _open(self) -> None: + if not self._blob_path.exists(): + self._create() + else: + self._reopen() + + def _create(self) -> None: + self._blob_path.parent.mkdir(parents=True, exist_ok=True) + with open(self._blob_path, "wb") as f: + f.seek(self._file_size - 1) + f.write(b"\x00") + self._file = open(self._blob_path, "r+b") + self._mm = mmap.mmap(self._file.fileno(), self._file_size) + self._write_head = 0 + self._flush_superblock() + + def _reopen(self) -> None: + self._file = open(self._blob_path, "r+b") + actual_size = os.path.getsize(self._blob_path) + if actual_size != self._file_size: + self._file.close() + raise ValueError(f"Blob file size mismatch: expected {self._file_size}, got {actual_size}") + self._mm = mmap.mmap(self._file.fileno(), self._file_size) + fields = struct.unpack_from(SUPERBLOCK_FORMAT, self._mm, 0) + magic = fields[0] + if magic != MAGIC: + self._mm.close() + self._file.close() + raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") + self._write_head = fields[3] + + def _flush_superblock(self) -> None: + data = struct.pack( + SUPERBLOCK_FORMAT, + MAGIC, + VERSION, + self._num_slots, + self._write_head, + self._max_image_bytes, + INDEX_ENTRY_SIZE, + ) + assert self._mm is not None + self._mm[0:SUPERBLOCK_SIZE] = data + + def _slot_index_offset(self, slot: int) -> int: + return self._index_base + slot * INDEX_ENTRY_SIZE + + def _slot_data_offset(self, slot: int) -> int: + return self._data_base + slot * self._max_image_bytes + + def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: + try: + import cv2 as _cv2 + except ImportError: + raise ImportError( + "cv2 is required for image capture. " + "Install it with: pip install python-common-utility[imaging]" + ) from None + assert self._mm is not None + filename_bytes = filename.encode("utf-8") + if len(filename_bytes) > MAX_FILENAME_BYTES: + raise ValueError(f"Filename too long: {len(filename_bytes)} bytes, max {MAX_FILENAME_BYTES}") + + ok, buf = _cv2.imencode(".png", image) + if not ok: + raise ValueError("cv2.imencode failed to encode image as PNG") + png_bytes = buf.tobytes() + + if len(png_bytes) > self._max_image_bytes: + raise ValueError(f"PNG too large: {len(png_bytes)} bytes, max {self._max_image_bytes}") + + slot = self._write_head + data_offset = self._slot_data_offset(slot) + idx_offset = self._slot_index_offset(slot) + + # Write image data into the slot's region + self._mm[data_offset : data_offset + len(png_bytes)] = png_bytes + + # Write index entry + filename_padded = filename_bytes.ljust(MAX_FILENAME_BYTES, b"\x00") + entry = struct.pack( + INDEX_ENTRY_FORMAT, + data_offset, + len(png_bytes), + flags, + len(filename_bytes), + 0, + filename_padded, + ) + self._mm[idx_offset : idx_offset + INDEX_ENTRY_SIZE] = entry + + # Advance write_head and persist to superblock + self._write_head = (self._write_head + 1) % self._num_slots + struct.pack_into(" Optional[pathlib.Path]: + return None + + def set_capture_ts(self, ts: datetime.datetime) -> None: + pass + + def active_blob_path(self) -> Optional[pathlib.Path]: + return self._blob_path + + def update_last_flags(self, flags: int) -> None: + assert self._mm is not None + last_slot = (self._write_head - 1) % self._num_slots + idx_offset = self._slot_index_offset(last_slot) + struct.pack_into(" BlobFsCapture: + """Rename the current blob to .blob and open a fresh one at the original path. + + The current file handle and mmap are transferred to a new BlobFsCapture wrapper (the + "old handle") which is returned to the caller. On Linux, renaming an open/mmap'd file + preserves the inode so the old handle remains valid after the rename. + """ + old: BlobFsCapture = object.__new__(BlobFsCapture) + old.__dict__.update(self.__dict__) + + renamed = self._blob_path.parent / f"{event_id}.blob" + os.rename(self._blob_path, renamed) + old._blob_path = renamed + + self._file = None + self._mm = None + self._write_head = 0 + self._open() + + return old + + def close(self) -> None: + if self._mm is not None: + self._mm.flush() + self._mm.close() + self._mm = None + if self._file is not None: + self._file.close() + self._file = None + + def __enter__(self) -> BlobFsCapture: + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + +class CompositeBlobCapture(ImageCaptureInterface): + """Composite capture backend: one primary blob + a list of recently-rotated event blobs. + + After rotation each event blob receives `follow_up_count` additional frames (flags=0), + then is closed and the completion handler is invoked. + """ + + def __init__( + self, + blob_path: pathlib.Path, + num_slots: int, + max_image_bytes: int, + follow_up_count: int, + completion_handler: BlobCompletionHandler, + ) -> None: + self._blob = BlobFsCapture(blob_path, num_slots, max_image_bytes) + self._follow_up_count = follow_up_count + self._completion_handler = completion_handler + self._active: list[tuple[BlobFsCapture, str, int]] = [] + + def set_capture_ts(self, ts: datetime.datetime) -> None: + self._blob.set_capture_ts(ts) + + def get_last_filepath(self) -> Optional[pathlib.Path]: + return self._blob.get_last_filepath() + + def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: + self._blob.capture(image, filename, flags) + next_active: list[tuple[BlobFsCapture, str, int]] = [] + for blob, event_id, remaining in self._active: + blob.capture(image, filename, 0) + remaining -= 1 + if remaining > 0: + next_active.append((blob, event_id, remaining)) + else: + blob.close() + self._completion_handler(event_id, blob) + self._active = next_active + + def rotate(self, event_id: str, immediate: bool = False) -> CompositeBlobCapture: + old = self._blob.rotate(event_id) + if immediate: + old.close() + self._completion_handler(event_id, old) + else: + self._active.append((old, event_id, self._follow_up_count)) + return self + + def close(self) -> None: + for blob, _event_id, _remaining in self._active: + blob.close() + self._active = [] + self._blob.close() + + def active_blob_path(self) -> Optional[pathlib.Path]: + return self._blob._blob_path + + def update_last_flags(self, flags: int) -> None: + self._blob.update_last_flags(flags) + + +class BlobExtractor: + def __init__(self, blob_path: pathlib.Path) -> None: + self._blob_path = blob_path + + def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: + try: + import cv2 as _cv2 + import numpy as _np + except ImportError: + raise ImportError( + "cv2 and numpy are required for blob extraction. " + "Install them with: pip install python-common-utility[imaging]" + ) from None + dest_dir.mkdir(parents=True, exist_ok=True) + data = self._blob_path.read_bytes() + + if len(data) < SUPERBLOCK_SIZE: + raise ValueError("File too small to contain a valid superblock") + + magic, _version, num_slots, _write_head, _max_image_bytes, index_entry_size = struct.unpack_from( + SUPERBLOCK_FORMAT, data, 0 + ) + if magic != MAGIC: + raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") + + extracted: list[pathlib.Path] = [] + for i in range(num_slots): + idx_offset = SUPERBLOCK_SIZE + i * index_entry_size + if idx_offset + index_entry_size > len(data): + break + + image_offset, image_size, _flags, filename_len, _reserved, filename_raw = struct.unpack_from( + INDEX_ENTRY_FORMAT, data, idx_offset + ) + + if image_size == 0: + continue + if image_offset + image_size > len(data): + continue + + png_bytes = data[image_offset : image_offset + image_size] + arr = _np.frombuffer(png_bytes, dtype=_np.uint8) + if _cv2.imdecode(arr, _cv2.IMREAD_COLOR) is None: + continue + + raw_fn = filename_raw[:filename_len] + try: + filename = raw_fn.decode("utf-8") + except UnicodeDecodeError: + filename = raw_fn.decode("latin-1") + + if not filename: + filename = f"slot_{i:04d}.png" + + dest_path = dest_dir / filename + dest_path.write_bytes(png_bytes) + extracted.append(dest_path) + + return extracted + + +def main() -> None: + import argparse as _argparse + import sys + + parser = _argparse.ArgumentParser( + prog="er-scarecrow-blob-extract", + description="Extract PNG images from a BlobFsCapture ring-buffer file.", + formatter_class=_argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("blob", type=pathlib.Path, help="Path to the .blob file") + parser.add_argument( + "dest", + type=pathlib.Path, + nargs="?", + default=pathlib.Path("."), + help="Destination directory for extracted images (created if absent)", + ) + parser.add_argument( + "--info", + action="store_true", + help="Print blob metadata and slot summary without extracting images", + ) + args = parser.parse_args() + + if not args.blob.exists(): + print(f"error: blob file not found: {args.blob}", file=sys.stderr) + sys.exit(1) + + data = args.blob.read_bytes() + if len(data) < SUPERBLOCK_SIZE: + print("error: file too small to be a valid blob", file=sys.stderr) + sys.exit(1) + + magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( + SUPERBLOCK_FORMAT, data, 0 + ) + if magic != MAGIC: + print(f"error: invalid magic 0x{magic:08X} (expected 0x{MAGIC:08X})", file=sys.stderr) + sys.exit(1) + + trigger_names = get_trigger_names_from_blob(data, num_slots, index_entry_size) + + if args.info: + print_blob_info(args, data, version, num_slots, write_head, max_image_bytes, index_entry_size, trigger_names) + return + + extracted = BlobExtractor(args.blob).extract(args.dest) + for path in sorted(extracted): + marker = " [TRIGGER_FRAME]" if path.name in trigger_names else "" + print(f"{path}{marker}") + print(f"\nextracted {len(extracted)} image(s) to {args.dest}", file=sys.stderr) + + +def print_blob_info( + args: Any, + data: bytes, + version: int, + num_slots: int, + write_head: int, + max_image_bytes: int, + index_entry_size: int, + trigger_names: set[str], +) -> None: + occupied = sum( + 1 for i in range(num_slots) if struct.unpack_from(" 0 + ) + + def _fmt(n: int) -> str: + if n >= 1024 * 1024: + return f"{n / 1024 / 1024:.1f} MiB ({n} bytes)" + return f"{n / 1024:.1f} KiB ({n} bytes)" + + print(f"blob: {args.blob}") + print(f"magic: {MAGIC:#010x}") + print(f"version: {version}") + print(f"num_slots: {num_slots} (write_head={write_head})") + print(f"max_image_bytes: {_fmt(max_image_bytes)}") + print(f"index_entry_size: {index_entry_size}") + print(f"file_size: {_fmt(len(data))}") + print(f"occupied_slots: {occupied}/{num_slots}") + for trigger in sorted(trigger_names): + print(f"trigger_frame: {trigger}") + + +def get_trigger_names_from_blob(data: bytes, num_slots: int, index_entry_size: int) -> set[str]: + trigger_names: set[str] = set() + for i in range(num_slots): + idx_offset = SUPERBLOCK_SIZE + i * index_entry_size + if idx_offset + index_entry_size > len(data): + break + _img_off, img_size, flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx_offset) + if img_size > 0 and flags & TRIGGER_FRAME: + raw_fn = fn_raw[:fn_len] + try: + fn = raw_fn.decode("utf-8") + except UnicodeDecodeError: + fn = raw_fn.decode("latin-1") + trigger_names.add(fn) + return trigger_names + + +def make_capture_backend( + args: argparse.Namespace, + capture_folder: pathlib.Path, + follow_up_count: int, + completion_handler: Optional[BlobCompletionHandler] = None, +) -> ImageCaptureInterface: + blob_path = capture_folder / args.blob_capture_file + num_slots: int = args.blob_num_slots + max_image_bytes: int = args.blob_max_image_bytes + handler = completion_handler if completion_handler is not None else NoOpBlobCompletionHandler() + return CompositeBlobCapture(blob_path, num_slots, max_image_bytes, follow_up_count, handler) diff --git a/pyproject.toml b/pyproject.toml index 875ad7d..95b1520 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ dependencies = [ "pydantic", "jinja2", "netifaces", + "opencv-python", + "numpy", "python-context-logger @ git+https://github.com/EffectiveRange/python-context-logger.git@latest", ] dynamic = ["version"] diff --git a/setup.cfg b/setup.cfg index 5bea32d..235b84f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,15 +5,15 @@ packaging = [mypy] packages = common_utility -strict = True +strict = true [flake8] exclude = build,dist,.eggs,.venv max-line-length = 120 max-complexity = 10 -count = True -statistics = True -show-source = True +count = true +statistics = true +show-source = true per-file-ignores = # F401: imported but unused # F403: import * used; unable to detect undefined names @@ -26,7 +26,7 @@ python_classes = *Test [coverage:run] relative_files = true -branch = True +branch = true source = common_utility [coverage:report] @@ -47,8 +47,8 @@ exclude_also = ; Don't complain about abstract methods, they aren't run: @(abc\.)?abstractmethod -ignore_errors = True -skip_empty = True +ignore_errors = true +skip_empty = true [coverage:html] directory = coverage/html diff --git a/tests/captureTest.py b/tests/captureTest.py new file mode 100644 index 0000000..d5ba13e --- /dev/null +++ b/tests/captureTest.py @@ -0,0 +1,605 @@ +import argparse +import datetime +import io +import pathlib +import struct +import sys + +import numpy as np +import pytest + +from unittest.mock import MagicMock, patch + +from common_utility.capture import ( + MAGIC, + SUPERBLOCK_FORMAT, + SUPERBLOCK_SIZE, + INDEX_ENTRY_FORMAT, + INDEX_ENTRY_SIZE, + TRIGGER_FRAME, + BlobCompletionHandler, + NoOpBlobCompletionHandler, + CompositeBlobCapture, + BlobFsCapture, + BlobExtractor, + MAX_FILENAME_BYTES, + _resolve_blob_pattern, + add_args, + create_file_path_for_capture, + make_capture_backend, + main as blob_extract_main, +) + + +def _make_image(h: int = 64, w: int = 64) -> np.ndarray: + rng = np.random.default_rng(42) + return rng.integers(0, 255, (h, w, 3), dtype=np.uint8) + + +# --------------------------------------------------------------------------- +# BlobFsCapture +# --------------------------------------------------------------------------- + + +class BlobFsCaptureTest: + def test_creates_blob_file_with_correct_size(self, tmp_path: pathlib.Path) -> None: + num_slots = 4 + max_bytes = 512 * 1024 + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=num_slots, max_image_bytes=max_bytes) + blob.close() + expected = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE + num_slots * max_bytes + assert (tmp_path / "cap.blob").stat().st_size == expected + + def test_superblock_magic_on_new_file(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + magic = struct.unpack_from(" None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + # Corrupt magic + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: + num_slots = 3 + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=num_slots, max_image_bytes=512 * 1024) + images = [_make_image() for _ in range(num_slots + 1)] + filenames = [f"frame_{i}.png" for i in range(num_slots + 1)] + for img, fn in zip(images, filenames): + blob.capture(img, fn) + blob.close() + + # write_head should have wrapped: after 4 writes into 3 slots, write_head == 1 + data = (tmp_path / "cap.blob").read_bytes() + _, _, _, write_head, _, _ = struct.unpack_from(SUPERBLOCK_FORMAT, data, 0) + assert write_head == 1 + + # Slot 0 should contain the 4th image (index 3) + idx0_offset = SUPERBLOCK_SIZE + img_offset, img_size, _flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx0_offset) + fn = fn_raw[:fn_len].decode("utf-8") + assert fn == "frame_3.png" + + def test_filename_too_long_raises(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + with pytest.raises(ValueError, match="too long"): + blob.capture(_make_image(), "x" * (MAX_FILENAME_BYTES + 1)) + blob.close() + + def test_image_too_large_raises(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=1) + with pytest.raises(ValueError, match="too large"): + blob.capture(_make_image(), "frame.png") + blob.close() + + def test_context_manager(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + with BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) as blob: + blob.capture(_make_image(), "frame.png") + assert path.exists() + + def test_flags_stored_in_index(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png", flags=0xCAFEBABE) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == 0xCAFEBABE + + def test_update_last_flags_overwrites_flags_field(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png", flags=0) + blob.update_last_flags(TRIGGER_FRAME) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME + + def test_update_last_flags_targets_most_recent_slot(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "first.png", flags=0) + blob.capture(_make_image(), "second.png", flags=0) + blob.update_last_flags(TRIGGER_FRAME) + blob.close() + data = (tmp_path / "cap.blob").read_bytes() + # slot 0 (first) must stay 0 + _a, _b, flags0, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags0 == 0 + # slot 1 (second/last) must have TRIGGER_FRAME + _a, _b, flags1, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE) + assert flags1 == TRIGGER_FRAME + + +# --------------------------------------------------------------------------- +# BlobExtractor +# --------------------------------------------------------------------------- + + +class BlobExtractorTest: + def _write_blob(self, tmp_path: pathlib.Path, images_and_names: list[tuple]) -> pathlib.Path: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=5, max_image_bytes=512 * 1024) + for img, fn in images_and_names: + blob.capture(img, fn) + blob.close() + return path + + def test_extract_valid_images(self, tmp_path: pathlib.Path) -> None: + names = ["a.png", "b.png", "c.png"] + path = self._write_blob(tmp_path, [(_make_image(), n) for n in names]) + dest = tmp_path / "out" + extracted = BlobExtractor(path).extract(dest) + assert len(extracted) == 3 + assert {p.name for p in extracted} == set(names) + for p in extracted: + assert p.exists() + + def test_partial_blob_is_decodable(self, tmp_path: pathlib.Path) -> None: + """A blob with fewer written entries than its slot capacity must still decode correctly.""" + num_slots = 5 + written_names = ["frame_0.png", "frame_1.png", "frame_2.png"] + path = tmp_path / "partial.blob" + + blob = BlobFsCapture(path, num_slots=num_slots, max_image_bytes=512 * 1024) + for name in written_names: + blob.capture(_make_image(), name) + blob.close() + + extracted = BlobExtractor(path).extract(tmp_path / "out") + assert len(extracted) == len(written_names) + assert {p.name for p in extracted} == set(written_names) + + def test_extract_creates_dest_dir(self, tmp_path: pathlib.Path) -> None: + path = self._write_blob(tmp_path, [(_make_image(), "a.png")]) + dest = tmp_path / "new" / "subdir" + BlobExtractor(path).extract(dest) + assert dest.exists() + + def test_extract_invalid_magic_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "bad.blob" + path.write_bytes(b"\xff" * 64) + with pytest.raises(ValueError, match="magic"): + BlobExtractor(path).extract(tmp_path / "out") + + def test_corruption_resilience(self, tmp_path: pathlib.Path) -> None: + """Simulates two interrupted writes; BlobExtractor must extract the 3 valid images.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=5, max_image_bytes=512 * 1024) + for i in range(3): + blob.capture(_make_image(), f"valid_{i}.png") + blob.close() + + raw = bytearray(path.read_bytes()) + + # Scenario A: slot 3 — advance write_head in superblock but leave index entry zeroed + struct.pack_into(" None: + blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + assert (tmp_path / "cap-001.blob").exists() + + def test_pattern_skips_existing_files(self, tmp_path: pathlib.Path) -> None: + (tmp_path / "cap-001.blob").write_bytes(b"x") + (tmp_path / "cap-002.blob").write_bytes(b"x") + blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + assert (tmp_path / "cap-003.blob").exists() + + def test_no_pattern_uses_literal_path(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "literal.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + assert path.exists() + + def test_pattern_width_determines_padding(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap-XXXXX.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + assert (tmp_path / "cap-00001.blob").exists() + + +# --------------------------------------------------------------------------- +# rotate() +# --------------------------------------------------------------------------- + + +class RotateTest: + def test_blob_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: + original = tmp_path / "cap-XXX.blob" + blob = BlobFsCapture(original, num_slots=4, max_image_bytes=512 * 1024) + resolved = blob._blob_path # e.g. cap-001.blob + blob.capture(_make_image(), "frame0.png") + old = blob.rotate("my-event-id") + try: + # The original file was renamed to my-event-id.blob + assert (tmp_path / "my-event-id.blob").exists() + assert old._blob_path == tmp_path / "my-event-id.blob" + # The primary blob reopened at the original resolved path + assert blob._blob_path == resolved + assert resolved.exists() + finally: + old.close() + blob.close() + + def test_blob_rotate_old_handle_retains_data(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "before_rotate.png") + old = blob.rotate("event-abc") + try: + blob.capture(_make_image(), "after_rotate.png") + blob.close() + old.close() + # Old handle (renamed to event-abc.blob): contains before_rotate.png + dest = tmp_path / "from_old" + extracted_old = BlobExtractor(tmp_path / "event-abc.blob").extract(dest) + assert {p.name for p in extracted_old} == {"before_rotate.png"} + # New blob (original path): contains after_rotate.png + dest2 = tmp_path / "from_new" + extracted_new = BlobExtractor(tmp_path / "cap-001.blob").extract(dest2) + assert {p.name for p in extracted_new} == {"after_rotate.png"} + finally: + pass # already closed above + + def test_blob_rotate_multiple_times(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap-XX.blob", num_slots=4, max_image_bytes=512 * 1024) + event_ids = ["evt-a", "evt-b", "evt-c"] + handles = [] + for i, eid in enumerate(event_ids): + blob.capture(_make_image(), f"frame{i}.png") + handles.append(blob.rotate(eid)) + blob.close() + for h in handles: + h.close() + # Three event blobs renamed + the primary still exists at cap-01.blob + for eid in event_ids: + assert (tmp_path / f"{eid}.blob").exists() + assert (tmp_path / "cap-01.blob").exists() + + +# --------------------------------------------------------------------------- +# CompositeBlobCapture +# --------------------------------------------------------------------------- + + +class CompositeBlobCaptureTest: + def _make_composite( + self, + tmp_path: pathlib.Path, + follow_up: int = 2, + handler: BlobCompletionHandler | None = None, + ) -> CompositeBlobCapture: + if handler is None: + handler = NoOpBlobCompletionHandler() + return CompositeBlobCapture( + tmp_path / "cap-XXX.blob", + num_slots=8, + max_image_bytes=512 * 1024, + follow_up_count=follow_up, + completion_handler=handler, + ) + + def test_capture_writes_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "frame.png") + comp.close() + blobs = list(tmp_path.glob("*.blob")) + assert len(blobs) == 1 + extracted = BlobExtractor(blobs[0]).extract(tmp_path / "out") + assert len(extracted) == 1 + + def test_rotate_renames_file_to_event_id(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "trigger.png") + comp.rotate("fire-uuid") + comp.close() + assert (tmp_path / "fire-uuid.blob").exists() + + def test_follow_up_writes_reach_rotated_blob(self, tmp_path: pathlib.Path) -> None: + follow_up = 3 + comp = self._make_composite(tmp_path, follow_up=follow_up) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev1") + for i in range(follow_up): + comp.capture(_make_image(), f"followup{i}.png") + comp.close() + # Event blob should contain: trigger frame + follow_up frames + extracted = BlobExtractor(tmp_path / "ev1.blob").extract(tmp_path / "out") + assert len(extracted) == 1 + follow_up + + def test_completion_handler_called_with_event_id_and_blob(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + follow_up = 2 + comp = self._make_composite(tmp_path, follow_up=follow_up, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-done") + for _ in range(follow_up): + comp.capture(_make_image(), "extra.png") + comp.close() + handler.assert_called_once() + call_event_id, call_blob = handler.call_args[0] + assert call_event_id == "ev-done" + assert isinstance(call_blob, BlobFsCapture) + + def test_no_completion_on_regular_close(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=5, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-early") + # close without exhausting follow-up quota + comp.capture(_make_image(), "one.png") + comp.close() + handler.assert_not_called() + + def test_trigger_frame_flag_in_primary_pre_rotation(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path, follow_up=2) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + primary_path = comp._blob._blob_path + comp.rotate("ev-flag") + comp.close() + # The trigger frame is in the event blob (the file that was the primary before rotation) + data = (tmp_path / "ev-flag.blob").read_bytes() + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME + # New primary blob has no entries yet (fresh) + primary_data = primary_path.read_bytes() + _magic2, _ver2, _slots2, wh2, *_ = struct.unpack_from(SUPERBLOCK_FORMAT, primary_data, 0) + assert wh2 == 0 + + def test_follow_up_entries_have_zero_flags(self, tmp_path: pathlib.Path) -> None: + follow_up = 2 + comp = self._make_composite(tmp_path, follow_up=follow_up) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + comp.rotate("ev-flags") + for i in range(follow_up): + comp.capture(_make_image(), f"followup{i}.png") + comp.close() + data = (tmp_path / "ev-flags.blob").read_bytes() + # Check all follow-up entries have flags == 0 + for slot in range(1, 1 + follow_up): + offset = SUPERBLOCK_SIZE + slot * INDEX_ENTRY_SIZE + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, offset) + assert flags == 0 + + def test_update_last_flags_sets_flag_on_primary_blob(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.capture(_make_image(), "frame.png", flags=0) + comp.update_last_flags(TRIGGER_FRAME) + primary_path = comp._blob._blob_path + comp.close() + data = primary_path.read_bytes() + _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) + assert flags == TRIGGER_FRAME + + def test_active_blob_path_returns_primary_path(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + assert comp.active_blob_path() == comp._blob._blob_path + comp.close() + + def test_active_blob_path_updates_after_rotate(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + original_primary = comp._blob._blob_path + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-rotate") + # After rotation the primary blob has been reopened at the original path + assert comp.active_blob_path() == original_primary + comp.close() + + def test_rotate_immediate_calls_handler_right_away(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=3, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-imm", immediate=True) + # Handler must be called immediately — no follow-up frames needed + handler.assert_called_once() + call_event_id, call_blob = handler.call_args[0] + assert call_event_id == "ev-imm" + assert isinstance(call_blob, BlobFsCapture) + comp.close() + + def test_rotate_immediate_does_not_add_to_active(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path, follow_up=3) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-imm2", immediate=True) + assert len(comp._active) == 0 + comp.close() + + def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: pathlib.Path) -> None: + handler = MagicMock(spec=BlobCompletionHandler) + comp = self._make_composite(tmp_path, follow_up=3, handler=handler) + comp.capture(_make_image(), "trigger.png") + comp.rotate("ev-imm3", immediate=True) + # Additional frames go only to the new primary — handler stays called exactly once + for i in range(3): + comp.capture(_make_image(), f"extra{i}.png") + comp.close() + handler.assert_called_once() + + +# --------------------------------------------------------------------------- +# add_args / make_capture_backend / create_file_path_for_capture +# --------------------------------------------------------------------------- + + +class CaptureFactoryTest: + def test_add_args_registers_defaults(self) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + assert args.blob_capture_file == "capture.blob" + assert args.blob_num_slots == 60 + assert args.blob_max_image_bytes == 8 * 1024 * 1024 + + def test_add_args_allows_overrides(self) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args(["--blob-capture-file", "my.blob", "--blob-num-slots", "10", "--blob-max-image-bytes", "1024"]) + assert args.blob_capture_file == "my.blob" + assert args.blob_num_slots == 10 + assert args.blob_max_image_bytes == 1024 + + def test_make_capture_backend_returns_composite(self, tmp_path: pathlib.Path) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + backend = make_capture_backend(args, tmp_path, follow_up_count=5) + assert isinstance(backend, CompositeBlobCapture) + backend.close() + + def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Path) -> None: + parser = argparse.ArgumentParser() + add_args(parser) + args = parser.parse_args([]) + handler = MagicMock(spec=BlobCompletionHandler) + backend = make_capture_backend(args, tmp_path, follow_up_count=1, completion_handler=handler) + assert isinstance(backend, CompositeBlobCapture) + backend.capture(_make_image(), "t.png") + backend.rotate("ev") + backend.capture(_make_image(), "f.png") + backend.close() + handler.assert_called_once() + + def test_create_file_path_without_hierarchy(self, tmp_path: pathlib.Path) -> None: + dir_set: list[pathlib.Path] = [] + ts = datetime.datetime(2024, 6, 15, 10, 30) + result = create_file_path_for_capture(str(tmp_path), False, dir_set, ts, "frame.png") + assert result == tmp_path / "frame.png" + assert dir_set == [] + + def test_create_file_path_with_hierarchy(self, tmp_path: pathlib.Path) -> None: + dir_set: list[pathlib.Path] = [] + ts = datetime.datetime(2024, 6, 15, 10, 30) + result = create_file_path_for_capture(str(tmp_path), True, dir_set, ts, "frame.png") + assert "2024-06-15" in str(result) + assert result.name == "frame.png" + assert len(dir_set) == 1 + assert dir_set[0].exists() + + def test_create_file_path_with_hierarchy_reuses_existing_dir(self, tmp_path: pathlib.Path) -> None: + ts = datetime.datetime(2024, 6, 15, 10, 30) + dir_set1: list[pathlib.Path] = [] + create_file_path_for_capture(str(tmp_path), True, dir_set1, ts, "a.png") + dir_set2: list[pathlib.Path] = [] + create_file_path_for_capture(str(tmp_path), True, dir_set2, ts, "b.png") + # Second call must not append to dir_set since directory already exists + assert dir_set2 == [] + + +# --------------------------------------------------------------------------- +# blob-extract CLI — trigger frame annotation +# --------------------------------------------------------------------------- + + +def _run_extract_cli(blob_path: pathlib.Path, dest: pathlib.Path, info: bool = False) -> str: + """Run the extract CLI and return captured stdout.""" + stdout = io.StringIO() + argv = ["blob-extract", str(blob_path), str(dest)] + if info: + argv.append("--info") + with patch("sys.argv", argv), patch("sys.stdout", stdout), patch("sys.stderr", io.StringIO()): + blob_extract_main() + return stdout.getvalue() + + +class BlobExtractCliTest: + def _write_blob(self, tmp_path: pathlib.Path) -> pathlib.Path: + path = tmp_path / "test.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "before.png", flags=0) + blob.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + blob.capture(_make_image(), "after.png", flags=0) + blob.close() + return path + + def test_trigger_frame_line_is_annotated(self, tmp_path: pathlib.Path) -> None: + blob_path = self._write_blob(tmp_path) + output = _run_extract_cli(blob_path, tmp_path / "out") + lines = {line.strip() for line in output.splitlines() if line.strip()} + assert any("trigger.png" in line and "[TRIGGER_FRAME]" in line for line in lines) + + def test_non_trigger_lines_have_no_annotation(self, tmp_path: pathlib.Path) -> None: + blob_path = self._write_blob(tmp_path) + output = _run_extract_cli(blob_path, tmp_path / "out") + for line in output.splitlines(): + if "before.png" in line or "after.png" in line: + assert "[TRIGGER_FRAME]" not in line + + def test_no_trigger_frame_no_annotation(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "plain.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame0.png", flags=0) + blob.capture(_make_image(), "frame1.png", flags=0) + blob.close() + output = _run_extract_cli(path, tmp_path / "out") + assert "[TRIGGER_FRAME]" not in output + + def test_blob_info(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "info.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame0.png", flags=0) + blob.capture(_make_image(), "frame1.png", flags=TRIGGER_FRAME) + blob.close() + output = _run_extract_cli(path, tmp_path / "out", info=True) + + assert f"blob: {path}" in output + assert f"magic: {MAGIC:#010x}" in output + assert "version: 1" in output + assert "num_slots: 4 (write_head=2)" in output + assert "max_image_bytes: 512.0 KiB (524288 bytes)" in output + assert "occupied_slots: 2/4" in output + assert "trigger_frame: frame1.png" in output + assert not (tmp_path / "out").exists() From 5c3ab705cadef7ed6fd998a40a637c9f557183bf Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 13:38:56 +0200 Subject: [PATCH 02/13] Refactor capture module: streamline imports and remove unused file path creation functions --- common_utility/capture.py | 45 ++++++--------------------------------- pyproject.toml | 9 ++++++++ tests/captureTest.py | 28 +----------------------- 3 files changed, 17 insertions(+), 65 deletions(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index c7b7e45..c8c6e2e 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -8,11 +8,11 @@ import os import datetime from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from typing import Any, Optional -if TYPE_CHECKING: - import numpy as np - from numpy.typing import NDArray +import cv2 +import numpy as np +from numpy.typing import NDArray from context_logger import get_logger @@ -70,22 +70,6 @@ def __call__(self, event_id: str, blob: BlobFsCapture) -> None: pass -def create_file_path_for_capture( - output_dir: str, - hierarchy_capture: bool, - dir_set: list[pathlib.Path], - capture_ts: datetime.datetime, - filename: str, -) -> pathlib.Path: - if hierarchy_capture: - storage_path = output_dir + capture_ts.strftime("/%Y-%m-%d/%H/%M") - storage_path = storage_path + f"_{tzshift_str}" - if not os.path.exists(storage_path): - os.makedirs(storage_path) - dir_set.append(pathlib.Path(storage_path)) - return pathlib.Path(os.path.join(storage_path, filename)) - return pathlib.Path(os.path.join(output_dir, filename)) - def _quality_to_compress_level(quality: int) -> int: return max(0, min(9, round((100 - quality) * 9 / 100))) @@ -207,19 +191,12 @@ def _slot_data_offset(self, slot: int) -> int: return self._data_base + slot * self._max_image_bytes def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: - try: - import cv2 as _cv2 - except ImportError: - raise ImportError( - "cv2 is required for image capture. " - "Install it with: pip install python-common-utility[imaging]" - ) from None assert self._mm is not None filename_bytes = filename.encode("utf-8") if len(filename_bytes) > MAX_FILENAME_BYTES: raise ValueError(f"Filename too long: {len(filename_bytes)} bytes, max {MAX_FILENAME_BYTES}") - ok, buf = _cv2.imencode(".png", image) + ok, buf = cv2.imencode(".png", image) if not ok: raise ValueError("cv2.imencode failed to encode image as PNG") png_bytes = buf.tobytes() @@ -372,14 +349,6 @@ def __init__(self, blob_path: pathlib.Path) -> None: self._blob_path = blob_path def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: - try: - import cv2 as _cv2 - import numpy as _np - except ImportError: - raise ImportError( - "cv2 and numpy are required for blob extraction. " - "Install them with: pip install python-common-utility[imaging]" - ) from None dest_dir.mkdir(parents=True, exist_ok=True) data = self._blob_path.read_bytes() @@ -408,8 +377,8 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: continue png_bytes = data[image_offset : image_offset + image_size] - arr = _np.frombuffer(png_bytes, dtype=_np.uint8) - if _cv2.imdecode(arr, _cv2.IMREAD_COLOR) is None: + arr = np.frombuffer(png_bytes, dtype=np.uint8) + if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None: continue raw_fn = filename_raw[:filename_len] diff --git a/pyproject.toml b/pyproject.toml index 95b1520..db59685 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,12 @@ dependencies = [ ] dynamic = ["version"] +[project.optional-dependencies] +dev = [ + "mypy", + "flake8", +] + [tool.setuptools] package-dir = {"" = "."} packages = ["common_utility", "test_utility"] @@ -34,6 +40,9 @@ build-backend = "setuptools.build_meta" version_scheme = "guess-next-dev" local_scheme = "node-and-date" +[tool.black] +line-length = 120 + [tool.pytest.ini_options] addopts = ["--verbose", "--capture=no"] python_files = ["*Test.py"] diff --git a/tests/captureTest.py b/tests/captureTest.py index d5ba13e..cbe88f4 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -23,9 +23,7 @@ BlobFsCapture, BlobExtractor, MAX_FILENAME_BYTES, - _resolve_blob_pattern, add_args, - create_file_path_for_capture, make_capture_backend, main as blob_extract_main, ) @@ -471,7 +469,7 @@ def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: path # --------------------------------------------------------------------------- -# add_args / make_capture_backend / create_file_path_for_capture +# add_args / make_capture_backend # --------------------------------------------------------------------------- @@ -513,30 +511,6 @@ def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Pa backend.close() handler.assert_called_once() - def test_create_file_path_without_hierarchy(self, tmp_path: pathlib.Path) -> None: - dir_set: list[pathlib.Path] = [] - ts = datetime.datetime(2024, 6, 15, 10, 30) - result = create_file_path_for_capture(str(tmp_path), False, dir_set, ts, "frame.png") - assert result == tmp_path / "frame.png" - assert dir_set == [] - - def test_create_file_path_with_hierarchy(self, tmp_path: pathlib.Path) -> None: - dir_set: list[pathlib.Path] = [] - ts = datetime.datetime(2024, 6, 15, 10, 30) - result = create_file_path_for_capture(str(tmp_path), True, dir_set, ts, "frame.png") - assert "2024-06-15" in str(result) - assert result.name == "frame.png" - assert len(dir_set) == 1 - assert dir_set[0].exists() - - def test_create_file_path_with_hierarchy_reuses_existing_dir(self, tmp_path: pathlib.Path) -> None: - ts = datetime.datetime(2024, 6, 15, 10, 30) - dir_set1: list[pathlib.Path] = [] - create_file_path_for_capture(str(tmp_path), True, dir_set1, ts, "a.png") - dir_set2: list[pathlib.Path] = [] - create_file_path_for_capture(str(tmp_path), True, dir_set2, ts, "b.png") - # Second call must not append to dir_set since directory already exists - assert dir_set2 == [] # --------------------------------------------------------------------------- From cd4c4d3a82fd0e23b828d1e3af3ea8efb0326c47 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 13:39:30 +0200 Subject: [PATCH 03/13] Clean up settings.json by removing unnecessary black-formatter arguments and tidy up capture.py by removing an empty line --- .vscode/settings.json | 4 +--- common_utility/capture.py | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 4e488b7..d22bbb3 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,9 +12,7 @@ "black-formatter.interpreter": [ "${workspaceFolder}/.venv/bin/python3" ], - "black-formatter.args": [ - "--config=setup.cfg" - ], + "black-formatter.args": [], "python.analysis.typeCheckingMode": "standard", "python.testing.pytestArgs": [], "python-envs.pythonProjects": [ diff --git a/common_utility/capture.py b/common_utility/capture.py index c8c6e2e..3c7aab8 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -70,7 +70,6 @@ def __call__(self, event_id: str, blob: BlobFsCapture) -> None: pass - def _quality_to_compress_level(quality: int) -> int: return max(0, min(9, round((100 - quality) * 9 / 100))) From 4349f71a137701b806e134faa0edba4c241d8e2a Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 13:52:06 +0200 Subject: [PATCH 04/13] Refactor capture module: replace TRIGGER_FRAME with TRIGGER_FRAME_FLAG and clean up unused code --- common_utility/capture.py | 62 ++++++--------------------------------- setup.cfg | 1 + tests/captureTest.py | 59 +++++++++++++------------------------ 3 files changed, 31 insertions(+), 91 deletions(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index 3c7aab8..841164f 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -2,10 +2,10 @@ import argparse import mmap -import re import struct import pathlib import os +import sys import datetime from abc import ABC, abstractmethod from typing import Any, Optional @@ -14,15 +14,6 @@ import numpy as np from numpy.typing import NDArray -from context_logger import get_logger - -log = get_logger("capture") - -# Timezone shift string shared with bird_recog for directory hierarchy and filenames -_offset = datetime.datetime.now().astimezone().utcoffset() -_tzshift = int(((_offset.total_seconds() if _offset is not None else 0) / 60 / 60) * 100) -tzshift_str = f"{'p' if _tzshift >= 0 else 'm'}{abs(_tzshift):04d}" - MAGIC: int = 0xEFFEC51E VERSION: int = 1 MAX_FILENAME_BYTES: int = 1024 @@ -35,7 +26,7 @@ INDEX_ENTRY_FORMAT = " None: @@ -70,37 +61,6 @@ def __call__(self, event_id: str, blob: BlobFsCapture) -> None: pass -def _quality_to_compress_level(quality: int) -> int: - return max(0, min(9, round((100 - quality) * 9 / 100))) - - -_PATTERN_RE = re.compile(r"X+") - - -def _resolve_blob_pattern(pattern: pathlib.Path) -> pathlib.Path: - """Return the next sequential path for a pattern containing a run of 'X' chars. - - Globs the parent directory, finds the highest existing index, and returns a - path with that index + 1 (starting at 1 when no files exist yet). - Example: Path('/tmp/capture-XXXXX.blob') with 'capture-00001.blob' already - present → Path('/tmp/capture-00002.blob'). - If the name contains no 'X' run, the path is returned unchanged. - """ - match = _PATTERN_RE.search(pattern.name) - if match is None: - return pattern - width = len(match.group()) - prefix = pattern.name[: match.start()] - suffix = pattern.name[match.end() :] - parent = pattern.parent - - existing = sorted( - p for p in parent.glob(f"{prefix}{'?' * width}{suffix}") if p.name[len(prefix) : len(prefix) + width].isdigit() - ) - next_idx = int(existing[-1].name[len(prefix) : len(prefix) + width]) + 1 if existing else 1 - return parent / f"{prefix}{next_idx:0{width}d}{suffix}" - - class ImageCaptureInterface(ABC): @abstractmethod def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> None: ... @@ -128,7 +88,7 @@ def __init__( num_slots: int = 60, max_image_bytes: int = 8 * 1024 * 1024, ) -> None: - self._blob_path = _resolve_blob_pattern(blob_path) + self._blob_path = blob_path self._num_slots = num_slots self._max_image_bytes = max_image_bytes self._index_base = SUPERBLOCK_SIZE @@ -370,9 +330,7 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: INDEX_ENTRY_FORMAT, data, idx_offset ) - if image_size == 0: - continue - if image_offset + image_size > len(data): + if image_size == 0 or image_offset + image_size > len(data): continue png_bytes = data[image_offset : image_offset + image_size] @@ -397,13 +355,11 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: def main() -> None: - import argparse as _argparse - import sys - parser = _argparse.ArgumentParser( - prog="er-scarecrow-blob-extract", + parser = argparse.ArgumentParser( + prog="er-blob-extract", description="Extract PNG images from a BlobFsCapture ring-buffer file.", - formatter_class=_argparse.ArgumentDefaultsHelpFormatter, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("blob", type=pathlib.Path, help="Path to the .blob file") parser.add_argument( @@ -444,7 +400,7 @@ def main() -> None: extracted = BlobExtractor(args.blob).extract(args.dest) for path in sorted(extracted): - marker = " [TRIGGER_FRAME]" if path.name in trigger_names else "" + marker = " [TRIGGER_FRAME_FLAG]" if path.name in trigger_names else "" print(f"{path}{marker}") print(f"\nextracted {len(extracted)} image(s) to {args.dest}", file=sys.stderr) @@ -487,7 +443,7 @@ def get_trigger_names_from_blob(data: bytes, num_slots: int, index_entry_size: i if idx_offset + index_entry_size > len(data): break _img_off, img_size, flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx_offset) - if img_size > 0 and flags & TRIGGER_FRAME: + if img_size > 0 and flags & TRIGGER_FRAME_FLAG: raw_fn = fn_raw[:fn_len] try: fn = raw_fn.decode("utf-8") diff --git a/setup.cfg b/setup.cfg index 235b84f..fecb071 100644 --- a/setup.cfg +++ b/setup.cfg @@ -11,6 +11,7 @@ strict = true exclude = build,dist,.eggs,.venv max-line-length = 120 max-complexity = 10 +extend-ignore = E203 count = true statistics = true show-source = true diff --git a/tests/captureTest.py b/tests/captureTest.py index cbe88f4..0d90379 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -16,7 +16,7 @@ SUPERBLOCK_SIZE, INDEX_ENTRY_FORMAT, INDEX_ENTRY_SIZE, - TRIGGER_FRAME, + TRIGGER_FRAME_FLAG, BlobCompletionHandler, NoOpBlobCompletionHandler, CompositeBlobCapture, @@ -115,25 +115,25 @@ def test_flags_stored_in_index(self, tmp_path: pathlib.Path) -> None: def test_update_last_flags_overwrites_flags_field(self, tmp_path: pathlib.Path) -> None: blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "frame.png", flags=0) - blob.update_last_flags(TRIGGER_FRAME) + blob.update_last_flags(TRIGGER_FRAME_FLAG) blob.close() data = (tmp_path / "cap.blob").read_bytes() _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) - assert flags == TRIGGER_FRAME + assert flags == TRIGGER_FRAME_FLAG def test_update_last_flags_targets_most_recent_slot(self, tmp_path: pathlib.Path) -> None: blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "first.png", flags=0) blob.capture(_make_image(), "second.png", flags=0) - blob.update_last_flags(TRIGGER_FRAME) + blob.update_last_flags(TRIGGER_FRAME_FLAG) blob.close() data = (tmp_path / "cap.blob").read_bytes() # slot 0 (first) must stay 0 _a, _b, flags0, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) assert flags0 == 0 - # slot 1 (second/last) must have TRIGGER_FRAME + # slot 1 (second/last) must have TRIGGER_FRAME_FLAG _a, _b, flags1, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE) - assert flags1 == TRIGGER_FRAME + assert flags1 == TRIGGER_FRAME_FLAG # --------------------------------------------------------------------------- @@ -227,30 +227,12 @@ def test_corruption_resilience(self, tmp_path: pathlib.Path) -> None: class BlobPatternTest: - def test_pattern_creates_first_unused_file(self, tmp_path: pathlib.Path) -> None: - blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=256 * 1024) - blob.close() - assert (tmp_path / "cap-001.blob").exists() - - def test_pattern_skips_existing_files(self, tmp_path: pathlib.Path) -> None: - (tmp_path / "cap-001.blob").write_bytes(b"x") - (tmp_path / "cap-002.blob").write_bytes(b"x") - blob = BlobFsCapture(tmp_path / "cap-XXX.blob", num_slots=4, max_image_bytes=256 * 1024) - blob.close() - assert (tmp_path / "cap-003.blob").exists() - def test_no_pattern_uses_literal_path(self, tmp_path: pathlib.Path) -> None: path = tmp_path / "literal.blob" blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) blob.close() assert path.exists() - def test_pattern_width_determines_padding(self, tmp_path: pathlib.Path) -> None: - blob = BlobFsCapture(tmp_path / "cap-XXXXX.blob", num_slots=4, max_image_bytes=256 * 1024) - blob.close() - assert (tmp_path / "cap-00001.blob").exists() - - # --------------------------------------------------------------------------- # rotate() # --------------------------------------------------------------------------- @@ -288,7 +270,7 @@ def test_blob_rotate_old_handle_retains_data(self, tmp_path: pathlib.Path) -> No assert {p.name for p in extracted_old} == {"before_rotate.png"} # New blob (original path): contains after_rotate.png dest2 = tmp_path / "from_new" - extracted_new = BlobExtractor(tmp_path / "cap-001.blob").extract(dest2) + extracted_new = BlobExtractor(tmp_path / "cap-XXX.blob").extract(dest2) assert {p.name for p in extracted_new} == {"after_rotate.png"} finally: pass # already closed above @@ -306,7 +288,7 @@ def test_blob_rotate_multiple_times(self, tmp_path: pathlib.Path) -> None: # Three event blobs renamed + the primary still exists at cap-01.blob for eid in event_ids: assert (tmp_path / f"{eid}.blob").exists() - assert (tmp_path / "cap-01.blob").exists() + assert (tmp_path / "cap-XX.blob").exists() # --------------------------------------------------------------------------- @@ -385,14 +367,14 @@ def test_no_completion_on_regular_close(self, tmp_path: pathlib.Path) -> None: def test_trigger_frame_flag_in_primary_pre_rotation(self, tmp_path: pathlib.Path) -> None: comp = self._make_composite(tmp_path, follow_up=2) - comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) primary_path = comp._blob._blob_path comp.rotate("ev-flag") comp.close() # The trigger frame is in the event blob (the file that was the primary before rotation) data = (tmp_path / "ev-flag.blob").read_bytes() _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) - assert flags == TRIGGER_FRAME + assert flags == TRIGGER_FRAME_FLAG # New primary blob has no entries yet (fresh) primary_data = primary_path.read_bytes() _magic2, _ver2, _slots2, wh2, *_ = struct.unpack_from(SUPERBLOCK_FORMAT, primary_data, 0) @@ -401,7 +383,7 @@ def test_trigger_frame_flag_in_primary_pre_rotation(self, tmp_path: pathlib.Path def test_follow_up_entries_have_zero_flags(self, tmp_path: pathlib.Path) -> None: follow_up = 2 comp = self._make_composite(tmp_path, follow_up=follow_up) - comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + comp.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) comp.rotate("ev-flags") for i in range(follow_up): comp.capture(_make_image(), f"followup{i}.png") @@ -416,12 +398,12 @@ def test_follow_up_entries_have_zero_flags(self, tmp_path: pathlib.Path) -> None def test_update_last_flags_sets_flag_on_primary_blob(self, tmp_path: pathlib.Path) -> None: comp = self._make_composite(tmp_path) comp.capture(_make_image(), "frame.png", flags=0) - comp.update_last_flags(TRIGGER_FRAME) + comp.update_last_flags(TRIGGER_FRAME_FLAG) primary_path = comp._blob._blob_path comp.close() data = primary_path.read_bytes() _img_off, _img_sz, flags, *_ = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) - assert flags == TRIGGER_FRAME + assert flags == TRIGGER_FRAME_FLAG def test_active_blob_path_returns_primary_path(self, tmp_path: pathlib.Path) -> None: comp = self._make_composite(tmp_path) @@ -485,7 +467,9 @@ def test_add_args_registers_defaults(self) -> None: def test_add_args_allows_overrides(self) -> None: parser = argparse.ArgumentParser() add_args(parser) - args = parser.parse_args(["--blob-capture-file", "my.blob", "--blob-num-slots", "10", "--blob-max-image-bytes", "1024"]) + args = parser.parse_args( + ["--blob-capture-file", "my.blob", "--blob-num-slots", "10", "--blob-max-image-bytes", "1024"] + ) assert args.blob_capture_file == "my.blob" assert args.blob_num_slots == 10 assert args.blob_max_image_bytes == 1024 @@ -512,7 +496,6 @@ def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Pa handler.assert_called_once() - # --------------------------------------------------------------------------- # blob-extract CLI — trigger frame annotation # --------------------------------------------------------------------------- @@ -534,7 +517,7 @@ def _write_blob(self, tmp_path: pathlib.Path) -> pathlib.Path: path = tmp_path / "test.blob" blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "before.png", flags=0) - blob.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME) + blob.capture(_make_image(), "trigger.png", flags=TRIGGER_FRAME_FLAG) blob.capture(_make_image(), "after.png", flags=0) blob.close() return path @@ -543,14 +526,14 @@ def test_trigger_frame_line_is_annotated(self, tmp_path: pathlib.Path) -> None: blob_path = self._write_blob(tmp_path) output = _run_extract_cli(blob_path, tmp_path / "out") lines = {line.strip() for line in output.splitlines() if line.strip()} - assert any("trigger.png" in line and "[TRIGGER_FRAME]" in line for line in lines) + assert any("trigger.png" in line and "[TRIGGER_FRAME_FLAG]" in line for line in lines) def test_non_trigger_lines_have_no_annotation(self, tmp_path: pathlib.Path) -> None: blob_path = self._write_blob(tmp_path) output = _run_extract_cli(blob_path, tmp_path / "out") for line in output.splitlines(): if "before.png" in line or "after.png" in line: - assert "[TRIGGER_FRAME]" not in line + assert "[TRIGGER_FRAME_FLAG]" not in line def test_no_trigger_frame_no_annotation(self, tmp_path: pathlib.Path) -> None: path = tmp_path / "plain.blob" @@ -559,13 +542,13 @@ def test_no_trigger_frame_no_annotation(self, tmp_path: pathlib.Path) -> None: blob.capture(_make_image(), "frame1.png", flags=0) blob.close() output = _run_extract_cli(path, tmp_path / "out") - assert "[TRIGGER_FRAME]" not in output + assert "[TRIGGER_FRAME_FLAG]" not in output def test_blob_info(self, tmp_path: pathlib.Path) -> None: path = tmp_path / "info.blob" blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "frame0.png", flags=0) - blob.capture(_make_image(), "frame1.png", flags=TRIGGER_FRAME) + blob.capture(_make_image(), "frame1.png", flags=TRIGGER_FRAME_FLAG) blob.close() output = _run_extract_cli(path, tmp_path / "out", info=True) From 823735b97e55040ad2c7534d559f6168a886c36a Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:08:43 +0200 Subject: [PATCH 05/13] Update BlobFsCapture and BlobExtractor to support PNG compression level and adjust default max image bytes --- common_utility/capture.py | 117 ++++++++++++++++++++++---------------- tests/captureTest.py | 3 +- 2 files changed, 71 insertions(+), 49 deletions(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index 841164f..928d9e7 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -43,10 +43,18 @@ def add_args(parser: argparse.ArgumentParser) -> None: ) parser.add_argument( "--blob-max-image-bytes", - default=8 * 1024 * 1024, + default=4 * 1024 * 1024, type=int, help="Maximum bytes per image slot in the blob ring buffer", ) + parser.add_argument( + "--blob-png-compression", + default=1, + type=int, + choices=range(10), + metavar="0-9", + help="PNG compression level for captured images (0=none, 9=max); default 1 favours throughput", + ) class BlobCompletionHandler(ABC): @@ -86,11 +94,13 @@ def __init__( self, blob_path: pathlib.Path, num_slots: int = 60, - max_image_bytes: int = 8 * 1024 * 1024, + max_image_bytes: int = 4 * 1024 * 1024, + png_compression: int = 1, ) -> None: self._blob_path = blob_path self._num_slots = num_slots self._max_image_bytes = max_image_bytes + self._png_compression = png_compression self._index_base = SUPERBLOCK_SIZE self._data_base = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE self._file_size = self._data_base + num_slots * max_image_bytes @@ -155,7 +165,7 @@ def capture(self, image: NDArray[np.uint8], filename: str, flags: int = 0) -> No if len(filename_bytes) > MAX_FILENAME_BYTES: raise ValueError(f"Filename too long: {len(filename_bytes)} bytes, max {MAX_FILENAME_BYTES}") - ok, buf = cv2.imencode(".png", image) + ok, buf = cv2.imencode(".png", image, [cv2.IMWRITE_PNG_COMPRESSION, self._png_compression]) if not ok: raise ValueError("cv2.imencode failed to encode image as PNG") png_bytes = buf.tobytes() @@ -256,8 +266,9 @@ def __init__( max_image_bytes: int, follow_up_count: int, completion_handler: BlobCompletionHandler, + png_compression: int = 1, ) -> None: - self._blob = BlobFsCapture(blob_path, num_slots, max_image_bytes) + self._blob = BlobFsCapture(blob_path, num_slots, max_image_bytes, png_compression) self._follow_up_count = follow_up_count self._completion_handler = completion_handler self._active: list[tuple[BlobFsCapture, str, int]] = [] @@ -306,24 +317,30 @@ def update_last_flags(self, flags: int) -> None: class BlobExtractor: def __init__(self, blob_path: pathlib.Path) -> None: self._blob_path = blob_path - - def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: - dest_dir.mkdir(parents=True, exist_ok=True) - data = self._blob_path.read_bytes() - + data = blob_path.read_bytes() if len(data) < SUPERBLOCK_SIZE: raise ValueError("File too small to contain a valid superblock") - - magic, _version, num_slots, _write_head, _max_image_bytes, index_entry_size = struct.unpack_from( + magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( SUPERBLOCK_FORMAT, data, 0 ) if magic != MAGIC: raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") + self._data = data + self.version = version + self.num_slots = num_slots + self.write_head = write_head + self.max_image_bytes = max_image_bytes + self.index_entry_size = index_entry_size + self._frame_flags: Optional[dict[str, int]] = None + + def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: + dest_dir.mkdir(parents=True, exist_ok=True) + data = self._data extracted: list[pathlib.Path] = [] - for i in range(num_slots): - idx_offset = SUPERBLOCK_SIZE + i * index_entry_size - if idx_offset + index_entry_size > len(data): + for i in range(self.num_slots): + idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size + if idx_offset + self.index_entry_size > len(data): break image_offset, image_size, _flags, filename_len, _reserved, filename_raw = struct.unpack_from( @@ -353,6 +370,30 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: return extracted + def _ensure_frame_flags(self) -> dict[str, int]: + if self._frame_flags is None: + frame_flags: dict[str, int] = {} + for i in range(self.num_slots): + idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size + if idx_offset + self.index_entry_size > len(self._data): + break + _, img_size, flags, fn_len, _, fn_raw = struct.unpack_from( + INDEX_ENTRY_FORMAT, self._data, idx_offset + ) + if img_size > 0: + raw_fn = fn_raw[:fn_len] + try: + fn = raw_fn.decode("utf-8") + except UnicodeDecodeError: + fn = raw_fn.decode("latin-1") + frame_flags[fn] = flags + self._frame_flags = frame_flags + return frame_flags + return self._frame_flags + + def get_filenames_with_flags(self, mask: int) -> set[str]: + return {fn for fn, flags in self._ensure_frame_flags().items() if flags & mask} + def main() -> None: @@ -376,31 +417,26 @@ def main() -> None: ) args = parser.parse_args() - if not args.blob.exists(): + try: + extractor = BlobExtractor(args.blob) + except FileNotFoundError: print(f"error: blob file not found: {args.blob}", file=sys.stderr) sys.exit(1) - - data = args.blob.read_bytes() - if len(data) < SUPERBLOCK_SIZE: - print("error: file too small to be a valid blob", file=sys.stderr) - sys.exit(1) - - magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( - SUPERBLOCK_FORMAT, data, 0 - ) - if magic != MAGIC: - print(f"error: invalid magic 0x{magic:08X} (expected 0x{MAGIC:08X})", file=sys.stderr) + except ValueError as e: + print(f"error: {e}", file=sys.stderr) sys.exit(1) - trigger_names = get_trigger_names_from_blob(data, num_slots, index_entry_size) - if args.info: - print_blob_info(args, data, version, num_slots, write_head, max_image_bytes, index_entry_size, trigger_names) + print_blob_info( + args, extractor._data, extractor.version, extractor.num_slots, + extractor.write_head, extractor.max_image_bytes, + extractor.index_entry_size, extractor.get_filenames_with_flags(TRIGGER_FRAME_FLAG), + ) return - extracted = BlobExtractor(args.blob).extract(args.dest) + extracted = extractor.extract(args.dest) for path in sorted(extracted): - marker = " [TRIGGER_FRAME_FLAG]" if path.name in trigger_names else "" + marker = " [TRIGGER_FRAME_FLAG]" if path.name in extractor.get_filenames_with_flags(TRIGGER_FRAME_FLAG) else "" print(f"{path}{marker}") print(f"\nextracted {len(extracted)} image(s) to {args.dest}", file=sys.stderr) @@ -436,22 +472,6 @@ def _fmt(n: int) -> str: print(f"trigger_frame: {trigger}") -def get_trigger_names_from_blob(data: bytes, num_slots: int, index_entry_size: int) -> set[str]: - trigger_names: set[str] = set() - for i in range(num_slots): - idx_offset = SUPERBLOCK_SIZE + i * index_entry_size - if idx_offset + index_entry_size > len(data): - break - _img_off, img_size, flags, fn_len, _res, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, idx_offset) - if img_size > 0 and flags & TRIGGER_FRAME_FLAG: - raw_fn = fn_raw[:fn_len] - try: - fn = raw_fn.decode("utf-8") - except UnicodeDecodeError: - fn = raw_fn.decode("latin-1") - trigger_names.add(fn) - return trigger_names - def make_capture_backend( args: argparse.Namespace, @@ -462,5 +482,6 @@ def make_capture_backend( blob_path = capture_folder / args.blob_capture_file num_slots: int = args.blob_num_slots max_image_bytes: int = args.blob_max_image_bytes + png_compression: int = args.blob_png_compression handler = completion_handler if completion_handler is not None else NoOpBlobCompletionHandler() - return CompositeBlobCapture(blob_path, num_slots, max_image_bytes, follow_up_count, handler) + return CompositeBlobCapture(blob_path, num_slots, max_image_bytes, follow_up_count, handler, png_compression) diff --git a/tests/captureTest.py b/tests/captureTest.py index 0d90379..e6178cf 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -462,7 +462,8 @@ def test_add_args_registers_defaults(self) -> None: args = parser.parse_args([]) assert args.blob_capture_file == "capture.blob" assert args.blob_num_slots == 60 - assert args.blob_max_image_bytes == 8 * 1024 * 1024 + assert args.blob_max_image_bytes == 4 * 1024 * 1024 + assert args.blob_png_compression == 1 def test_add_args_allows_overrides(self) -> None: parser = argparse.ArgumentParser() From a39010781bb06f519fc6dc7b87c4b83cd6f7a4db Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:16:15 +0200 Subject: [PATCH 06/13] Refactor BlobExtractor to use memory mapping for file access and add context management support --- common_utility/capture.py | 75 +++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index 928d9e7..0619181 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -317,15 +317,26 @@ def update_last_flags(self, flags: int) -> None: class BlobExtractor: def __init__(self, blob_path: pathlib.Path) -> None: self._blob_path = blob_path - data = blob_path.read_bytes() - if len(data) < SUPERBLOCK_SIZE: - raise ValueError("File too small to contain a valid superblock") - magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( - SUPERBLOCK_FORMAT, data, 0 - ) - if magic != MAGIC: - raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") - self._data = data + self._file: Optional[Any] = None + self._mm: Optional[mmap.mmap] = None + file = open(blob_path, "rb") + self._file = file + try: + file_size = os.path.getsize(blob_path) + if file_size < SUPERBLOCK_SIZE: + raise ValueError("File too small to contain a valid superblock") + self._mm = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) + magic, version, num_slots, write_head, max_image_bytes, index_entry_size = struct.unpack_from( + SUPERBLOCK_FORMAT, self._mm, 0 + ) + if magic != MAGIC: + raise ValueError(f"Invalid blob magic: 0x{magic:08X}, expected 0x{MAGIC:08X}") + except Exception: + if self._mm is not None: + self._mm.close() + file.close() + self._file = None + raise self.version = version self.num_slots = num_slots self.write_head = write_head @@ -333,24 +344,38 @@ def __init__(self, blob_path: pathlib.Path) -> None: self.index_entry_size = index_entry_size self._frame_flags: Optional[dict[str, int]] = None + def close(self) -> None: + if self._mm is not None: + self._mm.close() + self._mm = None + if self._file is not None: + self._file.close() + self._file = None + + def __enter__(self) -> BlobExtractor: + return self + + def __exit__(self, *_: Any) -> None: + self.close() + def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: + assert self._mm is not None dest_dir.mkdir(parents=True, exist_ok=True) - data = self._data extracted: list[pathlib.Path] = [] for i in range(self.num_slots): idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size - if idx_offset + self.index_entry_size > len(data): + if idx_offset + self.index_entry_size > len(self._mm): break image_offset, image_size, _flags, filename_len, _reserved, filename_raw = struct.unpack_from( - INDEX_ENTRY_FORMAT, data, idx_offset + INDEX_ENTRY_FORMAT, self._mm, idx_offset ) - if image_size == 0 or image_offset + image_size > len(data): + if image_size == 0 or image_offset + image_size > len(self._mm): continue - png_bytes = data[image_offset : image_offset + image_size] + png_bytes = bytes(self._mm[image_offset : image_offset + image_size]) arr = np.frombuffer(png_bytes, dtype=np.uint8) if cv2.imdecode(arr, cv2.IMREAD_COLOR) is None: continue @@ -372,14 +397,13 @@ def extract(self, dest_dir: pathlib.Path) -> list[pathlib.Path]: def _ensure_frame_flags(self) -> dict[str, int]: if self._frame_flags is None: + assert self._mm is not None frame_flags: dict[str, int] = {} for i in range(self.num_slots): idx_offset = SUPERBLOCK_SIZE + i * self.index_entry_size - if idx_offset + self.index_entry_size > len(self._data): + if idx_offset + self.index_entry_size > len(self._mm): break - _, img_size, flags, fn_len, _, fn_raw = struct.unpack_from( - INDEX_ENTRY_FORMAT, self._data, idx_offset - ) + _, img_size, flags, fn_len, _, fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, self._mm, idx_offset) if img_size > 0: raw_fn = fn_raw[:fn_len] try: @@ -427,10 +451,16 @@ def main() -> None: sys.exit(1) if args.info: + assert extractor._mm is not None print_blob_info( - args, extractor._data, extractor.version, extractor.num_slots, - extractor.write_head, extractor.max_image_bytes, - extractor.index_entry_size, extractor.get_filenames_with_flags(TRIGGER_FRAME_FLAG), + args, + extractor._mm, + extractor.version, + extractor.num_slots, + extractor.write_head, + extractor.max_image_bytes, + extractor.index_entry_size, + extractor.get_filenames_with_flags(TRIGGER_FRAME_FLAG), ) return @@ -443,7 +473,7 @@ def main() -> None: def print_blob_info( args: Any, - data: bytes, + data: bytes | mmap.mmap, version: int, num_slots: int, write_head: int, @@ -472,7 +502,6 @@ def _fmt(n: int) -> str: print(f"trigger_frame: {trigger}") - def make_capture_backend( args: argparse.Namespace, capture_folder: pathlib.Path, From 121f33df28603a12aed7f926323ca67a973a204f Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:16:51 +0200 Subject: [PATCH 07/13] Add test for BlobExtractor to handle truncated data sections while extracting images --- tests/captureTest.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/captureTest.py b/tests/captureTest.py index e6178cf..dad7b03 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -187,6 +187,24 @@ def test_extract_invalid_magic_raises(self, tmp_path: pathlib.Path) -> None: with pytest.raises(ValueError, match="magic"): BlobExtractor(path).extract(tmp_path / "out") + def test_truncated_data_section_extracts_intact_images(self, tmp_path: pathlib.Path) -> None: + """Blob truncated partway into the third slot's data: first two images are still extracted.""" + num_slots = 3 + max_bytes = 512 * 1024 + path = tmp_path / "truncated.blob" + blob = BlobFsCapture(path, num_slots=num_slots, max_image_bytes=max_bytes) + for name in ["a.png", "b.png", "c.png"]: + blob.capture(_make_image(), name) + blob.close() + + # Keep the superblock, full index, and both first slots' data — cut 50 bytes into slot 2 + data_section_start = SUPERBLOCK_SIZE + num_slots * INDEX_ENTRY_SIZE + path.write_bytes(path.read_bytes()[: data_section_start + 2 * max_bytes + 50]) + + extracted = BlobExtractor(path).extract(tmp_path / "out") + assert len(extracted) == 2 + assert {p.name for p in extracted} == {"a.png", "b.png"} + def test_corruption_resilience(self, tmp_path: pathlib.Path) -> None: """Simulates two interrupted writes; BlobExtractor must extract the 3 valid images.""" path = tmp_path / "cap.blob" From 68176b1c57736f5bd20d44528df2cfa4c92b0999 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:17:20 +0200 Subject: [PATCH 08/13] Remove unused imports in captureTest.py to clean up the code --- tests/captureTest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/captureTest.py b/tests/captureTest.py index dad7b03..5f131ad 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -1,9 +1,7 @@ import argparse -import datetime import io import pathlib import struct -import sys import numpy as np import pytest @@ -251,6 +249,7 @@ def test_no_pattern_uses_literal_path(self, tmp_path: pathlib.Path) -> None: blob.close() assert path.exists() + # --------------------------------------------------------------------------- # rotate() # --------------------------------------------------------------------------- From b06a715933ed504bd724b319c5dab03a25173535 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:21:31 +0200 Subject: [PATCH 09/13] Add script entry point for er-blob-extract in pyproject.toml --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index db59685..add4b83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,9 @@ dev = [ "flake8", ] +[project.scripts] +er-blob-extract = "common_utility.capture:main" + [tool.setuptools] package-dir = {"" = "."} packages = ["common_utility", "test_utility"] From 3d8cc09409880b4d67c0026c0c2e43de4996a859 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:23:03 +0200 Subject: [PATCH 10/13] Refactor main function to remove direct blob file argument and integrate add_args for argument parsing --- common_utility/capture.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index 0619181..9c831fa 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -426,7 +426,6 @@ def main() -> None: description="Extract PNG images from a BlobFsCapture ring-buffer file.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument("blob", type=pathlib.Path, help="Path to the .blob file") parser.add_argument( "dest", type=pathlib.Path, @@ -439,6 +438,7 @@ def main() -> None: action="store_true", help="Print blob metadata and slot summary without extracting images", ) + add_args(parser) args = parser.parse_args() try: From 33be1cfb97f93422edb334e56cd8a0e052dd77f3 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:25:15 +0200 Subject: [PATCH 11/13] Update BlobExtractor argument name to blob_capture_file for consistency --- common_utility/capture.py | 6 +++--- tests/captureTest.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common_utility/capture.py b/common_utility/capture.py index 9c831fa..ed866ec 100644 --- a/common_utility/capture.py +++ b/common_utility/capture.py @@ -442,9 +442,9 @@ def main() -> None: args = parser.parse_args() try: - extractor = BlobExtractor(args.blob) + extractor = BlobExtractor(args.blob_capture_file) except FileNotFoundError: - print(f"error: blob file not found: {args.blob}", file=sys.stderr) + print(f"error: blob file not found: {args.blob_capture_file}", file=sys.stderr) sys.exit(1) except ValueError as e: print(f"error: {e}", file=sys.stderr) @@ -490,7 +490,7 @@ def _fmt(n: int) -> str: return f"{n / 1024 / 1024:.1f} MiB ({n} bytes)" return f"{n / 1024:.1f} KiB ({n} bytes)" - print(f"blob: {args.blob}") + print(f"blob: {args.blob_capture_file}") print(f"magic: {MAGIC:#010x}") print(f"version: {version}") print(f"num_slots: {num_slots} (write_head={write_head})") diff --git a/tests/captureTest.py b/tests/captureTest.py index 5f131ad..e7ccb16 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -522,7 +522,7 @@ def test_make_capture_backend_uses_completion_handler(self, tmp_path: pathlib.Pa def _run_extract_cli(blob_path: pathlib.Path, dest: pathlib.Path, info: bool = False) -> str: """Run the extract CLI and return captured stdout.""" stdout = io.StringIO() - argv = ["blob-extract", str(blob_path), str(dest)] + argv = ["blob-extract", f"--blob-capture-file={str(blob_path)}", str(dest)] if info: argv.append("--info") with patch("sys.argv", argv), patch("sys.stdout", stdout), patch("sys.stderr", io.StringIO()): From 13a41f38cfbb1d32d1ec7dbe9d6dc244c4e886fe Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:28:53 +0200 Subject: [PATCH 12/13] Replace opencv-python with opencv-python-headless in dependencies for compatibility --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index add4b83..c30b050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "pydantic", "jinja2", "netifaces", - "opencv-python", + "opencv-python-headless", "numpy", "python-context-logger @ git+https://github.com/EffectiveRange/python-context-logger.git@latest", ] From ca3d19a09efe126ee17470b0981421ad19c491f8 Mon Sep 17 00:00:00 2001 From: effective-range Date: Fri, 15 May 2026 14:52:15 +0200 Subject: [PATCH 13/13] Add pytest-cov to development dependencies for test coverage --- pyproject.toml | 1 + tests/captureTest.py | 129 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index c30b050..44e9e04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dynamic = ["version"] dev = [ "mypy", "flake8", + "pytest-cov", ] [project.scripts] diff --git a/tests/captureTest.py b/tests/captureTest.py index e7ccb16..a4b57a2 100644 --- a/tests/captureTest.py +++ b/tests/captureTest.py @@ -119,6 +119,46 @@ def test_update_last_flags_overwrites_flags_field(self, tmp_path: pathlib.Path) _img_off, _img_sz, flags, _fn_len, _res, _fn_raw = struct.unpack_from(INDEX_ENTRY_FORMAT, data, SUPERBLOCK_SIZE) assert flags == TRIGGER_FRAME_FLAG + def test_reopen_size_mismatch_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.close() + with open(path, "ab") as f: + f.write(b"\x00") + with pytest.raises(ValueError, match="size mismatch"): + BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + + def test_get_last_filepath_returns_none(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + assert blob.get_last_filepath() is None + blob.close() + + def test_set_capture_ts_is_a_no_op(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.set_capture_ts(MagicMock()) + blob.close() + + def test_active_blob_path_returns_blob_path(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + assert blob.active_blob_path() == path + blob.close() + + def test_reopen_existing_blob_restores_write_head(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + blob.capture(_make_image(), "frame.png") + expected_head = blob._write_head + blob.close() + reopened = BlobFsCapture(path, num_slots=4, max_image_bytes=256 * 1024) + assert reopened._write_head == expected_head + reopened.close() + + def test_double_close_is_safe(self, tmp_path: pathlib.Path) -> None: + blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=256 * 1024) + blob.close() + blob.close() + def test_update_last_flags_targets_most_recent_slot(self, tmp_path: pathlib.Path) -> None: blob = BlobFsCapture(tmp_path / "cap.blob", num_slots=4, max_image_bytes=512 * 1024) blob.capture(_make_image(), "first.png", flags=0) @@ -203,6 +243,65 @@ def test_truncated_data_section_extracts_intact_images(self, tmp_path: pathlib.P assert len(extracted) == 2 assert {p.name for p in extracted} == {"a.png", "b.png"} + def test_double_close_is_safe(self, tmp_path: pathlib.Path) -> None: + path = self._write_blob(tmp_path, [(_make_image(), "frame.png")]) + e = BlobExtractor(path) + e.close() + e.close() + + def test_file_too_small_raises(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "tiny.blob" + path.write_bytes(b"\x00" * (SUPERBLOCK_SIZE - 1)) + with pytest.raises(ValueError, match="too small"): + BlobExtractor(path) + + def test_context_manager_and_truncated_index_breaks_early(self, tmp_path: pathlib.Path) -> None: + """Index truncated mid-second-entry: extract() and get_filenames_with_flags() both break early.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "frame.png") + blob.close() + # Cut in the middle of the second index entry so the loop must break + truncate_at = SUPERBLOCK_SIZE + INDEX_ENTRY_SIZE + INDEX_ENTRY_SIZE // 2 + path.write_bytes(path.read_bytes()[:truncate_at]) + with BlobExtractor(path) as e: + extracted = e.extract(tmp_path / "out") + trigger_names = e.get_filenames_with_flags(TRIGGER_FRAME_FLAG) + assert extracted == [] + assert trigger_names == set() + assert e._mm is None # context manager closed it + + def test_zero_filename_len_uses_slot_index_fallback(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "real.png") + blob.close() + # fn_len is at byte 24 within the slot-0 index entry + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: + """Filename bytes invalid in UTF-8 but valid in latin-1 are decoded correctly in both paths.""" + path = tmp_path / "cap.blob" + blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024) + blob.capture(_make_image(), "placeholder.png") + blob.close() + # Patch slot-0 index: set a latin-1 filename (0xE9 = é, not valid UTF-8) + latin1_name = b"caf\xe9.png" + raw = bytearray(path.read_bytes()) + struct.pack_into(" None: """Simulates two interrupted writes; BlobExtractor must extract the 3 valid images.""" path = tmp_path / "cap.blob" @@ -455,6 +554,16 @@ def test_rotate_immediate_does_not_add_to_active(self, tmp_path: pathlib.Path) - assert len(comp._active) == 0 comp.close() + def test_set_capture_ts_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + comp.set_capture_ts(MagicMock()) + comp.close() + + def test_get_last_filepath_delegates_to_primary(self, tmp_path: pathlib.Path) -> None: + comp = self._make_composite(tmp_path) + assert comp.get_last_filepath() is None + comp.close() + def test_rotate_immediate_does_not_deliver_follow_up_frames(self, tmp_path: pathlib.Path) -> None: handler = MagicMock(spec=BlobCompletionHandler) comp = self._make_composite(tmp_path, follow_up=3, handler=handler) @@ -562,6 +671,26 @@ def test_no_trigger_frame_no_annotation(self, tmp_path: pathlib.Path) -> None: output = _run_extract_cli(path, tmp_path / "out") assert "[TRIGGER_FRAME_FLAG]" not in output + def test_missing_blob_exits_with_error(self, tmp_path: pathlib.Path) -> None: + stderr = io.StringIO() + argv = ["blob-extract", f"--blob-capture-file={tmp_path / 'nonexistent.blob'}", str(tmp_path / "out")] + with patch("sys.argv", argv), patch("sys.stderr", stderr): + with pytest.raises(SystemExit) as exc: + blob_extract_main() + assert exc.value.code == 1 + assert "not found" in stderr.getvalue() + + def test_invalid_blob_exits_with_error(self, tmp_path: pathlib.Path) -> None: + path = tmp_path / "bad.blob" + path.write_bytes(b"\xff" * SUPERBLOCK_SIZE) + stderr = io.StringIO() + argv = ["blob-extract", f"--blob-capture-file={path}", str(tmp_path / "out")] + with patch("sys.argv", argv), patch("sys.stderr", stderr): + with pytest.raises(SystemExit) as exc: + blob_extract_main() + assert exc.value.code == 1 + assert "error" in stderr.getvalue() + def test_blob_info(self, tmp_path: pathlib.Path) -> None: path = tmp_path / "info.blob" blob = BlobFsCapture(path, num_slots=4, max_image_bytes=512 * 1024)