diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e050af9..fa20cd8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,7 +86,7 @@ jobs: - name: Install package run: | python -m pip install --upgrade pip setuptools wheel - python -m pip install -e ".[test]" + python -m pip install -e ".[test,cv]" - name: Run tests run: python -m pytest @@ -100,6 +100,16 @@ jobs: - name: CLI smoke — validate example run: hletterscriptgen validate examples/letter_set/writer_example.json --format json + - name: CLI smoke — scan-blobs + run: | + python -c " + import cv2, numpy as np + img = np.full((100, 100, 3), 255, dtype=np.uint8) + img[10:30, 10:30] = 0 + cv2.imwrite('/tmp/smoke_scan.png', img) + " + hletterscriptgen scan-blobs /tmp/smoke_scan.png --format json + package: name: Build distributions runs-on: ubuntu-latest diff --git a/AGENTS.md b/AGENTS.md index d0c8ba5..bb6207d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -34,7 +34,7 @@ hletterscriptgen validate examples/letter_set/writer_example.json --format json ## Stable public surfaces -- CLI: `hletterscriptgen {version, schema, validate, generate, check-eligible}`. +- CLI: `hletterscriptgen {version, schema, validate, generate, check-eligible, scan-blobs}`. - Output contract: `letter_set.v1` (see `src/hletterscriptgen/schemas/letter_set.schema.json` and `docs/letter_set_v1.md`). 
diff --git a/pyproject.toml b/pyproject.toml index 9a3d934..facc220 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,3 +89,7 @@ enable_error_code = ["redundant-expr", "truthy-bool", "ignore-without-code"] [[tool.mypy.overrides]] module = "jsonschema.*" ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "cv2" +ignore_missing_imports = true diff --git a/src/hletterscriptgen/cli.py b/src/hletterscriptgen/cli.py index 9116639..6214da1 100644 --- a/src/hletterscriptgen/cli.py +++ b/src/hletterscriptgen/cli.py @@ -53,9 +53,34 @@ def _build_parser() -> argparse.ArgumentParser: help="Output format (default: text).", ) - sub.add_parser( + generate_p = sub.add_parser( "generate", - help="(Not yet implemented) Generate letter sets from upstream scans.", + help="Generate letter sets from upstream scans using a generation profile.", + ) + generate_p.add_argument( + "--profile", + type=Path, + required=True, + metavar="PROFILE", + help="Path to a generation profile JSON file.", + ) + generate_p.add_argument( + "--output", + type=Path, + required=True, + metavar="DIR", + help="Output directory (created if absent).", + ) + generate_p.add_argument( + "--generated-at", + type=str, + default=None, + metavar="ISO8601", + help=( + "Override the generated_at timestamp in output documents " + "(ISO 8601 format, e.g. '2025-01-01T00:00:00+00:00'). " + "Useful for deterministic / reproducible builds." + ), ) eligible_p = sub.add_parser( @@ -74,6 +99,39 @@ def _build_parser() -> argparse.ArgumentParser: help="Output format (default: text).", ) + scan_blobs_p = sub.add_parser( + "scan-blobs", + help=( + "Detect glyph blobs in a scan image via CCA. " + "Use the output to populate a generation profile." 
def _cmd_generate(args: argparse.Namespace) -> int:
    """Run the ``generate`` sub-command.

    Loads the generation profile named by ``args.profile``, runs the
    generator into ``args.output`` (passing ``args.generated_at`` through
    unchanged), and prints one ``OK <path>`` line per path returned by the
    generator.

    Returns
    -------
    int
        ``EXIT_OK`` on success; ``EXIT_INPUT_ERROR`` when the profile cannot
        be read or is invalid, or when the generator raises
        ``GeneratorError``.
    """
    # Imported lazily so the rest of the CLI stays usable when the optional
    # generation stack (and its dependencies) cannot be imported.
    from hletterscriptgen.generate_profile import GenerateProfileError, load_generate_profile
    from hletterscriptgen.generator import GeneratorError, generate

    try:
        profile = load_generate_profile(args.profile)
    except (GenerateProfileError, OSError) as exc:
        # load_generate_profile documents that non-"file not found" I/O
        # failures (permission denied, path is a directory, ...) surface as
        # OSError; report them like any other bad-input condition instead of
        # letting a traceback escape from the CLI.
        print(str(exc), file=sys.stderr)
        return EXIT_INPUT_ERROR

    try:
        output_paths = generate(
            profile,
            args.output,
            generated_at=args.generated_at,
        )
    except GeneratorError as exc:
        print(str(exc), file=sys.stderr)
        return EXIT_INPUT_ERROR

    for p in output_paths:
        print(f"OK {p}")
    return EXIT_OK
"y": g.y, "width": g.width, "height": g.height} + for g in glyphs + ], + } + json.dump(payload, sys.stdout, indent=2) + sys.stdout.write("\n") + else: + for i, g in enumerate(glyphs): + print(f"blob {i:4d}: x={g.x:5d} y={g.y:5d} w={g.width:5d} h={g.height:5d}") + print(f"{len(glyphs)} blob(s) detected in {args.image}") + + return EXIT_OK def main(argv: list[str] | None = None) -> int: @@ -180,8 +286,10 @@ def main(argv: list[str] | None = None) -> int: if args.command == "validate": return _cmd_validate(args) if args.command == "generate": - return _cmd_generate() + return _cmd_generate(args) if args.command == "check-eligible": return _cmd_check_eligible(args) + if args.command == "scan-blobs": + return _cmd_scan_blobs(args) parser.error(f"unknown command: {args.command}") diff --git a/src/hletterscriptgen/extractor.py b/src/hletterscriptgen/extractor.py new file mode 100644 index 0000000..be0c668 --- /dev/null +++ b/src/hletterscriptgen/extractor.py @@ -0,0 +1,297 @@ +"""CCA-based glyph extraction from handwritten Hebrew page scans. + +Implements the Option A segmentation approach chosen in M3 sub-PR 1: +connected-component analysis (CCA) via ``opencv-python-headless`` with +Otsu binarisation. See ``docs/design/segmentation-approach.md`` for +the decision record, algorithm spec, and known failure modes. + +``opencv-python-headless`` is an *optional* dependency (install with +``pip install hletterscriptgen[cv]``). All public functions raise +:class:`ExtractionError` when the library is not available, rather than +an import-time ``ImportError``, so the rest of the package stays +importable on environments without the CV stack. + +The module exposes: + +* :data:`MIN_GLYPH_PX` — minimum bounding-box dimension (px). +* :data:`DEFAULT_MAX_AREA_FRACTION` — default upper-area ceiling as a fraction of page area. +* :class:`Glyph` — frozen dataclass for a detected blob's bounding box. +* :func:`binarize_scan` — load a scan and return its Otsu-binarised array. 
def _require_cv2() -> Any:
    """Import and return ``cv2`` on demand.

    The import is deferred to call time so that importing this module never
    fails on environments without the optional CV stack.

    Raises
    ------
    ExtractionError
        When ``cv2`` cannot be imported; the original ``ImportError`` is
        chained as the cause.
    """
    try:
        import cv2
    except ImportError as import_failure:
        raise ExtractionError(
            "opencv-python-headless is required for glyph extraction; "
            "install it with: pip install hletterscriptgen[cv]"
        ) from import_failure
    return cv2
+ + Applies ``THRESH_BINARY_INV | THRESH_OTSU`` so that ink pixels become + foreground (255) and background pixels become 0. The result is a + 2-D uint8 NumPy array with the same spatial dimensions as the source image. + + Calling this once and passing the result to :func:`crop_binary` for each + glyph is more efficient than calling :func:`crop_glyph` per glyph, because + it avoids redundant image I/O and re-binarisation. + + Parameters + ---------- + image_path: + Path to the scan image (JPEG, PNG, TIFF, or any format OpenCV can decode). + + Raises + ------ + ExtractionError + When ``opencv-python-headless`` is not installed or the image cannot + be read. + """ + cv2 = _require_cv2() + img = cv2.imread(str(image_path)) + if img is None: + raise ExtractionError(f"could not load image: {image_path}") + grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + _, binary = cv2.threshold(grey, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + return binary + + +def crop_binary(binary: Any, glyph: Glyph) -> bytes: + """Crop a glyph region from a binarised array and return PNG bytes. + + Unlike :func:`crop_glyph`, this function operates on an already-binarised + array (the result of :func:`binarize_scan`), avoiding a redundant image + load and Otsu threshold per glyph. Prefer this when cropping multiple + glyphs from the same scan. + + Parameters + ---------- + binary: + A 2-D uint8 NumPy array produced by :func:`binarize_scan` (or + equivalent Otsu binarisation). Shape is ``(height, width)``. + glyph: + Bounding box to crop, in the image's pixel space. + + Returns + ------- + bytes + PNG-encoded bytes of the cropped region. + + Raises + ------ + ExtractionError + When OpenCV is not installed, the glyph bbox falls outside the array + dimensions, or PNG encoding fails. 
def extract_glyphs(
    image_path: Path,
    *,
    min_dimension: int = MIN_GLYPH_PX,
    max_area: int | None = None,
) -> list[Glyph]:
    """Detect letter glyphs in a scan via connected-component analysis.

    Algorithm (per ``docs/design/segmentation-approach.md``):

    1. Load, greyscale and Otsu-binarise the scan with
       :func:`binarize_scan` (``THRESH_BINARY_INV`` — ink pixels become
       foreground / 255, background becomes 0).
    2. Run ``connectedComponentsWithStats`` to label foreground blobs.
       8-connectivity is used so that diagonal contacts (common in cursive
       Hebrew script) are treated as part of the same component;
       4-connectivity would incorrectly split glyphs at diagonal junctions.
    3. Drop blobs where ``width < min_dimension`` or
       ``height < min_dimension`` (quality floor, issue #16 D2).
    4. Drop blobs whose pixel area exceeds ``max_area`` (noise ceiling).
    5. Sort survivors by the key ``(y, -x)`` and return them as
       :class:`Glyph` records.

    .. note::
        The sort compares exact top-edge ``y`` values: descending ``x``
        only breaks ties between blobs whose bounding boxes start on the
        same pixel row. No clustering of blobs into text lines is done.

    .. note::
        Deskew pre-processing is intentionally omitted from M3 — the
        failure-mode analysis rates skew as low-severity for the current
        corpus. Add a Hough-line deskew step before binarisation if
        empirical scan quality demands it.

    .. note::
        Nikud (diacritical marks) are emitted as separate blobs when they
        exceed ``min_dimension``. Merging them with their parent letter
        body is out of scope for M3; deferred to M4.

    Parameters
    ----------
    image_path:
        Path to the scan image (JPEG, PNG, TIFF, or any format OpenCV can
        decode).
    min_dimension:
        Minimum bounding-box width *and* height in pixels. A blob must
        satisfy both constraints to survive. Defaults to
        :data:`MIN_GLYPH_PX`.
    max_area:
        Maximum component area in pixels, compared against OpenCV's
        ``CC_STAT_AREA`` (the number of foreground pixels in the component,
        *not* ``width * height`` of its bounding box). Blobs with an area
        strictly greater than this are dropped as likely noise. When
        ``None`` (default), the ceiling is
        :data:`DEFAULT_MAX_AREA_FRACTION` of the image's total pixel count,
        truncated to an integer.

    Raises
    ------
    ExtractionError
        When ``opencv-python-headless`` is not installed, the image cannot
        be read, ``min_dimension`` < 1, or an explicit ``max_area`` < 1.
    """
    if min_dimension < 1:
        raise ExtractionError(f"min_dimension must be ≥ 1, got {min_dimension}")
    # A ceiling below one pixel can never admit a blob; treat it as a caller
    # error (mirroring the min_dimension check) rather than silently
    # returning an empty result.
    if max_area is not None and max_area < 1:
        raise ExtractionError(f"max_area must be ≥ 1, got {max_area}")

    cv2 = _require_cv2()

    # Reuse the shared load → greyscale → Otsu pipeline so detection and
    # cropping always operate on identically binarised pixels.
    binary = binarize_scan(image_path)

    img_h, img_w = binary.shape[:2]
    effective_max_area = (
        max_area if max_area is not None else int(img_h * img_w * DEFAULT_MAX_AREA_FRACTION)
    )

    # 8-connectivity: diagonal contacts (common in Hebrew cursive) belong to
    # the same component; 4-connectivity would incorrectly split them.
    num_labels, _, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8)

    glyphs: list[Glyph] = []
    # Label 0 is the background component — start at 1.
    for label in range(1, num_labels):
        w = int(stats[label, cv2.CC_STAT_WIDTH])
        h = int(stats[label, cv2.CC_STAT_HEIGHT])
        if w < min_dimension or h < min_dimension:
            continue
        if int(stats[label, cv2.CC_STAT_AREA]) > effective_max_area:
            continue
        glyphs.append(
            Glyph(
                x=int(stats[label, cv2.CC_STAT_LEFT]),
                y=int(stats[label, cv2.CC_STAT_TOP]),
                width=w,
                height=h,
            )
        )

    # Ascending top edge first; descending x breaks ties on equal y.
    glyphs.sort(key=lambda g: (g.y, -g.x))
    return glyphs
+ """ + return crop_binary(binarize_scan(image_path), glyph) + + +__all__ = [ + "DEFAULT_MAX_AREA_FRACTION", + "MIN_GLYPH_PX", + "ExtractionError", + "Glyph", + "binarize_scan", + "crop_binary", + "crop_glyph", + "extract_glyphs", +] diff --git a/src/hletterscriptgen/generate_profile.py b/src/hletterscriptgen/generate_profile.py new file mode 100644 index 0000000..7b5ad60 --- /dev/null +++ b/src/hletterscriptgen/generate_profile.py @@ -0,0 +1,350 @@ +"""Generation profile: human-curated letter annotations for the generate pipeline. + +A generation profile is a JSON config file that tells the generator +*exactly* which glyph bounding boxes to crop from which upstream scans, +and which Hebrew letter each glyph represents. Because all bounding +boxes are explicitly declared by a human, the generator's output is +deterministic: same profile + same upstream revision → bit-identical +letter-set tree. + +Typical workflow: + +1. Run ``hletterscriptgen scan-blobs `` to discover CCA-detected + blobs in a scan. +2. Review the blob list, assign Hebrew letter labels, and record the + chosen bounding boxes in a ``generate_profile.json`` file. +3. Run ``hletterscriptgen generate --profile generate_profile.json + --output ./out`` to produce letter_set.v1 documents. + +Profile JSON shape:: + + { + "upstream_checkout": "../public-domain-hand-written-hebrew-scans", + "writers": [ + { + "writer_id": "writer_bialik", + "attribution_method": "collection_metadata", + "notes": "...", + "scans": [ + { + "entry_id": "commons__bialik_letter_safed_1927__p0001", + "glyphs": [ + {"letter": "א", "x": 10, "y": 20, "width": 30, "height": 40}, + {"letter": "ב", "x": 55, "y": 22, "width": 28, "height": 38} + ] + } + ] + } + ] + } + +The module exposes: + +* :class:`GlyphAnnotation` — a single bbox + letter label. +* :class:`ScanAnnotation` — all annotated glyphs for one upstream scan. +* :class:`WriterAnnotation` — all annotated scans for one writer. 
class GenerateProfileError(ValueError):
    """Signal that a generation profile file could not be accepted.

    The exception text is the supplied message prefixed with the profile
    path; the path itself is also kept on the ``path`` attribute.
    """

    def __init__(self, message: str, *, path: Path) -> None:
        # Keep the offending file on the instance for programmatic access.
        self.path = path
        prefixed = f"{path}: {message}"
        super().__init__(prefixed)
+ + ``entry_id`` identifies the upstream ``entries.jsonl`` record. + ``glyphs`` must be non-empty. + """ + + entry_id: str + glyphs: tuple[GlyphAnnotation, ...] + + +@dataclass(frozen=True) +class WriterAnnotation: + """All annotated scans for one writer. + + ``writer_id`` is a stable, repo-unique identifier. + ``attribution_method`` is the same vocabulary as + :class:`hletterscriptgen.attribution.AttributionMethod`. + ``scans`` must be non-empty. + """ + + writer_id: str + attribution_method: str + scans: tuple[ScanAnnotation, ...] + notes: str | None = None + + +@dataclass(frozen=True) +class GenerateProfile: + """Top-level generation profile config. + + ``upstream_checkout`` is the path to the local upstream repo checkout + (used for both pinning the revision and resolving scan file paths). + ``writers`` is a non-empty tuple of :class:`WriterAnnotation` records. + ``config_hash`` is the SHA-256 hex digest of the canonical-JSON serialisation + of the raw profile dict, computed at load time by :func:`load_generate_profile`. + It is embedded in the ``generator.config_hash`` field of output documents so + that the profile version that produced a dataset can be reconstructed. + """ + + upstream_checkout: Path + writers: tuple[WriterAnnotation, ...] 
+ config_hash: str + + +# --------------------------------------------------------------------------- +# Internal parsing helpers +# --------------------------------------------------------------------------- + + +def _require_str(raw: dict[str, Any], key: str, context: str, *, path: Path) -> str: + val = raw.get(key) + if not isinstance(val, str): + raise GenerateProfileError( + f"{context}: '{key}' must be a non-null string, " + f"got {type(val).__name__ if val is not None else 'null'}", + path=path, + ) + return val + + +def _require_int_ge( + raw: dict[str, Any], key: str, minimum: int, context: str, *, path: Path +) -> int: + val = raw.get(key) + if not isinstance(val, int) or isinstance(val, bool): + raise GenerateProfileError( + f"{context}: '{key}' must be an integer, " + f"got {type(val).__name__ if val is not None else 'null'}", + path=path, + ) + if val < minimum: + raise GenerateProfileError( + f"{context}: '{key}' must be ≥ {minimum}, got {val}", + path=path, + ) + return val + + +def _parse_glyph(raw: Any, index: int, scan_ctx: str, *, path: Path) -> GlyphAnnotation: + ctx = f"{scan_ctx}/glyphs[{index}]" + if not isinstance(raw, dict): + raise GenerateProfileError(f"{ctx}: expected a JSON object", path=path) + + letter = _require_str(raw, "letter", ctx, path=path) + if len(letter) != 1 or letter not in HEBREW_LETTERS: + raise GenerateProfileError( + f"{ctx}: 'letter' must be a single Hebrew character " + f"(U+05D0..U+05EA), got {letter!r}", + path=path, + ) + + x = _require_int_ge(raw, "x", 0, ctx, path=path) + y = _require_int_ge(raw, "y", 0, ctx, path=path) + width = _require_int_ge(raw, "width", 1, ctx, path=path) + height = _require_int_ge(raw, "height", 1, ctx, path=path) + + notes = raw.get("notes") + if notes is not None and not isinstance(notes, str): + raise GenerateProfileError(f"{ctx}: 'notes' must be a string if present", path=path) + + return GlyphAnnotation(letter=letter, x=x, y=y, width=width, height=height, notes=notes) + + +def 
def _parse_writer(raw: Any, index: int, *, path: Path) -> WriterAnnotation:
    """Validate one element of the profile's ``writers`` list.

    Checks, in order: that ``raw`` is a JSON object; that ``writer_id`` and
    ``attribution_method`` are non-blank strings; that ``scans`` is a
    non-empty list whose items all parse via :func:`_parse_scan`; and that
    ``notes``, if present, is a string.

    Raises
    ------
    GenerateProfileError
        On the first violated constraint, with a ``writers[<index>]``
        context prefix.
    """
    ctx = f"writers[{index}]"

    def invalid(detail: str) -> GenerateProfileError:
        # Build (not raise) the error so call sites keep an explicit `raise`.
        return GenerateProfileError(f"{ctx}: {detail}", path=path)

    if not isinstance(raw, dict):
        raise invalid("expected a JSON object")

    # Both identifiers share the same "string, and not blank" contract.
    required: dict[str, str] = {}
    for key in ("writer_id", "attribution_method"):
        text = _require_str(raw, key, ctx, path=path)
        if text.strip() == "":
            raise invalid(f"'{key}' must not be blank")
        required[key] = text

    scan_items = raw.get("scans")
    if not (isinstance(scan_items, list) and scan_items):
        raise invalid("'scans' must be a non-empty list")

    parsed_scans = [
        _parse_scan(item, position, ctx, path=path)
        for position, item in enumerate(scan_items)
    ]

    notes = raw.get("notes")
    if not (notes is None or isinstance(notes, str)):
        raise invalid("'notes' must be a string if present")

    return WriterAnnotation(
        writer_id=required["writer_id"],
        attribution_method=required["attribution_method"],
        scans=tuple(parsed_scans),
        notes=notes,
    )
def load_generate_profile(path: Path) -> GenerateProfile:
    """Read and validate a generation profile JSON file.

    Returns a :class:`GenerateProfile` whose ``config_hash`` field is
    computed from the raw parsed profile dict. That hash is embedded in the
    ``generator.config_hash`` field of output ``letter_set.v1`` documents so
    that the profile version that produced a dataset can be reconstructed.
    ``upstream_checkout`` is resolved relative to the profile file's parent
    directory.

    Raises
    ------
    GenerateProfileError
        When the file is missing, is not valid UTF-8, is not valid JSON, is
        structurally invalid, repeats a ``writer_id``, or lists the same
        ``entry_id`` more than once (within one writer or across writers).
    OSError
        For other I/O failures (for example permission errors).
    """
    try:
        raw_text = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise GenerateProfileError(
            f"generation profile file not found: {path}", path=path
        ) from exc
    except UnicodeDecodeError as exc:
        # UnicodeDecodeError is a ValueError, not an OSError, so without this
        # clause a mis-encoded profile would escape as an unrelated exception
        # type that callers handling GenerateProfileError do not expect.
        raise GenerateProfileError(
            f"file is not valid UTF-8: {exc.reason}", path=path
        ) from exc

    try:
        # Deliberately typed as Any: the top-level JSON value may be a list,
        # string, number, ... until the isinstance check below proves it is
        # an object.
        raw: Any = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise GenerateProfileError(f"invalid JSON: {exc.msg}", path=path) from exc

    if not isinstance(raw, dict):
        raise GenerateProfileError("expected a JSON object at top level", path=path)

    raw_checkout = raw.get("upstream_checkout")
    if not isinstance(raw_checkout, str) or not raw_checkout.strip():
        raise GenerateProfileError(
            "'upstream_checkout' must be a non-empty string", path=path
        )

    raw_writers = raw.get("writers")
    if not isinstance(raw_writers, list) or not raw_writers:
        raise GenerateProfileError("'writers' must be a non-empty list", path=path)

    writers = tuple(_parse_writer(w, i, path=path) for i, w in enumerate(raw_writers))

    # Enforce uniqueness of writer_ids and entry_ids across all writers.
    seen_writer_ids: set[str] = set()
    entry_owner: dict[str, str] = {}  # entry_id → writer_id that claimed it
    for wa in writers:
        if wa.writer_id in seen_writer_ids:
            raise GenerateProfileError(
                f"duplicate writer_id {wa.writer_id!r}", path=path
            )
        seen_writer_ids.add(wa.writer_id)
        for sa in wa.scans:
            owner = entry_owner.get(sa.entry_id)
            if owner is None:
                entry_owner[sa.entry_id] = wa.writer_id
                continue
            # writer_ids are unique at this point, so an equal owner means
            # the repeat is inside this same writer's scan list.
            if owner == wa.writer_id:
                detail = (
                    f"entry_id {sa.entry_id!r} appears more than once "
                    f"under {wa.writer_id!r}"
                )
            else:
                detail = (
                    f"entry_id {sa.entry_id!r} appears under both "
                    f"{owner!r} and {wa.writer_id!r}"
                )
            raise GenerateProfileError(detail, path=path)

    # Resolve upstream_checkout relative to the profile file's parent dir.
    checkout_path = (path.parent / raw_checkout).resolve()

    return GenerateProfile(
        upstream_checkout=checkout_path,
        writers=writers,
        config_hash=_compute_config_hash(raw),
    )
The ``variant_id`` is derived deterministically from +the entry_id, letter, and bounding box. + +Determinism +----------- +Given the same generation profile, the same upstream revision, and the +same version of ``hletterscriptgen``, the output tree is bit-identical. +The ``generated_at`` field is the one exception — callers should inject a +fixed timestamp for reproducible builds (see ``--generated-at`` on the +``generate`` CLI). +""" + +from __future__ import annotations + +import hashlib +import json +import warnings as _warnings +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from hletterscriptgen import HEBREW_LETTERS, __version__ +from hletterscriptgen.extractor import ExtractionError, Glyph, binarize_scan, crop_binary +from hletterscriptgen.generate_profile import ( + GenerateProfile, + GlyphAnnotation, + WriterAnnotation, +) +from hletterscriptgen.upstream import ( + UpstreamEntry, + UpstreamError, + is_eligible, + load_entries, + upstream_pin_from_checkout, +) +from hletterscriptgen.validation import validate_document + + +class GeneratorError(Exception): + """Raised when the generator encounters a fatal error.""" + + +class GeneratorWarning(UserWarning): + """Issued for non-fatal conditions (skipped entries, missing files).""" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _variant_id(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: + """Deterministic variant identifier derived from the bbox and letter. + + Uses ``@`` as a separator between the entry_id, letter, and coordinates + so the boundary is unambiguous even when entry_ids themselves contain + double-underscores. 
+ """ + return f"{entry_id}@{letter}@{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}" + + +def _asset_path(entry_id: str, letter: str, glyph: GlyphAnnotation) -> str: + """POSIX-relative asset path within the writer's output directory.""" + fname = f"{entry_id}@{letter}@{glyph.x}_{glyph.y}_{glyph.width}_{glyph.height}.png" + return f"glyphs/{letter}/{fname}" + + +def _sha256_hex(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def _resolve_scan_path( + entry: UpstreamEntry, + upstream_checkout: Path, +) -> Path | None: + """Return the absolute path of the 'original' scan file, or None. + + Probes the upstream entry's ``files`` list for a file with + ``role == 'original'``. Checks ALTO / hOCR sidecar slots first + (per the deferred open question in sub-PR 1); falls back to + ``local_path`` on the original file. + + Returns ``None`` when no usable file path can be resolved. + """ + # Probe for annotation sidecars first (ALTO / hOCR). The upstream + # schema defines ``transcription.alto_path`` and ``hocr_path`` slots; + # all values are currently null in the corpus, but this probe future- + # proofs the extractor so it can graduate to Option B/C without a + # code change once sidecars are populated. + # NOTE: UpstreamEntry models only the ``files[]`` array; sidecar paths + # live under ``transcription`` which is not yet in the modelled subset. + # The probe below is a no-op until the upstream model is extended. + # TODO: extend UpstreamEntry to model transcription.alto_path and + # transcription.hocr_path, then probe those here before falling back. + + # Fall back to CCA: use the 'original' file's local_path. 
+ for f in entry.files: + if f.role == "original" and f.local_path is not None: + return (upstream_checkout / f.local_path).resolve() + return None + + +def _extract_variants( + writer: WriterAnnotation, + entry_index: dict[str, UpstreamEntry], + upstream_checkout: Path, + writer_out_dir: Path, + generated_at: str, + pending_warnings: list[str], +) -> tuple[dict[str, list[dict[str, Any]]], set[str], set[str]]: + """Crop all glyph variants for one writer and write PNG assets to disk. + + Returns ``(letters_map, observed_licenses, used_entry_ids)``. + Non-fatal issues (missing entries, ineligible scans, crop failures) are + appended to ``pending_warnings``. + + The scan image is binarised once per scan file; all glyph crops for that + scan share the same binary array, avoiding redundant I/O. + """ + letters_map: dict[str, list[dict[str, Any]]] = {} + observed_licenses: set[str] = set() + used_entry_ids: set[str] = set() + + for scan in writer.scans: + entry = entry_index.get(scan.entry_id) + if entry is None: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "not found in upstream entries — skipped" + ) + continue + + if not is_eligible(entry): + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "is not eligible — skipped" + ) + continue + + scan_path = _resolve_scan_path(entry, upstream_checkout) + if scan_path is None: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "has no resolvable scan file — skipped" + ) + continue + + if not scan_path.is_file(): + pending_warnings.append( + f"writer {writer.writer_id!r}: scan file not found at " + f"{scan_path} — skipped" + ) + continue + + license_expr = entry.rights.license_expression + if license_expr is None: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r} " + "has no license_expression — skipped" + ) + continue + + # Binarise the scan once; reuse the binary 
array for all glyphs in + # this scan to avoid re-loading and re-thresholding per glyph. + try: + binary = binarize_scan(scan_path) + except ExtractionError as exc: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"could not binarize scan ({exc}) — skipped" + ) + continue + + used_entry_ids.add(scan.entry_id) + + for glyph_ann in scan.glyphs: + if glyph_ann.letter not in HEBREW_LETTERS: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"glyph letter {glyph_ann.letter!r} not a recognised " + "Hebrew character — skipped" + ) + continue + + glyph = Glyph( + x=glyph_ann.x, + y=glyph_ann.y, + width=glyph_ann.width, + height=glyph_ann.height, + ) + try: + png_bytes = crop_binary(binary, glyph) + except ExtractionError as exc: + pending_warnings.append( + f"writer {writer.writer_id!r}: entry_id {scan.entry_id!r}: " + f"crop failed ({exc}) — skipped" + ) + continue + + rel_path = _asset_path(scan.entry_id, glyph_ann.letter, glyph_ann) + out_file = writer_out_dir / rel_path + out_file.parent.mkdir(parents=True, exist_ok=True) + out_file.write_bytes(png_bytes) + + variant: dict[str, Any] = { + "variant_id": _variant_id(scan.entry_id, glyph_ann.letter, glyph_ann), + "asset_path": rel_path, + "checksum_sha256": _sha256_hex(png_bytes), + "image": { + "width_px": glyph_ann.width, + "height_px": glyph_ann.height, + "format": "png", + }, + "source": { + "scan_entry_id": scan.entry_id, + "license": license_expr, + "bbox_in_source": { + "x": glyph_ann.x, + "y": glyph_ann.y, + "width": glyph_ann.width, + "height": glyph_ann.height, + }, + }, + "extracted_at": generated_at, + } + if glyph_ann.notes is not None: + variant["notes"] = glyph_ann.notes + + letters_map.setdefault(glyph_ann.letter, []).append(variant) + observed_licenses.add(license_expr) + + return letters_map, observed_licenses, used_entry_ids + + +def _build_document( + writer: WriterAnnotation, + letters_map: dict[str, list[dict[str, 
Any]]], + observed_licenses: set[str], + used_entry_ids: set[str], + upstream_pin_repo: str, + upstream_pin_revision: str, + config_hash: str, + generated_at: str, +) -> dict[str, Any]: + """Assemble and return the letter_set.v1 document dict for one writer.""" + return { + "schema_version": "letter_set.v1", + "writer_id": writer.writer_id, + "writer_provenance": { + "source_repo": upstream_pin_repo, + "source_entry_ids": sorted(used_entry_ids), + "attribution_method": writer.attribution_method, + **({"notes": writer.notes} if writer.notes else {}), + }, + "generator": { + "name": "hletterscriptgen", + "version": __version__, + "config_hash": config_hash, + }, + "generated_at": generated_at, + "upstream": { + "repo": upstream_pin_repo, + "revision": upstream_pin_revision, + }, + "letters": letters_map, + "license_summary": { + "licenses": sorted(observed_licenses), + }, + } + + +def _process_writer( + writer: WriterAnnotation, + entry_index: dict[str, UpstreamEntry], + upstream_checkout: Path, + writer_out_dir: Path, + upstream_pin_repo: str, + upstream_pin_revision: str, + config_hash: str, + generated_at: str, + pending_warnings: list[str], +) -> dict[str, Any]: + """Build and write the letter_set.v1 document for one writer. + + Returns the letter_set.v1 document dict (already written to disk). + Appends non-fatal issues to ``pending_warnings``. + Raises :class:`GeneratorError` on fatal conditions. 
+ """ + letters_map, observed_licenses, used_entry_ids = _extract_variants( + writer=writer, + entry_index=entry_index, + upstream_checkout=upstream_checkout, + writer_out_dir=writer_out_dir, + generated_at=generated_at, + pending_warnings=pending_warnings, + ) + + if not letters_map: + raise GeneratorError( + f"writer {writer.writer_id!r}: no glyphs were successfully extracted; " + "check warnings for details" + ) + + document = _build_document( + writer=writer, + letters_map=letters_map, + observed_licenses=observed_licenses, + used_entry_ids=used_entry_ids, + upstream_pin_repo=upstream_pin_repo, + upstream_pin_revision=upstream_pin_revision, + config_hash=config_hash, + generated_at=generated_at, + ) + + result = validate_document(document) + if not result.ok: + issues = "; ".join(i.format() for i in result.issues) + raise GeneratorError( + f"writer {writer.writer_id!r}: generated document failed " + f"validation: {issues}" + ) + + letter_set_path = writer_out_dir / "letter_set.json" + letter_set_path.write_text( + json.dumps(document, ensure_ascii=False, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + return document + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def generate( + profile: GenerateProfile, + output_dir: Path, + *, + generated_at: str | None = None, +) -> list[Path]: + """Run the full generation pipeline and return paths to letter_set.json files. + + Parameters + ---------- + profile: + Parsed generation profile (from + :func:`~hletterscriptgen.generate_profile.load_generate_profile`). + The profile's ``config_hash`` field is embedded in output documents. + output_dir: + Root output directory. Created if it does not exist. + generated_at: + ISO 8601 timestamp string to embed in the output documents. + When ``None`` (default) the current UTC time is used (second + precision, no microseconds). 
Pass a fixed value for reproducible + builds (see ``--generated-at`` on the ``generate`` CLI). + + Returns + ------- + list[Path] + Absolute paths to each ``letter_set.json`` written, one per writer. + + Raises + ------ + GeneratorError + When the upstream checkout cannot be pinned, the upstream entries + file cannot be loaded, a writer yields no extracted glyphs, or a + writer's document fails validation. ``UpstreamError`` from pinning + or loading is wrapped in ``GeneratorError``, not propagated. + """ + if generated_at is None: + generated_at = datetime.now(tz=UTC).replace(microsecond=0).isoformat() + + output_dir.mkdir(parents=True, exist_ok=True) + + # 1. Pin the upstream checkout. + try: + pin = upstream_pin_from_checkout(profile.upstream_checkout) + except (UpstreamError, OSError) as exc: + raise GeneratorError( + f"could not pin upstream checkout at " + f"{profile.upstream_checkout}: {exc}" + ) from exc + + # 2. Load and index all upstream entries. + entries_path = profile.upstream_checkout / "data" / "index" / "entries.jsonl" + try: + all_entries = list(load_entries(entries_path)) + except UpstreamError as exc: + raise GeneratorError(f"could not load upstream entries: {exc}") from exc + + entry_index: dict[str, UpstreamEntry] = {e.entry_id: e for e in all_entries} + + # 3. Process each writer. 
+ pending_warnings: list[str] = [] + output_paths: list[Path] = [] + + for writer in profile.writers: + writer_out_dir = output_dir / writer.writer_id + writer_out_dir.mkdir(parents=True, exist_ok=True) + + _process_writer( + writer=writer, + entry_index=entry_index, + upstream_checkout=profile.upstream_checkout, + writer_out_dir=writer_out_dir, + upstream_pin_repo=pin.repo, + upstream_pin_revision=pin.revision, + config_hash=profile.config_hash, + generated_at=generated_at, + pending_warnings=pending_warnings, + ) + output_paths.append((writer_out_dir / "letter_set.json").resolve()) + + if pending_warnings: + for msg in pending_warnings: + _warnings.warn(msg, GeneratorWarning, stacklevel=2) + + return output_paths + + +__all__ = [ + "GeneratorError", + "GeneratorWarning", + "generate", +] diff --git a/tests/fixtures/generate_profile/valid_profile.json b/tests/fixtures/generate_profile/valid_profile.json new file mode 100644 index 0000000..6598343 --- /dev/null +++ b/tests/fixtures/generate_profile/valid_profile.json @@ -0,0 +1,19 @@ +{ + "upstream_checkout": "../upstream", + "writers": [ + { + "writer_id": "writer_fixture_a", + "attribution_method": "manual_review", + "notes": "Fixture writer for tests", + "scans": [ + { + "entry_id": "fixture__eligible_pdm__p0001", + "glyphs": [ + {"letter": "א", "x": 5, "y": 5, "width": 20, "height": 20}, + {"letter": "ב", "x": 35, "y": 5, "width": 20, "height": 20} + ] + } + ] + } + ] +} diff --git a/tests/test_cli.py b/tests/test_cli.py index b854a0b..cefd580 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,7 +12,6 @@ from hletterscriptgen import __version__ from hletterscriptgen.cli import ( EXIT_INPUT_ERROR, - EXIT_NOT_IMPLEMENTED, EXIT_OK, EXIT_VALIDATION_FAILED, main, @@ -61,13 +60,13 @@ def test_validate_subcommand_fails_on_bad_doc( assert all("kind" in err for err in payload["errors"]) -def test_generate_subcommand_exits_not_implemented( +def test_generate_subcommand_requires_profile_and_output( capsys: 
pytest.CaptureFixture[str], ) -> None: - code = main(["generate"]) - assert code == EXIT_NOT_IMPLEMENTED - assert EXIT_NOT_IMPLEMENTED == 69 - assert "not yet implemented" in capsys.readouterr().err + # --profile and --output are now required; omitting them is a usage error. + with pytest.raises(SystemExit) as exc_info: + main(["generate"]) + assert exc_info.value.code == 2 # argparse usage error def test_check_eligible_text_mixed(capsys: pytest.CaptureFixture[str]) -> None: diff --git a/tests/test_extractor.py b/tests/test_extractor.py new file mode 100644 index 0000000..b62f384 --- /dev/null +++ b/tests/test_extractor.py @@ -0,0 +1,243 @@ +"""Tests for the CCA glyph extractor.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +cv2 = pytest.importorskip("cv2") +import numpy as np # noqa: E402 — after importorskip so this only runs with cv2 + +from hletterscriptgen.extractor import ( # noqa: E402 + MIN_GLYPH_PX, + ExtractionError, + Glyph, + crop_glyph, + extract_glyphs, +) + +# --------------------------------------------------------------------------- +# Synthetic image helpers +# --------------------------------------------------------------------------- + + +def _white_image(width: int, height: int) -> np.ndarray: # type: ignore[type-arg] + """Return a white (255) BGR image of the given dimensions.""" + return np.full((height, width, 3), 255, dtype=np.uint8) + + +def _draw_black_rect( + img: np.ndarray, # type: ignore[type-arg] + x: int, + y: int, + w: int, + h: int, +) -> None: + """Draw a solid black rectangle on img (in-place).""" + img[y : y + h, x : x + w] = 0 + + +def _save_png(img: np.ndarray, path: Path) -> None: # type: ignore[type-arg] + cv2.imwrite(str(path), img) + + +# --------------------------------------------------------------------------- +# extract_glyphs +# --------------------------------------------------------------------------- + + +def test_extract_glyphs_detects_blobs(tmp_path: Path) -> None: + 
"""Two well-separated black rectangles on a white background.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=30, h=30) # left blob + _draw_black_rect(img, x=100, y=10, w=30, h=30) # right blob + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 2 + # Sorted right-to-left (larger x first) within the same y-row. + assert glyphs[0].x > glyphs[1].x + + +def test_extract_glyphs_filters_small_blobs(tmp_path: Path) -> None: + """Blobs below MIN_GLYPH_PX in either dimension should be dropped.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=MIN_GLYPH_PX, h=MIN_GLYPH_PX) # exactly at floor + _draw_black_rect(img, x=60, y=10, w=MIN_GLYPH_PX - 1, h=MIN_GLYPH_PX) # too narrow + _draw_black_rect(img, x=110, y=10, w=MIN_GLYPH_PX, h=MIN_GLYPH_PX - 1) # too short + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 1 + assert glyphs[0].x == 10 + + +def test_extract_glyphs_respects_custom_min_dimension(tmp_path: Path) -> None: + img = _white_image(200, 100) + _draw_black_rect(img, x=10, y=10, w=10, h=10) + _draw_black_rect(img, x=60, y=10, w=30, h=30) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs_strict = extract_glyphs(scan, min_dimension=20) + glyphs_loose = extract_glyphs(scan, min_dimension=8) + + assert len(glyphs_strict) == 1 # only the 30x30 blob passes + assert len(glyphs_loose) == 2 + + +def test_extract_glyphs_filters_large_blobs(tmp_path: Path) -> None: + """Blobs exceeding max_area should be dropped.""" + img = _white_image(200, 100) + _draw_black_rect(img, x=5, y=5, w=30, h=30) # 900 px² — below ceiling + _draw_black_rect(img, x=100, y=5, w=60, h=60) # 3600 px² — above ceiling + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan, max_area=1000) + + assert len(glyphs) == 1 + assert glyphs[0].x == 5 + + +def 
test_extract_glyphs_default_max_area_drops_page_blobs(tmp_path: Path) -> None: + """A blob covering >10 % of the page should be dropped by default.""" + img = _white_image(100, 100) # 10 000 px total + _draw_black_rect(img, x=0, y=0, w=100, h=100) # fills entire page + scan = tmp_path / "scan.png" + _save_png(img, scan) + + # The single blob covers 100 % of the page, so it should be dropped. + glyphs = extract_glyphs(scan) + assert glyphs == [] + + +def test_extract_glyphs_empty_image(tmp_path: Path) -> None: + """An all-white image should yield no blobs.""" + img = _white_image(100, 100) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + assert extract_glyphs(scan) == [] + + +def test_extract_glyphs_raises_on_missing_image(tmp_path: Path) -> None: + with pytest.raises(ExtractionError, match="could not load image"): + extract_glyphs(tmp_path / "nonexistent.png") + + +def test_extract_glyphs_raises_on_bad_min_dimension(tmp_path: Path) -> None: + img = _white_image(50, 50) + scan = tmp_path / "scan.png" + _save_png(img, scan) + with pytest.raises(ExtractionError, match="min_dimension"): + extract_glyphs(scan, min_dimension=0) + + +def test_extract_glyphs_sorting_is_hebrew_order(tmp_path: Path) -> None: + """Hebrew reading order: top-to-bottom rows, right-to-left within rows.""" + img = _white_image(300, 200) + # Row 1 (y=10): three blobs, right-to-left order expected + _draw_black_rect(img, x=200, y=10, w=20, h=20) # rightmost + _draw_black_rect(img, x=100, y=10, w=20, h=20) # middle + _draw_black_rect(img, x=10, y=10, w=20, h=20) # leftmost + # Row 2 (y=100): one blob + _draw_black_rect(img, x=150, y=100, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyphs = extract_glyphs(scan) + + assert len(glyphs) == 4 + # First three from row 1, right-to-left + assert glyphs[0].x == 200 + assert glyphs[1].x == 100 + assert glyphs[2].x == 10 + # Last blob from row 2 + assert glyphs[3].y >= 100 + + +# 
--------------------------------------------------------------------------- +# crop_glyph +# --------------------------------------------------------------------------- + + +def test_crop_glyph_returns_png_bytes(tmp_path: Path) -> None: + img = _white_image(100, 100) + _draw_black_rect(img, x=10, y=10, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=10, y=10, width=20, height=20) + png_bytes = crop_glyph(scan, glyph) + + assert isinstance(png_bytes, bytes) + assert len(png_bytes) > 0 + # PNG magic bytes + assert png_bytes[:4] == b"\x89PNG" + + +def test_crop_glyph_dimensions_match_bbox(tmp_path: Path) -> None: + img = _white_image(100, 100) + _draw_black_rect(img, x=5, y=5, w=25, h=35) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=5, y=5, width=25, height=35) + png_bytes = crop_glyph(scan, glyph) + + # Decode the PNG and check dimensions + buf = np.frombuffer(png_bytes, dtype=np.uint8) + decoded = cv2.imdecode(buf, cv2.IMREAD_GRAYSCALE) + assert decoded is not None + assert decoded.shape == (35, 25) # (height, width) + + +def test_crop_glyph_is_binarised(tmp_path: Path) -> None: + """The cropped image should contain only 0 and 255 pixel values.""" + img = _white_image(100, 100) + # Draw a grey rectangle (not pure black) — after Otsu binarisation it becomes black + img[20:40, 20:40] = 80 + scan = tmp_path / "scan.png" + _save_png(img, scan) + + glyph = Glyph(x=20, y=20, width=20, height=20) + png_bytes = crop_glyph(scan, glyph) + + buf = np.frombuffer(png_bytes, dtype=np.uint8) + decoded = cv2.imdecode(buf, cv2.IMREAD_GRAYSCALE) + assert decoded is not None + unique_vals = set(decoded.flatten().tolist()) + assert unique_vals <= {0, 255} + + +def test_crop_glyph_is_deterministic(tmp_path: Path) -> None: + """Calling crop_glyph twice on the same input returns identical bytes.""" + img = _white_image(100, 100) + _draw_black_rect(img, x=10, y=10, w=20, h=20) + scan = tmp_path / "scan.png" + _save_png(img, 
scan) + + glyph = Glyph(x=10, y=10, width=20, height=20) + assert crop_glyph(scan, glyph) == crop_glyph(scan, glyph) + + +def test_crop_glyph_raises_on_missing_image(tmp_path: Path) -> None: + glyph = Glyph(x=0, y=0, width=10, height=10) + with pytest.raises(ExtractionError, match="could not load image"): + crop_glyph(tmp_path / "nonexistent.png", glyph) + + +def test_crop_glyph_raises_on_out_of_bounds(tmp_path: Path) -> None: + img = _white_image(50, 50) + scan = tmp_path / "scan.png" + _save_png(img, scan) + + out_of_bounds = Glyph(x=40, y=40, width=20, height=20) # extends past 50x50 + with pytest.raises(ExtractionError, match="outside"): + crop_glyph(scan, out_of_bounds) diff --git a/tests/test_generate_profile.py b/tests/test_generate_profile.py new file mode 100644 index 0000000..01789bf --- /dev/null +++ b/tests/test_generate_profile.py @@ -0,0 +1,219 @@ +"""Tests for the generation profile loader.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from hletterscriptgen.generate_profile import ( + GenerateProfile, + GenerateProfileError, + load_generate_profile, +) +from hletterscriptgen.hashing import config_hash + +FIXTURE_DIR = Path(__file__).resolve().parent / "fixtures" / "generate_profile" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_profile(tmp_path: Path, payload: object) -> Path: + p = tmp_path / "profile.json" + p.write_text(json.dumps(payload), encoding="utf-8") + return p + + +_MINIMAL_GLYPH = {"letter": "א", "x": 0, "y": 0, "width": 20, "height": 20} +_MINIMAL_SCAN = {"entry_id": "e__s__p0001", "glyphs": [_MINIMAL_GLYPH]} +_MINIMAL_WRITER = { + "writer_id": "w1", + "attribution_method": "manual_review", + "scans": [_MINIMAL_SCAN], +} +_MINIMAL_PROFILE = { + "upstream_checkout": ".", + "writers": [_MINIMAL_WRITER], +} + + +# 
--------------------------------------------------------------------------- +# Valid profile +# --------------------------------------------------------------------------- + + +def test_load_valid_fixture() -> None: + profile = load_generate_profile(FIXTURE_DIR / "valid_profile.json") + assert isinstance(profile, GenerateProfile) + assert len(profile.writers) == 1 + writer = profile.writers[0] + assert writer.writer_id == "writer_fixture_a" + assert writer.attribution_method == "manual_review" + assert writer.notes == "Fixture writer for tests" + assert len(writer.scans) == 1 + scan = writer.scans[0] + assert scan.entry_id == "fixture__eligible_pdm__p0001" + assert len(scan.glyphs) == 2 + assert scan.glyphs[0].letter == "א" + assert scan.glyphs[1].letter == "ב" + # config_hash must be a 64-char lowercase hex string + assert isinstance(profile.config_hash, str) + assert len(profile.config_hash) == 64 + + +def test_profile_has_correct_config_hash(tmp_path: Path) -> None: + """config_hash must equal SHA-256 of the canonical JSON of the raw profile dict.""" + p = _write_profile(tmp_path, _MINIMAL_PROFILE) + profile = load_generate_profile(p) + assert profile.config_hash == config_hash(_MINIMAL_PROFILE) + + +def test_upstream_checkout_resolved_relative_to_profile(tmp_path: Path) -> None: + sub = tmp_path / "configs" + sub.mkdir() + p = sub / "profile.json" + p.write_text(json.dumps({**_MINIMAL_PROFILE, "upstream_checkout": "../data"}), encoding="utf-8") + profile = load_generate_profile(p) + assert profile.upstream_checkout == (tmp_path / "data").resolve() + + +def test_glyph_notes_optional(tmp_path: Path) -> None: + glyph_with_notes = {**_MINIMAL_GLYPH, "notes": "a test note"} + profile_data = { + "upstream_checkout": ".", + "writers": [ + {**_MINIMAL_WRITER, "scans": [{"entry_id": "e__s__p0001", "glyphs": [glyph_with_notes]}]}, # noqa: E501 + ], + } + p = _write_profile(tmp_path, profile_data) + profile = load_generate_profile(p) + assert 
profile.writers[0].scans[0].glyphs[0].notes == "a test note" + + +# --------------------------------------------------------------------------- +# File-level errors +# --------------------------------------------------------------------------- + + +def test_raises_on_missing_file(tmp_path: Path) -> None: + with pytest.raises(GenerateProfileError, match="not found"): + load_generate_profile(tmp_path / "nonexistent.json") + + +def test_raises_on_invalid_json(tmp_path: Path) -> None: + p = tmp_path / "profile.json" + p.write_text("{not valid", encoding="utf-8") + with pytest.raises(GenerateProfileError, match="invalid JSON"): + load_generate_profile(p) + + +def test_raises_on_non_object_root(tmp_path: Path) -> None: + p = _write_profile(tmp_path, [1, 2, 3]) + with pytest.raises(GenerateProfileError, match="JSON object"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# upstream_checkout validation +# --------------------------------------------------------------------------- + + +def test_raises_on_missing_upstream_checkout(tmp_path: Path) -> None: + payload = {k: v for k, v in _MINIMAL_PROFILE.items() if k != "upstream_checkout"} + p = _write_profile(tmp_path, payload) + with pytest.raises(GenerateProfileError, match="upstream_checkout"): + load_generate_profile(p) + + +def test_raises_on_blank_upstream_checkout(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "upstream_checkout": " "}) + with pytest.raises(GenerateProfileError, match="upstream_checkout"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# writers validation +# --------------------------------------------------------------------------- + + +def test_raises_on_empty_writers(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": []}) + with pytest.raises(GenerateProfileError, match="writers"): + 
load_generate_profile(p) + + +def test_raises_on_duplicate_writer_ids(tmp_path: Path) -> None: + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, _MINIMAL_WRITER]}) # noqa: E501 + with pytest.raises(GenerateProfileError, match="duplicate writer_id"): + load_generate_profile(p) + + +def test_raises_on_duplicate_entry_ids_across_writers(tmp_path: Path) -> None: + writer2 = {**_MINIMAL_WRITER, "writer_id": "w2"} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [_MINIMAL_WRITER, writer2]}) + with pytest.raises(GenerateProfileError, match="entry_id"): + load_generate_profile(p) + + +def test_raises_on_blank_writer_id(tmp_path: Path) -> None: + bad = {**_MINIMAL_WRITER, "writer_id": " "} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [bad]}) + with pytest.raises(GenerateProfileError, match="writer_id"): + load_generate_profile(p) + + +def test_raises_on_empty_scans(tmp_path: Path) -> None: + bad = {**_MINIMAL_WRITER, "scans": []} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [bad]}) + with pytest.raises(GenerateProfileError, match="scans"): + load_generate_profile(p) + + +# --------------------------------------------------------------------------- +# glyph validation +# --------------------------------------------------------------------------- + + +def test_raises_on_non_hebrew_letter(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "letter": "A"} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 + with pytest.raises(GenerateProfileError, match="Hebrew character"): + load_generate_profile(p) + + +def test_raises_on_multi_char_letter(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "letter": "אב"} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": 
[scan]}]}) # noqa: E501 + with pytest.raises(GenerateProfileError, match="Hebrew character"): + load_generate_profile(p) + + +def test_raises_on_negative_x(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "x": -1} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 + with pytest.raises(GenerateProfileError, match="≥ 0"): + load_generate_profile(p) + + +def test_raises_on_zero_width(tmp_path: Path) -> None: + bad_glyph = {**_MINIMAL_GLYPH, "width": 0} + scan = {"entry_id": "e__s__p0001", "glyphs": [bad_glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 + with pytest.raises(GenerateProfileError, match="≥ 1"): + load_generate_profile(p) + + +@pytest.mark.parametrize("letter", ["א", "ב", "ג", "ת", "ך", "ם", "ן", "ף", "ץ"]) # noqa: RUF001 +def test_accepts_all_hebrew_letter_forms(tmp_path: Path, letter: str) -> None: + glyph = {**_MINIMAL_GLYPH, "letter": letter} + scan = {"entry_id": "e__s__p0001", "glyphs": [glyph]} + p = _write_profile(tmp_path, {**_MINIMAL_PROFILE, "writers": [{**_MINIMAL_WRITER, "scans": [scan]}]}) # noqa: E501 + profile = load_generate_profile(p) + assert profile.writers[0].scans[0].glyphs[0].letter == letter diff --git a/tests/test_generator.py b/tests/test_generator.py new file mode 100644 index 0000000..d3c0e70 --- /dev/null +++ b/tests/test_generator.py @@ -0,0 +1,448 @@ +"""End-to-end tests for the generator pipeline.""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +cv2 = pytest.importorskip("cv2") +import numpy as np # noqa: E402 + +from hletterscriptgen.generate_profile import load_generate_profile # noqa: E402 +from hletterscriptgen.generator import GeneratorError, generate # noqa: E402 + +# 
# ---------------------------------------------------------------------------
# Helpers — synthetic upstream checkout + scan
# ---------------------------------------------------------------------------

