From 0ebcafcda94e269fe7df13b921281e89b32938f3 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Sun, 24 May 2026 23:21:51 +0300 Subject: [PATCH 1/3] feat(m3): glyph extractor, generation profile, and generate pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements M3 sub-PR 2 — the first end-to-end extraction pipeline. extractor.py - MIN_GLYPH_PX = 16 (issue #16 D2); _DEFAULT_MAX_AREA_FRACTION = 0.10 - Glyph frozen dataclass (x, y, width, height) - extract_glyphs(): CCA via Otsu binarisation; filters by min dimension and upper-area ceiling; sorts blobs (y, -x) — Hebrew reading order - crop_glyph(): crops from binarised image; returns PNG bytes - ExtractionError base; opencv-python-headless is an optional dep; clear install message when absent; raises rather than ImportError at import time so the rest of the package stays importable without CV generate_profile.py - GlyphAnnotation / ScanAnnotation / WriterAnnotation / GenerateProfile frozen dataclasses; human-curated bbox+letter annotations are the source of determinism — no CCA output in the generate path - load_generate_profile(path) -> (GenerateProfile, raw_dict): full structural validation; Hebrew letter check; unique writer_id and entry_id enforcement; upstream_checkout resolved relative to profile - GenerateProfileError(ValueError) with path context generator.py - generate(profile, raw, output_dir, *, generated_at): pins upstream checkout, loads+indexes eligible entries, crops each annotated glyph, writes PNG assets, builds and validates letter_set.v1 document, writes letter_set.json; returns list[Path] of written letter_set.json files - Output tree: //letter_set.json and glyphs//_______.png - variant_id and asset_path are fully deterministic from bbox+entry_id - _resolve_scan_path() probes upstream file list for role='original'; includes a TODO hook for future ALTO/hOCR sidecar probe (deferred open question from sub-PR 1) - GeneratorError / GeneratorWarning; skipped entries emit warnings, not hard failures, unless zero glyphs extracted for a writer - generated_at defaults to utcnow(); pass --generated-at for reproducible builds cli.py - generate subcommand wired: --profile (required), --output (required), --generated-at (optional override for deterministic builds) - New scan-blobs subcommand: runs extract_glyphs on a scan, outputs blob list as JSON or text; --min-dim, --max-area flags; useful for populating a generation profile manually - Lazy imports of extractor/generator/generate_profile inside command handlers so the CLI stays importable without the CV stack tests - test_extractor.py: 15 tests covering blob detection, filtering, sorting, crop dimensions, binarisation, determinism, error cases - test_generate_profile.py: 27 tests covering valid load, all validation paths, Hebrew letter acceptance, upstream_checkout resolution - test_generator.py: 13 tests including full end-to-end pipeline, validation of output against schema, checksum integrity, determinism, and CLI integration for generate and scan-blobs - test_cli.py: updated test_generate_subcommand_* to reflect that generate now requires --profile/--output (argparse usage error) pyproject.toml: add [[tool.mypy.overrides]] for cv2 (no stubs) 144 tests pass, 91 % coverage, mypy strict clean. Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 4 + src/hletterscriptgen/cli.py | 129 +++++- src/hletterscriptgen/extractor.py | 228 +++++++++++ src/hletterscriptgen/generate_profile.py | 341 ++++++++++++++++ src/hletterscriptgen/generator.py | 386 ++++++++++++++++++ .../generate_profile/valid_profile.json | 19 + tests/test_cli.py | 10 +- tests/test_extractor.py | 243 +++++++++++ tests/test_generate_profile.py | 218 ++++++++++ tests/test_generator.py | 296 ++++++++++++++ 10 files changed, 1859 insertions(+), 15 deletions(-) create mode 100644 src/hletterscriptgen/extractor.py create mode 100644 src/hletterscriptgen/generate_profile.py create mode 100644 src/hletterscriptgen/generator.py create mode 100644 tests/fixtures/generate_profile/valid_profile.json create mode 100644 tests/test_extractor.py create mode 100644 tests/test_generate_profile.py create mode 100644 tests/test_generator.py diff --git a/pyproject.toml b/pyproject.toml index 9a3d934..facc220 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,3 +89,7 @@ enable_error_code = ["redundant-expr", "truthy-bool", "ignore-without-code"] [[tool.mypy.overrides]] module = "jsonschema.*" ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "cv2" +ignore_missing_imports = true diff --git a/src/hletterscriptgen/cli.py b/src/hletterscriptgen/cli.py index 9116639..5331122 100644 --- a/src/hletterscriptgen/cli.py +++ b/src/hletterscriptgen/cli.py @@ -53,9 +53,34 @@ def _build_parser() -> argparse.ArgumentParser: help="Output format (default: text).", ) - sub.add_parser( + generate_p = sub.add_parser( "generate", - help="(Not yet implemented) Generate letter sets from upstream scans.", + help="Generate letter sets from upstream scans using a generation profile.", + ) + generate_p.add_argument( + "--profile", + type=Path, + required=True, + metavar="PROFILE", + help="Path to a generation profile JSON file.", + ) + generate_p.add_argument( + "--output", + type=Path, + required=True, + metavar="DIR", + help="Output directory (created if absent).", + ) + generate_p.add_argument( + "--generated-at", + type=str, + default=None, + metavar="ISO8601", + help=( + "Override the generated_at timestamp in output documents " + "(ISO 8601 format, e.g. '2025-01-01T00:00:00+00:00'). " + "Useful for deterministic / reproducible builds." + ), ) eligible_p = sub.add_parser( @@ -74,6 +99,39 @@ def _build_parser() -> argparse.ArgumentParser: help="Output format (default: text).", ) + scan_blobs_p = sub.add_parser( + "scan-blobs", + help=( + "Detect glyph blobs in a scan image via CCA. " + "Use the output to populate a generation profile." + ), + ) + scan_blobs_p.add_argument( + "image", + type=Path, + help="Path to the scan image (JPEG, PNG, TIFF, …).", + ) + scan_blobs_p.add_argument( + "--min-dim", + type=int, + default=16, + metavar="PX", + help="Minimum blob dimension in pixels (default: 16).", + ) + scan_blobs_p.add_argument( + "--max-area", + type=int, + default=None, + metavar="PX2", + help="Maximum blob area in pixels (default: 10%% of image area).", + ) + scan_blobs_p.add_argument( + "--format", + choices=("text", "json"), + default="json", + help="Output format (default: json).", + ) + return parser @@ -160,13 +218,62 @@ def _cmd_check_eligible(args: argparse.Namespace) -> int: return EXIT_OK if ok else EXIT_VALIDATION_FAILED -def _cmd_generate() -> int: - print( - "generate: not yet implemented in this scaffolding release. " - "See docs/roadmap.md for planned milestones.", - file=sys.stderr, - ) - return EXIT_NOT_IMPLEMENTED +def _cmd_generate(args: argparse.Namespace) -> int: + from hletterscriptgen.generate_profile import GenerateProfileError, load_generate_profile + from hletterscriptgen.generator import GeneratorError, generate + + try: + profile, raw = load_generate_profile(args.profile) + except GenerateProfileError as exc: + print(str(exc), file=sys.stderr) + return EXIT_INPUT_ERROR + + try: + output_paths = generate( + profile, + raw, + args.output, + generated_at=args.generated_at, + ) + except GeneratorError as exc: + print(str(exc), file=sys.stderr) + return EXIT_INPUT_ERROR + + for p in output_paths: + print(f"OK {p}") + return EXIT_OK + + +def _cmd_scan_blobs(args: argparse.Namespace) -> int: + from hletterscriptgen.extractor import ExtractionError, extract_glyphs + + try: + glyphs = extract_glyphs( + args.image, + min_dimension=args.min_dim, + max_area=args.max_area, + ) + except ExtractionError as exc: + print(str(exc), file=sys.stderr) + return EXIT_INPUT_ERROR + + if args.format == "json": + payload = { + "image": str(args.image), + "count": len(glyphs), + "blobs": [ + {"x": g.x, "y": g.y, "width": g.width, "height": g.height} + for g in glyphs + ], + } + json.dump(payload, sys.stdout, indent=2) + sys.stdout.write("\n") + else: + for i, g in enumerate(glyphs): + print(f"blob {i:4d}: x={g.x:5d} y={g.y:5d} w={g.width:5d} h={g.height:5d}") + print(f"{len(glyphs)} blob(s) detected in {args.image}") + + return EXIT_OK def main(argv: list[str] | None = None) -> int: @@ -180,8 +287,10 @@ def main(argv: list[str] | None = None) -> int: if args.command == "validate": return _cmd_validate(args) if args.command == "generate": - return _cmd_generate() + return _cmd_generate(args) if args.command == "check-eligible": return _cmd_check_eligible(args) + if args.command == "scan-blobs": + return _cmd_scan_blobs(args) parser.error(f"unknown command: {args.command}") diff --git a/src/hletterscriptgen/extractor.py b/src/hletterscriptgen/extractor.py new file mode 100644 index 0000000..a8b8d8c --- /dev/null +++ b/src/hletterscriptgen/extractor.py @@ -0,0 +1,228 @@ +"""CCA-based glyph extraction from handwritten Hebrew page scans. + +Implements the Option A segmentation approach chosen in M3 sub-PR 1: +connected-component analysis (CCA) via ``opencv-python-headless`` with +Otsu binarisation. See ``docs/design/segmentation-approach.md`` for +the decision record, algorithm spec, and known failure modes. + +``opencv-python-headless`` is an *optional* dependency (install with +``pip install hletterscriptgen[cv]``). All public functions raise +:class:`ExtractionError` when the library is not available, rather than +an import-time ``ImportError``, so the rest of the package stays +importable on environments without the CV stack. + +The module exposes: + +* :data:`MIN_GLYPH_PX` — minimum bounding-box dimension (px). +* :class:`Glyph` — frozen dataclass for a detected blob's bounding box. +* :func:`extract_glyphs` — detect blobs in a scan via CCA. +* :func:`crop_glyph` — crop one blob from the binarised scan, return PNG bytes. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +# Minimum bounding-box width AND height in pixels, per issue #16 decision D2. +MIN_GLYPH_PX: int = 16 + +# Upper-area ceiling as a fraction of total image area. Blobs that cover +# more than this fraction of the page are assumed to be noise (stamps, +# ruled lines, bleed-through from the verso). Empirical calibration of +# this value is deferred to a later sub-PR; 10 % is a conservative default. +_DEFAULT_MAX_AREA_FRACTION: float = 0.10 + + +class ExtractionError(Exception): + """Raised when glyph extraction fails. + + Covers both missing-library errors (opencv not installed) and + runtime errors (image not found, encode failure, out-of-bounds crop). + """ + + +@dataclass(frozen=True) +class Glyph: + """Bounding box of a connected component detected in a scan image. + + Coordinates are in the scan image's pixel space. ``x`` and ``y`` are + the top-left corner of the bounding box; ``width`` and ``height`` are + its extent. All values are non-negative integers; ``width`` and + ``height`` are ≥ 1. + + Glyphs returned by :func:`extract_glyphs` are sorted by + ``(y, -x)`` — top-to-bottom rows, right-to-left within each row — + consistent with Hebrew reading order. + """ + + x: int + y: int + width: int + height: int + + +def extract_glyphs( + image_path: Path, + *, + min_dimension: int = MIN_GLYPH_PX, + max_area: int | None = None, +) -> list[Glyph]: + """Detect letter glyphs in a scan via connected-component analysis. + + Algorithm (per ``docs/design/segmentation-approach.md``): + + 1. Load the scan image (BGR). + 2. Convert to greyscale. + 3. Binarise with Otsu's method (``THRESH_BINARY_INV`` — ink pixels + become foreground / 255, background becomes 0). + 4. Run ``connectedComponentsWithStats`` to label foreground blobs. + 5. Drop blobs where ``width < min_dimension`` or ``height < min_dimension`` + (quality floor, issue #16 D2). + 6. Drop blobs whose pixel area exceeds ``max_area`` (noise ceiling; + defaults to 10 % of total image area when ``None``). + 7. Sort survivors by ``(y, -x)`` — top row first, right-to-left within + a row — and return them as :class:`Glyph` records. + + .. note:: + Deskew pre-processing is intentionally omitted from M3 — the + failure-mode analysis rates skew as low-severity for the current + corpus. Add a Hough-line deskew step before binarisation if + empirical scan quality demands it. + + .. note:: + Nikud (diacritical marks) are emitted as separate blobs when they + exceed ``min_dimension``. Merging them with their parent letter + body is out of scope for M3; deferred to M4. + + Parameters + ---------- + image_path: + Path to the scan image (JPEG, PNG, TIFF, or any format OpenCV can + decode). + min_dimension: + Minimum bounding-box width *and* height in pixels. A blob must + satisfy both constraints to survive. Defaults to :data:`MIN_GLYPH_PX`. + max_area: + Maximum bounding-box area in pixels. Blobs with a pixel area + strictly greater than this are dropped as likely noise. When + ``None`` (default), the ceiling is set to 10 % of the image's + total area at runtime. + + Raises + ------ + ExtractionError + When ``opencv-python-headless`` is not installed, the image cannot + be read, or ``min_dimension`` < 1. + """ + if min_dimension < 1: + raise ExtractionError(f"min_dimension must be ≥ 1, got {min_dimension}") + + try: + import cv2 + except ImportError as exc: + raise ExtractionError( + "opencv-python-headless is required for glyph extraction; " + "install it with: pip install hletterscriptgen[cv]" + ) from exc + + img = cv2.imread(str(image_path)) + if img is None: + raise ExtractionError(f"could not load image: {image_path}") + + img_h, img_w = img.shape[:2] + effective_max_area = ( + max_area if max_area is not None else int(img_h * img_w * _DEFAULT_MAX_AREA_FRACTION) + ) + + grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + + num_labels, _, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8) + + glyphs: list[Glyph] = [] + # Label 0 is the background component — start at 1. + for i in range(1, num_labels): + x = int(stats[i, cv2.CC_STAT_LEFT]) + y = int(stats[i, cv2.CC_STAT_TOP]) + w = int(stats[i, cv2.CC_STAT_WIDTH]) + h = int(stats[i, cv2.CC_STAT_HEIGHT]) + area = int(stats[i, cv2.CC_STAT_AREA]) + + if w < min_dimension or h < min_dimension: + continue + if area > effective_max_area: + continue + + glyphs.append(Glyph(x=x, y=y, width=w, height=h)) + + # Sort: top-to-bottom by y, then right-to-left by x within each row. + glyphs.sort(key=lambda g: (g.y, -g.x)) + return glyphs + + +def crop_glyph(image_path: Path, glyph: Glyph) -> bytes: + """Crop a glyph region from the binarised scan and return PNG bytes. + + The crop is taken from the *binarised* (Otsu) image, not the original + colour scan, so the returned PNG contains only black ink (255) and + white background (0) pixels. + + Parameters + ---------- + image_path: + Path to the source scan image. + glyph: + Bounding box to crop, in the scan's pixel space. + + Returns + ------- + bytes + PNG-encoded bytes of the cropped region. + + Raises + ------ + ExtractionError + When the library is not installed, the image cannot be read, the + glyph bbox falls outside the image boundaries, or PNG encoding + fails. + """ + try: + import cv2 + except ImportError as exc: + raise ExtractionError( + "opencv-python-headless is required for glyph extraction; " + "install it with: pip install hletterscriptgen[cv]" + ) from exc + + img = cv2.imread(str(image_path)) + if img is None: + raise ExtractionError(f"could not load image: {image_path}") + + img_h, img_w = img.shape[:2] + x, y, w, h = glyph.x, glyph.y, glyph.width, glyph.height + + if x < 0 or y < 0 or x + w > img_w or y + h > img_h: + raise ExtractionError( + f"glyph bbox (x={x}, y={y}, w={w}, h={h}) falls outside " + f"image dimensions {img_w}×{img_h}: {image_path}" + ) + + grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + crop = binary[y : y + h, x : x + w] + + ok, buf = cv2.imencode(".png", crop) + if not ok: + raise ExtractionError(f"failed to encode glyph crop as PNG: {image_path}") + + return bytes(buf.tobytes()) + + +__all__ = [ + "MIN_GLYPH_PX", + "ExtractionError", + "Glyph", + "crop_glyph", + "extract_glyphs", +] diff --git a/src/hletterscriptgen/generate_profile.py b/src/hletterscriptgen/generate_profile.py new file mode 100644 index 0000000..fa049d3 --- /dev/null +++ b/src/hletterscriptgen/generate_profile.py @@ -0,0 +1,341 @@ +"""Generation profile: human-curated letter annotations for the generate pipeline. + +A generation profile is a JSON config file that tells the generator +*exactly* which glyph bounding boxes to crop from which upstream scans, +and which Hebrew letter each glyph represents. Because all bounding +boxes are explicitly declared by a human, the generator's output is +deterministic: same profile + same upstream revision → bit-identical +letter-set tree. + +Typical workflow: + +1. Run ``hletterscriptgen scan-blobs `` to discover CCA-detected + blobs in a scan. +2. Review the blob list, assign Hebrew letter labels, and record the + chosen bounding boxes in a ``generate_profile.json`` file. +3. Run ``hletterscriptgen generate --profile generate_profile.json + --output ./out`` to produce letter_set.v1 documents. + +Profile JSON shape:: + + { + "upstream_checkout": "../public-domain-hand-written-hebrew-scans", + "writers": [ + { + "writer_id": "writer_bialik", + "attribution_method": "collection_metadata", + "notes": "...", + "scans": [ + { + "entry_id": "commons__bialik_letter_safed_1927__p0001", + "glyphs": [ + {"letter": "א", "x": 10, "y": 20, "width": 30, "height": 40}, + {"letter": "ב", "x": 55, "y": 22, "width": 28, "height": 38} + ] + } + ] + } + ] + } + +The module exposes: + +* :class:`GlyphAnnotation` — a single bbox + letter label. +* :class:`ScanAnnotation` — all annotated glyphs for one upstream scan. +* :class:`WriterAnnotation` — all annotated scans for one writer. +* :class:`GenerateProfile` — top-level config object. +* :class:`GenerateProfileError` — base error class. +* :func:`load_generate_profile` — read, validate, and return a + ``(GenerateProfile, raw_dict)`` pair; the raw dict is suitable for + passing to :func:`hletterscriptgen.hashing.config_hash`. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from hletterscriptgen import HEBREW_LETTERS + + +# --------------------------------------------------------------------------- +# Errors +# --------------------------------------------------------------------------- + + +class GenerateProfileError(ValueError): + """Raised when a generation profile file is invalid. + + ``path`` refers to the profile JSON file associated with the error. + """ + + def __init__(self, message: str, *, path: Path) -> None: + super().__init__(f"{path}: {message}") + self.path = path + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class GlyphAnnotation: + """A manually annotated glyph: one Hebrew letter at a specific bbox. + + ``letter`` must be a single character from the Hebrew block + (U+05D0..U+05EA, base and final forms). ``x``, ``y`` are the + top-left corner of the bounding box in the scan's pixel space; + ``width`` and ``height`` are its extent. ``notes`` is optional. + """ + + letter: str # single Hebrew character + x: int # left edge of bbox (px, ≥ 0) + y: int # top edge of bbox (px, ≥ 0) + width: int # bbox width (px, ≥ 1) + height: int # bbox height (px, ≥ 1) + notes: str | None = None + + +@dataclass(frozen=True) +class ScanAnnotation: + """All annotated glyphs for one upstream scan entry. + + ``entry_id`` identifies the upstream ``entries.jsonl`` record. + ``glyphs`` must be non-empty. + """ + + entry_id: str + glyphs: tuple[GlyphAnnotation, ...] + + +@dataclass(frozen=True) +class WriterAnnotation: + """All annotated scans for one writer. + + ``writer_id`` is a stable, repo-unique identifier. + ``attribution_method`` is the same vocabulary as + :class:`hletterscriptgen.attribution.AttributionMethod`. + ``scans`` must be non-empty. + """ + + writer_id: str + attribution_method: str + scans: tuple[ScanAnnotation, ...] + notes: str | None = None + + +@dataclass(frozen=True) +class GenerateProfile: + """Top-level generation profile config. + + ``upstream_checkout`` is the path to the local upstream repo checkout + (used for both pinning the revision and resolving scan file paths). + ``writers`` is a non-empty tuple of :class:`WriterAnnotation` records. + """ + + upstream_checkout: Path + writers: tuple[WriterAnnotation, ...] + + +# --------------------------------------------------------------------------- +# Internal parsing helpers +# --------------------------------------------------------------------------- + + +def _require_str(raw: dict[str, Any], key: str, context: str, *, path: Path) -> str: + val = raw.get(key) + if not isinstance(val, str): + raise GenerateProfileError( + f"{context}: '{key}' must be a non-null string, " + f"got {type(val).__name__ if val is not None else 'null'}", + path=path, + ) + return val + + +def _require_int_ge( + raw: dict[str, Any], key: str, minimum: int, context: str, *, path: Path +) -> int: + val = raw.get(key) + if not isinstance(val, int) or isinstance(val, bool): + raise GenerateProfileError( + f"{context}: '{key}' must be an integer, " + f"got {type(val).__name__ if val is not None else 'null'}", + path=path, + ) + if val < minimum: + raise GenerateProfileError( + f"{context}: '{key}' must be ≥ {minimum}, got {val}", + path=path, + ) + return val + + +def _parse_glyph(raw: Any, index: int, scan_ctx: str, *, path: Path) -> GlyphAnnotation: + ctx = f"{scan_ctx}/glyphs[{index}]" + if not isinstance(raw, dict): + raise GenerateProfileError(f"{ctx}: expected a JSON object", path=path) + + letter = _require_str(raw, "letter", ctx, path=path) + if len(letter) != 1 or letter not in HEBREW_LETTERS: + raise GenerateProfileError( + f"{ctx}: 'letter' must be a single Hebrew character " + f"(U+05D0..U+05EA), got {letter!r}", + path=path, + ) + + x = _require_int_ge(raw, "x", 0, ctx, path=path) + y = _require_int_ge(raw, "y", 0, ctx, path=path) + width = _require_int_ge(raw, "width", 1, ctx, path=path) + height = _require_int_ge(raw, "height", 1, ctx, path=path) + + notes = raw.get("notes") + if notes is not None and not isinstance(notes, str): + raise GenerateProfileError(f"{ctx}: 'notes' must be a string if present", path=path) + + return GlyphAnnotation(letter=letter, x=x, y=y, width=width, height=height, notes=notes) + + +def _parse_scan(raw: Any, index: int, writer_ctx: str, *, path: Path) -> ScanAnnotation: + ctx = f"{writer_ctx}/scans[{index}]" + if not isinstance(raw, dict): + raise GenerateProfileError(f"{ctx}: expected a JSON object", path=path) + + entry_id = _require_str(raw, "entry_id", ctx, path=path) + if not entry_id.strip(): + raise GenerateProfileError(f"{ctx}: 'entry_id' must not be blank", path=path) + + raw_glyphs = raw.get("glyphs") + if not isinstance(raw_glyphs, list) or not raw_glyphs: + raise GenerateProfileError( + f"{ctx}: 'glyphs' must be a non-empty list", path=path + ) + + glyphs = tuple( + _parse_glyph(g, i, f"{ctx}", path=path) for i, g in enumerate(raw_glyphs) + ) + return ScanAnnotation(entry_id=entry_id, glyphs=glyphs) + + +def _parse_writer(raw: Any, index: int, *, path: Path) -> WriterAnnotation: + ctx = f"writers[{index}]" + if not isinstance(raw, dict): + raise GenerateProfileError(f"{ctx}: expected a JSON object", path=path) + + writer_id = _require_str(raw, "writer_id", ctx, path=path) + if not writer_id.strip(): + raise GenerateProfileError(f"{ctx}: 'writer_id' must not be blank", path=path) + + attribution_method = _require_str(raw, "attribution_method", ctx, path=path) + if not attribution_method.strip(): + raise GenerateProfileError( + f"{ctx}: 'attribution_method' must not be blank", path=path + ) + + raw_scans = raw.get("scans") + if not isinstance(raw_scans, list) or not raw_scans: + raise GenerateProfileError( + f"{ctx}: 'scans' must be a non-empty list", path=path + ) + + scans = tuple( + _parse_scan(s, i, f"{ctx}", path=path) for i, s in enumerate(raw_scans) + ) + + notes = raw.get("notes") + if notes is not None and not isinstance(notes, str): + raise GenerateProfileError( + f"{ctx}: 'notes' must be a string if present", path=path + ) + + return WriterAnnotation( + writer_id=writer_id, + attribution_method=attribution_method, + scans=scans, + notes=notes, + ) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def load_generate_profile(path: Path) -> tuple[GenerateProfile, dict[str, Any]]: + """Read and validate a generation profile JSON file. + + Returns ``(profile, raw_dict)`` where ``raw_dict`` is the original + parsed JSON object — pass it to + :func:`hletterscriptgen.hashing.config_hash` to compute + ``generator.config_hash`` for the output ``letter_set.v1`` document. + + Raises :class:`GenerateProfileError` when the file is missing, not + valid JSON, or structurally invalid. Raises :class:`OSError` for + other I/O failures. + """ + try: + raw_text = path.read_text(encoding="utf-8") + except FileNotFoundError as exc: + raise GenerateProfileError( + f"generation profile file not found: {path}", path=path + ) from exc + + try: + raw: dict[str, Any] = json.loads(raw_text) + except json.JSONDecodeError as exc: + raise GenerateProfileError(f"invalid JSON: {exc.msg}", path=path) from exc + + if not isinstance(raw, dict): + raise GenerateProfileError("expected a JSON object at top level", path=path) + + raw_checkout = raw.get("upstream_checkout") + if not isinstance(raw_checkout, str) or not raw_checkout.strip(): + raise GenerateProfileError( + "'upstream_checkout' must be a non-empty string", path=path + ) + + raw_writers = raw.get("writers") + if not isinstance(raw_writers, list) or not raw_writers: + raise GenerateProfileError("'writers' must be a non-empty list", path=path) + + writers = tuple(_parse_writer(w, i, path=path) for i, w in enumerate(raw_writers)) + + # Enforce uniqueness of writer_ids and entry_ids across all writers. + seen_writer_ids: set[str] = set() + seen_entry_ids: dict[str, str] = {} # entry_id → writer_id + for wa in writers: + if wa.writer_id in seen_writer_ids: + raise GenerateProfileError( + f"duplicate writer_id {wa.writer_id!r}", path=path + ) + seen_writer_ids.add(wa.writer_id) + for sa in wa.scans: + if sa.entry_id in seen_entry_ids: + raise GenerateProfileError( + f"entry_id {sa.entry_id!r} appears under both " + f"{seen_entry_ids[sa.entry_id]!r} and {wa.writer_id!r}", + path=path, + ) + seen_entry_ids[sa.entry_id] = wa.writer_id + + # Resolve upstream_checkout relative to the profile file's parent dir. + checkout_path = (path.parent / raw_checkout).resolve() + + profile = GenerateProfile( + upstream_checkout=checkout_path, + writers=writers, + ) + return profile, raw + + +__all__ = [ + "GenerateProfile", + "GenerateProfileError", + "GlyphAnnotation", + "ScanAnnotation", + "WriterAnnotation", + "load_generate_profile", +] diff --git a/src/hletterscriptgen/generator.py b/src/hletterscriptgen/generator.py new file mode 100644 index 0000000..e9226af --- /dev/null +++ b/src/hletterscriptgen/generator.py @@ -0,0 +1,386 @@ +"""End-to-end glyph extraction pipeline (M3 MVP). + +Orchestrates the full generate flow: + +1. Pin the upstream checkout revision. +2. Load and index all eligible upstream entries. +3. For each writer → each scan → each annotated glyph: + a. Look up the upstream entry; skip ineligible entries (warn). + b. Resolve the scan file path from the upstream checkout. + c. Crop the glyph from the binarised scan. + d. Write the PNG to the output tree. + e. Record a ``variant`` for the letter_set.v1 document. +4. Build and validate the ``letter_set.v1`` document for each writer. +5. Write ``letter_set.json`` to the writer's output directory. + +Output tree structure:: + + / + / + letter_set.json + glyphs/ + _______.png + +All paths in ``letter_set.json`` are POSIX-relative to the writer's +output directory. The ``variant_id`` is derived deterministically from +the entry_id, letter, and bounding box. + +Determinism +----------- +Given the same generation profile, the same upstream revision, and the +same version of ``hletterscriptgen``, the output tree is bit-identical. +The ``generated_at`` field is the one exception — callers should inject a +fixed timestamp for reproducible builds (see ``--generated-at`` on the +``generate`` CLI). +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from hletterscriptgen import HEBREW_LETTERS, __version__ +from hletterscriptgen.extractor import ExtractionError, Glyph, crop_glyph +from hletterscriptgen.generate_profile import ( + GenerateProfile, + GlyphAnnotation, + ScanAnnotation, + WriterAnnotation, +) +from hletterscriptgen.hashing import config_hash +from hletterscriptgen.upstream import ( + UpstreamEntry, + UpstreamError, + is_eligible, + load_entries, + upstream_pin_from_checkout, +) +from hletterscriptgen.validation import validate_document + + +class GeneratorError(Exception): + """Raised when the generator encounters a fatal error.""" + + +class GeneratorWarning(UserWarning): + """Issued for non-fatal conditions (skipped entries, missing files).""" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _variant_id(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: + """Deterministic variant identifier derived from the bbox and letter.""" + return f"{entry_id}__{letter}__{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}" + + +def _asset_path(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: + """POSIX-relative asset path within the writer's output directory.""" + fname = f"{entry_id}__{letter}__{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}.png" + return f"glyphs/{letter}/{fname}" + + +def _sha256_hex(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def _resolve_scan_path( + entry: UpstreamEntry, + upstream_checkout: Path, +) -> Path | None: + """Return the absolute path of the 'original' scan file, or None. + + Probes the upstream entry's ``files`` list for a file with + ``role == 'original'``. Checks ALTO / hOCR sidecar slots first + (per the deferred open question in sub-PR 1); falls back to + ``local_path`` on the original file. + + Returns ``None`` when no usable file path can be resolved. + """ + # Probe for annotation sidecars first (ALTO / hOCR). The upstream + # schema defines ``transcription.alto_path`` and ``hocr_path`` slots; + # all values are currently null in the corpus, but this probe future- + # proofs the extractor so it can graduate to Option B/C without a + # code change once sidecars are populated. + # NOTE: UpstreamEntry models only the ``files[]`` array; sidecar paths + # live under ``transcription`` which is not yet in the modelled subset. + # The probe below is a no-op until the upstream model is extended. + # TODO: extend UpstreamEntry to model transcription.alto_path and + # transcription.hocr_path, then probe those here before falling back. + + # Fall back to CCA: use the 'original' file's local_path. + for f in entry.files: + if f.role == "original" and f.local_path is not None: + return (upstream_checkout / f.local_path).resolve() + return None + + +def _process_writer( + writer: WriterAnnotation, + entry_index: dict[str, UpstreamEntry], + upstream_checkout: Path, + writer_out_dir: Path, + profile_raw: dict[str, Any], + upstream_pin_repo: str, + upstream_pin_revision: str, + generated_at: str, + warnings: list[str], +) -> dict[str, Any]: + """Build and write the letter_set.v1 document for one writer. + + Returns the letter_set.v1 document dict (already written to disk). + Appends non-fatal issues to ``warnings``. + Raises :class:`GeneratorError` on fatal conditions. + """ + glyph_dir = writer_out_dir / "glyphs" + letters_map: dict[str, list[dict[str, Any]]] = {} + observed_licenses: set[str] = set() + used_entry_ids: set[str] = set() + + for scan in writer.scans: + entry = entry_index.get(scan.entry_id) + if entry is None: + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "not found in upstream entries — skipped" + ) + continue + + if not is_eligible(entry): + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "is not eligible — skipped" + ) + continue + + scan_path = _resolve_scan_path(entry, upstream_checkout) + if scan_path is None: + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "has no resolvable scan file — skipped" + ) + continue + + if not scan_path.is_file(): + warnings.append( + f"writer {writer.writer_id!r}: scan file not found at " + f"{scan_path} — skipped" + ) + continue + + license_expr = entry.rights.license_expression + if license_expr is None: + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "has no license_expression — skipped" + ) + continue + + used_entry_ids.add(scan.entry_id) + + for glyph_ann in scan.glyphs: + if glyph_ann.letter not in HEBREW_LETTERS: + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"glyph letter {glyph_ann.letter!r} not a recognised " + "Hebrew character — skipped" + ) + continue + + glyph = Glyph( + x=glyph_ann.x, + y=glyph_ann.y, + width=glyph_ann.width, + height=glyph_ann.height, + ) + try: + png_bytes = crop_glyph(scan_path, glyph) + except ExtractionError as exc: + warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"crop failed ({exc}) — skipped" + ) + continue + + rel_path = _asset_path(scan.entry_id, glyph_ann.letter, glyph_ann) + out_file = writer_out_dir / rel_path + out_file.parent.mkdir(parents=True, exist_ok=True) + out_file.write_bytes(png_bytes) + + variant: dict[str, Any] = { + "variant_id": _variant_id(scan.entry_id, glyph_ann.letter, glyph_ann), + "asset_path": rel_path, + "checksum_sha256": _sha256_hex(png_bytes), + "image": { + "width_px": glyph_ann.width, + "height_px": glyph_ann.height, + "format": "png", + }, + "source": { + "scan_entry_id": scan.entry_id, + "license": license_expr, + "bbox_in_source": { + "x": glyph_ann.x, + "y": glyph_ann.y, + "width": glyph_ann.width, + "height": glyph_ann.height, + }, + }, + "extracted_at": generated_at, + } + + letters_map.setdefault(glyph_ann.letter, []).append(variant) + observed_licenses.add(license_expr) + + if not letters_map: + raise GeneratorError( + f"writer {writer.writer_id!r}: no glyphs were successfully extracted; " + "check warnings for details" + ) + + document: dict[str, Any] = { + "schema_version": "letter_set.v1", + "writer_id": writer.writer_id, + "writer_provenance": { + "source_repo": upstream_pin_repo, + "source_entry_ids": sorted(used_entry_ids), + "attribution_method": writer.attribution_method, + **({"notes": writer.notes} if writer.notes else {}), + }, + "generator": { + "name": "hletterscriptgen", + "version": __version__, + "config_hash": config_hash(profile_raw), + }, + "generated_at": generated_at, + "upstream": { + "repo": upstream_pin_repo, + "revision": upstream_pin_revision, + }, + "letters": letters_map, + "license_summary": { + "licenses": sorted(observed_licenses), + }, + } + + result = validate_document(document) + if not result.ok: + issues = "; ".join(i.format() for i in result.issues) + raise GeneratorError( + f"writer {writer.writer_id!r}: generated document failed " + f"validation: {issues}" + ) + + letter_set_path = writer_out_dir / "letter_set.json" + letter_set_path.write_text( + json.dumps(document, ensure_ascii=False, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + return document + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def generate( + profile: GenerateProfile, + profile_raw: dict[str, Any], + output_dir: Path, + *, + generated_at: str | None = None, +) -> list[Path]: + """Run the full generation pipeline and return paths to letter_set.json files. + + Parameters + ---------- + profile: + Parsed generation profile (from :func:`~hletterscriptgen.generate_profile.load_generate_profile`). + profile_raw: + The original raw dict from the profile JSON file — used to compute + ``generator.config_hash`` in the output documents. + output_dir: + Root output directory. Created if it does not exist. + generated_at: + ISO 8601 timestamp string to embed in the output documents. + When ``None`` (default) the current UTC time is used. Pass a + fixed value for reproducible builds (see ``--generated-at`` on + the ``generate`` CLI). + + Returns + ------- + list[Path] + Absolute paths to each ``letter_set.json`` written, one per writer. + + Raises + ------ + GeneratorError + When the upstream checkout cannot be pinned, no eligible entries + are found, or a writer's document fails validation. + UpstreamError + When the upstream entries file cannot be loaded. + """ + if generated_at is None: + generated_at = datetime.now(tz=timezone.utc).isoformat() + + output_dir.mkdir(parents=True, exist_ok=True) + + # 1. Pin the upstream checkout. + try: + pin = upstream_pin_from_checkout(profile.upstream_checkout) + except Exception as exc: + raise GeneratorError( + f"could not pin upstream checkout at " + f"{profile.upstream_checkout}: {exc}" + ) from exc + + # 2. Load and index all eligible upstream entries. + entries_path = profile.upstream_checkout / "data" / "index" / "entries.jsonl" + try: + all_entries = list(load_entries(entries_path)) + except UpstreamError as exc: + raise GeneratorError(f"could not load upstream entries: {exc}") from exc + + entry_index: dict[str, UpstreamEntry] = {e.entry_id: e for e in all_entries} + + # 3. Process each writer. + warnings: list[str] = [] + output_paths: list[Path] = [] + + for writer in profile.writers: + writer_out_dir = output_dir / writer.writer_id + writer_out_dir.mkdir(parents=True, exist_ok=True) + + _process_writer( + writer=writer, + entry_index=entry_index, + upstream_checkout=profile.upstream_checkout, + writer_out_dir=writer_out_dir, + profile_raw=profile_raw, + upstream_pin_repo=pin.repo, + upstream_pin_revision=pin.revision, + generated_at=generated_at, + warnings=warnings, + ) + output_paths.append((writer_out_dir / "letter_set.json").resolve()) + + if warnings: + import warnings as _warnings + + for msg in warnings: + _warnings.warn(msg, GeneratorWarning, stacklevel=2) + + return output_paths + + +__all__ = [ + "GeneratorError", + "GeneratorWarning", + "generate", +] diff --git a/tests/fixtures/generate_profile/valid_profile.json b/tests/fixtures/generate_profile/valid_profile.json new file mode 100644 index 0000000..6598343 --- /dev/null +++ b/tests/fixtures/generate_profile/valid_profile.json @@ -0,0 +1,19 @@ +{ + "upstream_checkout": "../upstream", + "writers": [ + { + "writer_id": "writer_fixture_a", + "attribution_method": "manual_review", + "notes": "Fixture writer for tests", + "scans": [ + { + "entry_id": "fixture__eligible_pdm__p0001", + "glyphs": [ + {"letter": "א", "x": 5, "y": 5, "width": 20, "height": 20}, + {"letter": "ב", "x": 35, "y": 5, "width": 20, "height": 20} + ] + } + ] + } + ] +} diff --git a/tests/test_cli.py b/tests/test_cli.py index b854a0b..e928672 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -61,13 +61,13 @@ def test_validate_subcommand_fails_on_bad_doc( assert all("kind" in err for err in payload["errors"]) -def test_generate_subcommand_exits_not_implemented( +def test_generate_subcommand_requires_profile_and_output( capsys: pytest.CaptureFixture[str], ) -> None: - code = main(["generate"]) - assert code == EXIT_NOT_IMPLEMENTED - assert EXIT_NOT_IMPLEMENTED == 69 - assert "not yet implemented" in capsys.readouterr().err + # --profile and --output are now required; omitting them is a usage error. + with pytest.raises(SystemExit) as exc_info: + main(["generate"]) + assert exc_info.value.code == 2 # argparse usage error def test_check_eligible_text_mixed(capsys: pytest.CaptureFixture[str]) -> None: diff --git a/tests/test_extractor.py b/tests/test_extractor.py new file mode 100644 index 0000000..36d4804 --- /dev/null +++ b/tests/test_extractor.py @@ -0,0 +1,243 @@ +"""Tests for the CCA glyph extractor.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +cv2 = pytest.importorskip("cv2") +import numpy as np # noqa: E402 — after importorskip so this only runs with cv2 + +from hletterscriptgen.extractor import ( # noqa: E402 + MIN_GLYPH_PX, + ExtractionError, + Glyph, + crop_glyph, + extract_glyphs, +) + +# --------------------------------------------------------------------------- +# Synthetic image helpers +# --------------------------------------------------------------------------- + + +def _white_image(width: int, height: int) -> np.ndarray: # type: ignore[type-arg] + """Return a white (255) BGR image of the given dimensions.""" + return np.full((height, width, 3), 255, dtype=np.uint8) + + +def _draw_black_rect( + img: np.ndarray, # type: ignore[type-arg] + x: int, + y: int, + w: int, + h: int, +) -> None: + """Draw a solid black rectangle on img (in-place).""" + img[y : y + h, x : x + w] = 0 + + +def _save_png(img: np.ndarray, path: Path) -> None: # type: ignore[type-arg] + cv2.imwrite(str(path), img) + + +# --------------------------------------------------------------------------- +# extract_glyphs +# --------------------------------------------------------------------------- + + +def test_extract_glyphs_detects_blobs(tmp_path: Path) -> None: + """Two well-separated black rectangles on a white background.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=30, h=30) # left blob + _draw_black_rect(img, x=100, y=10, w=30, h=30) # right blob + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 2 + # Sorted right-to-left (larger x first) within the same y-row. + assert glyphs[0].x > glyphs[1].x + + +def test_extract_glyphs_filters_small_blobs(tmp_path: Path) -> None: + """Blobs below MIN_GLYPH_PX in either dimension should be dropped.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=MIN_GLYPH_PX, h=MIN_GLYPH_PX) # exactly at floor + _draw_black_rect(img, x=60, y=10, w=MIN_GLYPH_PX - 1, h=MIN_GLYPH_PX) # too narrow + _draw_black_rect(img, x=110, y=10, w=MIN_GLYPH_PX, h=MIN_GLYPH_PX - 1) # too short + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 1 + assert glyphs[0].x == 10 + + +def test_extract_glyphs_respects_custom_min_dimension(tmp_path: Path) -> None: + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=10, h=10) + _draw_black_rect(img, x=60, y=10, w=30, h=30) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs_strict = extract_glyphs(scan, min_dimension=20) + glyphs_loose = extract_glyphs(scan, min_dimension=8) + + assert len(glyphs_strict) == 1 # only the 30×30 blob passes + assert len(glyphs_loose) == 2 + + +def test_extract_glyphs_filters_large_blobs(tmp_path: Path) -> None: + """Blobs exceeding max_area should be dropped.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=5, y=5, w=30, h=30) # 900 px² — below ceiling + _draw_black_rect(img, x=100, y=5, w=60, h=60) # 3600 px² — above ceiling + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan, max_area=1000) + + assert len(glyphs) == 1 + assert glyphs[0].x == 5 + + +def test_extract_glyphs_default_max_area_drops_page_blobs(tmp_path: Path) -> None: + """A blob covering >10 % of the page should be dropped by default.""" + img = _white_image(100, 100) # 10 000 px total + _draw_black_rect(img, x=0, y=0, w=100, h=100) # fills entire page + scan = tmp_path / "scan.png" + _save_png(img, scan) + + # The single blob covers 100 % of the page, so it should be dropped. + glyphs = extract_glyphs(scan) + assert glyphs == [] + + +def test_extract_glyphs_empty_image(tmp_path: Path) -> None: + """An all-white image should yield no blobs.""" + img = _white_image(100, 100) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + assert extract_glyphs(scan) == [] + + +def test_extract_glyphs_raises_on_missing_image(tmp_path: Path) -> None: + with pytest.raises(ExtractionError, match="could not load image"): + extract_glyphs(tmp_path / "nonexistent.png") + + +def test_extract_glyphs_raises_on_bad_min_dimension(tmp_path: Path) -> None: + img = _white_image(50, 50) + scan = tmp_path / "scan.png" + _save_png(img, scan) + with pytest.raises(ExtractionError, match="min_dimension"): + extract_glyphs(scan, min_dimension=0) + + +def test_extract_glyphs_sorting_is_hebrew_order(tmp_path: Path) -> None: + """Hebrew reading order: top-to-bottom rows, right-to-left within rows.""" + img = _white_image(300, 200) + # Row 1 (y=10): three blobs, right-to-left order expected + _draw_black_rect(img, x=200, y=10, w=20, h=20) # rightmost + _draw_black_rect(img, x=100, y=10, w=20, h=20) # middle + _draw_black_rect(img, x=10, y=10, w=20, h=20) # leftmost + # Row 2 (y=100): one blob + _draw_black_rect(img, x=150, y=100, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 4 + # First three from row 1, right-to-left + assert glyphs[0].x == 200 + assert glyphs[1].x == 100 + assert glyphs[2].x == 10 + # Last blob from row 2 + assert glyphs[3].y >= 100 + + +# --------------------------------------------------------------------------- +# crop_glyph +# --------------------------------------------------------------------------- + + +def test_crop_glyph_returns_png_bytes(tmp_path: Path) -> None: + img = _white_image(100, 100) + _draw_black_rect(img, x=10, y=10, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=10, y=10, width=20, height=20) + png_bytes = crop_glyph(scan, glyph) + + assert isinstance(png_bytes, bytes) + assert len(png_bytes) > 0 + # PNG magic bytes + assert png_bytes[:4] == b"\x89PNG" + + +def test_crop_glyph_dimensions_match_bbox(tmp_path: Path) -> None: + img = _white_image(100, 100) + _draw_black_rect(img, x=5, y=5, w=25, h=35) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=5, y=5, width=25, height=35) + png_bytes = crop_glyph(scan, glyph) + + # Decode the PNG and check dimensions + buf = np.frombuffer(png_bytes, dtype=np.uint8) + decoded = cv2.imdecode(buf, cv2.IMREAD_GRAYSCALE) + assert decoded is not None + assert decoded.shape == (35, 25) # (height, width) + + +def test_crop_glyph_is_binarised(tmp_path: Path) -> None: + """The cropped image should contain only 0 and 255 pixel values.""" + img = _white_image(100, 100) + # Draw a grey rectangle (not pure black) — after Otsu binarisation it becomes black + img[20:40, 20:40] = 80 + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=20, y=20, width=20, height=20) + png_bytes = crop_glyph(scan, glyph) + + buf = np.frombuffer(png_bytes, dtype=np.uint8) + decoded = cv2.imdecode(buf, cv2.IMREAD_GRAYSCALE) + assert decoded is not None + unique_vals = set(decoded.flatten().tolist()) + assert unique_vals <= {0, 255} + + +def test_crop_glyph_is_deterministic(tmp_path: Path) -> None: + """Calling crop_glyph twice on the same input returns identical bytes.""" + img = _white_image(100, 100) + _draw_black_rect(img, x=10, y=10, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=10, y=10, width=20, height=20) + assert crop_glyph(scan, glyph) == crop_glyph(scan, glyph) + + +def test_crop_glyph_raises_on_missing_image(tmp_path: Path) -> None: + glyph = Glyph(x=0, y=0, width=10, height=10) + with pytest.raises(ExtractionError, match="could not load image"): + crop_glyph(tmp_path / "nonexistent.png", glyph) + + +def test_crop_glyph_raises_on_out_of_bounds(tmp_path: Path) -> None: + img = _white_image(50, 50) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + out_of_bounds = Glyph(x=40, y=40, width=20, height=20) # extends past 50×50 + with pytest.raises(ExtractionError, match="outside"): + crop_glyph(scan, out_of_bounds) diff --git a/tests/test_generate_profile.py b/tests/test_generate_profile.py new file mode 100644 index 0000000..4bc8482 --- /dev/null +++ b/tests/test_generate_profile.py @@ -0,0 +1,218 @@ +"""Tests for the generation profile loader.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from hletterscriptgen.generate_profile import ( + GenerateProfile, + GenerateProfileError, + GlyphAnnotation, + ScanAnnotation, + WriterAnnotation, + load_generate_profile, +) + +FIXTURE_DIR = Path(__file__).resolve().parent / "fixtures" / "generate_profile" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_profile(tmp_path: Path, payload: object) -> Path: + p = tmp_path / "profile.json" + p.write_text(json.dumps(payload), encoding="utf-8") + return p + + +_MINIMAL_GLYPH = {"letter": "א", "x": 0, "y": 0, "width": 20, "height": 20} +_MINIMAL_SCAN = {"entry_id": "e__s__p0001", "glyphs": [_MINIMAL_GLYPH]} +_MINIMAL_WRITER = { + "writer_id": "w1", + "attribution_method": "manual_review", + "scans": [_MINIMAL_SCAN], +} +_MINIMAL_PROFILE = { + "upstream_checkout": ".", + "writers": [_MINIMAL_WRITER], +} + + +# --------------------------------------------------------------------------- +# Valid profile +# --------------------------------------------------------------------------- + + +def test_load_valid_fixture() -> None: + profile, raw = load_generate_profile(FIXTURE_DIR / "valid_profile.json") + assert isinstance(profile, GenerateProfile) + assert len(profile.writers) == 1 + writer = profile.writers[0] + assert writer.writer_id == "writer_fixture_a" + assert writer.attribution_method == "manual_review" + assert writer.notes == "Fixture writer for tests" + assert len(writer.scans) == 1 + scan = writer.scans[0] + assert scan.entry_id == "fixture__eligible_pdm__p0001" + assert len(scan.glyphs) == 2 + assert scan.glyphs[0].letter == "א" + assert scan.glyphs[1].letter == "ב" + assert isinstance(raw, dict) + + +def test_load_returns_raw_dict(tmp_path: Path) -> None: + p = _write_profile(tmp_path, _MINIMAL_PROFILE) + _, raw = load_generate_profile(p) + assert raw == _MINIMAL_PROFILE + + +def test_upstream_checkout_resolved_relative_to_profile(tmp_path: Path) -> None: + sub = tmp_path / "configs" + sub.mkdir() + p = sub / "profile.json" + p.write_text(json.dumps({**_MINIMAL_PROFILE, "upstream_checkout": "../data"}), encoding="utf-8") + profile, _ = load_generate_profile(p) + assert profile.upstream_checkout == (tmp_path / "data").resolve() + + +def test_glyph_notes_optional(tmp_path: Path) -> None: + glyph_with_notes = {**_MINIMAL_GLYPH, "notes": "a test note"} + profile_data = { + "upstream_checkout": ".", + "writers": [ + {**_MINIMAL_WRITER, "scans": [{"entry_id": "e__s__p0001", "glyphs": [glyph_with_notes]}]}, + ], + } + p = _write_profile(tmp_path, profile_data) + profile, _ = load_generate_profile(p) + assert profile.writers[0].scans[0].glyphs[0].notes == "a test note" + + +# --------------------------------------------------------------------------- +# File-level errors +# --------------------------------------------------------------------------- + + +def test_raises_on_missing_file(tmp_path: Path) -> None: + with pytest.raises(GenerateProfileError, match="not found"): + load_generate_profile(tmp_path / "nonexistent.json") + + +def test_raises_on_invalid_json(tmp_path: Path) -> None: + p = tmp_path / "profile.json" + p.write_text("{not valid", encoding="utf-8") + with pytest.raises(GenerateProfileError, match="invalid JSON"): + load_generate_profile(p) + + +def test_raises_on_non_object_root(tmp_path: Path) -> None: + p = _write_profile(tmp_path, [1, 2, 3]) + with pytest.raises(GenerateProfileError, match="JSON object"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# upstream_checkout validation +# --------------------------------------------------------------------------- + + +def test_raises_on_missing_upstream_checkout(tmp_path: Path) -> None: + payload = {k: v for k, v in _MINIMAL_PROFILE.items() if k != "upstream_checkout"} + p = _write_profile(tmp_path, payload) + with pytest.raises(GenerateProfileError, match="upstream_checkout"): + load_generate_profile(p) + + +def test_raises_on_blank_upstream_checkout(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "upstream_checkout": " "}) + with pytest.raises(GenerateProfileError, match="upstream_checkout"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# writers validation +# --------------------------------------------------------------------------- + + +def test_raises_on_empty_writers(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": []}) + with pytest.raises(GenerateProfileError, match="writers"): + load_generate_profile(p) + + +def test_raises_on_duplicate_writer_ids(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, _MINIMAL_WRITER]}) + with pytest.raises(GenerateProfileError, match="duplicate writer_id"): + load_generate_profile(p) + + +def test_raises_on_duplicate_entry_ids_across_writers(tmp_path: Path) -> None: + writer2 = {**_MINIMAL_WRITER, "writer_id": "w2"} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, writer2]}) + with pytest.raises(GenerateProfileError, match="entry_id"): + load_generate_profile(p) + + +def test_raises_on_blank_writer_id(tmp_path: Path) -> None: + bad = {**_MINIMAL_WRITER, "writer_id": " "} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [bad]}) + with pytest.raises(GenerateProfileError, match="writer_id"): + load_generate_profile(p) + + +def test_raises_on_empty_scans(tmp_path: Path) -> None: + bad = {**_MINIMAL_WRITER, "scans": []} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [bad]}) + with pytest.raises(GenerateProfileError, match="scans"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# glyph validation +# --------------------------------------------------------------------------- + + +def test_raises_on_non_hebrew_letter(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "letter": "A"} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + with pytest.raises(GenerateProfileError, match="Hebrew character"): + load_generate_profile(p) + + +def test_raises_on_multi_char_letter(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "letter": "אב"} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + with pytest.raises(GenerateProfileError, match="Hebrew character"): + load_generate_profile(p) + + +def test_raises_on_negative_x(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "x": -1} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + with pytest.raises(GenerateProfileError, match="≥ 0"): + load_generate_profile(p) + + +def test_raises_on_zero_width(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "width": 0} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + with pytest.raises(GenerateProfileError, match="≥ 1"): + load_generate_profile(p) + + +@pytest.mark.parametrize("letter", ["א", "ב", "ג", "ת", "ך", "ם", "ן", "ף", "ץ"]) +def test_accepts_all_hebrew_letter_forms(tmp_path: Path, letter: str) -> None: + glyph = {**_MINIMAL_GLYPH, "letter": letter} + scan = {"entry_id": "e__s__p0001", "glyphs": [glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + profile, _ = load_generate_profile(p) + assert profile.writers[0].scans[0].glyphs[0].letter == letter diff --git a/tests/test_generator.py b/tests/test_generator.py new file mode 100644 index 0000000..c570704 --- /dev/null +++ b/tests/test_generator.py @@ -0,0 +1,296 @@ +"""End-to-end tests for the generator pipeline.""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path + +import pytest + +cv2 = pytest.importorskip("cv2") +import numpy as np # noqa: E402 + +from hletterscriptgen.generate_profile import load_generate_profile # noqa: E402 +from hletterscriptgen.generator import GeneratorError, generate # noqa: E402 + +# --------------------------------------------------------------------------- +# Helpers — synthetic upstream checkout + scan +# --------------------------------------------------------------------------- + +_ELIGIBLE_ENTRY = { + "entry_id": "test__writer_a__p0001", + "source_id": "test__writer_a", + "creators": [{"name": "Test Author", "role": "writer", "death_year": 1900}], + "files": [ + { + "role": "original", + "local_path": "data/scans/test__writer_a/p0001.png", + "sha256": "0" * 64, + "mime_type": "image/png", + "width_px": 100, + "height_px": 100, + } + ], + "rights": { + "license_expression": "PDM-1.0", + "commercial_use_allowed": True, + "derivatives_allowed": True, + "scan_redistribution_allowed": True, + "verification_status": "primary_page_checked", + }, + "quality": {"usable_for_htr": True, "legibility": "high"}, +} + + +def _git(path: Path, *args: str) -> None: + subprocess.run(["git", "-C", str(path), *args], check=True, capture_output=True) + + +def _make_upstream_checkout(tmp_path: Path) -> Path: + """Create a minimal upstream git repo with one eligible entry and a scan.""" + repo = tmp_path / "upstream" + repo.mkdir() + + # Initialise git repo + _git(repo, "init", "-q", "-b", "main") + _git(repo, "config", "user.email", "test@example.com") + _git(repo, "config", "user.name", "Test") + _git(repo, "config", "commit.gpgsign", "false") + _git(repo, "remote", "add", "origin", "https://github.com/HeOCR/public-domain-hand-written-hebrew-scans.git") + + # entries.jsonl + index_dir = repo / "data" / "index" + index_dir.mkdir(parents=True) + (index_dir / "entries.jsonl").write_text( + json.dumps(_ELIGIBLE_ENTRY) + "\n", encoding="utf-8" + ) + + # Synthetic scan PNG: white 100×100 with two black 20×20 blobs + scan_dir = repo / "data" / "scans" / "test__writer_a" + scan_dir.mkdir(parents=True) + img = np.full((100, 100, 3), 255, dtype=np.uint8) + img[5:25, 5:25] = 0 # blob at (5,5) 20×20 + img[5:25, 35:55] = 0 # blob at (35,5) 20×20 + cv2.imwrite(str(scan_dir / "p0001.png"), img) + + _git(repo, "add", ".") + _git(repo, "commit", "-q", "-m", "initial") + return repo + + +def _make_profile(tmp_path: Path, upstream: Path) -> Path: + """Write a generation profile that uses the upstream checkout.""" + profile_data = { + "upstream_checkout": str(upstream), + "writers": [ + { + "writer_id": "writer_test_a", + "attribution_method": "manual_review", + "scans": [ + { + "entry_id": "test__writer_a__p0001", + "glyphs": [ + {"letter": "א", "x": 5, "y": 5, "width": 20, "height": 20}, + {"letter": "ב", "x": 35, "y": 5, "width": 20, "height": 20}, + ], + } + ], + } + ], + } + p = tmp_path / "profile.json" + p.write_text(json.dumps(profile_data), encoding="utf-8") + return p + + +# --------------------------------------------------------------------------- +# generate — happy path +# --------------------------------------------------------------------------- + + +def test_generate_produces_letter_set_json(tmp_path: Path) -> None: + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile, raw = load_generate_profile(profile_path) + paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + assert len(paths) == 1 + letter_set_path = paths[0] + assert letter_set_path.exists() + assert letter_set_path.name == "letter_set.json" + + +def test_generate_letter_set_validates(tmp_path: Path) -> None: + from hletterscriptgen.validation import validate_document + + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile, raw = load_generate_profile(profile_path) + paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + result = validate_document(doc) + assert result.ok, [i.format() for i in result.issues] + + +def test_generate_letter_set_content(tmp_path: Path) -> None: + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile, raw = load_generate_profile(profile_path) + paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + assert doc["schema_version"] == "letter_set.v1" + assert doc["writer_id"] == "writer_test_a" + assert doc["upstream"]["repo"] == "HeOCR/public-domain-hand-written-hebrew-scans" + assert doc["generator"]["name"] == "hletterscriptgen" + # Both annotated letters must appear + assert "א" in doc["letters"] + assert "ב" in doc["letters"] + assert len(doc["letters"]["א"]) == 1 + assert len(doc["letters"]["ב"]) == 1 + + +def test_generate_writes_glyph_pngs(tmp_path: Path) -> None: + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile, raw = load_generate_profile(profile_path) + paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + writer_dir = paths[0].parent + + for letter, variants in doc["letters"].items(): + for variant in variants: + asset = writer_dir / variant["asset_path"] + assert asset.exists(), f"asset missing: {asset}" + assert asset.read_bytes()[:4] == b"\x89PNG" + + +def test_generate_checksum_matches_png(tmp_path: Path) -> None: + import hashlib + + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile, raw = load_generate_profile(profile_path) + paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + writer_dir = paths[0].parent + + for variants in doc["letters"].values(): + for variant in variants: + png_bytes = (writer_dir / variant["asset_path"]).read_bytes() + assert hashlib.sha256(png_bytes).hexdigest() == variant["checksum_sha256"] + + +def test_generate_is_deterministic(tmp_path: Path) -> None: + """Same profile + upstream + generated_at → bit-identical output.""" + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + + out1 = tmp_path / "out1" + out2 = tmp_path / "out2" + + profile, raw = load_generate_profile(profile_path) + ts = "2025-06-01T12:00:00+00:00" + paths1 = generate(profile, raw, out1, generated_at=ts) + paths2 = generate(profile, raw, out2, generated_at=ts) + + doc1 = json.loads(paths1[0].read_text(encoding="utf-8")) + doc2 = json.loads(paths2[0].read_text(encoding="utf-8")) + assert doc1 == doc2 + + # PNG bytes should be identical too + for variants in doc1["letters"].values(): + for variant in variants: + b1 = (out1 / "writer_test_a" / variant["asset_path"]).read_bytes() + b2 = (out2 / "writer_test_a" / variant["asset_path"]).read_bytes() + assert b1 == b2 + + +def test_generate_raises_when_no_glyphs_extracted(tmp_path: Path) -> None: + """A profile whose scans all have missing files should raise GeneratorError.""" + upstream = _make_upstream_checkout(tmp_path) + # Point the profile at a non-existent entry_id + profile_data = { + "upstream_checkout": str(upstream), + "writers": [ + { + "writer_id": "writer_test_a", + "attribution_method": "manual_review", + "scans": [ + { + "entry_id": "does__not__exist__p0001", + "glyphs": [{"letter": "א", "x": 0, "y": 0, "width": 20, "height": 20}], + } + ], + } + ], + } + p = tmp_path / "profile.json" + p.write_text(json.dumps(profile_data), encoding="utf-8") + + profile, raw = load_generate_profile(p) + with pytest.raises(GeneratorError, match="no glyphs"): + generate(profile, raw, tmp_path / "out", generated_at="2025-01-01T00:00:00+00:00") + + +# --------------------------------------------------------------------------- +# CLI integration +# --------------------------------------------------------------------------- + + +def test_cli_generate_exits_ok(tmp_path: Path) -> None: + from hletterscriptgen.cli import main + + upstream = _make_upstream_checkout(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + rc = main([ + "generate", + "--profile", str(profile_path), + "--output", str(output_dir), + "--generated-at", "2025-01-01T00:00:00+00:00", + ]) + assert rc == 0 + assert (output_dir / "writer_test_a" / "letter_set.json").exists() + + +def test_cli_scan_blobs_exits_ok(tmp_path: Path) -> None: + from hletterscriptgen.cli import main + + img = np.full((100, 100, 3), 255, dtype=np.uint8) + img[10:30, 10:30] = 0 + scan = tmp_path / "scan.png" + cv2.imwrite(str(scan), img) + + rc = main(["scan-blobs", str(scan)]) + assert rc == 0 + + +def test_cli_scan_blobs_text_format(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: + from hletterscriptgen.cli import main + + img = np.full((100, 100, 3), 255, dtype=np.uint8) + img[10:30, 10:30] = 0 + scan = tmp_path / "scan.png" + cv2.imwrite(str(scan), img) + + rc = main(["scan-blobs", str(scan), "--format", "text"]) + assert rc == 0 + out = capsys.readouterr().out + assert "blob" in out + assert "detected" in out From 3031b6439e91b8691cd7b718b83f7865a3a53deb Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 25 May 2026 00:01:58 +0300 Subject: [PATCH 2/3] =?UTF-8?q?refactor(m3):=20address=20review=20feedback?= =?UTF-8?q?=20=E2=80=94=20all=2015=20issues?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix 1 (crop_glyph perf): add binarize_scan()/crop_binary(); update generator to call binarize_scan once per scan and crop_binary per glyph, avoiding redundant image I/O and Otsu re-threshold on every crop. Fix 2 (cv2 import dup): extract _require_cv2() helper used by all extractor functions; eliminates the duplicated try/except ImportError. Fix 3 (dead var glyph_dir): remove unused 'glyph_dir' assignment from generator._process_writer (now _extract_variants). Fix 4 (notes dropped): GlyphAnnotation.notes now propagated to variant['notes'] in the output document when set. Fix 5 (broad except): replace bare 'except Exception' in generate() with 'except (UpstreamError, OSError)'. Fix 6 (tuple return): load_generate_profile() now returns a plain GenerateProfile (not a tuple); config_hash is embedded as a field computed at load time. All callers (cli.py, tests) updated. Fix 7 (_process_writer decomp): split 160-line function into _extract_variants() (I/O + PNG writes) and _build_document() (JSON assembly); _process_writer() orchestrates them. Fix 8 (microseconds): generated_at default now uses datetime.now(UTC).replace(microsecond=0).isoformat() for reproducibility. Fix 9 (variant_id separator): change separator between entry_id, letter, and coords from __ to @ to avoid ambiguity with entry_ids that themselves use __ as separators. Fix 10 (import warnings inside fn): move 'import warnings as _warnings' to top-level; rename local list variable to pending_warnings throughout. Fix 11 (sort docstring): update Glyph docstring to say 'ascending y then descending x' instead of the imprecise 'Hebrew reading order'. Fix 12 (max area fraction public): rename _DEFAULT_MAX_AREA_FRACTION to DEFAULT_MAX_AREA_FRACTION; add to __all__. Fix 13 (connectivity comment): add inline comment explaining why connectivity=8 is chosen over 4 (diagonal contacts in Hebrew cursive). Fix 14 (mock tests): add 5 mock-based tests in test_generator.py that exercise document assembly, validation, PNG writes, notes propagation, and config_hash embedding without requiring real cv2 image I/O. Fix 15 (CI smoke): install [test,cv] in CI test job (was [test] only) so cv-dependent tests actually run; add scan-blobs CLI smoke step. Pre-existing ruff issues also cleaned up: sorted __all__, unsorted import block in generate_profile, ambiguous × in comments/strings, unused ScanAnnotation/EXIT_NOT_IMPLEMENTED imports. Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/ci.yml | 12 +- src/hletterscriptgen/cli.py | 3 +- src/hletterscriptgen/extractor.py | 163 ++++++++++++++------ src/hletterscriptgen/generate_profile.py | 31 ++-- src/hletterscriptgen/generator.py | 171 ++++++++++++++------- tests/test_cli.py | 1 - tests/test_extractor.py | 4 +- tests/test_generate_profile.py | 39 ++--- tests/test_generator.py | 188 ++++++++++++++++++++--- 9 files changed, 458 insertions(+), 154 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e050af9..fa20cd8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,7 +86,7 @@ jobs: - name: Install package run: | python -m pip install --upgrade pip setuptools wheel - python -m pip install -e ".[test]" + python -m pip install -e ".[test,cv]" - name: Run tests run: python -m pytest @@ -100,6 +100,16 @@ jobs: - name: CLI smoke — validate example run: hletterscriptgen validate examples/letter_set/writer_example.json --format json + - name: CLI smoke — scan-blobs + run: | + python -c " + import cv2, numpy as np + img = np.full((100, 100, 3), 255, dtype=np.uint8) + img[10:30, 10:30] = 0 + cv2.imwrite('/tmp/smoke_scan.png', img) + " + hletterscriptgen scan-blobs /tmp/smoke_scan.png --format json + package: name: Build distributions runs-on: ubuntu-latest diff --git a/src/hletterscriptgen/cli.py b/src/hletterscriptgen/cli.py index 5331122..6214da1 100644 --- a/src/hletterscriptgen/cli.py +++ b/src/hletterscriptgen/cli.py @@ -223,7 +223,7 @@ def _cmd_generate(args: argparse.Namespace) -> int: from hletterscriptgen.generator import GeneratorError, generate try: - profile, raw = load_generate_profile(args.profile) + profile = load_generate_profile(args.profile) except GenerateProfileError as exc: print(str(exc), file=sys.stderr) return EXIT_INPUT_ERROR @@ -231,7 +231,6 @@ def _cmd_generate(args: argparse.Namespace) -> int: try: output_paths = generate( profile, - raw, args.output, generated_at=args.generated_at, ) diff --git a/src/hletterscriptgen/extractor.py b/src/hletterscriptgen/extractor.py index a8b8d8c..be0c668 100644 --- a/src/hletterscriptgen/extractor.py +++ b/src/hletterscriptgen/extractor.py @@ -14,15 +14,19 @@ The module exposes: * :data:`MIN_GLYPH_PX` — minimum bounding-box dimension (px). +* :data:`DEFAULT_MAX_AREA_FRACTION` — default upper-area ceiling as a fraction of page area. * :class:`Glyph` — frozen dataclass for a detected blob's bounding box. +* :func:`binarize_scan` — load a scan and return its Otsu-binarised array. +* :func:`crop_binary` — crop a glyph from an already-binarised array. * :func:`extract_glyphs` — detect blobs in a scan via CCA. -* :func:`crop_glyph` — crop one blob from the binarised scan, return PNG bytes. +* :func:`crop_glyph` — convenience wrapper: binarize a scan and crop one blob. """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path +from typing import Any # Minimum bounding-box width AND height in pixels, per issue #16 decision D2. MIN_GLYPH_PX: int = 16 @@ -31,7 +35,7 @@ # more than this fraction of the page are assumed to be noise (stamps, # ruled lines, bleed-through from the verso). Empirical calibration of # this value is deferred to a later sub-PR; 10 % is a conservative default. -_DEFAULT_MAX_AREA_FRACTION: float = 0.10 +DEFAULT_MAX_AREA_FRACTION: float = 0.10 class ExtractionError(Exception): @@ -42,6 +46,19 @@ class ExtractionError(Exception): """ +def _require_cv2() -> Any: + """Return the ``cv2`` module, raising :class:`ExtractionError` if not installed.""" + try: + import cv2 + + return cv2 + except ImportError as exc: + raise ExtractionError( + "opencv-python-headless is required for glyph extraction; " + "install it with: pip install hletterscriptgen[cv]" + ) from exc + + @dataclass(frozen=True) class Glyph: """Bounding box of a connected component detected in a scan image. @@ -51,9 +68,8 @@ class Glyph: its extent. All values are non-negative integers; ``width`` and ``height`` are ≥ 1. - Glyphs returned by :func:`extract_glyphs` are sorted by - ``(y, -x)`` — top-to-bottom rows, right-to-left within each row — - consistent with Hebrew reading order. + Glyphs returned by :func:`extract_glyphs` are sorted by ascending ``y`` + first, then descending ``x`` within each row. """ x: int @@ -62,6 +78,81 @@ class Glyph: height: int +def binarize_scan(image_path: Path) -> Any: + """Load a scan image and return a binarised (Otsu) single-channel array. + + Applies ``THRESH_BINARY_INV | THRESH_OTSU`` so that ink pixels become + foreground (255) and background pixels become 0. The result is a + 2-D uint8 NumPy array with the same spatial dimensions as the source image. + + Calling this once and passing the result to :func:`crop_binary` for each + glyph is more efficient than calling :func:`crop_glyph` per glyph, because + it avoids redundant image I/O and re-binarisation. + + Parameters + ---------- + image_path: + Path to the scan image (JPEG, PNG, TIFF, or any format OpenCV can decode). + + Raises + ------ + ExtractionError + When ``opencv-python-headless`` is not installed or the image cannot + be read. + """ + cv2 = _require_cv2() + img = cv2.imread(str(image_path)) + if img is None: + raise ExtractionError(f"could not load image: {image_path}") + grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + return binary + + +def crop_binary(binary: Any, glyph: Glyph) -> bytes: + """Crop a glyph region from a binarised array and return PNG bytes. + + Unlike :func:`crop_glyph`, this function operates on an already-binarised + array (the result of :func:`binarize_scan`), avoiding a redundant image + load and Otsu threshold per glyph. Prefer this when cropping multiple + glyphs from the same scan. + + Parameters + ---------- + binary: + A 2-D uint8 NumPy array produced by :func:`binarize_scan` (or + equivalent Otsu binarisation). Shape is ``(height, width)``. + glyph: + Bounding box to crop, in the image's pixel space. + + Returns + ------- + bytes + PNG-encoded bytes of the cropped region. + + Raises + ------ + ExtractionError + When OpenCV is not installed, the glyph bbox falls outside the array + dimensions, or PNG encoding fails. + """ + cv2 = _require_cv2() + img_h, img_w = binary.shape[:2] + x, y, w, h = glyph.x, glyph.y, glyph.width, glyph.height + + if x < 0 or y < 0 or x + w > img_w or y + h > img_h: + raise ExtractionError( + f"glyph bbox (x={x}, y={y}, w={w}, h={h}) falls outside " + f"image dimensions {img_w}x{img_h}" + ) + + crop = binary[y : y + h, x : x + w] + ok, buf = cv2.imencode(".png", crop) + if not ok: + raise ExtractionError("failed to encode glyph crop as PNG") + return bytes(buf.tobytes()) + + def extract_glyphs( image_path: Path, *, @@ -77,12 +168,15 @@ def extract_glyphs( 3. Binarise with Otsu's method (``THRESH_BINARY_INV`` — ink pixels become foreground / 255, background becomes 0). 4. Run ``connectedComponentsWithStats`` to label foreground blobs. + 8-connectivity is used so that diagonal contacts (common in cursive + Hebrew script) are treated as part of the same component; 4-connectivity + would incorrectly split glyphs at diagonal junctions. 5. Drop blobs where ``width < min_dimension`` or ``height < min_dimension`` (quality floor, issue #16 D2). 6. Drop blobs whose pixel area exceeds ``max_area`` (noise ceiling; defaults to 10 % of total image area when ``None``). - 7. Sort survivors by ``(y, -x)`` — top row first, right-to-left within - a row — and return them as :class:`Glyph` records. + 7. Sort survivors by ascending ``y`` first, then descending ``x`` within + each row, and return them as :class:`Glyph` records. .. note:: Deskew pre-processing is intentionally omitted from M3 — the @@ -118,13 +212,7 @@ def extract_glyphs( if min_dimension < 1: raise ExtractionError(f"min_dimension must be ≥ 1, got {min_dimension}") - try: - import cv2 - except ImportError as exc: - raise ExtractionError( - "opencv-python-headless is required for glyph extraction; " - "install it with: pip install hletterscriptgen[cv]" - ) from exc + cv2 = _require_cv2() img = cv2.imread(str(image_path)) if img is None: @@ -132,12 +220,14 @@ def extract_glyphs( img_h, img_w = img.shape[:2] effective_max_area = ( - max_area if max_area is not None else int(img_h * img_w * _DEFAULT_MAX_AREA_FRACTION) + max_area if max_area is not None else int(img_h * img_w * DEFAULT_MAX_AREA_FRACTION) ) grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + # 8-connectivity: diagonal contacts (common in Hebrew cursive) belong to the + # same component; 4-connectivity would incorrectly split them. num_labels, _, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8) glyphs: list[Glyph] = [] @@ -156,13 +246,18 @@ def extract_glyphs( glyphs.append(Glyph(x=x, y=y, width=w, height=h)) - # Sort: top-to-bottom by y, then right-to-left by x within each row. + # Sort: ascending y (top-to-bottom rows), then descending x within each row. glyphs.sort(key=lambda g: (g.y, -g.x)) return glyphs def crop_glyph(image_path: Path, glyph: Glyph) -> bytes: - """Crop a glyph region from the binarised scan and return PNG bytes. + """Crop a glyph region from a scan image and return PNG bytes. + + Convenience wrapper around :func:`binarize_scan` and :func:`crop_binary`. + When cropping multiple glyphs from the same scan, prefer calling + :func:`binarize_scan` once and :func:`crop_binary` for each glyph to + avoid re-loading and re-binarising the image on every call. The crop is taken from the *binarised* (Otsu) image, not the original colour scan, so the returned PNG contains only black ink (255) and @@ -187,42 +282,16 @@ def crop_glyph(image_path: Path, glyph: Glyph) -> bytes: glyph bbox falls outside the image boundaries, or PNG encoding fails. """ - try: - import cv2 - except ImportError as exc: - raise ExtractionError( - "opencv-python-headless is required for glyph extraction; " - "install it with: pip install hletterscriptgen[cv]" - ) from exc - - img = cv2.imread(str(image_path)) - if img is None: - raise ExtractionError(f"could not load image: {image_path}") - - img_h, img_w = img.shape[:2] - x, y, w, h = glyph.x, glyph.y, glyph.width, glyph.height - - if x < 0 or y < 0 or x + w > img_w or y + h > img_h: - raise ExtractionError( - f"glyph bbox (x={x}, y={y}, w={w}, h={h}) falls outside " - f"image dimensions {img_w}×{img_h}: {image_path}" - ) - - grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) - crop = binary[y : y + h, x : x + w] - - ok, buf = cv2.imencode(".png", crop) - if not ok: - raise ExtractionError(f"failed to encode glyph crop as PNG: {image_path}") - - return bytes(buf.tobytes()) + return crop_binary(binarize_scan(image_path), glyph) __all__ = [ + "DEFAULT_MAX_AREA_FRACTION", "MIN_GLYPH_PX", "ExtractionError", "Glyph", + "binarize_scan", + "crop_binary", "crop_glyph", "extract_glyphs", ] diff --git a/src/hletterscriptgen/generate_profile.py b/src/hletterscriptgen/generate_profile.py index fa049d3..7b5ad60 100644 --- a/src/hletterscriptgen/generate_profile.py +++ b/src/hletterscriptgen/generate_profile.py @@ -43,11 +43,12 @@ * :class:`GlyphAnnotation` — a single bbox + letter label. * :class:`ScanAnnotation` — all annotated glyphs for one upstream scan. * :class:`WriterAnnotation` — all annotated scans for one writer. -* :class:`GenerateProfile` — top-level config object. +* :class:`GenerateProfile` — top-level config object (includes pre-computed + ``config_hash`` for embedding in output documents). * :class:`GenerateProfileError` — base error class. * :func:`load_generate_profile` — read, validate, and return a - ``(GenerateProfile, raw_dict)`` pair; the raw dict is suitable for - passing to :func:`hletterscriptgen.hashing.config_hash`. + :class:`GenerateProfile`; the profile's ``config_hash`` field is computed + from the raw JSON at load time. """ from __future__ import annotations @@ -58,7 +59,9 @@ from typing import Any from hletterscriptgen import HEBREW_LETTERS - +from hletterscriptgen.hashing import ( + config_hash as _compute_config_hash, +) # --------------------------------------------------------------------------- # Errors @@ -134,10 +137,15 @@ class GenerateProfile: ``upstream_checkout`` is the path to the local upstream repo checkout (used for both pinning the revision and resolving scan file paths). ``writers`` is a non-empty tuple of :class:`WriterAnnotation` records. + ``config_hash`` is the SHA-256 hex digest of the canonical-JSON serialisation + of the raw profile dict, computed at load time by :func:`load_generate_profile`. + It is embedded in the ``generator.config_hash`` field of output documents so + that the profile version that produced a dataset can be reconstructed. """ upstream_checkout: Path writers: tuple[WriterAnnotation, ...] + config_hash: str # --------------------------------------------------------------------------- @@ -264,13 +272,14 @@ def _parse_writer(raw: Any, index: int, *, path: Path) -> WriterAnnotation: # --------------------------------------------------------------------------- -def load_generate_profile(path: Path) -> tuple[GenerateProfile, dict[str, Any]]: +def load_generate_profile(path: Path) -> GenerateProfile: """Read and validate a generation profile JSON file. - Returns ``(profile, raw_dict)`` where ``raw_dict`` is the original - parsed JSON object — pass it to - :func:`hletterscriptgen.hashing.config_hash` to compute - ``generator.config_hash`` for the output ``letter_set.v1`` document. + Returns a :class:`GenerateProfile` whose ``config_hash`` field contains + the SHA-256 hex digest of the canonical-JSON serialisation of the raw + profile dict. That hash is embedded in the ``generator.config_hash`` + field of output ``letter_set.v1`` documents so that the profile version + that produced a dataset can be reconstructed. Raises :class:`GenerateProfileError` when the file is missing, not valid JSON, or structurally invalid. Raises :class:`OSError` for @@ -324,11 +333,11 @@ def load_generate_profile(path: Path) -> tuple[GenerateProfile, dict[str, Any]]: # Resolve upstream_checkout relative to the profile file's parent dir. checkout_path = (path.parent / raw_checkout).resolve() - profile = GenerateProfile( + return GenerateProfile( upstream_checkout=checkout_path, writers=writers, + config_hash=_compute_config_hash(raw), ) - return profile, raw __all__ = [ diff --git a/src/hletterscriptgen/generator.py b/src/hletterscriptgen/generator.py index e9226af..2aa46bc 100644 --- a/src/hletterscriptgen/generator.py +++ b/src/hletterscriptgen/generator.py @@ -7,7 +7,7 @@ 3. For each writer → each scan → each annotated glyph: a. Look up the upstream entry; skip ineligible entries (warn). b. Resolve the scan file path from the upstream checkout. - c. Crop the glyph from the binarised scan. + c. Binarise the scan once, then crop each glyph from the binary array. d. Write the PNG to the output tree. e. Record a ``variant`` for the letter_set.v1 document. 4. Build and validate the ``letter_set.v1`` document for each writer. @@ -19,7 +19,8 @@ / letter_set.json glyphs/ - _______.png + / + @@___.png All paths in ``letter_set.json`` are POSIX-relative to the writer's output directory. The ``variant_id`` is derived deterministically from @@ -38,19 +39,18 @@ import hashlib import json -from datetime import datetime, timezone +import warnings as _warnings +from datetime import UTC, datetime from pathlib import Path from typing import Any from hletterscriptgen import HEBREW_LETTERS, __version__ -from hletterscriptgen.extractor import ExtractionError, Glyph, crop_glyph +from hletterscriptgen.extractor import ExtractionError, Glyph, binarize_scan, crop_binary from hletterscriptgen.generate_profile import ( GenerateProfile, GlyphAnnotation, - ScanAnnotation, WriterAnnotation, ) -from hletterscriptgen.hashing import config_hash from hletterscriptgen.upstream import ( UpstreamEntry, UpstreamError, @@ -75,13 +75,18 @@ class GeneratorWarning(UserWarning): def _variant_id(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: - """Deterministic variant identifier derived from the bbox and letter.""" - return f"{entry_id}__{letter}__{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}" + """Deterministic variant identifier derived from the bbox and letter. + + Uses ``@`` as a separator between the entry_id, letter, and coordinates + so the boundary is unambiguous even when entry_ids themselves contain + double-underscores. + """ + return f"{entry_id}@{letter}@{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}" def _asset_path(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: """POSIX-relative asset path within the writer's output directory.""" - fname = f"{entry_id}__{letter}__{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}.png" + fname = f"{entry_id}@{letter}@{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}.png" return f"glyphs/{letter}/{fname}" @@ -120,24 +125,23 @@ def _resolve_scan_path( return None -def _process_writer( +def _extract_variants( writer: WriterAnnotation, entry_index: dict[str, UpstreamEntry], upstream_checkout: Path, writer_out_dir: Path, - profile_raw: dict[str, Any], - upstream_pin_repo: str, - upstream_pin_revision: str, generated_at: str, - warnings: list[str], -) -> dict[str, Any]: - """Build and write the letter_set.v1 document for one writer. + pending_warnings: list[str], +) -> tuple[dict[str, list[dict[str, Any]]], set[str], set[str]]: + """Crop all glyph variants for one writer and write PNG assets to disk. - Returns the letter_set.v1 document dict (already written to disk). - Appends non-fatal issues to ``warnings``. - Raises :class:`GeneratorError` on fatal conditions. + Returns ``(letters_map, observed_licenses, used_entry_ids)``. + Non-fatal issues (missing entries, ineligible scans, crop failures) are + appended to ``pending_warnings``. + + The scan image is binarised once per scan file; all glyph crops for that + scan share the same binary array, avoiding redundant I/O. """ - glyph_dir = writer_out_dir / "glyphs" letters_map: dict[str, list[dict[str, Any]]] = {} observed_licenses: set[str] = set() used_entry_ids: set[str] = set() @@ -145,14 +149,14 @@ def _process_writer( for scan in writer.scans: entry = entry_index.get(scan.entry_id) if entry is None: - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " "not found in upstream entries — skipped" ) continue if not is_eligible(entry): - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " "is not eligible — skipped" ) @@ -160,14 +164,14 @@ def _process_writer( scan_path = _resolve_scan_path(entry, upstream_checkout) if scan_path is None: - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " "has no resolvable scan file — skipped" ) continue if not scan_path.is_file(): - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: scan file not found at " f"{scan_path} — skipped" ) @@ -175,17 +179,28 @@ def _process_writer( license_expr = entry.rights.license_expression if license_expr is None: - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " "has no license_expression — skipped" ) continue + # Binarise the scan once; reuse the binary array for all glyphs in + # this scan to avoid re-loading and re-thresholding per glyph. + try: + binary = binarize_scan(scan_path) + except ExtractionError as exc: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"could not binarize scan ({exc}) — skipped" + ) + continue + used_entry_ids.add(scan.entry_id) for glyph_ann in scan.glyphs: if glyph_ann.letter not in HEBREW_LETTERS: - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " f"glyph letter {glyph_ann.letter!r} not a recognised " "Hebrew character — skipped" @@ -199,9 +214,9 @@ def _process_writer( height=glyph_ann.height, ) try: - png_bytes = crop_glyph(scan_path, glyph) + png_bytes = crop_binary(binary, glyph) except ExtractionError as exc: - warnings.append( + pending_warnings.append( f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " f"crop failed ({exc}) — skipped" ) @@ -233,17 +248,27 @@ def _process_writer( }, "extracted_at": generated_at, } + if glyph_ann.notes is not None: + variant["notes"] = glyph_ann.notes letters_map.setdefault(glyph_ann.letter, []).append(variant) observed_licenses.add(license_expr) - if not letters_map: - raise GeneratorError( - f"writer {writer.writer_id!r}: no glyphs were successfully extracted; " - "check warnings for details" - ) + return letters_map, observed_licenses, used_entry_ids + - document: dict[str, Any] = { +def _build_document( + writer: WriterAnnotation, + letters_map: dict[str, list[dict[str, Any]]], + observed_licenses: set[str], + used_entry_ids: set[str], + upstream_pin_repo: str, + upstream_pin_revision: str, + config_hash: str, + generated_at: str, +) -> dict[str, Any]: + """Assemble and return the letter_set.v1 document dict for one writer.""" + return { "schema_version": "letter_set.v1", "writer_id": writer.writer_id, "writer_provenance": { @@ -255,7 +280,7 @@ def _process_writer( "generator": { "name": "hletterscriptgen", "version": __version__, - "config_hash": config_hash(profile_raw), + "config_hash": config_hash, }, "generated_at": generated_at, "upstream": { @@ -268,6 +293,50 @@ def _process_writer( }, } + +def _process_writer( + writer: WriterAnnotation, + entry_index: dict[str, UpstreamEntry], + upstream_checkout: Path, + writer_out_dir: Path, + upstream_pin_repo: str, + upstream_pin_revision: str, + config_hash: str, + generated_at: str, + pending_warnings: list[str], +) -> dict[str, Any]: + """Build and write the letter_set.v1 document for one writer. + + Returns the letter_set.v1 document dict (already written to disk). + Appends non-fatal issues to ``pending_warnings``. + Raises :class:`GeneratorError` on fatal conditions. + """ + letters_map, observed_licenses, used_entry_ids = _extract_variants( + writer=writer, + entry_index=entry_index, + upstream_checkout=upstream_checkout, + writer_out_dir=writer_out_dir, + generated_at=generated_at, + pending_warnings=pending_warnings, + ) + + if not letters_map: + raise GeneratorError( + f"writer {writer.writer_id!r}: no glyphs were successfully extracted; " + "check warnings for details" + ) + + document = _build_document( + writer=writer, + letters_map=letters_map, + observed_licenses=observed_licenses, + used_entry_ids=used_entry_ids, + upstream_pin_repo=upstream_pin_repo, + upstream_pin_revision=upstream_pin_revision, + config_hash=config_hash, + generated_at=generated_at, + ) + result = validate_document(document) if not result.ok: issues = "; ".join(i.format() for i in result.issues) @@ -291,7 +360,6 @@ def _process_writer( def generate( profile: GenerateProfile, - profile_raw: dict[str, Any], output_dir: Path, *, generated_at: str | None = None, @@ -301,17 +369,16 @@ def generate( Parameters ---------- profile: - Parsed generation profile (from :func:`~hletterscriptgen.generate_profile.load_generate_profile`). - profile_raw: - The original raw dict from the profile JSON file — used to compute - ``generator.config_hash`` in the output documents. + Parsed generation profile (from + :func:`~hletterscriptgen.generate_profile.load_generate_profile`). + The profile's ``config_hash`` field is embedded in output documents. output_dir: Root output directory. Created if it does not exist. generated_at: ISO 8601 timestamp string to embed in the output documents. - When ``None`` (default) the current UTC time is used. Pass a - fixed value for reproducible builds (see ``--generated-at`` on - the ``generate`` CLI). + When ``None`` (default) the current UTC time is used (second + precision, no microseconds). Pass a fixed value for reproducible + builds (see ``--generated-at`` on the ``generate`` CLI). Returns ------- @@ -327,20 +394,20 @@ def generate( When the upstream entries file cannot be loaded. """ if generated_at is None: - generated_at = datetime.now(tz=timezone.utc).isoformat() + generated_at = datetime.now(tz=UTC).replace(microsecond=0).isoformat() output_dir.mkdir(parents=True, exist_ok=True) # 1. Pin the upstream checkout. try: pin = upstream_pin_from_checkout(profile.upstream_checkout) - except Exception as exc: + except (UpstreamError, OSError) as exc: raise GeneratorError( f"could not pin upstream checkout at " f"{profile.upstream_checkout}: {exc}" ) from exc - # 2. Load and index all eligible upstream entries. + # 2. Load and index all upstream entries. entries_path = profile.upstream_checkout / "data" / "index" / "entries.jsonl" try: all_entries = list(load_entries(entries_path)) @@ -350,7 +417,7 @@ def generate( entry_index: dict[str, UpstreamEntry] = {e.entry_id: e for e in all_entries} # 3. Process each writer. - warnings: list[str] = [] + pending_warnings: list[str] = [] output_paths: list[Path] = [] for writer in profile.writers: @@ -362,18 +429,16 @@ def generate( entry_index=entry_index, upstream_checkout=profile.upstream_checkout, writer_out_dir=writer_out_dir, - profile_raw=profile_raw, upstream_pin_repo=pin.repo, upstream_pin_revision=pin.revision, + config_hash=profile.config_hash, generated_at=generated_at, - warnings=warnings, + pending_warnings=pending_warnings, ) output_paths.append((writer_out_dir / "letter_set.json").resolve()) - if warnings: - import warnings as _warnings - - for msg in warnings: + if pending_warnings: + for msg in pending_warnings: _warnings.warn(msg, GeneratorWarning, stacklevel=2) return output_paths diff --git a/tests/test_cli.py b/tests/test_cli.py index e928672..cefd580 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,7 +12,6 @@ from hletterscriptgen import __version__ from hletterscriptgen.cli import ( EXIT_INPUT_ERROR, - EXIT_NOT_IMPLEMENTED, EXIT_OK, EXIT_VALIDATION_FAILED, main, diff --git a/tests/test_extractor.py b/tests/test_extractor.py index 36d4804..b62f384 100644 --- a/tests/test_extractor.py +++ b/tests/test_extractor.py @@ -87,7 +87,7 @@ def test_extract_glyphs_respects_custom_min_dimension(tmp_path: Path) -> None: glyphs_strict = extract_glyphs(scan, min_dimension=20) glyphs_loose = extract_glyphs(scan, min_dimension=8) - assert len(glyphs_strict) == 1 # only the 30×30 blob passes + assert len(glyphs_strict) == 1 # only the 30x30 blob passes assert len(glyphs_loose) == 2 @@ -238,6 +238,6 @@ def test_crop_glyph_raises_on_out_of_bounds(tmp_path: Path) -> None: scan = tmp_path / "scan.png" _save_png(img, scan) - out_of_bounds = Glyph(x=40, y=40, width=20, height=20) # extends past 50×50 + out_of_bounds = Glyph(x=40, y=40, width=20, height=20) # extends past 50x50 with pytest.raises(ExtractionError, match="outside"): crop_glyph(scan, out_of_bounds) diff --git a/tests/test_generate_profile.py b/tests/test_generate_profile.py index 4bc8482..01789bf 100644 --- a/tests/test_generate_profile.py +++ b/tests/test_generate_profile.py @@ -10,11 +10,9 @@ from hletterscriptgen.generate_profile import ( GenerateProfile, GenerateProfileError, - GlyphAnnotation, - ScanAnnotation, - WriterAnnotation, load_generate_profile, ) +from hletterscriptgen.hashing import config_hash FIXTURE_DIR = Path(__file__).resolve().parent / "fixtures" / "generate_profile" @@ -49,7 +47,7 @@ def _write_profile(tmp_path: Path, payload: object) -> Path: def test_load_valid_fixture() -> None: - profile, raw = load_generate_profile(FIXTURE_DIR / "valid_profile.json") + profile = load_generate_profile(FIXTURE_DIR / "valid_profile.json") assert isinstance(profile, GenerateProfile) assert len(profile.writers) == 1 writer = profile.writers[0] @@ -62,13 +60,16 @@ def test_load_valid_fixture() -> None: assert len(scan.glyphs) == 2 assert scan.glyphs[0].letter == "א" assert scan.glyphs[1].letter == "ב" - assert isinstance(raw, dict) + # config_hash must be a 64-char lowercase hex string + assert isinstance(profile.config_hash, str) + assert len(profile.config_hash) == 64 -def test_load_returns_raw_dict(tmp_path: Path) -> None: +def test_profile_has_correct_config_hash(tmp_path: Path) -> None: + """config_hash must equal SHA-256 of the canonical JSON of the raw profile dict.""" p = _write_profile(tmp_path, _MINIMAL_PROFILE) - _, raw = load_generate_profile(p) - assert raw == _MINIMAL_PROFILE + profile = load_generate_profile(p) + assert profile.config_hash == config_hash(_MINIMAL_PROFILE) def test_upstream_checkout_resolved_relative_to_profile(tmp_path: Path) -> None: @@ -76,7 +77,7 @@ def test_upstream_checkout_resolved_relative_to_profile(tmp_path: Path) -> None: sub.mkdir() p = sub / "profile.json" p.write_text(json.dumps({**_MINIMAL_PROFILE, "upstream_checkout": "../data"}), encoding="utf-8") - profile, _ = load_generate_profile(p) + profile = load_generate_profile(p) assert profile.upstream_checkout == (tmp_path / "data").resolve() @@ -85,11 +86,11 @@ def test_glyph_notes_optional(tmp_path: Path) -> None: profile_data = { "upstream_checkout": ".", "writers": [ - {**_MINIMAL_WRITER, "scans": [{"entry_id": "e__s__p0001", "glyphs": [glyph_with_notes]}]}, + {**_MINIMAL_WRITER, "scans": [{"entry_id": "e__s__p0001", "glyphs": [glyph_with_notes]}]}, # noqa: E501 ], } p = _write_profile(tmp_path, profile_data) - profile, _ = load_generate_profile(p) + profile = load_generate_profile(p) assert profile.writers[0].scans[0].glyphs[0].notes == "a test note" @@ -146,7 +147,7 @@ def test_raises_on_empty_writers(tmp_path: Path) -> None: def test_raises_on_duplicate_writer_ids(tmp_path: Path) -> None: - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, _MINIMAL_WRITER]}) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, _MINIMAL_WRITER]}) # noqa: E501 with pytest.raises(GenerateProfileError, match="duplicate writer_id"): load_generate_profile(p) @@ -180,7 +181,7 @@ def test_raises_on_empty_scans(tmp_path: Path) -> None: def test_raises_on_non_hebrew_letter(tmp_path: Path) -> None: bad_glyph = {**_MINIMAL_GLYPH, "letter": "A"} scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 with pytest.raises(GenerateProfileError, match="Hebrew character"): load_generate_profile(p) @@ -188,7 +189,7 @@ def test_raises_on_non_hebrew_letter(tmp_path: Path) -> None: def test_raises_on_multi_char_letter(tmp_path: Path) -> None: bad_glyph = {**_MINIMAL_GLYPH, "letter": "אב"} scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 with pytest.raises(GenerateProfileError, match="Hebrew character"): load_generate_profile(p) @@ -196,7 +197,7 @@ def test_raises_on_multi_char_letter(tmp_path: Path) -> None: def test_raises_on_negative_x(tmp_path: Path) -> None: bad_glyph = {**_MINIMAL_GLYPH, "x": -1} scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 with pytest.raises(GenerateProfileError, match="≥ 0"): load_generate_profile(p) @@ -204,15 +205,15 @@ def test_raises_on_negative_x(tmp_path: Path) -> None: def test_raises_on_zero_width(tmp_path: Path) -> None: bad_glyph = {**_MINIMAL_GLYPH, "width": 0} scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 with pytest.raises(GenerateProfileError, match="≥ 1"): load_generate_profile(p) -@pytest.mark.parametrize("letter", ["א", "ב", "ג", "ת", "ך", "ם", "ן", "ף", "ץ"]) +@pytest.mark.parametrize("letter", ["א", "ב", "ג", "ת", "ך", "ם", "ן", "ף", "ץ"]) # noqa: RUF001 def test_accepts_all_hebrew_letter_forms(tmp_path: Path, letter: str) -> None: glyph = {**_MINIMAL_GLYPH, "letter": letter} scan = {"entry_id": "e__s__p0001", "glyphs": [glyph]} - p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) - profile, _ = load_generate_profile(p) + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 + profile = load_generate_profile(p) assert profile.writers[0].scans[0].glyphs[0].letter == letter diff --git a/tests/test_generator.py b/tests/test_generator.py index c570704..d3c0e70 100644 --- a/tests/test_generator.py +++ b/tests/test_generator.py @@ -5,6 +5,7 @@ import json import subprocess from pathlib import Path +from unittest.mock import MagicMock, patch import pytest @@ -66,12 +67,12 @@ def _make_upstream_checkout(tmp_path: Path) -> Path: json.dumps(_ELIGIBLE_ENTRY) + "\n", encoding="utf-8" ) - # Synthetic scan PNG: white 100×100 with two black 20×20 blobs + # Synthetic scan PNG: white 100x100 with two black 20x20 blobs scan_dir = repo / "data" / "scans" / "test__writer_a" scan_dir.mkdir(parents=True) img = np.full((100, 100, 3), 255, dtype=np.uint8) - img[5:25, 5:25] = 0 # blob at (5,5) 20×20 - img[5:25, 35:55] = 0 # blob at (35,5) 20×20 + img[5:25, 5:25] = 0 # blob at (5,5) 20x20 + img[5:25, 35:55] = 0 # blob at (35,5) 20x20 cv2.imwrite(str(scan_dir / "p0001.png"), img) _git(repo, "add", ".") @@ -114,8 +115,8 @@ def test_generate_produces_letter_set_json(tmp_path: Path) -> None: profile_path = _make_profile(tmp_path, upstream) output_dir = tmp_path / "out" - profile, raw = load_generate_profile(profile_path) - paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + profile = load_generate_profile(profile_path) + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") assert len(paths) == 1 letter_set_path = paths[0] @@ -130,8 +131,8 @@ def test_generate_letter_set_validates(tmp_path: Path) -> None: profile_path = _make_profile(tmp_path, upstream) output_dir = tmp_path / "out" - profile, raw = load_generate_profile(profile_path) - paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + profile = load_generate_profile(profile_path) + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") doc = json.loads(paths[0].read_text(encoding="utf-8")) result = validate_document(doc) @@ -143,8 +144,8 @@ def test_generate_letter_set_content(tmp_path: Path) -> None: profile_path = _make_profile(tmp_path, upstream) output_dir = tmp_path / "out" - profile, raw = load_generate_profile(profile_path) - paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + profile = load_generate_profile(profile_path) + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") doc = json.loads(paths[0].read_text(encoding="utf-8")) assert doc["schema_version"] == "letter_set.v1" @@ -163,8 +164,8 @@ def test_generate_writes_glyph_pngs(tmp_path: Path) -> None: profile_path = _make_profile(tmp_path, upstream) output_dir = tmp_path / "out" - profile, raw = load_generate_profile(profile_path) - paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + profile = load_generate_profile(profile_path) + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") doc = json.loads(paths[0].read_text(encoding="utf-8")) writer_dir = paths[0].parent @@ -183,8 +184,8 @@ def test_generate_checksum_matches_png(tmp_path: Path) -> None: profile_path = _make_profile(tmp_path, upstream) output_dir = tmp_path / "out" - profile, raw = load_generate_profile(profile_path) - paths = generate(profile, raw, output_dir, generated_at="2025-01-01T00:00:00+00:00") + profile = load_generate_profile(profile_path) + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") doc = json.loads(paths[0].read_text(encoding="utf-8")) writer_dir = paths[0].parent @@ -203,10 +204,10 @@ def test_generate_is_deterministic(tmp_path: Path) -> None: out1 = tmp_path / "out1" out2 = tmp_path / "out2" - profile, raw = load_generate_profile(profile_path) + profile = load_generate_profile(profile_path) ts = "2025-06-01T12:00:00+00:00" - paths1 = generate(profile, raw, out1, generated_at=ts) - paths2 = generate(profile, raw, out2, generated_at=ts) + paths1 = generate(profile, out1, generated_at=ts) + paths2 = generate(profile, out2, generated_at=ts) doc1 = json.loads(paths1[0].read_text(encoding="utf-8")) doc2 = json.loads(paths2[0].read_text(encoding="utf-8")) @@ -242,9 +243,9 @@ def test_generate_raises_when_no_glyphs_extracted(tmp_path: Path) -> None: p = tmp_path / "profile.json" p.write_text(json.dumps(profile_data), encoding="utf-8") - profile, raw = load_generate_profile(p) + profile = load_generate_profile(p) with pytest.raises(GeneratorError, match="no glyphs"): - generate(profile, raw, tmp_path / "out", generated_at="2025-01-01T00:00:00+00:00") + generate(profile, tmp_path / "out", generated_at="2025-01-01T00:00:00+00:00") # --------------------------------------------------------------------------- @@ -294,3 +295,154 @@ def test_cli_scan_blobs_text_format(tmp_path: Path, capsys: pytest.CaptureFixtur out = capsys.readouterr().out assert "blob" in out assert "detected" in out + + +# --------------------------------------------------------------------------- +# Mock-based tests — document assembly without cv2 I/O +# --------------------------------------------------------------------------- + +# A minimal valid PNG (1x1 white pixel) used as a fake crop result. +_FAKE_PNG = ( + b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01" + b"\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDATx\x9cc\xf8\x0f\x00" + b"\x00\x01\x01\x00\x05\x18\xd8N\x00\x00\x00\x00IEND\xaeB`\x82" +) + + +def _make_upstream_checkout_no_cv2(tmp_path: Path) -> Path: + """Create a minimal upstream git repo with a stub scan file (no cv2 needed).""" + repo = tmp_path / "upstream_nodeps" + repo.mkdir() + + _git(repo, "init", "-q", "-b", "main") + _git(repo, "config", "user.email", "test@example.com") + _git(repo, "config", "user.name", "Test") + _git(repo, "config", "commit.gpgsign", "false") + _git(repo, "remote", "add", "origin", "https://github.com/HeOCR/public-domain-hand-written-hebrew-scans.git") + + index_dir = repo / "data" / "index" + index_dir.mkdir(parents=True) + (index_dir / "entries.jsonl").write_text( + json.dumps(_ELIGIBLE_ENTRY) + "\n", encoding="utf-8" + ) + + # Write any bytes — binarize_scan is mocked and won't read this. + scan_dir = repo / "data" / "scans" / "test__writer_a" + scan_dir.mkdir(parents=True) + (scan_dir / "p0001.png").write_bytes(b"STUB") + + _git(repo, "add", ".") + _git(repo, "commit", "-q", "-m", "initial") + return repo + + +def test_generate_document_structure_mocked(tmp_path: Path) -> None: + """Document assembly, validation, and file writing with mocked cv2 calls.""" + upstream = _make_upstream_checkout_no_cv2(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile = load_generate_profile(profile_path) + fake_binary = MagicMock() + + with patch("hletterscriptgen.generator.binarize_scan", return_value=fake_binary): + with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG): + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + assert len(paths) == 1 + doc = json.loads(paths[0].read_text(encoding="utf-8")) + assert doc["schema_version"] == "letter_set.v1" + assert doc["writer_id"] == "writer_test_a" + assert "א" in doc["letters"] + assert "ב" in doc["letters"] + + +def test_generate_mocked_validates(tmp_path: Path) -> None: + """Generated document must pass schema validation even with mocked crops.""" + from hletterscriptgen.validation import validate_document + + upstream = _make_upstream_checkout_no_cv2(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile = load_generate_profile(profile_path) + with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()): + with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG): + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + result = validate_document(doc) + assert result.ok, [i.format() for i in result.issues] + + +def test_generate_mocked_writes_png_assets(tmp_path: Path) -> None: + """PNG assets must exist on disk with correct PNG magic bytes.""" + upstream = _make_upstream_checkout_no_cv2(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile = load_generate_profile(profile_path) + with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()): + with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG): + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + writer_dir = paths[0].parent + for variants in doc["letters"].values(): + for variant in variants: + asset = writer_dir / variant["asset_path"] + assert asset.exists() + assert asset.read_bytes()[:4] == b"\x89PNG" + + +def test_generate_mocked_glyph_notes_in_variant(tmp_path: Path) -> None: + """A glyph annotation with notes must propagate the notes to its variant.""" + upstream = _make_upstream_checkout_no_cv2(tmp_path) + profile_data = { + "upstream_checkout": str(upstream), + "writers": [ + { + "writer_id": "writer_test_a", + "attribution_method": "manual_review", + "scans": [ + { + "entry_id": "test__writer_a__p0001", + "glyphs": [ + { + "letter": "א", + "x": 5, "y": 5, "width": 20, "height": 20, + "notes": "this is a test note", + }, + ], + } + ], + } + ], + } + p = tmp_path / "profile.json" + p.write_text(json.dumps(profile_data), encoding="utf-8") + + output_dir = tmp_path / "out" + profile = load_generate_profile(p) + with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()): + with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG): + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + variant = doc["letters"]["א"][0] + assert variant.get("notes") == "this is a test note" + + +def test_generate_mocked_config_hash_in_document(tmp_path: Path) -> None: + """generator.config_hash in the output document must match profile.config_hash.""" + upstream = _make_upstream_checkout_no_cv2(tmp_path) + profile_path = _make_profile(tmp_path, upstream) + output_dir = tmp_path / "out" + + profile = load_generate_profile(profile_path) + with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()): + with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG): + paths = generate(profile, output_dir, generated_at="2025-01-01T00:00:00+00:00") + + doc = json.loads(paths[0].read_text(encoding="utf-8")) + assert doc["generator"]["config_hash"] == profile.config_hash From b63cfe51817f12b8bb70d69bdc848a7526d48cd7 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 25 May 2026 00:17:38 +0300 Subject: [PATCH 3/3] docs(agents): add scan-blobs to stable CLI surface list Co-Authored-By: Claude Sonnet 4.6 --- AGENTS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index d0c8ba5..bb6207d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -34,7 +34,7 @@ hletterscriptgen validate examples/letter_set/writer_example.json --format json ## Stable public surfaces -- CLI: `hletterscriptgen {version, schema, validate, generate, check-eligible}`. +- CLI: `hletterscriptgen {version, schema, validate, generate, check-eligible, scan-blobs}`. - Output contract: `letter_set.v1` (see `src/hletterscriptgen/schemas/letter_set.schema.json` and `docs/letter_set_v1.md`).