From 29a457db59da8a22dcc7d347410bc3094a8b921d Mon Sep 17 00:00:00 2001 From: shmukit Date: Mon, 4 May 2026 01:36:54 +0600 Subject: [PATCH] feat(extraction): modularize PDF-Extract-Kit integration and enhance extraction capabilities - Introduced a structured wrapper for PDF-Extract-Kit, facilitating modular extraction processes. - Added new mixins for image handling, page processing, question building, and text utilities. - Updated configuration examples to support LiteLLM model routing. - Enhanced extraction settings with validation for segmentation presets. - Improved UI to display active configuration snapshots and allow for dynamic updates. --- content_gen/.env.example | 11 +- content_gen/core/config_schema.py | 14 + content_gen/scripts/README.md | 2 +- .../scripts/extraction/kit/__init__.py | 5 + .../scripts/extraction/kit/_bootstrap.py | 28 + .../scripts/extraction/kit/image_utils.py | 51 ++ .../scripts/extraction/kit/page_processor.py | 334 ++++++++ .../extraction/kit/question_builder.py | 55 ++ .../scripts/extraction/kit/text_utils.py | 102 +++ content_gen/scripts/extraction/kit/wrapper.py | 222 ++++++ .../extraction/pdf_extract_kit_wrapper.py | 732 +----------------- .../scripts/processing/upload_to_storage.py | 57 +- edmate_config.yaml.example | 6 + qc_viewer/static/automate.html | 28 + qc_viewer/static/js/automate_ui.js | 32 + 15 files changed, 930 insertions(+), 749 deletions(-) create mode 100644 content_gen/scripts/extraction/kit/__init__.py create mode 100644 content_gen/scripts/extraction/kit/_bootstrap.py create mode 100644 content_gen/scripts/extraction/kit/image_utils.py create mode 100644 content_gen/scripts/extraction/kit/page_processor.py create mode 100644 content_gen/scripts/extraction/kit/question_builder.py create mode 100644 content_gen/scripts/extraction/kit/text_utils.py create mode 100644 content_gen/scripts/extraction/kit/wrapper.py diff --git a/content_gen/.env.example b/content_gen/.env.example index a376c47..6695a34 100644 --- a/content_gen/.env.example +++ b/content_gen/.env.example @@ -4,9 +4,14 @@ # ===== Optional Image CDN Storage (Legacy/Optional) ===== # Not required when using base64 image mode + Postgres persistence. # Only needed if you explicitly run CDN upload mode. -# AZURE_STORAGE_ACCOUNT_NAME=your_account_name_here -# AZURE_STORAGE_ACCOUNT_KEY=your_account_key_here -# AZURE_STORAGE_CDN_URL=https://cdn.edmate.com # Optional: custom CDN domain +# Prefer account name + key (no connection string in your shell history): +# AZURE_STORAGE_ACCOUNT_NAME= +# AZURE_STORAGE_ACCOUNT_KEY= +# +# Or paste the full value from Azure Portal "Access keys" (keep out of git): +# AZURE_STORAGE_CONNECTION_STRING= +# +# AZURE_STORAGE_CDN_URL=https://cdn.example.com # Optional: custom CDN domain # ===== Database ===== # PostgreSQL connection string diff --git a/content_gen/core/config_schema.py b/content_gen/core/config_schema.py index bb463e5..fa1dc31 100644 --- a/content_gen/core/config_schema.py +++ b/content_gen/core/config_schema.py @@ -79,6 +79,20 @@ class ExtractionSettings(BaseModel): description="bangladeshi | numbered_only — controls regex segmentation heuristics", ) + @field_validator("segmentation_preset", mode="before") + @classmethod + def _validate_segmentation_preset(cls, v: Any) -> str: + allowed = frozenset({"bangladeshi", "numbered_only"}) + s = (v if v is not None else "bangladeshi") + if not isinstance(s, str): + s = str(s) + s = s.strip().lower() + if s not in allowed: + raise ValueError( + f"segmentation_preset must be one of {sorted(allowed)}, got {v!r}" + ) + return s + @field_validator("engine", mode="before") @classmethod def _coerce_engine(cls, v: Any) -> Any: diff --git a/content_gen/scripts/README.md b/content_gen/scripts/README.md index 6e9f535..1426766 100644 --- a/content_gen/scripts/README.md +++ b/content_gen/scripts/README.md @@ -6,7 +6,7 @@ This directory contains the **PDF → structured questions → explanations** pi | Path | Role | |------|------| -| `extraction/` | `pdf_extract_kit_wrapper.py` — adapter around [PDF-Extract-Kit](https://github.com/opendatalab/PDF-Extract-Kit) (must be cloned to `content_gen/tools/PDF-Extract-Kit`; see repo `scripts/setup_pdf_extract_kit.sh`). | +| `extraction/` | `pdf_extract_kit_wrapper.py` (shim) → `extraction/kit/` — modular adapter around [PDF-Extract-Kit](https://github.com/opendatalab/PDF-Extract-Kit) (clone to `content_gen/tools/PDF-Extract-Kit`; see `scripts/setup_pdf_extract_kit.sh`). | | `adapters/` (package root `content_gen/adapters/`) | `PyMuPDFAdapter`, `KitExtractionAdapter`, `VisionExtractionAdapter` — selected by `extraction_settings.engine` in `edmate_config.yaml`. | | `pipeline/` | `pipeline_orchestrator.py` — main CLI orchestrator; `national_exam_processor.py` — optional standalone path with `--extraction-engine`. | | `processing/` | `content_generator.py`, import/upload helpers. | diff --git a/content_gen/scripts/extraction/kit/__init__.py b/content_gen/scripts/extraction/kit/__init__.py new file mode 100644 index 0000000..41ed4d4 --- /dev/null +++ b/content_gen/scripts/extraction/kit/__init__.py @@ -0,0 +1,5 @@ +"""PDF-Extract-Kit integration split into focused modules.""" + +from content_gen.scripts.extraction.kit.wrapper import PDFExtractKitWrapper + +__all__ = ["PDFExtractKitWrapper"] diff --git a/content_gen/scripts/extraction/kit/_bootstrap.py b/content_gen/scripts/extraction/kit/_bootstrap.py new file mode 100644 index 0000000..e8984fe --- /dev/null +++ b/content_gen/scripts/extraction/kit/_bootstrap.py @@ -0,0 +1,28 @@ +""" +PDF-Extract-Kit path bootstrap — must run before importing pdf_extract_kit. +""" +from __future__ import annotations + +import sys +from pathlib import Path + +# content_gen/scripts/extraction/kit -> content_gen +CONTENT_GEN_ROOT = Path(__file__).resolve().parent.parent.parent.parent + +KIT_PATH = CONTENT_GEN_ROOT / "tools" / "PDF-Extract-Kit" +if str(KIT_PATH) not in sys.path: + sys.path.insert(0, str(KIT_PATH)) + +try: + import pdf_extract_kit.tasks # noqa: F401 — trigger registration + from pdf_extract_kit.utils.config_loader import initialize_tasks_and_models + + HAS_KIT = True +except (ImportError, ModuleNotFoundError): + HAS_KIT = False + initialize_tasks_and_models = None # type: ignore[assignment] + print( + "⚠️ PDF-Extract-Kit not found. Extraction features using this engine will be disabled." + ) + +__all__ = ["CONTENT_GEN_ROOT", "KIT_PATH", "HAS_KIT", "initialize_tasks_and_models"] diff --git a/content_gen/scripts/extraction/kit/image_utils.py b/content_gen/scripts/extraction/kit/image_utils.py new file mode 100644 index 0000000..14bb441 --- /dev/null +++ b/content_gen/scripts/extraction/kit/image_utils.py @@ -0,0 +1,51 @@ +"""Image crop helpers for PDF-Extract-Kit wrapper.""" +from __future__ import annotations + +from pathlib import Path +from typing import List, Optional + +import fitz + + +class KitImageUtilsMixin: + images_dir: Optional[Path] + def _extract_bbox_image( + self, + page: fitz.Page, + bbox: List[float], + q_num: int, + element_type: str, + ) -> Path: + """ + Extract and save image from bounding box + + Args: + page: PyMuPDF page object + bbox: Bounding box [x0, y0, x1, y1] + q_num: Question number + element_type: Type of element (figure, table, formula) + + Returns: + Path to saved image + """ + width = max(1.0, bbox[2] - bbox[0]) + height = max(1.0, bbox[3] - bbox[1]) + pad = max(12.0, min(width, height) * 0.08) + + final_bbox = [ + max(0, bbox[0] - pad), + max(0, bbox[1] - pad), + min(page.rect.width, bbox[2] + pad), + min(page.rect.height, bbox[3] + pad), + ] + + images_dir = self.images_dir + if images_dir is None: + raise ValueError("images_dir must be initialized before extracting images") + img_name = f"q{q_num}_{element_type}.png" + img_path = images_dir / img_name + + pix = page.get_pixmap(matrix=fitz.Matrix(3, 3), clip=fitz.Rect(final_bbox)) + pix.save(str(img_path)) + + return img_path diff --git a/content_gen/scripts/extraction/kit/page_processor.py b/content_gen/scripts/extraction/kit/page_processor.py new file mode 100644 index 0000000..949e241 --- /dev/null +++ b/content_gen/scripts/extraction/kit/page_processor.py @@ -0,0 +1,334 @@ +"""Per-page layout and text partitioning for PDF-Extract-Kit wrapper.""" +from __future__ import annotations + +import re +from abc import abstractmethod +from pathlib import Path +from typing import Any, Dict, List, Optional, cast + +import fitz + + +class KitPageProcessorMixin: + # --- cross-mixin attribute dependencies (set by PDFExtractKitWrapper.__init__) --- + layout_detector: Optional[Any] + images_dir: Optional[Path] + question_detection_mode: str + min_question_number: int + max_question_number: Optional[int] + + # --- cross-mixin method dependencies (provided by sibling mixins) --- + @abstractmethod + def _clean_noise(self, text: str) -> str: ... + + @abstractmethod + def _reconstruct_line_text( + self, spans: List[Dict], avg_baseline: float, main_size: float + ) -> str: ... + + @abstractmethod + def _extract_bbox_image( + self, page: fitz.Page, bbox: List[float], q_num: int, element_type: str + ) -> Path: ... + def _process_page( + self, + page: fitz.Page, + page_num: int, + doc: fitz.Document, + last_q_num: Optional[int] = None, + ) -> tuple[List[Dict], Optional[int]]: + """ + Process a single page using span-level partitioning and coordinate mapping. + Returns (list of question fragments, updated last_q_num). + """ + question_positions = self._detect_question_numbers_with_positions(page) + + if not question_positions: + if last_q_num: + question_positions = [(last_q_num, 0)] + else: + return [], None + + questions: Dict[int, Dict] = {} + new_last_q_num = last_q_num + + for q_num, _ in question_positions: + if self._is_valid_question_number(q_num): + questions[q_num] = { + "question_number": q_num, + "page": page_num, + "question_text": "", + "options": {"A": "", "B": "", "C": "", "D": ""}, + "stem_images": [], + "option_images": {}, + } + new_last_q_num = q_num + + all_spans: List[Dict] = [] + text_dict = cast(Dict, page.get_text("dict")) + for block in text_dict["blocks"]: + if "lines" in block: + for line in block["lines"]: + for span in line["spans"]: + if not span["text"].strip(): + continue + all_spans.append(span) + + spans_by_question = {q: [] for q in questions} + for span in all_spans: + y_mid = (span["bbox"][1] + span["bbox"][3]) / 2 + q_num = self._assign_to_question(y_mid, question_positions, page_num) + if q_num and q_num in spans_by_question: + spans_by_question[q_num].append(span) + + for q_num, spans in spans_by_question.items(): + if not spans: + continue + + spans.sort(key=lambda s: (s["bbox"][1] + s["bbox"][3]) / 2) + visual_lines: List[List[Dict]] = [] + if spans: + current_line = [spans[0]] + for s in spans[1:]: + last_y_mid = (current_line[-1]["bbox"][1] + current_line[-1]["bbox"][3]) / 2 + curr_y_mid = (s["bbox"][1] + s["bbox"][3]) / 2 + if abs(curr_y_mid - last_y_mid) < 9: + current_line.append(s) + else: + visual_lines.append(current_line) + current_line = [s] + visual_lines.append(current_line) + + current_field = "question_text" + + for vline in visual_lines: + vline.sort(key=lambda s: s["bbox"][0]) + line_main_size = max(s["size"] for s in vline) + line_baselines = [ + s["bbox"][1] + for s in vline + if abs(s["size"] - line_main_size) < 0.5 + ] + line_avg_baseline = ( + sum(line_baselines) / len(line_baselines) + if line_baselines + else vline[0]["bbox"][1] + ) + + marker_indices = [] + for i, span in enumerate(vline): + txt = span["text"].strip().rstrip(".") + font = span["font"].lower() + x = span["bbox"][0] + known_cols = [70, 81, 170, 181, 270, 281, 370, 381] + is_bold = "bold" in font or "bold" in span.get("flags_str", "").lower() + if txt in ["A", "B", "C", "D"] and is_bold and any( + abs(x - c) < 15 for c in known_cols + ): + marker_indices.append((i, txt)) + + if marker_indices: + if marker_indices[0][0] > 0: + prefix_text = self._reconstruct_line_text( + vline[0 : marker_indices[0][0]], + line_avg_baseline, + line_main_size, + ) + prefix_text = self._clean_noise(prefix_text) + if prefix_text: + if current_field == "question_text": + questions[q_num]["question_text"] += " " + prefix_text + else: + questions[q_num]["options"][current_field] += " " + prefix_text + + for m_idx in range(len(marker_indices)): + start_idx, opt_letter = marker_indices[m_idx] + end_idx = ( + marker_indices[m_idx + 1][0] + if m_idx + 1 < len(marker_indices) + else len(vline) + ) + opt_text = self._reconstruct_line_text( + vline[start_idx + 1 : end_idx], + line_avg_baseline, + line_main_size, + ) + opt_text = self._clean_noise(opt_text) + questions[q_num]["options"][opt_letter] += " " + opt_text + current_field = opt_letter + else: + line_text = self._reconstruct_line_text( + vline, line_avg_baseline, line_main_size + ) + line_text = self._clean_noise(line_text) + if line_text: + if current_field == "question_text": + if not questions[q_num]["question_text"]: + line_text = re.sub(r"^\d+[\.\s]*", "", line_text) + questions[q_num]["question_text"] += " " + line_text + else: + questions[q_num]["options"][current_field] += " " + line_text + + layout_detector = self.layout_detector + if layout_detector is None: + raise RuntimeError("Layout detector is not initialized") + images_dir = self.images_dir + if images_dir is None: + raise ValueError("images_dir must be initialized before processing page images") + pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) + temp_img_path = images_dir / f"_temp_page_{page_num}.png" + pix.save(str(temp_img_path)) + results = layout_detector.predict_images(str(temp_img_path), str(images_dir)) + layout_result = results[0] + boxes = layout_result.boxes + + for box in boxes: + cls = int(box.cls[0]) + xyxy = box.xyxy[0].tolist() + pdf_bbox = [c / 2 for c in xyxy] + y_mid = (pdf_bbox[1] + pdf_bbox[3]) / 2 + type_name = layout_detector.model.id_to_names.get(cls, "unknown") + + if type_name in ["figure", "table", "isolate_formula"]: + q_num = self._assign_to_question(y_mid, question_positions, page_num) + if q_num and q_num in questions: + img_path = self._extract_bbox_image(page, pdf_bbox, q_num, type_name) + questions[q_num]["stem_images"].append(str(img_path)) + temp_img_path.unlink() + + for q in questions.values(): + q["question_text"] = q["question_text"].strip() + q["question_text"] = re.sub(r"^(\d+[\.\s]*)+", "", q["question_text"]) + for opt in q["options"]: + val = q["options"][opt].strip() + val = re.sub(r"\s+[\d_]$", "", val) + q["options"][opt] = val + + return list(questions.values()), new_last_q_num + + def _detect_question_numbers_with_positions(self, page: fitz.Page) -> List[tuple]: + """ + Detect question numbers and their Y positions + + Args: + page: PyMuPDF page object + + Returns: + List of (question_number, y_position) tuples + """ + text_dict = cast(Dict, page.get_text("dict")) + blocks = text_dict.get("blocks", []) + question_positions: List[tuple] = [] + + min_x = 1000.0 + for block in blocks: + if "lines" in block: + for line in block["lines"]: + min_x = min(min_x, line["bbox"][0]) + + for block_idx, block in enumerate(blocks): + if "lines" in block: + for i, line in enumerate(block["lines"]): + line_text = " ".join( + span["text"].strip() + for span in line["spans"] + if span["text"].strip() + ) + line_text = line_text.strip() + + if re.search(r"\d{4}/\d{2}/\w+/\d{2}", line_text): + continue + if "© UCLES" in line_text: + continue + + x_pos = line["bbox"][0] + if x_pos > min_x + 50 and x_pos > 150: + continue + + if self.question_detection_mode == "strict": + marker_pattern = r"^(\d+)\s+([A-Z][a-z]+)" + elif self.question_detection_mode == "open": + marker_pattern = r"^(\d+)[\.\s]*([A-Z\d\(\\]|$)" + else: + marker_pattern = r"^(\d+)[\.\s]*([A-Z]|\\|\(|\$|[a-z]{3,})" + + match = re.match(marker_pattern, line_text) + if match: + q_num = int(match.group(1)) + if self._is_valid_question_number(q_num): + y_pos = line["bbox"][1] + question_positions.append((q_num, y_pos)) + continue + + q_num_match = re.match(r"^(\d+)[\.]?$", line_text) + if q_num_match: + q_num = int(q_num_match.group(1)) + if self._is_valid_question_number(q_num): + is_question = False + check_text = "" + if i + 1 < len(block["lines"]): + check_text = " ".join( + s["text"] for s in block["lines"][i + 1]["spans"] + ).strip() + elif block_idx + 1 < len(blocks): + next_block = blocks[block_idx + 1] + if "lines" in next_block and len(next_block["lines"]) > 0: + check_text = " ".join( + s["text"] for s in next_block["lines"][0]["spans"] + ).strip() + + if len(check_text) > 3: + if not re.search(r"\d{4}/\d{2}/\w+/\d{2}", check_text): + is_question = True + + if is_question: + y_pos = line["bbox"][1] + question_positions.append((q_num, y_pos)) + + sorted_positions = sorted(question_positions, key=lambda x: x[1]) + deduped: List[tuple] = [] + seen: set = set() + for q_num, y_pos in sorted_positions: + if q_num in seen: + continue + seen.add(q_num) + deduped.append((q_num, y_pos)) + return deduped + + def _assign_to_question( + self, + y_pos: float, + question_positions: List[tuple], + page_num: int, + ) -> Optional[int]: + """ + Assign a detected element to a question number based on Y position + + Args: + y_pos: Y coordinate of element + question_positions: List of (question_num, y_position) tuples + page_num: Current page number + + Returns: + Question number or None + """ + if not question_positions: + return None + + if y_pos > 775: + return None + + for i in range(len(question_positions) - 1, -1, -1): + q_num, q_y = question_positions[i] + if y_pos >= q_y: + return q_num + + return None + + def _is_valid_question_number(self, number: int) -> bool: + """Question number guardrails, configurable per curriculum/run.""" + if number < self.min_question_number: + return False + if self.max_question_number is not None and number > self.max_question_number: + return False + return True diff --git a/content_gen/scripts/extraction/kit/question_builder.py b/content_gen/scripts/extraction/kit/question_builder.py new file mode 100644 index 0000000..d60fed3 --- /dev/null +++ b/content_gen/scripts/extraction/kit/question_builder.py @@ -0,0 +1,55 @@ +"""Merge question fragments across pages.""" +from __future__ import annotations + +from abc import abstractmethod +from typing import Dict, List + + +class KitQuestionBuilderMixin: + # --- cross-mixin method dependency (provided by KitPageProcessorMixin) --- + @abstractmethod + def _is_valid_question_number(self, number: int) -> bool: ... + def _merge_questions(self, questions: List[Dict]) -> List[Dict]: + """Merge question fragments across pages into canonical runtime questions.""" + merged: Dict[int, Dict] = {} + for q in questions: + num = q.get("question_number", 0) + if not self._is_valid_question_number(num): + continue + + if num not in merged: + merged[num] = { + "question_number": num, + "page": q.get("page"), + "question_text": (q.get("question_text") or "").strip(), + "options": { + "A": (q.get("options", {}).get("A", "") or "").strip(), + "B": (q.get("options", {}).get("B", "") or "").strip(), + "C": (q.get("options", {}).get("C", "") or "").strip(), + "D": (q.get("options", {}).get("D", "") or "").strip(), + }, + "stem_images": list(dict.fromkeys(q.get("stem_images", []) or [])), + "option_images": q.get("option_images", {}) or {}, + } + continue + + q_text = (q.get("question_text") or "").strip() + if q_text: + merged[num]["question_text"] = ( + f"{merged[num]['question_text']} {q_text}".strip() + ) + + for opt in ["A", "B", "C", "D"]: + opt_text = (q.get("options", {}).get(opt, "") or "").strip() + if not opt_text: + continue + existing = merged[num]["options"].get(opt, "") + merged[num]["options"][opt] = f"{existing} {opt_text}".strip() + + merged[num]["stem_images"] = list( + dict.fromkeys( + merged[num]["stem_images"] + (q.get("stem_images", []) or []) + ) + ) + + return sorted(merged.values(), key=lambda item: item["question_number"]) diff --git a/content_gen/scripts/extraction/kit/text_utils.py b/content_gen/scripts/extraction/kit/text_utils.py new file mode 100644 index 0000000..3425559 --- /dev/null +++ b/content_gen/scripts/extraction/kit/text_utils.py @@ -0,0 +1,102 @@ +"""Text reconstruction and noise cleaning for PDF-Extract-Kit wrapper.""" +from __future__ import annotations + +import re +from pathlib import Path +from typing import Dict, List, Optional + + +class KitTextUtilsMixin: + # --- cross-mixin attribute dependencies (set by PDFExtractKitWrapper.__init__) --- + extraction_noise_patterns: List[str] + outputs_dir: Optional[Path] + base_name: Optional[str] + def _clean_noise(self, text: str) -> str: + """Filter global noise and map symbols from reconstructed text parts""" + symbol_map = { + "\uf070": "π", + "\uf061": "α", + "\uf062": "β", + "\uf067": "γ", + "\uf044": "Δ", + "\uf0b0": "°", + "\uf0b1": "±", + "\uf0e6": "(", + "\uf0f6": ")", + "\uf0e7": "[", + "\uf0f7": "]", + "\uf03d": "=", + "\uf02b": "+", + "\uf02d": "–", + "\uf057": "Ω", + "\uf0b8": "÷", + } + for code, char in symbol_map.items(): + text = text.replace(code, char) + + text = re.sub(r"\d{4}/\d{2}/\w+/\d{2}", "", text) + text = re.sub(r"© UCLES.*", "", text, flags=re.I) + text = re.sub(r"\[Turn over", "", text, flags=re.I) + + for pattern in self.extraction_noise_patterns: + if pattern: + text = re.sub(pattern, "", text, flags=re.I | re.DOTALL) + + return text.strip() + + def _reconstruct_line_text( + self, spans: List[Dict], avg_baseline: float, main_size: float + ) -> str: + """Helper to reconstruct text with markup from a list of spans on one line""" + if not spans: + return "" + parts = [] + for span in spans: + text = span["text"] + size = span["size"] + top = span["bbox"][1] + + if size < main_size * 0.9: + if top < avg_baseline - 1: + parts.append(f"^{text}") + elif top > avg_baseline + 1: + parts.append(f"_{text}") + else: + parts.append(text) + else: + parts.append(text) + return "".join(parts).strip() + + def _generate_processed_text(self, output_data: Dict) -> None: + """Generate the standard processed text file in data/outputs following prompts.py""" + outputs_dir = self.outputs_dir + base_name = self.base_name + if outputs_dir is None or base_name is None: + raise ValueError( + "outputs_dir and base_name must be initialized before generating processed text" + ) + text_path = outputs_dir / f"{base_name}_processed.txt" + + sorted_qs = sorted( + output_data.get("questions", []), key=lambda x: x["question_number"] + ) + + with open(text_path, "w", encoding="utf-8") as f: + for q in sorted_qs: + f.write( + f"Question {q['question_number']}Question and Options in Text Format\n\n" + ) + + f.write(f"{q['question_text'].strip()}\n\n") + + opts = q["options"] + opt_str = f"A. {opts['A']} B. {opts['B']} C. {opts['C']} D. {opts['D']}" + f.write(f"{opt_str.strip()}\n\n") + + f.write("Detailed Explanation of the Question and Right Answer\n\n") + f.write("[EXPLANATION_PLACEHOLDER]\n\n") + f.write("Option Wise Explanation (Detailed)\n\n") + f.write("[OPTION_EXPLANATION_PLACEHOLDER]\n\n") + f.write("### 🧠 Concept Gap Analysis and Flashcards\n\n") + f.write("[FLASHCARDS_PLACEHOLDER]\n\n") + f.write("-" * 50 + "\n\n") diff --git a/content_gen/scripts/extraction/kit/wrapper.py b/content_gen/scripts/extraction/kit/wrapper.py new file mode 100644 index 0000000..1e28306 --- /dev/null +++ b/content_gen/scripts/extraction/kit/wrapper.py @@ -0,0 +1,222 @@ +"""PDF-Extract-Kit wrapper — public API and orchestration.""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import Callable, Dict, List, Optional + +import fitz + +from content_gen.scripts.extraction.kit._bootstrap import ( + CONTENT_GEN_ROOT, + HAS_KIT, + KIT_PATH, + initialize_tasks_and_models, +) +from content_gen.scripts.extraction.kit.image_utils import KitImageUtilsMixin +from content_gen.scripts.extraction.kit.page_processor import KitPageProcessorMixin +from content_gen.scripts.extraction.kit.question_builder import KitQuestionBuilderMixin +from content_gen.scripts.extraction.kit.text_utils import KitTextUtilsMixin + + +class PDFExtractKitWrapper( + KitPageProcessorMixin, + KitTextUtilsMixin, + KitImageUtilsMixin, + KitQuestionBuilderMixin, +): + """ + Wrapper for PDF-Extract-Kit that provides a simple interface + compatible with the old smart_extract.py output format + """ + + def __init__( + self, + pdf_path: Optional[str] = None, + output_dir: Optional[str] = None, + use_gpu: bool = False, + min_question_number: int = 1, + max_question_number: Optional[int] = None, + question_detection_mode: str = "balanced", + extraction_noise_patterns: Optional[List[str]] = None, + ) -> None: + self.use_gpu = use_gpu + self.pdf_path = pdf_path + self.output_dir = Path(output_dir) if output_dir else None + self.min_question_number = min_question_number + self.max_question_number = max_question_number + self.question_detection_mode = question_detection_mode + if extraction_noise_patterns is None: + from content_gen.core.config_schema import DEFAULT_EXTRACTION_NOISE_PATTERNS + + self.extraction_noise_patterns = list(DEFAULT_EXTRACTION_NOISE_PATTERNS) + else: + self.extraction_noise_patterns = list(extraction_noise_patterns) + + if output_dir: + self.output_dir = Path(output_dir) + else: + self.output_dir = CONTENT_GEN_ROOT / "data" / "extracted" + + if self.pdf_path: + self.base_name = Path(self.pdf_path).stem + self.images_dir = self.output_dir / "images" / self.base_name + self.images_dir.mkdir(parents=True, exist_ok=True) + self.outputs_dir = self.output_dir.parent / "outputs" + self.outputs_dir.mkdir(parents=True, exist_ok=True) + else: + self.base_name = None + self.images_dir = None + self.outputs_dir = None + + self._init_models() + + def _init_models(self) -> None: + """Initialize PDF-Extract-Kit AI models""" + if not HAS_KIT: + print("❌ Cannot initialize models: PDF-Extract-Kit not found in tools/") + self.layout_detector = None + return + if initialize_tasks_and_models is None: + print("❌ Cannot initialize models: config loader is unavailable") + self.layout_detector = None + return + + device = "cuda" if self.use_gpu else "cpu" + config = { + "tasks": { + "layout_detection": { + "model": "layout_detection_yolo", + "model_config": { + "img_size": 1280, + "conf_thres": 0.25, + "iou_thres": 0.45, + "model_path": str( + KIT_PATH / "models/Layout/YOLO/doclayout_yolo_ft.pt" + ), + "device": device, + }, + } + } + } + + print("🤖 Initializing PDF-Extract-Kit AI models...") + task_instances = initialize_tasks_and_models(config) + self.layout_detector = task_instances["layout_detection"] + print("✅ Models loaded successfully") + + def extract( + self, progress_callback: Optional[Callable[[int, str], None]] = None + ) -> Dict: + if not HAS_KIT: + raise RuntimeError( + "Extraction failed: PDF-Extract-Kit is not installed or found in tools/" + ) + if not self.pdf_path: + raise ValueError("pdf_path must be set before calling extract()") + if ( + not self.output_dir + or not self.base_name + or not self.images_dir + or not self.outputs_dir + ): + raise ValueError( + "output_dir/base_name/images_dir/outputs_dir must be initialized before calling extract()" + ) + if self.layout_detector is None: + raise RuntimeError("Layout detector is not initialized") + + doc = fitz.open(self.pdf_path) + all_questions: List[Dict] = [] + + print(f"📄 Processing: {self.pdf_path}") + print(f" Pages: {len(doc)}") + + if progress_callback: + progress_callback(25, "Extracting diagrams and images via Vision AI...") + + last_q_num: Optional[int] = None + for page_num in range(len(doc)): + page = doc[page_num] + questions_on_page, last_q_num = self._process_page( + page, page_num + 1, doc, last_q_num + ) + all_questions.extend(questions_on_page) + + doc.close() + + if progress_callback: + progress_callback(45, "Parsing text and layout structures...") + + merged_questions = self._merge_questions(all_questions) + output = { + "source": self.pdf_path, + "base_name": self.base_name, + "questions": merged_questions, + "raw_questions": all_questions, + "total_questions": len(merged_questions), + "extraction_settings": { + "min_question_number": self.min_question_number, + "max_question_number": self.max_question_number, + "question_detection_mode": self.question_detection_mode, + }, + } + + json_path = self.output_dir / f"{self.base_name}_extracted.json" + with open(json_path, "w") as f: + json.dump(output, f, indent=2) + + self._generate_processed_text(output) + + print("\n✅ Extraction complete!") + print(f" Questions: {len(merged_questions)}") + print(f" JSON: {json_path}") + print(f" Text Output: {self.outputs_dir / f'{self.base_name}_processed.txt'}") + print( + f" Images: {self.images_dir} ({len(list(self.images_dir.glob('*.png')))} files)" + ) + + return output + + def extract_questions( + self, + source_path: str, + output_dir: str, + progress_callback: Optional[Callable[[int, str], None]] = None, + ) -> Dict: + """Adapter-compatible extraction method""" + self.pdf_path = source_path + self.output_dir = Path(output_dir) + self.base_name = Path(source_path).stem + + self.images_dir = self.output_dir / "images" / self.base_name + self.images_dir.mkdir(parents=True, exist_ok=True) + + self.outputs_dir = self.output_dir.parent / "outputs" + self.outputs_dir.mkdir(parents=True, exist_ok=True) + + return self.extract(progress_callback=progress_callback) + + +def main() -> None: + """CLI entry point""" + import argparse + + parser = argparse.ArgumentParser( + description="Extract questions from PDF using PDF-Extract-Kit" + ) + parser.add_argument("pdf_path", help="Path to PDF file") + parser.add_argument( + "--output-dir", + default="content_gen/data/extracted", + help="Output directory", + ) + + args = parser.parse_args() + + extractor = PDFExtractKitWrapper(args.pdf_path, args.output_dir) + extractor.extract() + + +if __name__ == "__main__": + main() diff --git a/content_gen/scripts/extraction/pdf_extract_kit_wrapper.py b/content_gen/scripts/extraction/pdf_extract_kit_wrapper.py index 54a5680..c2782ee 100755 --- a/content_gen/scripts/extraction/pdf_extract_kit_wrapper.py +++ b/content_gen/scripts/extraction/pdf_extract_kit_wrapper.py @@ -1,734 +1,6 @@ #!/usr/bin/env python3 -""" -PDF-Extract-Kit Wrapper -Unified interface for PDF extraction using PDF-Extract-Kit AI models -Replaces: smart_extract.py, extract_pdf_content.py, extract_diagram.py -""" -import os -import sys -from pathlib import Path - -# Add PDF-Extract-Kit to path - MUST happen before local imports -KIT_PATH = Path(__file__).parent.parent.parent / "tools" / "PDF-Extract-Kit" -if str(KIT_PATH) not in sys.path: - sys.path.insert(0, str(KIT_PATH)) - -try: - import pdf_extract_kit.tasks # Trigger registration - from pdf_extract_kit.utils.config_loader import load_config, initialize_tasks_and_models - HAS_KIT = True -except (ImportError, ModuleNotFoundError): - HAS_KIT = False - initialize_tasks_and_models = None - print("⚠️ PDF-Extract-Kit not found. Extraction features using this engine will be disabled.") -import json -import re -import fitz -from typing import Dict, List, Optional, Callable - - -class PDFExtractKitWrapper: - """ - Wrapper for PDF-Extract-Kit that provides a simple interface - compatible with the old smart_extract.py output format - """ - def __init__( - self, - pdf_path: Optional[str] = None, - output_dir: Optional[str] = None, - use_gpu: bool = False, - min_question_number: int = 1, - max_question_number: Optional[int] = None, - question_detection_mode: str = "balanced", - extraction_noise_patterns: Optional[List[str]] = None, - ): - """ - Initialize PDF extractor - - Args: - pdf_path: Optional path to PDF file - output_dir: Optional output directory - use_gpu: Whether to use GPU for models - """ - self.use_gpu = use_gpu - self.pdf_path = pdf_path - self.output_dir = Path(output_dir) if output_dir else None - self.min_question_number = min_question_number - self.max_question_number = max_question_number - self.question_detection_mode = question_detection_mode - # Configurable boilerplate regexes (see edmate_config extraction_noise_patterns) - if extraction_noise_patterns is None: - from content_gen.core.config_schema import DEFAULT_EXTRACTION_NOISE_PATTERNS - - self.extraction_noise_patterns = list(DEFAULT_EXTRACTION_NOISE_PATTERNS) - else: - self.extraction_noise_patterns = list(extraction_noise_patterns) - - # Determine working directory or use provided output_dir - if output_dir: - self.output_dir = Path(output_dir) - else: - # Default to data/extracted relative to the content_gen root - script_path = Path(__file__).parent.absolute() - self.output_dir = script_path.parent.parent / "data" / "extracted" - if self.pdf_path: - self.base_name = Path(self.pdf_path).stem - # Create PDF-specific subfolder for images - self.images_dir = self.output_dir / "images" / self.base_name - self.images_dir.mkdir(parents=True, exist_ok=True) - - # Sub-folder for processed text files (relative to data root) - self.outputs_dir = self.output_dir.parent / "outputs" - self.outputs_dir.mkdir(parents=True, exist_ok=True) - else: - self.base_name = None - self.images_dir = None - self.outputs_dir = None - - # Initialize PDF-Extract-Kit models - self._init_models() - - def _init_models(self): - """Initialize PDF-Extract-Kit AI models""" - if not HAS_KIT: - print("❌ Cannot initialize models: PDF-Extract-Kit not found in tools/") - self.layout_detector = None - return - if initialize_tasks_and_models is None: - print("❌ Cannot initialize models: config loader is unavailable") - self.layout_detector = None - return - - device = "cuda" if self.use_gpu else "cpu" - # Configuration for layout detection - config = { - "tasks": { - "layout_detection": { - "model": "layout_detection_yolo", - "model_config": { - "img_size": 1280, - "conf_thres": 0.25, - "iou_thres": 0.45, - "model_path": str(KIT_PATH / "models/Layout/YOLO/doclayout_yolo_ft.pt"), - "device": device - } - } - } - } - - print("🤖 Initializing PDF-Extract-Kit AI models...") - task_instances = initialize_tasks_and_models(config) - self.layout_detector = task_instances["layout_detection"] - print("✅ Models loaded successfully") - - def extract(self, progress_callback: Optional[Callable[[int, str], None]] = None) -> Dict: - """ - Extract questions and diagrams from PDF - - Returns: - Dictionary with structure: - { - "source": "path/to/pdf", - "questions": [ - { - "question_number": 1, - "page": 1, - "stem_images": ["q1_stem.png"], - "option_images": {"A": ["q1_opt_A.png"], ...} - } - ] - } - """ - if not HAS_KIT: - raise RuntimeError("Extraction failed: PDF-Extract-Kit is not installed or found in tools/") - if not self.pdf_path: - raise ValueError("pdf_path must be set before calling extract()") - if not self.output_dir or not self.base_name or not self.images_dir or not self.outputs_dir: - raise ValueError( - "output_dir/base_name/images_dir/outputs_dir must be initialized before calling extract()" - ) - if self.layout_detector is None: - raise RuntimeError("Layout detector is not initialized") - - doc = fitz.open(self.pdf_path) - all_questions = [] - - print(f"📄 Processing: {self.pdf_path}") - print(f" Pages: {len(doc)}") - - if progress_callback: - progress_callback(25, "Extracting diagrams and images via Vision AI...") - - last_q_num = None - for page_num in range(len(doc)): - page = doc[page_num] - questions_on_page, last_q_num = self._process_page(page, page_num + 1, doc, last_q_num) - all_questions.extend(questions_on_page) - - doc.close() - - if progress_callback: - progress_callback(45, "Parsing text and layout structures...") - - merged_questions = self._merge_questions(all_questions) - output = { - "source": self.pdf_path, - "base_name": self.base_name, - "questions": merged_questions, - "raw_questions": all_questions, - "total_questions": len(merged_questions), - "extraction_settings": { - "min_question_number": self.min_question_number, - "max_question_number": self.max_question_number, - "question_detection_mode": self.question_detection_mode, - } - } - - # Save JSON - json_path = self.output_dir / f"{self.base_name}_extracted.json" - with open(json_path, 'w') as f: - json.dump(output, f, indent=2) - - # Generate standard processed text file in data/outputs - self._generate_processed_text(output) - - print(f"\n✅ Extraction complete!") - print(f" Questions: {len(merged_questions)}") - print(f" JSON: {json_path}") - print( - f" Text Output: {self.outputs_dir / f'{self.base_name}_processed.txt'}") - print( - f" Images: {self.images_dir} ({len(list(self.images_dir.glob('*.png')))} files)") - - return output - - def extract_questions(self, source_path: str, output_dir: str, progress_callback: Optional[Callable[[int, str], None]] = None) -> Dict: - """ - Adapter-compatible extraction method - """ - self.pdf_path = source_path - self.output_dir = Path(output_dir) - self.base_name = Path(source_path).stem - - # Create PDF-specific subfolder for images - self.images_dir = self.output_dir / "images" / self.base_name - self.images_dir.mkdir(parents=True, exist_ok=True) - - # Sub-folder for processed text files (relative to data root) - self.outputs_dir = self.output_dir.parent / "outputs" - self.outputs_dir.mkdir(parents=True, exist_ok=True) - - return self.extract(progress_callback=progress_callback) - - def _process_page(self, page, page_num: int, doc, last_q_num: Optional[int] = None) -> tuple[List[Dict], Optional[int]]: - """ - Process a single page using span-level partitioning and coordinate mapping. - Returns (list of question fragments, updated last_q_num). - """ - # Detect question numbers with their Y positions - question_positions = self._detect_question_numbers_with_positions(page) - - # If no questions on this page, but we have a last_q_num from previous page, - # treat the entire page as a continuation of that question. - if not question_positions: - if last_q_num: - # Use a dummy position for the whole page - question_positions = [(last_q_num, 0)] - else: - # Still no starting point, likely a cover page or instructions - return [], None - - questions = {} - new_last_q_num = last_q_num - - for q_num, _ in question_positions: - if self._is_valid_question_number(q_num): - questions[q_num] = { - "question_number": q_num, - "page": page_num, - "question_text": "", - "options": {"A": "", "B": "", "C": "", "D": ""}, - "stem_images": [], - "option_images": {} - } - new_last_q_num = q_num - - # 1. Collect all spans - all_spans = [] - text_dict = page.get_text("dict") - for block in text_dict["blocks"]: - if "lines" in block: - for line in block["lines"]: - for span in line["spans"]: - if not span["text"].strip(): - continue - all_spans.append(span) - - # 2. Group spans into questions - spans_by_question = {q: [] for q in questions} - for span in all_spans: - y_mid = (span["bbox"][1] + span["bbox"][3]) / 2 - q_num = self._assign_to_question( - y_mid, question_positions, page_num) - if q_num and q_num in spans_by_question: - spans_by_question[q_num].append(span) - - # 3. Process each question - for q_num, spans in spans_by_question.items(): - if not spans: - continue - - # Group into Visual Lines - spans.sort(key=lambda s: (s["bbox"][1] + s["bbox"][3]) / 2) - visual_lines = [] - if spans: - current_line = [spans[0]] - for s in spans[1:]: - last_y_mid = ( - current_line[-1]["bbox"][1] + current_line[-1]["bbox"][3]) / 2 - curr_y_mid = (s["bbox"][1] + s["bbox"][3]) / 2 - if abs(curr_y_mid - last_y_mid) < 9: - current_line.append(s) - else: - visual_lines.append(current_line) - current_line = [s] - visual_lines.append(current_line) - - current_field = "question_text" - - for vline in visual_lines: - vline.sort(key=lambda s: s["bbox"][0]) - line_main_size = max(s["size"] for s in vline) - line_baselines = [s["bbox"][1] for s in vline if abs( - s["size"] - line_main_size) < 0.5] - line_avg_baseline = sum( - line_baselines) / len(line_baselines) if line_baselines else vline[0]["bbox"][1] - - # Robust Marker Detection (A-D) - marker_indices = [] - for i, span in enumerate(vline): - txt = span["text"].strip().rstrip('.') - font = span["font"].lower() - x = span["bbox"][0] - # Markers are Bold A-D at specific columns - known_cols = [70, 81, 170, 181, 270, 281, 370, 381] - is_bold = "bold" in font or "bold" in span.get( - "flags_str", "").lower() - if txt in ["A", "B", "C", "D"] and is_bold and any(abs(x - c) < 15 for c in known_cols): - marker_indices.append((i, txt)) - - if marker_indices: - # Handle text before first marker - if marker_indices[0][0] > 0: - prefix_text = self._reconstruct_line_text( - vline[0:marker_indices[0][0]], line_avg_baseline, line_main_size) - prefix_text = self._clean_noise(prefix_text) - if prefix_text: - if current_field == "question_text": - questions[q_num]["question_text"] += " " + \ - prefix_text - else: - questions[q_num]["options"][current_field] += " " + \ - prefix_text - - for m_idx in range(len(marker_indices)): - start_idx, opt_letter = marker_indices[m_idx] - end_idx = marker_indices[m_idx+1][0] if m_idx + \ - 1 < len(marker_indices) else len(vline) - opt_text = self._reconstruct_line_text( - vline[start_idx+1:end_idx], line_avg_baseline, line_main_size) - opt_text = self._clean_noise(opt_text) - questions[q_num]["options"][opt_letter] += " " + opt_text - current_field = opt_letter - else: - line_text = self._reconstruct_line_text( - vline, line_avg_baseline, line_main_size) - line_text = self._clean_noise(line_text) - if line_text: - if current_field == "question_text": - if not questions[q_num]["question_text"]: - line_text = re.sub( - r'^\d+[\.\s]*', '', line_text) - questions[q_num]["question_text"] += " " + line_text - else: - questions[q_num]["options"][current_field] += " " + line_text - - # Handle images - layout_detector = self.layout_detector - if layout_detector is None: - raise RuntimeError("Layout detector is not initialized") - images_dir = self.images_dir - if images_dir is None: - raise ValueError("images_dir must be initialized before processing page images") - pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) - temp_img_path = images_dir / f"_temp_page_{page_num}.png" - pix.save(str(temp_img_path)) - results = layout_detector.predict_images( - str(temp_img_path), str(images_dir)) - layout_result = results[0] - boxes = layout_result.boxes - - for box in boxes: - cls = int(box.cls[0]) - xyxy = box.xyxy[0].tolist() - pdf_bbox = [c / 2 for c in xyxy] - y_mid = (pdf_bbox[1] + pdf_bbox[3]) / 2 - type_name = layout_detector.model.id_to_names.get( - cls, "unknown") - - if type_name in ["figure", "table", "isolate_formula"]: - q_num = self._assign_to_question( - y_mid, question_positions, page_num) - if q_num and q_num in questions: - img_path = self._extract_bbox_image( - page, pdf_bbox, q_num, type_name) - questions[q_num]["stem_images"].append(str(img_path)) - temp_img_path.unlink() - - # Cleanup - for q in questions.values(): - q["question_text"] = q["question_text"].strip() - q["question_text"] = re.sub( - r'^(\d+[\.\s]*)+', '', q["question_text"]) - for opt in q["options"]: - # Clean up options and remove trailing artifacts like single digits or underscores - val = q["options"][opt].strip() - val = re.sub(r'\s+[\d_]$', '', val) - q["options"][opt] = val - - return list(questions.values()), new_last_q_num - - def _clean_noise(self, text: str) -> str: - """Filter global noise and map symbols from reconstructed text parts""" - # Symbol mapping for Wingdings-style private-use glyphs often emitted by PDF text layers - symbol_map = { - "\uf070": "π", - "\uf061": "α", - "\uf062": "β", - "\uf067": "γ", - "\uf044": "Δ", - "\uf0b0": "°", - "\uf0b1": "±", - "\uf0e6": "(", - "\uf0f6": ")", - "\uf0e7": "[", - "\uf0f7": "]", - "\uf03d": "=", - "\uf02b": "+", - "\uf02d": "–", - "\uf057": "Ω", - "\uf0b8": "÷", - } - for code, char in symbol_map.items(): - text = text.replace(code, char) - - # Paper codes and Cambridge footers - text = re.sub(r'\d{4}/\d{2}/\w+/\d{2}', '', text) - text = re.sub(r'© UCLES.*', '', text, flags=re.I) - text = re.sub(r'\[Turn over', '', text, flags=re.I) - - for pattern in self.extraction_noise_patterns: - if pattern: - text = re.sub(pattern, '', text, flags=re.I | re.DOTALL) - - return text.strip() - - def _reconstruct_line_text(self, spans: List[Dict], avg_baseline: float, main_size: float) -> str: - """Helper to reconstruct text with markup from a list of spans on one line""" - if not spans: - return "" - parts = [] - for span in spans: - text = span["text"] - size = span["size"] - top = span["bbox"][1] - - if size < main_size * 0.9: - if top < avg_baseline - 1: - parts.append(f"^{text}") - elif top > avg_baseline + 1: - parts.append(f"_{text}") - else: - parts.append(text) - else: - parts.append(text) - return "".join(parts).strip() - - def _generate_processed_text(self, output_data: Dict): - """Generate the standard processed text file in data/outputs following prompts.py""" - outputs_dir = self.outputs_dir - base_name = self.base_name - if outputs_dir is None or base_name is None: - raise ValueError("outputs_dir and base_name must be initialized before generating processed text") - text_path = outputs_dir / f"{base_name}_processed.txt" - - sorted_qs = sorted( - output_data.get("questions", []), - key=lambda x: x["question_number"] - ) - - with open(text_path, 'w', encoding='utf-8') as f: - for q in sorted_qs: - f.write( - f"Question {q['question_number']}Question and Options in Text Format\n\n") - - # Question text - f.write(f"{q['question_text'].strip()}\n\n") - - # Options - opts = q["options"] - # Match the reference format: A. Text B. Text ... - opt_str = f"A. {opts['A']} B. {opts['B']} C. {opts['C']} D. {opts['D']}" - f.write(f"{opt_str.strip()}\n\n") - - f.write("Detailed Explanation of the Question and Right Answer\n\n") - f.write("[EXPLANATION_PLACEHOLDER]\n\n") - f.write("Option Wise Explanation (Detailed)\n\n") - f.write("[OPTION_EXPLANATION_PLACEHOLDER]\n\n") - f.write("### 🧠 Concept Gap Analysis and Flashcards\n\n") - f.write("[FLASHCARDS_PLACEHOLDER]\n\n") - f.write("-" * 50 + "\n\n") - - def _detect_question_numbers_with_positions(self, page) -> List[tuple]: - """ - Detect question numbers and their Y positions - - Args: - page: PyMuPDF page object - - Returns: - List of (question_number, y_position) tuples - """ - import re - - text_dict = page.get_text("dict") - blocks = text_dict.get("blocks", []) - question_positions = [] - - # 1. Identify leftmost possible position on page - min_x = 1000.0 - for block in blocks: - if "lines" in block: - for line in block["lines"]: - min_x = min(min_x, line["bbox"][0]) - - for block_idx, block in enumerate(blocks): - if "lines" in block: - for i, line in enumerate(block["lines"]): - # Get line text - line_text = " ".join(span["text"].strip() for span in line["spans"] if span["text"].strip()) - line_text = line_text.strip() - - # Skip paper codes/footers that look like numbers - if re.search(r'\d{4}/\d{2}/\w+/\d{2}', line_text): - continue - if "© UCLES" in line_text: - continue - - # Check position: Question numbers are typically near the leftmost edge - x_pos = line["bbox"][0] - # Allow up to 50px indentation from the leftmost text element - if x_pos > min_x + 50 and x_pos > 150: - continue - - # Pattern 1: Number + Text/Marker on same line - if self.question_detection_mode == "strict": - marker_pattern = r'^(\d+)\s+([A-Z][a-z]+)' - elif self.question_detection_mode == "open": - marker_pattern = r'^(\d+)[\.\s]*([A-Z\d\(\\]|$)' - else: # balanced - # More inclusive: allow space or dot, and any uppercase or special starter - marker_pattern = r'^(\d+)[\.\s]*([A-Z]|\\|\(|\$|[a-z]{3,})' - - match = re.match(marker_pattern, line_text) - if match: - q_num = int(match.group(1)) - if self._is_valid_question_number(q_num): - y_pos = line["bbox"][1] - question_positions.append((q_num, y_pos)) - continue - - # Pattern 2: Number on separate line (Q1-9 often) - # Allow optional trailing dot - q_num_match = re.match(r'^(\d+)[\.]?$', line_text) - if q_num_match: - q_num = int(q_num_match.group(1)) - if self._is_valid_question_number(q_num): - # Check next line/block for validation (should look like a question) - is_question = False - check_text = "" - if i + 1 < len(block["lines"]): - check_text = " ".join(s["text"] for s in block["lines"][i + 1]["spans"]).strip() - elif block_idx + 1 < len(blocks): - next_block = blocks[block_idx + 1] - if "lines" in next_block and len(next_block["lines"]) > 0: - check_text = " ".join(s["text"] for s in next_block["lines"][0]["spans"]).strip() - - # Looser validation: just needs to NOT be a footer or very short - if len(check_text) > 3: - if not re.search(r'\d{4}/\d{2}/\w+/\d{2}', check_text): - is_question = True - - if is_question: - y_pos = line["bbox"][1] - question_positions.append((q_num, y_pos)) - - # Sort by Y position and de-duplicate by question number (keep first sighting per page). - sorted_positions = sorted(question_positions, key=lambda x: x[1]) - deduped: List[tuple] = [] - seen = set() - for q_num, y_pos in sorted_positions: - if q_num in seen: - continue - seen.add(q_num) - deduped.append((q_num, y_pos)) - return deduped - - def _assign_to_question( - self, - y_pos: float, - question_positions: List[tuple], - page_num: int - ) -> Optional[int]: - """ - Assign a detected element to a question number based on Y position - - Args: - y_pos: Y coordinate of element - question_positions: List of (question_num, y_position) tuples - page_num: Current page number - - Returns: - Question number or None - """ - if not question_positions: - # No questions detected on this page, skip this element - return None - - # Footer Guard: Ignore elements at the very bottom of the page (A4 height is 842pt) - if y_pos > 775: - return None - - # Find the question this element belongs to - # Element belongs to the question above it (closest question with y < element_y) - for i in range(len(question_positions) - 1, -1, -1): - q_num, q_y = question_positions[i] - if y_pos >= q_y: # Element is below this question - return q_num - - # If element is above all questions, treat it as preamble/instruction noise. - return None - - def _extract_bbox_image( - self, - page, - bbox: List[float], - q_num: int, - element_type: str - ) -> Path: - """ - Extract and save image from bounding box - - Args: - page: PyMuPDF page object - bbox: Bounding box [x0, y0, x1, y1] - q_num: Question number - element_type: Type of element (figure, table, formula) - - Returns: - Path to saved image - """ - # Adaptive padding preserves labels/axes around detector boxes. - width = max(1.0, bbox[2] - bbox[0]) - height = max(1.0, bbox[3] - bbox[1]) - pad = max(12.0, min(width, height) * 0.08) - - final_bbox = [ - max(0, bbox[0] - pad), - max(0, bbox[1] - pad), - min(page.rect.width, bbox[2] + pad), - min(page.rect.height, bbox[3] + pad) - ] - - # Generate filename - images_dir = self.images_dir - if images_dir is None: - raise ValueError("images_dir must be initialized before extracting images") - img_name = f"q{q_num}_{element_type}.png" - img_path = images_dir / img_name - - # Extract high-resolution image - pix = page.get_pixmap(matrix=fitz.Matrix(3, 3), - clip=fitz.Rect(final_bbox)) - pix.save(str(img_path)) - - return img_path - - def _merge_questions(self, questions: List[Dict]) -> List[Dict]: - """Merge question fragments across pages into canonical runtime questions.""" - merged: Dict[int, Dict] = {} - for q in questions: - num = q.get("question_number", 0) - if not self._is_valid_question_number(num): - continue - - if num not in merged: - merged[num] = { - "question_number": num, - "page": q.get("page"), - "question_text": (q.get("question_text") or "").strip(), - "options": { - "A": (q.get("options", {}).get("A", "") or "").strip(), - "B": (q.get("options", {}).get("B", "") or "").strip(), - "C": (q.get("options", {}).get("C", "") or "").strip(), - "D": (q.get("options", {}).get("D", "") or "").strip() - }, - "stem_images": list(dict.fromkeys(q.get("stem_images", []) or [])), - "option_images": q.get("option_images", {}) or {} - } - continue - - q_text = (q.get("question_text") or "").strip() - if q_text: - merged[num]["question_text"] = f"{merged[num]['question_text']} {q_text}".strip() - - for opt in ["A", "B", "C", "D"]: - opt_text = (q.get("options", {}).get(opt, "") or "").strip() - if not opt_text: - continue - existing = merged[num]["options"].get(opt, "") - merged[num]["options"][opt] = f"{existing} {opt_text}".strip() - - merged[num]["stem_images"] = list(dict.fromkeys( - merged[num]["stem_images"] + (q.get("stem_images", []) or []) - )) - - return sorted(merged.values(), key=lambda item: item["question_number"]) - - def _is_valid_question_number(self, number: int) -> bool: - """Question number guardrails, configurable per curriculum/run.""" - if number < self.min_question_number: - return False - if self.max_question_number is not None and number > self.max_question_number: - return False - return True - - -def main(): - """CLI entry point""" - import argparse - - parser = argparse.ArgumentParser( - description="Extract questions from PDF using PDF-Extract-Kit") - parser.add_argument("pdf_path", help="Path to PDF file") - parser.add_argument( - "--output-dir", default="content_gen/data/extracted", help="Output directory") - - args = parser.parse_args() - - extractor = PDFExtractKitWrapper(args.pdf_path, args.output_dir) - extractor.extract() - +"""Shim: implementation lives in ``content_gen.scripts.extraction.kit``.""" +from content_gen.scripts.extraction.kit.wrapper import PDFExtractKitWrapper, main # noqa: F401 if __name__ == "__main__": main() diff --git a/content_gen/scripts/processing/upload_to_storage.py b/content_gen/scripts/processing/upload_to_storage.py index b398b21..52557d0 100644 --- a/content_gen/scripts/processing/upload_to_storage.py +++ b/content_gen/scripts/processing/upload_to_storage.py @@ -8,29 +8,57 @@ import json import sys from pathlib import Path -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple + +from azure.core.credentials import AzureNamedKeyCredential from azure.storage.blob import BlobServiceClient, ContentSettings +def _account_name_from_conn_str(conn: str) -> Optional[str]: + """Parse AccountName from an Azure storage connection string.""" + for seg in conn.split(";"): + seg = seg.strip() + if seg.lower().startswith("accountname="): + return seg.split("=", 1)[1].strip() + return None + + class StorageUploader: def __init__(self): """ Initialize storage uploader with Azure Blob Storage """ - self.client = self._init_client() + self.client, self._blob_account_name = self._init_client() - def _init_client(self): - """Initialize Azure Blob Storage client""" - # Azure Blob Storage configuration + def _init_client(self) -> Tuple[BlobServiceClient, str]: + """Initialize client; return (client, storage account name for public URLs).""" account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME") account_key = os.getenv("AZURE_STORAGE_ACCOUNT_KEY") - - if not all([account_name, account_key]): + conn = os.getenv("AZURE_STORAGE_CONNECTION_STRING", "").strip() + + if conn: + client = BlobServiceClient.from_connection_string(conn) + resolved = (account_name or "").strip() or _account_name_from_conn_str(conn) or "" + if not resolved: + raise ValueError( + "Set AZURE_STORAGE_ACCOUNT_NAME, or include AccountName= in " + "AZURE_STORAGE_CONNECTION_STRING, so public blob URLs can be built." + ) + return client, resolved + + if not account_name or not account_key: raise ValueError( - "Missing Azure credentials. Set AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY") - - connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net" - return BlobServiceClient.from_connection_string(connection_string) + "Missing Azure credentials. Set AZURE_STORAGE_CONNECTION_STRING, or " + "both AZURE_STORAGE_ACCOUNT_NAME and AZURE_STORAGE_ACCOUNT_KEY." + ) + + name = account_name.strip() + key = account_key.strip() + # Named key credential — avoids a connection-string literal in source + # (scanners flag AccountKey= substrings in repository code). + credential = AzureNamedKeyCredential(name, key) + account_url = f"https://{name}.blob.core.windows.net" + return BlobServiceClient(account_url=account_url, credential=credential), name def upload_file(self, local_path: str, container: str, key: str, retries: int = 3) -> str: """ @@ -71,14 +99,13 @@ def upload_file(self, local_path: str, container: str, key: str, retries: int = def _generate_cdn_url(self, container: str, key: str) -> str: """Generate public CDN URL for uploaded file""" - # Use custom CDN domain if configured, otherwise default blob endpoint - account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME") custom_cdn = os.getenv("AZURE_STORAGE_CDN_URL") if custom_cdn: return f"{custom_cdn}/{container}/{key}" - else: - return f"https://{account_name}.blob.core.windows.net/{container}/{key}" + return ( + f"https://{self._blob_account_name}.blob.core.windows.net/{container}/{key}" + ) def upload_directory( self, diff --git a/edmate_config.yaml.example b/edmate_config.yaml.example index 47ad9eb..1a906f2 100644 --- a/edmate_config.yaml.example +++ b/edmate_config.yaml.example @@ -1,6 +1,12 @@ # Portable example — copy to edmate_config.yaml and customize. # Do not use YAML tags like !!python/object; values are plain strings. +# Model routing uses LiteLLM provider/model strings (passed through to LiteLLM). +# Swap to any LiteLLM-compatible route without code changes, for example: +# openai/gpt-4o +# anthropic/claude-3-5-sonnet-20240620 +# azure/gpt-4o +# vertex_ai/gemini-2.5-flash model_routing: extraction: "vertex_ai/gemini-2.5-flash" generation: "vertex_ai/gemini-2.5-flash" diff --git a/qc_viewer/static/automate.html b/qc_viewer/static/automate.html index e508836..db90308 100644 --- a/qc_viewer/static/automate.html +++ b/qc_viewer/static/automate.html @@ -198,6 +198,34 @@