# Timestamp pinned by most tests so the generated documents are reproducible.
_GENERATED_AT = "2025-01-01T00:00:00+00:00"

# The single index record written to data/index/entries.jsonl by the upstream
# builders below.  ``local_path`` matches the scan file those builders create;
# ``sha256`` is an all-zero placeholder string, not a real digest.
_ELIGIBLE_ENTRY = {
    "entry_id": "test__writer_a__p0001",
    "source_id": "test__writer_a",
    "creators": [{"name": "Test Author", "role": "writer", "death_year": 1900}],
    "files": [
        {
            "role": "original",
            "local_path": "data/scans/test__writer_a/p0001.png",
            "sha256": "0" * 64,
            "mime_type": "image/png",
            "width_px": 100,
            "height_px": 100,
        }
    ],
    "rights": {
        "license_expression": "PDM-1.0",
        "commercial_use_allowed": True,
        "derivatives_allowed": True,
        "scan_redistribution_allowed": True,
        "verification_status": "primary_page_checked",
    },
    "quality": {"usable_for_htr": True, "legibility": "high"},
}


def _git(path: Path, *args: str) -> None:
    """Run ``git -C PATH ARGS...``; raises ``CalledProcessError`` on failure."""
    subprocess.run(["git", "-C", str(path), *args], check=True, capture_output=True)


