diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8766e80..80f367b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -41,6 +41,9 @@ jobs: docker stop bubblog-ai || true docker rm bubblog-ai || true + docker stop bubblog-ai-worker || true + docker rm bubblog-ai-worker || true + docker pull $IMAGE_NAME:latest docker run -d --name bubblog-ai -p 8000:3000 \ @@ -51,4 +54,28 @@ jobs: -e ALGORITHM="${{ secrets.ALGORITHM }}" \ -e EMBED_MODEL="${{ secrets.EMBED_MODEL }}" \ -e CHAT_MODEL="${{ secrets.CHAT_MODEL }}" \ - $IMAGE_NAME:latest \ No newline at end of file + -e REDIS_URL="${{ secrets.REDIS_URL }}" \ + -e REDIS_HOST="${{ secrets.REDIS_HOST }}" \ + -e REDIS_PORT="${{ secrets.REDIS_PORT }}" \ + -e EMBEDDING_QUEUE_KEY="${{ secrets.EMBEDDING_QUEUE_KEY }}" \ + -e EMBEDDING_FAILED_QUEUE_KEY="${{ secrets.EMBEDDING_FAILED_QUEUE_KEY }}" \ + -e EMBEDDING_WORKER_MAX_RETRIES="${{ secrets.EMBEDDING_WORKER_MAX_RETRIES }}" \ + -e EMBEDDING_WORKER_BACKOFF_MS="${{ secrets.EMBEDDING_WORKER_BACKOFF_MS }}" \ + $IMAGE_NAME:latest + + docker run -d --name bubblog-ai-worker \ + -e OPENAI_API_KEY="${{ secrets.OPENAI_API_KEY }}" \ + -e DATABASE_URL="${{ secrets.DATABASE_URL }}" \ + -e SECRET_KEY="${{ secrets.SECRET_KEY }}" \ + -e TOKEN_AUDIENCE="${{ secrets.TOKEN_AUDIENCE }}" \ + -e ALGORITHM="${{ secrets.ALGORITHM }}" \ + -e EMBED_MODEL="${{ secrets.EMBED_MODEL }}" \ + -e CHAT_MODEL="${{ secrets.CHAT_MODEL }}" \ + -e REDIS_URL="${{ secrets.REDIS_URL }}" \ + -e REDIS_HOST="${{ secrets.REDIS_HOST }}" \ + -e REDIS_PORT="${{ secrets.REDIS_PORT }}" \ + -e EMBEDDING_QUEUE_KEY="${{ secrets.EMBEDDING_QUEUE_KEY }}" \ + -e EMBEDDING_FAILED_QUEUE_KEY="${{ secrets.EMBEDDING_FAILED_QUEUE_KEY }}" \ + -e EMBEDDING_WORKER_MAX_RETRIES="${{ secrets.EMBEDDING_WORKER_MAX_RETRIES }}" \ + -e EMBEDDING_WORKER_BACKOFF_MS="${{ secrets.EMBEDDING_WORKER_BACKOFF_MS }}" \ + $IMAGE_NAME:latest node dist/worker/queue-consumer.js diff --git a/TASK.md b/TASK.md index 4623441..a8e2463 100644 --- a/TASK.md +++ b/TASK.md @@ -1,136 +1,67 @@ -## Hybrid Search Upgrade Plan (Working Doc) - -### 1. Current Implementation Snapshot -- `runHybridSearch(question, userId, plan)` (src/services/hybrid-search.service.ts) - - Embeds `[question, ...plan.rewrites]` and runs `findSimilarChunksV2` per embedding. - - Executes `textSearchChunksV2` once using the original question + keywords. - - Merges chunk candidates by `postId:postChunk`, keeps max vector/text score per chunk, min–max normalizes each modality, then fuses via `alpha` blend. - - Returns top `plan.top_k` chunks (capped 10); `plan.limit` ignored here. -- Semantic-only fallback uses same repository call without text blending (`runSemanticSearch`). -- Planner (`generateSearchPlan`) currently emits rewrites/keywords but keyword quality/quantity varies; schema clamps counts post-hoc. -- Category filters from API are not wired into hybrid search; text rewrites are not reused in lexical search; chunk key uses raw text. - -### 2. Pain Points & Gaps -1. **Filtering gaps** – category/time filters partially ignored, final `limit` unused, vector threshold normalization can collapse to zero when max=min. -2. **Keyword quality** – LLM often emits multi-word phrases or duplicates; count not consistently within intended range. -3. **Rewrite redundancy** – All rewrites treated equally; no semantic-distance-aware weighting → aggressive rewrites may be undervalued or noisy ones over-weighted. -4. **Fused scoring sensitivity** – Min–max normalization across union is brittle when modalities have outliers; no similarity-based bonus for high-confidence hits. -5. **Post-level UX** – Current pipeline optimized for RAG chunk retrieval; no reusable API that returns deduplicated post-level hits with pagination. -6. **Observability** – Limited metrics around rewrite effectiveness, keyword usage, or threshold activations. - -### 3. Goals & Guiding Principles -- Preserve strong recall via multi-embedding + lexical hybrid while adding stability and transparency. -- Make rewrite/keyword generation purposeful: enforce concise tokens, staged semantic drift, and maintain question coverage. -- Provide a standalone hybrid search endpoint for user-facing search with post-level results. -- Instrument similarity thresholds and modality contributions to support tuning. - -### 4. Retrieval Quality Enhancements (Track A) - -4.1 **Similarity Threshold Boosting** -- Reuse the existing retrieval bias labels (`lexical`, `balanced`, `semantic`) to derive both `alpha` and default modality thresholds (`sem_boost_threshold`, `lex_boost_threshold`) so planner output stays compact. Defaults (retain current behavior for now): - - `lexical`: `alpha = 0.30`, `sem_boost_threshold = 0.65`, `lex_boost_threshold = 0.80` - - `balanced`: `alpha = 0.50`, `sem_boost_threshold = 0.70`, `lex_boost_threshold = 0.75` - - `semantic`: `alpha = 0.75`, `sem_boost_threshold = 0.80`, `lex_boost_threshold = 0.65` -- Encode the mapping as a single constants table (e.g., `RETRIEVAL_BIAS_PRESETS`) so both planner normalization and hybrid scoring reference identical values. -- Permit optional overrides in `plan.hybrid`, but clamp to sensible bounds (e.g., 0.4–0.85) for consistency. -- When a normalized vector/text score crosses its threshold, apply a bounded boost (e.g., multiply by 1.1–1.3 or add 0.1), log activations, and cap boosts to maintain ranking stability. - -4.2 **Rewrite Strategy & Weighting** -- Update planner prompt to generate staged rewrites: - - `rewrite_1`: conservative paraphrase. - - `rewrite_2`: adds synonymous term / clarifies entity. - - `rewrite_3+`: higher semantic drift or alternative framing. -- After plan normalization (`search-plan.service.ts`): - - Compute embedding-based cosine similarity between original question and each rewrite. - - Drop rewrites below a floor (e.g., <0.35) or route them to lexical-only usage. - - Derive per-rewrite weights (e.g., `weight = clamp(similarity, 0.6, 1.2)`) and supply to `runHybridSearch`. - - Similarity calculations use fresh embedding API calls (no caching) for both the question and rewrites within the request. -- In hybrid service, apply weights when aggregating vector scores (weighted max/avg instead of pure max) so high-quality rewrites contribute proportionally. - -4.3 **Keyword Constraints & Quality** -- Modify `planSchema` / prompt: keywords must be single Korean/English tokens (no spaces), trimmed, 1–5 items. -- In normalization, enforce `.slice(0,5)`, drop tokens <2 chars or containing whitespace/punctuation (except hyphen/underscore if needed). -- Extend text search to run over `[question, ...filtered rewrites]` for lexical recall or compute textual similarity per rewrite (optional v2 step). - -4.4 **Repository/Data Adjustments** -- Update `findSimilarChunksV2` / `textSearchChunksV2` to return `chunk_index`, `post_created_at`, and optionally `post_tags` for downstream boosts. - - Tag aggregation via `post_tag` ⇔ `tag` should be added only if such tables exist; otherwise return `[]` and skip joins. -- Switch dedup key to `${postId}:${chunk_index}` to avoid string-heavy keys. -- Filters wiring: Do NOT add `filters.category_ids` to the plan. Keep the plan schema limited to `filters.time`. - - Use `categoryId` from the controller as a server-side pre-filter only. - - Derive `from/to` from the normalized plan time window (label → absolute) and apply in repositories. - - Respect `plan.limit` at the final slicing stage. -- Keep retrieval as exact KNN on `pgvector` (ORDER BY `<=>`); `top_k` stays per-source fetch size while final slicing respects `plan.limit`. - - - -### 5. Search API & Post-Level Experience (Track B) -- **Service decomposition** – Extract shared primitive `buildHybridCandidates({ question, rewrites, keywords, plan, userId, categoryId })` returning chunk-level scores + metadata + diagnostic stats. -- **Post aggregation** – Create aggregator to deduplicate by post (max score, optional average of top 2, representative snippet) and apply deterministic `limit/offset` pagination (page size default 10, max 10). -- **Public API** – Add unauthenticated REST endpoint (JSON, no SSE) such as `GET /search/hybrid` accepting question, filters, paging params; reuse the planner or a lightweight variant as UX dictates. -- **QA reuse** – `answerStreamV2` continues calling chunk-level layer; search endpoint uses same embeddings/threshold logic to avoid drift. - -### 6. Prompts & Planner Improvements -- Update `buildSearchPlanPrompt` instructions: - - Require keywords to be single words, explicitly request “1~5 단일 키워드”. - - Outline staged rewrite roles to nudge LLM output. - - Remind that temporal expressions stay in `filters.time`. -- Keep the client-facing `planSchema` minimal (no explicit `alpha`/threshold/weight fields). Server derives weights/thresholds internally from the retrieval bias label and does not surface them to the frontend. -- Update schema docs only for keyword bounds (1–5) and any internal validation notes; no additional fields are exposed over the API. -- In normalization, log keyword count, rewrite count, threshold values to support telemetry. - -### 7. Observability & Telemetry -- Structured logs/SSE events: - - For each query: number of rewrites retained, similarity weights, threshold boosts triggered, counts per modality. - - Emit metrics for search endpoint (total posts returned, pagination info, latency). -- Standardize a log payload (e.g., `type: 'retrieval.boost', bias, alpha, sem_thr, lex_thr, modality, original_score, boosted_score`) to simplify analysis and tuning. -- Add debug flags to inspect per-rewrite vector/text hit lists for evaluation. - -### 8. Performance Considerations -- Generate embeddings for `[question, rewrites]` with fresh API calls per request (no caching); accept the additional cost for correctness. -- Cap total vector queries by `plan.hybrid.max_rewrites`; consider batching embeddings via OpenAI API if supported. -- Monitor effect of threshold boosts on latency; adjust SQL to prefetch needed metadata in single round-trip. - -### 9. Execution Roadmap (Detailed) - -**Phase 0 – Foundations & Bugfixes** -- Task 0.1: Thread request filters (`categoryId`, `limit`) through `qa.v2.service.ts`. Do NOT add `filters.category_ids` to the plan; the server applies `categoryId` as a pre-filter. Derive `from/to` solely from the normalized plan `filters.time` (label → absolute) and use in repositories. -- Task 0.2: Honor `plan.limit` when returning hybrid results, switch dedupe key to `${postId}:${chunk_index}`, and propagate `chunk_index` through types. -- Task 0.3: Expand `findSimilarChunksV2`/`textSearchChunksV2` to select `chunk_index`, `post_created_at`, and optionally aggregated `post_tags` (only if tag tables exist); update SQL joins and DTOs with safe fallbacks. -- Task 0.4: Update hybrid/semantic services to surface new metadata in SSE payloads, keeping backward compatibility for existing clients. - -**Phase 1 – Planner & Prompt Hardening** -- Task 1.1: Tighten `planSchema` validation (keywords 1–5 single tokens, rewrites ≤ max_rewrites) and normalize via shared helpers with telemetry hooks. -- Task 1.2: Revise `buildSearchPlanPrompt` instructions to enforce staged rewrites, single-token keywords, and explicit temporal guidance; add regression fixtures for prompt drift. -- Task 1.3: Implement normalization pass that cleans keywords, generates embeddings for rewrites, filters low-similarity variants, and records per-rewrite cosine similarity. -- Task 1.4: Persist summary logs (`rewrites_len`, similarity weights, keyword counts) via structured logger for observability. - -**Phase 2 – Retrieval Scoring Upgrades** -- Task 2.1: Introduce `RETRIEVAL_BIAS_PRESETS` mapping (`alpha`, `sem_boost_threshold`, `lex_boost_threshold`) and clamp overrides in normalization. -- Task 2.2: Apply threshold-based boosts in `runHybridSearch`, logging activations and capping final scores for stability. -- Task 2.3: Weight vector scores by rewrite similarity (e.g., weighted max/avg) and expose diagnostics per rewrite. -- Task 2.4: Extend lexical search to iterate across `[question, rewrites]`, merging results while respecting keyword filters and avoiding redundant queries. -- Task 2.5: Enforce post-level diversity (max N chunks/post) before final ranking and respect `plan.limit` after fusion. - -**Phase 3 – Search API Delivery** -- Task 3.1: Extract `buildHybridCandidates` service returning chunk-level hits plus diagnostics; retrofit QA flow to consume it. -- Task 3.2: Build post aggregation layer (score fusion, snippet selection, pagination respecting `limit/offset`) with deterministic ordering. -- Task 3.3: Add `GET /search/hybrid` route, request validation, and integration tests covering filters, pagination, and telemetry events. -- Task 3.4: Document API usage and ensure rate-limiting/auth hooks match product requirements. - -**Phase 4 – Tuning & Observability** -- Task 4.1: Emit structured SSE/log events for threshold boosts, rewrite weighting, keyword pruning, and modality contributions. -- Task 4.2: Backfill dashboards or log queries (e.g., BigQuery/Redash) to monitor latency, hit counts, and boost frequency. -- Task 4.3: Create evaluation playbook with canonical queries, offline regression scripts, and guidance for tuning boost factors. -- Task 4.4: Investigate alternative fusion strategies (RRF/z-score) gated behind feature flags for safe experimentation. - -### 10. Open Questions -- Do we need separate planner settings for public search vs QA (e.g., higher keyword count)? -- Should rewrite weights persist back into plan schema for transparency to the client? -- What default boost factors strike best balance between recall and precision? Requires offline eval. - ---- -Use this document as the anchor before implementation; update sections as design decisions finalize or metrics inform threshold choices. - -### 11. Backlog -- Normalization stability (min–max collapse): evaluate mitigations without immediate implementation. Candidates include constant fallback (e.g., 0.5), epsilon guards, rank-based fusion (RRF), z-score fusion, unimodal fallback, and telemetry for activation frequency. +## Redis 기반 임베딩 큐 도입 계획 + +### 1. 적합성 검토 +- 기존 Node.js 임베딩 API는 유지하면서도 Redis 큐를 사이에 두면 Spring Boot → Node.js 간의 느슨한 결합과 재시도를 확보할 수 있음. +- Spring Boot 프로듀서는 이미 Redis LPUSH 로직을 보유하고 있어 추가 개발 부담이 낮음. +- Node.js 컨슈머는 BRPOP 기반 무한 루프로 구현 가능하며, 현재 OpenAI 임베딩 호출 흐름과 자연스럽게 연결됨. +- Redis 리스트는 선입선출 특성을 제공하고, 장애 시 실패 큐(`embedding:failed`)로 분리하여 운영팀이 모니터링/재처리하기 용이함. +- 고가용성 Redis 인프라가 전제돼야 하며, 큐 적체/중복 처리에 대한 모니터링과 알람 체계가 필요함. + +### 2. 아키텍처 개요 +``` +[Spring Boot] → LPUSH → [Redis List embedding:queue] → BRPOP → [Node.js Consumer] → OpenAI 임베딩 → DB 저장 + ↘ 실패 시 LPUSH embedding:failed +``` + +### 3. 구현 계획 +1. **컨슈머 워커 초안 작성** + - `services/embedding.service.ts` 를 호출하는 `processEmbeddingQueue` 모듈 작성. + - Graceful shutdown, Concurrency(동시 워커 수) 옵션, 로깅(성공/실패/처리시간)을 포함. +2. **큐 메시지 스키마 확정** + - `post_id`, `title`, `content`, `retryCount` 등을 포함하는 JSON 구조 정의 및 문서화. + - 추후 schema 변경 대비 버전 필드 도입 검토. +3. **실패 처리 및 재시도 정책** + - 실패 시 `embedding:failed` 로 이동 후 경고 로그 기록. + - 재시도 워커(주기적 RPOP → LPUSH) 또는 Ops 수동 트리거 전략 결정. +4. **운영 모니터링** + - `LLEN embedding:queue`, `embedding:failed` 메트릭을 Prometheus/Grafana 또는 기존 모니터링에 연동. + - 알람 기준: 큐 길이 임계치, 실패 큐 누적, 워커 미응답. +5. **배포 전략** + - Node.js 컨슈머를 기존 서버 프로세스와 분리( Docker 컨테이너)하여 독립 운영. + - Spring Boot 측은 이미 구현된 LPUSH 로직을 활성화하고, 기존 REST 임베딩 호출은 점진적으로 감축. + +### 4. 추가 고려사항 +- 멱등성 확보를 위해 컨슈머 처리 완료 후 Redis 측에서 메시지를 제거했는지(이미 BRPOP 로 제거) 확인하고, 실패 재처리 시 중복 삽입 방지 로직 검토. +- OpenAI API 호출 실패 시 exponential backoff 적용 여부. +- 긴 콘텐츠 임베딩 시 chunking 로직(`chunkText`)과 큐 메시지 크기 제한 검토. +- 보안: Redis 접근 제어, TLS 필요 여부 확인. + +### 5. 다음 단계 +- [x] Node.js 컨슈머 초안 코드 작성 및 환경 변수(`REDIS_URL`, `EMBEDDING_QUEUE_KEY`) 정리. +- [ ] 개발 환경에서 Redis 로컬 인스턴스와 통합 테스트 진행. +- [ ] 모니터링/알람 구성 논의. + +### 6. 컨테이너/배포 설계 +- **기본 이미지 재사용**: 기존 `Dockerfile` 로 빌드한 동일 이미지를 `api`(Express)와 `worker`(컨슈머)가 공유, 각 컨테이너는 `command` 만 다르게 지정. +- **엔트리포인트 분리**: `src/worker/queue-consumer.ts` 추가 → `tsc` 결과가 `dist/worker/queue-consumer.js` 로 생성되도록 빌드 경로 확인. `package.json` 에 `worker` 스크립트(`node dist/worker/queue-consumer.js`) 등록. +- **Docker Compose 초안** + ```yaml + services: + api: + build: . + command: ["node", "dist/server.js"] + ports: ["3000:3000"] + env_file: .env + depends_on: [redis] + + worker: + build: . + command: ["node", "dist/worker/queue-consumer.js"] + env_file: .env + depends_on: [redis] + + redis: + image: redis:7-alpine + ``` +- **환경 변수 공유**: `.env` 에 Redis 접속 정보(`REDIS_HOST`, `REDIS_PORT`, `REDIS_URL` 등)와 큐 이름, 실패 큐 이름 등을 명시하고 두 서비스 모두 로드. +- **운영 고려**: `worker` 컨테이너 스케일 아웃(예: `docker compose up --scale worker=3`)에 대비해 작업 멱등성 확인. 장애 시 개별 컨테이너 재시작 전략, 로그 수집 경로(예: stdout→EFK) 정의. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..545ad02 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: "3.9" + +services: + api: + build: + context: . + command: ["node", "dist/server.js"] + env_file: + - .env + ports: + - "3000:3000" + restart: unless-stopped + worker: + build: + context: . + command: ["node", "dist/worker/queue-consumer.js"] + env_file: + - .env + restart: unless-stopped diff --git a/docs/history-tasks/HybridSearchUpgradePlan.md b/docs/history-tasks/HybridSearchUpgradePlan.md new file mode 100644 index 0000000..4623441 --- /dev/null +++ b/docs/history-tasks/HybridSearchUpgradePlan.md @@ -0,0 +1,136 @@ +## Hybrid Search Upgrade Plan (Working Doc) + +### 1. Current Implementation Snapshot +- `runHybridSearch(question, userId, plan)` (src/services/hybrid-search.service.ts) + - Embeds `[question, ...plan.rewrites]` and runs `findSimilarChunksV2` per embedding. + - Executes `textSearchChunksV2` once using the original question + keywords. + - Merges chunk candidates by `postId:postChunk`, keeps max vector/text score per chunk, min–max normalizes each modality, then fuses via `alpha` blend. + - Returns top `plan.top_k` chunks (capped 10); `plan.limit` ignored here. +- Semantic-only fallback uses same repository call without text blending (`runSemanticSearch`). +- Planner (`generateSearchPlan`) currently emits rewrites/keywords but keyword quality/quantity varies; schema clamps counts post-hoc. +- Category filters from API are not wired into hybrid search; text rewrites are not reused in lexical search; chunk key uses raw text. + +### 2. Pain Points & Gaps +1. **Filtering gaps** – category/time filters partially ignored, final `limit` unused, vector threshold normalization can collapse to zero when max=min. +2. **Keyword quality** – LLM often emits multi-word phrases or duplicates; count not consistently within intended range. +3. **Rewrite redundancy** – All rewrites treated equally; no semantic-distance-aware weighting → aggressive rewrites may be undervalued or noisy ones over-weighted. +4. **Fused scoring sensitivity** – Min–max normalization across union is brittle when modalities have outliers; no similarity-based bonus for high-confidence hits. +5. **Post-level UX** – Current pipeline optimized for RAG chunk retrieval; no reusable API that returns deduplicated post-level hits with pagination. +6. **Observability** – Limited metrics around rewrite effectiveness, keyword usage, or threshold activations. + +### 3. Goals & Guiding Principles +- Preserve strong recall via multi-embedding + lexical hybrid while adding stability and transparency. +- Make rewrite/keyword generation purposeful: enforce concise tokens, staged semantic drift, and maintain question coverage. +- Provide a standalone hybrid search endpoint for user-facing search with post-level results. +- Instrument similarity thresholds and modality contributions to support tuning. + +### 4. Retrieval Quality Enhancements (Track A) + +4.1 **Similarity Threshold Boosting** +- Reuse the existing retrieval bias labels (`lexical`, `balanced`, `semantic`) to derive both `alpha` and default modality thresholds (`sem_boost_threshold`, `lex_boost_threshold`) so planner output stays compact. Defaults (retain current behavior for now): + - `lexical`: `alpha = 0.30`, `sem_boost_threshold = 0.65`, `lex_boost_threshold = 0.80` + - `balanced`: `alpha = 0.50`, `sem_boost_threshold = 0.70`, `lex_boost_threshold = 0.75` + - `semantic`: `alpha = 0.75`, `sem_boost_threshold = 0.80`, `lex_boost_threshold = 0.65` +- Encode the mapping as a single constants table (e.g., `RETRIEVAL_BIAS_PRESETS`) so both planner normalization and hybrid scoring reference identical values. +- Permit optional overrides in `plan.hybrid`, but clamp to sensible bounds (e.g., 0.4–0.85) for consistency. +- When a normalized vector/text score crosses its threshold, apply a bounded boost (e.g., multiply by 1.1–1.3 or add 0.1), log activations, and cap boosts to maintain ranking stability. + +4.2 **Rewrite Strategy & Weighting** +- Update planner prompt to generate staged rewrites: + - `rewrite_1`: conservative paraphrase. + - `rewrite_2`: adds synonymous term / clarifies entity. + - `rewrite_3+`: higher semantic drift or alternative framing. +- After plan normalization (`search-plan.service.ts`): + - Compute embedding-based cosine similarity between original question and each rewrite. + - Drop rewrites below a floor (e.g., <0.35) or route them to lexical-only usage. + - Derive per-rewrite weights (e.g., `weight = clamp(similarity, 0.6, 1.2)`) and supply to `runHybridSearch`. + - Similarity calculations use fresh embedding API calls (no caching) for both the question and rewrites within the request. +- In hybrid service, apply weights when aggregating vector scores (weighted max/avg instead of pure max) so high-quality rewrites contribute proportionally. + +4.3 **Keyword Constraints & Quality** +- Modify `planSchema` / prompt: keywords must be single Korean/English tokens (no spaces), trimmed, 1–5 items. +- In normalization, enforce `.slice(0,5)`, drop tokens <2 chars or containing whitespace/punctuation (except hyphen/underscore if needed). +- Extend text search to run over `[question, ...filtered rewrites]` for lexical recall or compute textual similarity per rewrite (optional v2 step). + +4.4 **Repository/Data Adjustments** +- Update `findSimilarChunksV2` / `textSearchChunksV2` to return `chunk_index`, `post_created_at`, and optionally `post_tags` for downstream boosts. + - Tag aggregation via `post_tag` ⇔ `tag` should be added only if such tables exist; otherwise return `[]` and skip joins. +- Switch dedup key to `${postId}:${chunk_index}` to avoid string-heavy keys. +- Filters wiring: Do NOT add `filters.category_ids` to the plan. Keep the plan schema limited to `filters.time`. + - Use `categoryId` from the controller as a server-side pre-filter only. + - Derive `from/to` from the normalized plan time window (label → absolute) and apply in repositories. + - Respect `plan.limit` at the final slicing stage. +- Keep retrieval as exact KNN on `pgvector` (ORDER BY `<=>`); `top_k` stays per-source fetch size while final slicing respects `plan.limit`. + + + +### 5. Search API & Post-Level Experience (Track B) +- **Service decomposition** – Extract shared primitive `buildHybridCandidates({ question, rewrites, keywords, plan, userId, categoryId })` returning chunk-level scores + metadata + diagnostic stats. +- **Post aggregation** – Create aggregator to deduplicate by post (max score, optional average of top 2, representative snippet) and apply deterministic `limit/offset` pagination (page size default 10, max 10). +- **Public API** – Add unauthenticated REST endpoint (JSON, no SSE) such as `GET /search/hybrid` accepting question, filters, paging params; reuse the planner or a lightweight variant as UX dictates. +- **QA reuse** – `answerStreamV2` continues calling chunk-level layer; search endpoint uses same embeddings/threshold logic to avoid drift. + +### 6. Prompts & Planner Improvements +- Update `buildSearchPlanPrompt` instructions: + - Require keywords to be single words, explicitly request “1~5 단일 키워드”. + - Outline staged rewrite roles to nudge LLM output. + - Remind that temporal expressions stay in `filters.time`. +- Keep the client-facing `planSchema` minimal (no explicit `alpha`/threshold/weight fields). Server derives weights/thresholds internally from the retrieval bias label and does not surface them to the frontend. +- Update schema docs only for keyword bounds (1–5) and any internal validation notes; no additional fields are exposed over the API. +- In normalization, log keyword count, rewrite count, threshold values to support telemetry. + +### 7. Observability & Telemetry +- Structured logs/SSE events: + - For each query: number of rewrites retained, similarity weights, threshold boosts triggered, counts per modality. + - Emit metrics for search endpoint (total posts returned, pagination info, latency). +- Standardize a log payload (e.g., `type: 'retrieval.boost', bias, alpha, sem_thr, lex_thr, modality, original_score, boosted_score`) to simplify analysis and tuning. +- Add debug flags to inspect per-rewrite vector/text hit lists for evaluation. + +### 8. Performance Considerations +- Generate embeddings for `[question, rewrites]` with fresh API calls per request (no caching); accept the additional cost for correctness. +- Cap total vector queries by `plan.hybrid.max_rewrites`; consider batching embeddings via OpenAI API if supported. +- Monitor effect of threshold boosts on latency; adjust SQL to prefetch needed metadata in single round-trip. + +### 9. Execution Roadmap (Detailed) + +**Phase 0 – Foundations & Bugfixes** +- Task 0.1: Thread request filters (`categoryId`, `limit`) through `qa.v2.service.ts`. Do NOT add `filters.category_ids` to the plan; the server applies `categoryId` as a pre-filter. Derive `from/to` solely from the normalized plan `filters.time` (label → absolute) and use in repositories. +- Task 0.2: Honor `plan.limit` when returning hybrid results, switch dedupe key to `${postId}:${chunk_index}`, and propagate `chunk_index` through types. +- Task 0.3: Expand `findSimilarChunksV2`/`textSearchChunksV2` to select `chunk_index`, `post_created_at`, and optionally aggregated `post_tags` (only if tag tables exist); update SQL joins and DTOs with safe fallbacks. +- Task 0.4: Update hybrid/semantic services to surface new metadata in SSE payloads, keeping backward compatibility for existing clients. + +**Phase 1 – Planner & Prompt Hardening** +- Task 1.1: Tighten `planSchema` validation (keywords 1–5 single tokens, rewrites ≤ max_rewrites) and normalize via shared helpers with telemetry hooks. +- Task 1.2: Revise `buildSearchPlanPrompt` instructions to enforce staged rewrites, single-token keywords, and explicit temporal guidance; add regression fixtures for prompt drift. +- Task 1.3: Implement normalization pass that cleans keywords, generates embeddings for rewrites, filters low-similarity variants, and records per-rewrite cosine similarity. +- Task 1.4: Persist summary logs (`rewrites_len`, similarity weights, keyword counts) via structured logger for observability. + +**Phase 2 – Retrieval Scoring Upgrades** +- Task 2.1: Introduce `RETRIEVAL_BIAS_PRESETS` mapping (`alpha`, `sem_boost_threshold`, `lex_boost_threshold`) and clamp overrides in normalization. +- Task 2.2: Apply threshold-based boosts in `runHybridSearch`, logging activations and capping final scores for stability. +- Task 2.3: Weight vector scores by rewrite similarity (e.g., weighted max/avg) and expose diagnostics per rewrite. +- Task 2.4: Extend lexical search to iterate across `[question, rewrites]`, merging results while respecting keyword filters and avoiding redundant queries. +- Task 2.5: Enforce post-level diversity (max N chunks/post) before final ranking and respect `plan.limit` after fusion. + +**Phase 3 – Search API Delivery** +- Task 3.1: Extract `buildHybridCandidates` service returning chunk-level hits plus diagnostics; retrofit QA flow to consume it. +- Task 3.2: Build post aggregation layer (score fusion, snippet selection, pagination respecting `limit/offset`) with deterministic ordering. +- Task 3.3: Add `GET /search/hybrid` route, request validation, and integration tests covering filters, pagination, and telemetry events. +- Task 3.4: Document API usage and ensure rate-limiting/auth hooks match product requirements. + +**Phase 4 – Tuning & Observability** +- Task 4.1: Emit structured SSE/log events for threshold boosts, rewrite weighting, keyword pruning, and modality contributions. +- Task 4.2: Backfill dashboards or log queries (e.g., BigQuery/Redash) to monitor latency, hit counts, and boost frequency. +- Task 4.3: Create evaluation playbook with canonical queries, offline regression scripts, and guidance for tuning boost factors. +- Task 4.4: Investigate alternative fusion strategies (RRF/z-score) gated behind feature flags for safe experimentation. + +### 10. Open Questions +- Do we need separate planner settings for public search vs QA (e.g., higher keyword count)? +- Should rewrite weights persist back into plan schema for transparency to the client? +- What default boost factors strike best balance between recall and precision? Requires offline eval. + +--- +Use this document as the anchor before implementation; update sections as design decisions finalize or metrics inform threshold choices. + +### 11. Backlog +- Normalization stability (min–max collapse): evaluate mitigations without immediate implementation. Candidates include constant fallback (e.g., 0.5), epsilon guards, rank-based fusion (RRF), z-score fusion, unimodal fallback, and telemetry for activation frequency. diff --git a/docs/reports/REPORT-embedding-worker.md b/docs/reports/REPORT-embedding-worker.md new file mode 100644 index 0000000..95c13b9 --- /dev/null +++ b/docs/reports/REPORT-embedding-worker.md @@ -0,0 +1,71 @@ +# 보고서: Redis 큐 기반 임베딩 워커 도입 및 배포 구성 + +## 1. 개요 +- 목적: Spring Boot → Redis → Node.js 파이프라인으로 임베딩 생성을 비동기 처리하고, Express API와 분리된 워커를 운영한다. +- 상태: 워커 엔트리포인트·환경 변수 스키마·도커 컴포즈·GitHub Actions 배포 흐름까지 반영 완료. +- 범위: 기존 API 서버 코드는 유지하면서 Redis 큐 소비 로직을 추가하고, 단일 Docker 이미지로 API/워커 컨테이너를 분리 운용한다. + +## 2. 워커 구조 +- 파일: `src/worker/queue-consumer.ts` + - Redis 연결: `REDIS_URL`(우선) 또는 `REDIS_HOST`/`REDIS_PORT`. + - 작업 형식: `{ postId, title?, content?, attempt? }`. + - 처리 순서 + 1. `BRPOP` 으로 `EMBEDDING_QUEUE_KEY` 대기. + 2. 제목(`storeTitleEmbedding`)과 본문(`chunkText` → `createEmbeddings` → `storeContentEmbeddings`) 순차 처리. + 3. 오류 시 재시도: `attempt` 증가, `EMBEDDING_WORKER_MAX_RETRIES`, `EMBEDDING_WORKER_BACKOFF_MS` 기반 backoff, 한계를 넘으면 `EMBEDDING_FAILED_QUEUE_KEY` 로 이동. + - 기타: Graceful shutdown(SIGINT/SIGTERM), DebugLogger 로 주요 이벤트 기록. + +## 3. 환경 변수 (추가 항목) +| 키 | 용도 | 기본값 | +| --- | --- | --- | +| `REDIS_URL` | 외부 Redis 접속 URL (우선 사용) | 없음 | +| `REDIS_HOST` / `REDIS_PORT` | URL 미지정 시 호스트/포트 | `127.0.0.1` / `6379` | +| `EMBEDDING_QUEUE_KEY` | 작업 큐 이름 | `embedding:queue` | +| `EMBEDDING_FAILED_QUEUE_KEY` | 실패 큐 이름 | `embedding:failed` | +| `EMBEDDING_WORKER_MAX_RETRIES` | 최대 재시도 횟수 | `3` | +| `EMBEDDING_WORKER_BACKOFF_MS` | 재시도 간 대기(ms) | `5000` | + +## 4. 도커 이미지 & 실행 +- Dockerfile 기본 CMD: `node dist/server.js` (Express API). +- 동일 이미지를 재사용하되 `docker run ... node dist/worker/queue-consumer.js` 로 커맨드를 오버라이드하면 워커가 실행된다. +- pm2 불필요: 컨테이너 단일 프로세스 가정 + Docker `restart` 정책으로 복구. + +## 5. docker-compose (개발용) +```yaml +services: + api: + build: . + command: ["node", "dist/server.js"] + env_file: [.env] + ports: ["3000:3000"] + restart: unless-stopped + + worker: + build: . + command: ["node", "dist/worker/queue-consumer.js"] + env_file: [.env] + restart: unless-stopped +``` +- 외부 Redis 사용이 기본 전제. 필요 시 개발 환경에서만 Redis 서비스를 추가해 `.env` 를 해당 컨테이너로 지정. + +## 6. GitHub Actions 배포 (main.yml) +- 이미지: `${{ secrets.DOCKER_USERNAME }}/bubblog-ai:latest` 빌드/푸시. +- EC2 배포 단계: + 1. 기존 `bubblog-ai`, `bubblog-ai-worker` 컨테이너 정지/삭제. + 2. 최신 이미지 pull. + 3. API 컨테이너 실행(기본 CMD). + 4. 워커 컨테이너 실행(`node dist/worker/queue-consumer.js` 명령). +- 두 컨테이너 모두 Redis 및 재시도 관련 Secrets를 전달하여 구성 누락을 방지. +- Secrets(예시): `REDIS_URL`, `REDIS_HOST`, `REDIS_PORT`, `EMBEDDING_QUEUE_KEY`, `EMBEDDING_FAILED_QUEUE_KEY`, `EMBEDDING_WORKER_MAX_RETRIES`, `EMBEDDING_WORKER_BACKOFF_MS` 등. + +## 7. 운영 참고 사항 +- Spring Boot 프로듀서는 LPUSH 로 작업을 큐에 적재(이미 구현됨). +- Redis 는 외부 서버/매니지드 환경을 사용; 본 프로젝트 컨테이너에서는 Consumer 역할만 수행. +- 실패 큐(`embedding:failed`) 모니터링 및 재처리(예: RPOP → LPUSH → 재시도 스케줄러) 전략 필요. +- API 컨테이너에서 Redis 변수가 필요하지는 않지만, 비상시 커맨드 오버라이드를 대비해 공통으로 주입해 둔 상태. + +## 8. 향후 체크리스트 +- [ ] 스테이징 환경에서 Redis/DB 연결 및 임베딩 저장 성공 여부 검증. +- [ ] 실패 큐 모니터링/알림 구성. +- [ ] 워커 스케일 아웃 전략 정의 (컨테이너 수 확장 시 처리 충돌 없는지 확인). +- [ ] Redis 접근 제어/TLS 여부 점검. diff --git a/package-lock.json b/package-lock.json index 5814cd7..2df2f37 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", + "ioredis": "^5.8.2", "jsonwebtoken": "^9.0.2", "openai": "^4.47.1", "pg": "^8.11.5", @@ -60,6 +61,11 @@ "node": ">=18.0.0" } }, + "node_modules/@ioredis/commands": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz", + "integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==" + }, "node_modules/@jridgewell/resolve-uri": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz", @@ -512,6 +518,14 @@ "fsevents": "~2.3.2" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/combined-stream": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", @@ -603,6 +617,14 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -1173,6 +1195,29 @@ "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==" }, + "node_modules/ioredis": { + "version": "5.8.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.8.2.tgz", + "integrity": "sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==", + "dependencies": { + "@ioredis/commands": "1.4.0", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -1282,11 +1327,21 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -1772,6 +1827,25 @@ "node": ">=8.10.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/safe-buffer": { "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", @@ -1958,6 +2032,11 @@ "node": ">= 10.x" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/package.json b/package.json index f4594d4..f2fec74 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,9 @@ "build": "tsc", "start": "node dist/server.js", "test": "echo \"Error: no test specified\" && exit 1", - "db:migrate:pgtrgm": "psql \"$DATABASE_URL\" -f docs/migrations/2025-01-pgtrgm.sql" + "db:migrate:pgtrgm": "psql \"$DATABASE_URL\" -f docs/migrations/2025-01-pgtrgm.sql", + "worker": "node dist/worker/queue-consumer.js", + "worker:dev": "ts-node src/worker/queue-consumer.ts" }, "keywords": [], "author": "", @@ -29,6 +31,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", + "ioredis": "^5.8.2", "jsonwebtoken": "^9.0.2", "openai": "^4.47.1", "pg": "^8.11.5", diff --git a/readme.md b/readme.md index fa301f2..6afdbd5 100644 --- a/readme.md +++ b/readme.md @@ -129,3 +129,55 @@ npm run db:migrate:pgtrgm ``` 주의: 인덱스는 쓰기 비용과 디스크 사용량을 증가시킵니다. 텍스트 검색에 사용하는 컬럼(`post_chunks.content`, `blog_post.title`)에만 생성하세요. + +--- + +## 6. Redis 기반 임베딩 워커 + +임베딩 생성 작업을 Redis 큐에 적재한 뒤 Node.js 워커가 안전하게 처리하도록 구성할 수 있습니다. + +### 6.1 환경 변수 + +`.env` 또는 배포 환경 변수를 통해 다음 값을 설정하세요 (필요 시 기본값 사용 가능). + +```env +# Redis 연결 (예: 외부 매니지드 인스턴스) +REDIS_URL=redis://username:password@your-redis-host:6379 +# 또는 REDIS_HOST/REDIS_PORT 조합 사용 (둘 중 하나만 설정) + +# 큐 이름 +EMBEDDING_QUEUE_KEY=embedding:queue +EMBEDDING_FAILED_QUEUE_KEY=embedding:failed + +# 워커 재시도 설정 (선택) +EMBEDDING_WORKER_MAX_RETRIES=3 +EMBEDDING_WORKER_BACKOFF_MS=5000 +``` + +### 6.2 로컬 실행 + +```bash +npm run build # TypeScript 컴파일 +npm run start # Express API +npm run worker # 프로덕션 워커 (빌드 후) +# 또는 개발용 실시간 실행 +npm run worker:dev +``` + +### 6.3 Docker Compose + +`docker-compose.yml` 에는 API와 워커 서비스가 정의되어 있습니다. 외부 Redis 에 접속하도록 `.env` 의 `REDIS_URL` 을 설정한 뒤 아래 명령으로 기동하세요. + +```bash +docker compose up --build +``` + +로컬에서 자체 Redis 컨테이너가 필요하다면 compose 파일에 Redis 서비스를 별도로 추가한 뒤 `REDIS_URL` 또는 `REDIS_HOST/PORT` 를 해당 컨테이너로 지정하세요. + +필요 시 워커만 스케일링할 수도 있습니다. + +```bash +docker compose up --build --scale worker=3 +``` + +Redis 큐 상태는 `redis-cli` 또는 `LLEN embedding:queue` 같은 명령으로 확인하세요. diff --git a/src/config.ts b/src/config.ts index a9d8592..7b02ac9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,13 @@ const configSchema = z.object({ LLM_COST_ROUND: z.coerce.number().default(4), DEBUG_ALL: z.string().default('false'), DEBUG_CHANNELS: z.string().default(''), + REDIS_URL: z.string().optional(), + REDIS_HOST: z.string().default('127.0.0.1'), + REDIS_PORT: z.coerce.number().default(6379), + EMBEDDING_QUEUE_KEY: z.string().default('embedding:queue'), + EMBEDDING_FAILED_QUEUE_KEY: z.string().default('embedding:failed'), + EMBEDDING_WORKER_MAX_RETRIES: z.coerce.number().default(3), + EMBEDDING_WORKER_BACKOFF_MS: z.coerce.number().default(5000), }); const config = configSchema.parse(process.env); diff --git a/src/worker/queue-consumer.ts b/src/worker/queue-consumer.ts new file mode 100644 index 0000000..a6c64c2 --- /dev/null +++ b/src/worker/queue-consumer.ts @@ -0,0 +1,198 @@ +import Redis from 'ioredis'; +import config from '../config'; +import { + chunkText, + createEmbeddings, + storeContentEmbeddings, + storeTitleEmbedding, +} from '../services/embedding.service'; +import { DebugLogger } from '../utils/debug-logger'; + +type EmbeddingJob = { + postId: number; + title?: string | null; + content?: string | null; + attempt?: number; + metadata?: Record; +}; + +const queueKey = config.EMBEDDING_QUEUE_KEY; +const failedQueueKey = config.EMBEDDING_FAILED_QUEUE_KEY; +const maxRetries = config.EMBEDDING_WORKER_MAX_RETRIES; +const backoffMs = Math.max(0, config.EMBEDDING_WORKER_BACKOFF_MS || 0); + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const redis = + config.REDIS_URL && config.REDIS_URL.length > 0 + ? new Redis(config.REDIS_URL) + : new Redis({ + host: config.REDIS_HOST, + port: config.REDIS_PORT, + }); + +let shuttingDown = false; + +const handleShutdown = (signal: NodeJS.Signals) => { + if (shuttingDown) return; + shuttingDown = true; + DebugLogger.warn('server', { type: 'worker.shutdown', signal }); + try { + redis.disconnect(); + } catch { + // ignore + } + setTimeout(() => process.exit(0), 500).unref(); +}; + +process.on('SIGINT', handleShutdown); +process.on('SIGTERM', handleShutdown); + +const processJob = async (job: EmbeddingJob) => { + const postId = Number(job.postId); + if (!Number.isFinite(postId) || postId <= 0) { + throw new Error('Invalid postId in embedding job'); + } + + const title = typeof job.title === 'string' ? job.title.trim() : ''; + const content = typeof job.content === 'string' ? job.content.trim() : ''; + + if (!title && !content) { + DebugLogger.warn('server', { + type: 'worker.job.skipped', + postId, + reason: 'empty_payload', + }); + return; + } + + if (title) { + await storeTitleEmbedding(postId, title); + } + + if (content) { + const chunks = chunkText(content); + + if (!chunks.length) { + DebugLogger.warn('server', { + type: 'worker.job.skipped', + postId, + reason: 'no_chunks', + }); + return; + } + + const embeddings = await createEmbeddings(chunks); + await storeContentEmbeddings(postId, chunks, embeddings); + } +}; + +const pushToFailedQueue = async (payload: unknown) => { + try { + await redis.lpush( + failedQueueKey, + JSON.stringify({ + failedAt: new Date().toISOString(), + payload, + }) + ); + } catch (error) { + DebugLogger.error('server', { + type: 'worker.failed_queue_error', + message: (error as Error)?.message ?? 'unknown', + }); + } +}; + +const handlePayload = async (rawPayload: string) => { + let job: EmbeddingJob; + try { + job = JSON.parse(rawPayload) as EmbeddingJob; + } catch (error) { + DebugLogger.error('server', { + type: 'worker.job.invalid_json', + error: (error as Error)?.message ?? 'invalid_json', + }); + await pushToFailedQueue({ rawPayload, reason: 'invalid_json' }); + return; + } + + const attempt = Number(job.attempt || 0); + DebugLogger.log('server', { + type: 'worker.job.start', + postId: job.postId, + attempt, + }); + + try { + await processJob(job); + DebugLogger.log('server', { + type: 'worker.job.success', + postId: job.postId, + }); + } catch (error) { + const errorMessage = (error as Error)?.message ?? 'unknown'; + const nextAttempt = attempt + 1; + const enrichedPayload = { + ...job, + attempt: nextAttempt, + lastError: errorMessage, + failedAt: new Date().toISOString(), + }; + + if (nextAttempt < maxRetries) { + DebugLogger.warn('server', { + type: 'worker.job.retry', + postId: job.postId, + attempt: nextAttempt, + error: errorMessage, + }); + if (backoffMs > 0) { + await sleep(Math.min(backoffMs * nextAttempt, backoffMs * 6)); + } + await redis.lpush(queueKey, JSON.stringify(enrichedPayload)); + } else { + DebugLogger.error('server', { + type: 'worker.job.failed', + postId: job.postId, + attempt: nextAttempt, + error: errorMessage, + }); + await pushToFailedQueue(enrichedPayload); + } + } +}; + +const run = async () => { + DebugLogger.info('server', { + type: 'worker.start', + queueKey, + failedQueueKey, + maxRetries, + }); + + while (!shuttingDown) { + try { + const result = await redis.brpop(queueKey, 0); + if (!result || result.length < 2) continue; + const payload = result[1]; + await handlePayload(payload); + } catch (error) { + if (shuttingDown) break; + DebugLogger.error('server', { + type: 'worker.loop_error', + message: (error as Error)?.message ?? 'unknown', + }); + if (backoffMs > 0) { + await sleep(backoffMs); + } + } + } + + DebugLogger.info('server', { type: 'worker.exit' }); +}; + +run().catch((error) => { + console.error('[embedding-worker] fatal error:', error); + process.exit(1); +});