Recent Processing Drafts

Pipeline Settings

+ +
+ +
+
+ +
+
+
Engine
+
Detection
+
Kit
+
Question range
+
Segmentation
+
Default subject
+
Default curriculum
+
Budget cap (USD/day)
+
Model · extraction
+
Model · generation
+
Model · validation
+
+
+
diff --git a/qc_viewer/static/js/automate_ui.js b/qc_viewer/static/js/automate_ui.js index 1038f46..bab6be4 100644 --- a/qc_viewer/static/js/automate_ui.js +++ b/qc_viewer/static/js/automate_ui.js @@ -74,6 +74,34 @@ export const AutomationUI = { const gen = (config.model_routing && config.model_routing.generation) || '—'; footer.textContent = `Pipeline engine: ${engine} · kit: ${kit} · generation model: ${gen}`; } + + const es = config.extraction_settings || {}; + const mr = config.model_routing || {}; + const ws = config.workspace || {}; + const bud = config.budget || {}; + const minQ = es.min_question_number; + const maxQ = es.max_question_number; + const qRange = (minQ != null && maxQ != null) + ? `${minQ}–${maxQ}` + : (minQ != null && (maxQ === null || maxQ === undefined)) + ? `${minQ}–∞ (no max)` + : '—'; + + const setTxt = (id, val) => { + const el = document.getElementById(id); + if (el) el.textContent = val == null || val === '' ? '—' : String(val); + }; + setTxt('cfgEngine', engine); + setTxt('cfgDetection', mode || '—'); + setTxt('cfgKit', kit); + setTxt('cfgQuestionRange', qRange); + setTxt('cfgSegmentation', es.segmentation_preset); + setTxt('cfgSubject', ws.default_subject); + setTxt('cfgCurriculum', ws.default_curriculum); + setTxt('cfgBudget', bud.max_daily_usd != null ? String(bud.max_daily_usd) : '—'); + setTxt('cfgModelExt', mr.extraction); + setTxt('cfgModelGen', mr.generation); + setTxt('cfgModelVal', mr.validation); } catch (e) { console.error('Error loading pipeline config:', e); } @@ -171,6 +199,10 @@ export const AutomationUI = { if (icon) icon.textContent = expanded ? '−' : '+'; }; }); + + document.getElementById('btnRefreshPipelineConfig')?.addEventListener('click', () => { + this.fetchPipelineConfig(); + }); this.setupTooltips(); },