def _init_upstream_repo(repo: Path) -> Path:
    """Create *repo* as a git repository holding the index, and return its scan dir.

    Shared bootstrap for both upstream builders: initialises the repository on
    branch ``main``, configures a throwaway identity, adds the ``origin``
    remote, writes ``data/index/entries.jsonl`` with ``_ELIGIBLE_ENTRY`` and
    creates the (still empty) scan directory.  Nothing is committed here; the
    caller adds its scan file and then calls ``_commit_all``.
    """
    repo.mkdir()

    _git(repo, "init", "-q", "-b", "main")
    _git(repo, "config", "user.email", "test@example.com")
    _git(repo, "config", "user.name", "Test")
    # Keep commits working on machines whose global config enforces signing.
    _git(repo, "config", "commit.gpgsign", "false")
    _git(repo, "remote", "add", "origin", "https://github.com/HeOCR/public-domain-hand-written-hebrew-scans.git")

    # entries.jsonl
    index_dir = repo / "data" / "index"
    index_dir.mkdir(parents=True)
    (index_dir / "entries.jsonl").write_text(
        json.dumps(_ELIGIBLE_ENTRY) + "\n", encoding="utf-8"
    )

    scan_dir = repo / "data" / "scans" / "test__writer_a"
    scan_dir.mkdir(parents=True)
    return scan_dir


def _commit_all(repo: Path) -> None:
    """Stage every file in *repo* and record a single ``initial`` commit."""
    _git(repo, "add", ".")
    _git(repo, "commit", "-q", "-m", "initial")


def _make_upstream_checkout(tmp_path: Path) -> Path:
    """Create a minimal upstream git repo with one eligible entry and a scan."""
    repo = tmp_path / "upstream"
    scan_dir = _init_upstream_repo(repo)

    # Synthetic scan PNG: white 100x100 with two black 20x20 blobs
    img = np.full((100, 100, 3), 255, dtype=np.uint8)
    img[5:25, 5:25] = 0  # blob at (5,5) 20x20
    img[5:25, 35:55] = 0  # blob at (35,5) 20x20
    scan_path = scan_dir / "p0001.png"
    # cv2.imwrite reports a failed write by returning False, so check it here
    # instead of letting a missing scan surface as an unrelated test failure.
    assert cv2.imwrite(str(scan_path), img), f"cv2.imwrite failed for {scan_path}"

    _commit_all(repo)
    return repo


def _make_profile(tmp_path: Path, upstream: Path) -> Path:
    """Write a generation profile that uses the upstream checkout.

    The profile declares one writer with one scan and two glyph boxes whose
    coordinates coincide with the blobs drawn by ``_make_upstream_checkout``.
    Returns the path of the written ``profile.json``.
    """
    profile_data = {
        "upstream_checkout": str(upstream),
        "writers": [
            {
                "writer_id": "writer_test_a",
                "attribution_method": "manual_review",
                "scans": [
                    {
                        "entry_id": "test__writer_a__p0001",
                        "glyphs": [
                            {"letter": "א", "x": 5, "y": 5, "width": 20, "height": 20},
                            {"letter": "ב", "x": 35, "y": 5, "width": 20, "height": 20},
                        ],
                    }
                ],
            }
        ],
    }
    p = tmp_path / "profile.json"
    p.write_text(json.dumps(profile_data), encoding="utf-8")
    return p


# ---------------------------------------------------------------------------
# generate — happy path
# ---------------------------------------------------------------------------


def test_generate_produces_letter_set_json(tmp_path: Path) -> None:
    """generate() returns exactly one existing path named ``letter_set.json``."""
    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    assert len(paths) == 1
    letter_set_path = paths[0]
    assert letter_set_path.exists()
    assert letter_set_path.name == "letter_set.json"


def test_generate_letter_set_validates(tmp_path: Path) -> None:
    """The generated document is accepted by ``validate_document``."""
    from hletterscriptgen.validation import validate_document

    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    result = validate_document(doc)
    assert result.ok, [i.format() for i in result.issues]


def test_generate_letter_set_content(tmp_path: Path) -> None:
    """Header fields and the per-letter variant lists reflect the profile."""
    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    assert doc["schema_version"] == "letter_set.v1"
    assert doc["writer_id"] == "writer_test_a"
    assert doc["upstream"]["repo"] == "HeOCR/public-domain-hand-written-hebrew-scans"
    assert doc["generator"]["name"] == "hletterscriptgen"
    # Both annotated letters must appear
    assert "א" in doc["letters"]
    assert "ב" in doc["letters"]
    assert len(doc["letters"]["א"]) == 1
    assert len(doc["letters"]["ב"]) == 1


def test_generate_writes_glyph_pngs(tmp_path: Path) -> None:
    """Every variant's ``asset_path`` resolves to a file starting with the PNG magic."""
    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    # Asset paths are resolved relative to the directory holding letter_set.json.
    writer_dir = paths[0].parent

    for variants in doc["letters"].values():
        for variant in variants:
            asset = writer_dir / variant["asset_path"]
            assert asset.exists(), f"asset missing: {asset}"
            assert asset.read_bytes()[:4] == b"\x89PNG"


def test_generate_checksum_matches_png(tmp_path: Path) -> None:
    """``checksum_sha256`` of each variant equals the SHA-256 of its PNG file."""
    import hashlib

    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    writer_dir = paths[0].parent

    for variants in doc["letters"].values():
        for variant in variants:
            png_bytes = (writer_dir / variant["asset_path"]).read_bytes()
            assert hashlib.sha256(png_bytes).hexdigest() == variant["checksum_sha256"]


def test_generate_is_deterministic(tmp_path: Path) -> None:
    """Same profile + upstream + generated_at → bit-identical output."""
    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)

    out1 = tmp_path / "out1"
    out2 = tmp_path / "out2"

    profile = load_generate_profile(profile_path)
    ts = "2025-06-01T12:00:00+00:00"
    paths1 = generate(profile, out1, generated_at=ts)
    paths2 = generate(profile, out2, generated_at=ts)

    doc1 = json.loads(paths1[0].read_text(encoding="utf-8"))
    doc2 = json.loads(paths2[0].read_text(encoding="utf-8"))
    assert doc1 == doc2

    # PNG bytes should be identical too
    for variants in doc1["letters"].values():
        for variant in variants:
            b1 = (out1 / "writer_test_a" / variant["asset_path"]).read_bytes()
            b2 = (out2 / "writer_test_a" / variant["asset_path"]).read_bytes()
            assert b1 == b2


def test_generate_raises_when_no_glyphs_extracted(tmp_path: Path) -> None:
    """A profile whose only scan names an entry_id absent from the index raises GeneratorError."""
    upstream = _make_upstream_checkout(tmp_path)
    # Point the profile at a non-existent entry_id
    profile_data = {
        "upstream_checkout": str(upstream),
        "writers": [
            {
                "writer_id": "writer_test_a",
                "attribution_method": "manual_review",
                "scans": [
                    {
                        "entry_id": "does__not__exist__p0001",
                        "glyphs": [{"letter": "א", "x": 0, "y": 0, "width": 20, "height": 20}],
                    }
                ],
            }
        ],
    }
    p = tmp_path / "profile.json"
    p.write_text(json.dumps(profile_data), encoding="utf-8")

    profile = load_generate_profile(p)
    with pytest.raises(GeneratorError, match="no glyphs"):
        generate(profile, tmp_path / "out", generated_at=_GENERATED_AT)


# ---------------------------------------------------------------------------
# CLI integration
# ---------------------------------------------------------------------------


def _write_single_blob_scan(tmp_path: Path) -> Path:
    """Write a white 100x100 PNG with one black 20x20 blob and return its path."""
    img = np.full((100, 100, 3), 255, dtype=np.uint8)
    img[10:30, 10:30] = 0
    scan = tmp_path / "scan.png"
    # cv2.imwrite signals a failed write through its return value.
    assert cv2.imwrite(str(scan), img), f"cv2.imwrite failed for {scan}"
    return scan


def test_cli_generate_exits_ok(tmp_path: Path) -> None:
    """``generate`` via the CLI returns 0 and writes the writer's letter_set.json."""
    from hletterscriptgen.cli import main

    upstream = _make_upstream_checkout(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    rc = main([
        "generate",
        "--profile", str(profile_path),
        "--output", str(output_dir),
        "--generated-at", _GENERATED_AT,
    ])
    assert rc == 0
    assert (output_dir / "writer_test_a" / "letter_set.json").exists()


def test_cli_scan_blobs_exits_ok(tmp_path: Path) -> None:
    """``scan-blobs`` with default options returns 0 for a one-blob image."""
    from hletterscriptgen.cli import main

    scan = _write_single_blob_scan(tmp_path)

    rc = main(["scan-blobs", str(scan)])
    assert rc == 0


def test_cli_scan_blobs_text_format(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None:
    """``scan-blobs --format text`` returns 0 and prints 'blob' and 'detected'."""
    from hletterscriptgen.cli import main

    scan = _write_single_blob_scan(tmp_path)

    rc = main(["scan-blobs", str(scan), "--format", "text"])
    assert rc == 0
    out = capsys.readouterr().out
    assert "blob" in out
    assert "detected" in out


# ---------------------------------------------------------------------------
# Mock-based tests — document assembly without cv2 I/O
# ---------------------------------------------------------------------------

# Byte string carrying the PNG signature, returned by the mocked crop_binary.
# NOTE(review): intended as a minimal 1x1 white PNG, but the chunk payload has
# not been verified to decode — the tests below only rely on the magic prefix.
_FAKE_PNG = (
    b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01"
    b"\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDATx\x9cc\xf8\x0f\x00"
    b"\x00\x01\x01\x00\x05\x18\xd8N\x00\x00\x00\x00IEND\xaeB`\x82"
)


def _make_upstream_checkout_no_cv2(tmp_path: Path) -> Path:
    """Create a minimal upstream git repo with a stub scan file (no cv2 needed)."""
    repo = tmp_path / "upstream_nodeps"
    scan_dir = _init_upstream_repo(repo)

    # Write any bytes — binarize_scan is mocked and won't read this.
    (scan_dir / "p0001.png").write_bytes(b"STUB")

    _commit_all(repo)
    return repo


def test_generate_document_structure_mocked(tmp_path: Path) -> None:
    """Document assembly, validation, and file writing with mocked cv2 calls."""
    upstream = _make_upstream_checkout_no_cv2(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    fake_binary = MagicMock()

    with patch("hletterscriptgen.generator.binarize_scan", return_value=fake_binary):
        with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG):
            paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    assert len(paths) == 1
    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    assert doc["schema_version"] == "letter_set.v1"
    assert doc["writer_id"] == "writer_test_a"
    assert "א" in doc["letters"]
    assert "ב" in doc["letters"]


def test_generate_mocked_validates(tmp_path: Path) -> None:
    """Generated document must pass schema validation even with mocked crops."""
    from hletterscriptgen.validation import validate_document

    upstream = _make_upstream_checkout_no_cv2(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()):
        with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG):
            paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    result = validate_document(doc)
    assert result.ok, [i.format() for i in result.issues]


def test_generate_mocked_writes_png_assets(tmp_path: Path) -> None:
    """PNG assets must exist on disk with correct PNG magic bytes."""
    upstream = _make_upstream_checkout_no_cv2(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()):
        with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG):
            paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    writer_dir = paths[0].parent
    for variants in doc["letters"].values():
        for variant in variants:
            asset = writer_dir / variant["asset_path"]
            assert asset.exists()
            assert asset.read_bytes()[:4] == b"\x89PNG"


def test_generate_mocked_glyph_notes_in_variant(tmp_path: Path) -> None:
    """A glyph annotation with notes must propagate the notes to its variant."""
    upstream = _make_upstream_checkout_no_cv2(tmp_path)
    profile_data = {
        "upstream_checkout": str(upstream),
        "writers": [
            {
                "writer_id": "writer_test_a",
                "attribution_method": "manual_review",
                "scans": [
                    {
                        "entry_id": "test__writer_a__p0001",
                        "glyphs": [
                            {
                                "letter": "א",
                                "x": 5, "y": 5, "width": 20, "height": 20,
                                "notes": "this is a test note",
                            },
                        ],
                    }
                ],
            }
        ],
    }
    p = tmp_path / "profile.json"
    p.write_text(json.dumps(profile_data), encoding="utf-8")

    output_dir = tmp_path / "out"
    profile = load_generate_profile(p)
    with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()):
        with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG):
            paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    variant = doc["letters"]["א"][0]
    assert variant.get("notes") == "this is a test note"


def test_generate_mocked_config_hash_in_document(tmp_path: Path) -> None:
    """generator.config_hash in the output document must match profile.config_hash."""
    upstream = _make_upstream_checkout_no_cv2(tmp_path)
    profile_path = _make_profile(tmp_path, upstream)
    output_dir = tmp_path / "out"

    profile = load_generate_profile(profile_path)
    with patch("hletterscriptgen.generator.binarize_scan", return_value=MagicMock()):
        with patch("hletterscriptgen.generator.crop_binary", return_value=_FAKE_PNG):
            paths = generate(profile, output_dir, generated_at=_GENERATED_AT)

    doc = json.loads(paths[0].read_text(encoding="utf-8"))
    assert doc["generator"]["config_hash"] == profile.config_hash