diff --git a/TASK.md b/TASK.md index e69de29..105bb96 100644 --- a/TASK.md +++ b/TASK.md @@ -0,0 +1,93 @@ +## 중복 질문 판별 개선 계획 +- 기존 ask_message_embedding은 단지 이본 질문의 벡터 값만 저장하므로 현재 프로젝트에서 작동중인 맥락 주입 부분에서 불일치가 가능함 +- 최근 2턴(사용자 질문)을 다 불러와 이번 질문과 합쳐 하나의 텍스트 블록으로 만든다. 예: `[Q-2]\n[Q-1]\n[Q-now]`. +- 없을 시 현재 질문만 저장 +- 프롬프트에 주입하는 히스토리도 2턴이므로 캐시 비교 기준과 완전히 일치해 follow-up 질문 반복 시 캐시 정확도가 올라간다. +- 임베딩은 질문당 1회만 추가로 수행되므로 비용 증가는 미미하며, 길이가 길 경우 앞 턴을 줄이는 로직을 헬퍼에서 처리한다. + +### 말투 ID 독립 컬럼 및 마이그레이션 계획 +1. **DB 스키마 변경**: 중복 질문 판별 전용임을 명시하기 위해 `ask_message_embedding`을 `ask_question_cache`(또는 `ask_duplicate_embedding`)으로 리네임한다. 리네임 후 `speech_tone_id integer NOT NULL DEFAULT -1` 컬럼을 추가하고, 기존 레코드는 tone 정보가 없어 재사용 가치가 낮으므로 컬럼 추가 직후 `TRUNCATE` 또는 `DELETE`로 전량 삭제한다. +2. **엔티티/레포지토리 업데이트**: `ask-message-embedding.repository.ts`에서 `MessageEmbedding` 타입과 `upsertEmbedding`/`findSimilarEmbeddings` 결과에 `speechToneId` 필드를 노출한다. +3. **persistConversation 수정**: `session-history.service.ts`에서 `persistConversation` 호출 시 `speechTone` 파라미터를 새로 받아, 메시지 레포지토리에는 아무 변화 없이 `embeddingRepository.upsertEmbedding`에만 전달한다. +4. **캐시 비교 및 재작성 로직**: `findCachedAnswer`가 `speechToneId`를 반환하도록 수정하고, `qa.service.ts`/`qa.v2.service.ts`에서 tone ID 비교 결과에 따라 캐시 재생 또는 tone 재작성 분기를 처리한다. +5. **백필 전략(옵션)**: 추가 데이터를 이관하고 싶다면 별도 배치를 설계해 `speech_tone_id`를 채울 수 있지만, 초기에는 기본값 `-1`을 tone 불명 값으로 삼고 rewrite 플로우를 따른다. +6. **명명 개선 검토**: 테이블 리네임과 컬럼 추가는 같은 마이그레이션에서 처리하고, 관련 코드/SQL 명칭도 일괄 업데이트한다. + +## 캐시 응답 말투 정합성 계획 +1. **목표**: 캐시에서 꺼낸 답변의 말투가 API 요청 값과 일치하면 즉시 재사용하고, 불일치하면 동일 답변을 tone 전용 LLM으로 재작성한 뒤 전달한다. +2. **tone 검증 순서** + - (a) `findCachedAnswer`가 반환한 후보 배열(유사도 기준 정렬)을 순회하며 `speechToneId === 요청 값`인 항목을 찾는다. + - (b) 같은 ID가 있는 경우 해당 후보를 즉시 재생하고, tone 재작성은 생략한다. + - (c) 같은 ID가 하나도 없으면 유사도 1순위 후보를 선택해 `replace-tone.service.ts`에 전달하고, tone만 바꾼 결과를 사용자에게 전송한다. (threshold 미달이면 기존처럼 새 LLM 답변 생성) +3. **replace-tone.service.ts** + - 시그니처: `rewriteTone(answer: string, opts: { speechToneId: number; speechTonePrompt: string; llm?: LlmOverride })`. + - 프롬프트 구성 + - System: "너는 편집자다. 아래 콘텐츠의 의미, 사실, 구조를 훼손하지 말고, 요청된 말투 지시만 반영해 다시 작성해." + - User: ```tone 지시: ${speechTonePrompt} +원문: ${answer}``` + - 모델/프로바이더는 운영 편의를 위해 기존 QA 파이프라인과 동일한 `generate` 래퍼를 그대로 사용한다(즉, Ask 요청에서 선택된 LLM 설정을 재사용). temperature는 0~0.2, max_tokens는 원문 길이와 비슷하게 맞춘다. + - tone 재작성 결과가 비어 있거나 원문과 지나치게 다르면 실패로 간주하고 캐시를 포기한 뒤 RAG/LLM 경로로 폴백한다. +4. **서비스 연동** (`qa.service.ts`, `qa.v2.service.ts`) + - `findCachedAnswer`가 tone ID와 함께 후보 배열을 돌려줄 수 있도록 확장하거나, tone별 우선순위를 반환한다. + - tone 동일 후보가 있으면 기존 `replayCachedAnswer`를 실행한다. + - tone 불일치만 있는 경우엔 `rewriteTone` 호출 후, SSE `answer` 이벤트와 `persistConversation`에 재작성된 텍스트를 사용하고 `speech_tone_id`를 목표 값으로 저장한다. +5. **운영 고려사항**: tone ID의 기본값을 `-1`(unknown)으로 두고, 이 값은 tone 동일 후보 검색에서 매칭되지 않도록 처리한다. 즉, 모든 후보가 `-1`이면 top-1 rewrite 대상으로만 사용된다. + +### 구현 우선순위 및 단계 +1. **중복 질문 판별 개선**: 히스토리 2턴을 합친 텍스트 블록 기반으로 임베딩을 저장하고 캐시 비교에 활용한다. (상단 계획을 먼저 적용) +2. **말투 ID 컬럼 추가**: 위 마이그레이션 계획대로 DB 및 레포지토리를 확장해 tone 정보가 영속되도록 한다. +3. **말투 조정 기능 도입**: `replace-tone.service.ts` 구현과 `replayCachedAnswer` 통합으로 캐시 히트 시 tone 검증/재작성 플로우를 완성한다. +4. **후속 최적화**: tone 컬럼이 채워진 이후에는 tone 일치 여부를 먼저 확인해 tone 분석/재작성 호출을 최소화한다. + +## 상세 구현 설계 +1. **마이그레이션** + - 새 SQL 파일(예: `docs/migrations/2025-XX-ask-question-cache-tone.sql`)을 작성하여 테이블 리네임(`ALTER TABLE ask_message_embedding RENAME TO ask_question_cache;`) → 컬럼 추가(`ADD COLUMN speech_tone_id integer NOT NULL DEFAULT -1;`) → 기존 데이터 삭제(`TRUNCATE ask_question_cache;`)를 순차 진행한다. + - 필요한 경우 `speech_tone_id`에 인덱스(`CREATE INDEX ... ON ask_question_cache(owner_user_id, requester_user_id, speech_tone_id)`)를 추가해 tone 별 검색을 빠르게 한다. +2. **레포지토리 계층** + - `ask-message-embedding.repository.ts` (리네임 후 `ask-question-cache.repository.ts` 고려) + - `MessageEmbedding`/`SimilarMessage` 인터페이스에 `speechToneId: number` 추가. 기본값 `-1`은 별도 상수로 관리한다. + - `upsertEmbedding` INSERT/UPDATE 문에 `speech_tone_id` 컬럼을 포함하고, 매개변수로 tone ID를 받는다. + - `findSimilarEmbeddings` SELECT에 `speech_tone_id AS "speechToneId"`를 추가하고, 반환 타입에 포함. + - `session-history.service.ts` + - `persistConversation` 파라미터에 `speechTone?: number`를 추가하여 assistant 톤을 전달. + - `embeddingRepository.upsertEmbedding` 호출 시 새 tone 값을 전달. + - `findCachedAnswer`가 `speechToneId`를 함께 포함한 후보 배열을 리턴하도록 수정 (ex: `{ answer, searchPlan, retrievalMeta, similarity, speechToneId }`). +3. **서비스 계층 (QA)** + - `qa.service.ts` / `qa.v2.service.ts` + - 캐시 조회 결과를 tone별로 분류: `const matched = candidates.find(c => c.speechToneId === speechTone)`. + - `matched`가 있으면 기존 `replayCachedAnswer(matched)` 실행. + - 없고 후보 배열이 존재하면 `const primary = candidates[0];` 로 선정 후 `replaceTone.rewriteTone(primary.answer, speechTonePrompt)` 호출. + - 재작성 결과 텍스트를 SSE `answer` 이벤트로 흘려보내고, `persistConversation`에 `speechTone` 값을 명시해 저장. + - 재작성 여부를 `DebugLogger`에 남기고, 실패 시 기존 RAG/LLM 경로로 폴백한다. +4. **replace-tone.service.ts** + - 최종 시그니처: `export const rewriteTone = async ( + answer: string, + opts: { speechToneId: number; speechTonePrompt: string; llm?: LlmOverride } + ): Promise`. + - 내부에서 `generate`를 호출하며, 시스템 프롬프트에 "다음 답변의 내용은 유지하고 tone만 아래 지시에 맞춰라" 구조를 사용한다. + - 응답이 비거나 너무 짧으면 실패로 간주하고 오류를 throw. +5. **SSE / 이벤트** + - tone 재작성 시에도 `search_plan`, `context` 이벤트는 캐시된 값 그대로 재생하고 `answer` 이벤트에만 수정된 텍스트를 전송. + - `session_saved` 이벤트에 `cached: true`와 `tone_rewritten: true` (추가 속성) 등을 포함해 프론트에서 구분할 수 있도록 한다. +## 커밋 단위 구현 계획 +1. **마이그레이션 + DB 명명 정리** + - `docs/migrations/2025-XX-ask-question-cache-tone.sql` 추가: 테이블 리네임 → 컬럼 추가 → TRUNCATE → 인덱스 생성. + - `README.md` 등 마이그레이션 가이드에 새 스크립트 실행 방법 추가. + +2. **레포지토리 계층 업데이트** + - (선택) `ask-message-embedding.repository.ts` 파일명을 `ask-question-cache.repository.ts`로 변경하고 import 경로 수정. + - 인터페이스/쿼리에 `speechToneId` 반영, upsert 파라미터에 tone ID 추가. + +3. **세션 히스토리 서비스 수정** + - `persistConversation` 시그니처에 `speechTone?: number` 추가. + - `embeddingRepository.upsertEmbedding` 호출부에 tone 전달. + - `findCachedAnswer` 반환 타입을 tone 정보 포함 배열로 변경. + +4. **QA 서비스 캐시 로직 개편** + - `qa.service.ts`/`qa.v2.service.ts`에서 tone 일치 후보 우선 사용, 불일치 시 `replaceTone` 경로로 분기. + - SSE 이벤트/`persistConversation`에 재작성 결과 및 tone ID 반영. + - DebugLogger 로깅 추가. + +5. **replace-tone.service.ts 신규 추가** + - `rewriteTone` 함수 구현, 프롬프트 템플릿/에러 처리를 포함. + - 필요 시 `qa.prompts.ts`에 tone 전용 프롬프트 자산 추가. diff --git a/docs/history-tasks/ASK_DUPLICATE_CACHE.md b/docs/history-tasks/ASK_DUPLICATE_CACHE.md new file mode 100644 index 0000000..e6ac36f --- /dev/null +++ b/docs/history-tasks/ASK_DUPLICATE_CACHE.md @@ -0,0 +1,46 @@ +# ASK 중복 질문 캐시 & Tone Rewrite 회고 + +이번 작업은 “중복 질문 캐시를 더 똑똑하게 만들고, tone 정합성을 지키면서도 비용을 줄이자”라는 목표로 진행했다. 아래 정리는 과장 없이 우리가 실제로 마주친 문제와 해결 과정, 그리고 그 결과다. + +## 왜 손봤나 +- **Follow-up 질문 품질**: 프롬프트에는 항상 직전 2턴이 붙지만, 캐시는 단일 질문 벡터만 저장했다. follow-up 질문이 들어오면 캐시가 엇나가거나, 유사도 비교 자체가 어렵다. +- **Tone 불일치**: 캐시에서 꺼낸 답변의 말투가 요청 값과 다르면 사용자 경험이 깨진다. tone 정보를 캐시에 같이 저장하지 않으면, 결국 새 LLM 호출을 해야 했다. +- **콘텐츠 정합성**: 사용자가 글을 수정/삭제해 임베딩이 새로 생성될 때, 예전 중복 질문 캐시가 남아 있으면 최신 본문과 어긋난 답변이 튀어나온다. + +## 어떻게 풀었나 + +### 1. 캐시 스키마 + tone 메타 +- `ask_message_embedding`을 `ask_question_cache`로 리네임하고, `speech_tone_id integer NOT NULL DEFAULT -1`을 추가했다. (파일: `docs/migrations/2025-03-ask-question-cache-tone.sql`) +- 마이그레이션 직후 `TRUNCATE`로 tone 정보가 없는 캐시를 비웠다. 덕분에 tone-aware 로직과 충돌하는 레코드가 남지 않았다. + +### 2. 히스토리 2턴을 합쳐 임베딩 +- `buildDuplicateQuestionBlock`(`session-history.service.ts:22`)이 `[Q-2]`, `[Q-1]`, `[Q-now]` 블록을 만들어준다. 길이가 길면 앞선 턴부터 자른다. +- `qa.service.ts:172`, `qa.v2.service.ts:154`에서 `[현재 질문, 중복 질문 블록]`을 동시에 임베딩한다. + - 첫 번째 벡터 → RAG 검색용 + - 두 번째 벡터 → 캐시 저장/조회용 +- `persistConversation`(`session-history.service.ts:68`)이 answer tone과 중복 질문 벡터를 함께 `ask_question_cache`에 upsert한다. + +### 3. tone-aware 캐시 조회 +- `findCachedAnswer`(`session-history.service.ts:146`)가 owner, requester, post, category 조건에 맞는 후보를 tone ID와 함께 돌려준다. +- `selectToneAwareCacheCandidate`(`session-history.service.ts:36`)가 요청 tone과 동일한 후보를 고르고, 없으면 top-1 후보를 rewrite 대상으로 지정한다. +- `qa.service.ts:192`, `qa.v2.service.ts:175`에서 tone이 맞는 캐시는 그대로 재생하고, tone이 다르면 rewrite 경로로 분기한다. + +### 4. `replace-tone.service.ts` 디테일 +- 기존 `generate` 래퍼를 그대로 사용하면서 system/user 프롬프트를 tone 교체에 맞춰 고정했다. +- **Ask v2**: 캐시 → tone mismatch → `rewriteTone`만 호출하므로 LLM 호출 수가 2 → 1로 줄어든다. + **Ask v1**: LLM 호출 수는 동일하지만 tone 재작성은 원문만 넣으니 입력 토큰이 줄어 비용 절감이 된다. +- tone 재작성 실패 시에는 로그를 남기고 RAG 경로로 폴백한다. 성공하면 SSE `answer` 이벤트와 `persistConversation`에 tone ID를 저장한다. + +### 5. 임베딩 워커에서 정합성 보장 +- 포스트 임베딩 작업(수정/삭제 포함)이 끝나면 `queue-consumer.ts`가 `deleteEmbeddingsByOwner`(`ask-question-cache.repository.ts:166`)를 호출한다. +- 그 사용자에 대한 중복 질문 캐시가 전부 지워져서, 새로운 임베딩과 캐시가 항상 같은 시점을 바라보게 된다. 이 정합성 덕분에 “본문은 최신인데 캐시는 옛날 기록” 같은 상황을 확실하게 막았다. + +## 운영 & 디버깅 팁 +- `speech_tone_id = -1`은 tone 미확인 상태로 간주한다. rewrite 한 번만 성공하면 tone이 채워져 이후에는 재작성 없이 캐시를 재생할 수 있다. +- `DEBUG_CHANNELS=qa` + `DEBUG_EXCLUDE_TYPES` 조합으로 `debug.qa.cache_candidates` / `debug.qa.v2.cache_candidates` 로그만 추려보면 tone 매칭 상태를 바로 확인할 수 있다. +- 특정 사용자의 중복 질문 캐시를 비우고 싶으면 `deleteEmbeddingsByOwner`를 실행하면 된다. 워커에서 이미 자동으로 호출하지만, 필요 시 수동으로도 가능하다. + +## 결과 +- follow-up 질문에서도 동일한 히스토리를 기준으로 캐시가 비교되니, 중복 질문 탐지가 더 정확해졌다. +- tone mismatch 상황에서도 `rewriteTone`만 호출하면 되기 때문에, Ask v2에서는 LLM 호출을 1번으로 줄였고 v1에서도 입력 토큰이 줄어 비용이 내려갔다. +- 임베딩 워커 단계에서 캐시를 정리하니, 콘텐츠 정합성을 걱정할 일이 없어졌다. “최신 글과 tone까지 맞춘 캐시”라는 목표를 과장 없이 달성했다. diff --git a/docs/history-tasks/ASK_SESSION_INTEGRATION.md b/docs/history-tasks/ASK_SESSION_INTEGRATION.md new file mode 100644 index 0000000..085b1a2 --- /dev/null +++ b/docs/history-tasks/ASK_SESSION_INTEGRATION.md @@ -0,0 +1,253 @@ +# ASK Session Integration Guide + +이 문서는 프론트엔드가 새 ASK 세션/히스토리 기능을 활용하기 위해 필요한 API 계약과 구현 예시를 정리한 자료다. `/ai/ask`, `/ai/v2/ask` 스트림 요청부터 세션 REST 엔드포인트, 무한 스크롤 메시지 페이징까지 한 흐름으로 설명한다. + +--- + +## 1. ASK 요청 흐름 + +### 1.1 세션 ID 확보/생성 +1. 기존 세션을 재사용할 때는 세션 목록 API(`GET /ai/v2/sessions`)로 ID를 조회한 뒤 선택한다. +2. 새 세션을 만들려면 ASK 요청의 `session_id`를 `null`이거나 생략하고, `user_id`(챗봇 주인 ID)를 반드시 포함한다. + - 서버가 자동으로 세션을 생성하고 다음을 반환한다. + - HTTP 헤더 `session-id: ` + - SSE `event: session` → `{ session_id, owner_user_id, requester_user_id }` +3. 새 ID를 받으면 프론트에서 상태에 저장하고 이후 요청에서는 `session_id`만 전달하면 된다. 이때 `user_id`는 optional이며, 보낸 경우 DB owner와 일치해야 한다. + +### 1.2 `/ai/ask` / `/ai/v2/ask` SSE 요청 샘플 + +```http +POST /ai/v2/ask HTTP/1.1 +Authorization: Bearer +Content-Type: application/json + +{ + "question": "최근 인프라 포스트 요약해줘", + "user_id": "blog-owner-123", // 새 세션일 때 필수 + "session_id": null, // null 또는 생략 → 세션 자동 생성 + "category_id": 42, + "speech_tone": -2 +} +``` + +스트림 이벤트 순서(상황에 따라 일부 생략): +1. `event: session` *(신규 세션인 경우)* +2. `event: search_plan` +3. `event: rewrite` / `event: keywords` *(하이브리드 일 때)* +4. `event: search_result`, `event: search_result_meta`, `event: exist_in_post_status`, `event: context` +5. `event: hybrid_result`, `event: hybrid_result_meta` *(필요 시)* +6. `event: answer` (LLM 토큰이 들어있는 SSE) +7. `event: session_saved` → `{ session_id, owner_user_id, requester_user_id, cached }` + - `cached: true`는 질문이 캐시 적중되어 기존 답변을 재사용했음을 의미 +8. 오류 시 `event: session_error`(reason 포함) + 기존 `event: error` + +프론트에서는 `session_saved`/`session_error`를 기준으로 UI 상태(“보관 완료” 배지 등)를 갱신할 수 있다. + +### 1.3 캐시 히트 처리 +동일한 질문(같은 사용자와 필터)일 경우 서버가 자동으로 캐시를 재생한다. +- SSE로 `search_plan`/`search_result`/`answer`가 즉시 도착하고, `session_saved` 이벤트의 `cached`가 `true`. +- **프론트 액션은 일반 답변과 동일**: SSE 순서/값이 동일하게 재생되므로 별도 분기를 둘 필요는 없지만, `cached`를 활용해 “이전 답변을 재사용했습니다” 같은 안내를 띄울 수 있다. + +--- + +## 2. 세션 REST API + +### 2.1 목록 조회 `GET /ai/v2/sessions` +- 쿼리 파라미터 + - `limit`: 기본 20, 최대 50 + - `cursor`: Base64(`created_at|id`) 문자열 + - `owner_user_id`: 특정 블로그/챗봇만 필터링할 때 사용 +- 응답 +```json +{ + "sessions": [ + { + "session_id": 123, + "owner_user_id": "blog-owner-123", + "requester_user_id": "viewer-999", + "title": "인프라 정리 질문", + "metadata": {}, + "last_question_at": "2025-01-19T10:05:12.123Z", + "created_at": "2025-01-19T09:55:00.000Z", + "updated_at": "2025-01-19T10:05:12.123Z", + "message_count": 4 + } + ], + "paging": { + "cursor": "MjAyNS0wMS0xOVQxMDowNToxMi4xMjNa|123", + "has_more": true + } +} +``` +- 페이징 구현 예시 + 1. 최초 호출: `GET /ai/v2/sessions?limit=20` + 2. 응답 `paging.cursor`가 존재하면 “더 보기” 클릭 시 `GET ...?cursor=` + 3. `has_more=false`일 때까지 반복 + +### 2.2 단일 세션 메타 `GET /ai/v2/sessions/:id` +- 자신이 만든 세션이 아니면 404. +- `message_count`를 추가로 주므로 목록에서 선택한 뒤 최신 상태를 다시 확인할 수 있다. + +### 2.3 메시지 페이지네이션 `GET /ai/v2/sessions/:id/messages` +- 쿼리 + - `limit` (default 20, max 50) + - `cursor` + - `direction`: `'backward'`(기본) 또는 `'forward'` +- 응답 +```json +{ + "session_id": 123, + "owner_user_id": "blog-owner-123", + "requester_user_id": "viewer-999", + "messages": [ + { + "id": 456, + "role": "user", + "content": "최근 인프라 글을 알려줘", + "search_plan": {...}, + "retrieval_meta": null, + "created_at": "2025-01-19T10:05:12.123Z" + }, + { + "id": 457, + "role": "assistant", + "content": "인프라 관련 최신 글은 ...", + "search_plan": null, + "retrieval_meta": {...}, + "created_at": "2025-01-19T10:05:20.000Z" + } + ], + "paging": { + "direction": "backward", + "has_more": true, + "next_cursor": "MjAyNS0wMS0xOVQxMDowNToxMi4xMjNa|456" + } +} +``` +- **무한 스크롤 구현 팁** + 1. 최신 메시지를 불러오려면 `direction=backward`, `cursor` 생략으로 시작. UI에서는 리스트 끝에 붙인다. + 2. 위로 스크롤하여 과거 메시지를 계속 불러오고 싶다면 응답의 `next_cursor`를 사용해 `GET ...?cursor=&direction=backward`. + 3. 대화 중간으로 점프해 이후 메시지를 로드하려면 동일 cursor를 `direction=forward`로 호출하면 된다. + 4. 응답 메시지는 API에서 시간순으로 이미 정렬되어 있으므로 바로 렌더링하면 된다. + +**무한 스크롤 의사 코드** +```ts +type PagingState = { + prevCursor: string | null; + nextCursor: string | null; + hasMorePrev: boolean; +}; + +const state: PagingState = { prevCursor: null, nextCursor: null, hasMorePrev: true }; + +// 최신(아래쪽) 메시지 로드 +const loadLatest = async () => { + const params = new URLSearchParams({ limit: '20', direction: 'backward' }); + if (state.prevCursor) params.set('cursor', state.prevCursor); + const res = await fetch(`/ai/v2/sessions/${sessionId}/messages?${params}`, { headers }); + const body = await res.json(); + renderPrepend(body.messages); // 위쪽에 추가 + state.prevCursor = body.paging?.next_cursor ?? null; + state.hasMorePrev = Boolean(body.paging?.has_more); +}; + +// 사용자가 아래로 내려간 뒤 이후 메시지를 보고 싶을 때 +const loadForward = async () => { + if (!state.nextCursor) return; + const params = new URLSearchParams({ limit: '20', direction: 'forward', cursor: state.nextCursor }); + const res = await fetch(`/ai/v2/sessions/${sessionId}/messages?${params}`, { headers }); + const body = await res.json(); + renderAppend(body.messages); // 아래쪽에 추가 + state.nextCursor = body.paging?.next_cursor ?? null; +}; +``` + +### 2.4 PATCH / DELETE +- `PATCH /ai/v2/sessions/:id` + - Body: `{ "title": "...", "metadata": { ... } }` (둘 중 하나 이상 필수) + - 성공 시 최신 메타를 반환. +- `DELETE /ai/v2/sessions/:id` + - `{ "session_id": 123, "deleted": true }` + - 세션/메시지/임베딩이 모두 cascade로 제거되므로 프론트에서 제거 후 새로고침 필요 없음. + +--- + +## 3. 프론트엔드 구현 참고 + +### 3.1 ASK 스트림 핸들러 의사 코드 +```ts +const sse = new EventSourcePolyfill('/ai/v2/ask', { headers: { Authorization: `Bearer ${token}` }, payload }); +const state = { sessionId: null, chunks: [] }; + +sse.addEventListener('session', (evt) => { + const data = JSON.parse(evt.data); + state.sessionId = data.session_id; + // 새 세션 ID를 저장해 다음 질문에 사용 +}); + +sse.addEventListener('search_plan', (evt) => { ... }); +sse.addEventListener('context', (evt) => { ... }); +sse.addEventListener('answer', (evt) => { + state.chunks.push(JSON.parse(evt.data)); + renderStreamingAnswer(state.chunks.join('')); +}); + +sse.addEventListener('session_saved', (evt) => { + const data = JSON.parse(evt.data); + showToast(data.cached ? '기존 답변을 재사용했어요.' : '대화가 저장되었습니다.'); +}); + +sse.addEventListener('session_error', (evt) => { + console.warn('세션 저장 실패', evt.data); +}); + +sse.onerror = () => { + sse.close(); +}; +``` + +### 3.2 대화 목록/상세 UI 시나리오 +1. **좌측 패널**: `/ai/v2/sessions?limit=20`으로 최근 대화 조회 → 커서 기반 “더 보기” 버튼. +2. **메시지 영역**: 세션을 선택하면 `GET /ai/v2/sessions/:id/messages`로 최신 메시지 불러오기 → `direction=backward`. +3. **무한 스크롤**: 맨 위로 스크롤되면 `cursor=previous.next_cursor`로 과거 메시지 로드. +4. **실시간 갱신**: SSE에서 받은 user/assistant 메시지를 메모리에 쌓고, 스트림 종료 후 `session_saved` 이벤트가 오면 REST API 결과와 동기화 가능. + +### 3.3 세션 ID 전파 +- 새 ASK 요청 → 응답 헤더 `session-id`와 `event: session`을 받으면, 프론트의 현재 대화 객체에 그 ID를 기록한다. +- 이후 폼 전송 시 `session_id`만 바디에 넣어서 이어서 질문할 수 있다. +- 다른 블로그로 이동하면 기존 세션 ID를 버리고 `user_id`를 새 값으로 넣어 다시 질문하면 된다(서버가 다른 owner와 세션을 매칭하지 않도록 검증함). + +--- + +## 4. 오류 및 예외 처리 + +### 4.1 주요 SSE 이벤트 & UI 매핑 + +| 이벤트 | 예시 payload | 권장 UI 처리 | +|--------|--------------|--------------| +| `session` | `{ session_id, owner_user_id, requester_user_id }` | 새 세션 카드 추가, 현재 대화 헤더 업데이트 | +| `search_plan` | `{ mode: 'rag', ... }` | 디버그 패널, “검색 계획 준비 중…” 표시 | +| `rewrite` / `keywords` | `["재작성1", ...]` / `["키워드1", ...]` | 검색 과정 시각화(선택 사항) | +| `search_result` / `hybrid_result` | `[ { postId, postTitle }, ... ]` | 참고 컨텍스트 목록 표시 | +| `search_result_meta` / `hybrid_result_meta` | 추가 메타 정보 | 고급 모드 또는 디버그 뷰 | +| `exist_in_post_status` | `true/false` | “관련 글을 찾음/찾지 못함” 안내 뱃지 | +| `context` | `[ { postId, postTitle }, ... ]` | UI 우측 “참조 글 목록” 섹션 | +| `answer` | `"…LLM 청크…"` | 채팅 말풍선 실시간 갱신 | +| `session_saved` | `{ session_id, cached }` | 저장 완료/캐시 재사용 토스트, 상태 뱃지 | +| `session_error` | `{ reason }` | 오류 토스트, 재시도 버튼 노출 | +| `error` | `{ message }` | 스트림 종료 + 에러 메시지 | + +### 4.2 오류 대응 요약 + +| 상황 | 응답/이벤트 | 대응 방법 | +|------|-------------|-----------| +| `session_id`가 유효하지 않음 | 400 + `{ message: 'Invalid session_id' }` | 프론트 세션 상태 초기화, 새 세션 생성 | +| 세션 owner 불일치 | 409 + `{ message: 'Session owner mismatch' }` | 다른 블로그로 전환 후 새 세션 시작 | +| 세션 접근 권한 없음 | 404 (`존재하지 않는다고 응답`) | 리스트를 다시 로드해 실제로 존재하는지 확인 | +| 포스트가 삭제/비공개 | SSE `event: error` + `session_error(reason=post_not_found/forbidden_post)` | 사용자에게 안내 후 대화 중단 | +| 저장 실패 | SSE `event: session_error` + reason | 로그/토스트로 사용자에게 “대화 저장에 실패했습니다” 알림 | +| LLM 오류/스트림 예외 | SSE `event: error` + `session_error(reason=llm_error/stream_error)` | 스트림 종료 후 재시도 UI | + +--- + +이 가이드를 토대로 세션 기반 ASK UX를 구현하면, 신규 세션 생성에서 히스토리 로딩까지 백엔드와 일관된 동작을 보장할 수 있다. 추가 질문은 `docs/history-tasks/ASK_SESSION_MANAGEMENT` 시리즈나 최근 커밋을 참고한다. diff --git a/docs/migrations/2025-02-ask-session-history.sql b/docs/migrations/2025-02-ask-session-history.sql new file mode 100644 index 0000000..2f64a7a --- /dev/null +++ b/docs/migrations/2025-02-ask-session-history.sql @@ -0,0 +1,71 @@ +BEGIN; + +-- Ensure pgvector extension is available for embedding storage +CREATE EXTENSION IF NOT EXISTS vector; + +-- Persist ASK session metadata +CREATE TABLE IF NOT EXISTS ask_session ( + id BIGSERIAL PRIMARY KEY, + requester_user_id TEXT NOT NULL, + owner_user_id TEXT NOT NULL, + title TEXT, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + last_question_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_ask_session_requester_created_at + ON ask_session (requester_user_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_ask_session_owner_created_at + ON ask_session (owner_user_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_ask_session_last_question_at + ON ask_session (last_question_at DESC NULLS LAST); + +-- Persist individual ASK messages (user + assistant turns) +CREATE TABLE IF NOT EXISTS ask_message ( + id BIGSERIAL PRIMARY KEY, + session_id BIGINT NOT NULL REFERENCES ask_session(id) ON DELETE CASCADE, + role TEXT NOT NULL CHECK (role IN ('user', 'assistant')), + content TEXT NOT NULL, + search_plan JSONB, + retrieval_meta JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_ask_message_session_created_at + ON ask_message (session_id, created_at DESC, id DESC); + +-- Store embeddings for dedupe/cache checks +CREATE TABLE IF NOT EXISTS ask_message_embedding ( + message_id BIGINT PRIMARY KEY REFERENCES ask_message(id) ON DELETE CASCADE, + owner_user_id TEXT NOT NULL, + requester_user_id TEXT NOT NULL, + category_id BIGINT, + post_id BIGINT, + answer_message_id BIGINT REFERENCES ask_message(id), + embedding VECTOR(1536) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_ask_message_embedding_owner + ON ask_message_embedding (owner_user_id); + +CREATE INDEX IF NOT EXISTS idx_ask_message_embedding_owner_category + ON ask_message_embedding (owner_user_id, category_id); + +CREATE INDEX IF NOT EXISTS idx_ask_message_embedding_owner_post + ON ask_message_embedding (owner_user_id, post_id); + +CREATE INDEX IF NOT EXISTS idx_ask_message_embedding_requester + ON ask_message_embedding (requester_user_id); + +-- IVF FLAT index for similarity search (requires ANALYZE after large inserts) +CREATE INDEX IF NOT EXISTS idx_ask_message_embedding_vec + ON ask_message_embedding + USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); + +COMMIT; diff --git a/docs/migrations/2025-03-ask-question-cache-tone.sql b/docs/migrations/2025-03-ask-question-cache-tone.sql new file mode 100644 index 0000000..161da8e --- /dev/null +++ b/docs/migrations/2025-03-ask-question-cache-tone.sql @@ -0,0 +1,16 @@ +BEGIN; + +-- Rename the duplicate-detection table to clarify its purpose. +ALTER TABLE IF EXISTS ask_message_embedding RENAME TO ask_question_cache; + +-- Keep naming consistent for the IVFFlat index if it already exists. +ALTER INDEX IF EXISTS idx_ask_message_embedding_vec RENAME TO idx_ask_question_cache_vec; + +-- Store speech tone IDs with cached answers; -1 indicates unknown tone. +ALTER TABLE IF EXISTS ask_question_cache + ADD COLUMN IF NOT EXISTS speech_tone_id integer NOT NULL DEFAULT -1; + +-- Existing cache entries lack tone metadata, so drop them after the schema change. +TRUNCATE ask_question_cache; + +COMMIT; diff --git a/docs/migrations/README.md b/docs/migrations/README.md index 3f4de87..5f994be 100644 --- a/docs/migrations/README.md +++ b/docs/migrations/README.md @@ -25,3 +25,20 @@ Notes: - Indexes increase disk usage and write overhead; create only on columns used for text search. - The extension must be installed once per database. +## Create ASK session/message tables + +File: `2025-02-ask-session-history.sql` + +Purpose: +- Create `ask_session`, `ask_message`, and `ask_question_cache` tables with the indexes needed for session history APIs. The cache table keeps duplicate-detection embeddings and speech tone metadata. +- Ensure the `vector` extension is enabled so question embeddings can be stored for duplicate-detection. `ask_question_cache.speech_tone_id` must default to `-1` to indicate unknown tone. + +Run: + +```bash +psql "$DATABASE_URL" -f docs/migrations/2025-02-ask-session-history.sql +``` + +Notes: +- The IVFFlat index requires a populated table before it becomes efficient; run `ANALYZE ask_question_cache;` after bulk loading data. +- `ask_question_cache` references `ask_message`, so dropping the session tables will cascade to embeddings automatically. diff --git a/src/config.ts b/src/config.ts index a2ff720..4ea7bb3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,6 +22,7 @@ const configSchema = z.object({ LLM_COST_ROUND: z.coerce.number().default(4), DEBUG_ALL: z.string().default('false'), DEBUG_CHANNELS: z.string().default(''), + DEBUG_EXCLUDE_TYPES: z.string().default(''), REDIS_URL: z.string().optional(), REDIS_HOST: z.string().default('127.0.0.1'), REDIS_PORT: z.coerce.number().default(6379), diff --git a/src/controllers/ai.controller.ts b/src/controllers/ai.controller.ts index 6bfcec1..9982fda 100644 --- a/src/controllers/ai.controller.ts +++ b/src/controllers/ai.controller.ts @@ -8,6 +8,9 @@ import { import { answerStream } from '../services/qa.service'; import { EmbedTitleRequest, EmbedContentRequest, AskRequest } from '../types/ai.types'; import { DebugLogger } from '../utils/debug-logger'; +import { AuthRequest } from '../middlewares/auth.middleware'; +import { extractRequesterId } from '../utils/auth'; +import { resolveSessionContext, SessionContextError } from '../utils/session'; export const embedTitleHandler = async ( req: Request<{}, {}, EmbedTitleRequest>, @@ -47,13 +50,36 @@ export const embedContentHandler = async ( }; export const askHandler = async ( - req: Request<{}, {}, AskRequest>, + req: AuthRequest & Request<{}, {}, AskRequest>, res: Response, next: NextFunction ) => { // RAG 기반 QA 결과를 SSE 스트림으로 클라이언트에 전달 try { - const { question, user_id, category_id, speech_tone, post_id, llm } = req.body as any; + const requesterUserId = extractRequesterId(req); + if (!requesterUserId) { + return res.status(401).json({ message: 'Unauthorized' }); + } + + const { question, user_id, session_id, category_id, speech_tone, post_id, llm } = req.body; + + let sessionResult; + try { + sessionResult = await resolveSessionContext({ + requesterUserId, + sessionId: session_id, + ownerUserId: user_id, + titleHint: question, + }); + } catch (error) { + if (error instanceof SessionContextError) { + return res.status(error.status).json({ message: error.message }); + } + throw error; + } + + const { session, created } = sessionResult; + const ownerUserId = session.ownerUserId; // SSE를 위한 헤더 설정과 버퍼링 완화 옵션 res.setHeader('Content-Type', 'text/event-stream; charset=utf-8'); @@ -61,6 +87,7 @@ export const askHandler = async ( res.setHeader('Connection', 'keep-alive'); // Nginx 버퍼링 비활성화 res.setHeader('X-Accel-Buffering', 'no'); + res.setHeader('session-id', String(session.id)); // 헤더를 먼저 전송해 클라이언트 처리를 즉시 시작 (res as any).flushHeaders?.(); // 소켓의 네이글 알고리즘 버퍼링을 줄여 전송 지연 완화 @@ -68,7 +95,36 @@ export const askHandler = async ( // 프록시 버퍼링 임계값을 넘기기 위한 초기 keep-alive 전송 res.write(':ok\n\n'); - const stream = await answerStream(question, user_id, category_id, speech_tone, post_id, llm); + if (created) { + const payload = { + session_id: String(session.id), + owner_user_id: ownerUserId, + requester_user_id: requesterUserId, + }; + res.write(`event: session\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + } + + const stream = await answerStream({ + question, + session, + requesterUserId, + ownerUserId, + categoryId: category_id, + speechTone: speech_tone, + postId: post_id, + llm, + }); + + stream.on('session_saved', (payload) => { + res.write(`event: session_saved\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + }); + stream.on('session_error', (payload) => { + res.write(`event: session_error\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + }); + // SSE 델타가 즉시 전송되도록 수동 브리징 stream.on('data', (chunk) => { const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk)); @@ -88,6 +144,7 @@ export const askHandler = async ( // 클라이언트 연결이 끊기면 스트림 자원 해제 req.on('close', () => { try { + stream.emit('client_disconnect'); stream.destroy(); } catch {} }); diff --git a/src/controllers/ai.v2.controller.ts b/src/controllers/ai.v2.controller.ts index 1fdf5be..ea79aa4 100644 --- a/src/controllers/ai.v2.controller.ts +++ b/src/controllers/ai.v2.controller.ts @@ -1,28 +1,95 @@ import { Request, Response, NextFunction } from 'express'; import { AskV2Request } from '../types/ai.v2.types'; import { answerStreamV2 } from '../services/qa.v2.service'; +import { AuthRequest } from '../middlewares/auth.middleware'; +import { extractRequesterId } from '../utils/auth'; +import { resolveSessionContext, SessionContextError } from '../utils/session'; export const askV2Handler = async ( - req: Request<{}, {}, AskV2Request>, + req: AuthRequest & Request<{}, {}, AskV2Request>, res: Response, next: NextFunction ) => { // 검색 계획 기반 v2 QA를 SSE로 중계 try { - const { question, user_id, category_id, speech_tone, post_id, llm } = req.body as any; - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache'); + const requesterUserId = extractRequesterId(req); + if (!requesterUserId) { + return res.status(401).json({ message: 'Unauthorized' }); + } + + const { question, user_id, session_id, category_id, speech_tone, post_id, llm } = req.body; + + let sessionResult; + try { + sessionResult = await resolveSessionContext({ + requesterUserId, + sessionId: session_id, + ownerUserId: user_id, + titleHint: question, + }); + } catch (error) { + if (error instanceof SessionContextError) { + return res.status(error.status).json({ message: error.message }); + } + throw error; + } + + const { session, created } = sessionResult; + const ownerUserId = session.ownerUserId; + + res.setHeader('Content-Type', 'text/event-stream; charset=utf-8'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + res.setHeader('session-id', String(session.id)); + (res as any).flushHeaders?.(); + (res.socket as any)?.setNoDelay?.(true); + res.write(':ok\n\n'); - const stream = await answerStreamV2( + if (created) { + const payload = { + session_id: String(session.id), + owner_user_id: ownerUserId, + requester_user_id: requesterUserId, + }; + res.write(`event: session\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + } + + const stream = await answerStreamV2({ question, - user_id, - category_id, - speech_tone, - post_id, - llm - ); - stream.pipe(res); + session, + requesterUserId, + ownerUserId, + categoryId: category_id, + speechTone: speech_tone, + postId: post_id, + llm, + }); + + stream.on('session_saved', (payload) => { + res.write(`event: session_saved\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + }); + stream.on('session_error', (payload) => { + res.write(`event: session_error\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); + }); + + stream.on('data', (chunk) => { + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk)); + res.write(buf); + (res as any).flush?.(); + }); + stream.on('end', () => res.end()); + stream.on('error', () => res.end()); + + req.on('close', () => { + try { + stream.emit('client_disconnect'); + stream.destroy(); + } catch {} + }); } catch (error) { next(error); } diff --git a/src/controllers/session.controller.ts b/src/controllers/session.controller.ts new file mode 100644 index 0000000..1b8b0d7 --- /dev/null +++ b/src/controllers/session.controller.ts @@ -0,0 +1,174 @@ +import { Request, Response } from 'express'; +import { AuthRequest } from '../middlewares/auth.middleware'; +import * as sessionRepository from '../repositories/ask-session.repository'; +import * as messageRepository from '../repositories/ask-message.repository'; +import { sessionListQuerySchema, sessionMessagesQuerySchema, sessionPatchSchema } from '../types/session.types'; +import { extractRequesterId } from '../utils/auth'; + +const encodeCursor = (createdAt: Date, id: number): string => + Buffer.from(`${createdAt.toISOString()}|${id}`).toString('base64'); + +const decodeCursor = (cursor: string): { createdAt: Date; id: number } | null => { + try { + const decoded = Buffer.from(cursor, 'base64').toString('utf-8'); + const [iso, idStr] = decoded.split('|'); + if (!iso || !idStr) return null; + const createdAt = new Date(iso); + const id = Number(idStr); + if (Number.isNaN(createdAt.getTime()) || !Number.isFinite(id)) return null; + return { createdAt, id }; + } catch { + return null; + } +}; + +const toSessionResponse = ( + session: sessionRepository.AskSession | sessionRepository.AskSessionSummary, + messageCountOverride?: number +) => ({ + session_id: session.id, + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + title: session.title, + metadata: session.metadata ?? {}, + last_question_at: session.lastQuestionAt ? session.lastQuestionAt.toISOString() : null, + created_at: session.createdAt.toISOString(), + updated_at: session.updatedAt.toISOString(), + message_count: + messageCountOverride ?? ('messageCount' in session ? session.messageCount : undefined), +}); + +const toMessageResponse = (message: messageRepository.AskMessage) => ({ + id: message.id, + role: message.role, + content: message.content, + search_plan: message.searchPlan, + retrieval_meta: message.retrievalMeta, + created_at: message.createdAt.toISOString(), +}); + +export const listSessionsHandler = async (req: AuthRequest, res: Response) => { + const requesterId = extractRequesterId(req); + if (!requesterId) return res.status(401).json({ message: 'Unauthorized' }); + + const parse = sessionListQuerySchema.safeParse(req.query); + if (!parse.success) return res.status(400).json({ message: 'Invalid query', issues: parse.error.format() }); + const { limit, cursor, owner_user_id: ownerUserId } = parse.data; + + let cursorPayload: { createdAt: Date; id: number } | undefined; + if (cursor) { + const decoded = decodeCursor(cursor); + if (!decoded) return res.status(400).json({ message: 'Invalid cursor' }); + cursorPayload = decoded; + } + + const sessions = await sessionRepository.listSessionsForRequester({ + requesterUserId: requesterId, + ownerUserId, + cursorCreatedAt: cursorPayload?.createdAt, + cursorId: cursorPayload?.id, + limit, + }); + + const hasMore = sessions.length === limit; + const nextCursor = + hasMore && sessions.length + ? encodeCursor(sessions[sessions.length - 1].createdAt, sessions[sessions.length - 1].id) + : null; + + res.json({ + sessions: sessions.map((session) => toSessionResponse(session)), + paging: { cursor: nextCursor, has_more: hasMore }, + }); +}; + +export const getSessionHandler = async (req: AuthRequest, res: Response) => { + const requesterId = extractRequesterId(req); + if (!requesterId) return res.status(401).json({ message: 'Unauthorized' }); + + const sessionId = Number(req.params.id); + if (!Number.isFinite(sessionId)) return res.status(400).json({ message: 'Invalid session id' }); + + const session = await sessionRepository.findSessionForRequester(sessionId, requesterId); + if (!session) return res.status(404).json({ message: 'Session not found' }); + + const messageCount = await messageRepository.countMessagesForSession(sessionId); + + res.json(toSessionResponse(session, messageCount)); +}; + +export const getSessionMessagesHandler = async (req: AuthRequest, res: Response) => { + const requesterId = extractRequesterId(req); + if (!requesterId) return res.status(401).json({ message: 'Unauthorized' }); + + const sessionId = Number(req.params.id); + if (!Number.isFinite(sessionId)) return res.status(400).json({ message: 'Invalid session id' }); + + const session = await sessionRepository.findSessionForRequester(sessionId, requesterId); + if (!session) return res.status(404).json({ message: 'Session not found' }); + + const parse = sessionMessagesQuerySchema.safeParse(req.query); + if (!parse.success) return res.status(400).json({ message: 'Invalid query', issues: parse.error.format() }); + + const { limit, cursor, direction } = parse.data; + let cursorPayload: { createdAt: Date; id: number } | undefined; + if (cursor) { + const decoded = decodeCursor(cursor); + if (!decoded) return res.status(400).json({ message: 'Invalid cursor' }); + cursorPayload = decoded; + } + + const messages = await messageRepository.getMessagesBySession({ + sessionId, + limit, + direction, + cursor: cursorPayload ? { createdAt: cursorPayload.createdAt, id: cursorPayload.id } : undefined, + }); + + const hasMore = messages.length === limit; + const nextCursor = + hasMore && messages.length + ? encodeCursor(messages[messages.length - 1].createdAt, messages[messages.length - 1].id) + : null; + + res.json({ + session_id: session.id, + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + messages: messages.map(toMessageResponse), + paging: { direction, has_more: hasMore, next_cursor: nextCursor }, + }); +}; + +export const patchSessionHandler = async (req: AuthRequest, res: Response) => { + const requesterId = extractRequesterId(req); + if (!requesterId) return res.status(401).json({ message: 'Unauthorized' }); + + const sessionId = Number(req.params.id); + if (!Number.isFinite(sessionId)) return res.status(400).json({ message: 'Invalid session id' }); + + const parse = sessionPatchSchema.safeParse(req.body); + if (!parse.success) return res.status(400).json({ message: 'Invalid body', issues: parse.error.format() }); + + const updates = parse.data; + if (!('title' in updates) && !('metadata' in updates)) + return res.status(400).json({ message: 'No fields to update' }); + + const updated = await sessionRepository.updateSessionMeta(sessionId, requesterId, updates); + if (!updated) return res.status(404).json({ message: 'Session not found' }); + + res.json(toSessionResponse(updated)); +}; + +export const deleteSessionHandler = async (req: AuthRequest, res: Response) => { + const requesterId = extractRequesterId(req); + if (!requesterId) return res.status(401).json({ message: 'Unauthorized' }); + + const sessionId = Number(req.params.id); + if (!Number.isFinite(sessionId)) return res.status(400).json({ message: 'Invalid session id' }); + + const deleted = await sessionRepository.deleteSession(sessionId, requesterId); + if (!deleted) return res.status(404).json({ message: 'Session not found' }); + + res.json({ session_id: sessionId, deleted: true }); +}; diff --git a/src/llm/providers/openai-responses.ts b/src/llm/providers/openai-responses.ts index 82e808b..598ff28 100644 --- a/src/llm/providers/openai-responses.ts +++ b/src/llm/providers/openai-responses.ts @@ -8,10 +8,9 @@ const openai = new OpenAI({ apiKey: config.OPENAI_API_KEY }); const toResponsesInput = (messages: OpenAIStyleMessage[] = []) => { // 단순 채팅 메시지를 Responses API 입력 구조로 변환 - // Responses API는 content 타입으로 'text'가 아닌 'input_text'를 요구함 return messages.map((m) => ({ role: m.role, - content: [{ type: 'input_text', text: m.content }], + content: m.content, })); }; diff --git a/src/repositories/ask-message.repository.ts b/src/repositories/ask-message.repository.ts new file mode 100644 index 0000000..e86cbc8 --- /dev/null +++ b/src/repositories/ask-message.repository.ts @@ -0,0 +1,146 @@ +import type { QueryExecutor } from '../utils/db'; +import { runQuery } from '../utils/db'; + +export type MessageRole = 'user' | 'assistant'; + +export interface AskMessage { + id: number; + sessionId: number; + role: MessageRole; + content: string; + searchPlan: Record | null; + retrievalMeta: Record | null; + createdAt: Date; +} + +type MessageRow = { + id: number; + sessionId: number; + role: MessageRole; + content: string; + searchPlan: Record | null; + retrievalMeta: Record | null; + createdAt: Date; +}; + +const baseSelect = ` + SELECT + id, + session_id AS "sessionId", + role, + content, + search_plan AS "searchPlan", + retrieval_meta AS "retrievalMeta", + created_at AS "createdAt" + FROM ask_message +`; + +const mapMessage = (row: MessageRow): AskMessage => ({ + ...row, + searchPlan: row.searchPlan ?? null, + retrievalMeta: row.retrievalMeta ?? null, +}); + +export const insertMessage = async ( + params: { + sessionId: number; + role: MessageRole; + content: string; + searchPlan?: Record | null; + retrievalMeta?: Record | null; + }, + executor?: QueryExecutor +): Promise => { + const result = await runQuery( + ` + INSERT INTO ask_message (session_id, role, content, search_plan, retrieval_meta) + VALUES ($1, $2, $3, $4, $5) + RETURNING + id, + session_id AS "sessionId", + role, + content, + search_plan AS "searchPlan", + retrieval_meta AS "retrievalMeta", + created_at AS "createdAt" + `, + [params.sessionId, params.role, params.content, params.searchPlan ?? null, params.retrievalMeta ?? null], + executor + ); + + return mapMessage(result.rows[0]); +}; + +export const getMessageById = async (messageId: number): Promise => { + const result = await runQuery(`${baseSelect} WHERE id = $1`, [messageId]); + if (!result.rowCount) return null; + return mapMessage(result.rows[0]); +}; + +export const getLatestMessages = async ( + sessionId: number, + limit = 4, + executor?: QueryExecutor +): Promise => { + const result = await runQuery( + `${baseSelect} WHERE session_id = $1 ORDER BY created_at DESC, id DESC LIMIT $2`, + [sessionId, limit], + executor + ); + return result.rows.map(mapMessage).reverse(); +}; + +export type MessageDirection = 'forward' | 'backward'; + +export interface FetchMessagesParams { + sessionId: number; + limit: number; + direction?: MessageDirection; + cursor?: { createdAt: Date; id: number }; +} + +export const getMessagesBySession = async ({ + sessionId, + limit, + direction = 'backward', + cursor, +}: FetchMessagesParams): Promise => { + const predicates = ['session_id = $1']; + const values: unknown[] = [sessionId]; + + if (cursor) { + values.push(cursor.createdAt, cursor.id); + const cursorCreatedIdx = values.length - 1; + const cursorIdIdx = values.length; + + if (direction === 'forward') { + predicates.push( + `(created_at > $${cursorCreatedIdx} OR (created_at = $${cursorCreatedIdx} AND id > $${cursorIdIdx}))` + ); + } else { + predicates.push( + `(created_at < $${cursorCreatedIdx} OR (created_at = $${cursorCreatedIdx} AND id < $${cursorIdIdx}))` + ); + } + } + + values.push(limit); + const limitIdx = values.length; + + const orderClause = direction === 'forward' ? 'ORDER BY created_at ASC, id ASC' : 'ORDER BY created_at DESC, id DESC'; + + const result = await runQuery( + `${baseSelect} WHERE ${predicates.join(' AND ')} ${orderClause} LIMIT $${limitIdx}`, + values + ); + + const mapped = result.rows.map(mapMessage); + return direction === 'forward' ? mapped : mapped.reverse(); +}; + +export const countMessagesForSession = async (sessionId: number): Promise => { + const result = await runQuery<{ count: string }>('SELECT COUNT(*)::text AS count FROM ask_message WHERE session_id = $1', [ + sessionId, + ]); + return Number(result.rows[0]?.count ?? 0); +}; diff --git a/src/repositories/ask-question-cache.repository.ts b/src/repositories/ask-question-cache.repository.ts new file mode 100644 index 0000000..310c4b5 --- /dev/null +++ b/src/repositories/ask-question-cache.repository.ts @@ -0,0 +1,169 @@ +import pgvector from 'pgvector/pg'; +import type { QueryExecutor } from '../utils/db'; +import { runQuery } from '../utils/db'; + +export interface MessageEmbedding { + messageId: number; + ownerUserId: string; + requesterUserId: string; + categoryId: number | null; + postId: number | null; + answerMessageId: number | null; + speechToneId: number; + createdAt: Date; + updatedAt: Date; +} + +type EmbeddingRow = { + messageId: number; + ownerUserId: string; + requesterUserId: string; + categoryId: number | null; + postId: number | null; + answerMessageId: number | null; + speechToneId: number; + createdAt: Date; + updatedAt: Date; +}; + +const baseSelect = ` + SELECT + message_id AS "messageId", + owner_user_id AS "ownerUserId", + requester_user_id AS "requesterUserId", + category_id AS "categoryId", + post_id AS "postId", + answer_message_id AS "answerMessageId", + speech_tone_id AS "speechToneId", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM ask_question_cache +`; + +const mapRow = (row: EmbeddingRow): MessageEmbedding => ({ ...row }); + +export const upsertEmbedding = async ( + params: { + messageId: number; + ownerUserId: string; + requesterUserId: string; + embedding: number[]; + categoryId?: number | null; + postId?: number | null; + answerMessageId?: number | null; + speechToneId?: number; + }, + executor?: QueryExecutor +): Promise => { + const result = await runQuery( + ` + INSERT INTO ask_question_cache ( + message_id, + owner_user_id, + requester_user_id, + category_id, + post_id, + answer_message_id, + speech_tone_id, + embedding + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (message_id) DO UPDATE + SET + category_id = EXCLUDED.category_id, + post_id = EXCLUDED.post_id, + answer_message_id = EXCLUDED.answer_message_id, + speech_tone_id = EXCLUDED.speech_tone_id, + embedding = EXCLUDED.embedding, + updated_at = now() + RETURNING + message_id AS "messageId", + owner_user_id AS "ownerUserId", + requester_user_id AS "requesterUserId", + category_id AS "categoryId", + post_id AS "postId", + answer_message_id AS "answerMessageId", + speech_tone_id AS "speechToneId", + created_at AS "createdAt", + updated_at AS "updatedAt" + `, + [ + params.messageId, + params.ownerUserId, + params.requesterUserId, + params.categoryId ?? null, + params.postId ?? null, + params.answerMessageId ?? null, + params.speechToneId ?? -1, + pgvector.toSql(params.embedding), + ], + executor + ); + + return mapRow(result.rows[0]); +}; + +export interface SimilarMessage { + messageId: number; + answerMessageId: number | null; + speechToneId: number; + similarity: number; +} + +export interface SimilarSearchParams { + ownerUserId: string; + requesterUserId: string; + embedding: number[]; + postId?: number | null; + categoryId?: number | null; + limit?: number; +} + +export const findSimilarEmbeddings = async ({ + ownerUserId, + requesterUserId, + embedding, + postId, + categoryId, + limit = 3, +}: SimilarSearchParams): Promise => { + const filters = ['owner_user_id = $2', 'requester_user_id = $3']; + const values: unknown[] = [pgvector.toSql(embedding), ownerUserId, requesterUserId]; + + if (postId != null) { + values.push(postId); + filters.push('post_id = $' + values.length); + } else { + filters.push('post_id IS NULL'); + } + + if (postId == null) { + values.push(categoryId ?? null); + filters.push('category_id IS NOT DISTINCT FROM $' + values.length); + } + + values.push(limit); + const limitIdx = values.length; + + const result = await runQuery( + ` + SELECT + message_id AS "messageId", + answer_message_id AS "answerMessageId", + speech_tone_id AS "speechToneId", + 1 - (embedding <=> $1) AS similarity + FROM ask_question_cache + WHERE ${filters.join(' AND ')} + ORDER BY embedding <-> $1 + LIMIT $${limitIdx} + `, + values + ); + + return result.rows; +}; + +export const deleteEmbeddingsByOwner = async (ownerUserId: string): Promise => { + const result = await runQuery('DELETE FROM ask_question_cache WHERE owner_user_id = $1', [ownerUserId]); + return result.rowCount ?? 0; +}; diff --git a/src/repositories/ask-session.repository.ts b/src/repositories/ask-session.repository.ts new file mode 100644 index 0000000..25cb791 --- /dev/null +++ b/src/repositories/ask-session.repository.ts @@ -0,0 +1,215 @@ +import { runQuery, QueryExecutor } from '../utils/db'; + +export type JsonMap = Record; + +export interface AskSession { + id: number; + requesterUserId: string; + ownerUserId: string; + title: string | null; + metadata: JsonMap; + lastQuestionAt: Date | null; + createdAt: Date; + updatedAt: Date; +} + +export interface AskSessionSummary extends AskSession { + messageCount: number; +} + +type AskSessionRow = { + id: number; + requesterUserId: string; + ownerUserId: string; + title: string | null; + metadata: JsonMap | null; + lastQuestionAt: Date | null; + createdAt: Date; + updatedAt: Date; + messageCount?: number; +}; + +const baseSelect = ` + SELECT + s.id, + s.requester_user_id AS "requesterUserId", + s.owner_user_id AS "ownerUserId", + s.title, + s.metadata, + s.last_question_at AS "lastQuestionAt", + s.created_at AS "createdAt", + s.updated_at AS "updatedAt" + FROM ask_session s +`; + +const mapRow = (row: T): T & { metadata: JsonMap } => ({ + ...row, + metadata: row.metadata ?? {}, +}); + +export const createSession = async (params: { + requesterUserId: string; + ownerUserId: string; + title?: string | null; + metadata?: JsonMap; +}): Promise => { + const result = await runQuery( + ` + INSERT INTO ask_session (requester_user_id, owner_user_id, title, metadata) + VALUES ($1, $2, $3, COALESCE($4::jsonb, '{}'::jsonb)) + RETURNING + id, + requester_user_id AS "requesterUserId", + owner_user_id AS "ownerUserId", + title, + metadata, + last_question_at AS "lastQuestionAt", + created_at AS "createdAt", + updated_at AS "updatedAt" + `, + [params.requesterUserId, params.ownerUserId, params.title ?? null, JSON.stringify(params.metadata ?? {})] + ); + + return mapRow(result.rows[0]); +}; + +export const findSessionById = async (sessionId: number): Promise => { + const result = await runQuery(`${baseSelect} WHERE s.id = $1`, [sessionId]); + if (result.rows.length === 0) return null; + return mapRow(result.rows[0]); +}; + +export const findSessionForRequester = async ( + sessionId: number, + requesterUserId: string +): Promise => { + const result = await runQuery(`${baseSelect} WHERE s.id = $1 AND s.requester_user_id = $2`, [ + sessionId, + requesterUserId, + ]); + if (result.rows.length === 0) return null; + return mapRow(result.rows[0]); +}; + +export interface ListSessionsParams { + requesterUserId: string; + ownerUserId?: string; + cursorCreatedAt?: Date; + cursorId?: number; + limit?: number; +} + +export const listSessionsForRequester = async ({ + requesterUserId, + ownerUserId, + cursorCreatedAt, + cursorId, + limit = 20, +}: ListSessionsParams): Promise => { + const conditions = ['s.requester_user_id = $1']; + const values: unknown[] = [requesterUserId]; + let paramIndex = values.length; + + if (ownerUserId) { + values.push(ownerUserId); + paramIndex += 1; + conditions.push(`s.owner_user_id = $${paramIndex}`); + } + + if (cursorCreatedAt && cursorId) { + values.push(cursorCreatedAt, cursorId); + const cursorCreatedIdx = values.length - 1; + const cursorIdIdx = values.length; + conditions.push( + `(s.created_at < $${cursorCreatedIdx} OR (s.created_at = $${cursorCreatedIdx} AND s.id < $${cursorIdIdx}))` + ); + } + + values.push(limit); + const limitIdx = values.length; + + const sql = ` + SELECT + s.id, + s.requester_user_id AS "requesterUserId", + s.owner_user_id AS "ownerUserId", + s.title, + s.metadata, + s.last_question_at AS "lastQuestionAt", + s.created_at AS "createdAt", + s.updated_at AS "updatedAt", + COALESCE(stats.message_count, 0)::int AS "messageCount" + FROM ask_session s + LEFT JOIN LATERAL ( + SELECT COUNT(*) AS message_count + FROM ask_message m + WHERE m.session_id = s.id + ) AS stats ON true + WHERE ${conditions.join(' AND ')} + ORDER BY s.created_at DESC, s.id DESC + LIMIT $${limitIdx} + `; + + const result = await runQuery(sql, values); + return result.rows.map((row) => mapRow(row)); +}; + +export const updateSessionMeta = async ( + sessionId: number, + requesterUserId: string, + updates: { title?: string | null; metadata?: JsonMap } +): Promise => { + const sets: string[] = ['updated_at = now()']; + const values: unknown[] = []; + + if (updates.title !== undefined) { + values.push(updates.title ?? null); + sets.push(`title = $${values.length}`); + } + + if (updates.metadata !== undefined) { + values.push(JSON.stringify(updates.metadata ?? {})); + sets.push(`metadata = COALESCE($${values.length}::jsonb, '{}'::jsonb)`); + } + + if (sets.length === 1) { + return findSessionForRequester(sessionId, requesterUserId); + } + + values.push(sessionId, requesterUserId); + const sessionIdx = values.length - 1; + const requesterIdx = values.length; + + const result = await runQuery( + ` + UPDATE ask_session + SET ${sets.join(', ')} + WHERE id = $${sessionIdx} AND requester_user_id = $${requesterIdx} + RETURNING + id, + requester_user_id AS "requesterUserId", + owner_user_id AS "ownerUserId", + title, + metadata, + last_question_at AS "lastQuestionAt", + created_at AS "createdAt", + updated_at AS "updatedAt" + `, + values + ); + + if (result.rows.length === 0) return null; + return mapRow(result.rows[0]); +}; + +export const touchSessionLastQuestion = async (sessionId: number, executor?: QueryExecutor): Promise => { + await runQuery('UPDATE ask_session SET last_question_at = now(), updated_at = now() WHERE id = $1', [sessionId], executor); +}; + +export const deleteSession = async (sessionId: number, requesterUserId: string): Promise => { + const result = await runQuery('DELETE FROM ask_session WHERE id = $1 AND requester_user_id = $2', [ + sessionId, + requesterUserId, + ]); + return (result.rowCount ?? 0) > 0; +}; diff --git a/src/routes/ai.v2.routes.ts b/src/routes/ai.v2.routes.ts index 08bc65e..eb2aabf 100644 --- a/src/routes/ai.v2.routes.ts +++ b/src/routes/ai.v2.routes.ts @@ -1,6 +1,7 @@ import { Router } from 'express'; import { askV2Handler } from '../controllers/ai.v2.controller'; import { authMiddleware } from '../middlewares/auth.middleware'; +import sessionRouter from './session.routes'; // 검색 계획을 사용하는 v2 ASK 엔드포인트 라우터 const aiV2Router = Router(); @@ -10,5 +11,6 @@ aiV2Router.get('/health', (req, res) => { }); aiV2Router.post('/ask', authMiddleware, askV2Handler); +aiV2Router.use('/sessions', sessionRouter); export default aiV2Router; diff --git a/src/routes/session.routes.ts b/src/routes/session.routes.ts new file mode 100644 index 0000000..a3eadbe --- /dev/null +++ b/src/routes/session.routes.ts @@ -0,0 +1,21 @@ +import { Router } from 'express'; +import { authMiddleware } from '../middlewares/auth.middleware'; +import { + deleteSessionHandler, + getSessionHandler, + getSessionMessagesHandler, + listSessionsHandler, + patchSessionHandler, +} from '../controllers/session.controller'; + +const sessionRouter = Router(); + +sessionRouter.use(authMiddleware); + +sessionRouter.get('/', listSessionsHandler); +sessionRouter.get('/:id', getSessionHandler); +sessionRouter.get('/:id/messages', getSessionMessagesHandler); +sessionRouter.patch('/:id', patchSessionHandler); +sessionRouter.delete('/:id', deleteSessionHandler); + +export default sessionRouter; diff --git a/src/services/qa.service.ts b/src/services/qa.service.ts index a8966d0..52ab7e9 100644 --- a/src/services/qa.service.ts +++ b/src/services/qa.service.ts @@ -1,4 +1,3 @@ -import { createEmbeddings } from './embedding.service'; import { PassThrough } from 'stream'; import config from '../config'; import * as postRepository from '../repositories/post.repository'; @@ -7,6 +6,12 @@ import * as qaPrompts from '../prompts/qa.prompts'; import { generate } from '../llm'; import { DebugLogger } from '../utils/debug-logger'; import * as userRepository from '../repositories/user.repository'; +import { createEmbeddings } from './embedding.service'; +import * as sessionHistoryService from './session-history.service'; +import { AskSession } from '../repositories/ask-session.repository'; +import { extractAnswerText } from '../utils/sse'; +import { rewriteTone } from './replace-tone.service'; +import type { LlmOverride } from '../types/llm.types'; // HTML 태그를 제거하고 길이를 제한하여 LLM 컨텍스트를 정제 const preprocessContent = (content: string): string => { @@ -16,41 +21,75 @@ const preprocessContent = (content: string): string => { // 사용자 말투 ID에 따라 프롬프트 지시문을 반환 const getSpeechTonePrompt = async (speechTone: number, userId: string): Promise => { - if (speechTone === -1) return "간결하고 명확한 말투로 답변해"; - if (speechTone === -2) return "아래의 블로그 본문 컨텍스트를 참고하여 본문의 말투를 파악해 최대한 비슷한 말투로 답변해"; + if (speechTone === -1) return '간결하고 명확한 말투로 답변해'; + if (speechTone === -2) return '아래의 블로그 본문 컨텍스트를 참고하여 본문의 말투를 파악해 최대한 비슷한 말투로 답변해'; const persona = await personaRepository.findPersonaById(speechTone, userId); if (persona) { return `${persona.name}: ${persona.description}`; } - return "간결하고 명확한 말투로 답변해"; // 기본 말투 + return '간결하고 명확한 말투로 답변해'; // 기본 말투 +}; + +export interface AnswerStreamOptions { + question: string; + session: AskSession; + requesterUserId: string; + ownerUserId: string; + categoryId?: number; + speechTone?: number; + postId?: number; + llm?: LlmOverride; } -type LlmOverride = { - provider?: 'openai' | 'gemini'; - model?: string; - options?: { temperature?: number; top_p?: number; max_output_tokens?: number }; +const prependHistory = ( + base: { role: 'system' | 'user' | 'assistant' | 'tool' | 'function'; content: string }[], + history: { role: 'user' | 'assistant'; content: string }[] +) => { + if (!history.length) return base; + if (base.length === 0 || base[0].role !== 'system') { + return [...history, ...base]; + } + const [systemMessage, ...rest] = base; + return [systemMessage, ...history, ...rest]; }; +const sessionSavedPayload = (session: AskSession, cached = false) => ({ + session_id: String(session.id), + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + cached, +}); + +const sessionErrorPayload = (session: AskSession, reason: string) => ({ + session_id: String(session.id), + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + reason, +}); + // 질문에 대한 RAG 답변을 SSE 스트림으로 생성 -export const answerStream = async ( - question: string, - userId: string, - categoryId?: number, - speechTone: number = -1, - postId?: number, - llm?: LlmOverride -): Promise => { +export const answerStream = async ({ + question, + session, + requesterUserId, + ownerUserId, + categoryId, + speechTone = -1, + postId, + llm, +}: AnswerStreamOptions): Promise => { const stream = new PassThrough(); DebugLogger.log('qa', { type: 'debug.qa.start', questionLen: question?.length || 0, - userId, + ownerUserId, categoryId, postId, speechTone, llm, + sessionId: session.id, }); let messages: { role: 'system' | 'user' | 'assistant' | 'tool' | 'function'; content: string }[] = []; @@ -61,11 +100,144 @@ export const answerStream = async ( }[] | undefined = undefined; + let bufferedAnswer = ''; + let questionEmbedding: number[] | null = null; + let duplicateQuestionEmbedding: number[] | null = null; + let searchPlanPayload: Record | null = null; + let retrievalMetaPayload: Record | null = null; + let clientDisconnected = false; + + const replayCachedAnswer = async ( + cached: sessionHistoryService.CachedAnswerResult, + options?: { answerOverride?: string; speechToneIdOverride?: number } + ) => { + const finalAnswer = options?.answerOverride ?? cached.answer; + const speechToneForPersistence = + typeof options?.speechToneIdOverride === 'number' ? options?.speechToneIdOverride : cached.speechToneId; + if (cached.searchPlan) { + stream.write(`event: search_plan\n`); + stream.write(`data: ${JSON.stringify(cached.searchPlan)}\n\n`); + } + const context = Array.isArray((cached.retrievalMeta as any)?.context) + ? (cached.retrievalMeta as any).context + : null; + if (context) { + stream.write(`event: search_result\n`); + stream.write(`data: ${JSON.stringify(context)}\n\n`); + stream.write(`event: context\n`); + stream.write(`data: ${JSON.stringify(context)}\n\n`); + } + const existFlag = (cached.retrievalMeta as any)?.exist_in_post_status; + if (typeof existFlag === 'boolean') { + stream.write(`event: exist_in_post_status\n`); + stream.write(`data: ${JSON.stringify(existFlag)}\n\n`); + } + stream.write(`event: answer\n`); + stream.write(`data: ${JSON.stringify(finalAnswer)}\n\n`); + + try { + if (!questionEmbedding || !duplicateQuestionEmbedding) + throw new Error('Missing embeddings for cache replay'); + await sessionHistoryService.persistConversation({ + sessionId: session.id, + requesterUserId, + ownerUserId, + question, + answer: finalAnswer, + searchPlan: cached.searchPlan ?? undefined, + retrievalMeta: cached.retrievalMeta ?? undefined, + categoryId, + postId, + questionEmbedding, + duplicateQuestionEmbedding, + speechTone: speechToneForPersistence, + }); + stream.emit('session_saved', sessionSavedPayload(session, true)); + } catch (error) { + DebugLogger.error('qa', { + type: 'debug.qa.cache_persistence_error', + sessionId: session.id, + message: (error as Error)?.message ?? 'unknown', + }); + stream.emit('session_error', sessionErrorPayload(session, 'persistence_failed')); + } + stream.end(); + }; + + stream.once('client_disconnect', () => { + clientDisconnected = true; + }); + (async () => { - const [speechTonePrompt, blogMeta] = await Promise.all([ - getSpeechTonePrompt(speechTone, userId), - userRepository.findUserBlogMetadata(userId), + const [speechTonePrompt, blogMeta, historyMessages] = await Promise.all([ + getSpeechTonePrompt(speechTone, ownerUserId), + userRepository.findUserBlogMetadata(ownerUserId), + sessionHistoryService.loadRecentMessages(session.id), ]); + const duplicateQuestionBlock = sessionHistoryService.buildDuplicateQuestionBlock(question, historyMessages); + const embeddingVector = await createEmbeddings([question, duplicateQuestionBlock]); + questionEmbedding = embeddingVector[0]; + duplicateQuestionEmbedding = embeddingVector[1]; + + const cachedAnswerList = duplicateQuestionEmbedding + ? await sessionHistoryService.findCachedAnswer({ + ownerUserId, + requesterUserId, + embedding: duplicateQuestionEmbedding, + postId: postId ?? undefined, + categoryId: categoryId ?? undefined, + }) + : []; + + const requestedSpeechTone = typeof speechTone === 'number' ? speechTone : -1; + const { matchingCandidate: matchingCachedAnswer, rewriteCandidate } = + sessionHistoryService.selectToneAwareCacheCandidate(cachedAnswerList, requestedSpeechTone); + DebugLogger.log('qa', { + type: 'debug.qa.cache_candidates', + requestedSpeechTone, + candidateCount: cachedAnswerList.length, + candidateTones: cachedAnswerList.map((candidate) => candidate.speechToneId), + }); + + if (matchingCachedAnswer) { + DebugLogger.log('qa', { + type: 'debug.qa.cache_hit', + sessionId: session.id, + similarity: matchingCachedAnswer.similarity, + speechTone: requestedSpeechTone, + }); + await replayCachedAnswer(matchingCachedAnswer); + return; + } + + if (rewriteCandidate) { + DebugLogger.log('qa', { + type: 'debug.qa.cache_hit_tone_mismatch', + sessionId: session.id, + similarity: rewriteCandidate.similarity, + requestedSpeechTone, + cachedSpeechTone: rewriteCandidate.speechToneId, + }); + try { + const rewrittenAnswer = await rewriteTone(rewriteCandidate.answer, { + speechToneId: requestedSpeechTone, + speechTonePrompt, + llm, + }); + await replayCachedAnswer(rewriteCandidate, { + answerOverride: rewrittenAnswer, + speechToneIdOverride: requestedSpeechTone, + }); + return; + } catch (error) { + DebugLogger.warn('qa', { + type: 'debug.qa.cache_tone_rewrite_failed', + sessionId: session.id, + message: (error as Error)?.message ?? 'tone_rewrite_failed', + }); + } + } + const toSimpleMessages = ( raw: any[] ): { role: 'system' | 'user' | 'assistant' | 'tool' | 'function'; content: string }[] => { @@ -80,15 +252,16 @@ export const answerStream = async ( if (!post) { stream.write(`event: error\ndata: ${JSON.stringify({ code: 404, message: 'Post not found' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'post_not_found')); stream.end(); DebugLogger.warn('qa', { type: 'debug.qa.post', status: 'not_found', postId }); return; } - - // 비공개 글이면 소유자만 접근하도록 검증 - if (!post.is_public && post.user_id !== userId) { + + if (!post.is_public && post.user_id !== ownerUserId) { stream.write(`event: error\n`); stream.write(`data: ${JSON.stringify({ code: 403, message: 'Forbidden' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'forbidden_post')); stream.end(); DebugLogger.warn('qa', { type: 'debug.qa.post', status: 'forbidden', postId }); return; @@ -108,14 +281,20 @@ export const answerStream = async ( qaPrompts.createPostContextPrompt(post, processedContent, question, speechTonePrompt, blogMeta ?? undefined) ); + searchPlanPayload = { mode: 'post', post_id: postId }; + retrievalMetaPayload = { + strategy: '단일 포스트 컨텍스트', + post_id: postId, + context: [{ postId: post.id, postTitle: post.title }], + exist_in_post_status: true, + }; } else { - const [questionEmbedding] = await createEmbeddings([question]); - const similarChunks = await postRepository.findSimilarChunks(userId, questionEmbedding, categoryId); - + const similarChunks = await postRepository.findSimilarChunks(ownerUserId, questionEmbedding!, categoryId); + const existInPost = similarChunks.length > 0; stream.write(`event: exist_in_post_status\ndata: ${JSON.stringify(existInPost)}\n\n`); - const context = similarChunks.map(chunk => ({ postId: chunk.postId, postTitle: chunk.postTitle })); + const context = similarChunks.map((chunk) => ({ postId: chunk.postId, postTitle: chunk.postTitle })); stream.write(`event: context\ndata: ${JSON.stringify(context)}\n\n`); DebugLogger.log('qa', { type: 'debug.qa.path', @@ -130,26 +309,36 @@ export const answerStream = async ( postChunk: chunk.postChunk, createdAt: (chunk as any).postCreatedAt ?? null, })); + const retrievalMeta = { + strategy: categoryId ? `임베딩 기반 RAG (카테고리 ${categoryId})` : '임베딩 기반 RAG', + resultCount: similarChunks.length, + context, + exist_in_post_status: existInPost, + }; messages = toSimpleMessages( qaPrompts.createRagPrompt(question, ragChunks, speechTonePrompt, { - retrievalMeta: { - strategy: categoryId - ? `임베딩 기반 RAG (카테고리 ${categoryId})` - : '임베딩 기반 RAG', - resultCount: similarChunks.length, - }, + retrievalMeta, blogMeta: blogMeta ?? undefined, }) ); + + searchPlanPayload = { mode: 'rag', category_id: categoryId ?? null }; + retrievalMetaPayload = retrievalMeta; } + const historyForPrompt = historyMessages.map((message) => ({ + role: message.role, + content: message.content, + })) as { role: 'user' | 'assistant'; content: string }[]; + messages = prependHistory(messages, historyForPrompt); + const llmStream = await generate({ provider: llm?.provider || 'openai', model: llm?.model || config.CHAT_MODEL, messages, tools, options: llm?.options, - meta: { userId, categoryId, postId }, + meta: { userId: ownerUserId, categoryId, postId }, }); DebugLogger.log('qa', { type: 'debug.qa.call', @@ -162,6 +351,10 @@ export const answerStream = async ( llmStream.on('data', (chunk) => { const str = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk); + const answerTexts = extractAnswerText(str); + if (answerTexts.length) { + bufferedAnswer += answerTexts.join(''); + } DebugLogger.log('qa', { type: 'debug.qa.chunk', at: Date.now(), @@ -170,17 +363,61 @@ export const answerStream = async ( }); stream.write(chunk); }); - llmStream.on('end', () => { + + llmStream.on('end', async () => { + if (clientDisconnected) { + stream.end(); + return; + } + DebugLogger.log('qa', { + type: 'debug.qa.buffered_answer', + sessionId: session.id, + length: bufferedAnswer.length, + preview: bufferedAnswer.slice(0, 80), + }); + try { + if (questionEmbedding && duplicateQuestionEmbedding) { + await sessionHistoryService.persistConversation({ + sessionId: session.id, + requesterUserId, + ownerUserId, + question, + answer: bufferedAnswer.trim(), + searchPlan: searchPlanPayload ?? undefined, + retrievalMeta: retrievalMetaPayload ?? undefined, + categoryId, + postId, + questionEmbedding, + duplicateQuestionEmbedding, + speechTone, + }); + stream.emit('session_saved', sessionSavedPayload(session)); + } else { + stream.emit('session_error', sessionErrorPayload(session, 'missing_question_embedding')); + } + } catch (error) { + DebugLogger.error('qa', { + type: 'debug.qa.persistence_error', + message: (error as Error)?.message ?? 'unknown', + sessionId: session.id, + }); + stream.emit('session_error', sessionErrorPayload(session, 'persistence_failed')); + } stream.end(); }); + llmStream.on('error', (e) => { DebugLogger.error('qa', { type: 'debug.qa.llmError', message: (e as any)?.message || 'error' }); - }); - - })().catch(err => { - console.error('Stream process error:', err); - stream.write(`event: error\ndata: ${JSON.stringify({ message: 'Internal server error' })}\n\n`); + stream.write(`event: error\n`); + stream.write(`data: ${JSON.stringify({ message: 'Internal server error' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'llm_error')); stream.end(); + }); + })().catch((err) => { + console.error('Stream process error:', err); + stream.write(`event: error\ndata: ${JSON.stringify({ message: 'Internal server error' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'stream_error')); + stream.end(); }); return stream; diff --git a/src/services/qa.v2.service.ts b/src/services/qa.v2.service.ts index 209b5d4..dd1c8f2 100644 --- a/src/services/qa.v2.service.ts +++ b/src/services/qa.v2.service.ts @@ -10,6 +10,11 @@ import { runSemanticSearch } from './semantic-search.service'; import { runHybridSearch } from './hybrid-search.service'; import { createEmbeddings } from './embedding.service'; import { DebugLogger } from '../utils/debug-logger'; +import * as sessionHistoryService from './session-history.service'; +import { AskSession } from '../repositories/ask-session.repository'; +import { extractAnswerText } from '../utils/sse'; +import { rewriteTone } from './replace-tone.service'; +import type { LlmOverride } from '../types/llm.types'; // HTML을 제거하고 길이를 제한해 LLM 컨텍스트를 정리 const preprocessContent = (content: string): string => { @@ -28,28 +33,193 @@ const getSpeechTonePrompt = async (speechTone: number, userId: string): Promise< return '간결하고 명확한 말투로 답변해'; }; -type LlmOverride = { - provider?: 'openai' | 'gemini'; - model?: string; - options?: { temperature?: number; top_p?: number; max_output_tokens?: number }; +export interface AnswerStreamV2Options { + question: string; + session: AskSession; + requesterUserId: string; + ownerUserId: string; + categoryId?: number; + speechTone?: number; + postId?: number; + llm?: LlmOverride; +} + +const prependHistory = ( + base: { role: 'system' | 'user' | 'assistant' | 'tool' | 'function'; content: string }[], + history: { role: 'user' | 'assistant'; content: string }[] +) => { + if (!history.length) return base; + if (base.length === 0 || base[0].role !== 'system') { + return [...history, ...base]; + } + const [systemMessage, ...rest] = base; + return [systemMessage, ...history, ...rest]; }; +const sessionSavedPayload = (session: AskSession, cached = false) => ({ + session_id: String(session.id), + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + cached, +}); + +const sessionErrorPayload = (session: AskSession, reason: string) => ({ + session_id: String(session.id), + owner_user_id: session.ownerUserId, + requester_user_id: session.requesterUserId, + reason, +}); + // 검색 계획을 활용한 v2 QA 스트림을 생성 -export const answerStreamV2 = async ( - question: string, - userId: string, - categoryId?: number, - speechTone: number = -1, - postId?: number, - llm?: LlmOverride -): Promise => { +export const answerStreamV2 = async ({ + question, + session, + requesterUserId, + ownerUserId, + categoryId, + speechTone = -1, + postId, + llm, +}: AnswerStreamV2Options): Promise => { const stream = new PassThrough(); + let bufferedAnswer = ''; + let searchPlanPayload: Record | null = null; + let retrievalMetaPayload: Record | null = null; + let questionEmbedding: number[] | null = null; + let duplicateQuestionEmbedding: number[] | null = null; + let clientDisconnected = false; + + const replayCachedAnswer = async ( + cached: sessionHistoryService.CachedAnswerResult, + options?: { answerOverride?: string; speechToneIdOverride?: number } + ) => { + const finalAnswer = options?.answerOverride ?? cached.answer; + const speechToneForPersistence = + typeof options?.speechToneIdOverride === 'number' ? options.speechToneIdOverride : cached.speechToneId; + if (cached.searchPlan) { + stream.write(`event: search_plan\n`); + stream.write(`data: ${JSON.stringify(cached.searchPlan)}\n\n`); + } + const context = Array.isArray((cached.retrievalMeta as any)?.context) + ? (cached.retrievalMeta as any).context + : null; + if (context) { + stream.write(`event: search_result\n`); + stream.write(`data: ${JSON.stringify(context)}\n\n`); + stream.write(`event: context\n`); + stream.write(`data: ${JSON.stringify(context)}\n\n`); + } + const existFlag = (cached.retrievalMeta as any)?.exist_in_post_status; + if (typeof existFlag === 'boolean') { + stream.write(`event: exist_in_post_status\n`); + stream.write(`data: ${JSON.stringify(existFlag)}\n\n`); + } + stream.write(`event: answer\n`); + stream.write(`data: ${JSON.stringify(finalAnswer)}\n\n`); + + try { + if (!questionEmbedding || !duplicateQuestionEmbedding) + throw new Error('Missing embeddings for cache replay'); + await sessionHistoryService.persistConversation({ + sessionId: session.id, + requesterUserId, + ownerUserId, + question, + answer: finalAnswer, + searchPlan: cached.searchPlan ?? undefined, + retrievalMeta: cached.retrievalMeta ?? undefined, + categoryId, + postId, + questionEmbedding, + duplicateQuestionEmbedding, + speechTone: speechToneForPersistence, + }); + stream.emit('session_saved', sessionSavedPayload(session, true)); + } catch (error) { + DebugLogger.error('qa', { + type: 'debug.qa.v2.cache_persistence_error', + sessionId: session.id, + message: (error as Error)?.message ?? 'unknown', + }); + stream.emit('session_error', sessionErrorPayload(session, 'persistence_failed')); + } + stream.end(); + }; + + stream.once('client_disconnect', () => { + clientDisconnected = true; + }); + (async () => { - const [speechTonePrompt, blogMeta] = await Promise.all([ - getSpeechTonePrompt(speechTone, userId), - userRepository.findUserBlogMetadata(userId), + const [speechTonePrompt, blogMeta, historyMessages] = await Promise.all([ + getSpeechTonePrompt(speechTone, ownerUserId), + userRepository.findUserBlogMetadata(ownerUserId), + sessionHistoryService.loadRecentMessages(session.id), ]); + const duplicateQuestionBlock = sessionHistoryService.buildDuplicateQuestionBlock(question, historyMessages); + const embeddingVector = await createEmbeddings([question, duplicateQuestionBlock]); + questionEmbedding = embeddingVector[0]; + duplicateQuestionEmbedding = embeddingVector[1]; + + const cachedAnswerList = duplicateQuestionEmbedding + ? await sessionHistoryService.findCachedAnswer({ + ownerUserId, + requesterUserId, + embedding: duplicateQuestionEmbedding, + postId: postId ?? undefined, + categoryId: categoryId ?? undefined, + }) + : []; + + const requestedSpeechTone = typeof speechTone === 'number' ? speechTone : -1; + const { matchingCandidate: matchingCachedAnswer, rewriteCandidate } = + sessionHistoryService.selectToneAwareCacheCandidate(cachedAnswerList, requestedSpeechTone); + DebugLogger.log('qa', { + type: 'debug.qa.v2.cache_candidates', + requestedSpeechTone, + candidateCount: cachedAnswerList.length, + candidateTones: cachedAnswerList.map((candidate) => candidate.speechToneId), + }); + + if (matchingCachedAnswer) { + DebugLogger.log('qa', { + type: 'debug.qa.v2.cache_hit', + sessionId: session.id, + similarity: matchingCachedAnswer.similarity, + speechTone: requestedSpeechTone, + }); + await replayCachedAnswer(matchingCachedAnswer); + return; + } + + if (rewriteCandidate) { + DebugLogger.log('qa', { + type: 'debug.qa.v2.cache_hit_tone_mismatch', + sessionId: session.id, + similarity: rewriteCandidate.similarity, + requestedSpeechTone, + cachedSpeechTone: rewriteCandidate.speechToneId, + }); + try { + const rewrittenAnswer = await rewriteTone(rewriteCandidate.answer, { + speechToneId: requestedSpeechTone, + speechTonePrompt, + llm, + }); + await replayCachedAnswer(rewriteCandidate, { + answerOverride: rewrittenAnswer, + speechToneIdOverride: requestedSpeechTone, + }); + return; + } catch (error) { + DebugLogger.warn('qa', { + type: 'debug.qa.v2.cache_tone_rewrite_failed', + sessionId: session.id, + message: (error as Error)?.message ?? 'tone_rewrite_failed', + }); + } + } let messages: { role: 'system' | 'user' | 'assistant' | 'tool' | 'function'; content: string }[] = []; let tools: @@ -74,20 +244,21 @@ export const answerStreamV2 = async ( if (!post) { stream.write(`event: error\n`); stream.write(`data: ${JSON.stringify({ code: 404, message: 'Post not found' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'post_not_found')); stream.end(); return; } - if (!post.is_public && post.user_id !== userId) { + if (!post.is_public && post.user_id !== ownerUserId) { stream.write(`event: error\n`); stream.write(`data: ${JSON.stringify({ code: 403, message: 'Forbidden' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'forbidden_post')); stream.end(); return; } // 검색 계획 정보를 스트림으로 먼저 공지 + const postPlan = { mode: 'post', filters: { post_id: postId, user_id: ownerUserId } }; stream.write(`event: search_plan\n`); - stream.write( - `data: ${JSON.stringify({ mode: 'post', filters: { post_id: postId, user_id: userId } })}\n\n` - ); + stream.write(`data: ${JSON.stringify(postPlan)}\n\n`); const processed = preprocessContent(post.content); const ctx = [{ postId: post.id, postTitle: post.title }]; @@ -101,16 +272,24 @@ export const answerStreamV2 = async ( messages = toSimpleMessages( qaPrompts.createPostContextPrompt(post, processed, question, speechTonePrompt, blogMeta ?? undefined) ); + + searchPlanPayload = postPlan; + retrievalMetaPayload = { + strategy: '단일 포스트 컨텍스트', + post_id: postId, + context: ctx, + exist_in_post_status: true, + }; } else { // 질문 기반 검색 계획 생성 경로 - const planPair = await generateSearchPlan(question, { user_id: userId, category_id: categoryId }); + const planPair = await generateSearchPlan(question, { user_id: ownerUserId, category_id: categoryId }); if (!planPair) { // 계획 생성 실패 시 v1 RAG로 조용히 폴백 - const [questionEmbedding] = await createEmbeddings([question]); - const similarChunks = await postRepository.findSimilarChunks(userId, questionEmbedding, categoryId); + const similarChunks = await postRepository.findSimilarChunks(ownerUserId, questionEmbedding, categoryId); const context = similarChunks.map((c) => ({ postId: c.postId, postTitle: c.postTitle })); + const fallbackPlan = { mode: 'rag', fallback: true }; stream.write(`event: search_plan\n`); - stream.write(`data: ${JSON.stringify({ mode: 'rag', fallback: true })}\n\n`); + stream.write(`data: ${JSON.stringify(fallbackPlan)}\n\n`); stream.write(`event: search_result\n`); stream.write(`data: ${JSON.stringify(context)}\n\n`); stream.write(`event: exist_in_post_status\n`); @@ -124,24 +303,29 @@ export const answerStreamV2 = async ( postChunk: c.postChunk, createdAt: (c as any).postCreatedAt ?? null, })); + const retrievalMeta = { + strategy: '임베딩 기반 RAG (검색 계획 폴백)', + resultCount: similarChunks.length, + context, + exist_in_post_status: similarChunks.length > 0, + }; messages = toSimpleMessages( qaPrompts.createRagPrompt(question, ragChunks, speechTonePrompt, { - retrievalMeta: { - strategy: '임베딩 기반 RAG (검색 계획 생성 실패 폴백)', - resultCount: similarChunks.length, - notes: ['검색 계획 생성 실패로 기본 임베딩 검색을 사용했습니다.'], - }, + retrievalMeta, blogMeta: blogMeta ?? undefined, }) ); + searchPlanPayload = fallbackPlan; + retrievalMetaPayload = retrievalMeta; } else { const plan: any = planPair.normalized; + searchPlanPayload = plan; stream.write(`event: search_plan\n`); stream.write(`data: ${JSON.stringify(plan)}\n\n`); // 전송된 검색 계획을 디버그 로그로 남김 DebugLogger.log('sse', { type: 'debug.sse.search_plan', - userId, + userId: ownerUserId, categoryId, plan_summary: { mode: plan.mode, @@ -176,14 +360,13 @@ export const answerStreamV2 = async ( } rows = await runHybridSearch( question, - userId, + ownerUserId, plan, { categoryId: categoryId ?? undefined, limit: plan.limit } ); const hybridContext = rows.map((r) => ({ postId: r.postId, postTitle: r.postTitle })); stream.write(`event: hybrid_result\n`); stream.write(`data: ${JSON.stringify(hybridContext)}\n\n`); - // 메타데이터를 선택적으로 구독하는 클라이언트를 위한 추가 이벤트 try { const hybridMeta = rows.map((r) => ({ postId: r.postId, @@ -196,16 +379,15 @@ export const answerStreamV2 = async ( } catch {} if (!rows.length) { - rows = await runSemanticSearch(question, userId, plan, { categoryId: categoryId ?? undefined }); + rows = await runSemanticSearch(question, ownerUserId, plan, { categoryId: categoryId ?? undefined }); } } else { - rows = await runSemanticSearch(question, userId, plan, { categoryId: categoryId ?? undefined }); + rows = await runSemanticSearch(question, ownerUserId, plan, { categoryId: categoryId ?? undefined }); } const context = rows.map((r) => ({ postId: r.postId, postTitle: r.postTitle })); stream.write(`event: search_result\n`); stream.write(`data: ${JSON.stringify(context)}\n\n`); - // 메타데이터를 선택적으로 요청하는 클라이언트를 위한 추가 이벤트 try { const resultMeta = rows.map((r) => ({ postId: r.postId, @@ -227,35 +409,93 @@ export const answerStreamV2 = async ( postChunk: r.postChunk, createdAt: r.postCreatedAt ?? null, })); + const retrievalMeta = { + strategy: plan.hybrid?.enabled + ? `검색 계획 기반 하이브리드 (${plan.hybrid.retrieval_bias || 'balanced'})` + : '검색 계획 기반 임베딩', + plan, + resultCount: rows.length, + context, + exist_in_post_status: rows.length > 0, + }; messages = toSimpleMessages( qaPrompts.createRagPrompt(question, planChunks, speechTonePrompt, { - retrievalMeta: { - strategy: plan.hybrid?.enabled - ? `검색 계획 기반 하이브리드 (${plan.hybrid.retrieval_bias || 'balanced'})` - : '검색 계획 기반 임베딩', - plan, - resultCount: rows.length, - }, + retrievalMeta, blogMeta: blogMeta ?? undefined, }) ); + retrievalMetaPayload = retrievalMeta; } } + const historyForPrompt = historyMessages.map((message) => ({ + role: message.role, + content: message.content, + })) as { role: 'user' | 'assistant'; content: string }[]; + messages = prependHistory(messages, historyForPrompt); + const llmStream = await generate({ provider: llm?.provider || 'openai', model: llm?.model || config.CHAT_MODEL, messages, tools, options: llm?.options, - meta: { userId, categoryId, postId }, + meta: { userId: ownerUserId, categoryId, postId }, }); - llmStream.on('data', (chunk) => stream.write(chunk)); - llmStream.on('end', () => stream.end()); + llmStream.on('data', (chunk) => { + const str = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk); + const answerTexts = extractAnswerText(str); + if (answerTexts.length) { + bufferedAnswer += answerTexts.join(''); + } + stream.write(chunk); + }); + llmStream.on('end', async () => { + if (clientDisconnected) { + stream.end(); + return; + } + DebugLogger.log('qa', { + type: 'debug.qa.v2.buffered_answer', + sessionId: session.id, + length: bufferedAnswer.length, + preview: bufferedAnswer.slice(0, 80), + }); + try { + if (questionEmbedding && duplicateQuestionEmbedding) { + await sessionHistoryService.persistConversation({ + sessionId: session.id, + requesterUserId, + ownerUserId, + question, + answer: bufferedAnswer.trim(), + searchPlan: searchPlanPayload ?? undefined, + retrievalMeta: retrievalMetaPayload ?? undefined, + categoryId, + postId, + questionEmbedding, + duplicateQuestionEmbedding, + speechTone, + }); + stream.emit('session_saved', sessionSavedPayload(session)); + } else { + stream.emit('session_error', sessionErrorPayload(session, 'missing_question_embedding')); + } + } catch (error) { + DebugLogger.error('qa', { + type: 'debug.qa.v2.persistence_error', + message: (error as Error)?.message ?? 'unknown', + sessionId: session.id, + }); + stream.emit('session_error', sessionErrorPayload(session, 'persistence_failed')); + } + stream.end(); + }); llmStream.on('error', () => { stream.write(`event: error\n`); stream.write(`data: ${JSON.stringify({ message: 'Internal server error' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'llm_error')); stream.end(); }); })().catch((err) => { @@ -264,6 +504,7 @@ export const answerStreamV2 = async ( } catch {} stream.write(`event: error\n`); stream.write(`data: ${JSON.stringify({ message: 'Internal server error' })}\n\n`); + stream.emit('session_error', sessionErrorPayload(session, 'stream_error')); stream.end(); }); diff --git a/src/services/replace-tone.service.ts b/src/services/replace-tone.service.ts new file mode 100644 index 0000000..698874a --- /dev/null +++ b/src/services/replace-tone.service.ts @@ -0,0 +1,70 @@ +import config from '../config'; +import { generate } from '../llm'; +import { extractAnswerText } from '../utils/sse'; +import type { LlmOverride } from '../types/llm.types'; + +const SYSTEM_PROMPT = + '주어진 원문을 말투에 맞게 변경해라. 아래 콘텐츠 원문의 의미, 사실, 구조를 훼손하지 말고, 요청된 tone 지시만 반영해 다시 작성해.'; +const MIN_LENGTH_RATIO = 0.5; +const MAX_LENGTH_RATIO = 1.5; + +const collectAnswerFromStream = (stream: NodeJS.ReadableStream): Promise => { + return new Promise((resolve, reject) => { + let buffer = ''; + stream.on('data', (chunk) => { + const str = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk); + const texts = extractAnswerText(str); + if (texts.length) buffer += texts.join(''); + }); + stream.on('end', () => resolve(buffer.trim())); + stream.on('error', (err) => reject(err)); + }); +}; + +export interface RewriteToneOptions { + speechToneId: number; + speechTonePrompt: string; + llm?: LlmOverride; +} + +export const rewriteTone = async (answer: string, opts: RewriteToneOptions): Promise => { + const original = (answer ?? '').trim(); + if (!original) throw new Error('tone_rewrite_original_empty'); + + const provider = opts.llm?.provider || 'openai'; + const model = opts.llm?.model || config.CHAT_MODEL; + const options = { ...(opts.llm?.options || {}) }; + if (typeof options.temperature !== 'number') { + options.temperature = 0.2; + } + if (typeof options.top_p !== 'number') { + options.top_p = 0.9; + } + const approxTokens = Math.max(120, Math.min(4096, Math.ceil(original.length * 1.2))); + if (typeof options.max_output_tokens !== 'number') { + options.max_output_tokens = approxTokens; + } + + const stream = await generate({ + provider, + model, + messages: [ + { role: 'system', content: SYSTEM_PROMPT }, + { + role: 'user', + content: `tone 지시: ${opts.speechTonePrompt}\n원문: ${original}`, + }, + ], + options, + }); + + const rewritten = await collectAnswerFromStream(stream); + if (!rewritten) throw new Error('tone_rewrite_empty'); + + const ratio = rewritten.length / Math.max(1, original.length); + if (ratio < MIN_LENGTH_RATIO || ratio > MAX_LENGTH_RATIO) { + throw new Error('tone_rewrite_ratio_out_of_bounds'); + } + + return rewritten; +}; diff --git a/src/services/session-history.service.ts b/src/services/session-history.service.ts new file mode 100644 index 0000000..7621a96 --- /dev/null +++ b/src/services/session-history.service.ts @@ -0,0 +1,184 @@ +import type { AskMessage } from '../repositories/ask-message.repository'; +import * as messageRepository from '../repositories/ask-message.repository'; +import * as questionCacheRepository from '../repositories/ask-question-cache.repository'; +import * as sessionRepository from '../repositories/ask-session.repository'; +import { withTransaction } from '../utils/db'; + +export const HISTORY_MESSAGE_LIMIT = 4; +export const DUPLICATE_SIMILARITY_THRESHOLD = 0.93; +const DUPLICATE_USER_HISTORY_LIMIT = 2; +const DUPLICATE_TURN_LIMITS = { + previousFar: 400, + previousNear: 600, + current: 800, +}; + +const normalizeQuestionText = (input: string, limit: number) => { + const cleaned = (input ?? '').replace(/\s+/g, ' ').trim(); + if (!limit || cleaned.length <= limit) return cleaned; + return cleaned.slice(0, limit); +}; + +export const buildDuplicateQuestionBlock = (question: string, history: AskMessage[]): string => { + const userQuestions = (history || []).filter((msg) => msg.role === 'user').map((msg) => msg.content); + const recent = userQuestions.slice(-DUPLICATE_USER_HISTORY_LIMIT); + const sections: string[] = []; + if (recent.length === 2) { + sections.push(`[Q-2] ${normalizeQuestionText(recent[0], DUPLICATE_TURN_LIMITS.previousFar)}`); + sections.push(`[Q-1] ${normalizeQuestionText(recent[1], DUPLICATE_TURN_LIMITS.previousNear)}`); + } else if (recent.length === 1) { + sections.push(`[Q-1] ${normalizeQuestionText(recent[0], DUPLICATE_TURN_LIMITS.previousNear)}`); + } + sections.push(`[Q-now] ${normalizeQuestionText(question, DUPLICATE_TURN_LIMITS.current)}`); + return sections.join('\n'); +}; + +export const selectToneAwareCacheCandidate = ( + candidates: CachedAnswerResult[], + requestedSpeechTone: number +): { + matchingCandidate: CachedAnswerResult | null; + rewriteCandidate: CachedAnswerResult | null; +} => { + const normalizedTone = typeof requestedSpeechTone === 'number' ? requestedSpeechTone : -1; + const matchingCandidate = candidates.find((candidate) => candidate.speechToneId === normalizedTone) ?? null; + if (matchingCandidate) { + return { matchingCandidate, rewriteCandidate: null }; + } + return { matchingCandidate: null, rewriteCandidate: candidates[0] ?? null }; +}; + +export const loadRecentMessages = async (sessionId: number, limit = HISTORY_MESSAGE_LIMIT) => { + const boundedLimit = Math.max(0, Math.min(limit, HISTORY_MESSAGE_LIMIT)); + if (boundedLimit === 0) return []; + return messageRepository.getLatestMessages(sessionId, boundedLimit); +}; + +export interface PersistConversationInput { + sessionId: number; + requesterUserId: string; + ownerUserId: string; + question: string; + answer: string; + searchPlan?: Record | null; + retrievalMeta?: Record | null; + categoryId?: number; + postId?: number; + questionEmbedding: number[]; + duplicateQuestionEmbedding: number[]; + speechTone?: number; +} + +export const persistConversation = async ({ + sessionId, + requesterUserId, + ownerUserId, + question, + answer, + searchPlan, + retrievalMeta, + categoryId, + postId, + questionEmbedding, + duplicateQuestionEmbedding, + speechTone, +}: PersistConversationInput): Promise => { + if (!answer || questionEmbedding.length === 0 || duplicateQuestionEmbedding.length === 0) return; + + await withTransaction(async (client) => { + const userMessage = await messageRepository.insertMessage( + { + sessionId, + role: 'user', + content: question, + searchPlan: searchPlan ?? null, + retrievalMeta: null, + }, + client + ); + + const assistantMessage = await messageRepository.insertMessage( + { + sessionId, + role: 'assistant', + content: answer, + searchPlan: null, + retrievalMeta: retrievalMeta ?? null, + }, + client + ); + + await questionCacheRepository.upsertEmbedding( + { + messageId: userMessage.id, + ownerUserId, + requesterUserId, + categoryId: categoryId ?? null, + postId: postId ?? null, + answerMessageId: assistantMessage.id, + speechToneId: speechTone ?? -1, + embedding: duplicateQuestionEmbedding, + }, + client + ); + + await sessionRepository.touchSessionLastQuestion(sessionId, client); + }); +}; + +export interface CachedAnswerResult { + answer: string; + searchPlan: Record | null; + retrievalMeta: Record | null; + similarity: number; + speechToneId: number; +} + +export interface FindCachedAnswerParams { + ownerUserId: string; + requesterUserId: string; + embedding: number[]; + postId?: number; + categoryId?: number; + threshold?: number; +} + +export const findCachedAnswer = async ({ + ownerUserId, + requesterUserId, + embedding, + postId, + categoryId, + threshold = DUPLICATE_SIMILARITY_THRESHOLD, +}: FindCachedAnswerParams): Promise => { + const candidates = await questionCacheRepository.findSimilarEmbeddings({ + ownerUserId, + requesterUserId, + embedding, + postId: postId ?? null, + categoryId: categoryId ?? null, + limit: 3, + }); + + const hydratedCandidates: CachedAnswerResult[] = []; + + for (const candidate of candidates) { + if (candidate.similarity < threshold) continue; + if (!candidate.answerMessageId) continue; + + const userMessage = await messageRepository.getMessageById(candidate.messageId); + if (!userMessage) continue; + const assistantMessage = await messageRepository.getMessageById(candidate.answerMessageId); + if (!assistantMessage) continue; + + hydratedCandidates.push({ + answer: assistantMessage.content, + searchPlan: userMessage.searchPlan ?? null, + retrievalMeta: assistantMessage.retrievalMeta ?? null, + similarity: candidate.similarity, + speechToneId: candidate.speechToneId ?? -1, + }); + } + + return hydratedCandidates; +}; diff --git a/src/types/ai.types.ts b/src/types/ai.types.ts index bffc72a..a70fe3e 100644 --- a/src/types/ai.types.ts +++ b/src/types/ai.types.ts @@ -24,7 +24,8 @@ export type EmbedContentRequest = z.infer['body']; export const askSchema = z.object({ body: z.object({ question: z.string(), - user_id: z.string(), + user_id: z.string().optional(), + session_id: z.string().optional(), category_id: z.number().optional(), post_id: z.number().optional(), speech_tone: z.number().optional(), diff --git a/src/types/ai.v2.types.ts b/src/types/ai.v2.types.ts index c806262..b58378d 100644 --- a/src/types/ai.v2.types.ts +++ b/src/types/ai.v2.types.ts @@ -83,7 +83,8 @@ export type SearchPlan = z.infer; export const askV2Schema = z.object({ body: z.object({ question: z.string(), - user_id: z.string(), + user_id: z.string().optional(), + session_id: z.string().optional(), category_id: z.number().optional(), post_id: z.number().optional(), speech_tone: z.number().optional(), diff --git a/src/types/llm.types.ts b/src/types/llm.types.ts new file mode 100644 index 0000000..86966b9 --- /dev/null +++ b/src/types/llm.types.ts @@ -0,0 +1,5 @@ +export type LlmOverride = { + provider?: 'openai' | 'gemini'; + model?: string; + options?: { temperature?: number; top_p?: number; max_output_tokens?: number }; +}; diff --git a/src/types/session.types.ts b/src/types/session.types.ts new file mode 100644 index 0000000..49b7a64 --- /dev/null +++ b/src/types/session.types.ts @@ -0,0 +1,28 @@ +import { z } from 'zod'; + +export const sessionListQuerySchema = z.object({ + limit: z + .preprocess((value) => (value === undefined ? undefined : Number(value)), z.number().int().min(1).max(50)) + .optional() + .default(20), + cursor: z.string().optional(), + owner_user_id: z.string().min(1).optional(), +}); + +export const sessionMessagesQuerySchema = z.object({ + limit: z + .preprocess((value) => (value === undefined ? undefined : Number(value)), z.number().int().min(1).max(50)) + .optional() + .default(20), + cursor: z.string().optional(), + direction: z.enum(['forward', 'backward']).optional().default('backward'), +}); + +export const sessionPatchSchema = z.object({ + title: z.string().trim().min(1).max(200).optional(), + metadata: z.record(z.unknown()).optional(), +}); + +export type SessionListQuery = z.infer; +export type SessionMessagesQuery = z.infer; +export type SessionPatchBody = z.infer; diff --git a/src/utils/auth.ts b/src/utils/auth.ts new file mode 100644 index 0000000..792ae39 --- /dev/null +++ b/src/utils/auth.ts @@ -0,0 +1,9 @@ +import { AuthRequest } from '../middlewares/auth.middleware'; + +export const extractRequesterId = (req: AuthRequest): string | null => { + const user = req.user; + if (!user || typeof user !== 'object') return null; + + const candidate = (user as Record).user_id ?? (user as Record).sub; + return typeof candidate === 'string' ? candidate : null; +}; diff --git a/src/utils/db.ts b/src/utils/db.ts index cc73577..292548a 100644 --- a/src/utils/db.ts +++ b/src/utils/db.ts @@ -1,11 +1,42 @@ -import { Pool } from 'pg'; +import { Pool, PoolClient, QueryResult, QueryResultRow } from 'pg'; import config from '../config'; const pool = new Pool({ connectionString: config.DATABASE_URL, }); -// 재사용 가능한 PG 풀 인스턴스를 반환 -export const getDb = () => { - return pool; +export type DbPool = Pool; +export type DbClient = PoolClient; +export type QueryExecutor = Pick | Pick; + +export const getDb = (): DbPool => pool; + +/** + * Runs a parametrized query using either the shared pool or a provided client. + */ +export const runQuery = async ( + sql: string, + params: unknown[] = [], + executor?: QueryExecutor +): Promise> => { + const target = executor ?? pool; + return target.query(sql, params); +}; + +/** + * Wraps a callback in a BEGIN/COMMIT transaction. + */ +export const withTransaction = async (callback: (client: PoolClient) => Promise): Promise => { + const client = await pool.connect(); + try { + await client.query('BEGIN'); + const result = await callback(client); + await client.query('COMMIT'); + return result; + } catch (error) { + await client.query('ROLLBACK'); + throw error; + } finally { + client.release(); + } }; diff --git a/src/utils/debug-logger.ts b/src/utils/debug-logger.ts index c81bbc6..dc4f159 100644 --- a/src/utils/debug-logger.ts +++ b/src/utils/debug-logger.ts @@ -12,6 +12,14 @@ export type DebugChannel = type DebugLevel = 'log' | 'info' | 'warn' | 'error'; const ALL_CHANNELS: DebugChannel[] = ['plan', 'hybrid', 'qa', 'sse', 'llm', 'openai', 'server']; +const DEFAULT_EXCLUDED_TYPES = new Set([ + 'debug.qa.chunk', + 'llm.request', + 'llm.response', + 'llm.error', + 'debug.llm.start', + 'debug.llm.end', +]); const parseEnabledChannels = (): Set => { const set = new Set(); @@ -29,6 +37,47 @@ const parseEnabledChannels = (): Set => { const enabledAll = (config.DEBUG_ALL || '').toString().toLowerCase() === 'true'; const enabledChannels = parseEnabledChannels(); +const parseExcludedTypes = (): { + global: Set; + byChannel: Map>; +} => { + const global = new Set(DEFAULT_EXCLUDED_TYPES); + const byChannel = new Map>(); + const raw = (config.DEBUG_EXCLUDE_TYPES || '').toString(); + if (!raw) return { global, byChannel }; + for (const token of raw.split(',')) { + const item = token.trim(); + if (!item) continue; + const [rawChannel, rawType] = item.includes(':') ? item.split(':', 2) : [null, item]; + const type = (rawType ?? '').trim(); + if (!type) continue; + if (!rawChannel) { + global.add(type); + continue; + } + const channelKey = ALL_CHANNELS.find((ch) => ch === rawChannel.trim()); + if (!channelKey) { + global.add(type); + continue; + } + const set = byChannel.get(channelKey) ?? new Set(); + set.add(type); + byChannel.set(channelKey, set); + } + return { global, byChannel }; +}; + +const excludedTypes = parseExcludedTypes(); + +const shouldSkipPayload = (channel: DebugChannel, payload: unknown): boolean => { + if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return false; + const type = (payload as Record).type; + if (typeof type !== 'string' || !type) return false; + if (excludedTypes.global.has('*') || excludedTypes.global.has(type)) return true; + const channelSet = excludedTypes.byChannel.get(channel); + if (!channelSet) return false; + return channelSet.has('*') || channelSet.has(type); +}; const pickWriter = (level: DebugLevel) => { switch (level) { @@ -95,6 +144,7 @@ export const DebugLogger = { }, write(channel: DebugChannel, payload: unknown, level: DebugLevel = 'log'): void { if (!this.isEnabled(channel)) return; + if (shouldSkipPayload(channel, payload)) return; const writer = pickWriter(level); writer(formatPayload(channel, payload)); }, diff --git a/src/utils/session.ts b/src/utils/session.ts new file mode 100644 index 0000000..303036c --- /dev/null +++ b/src/utils/session.ts @@ -0,0 +1,54 @@ +import * as sessionRepository from '../repositories/ask-session.repository'; + +export class SessionContextError extends Error { + status: number; + constructor(status: number, message: string) { + super(message); + this.status = status; + } +} + +export interface ResolveSessionOptions { + requesterUserId: string; + sessionId?: string | null; + ownerUserId?: string | null; + titleHint?: string; +} + +export const resolveSessionContext = async ({ + requesterUserId, + sessionId, + ownerUserId, + titleHint, +}: ResolveSessionOptions): Promise<{ session: sessionRepository.AskSession; created: boolean }> => { + if (sessionId) { + const numericId = Number(sessionId); + if (!Number.isFinite(numericId) || numericId <= 0) { + throw new SessionContextError(400, 'Invalid session_id'); + } + const session = await sessionRepository.findSessionForRequester(numericId, requesterUserId); + if (!session) { + throw new SessionContextError(404, 'Session not found'); + } + if (ownerUserId && ownerUserId !== session.ownerUserId) { + throw new SessionContextError(409, 'Session owner mismatch'); + } + return { session, created: false }; + } + + const trimmedOwner = ownerUserId?.trim(); + if (!trimmedOwner) { + throw new SessionContextError(400, 'user_id is required when session_id is missing'); + } + + const normalizedTitle = titleHint?.trim(); + const title = normalizedTitle ? normalizedTitle.slice(0, 120) : null; + + const session = await sessionRepository.createSession({ + requesterUserId, + ownerUserId: trimmedOwner, + title, + }); + + return { session, created: true }; +}; diff --git a/src/utils/sse.ts b/src/utils/sse.ts new file mode 100644 index 0000000..dc4b217 --- /dev/null +++ b/src/utils/sse.ts @@ -0,0 +1,37 @@ +export const extractAnswerText = (sseChunk: string): string[] => { + if (!sseChunk) return []; + const blocks = sseChunk.split('\n\n'); + const results: string[] = []; + + for (const block of blocks) { + if (!block.trim()) continue; + const lines = block.split('\n'); + let eventName: string | null = null; + const dataLines: string[] = []; + + for (const line of lines) { + if (line.startsWith('event:')) { + eventName = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + dataLines.push(line.slice(5)); + } + } + + if (eventName === 'answer' && dataLines.length > 0) { + const raw = dataLines.join('\n').trim(); + if (!raw) continue; + try { + const parsed = JSON.parse(raw); + if (typeof parsed === 'string') { + results.push(parsed); + } else { + results.push(JSON.stringify(parsed)); + } + } catch { + results.push(raw); + } + } + } + + return results; +}; diff --git a/src/worker/queue-consumer.ts b/src/worker/queue-consumer.ts index 196c3d9..43a4eb8 100644 --- a/src/worker/queue-consumer.ts +++ b/src/worker/queue-consumer.ts @@ -7,6 +7,7 @@ import { storeTitleEmbedding, } from '../services/embedding.service'; import { findPostById } from '../repositories/post.repository'; +import { deleteEmbeddingsByOwner } from '../repositories/ask-question-cache.repository'; type EmbeddingJob = { postId: number; @@ -86,6 +87,8 @@ const processJob = async (job: EmbeddingJob) => { const title = typeof post.title === 'string' ? post.title.trim() : ''; const content = typeof post.content === 'string' ? post.content.trim() : ''; + let invalidateCachedAnswers = false; + if (shouldProcessTitle) { if (!title) { console.warn('[embedding-worker]', { @@ -96,6 +99,7 @@ const processJob = async (job: EmbeddingJob) => { } else { await storeTitleEmbedding(postId, title); console.log(`[embedding-worker] stored title embedding for post ${postId}`); + invalidateCachedAnswers = true; } } @@ -125,6 +129,24 @@ const processJob = async (job: EmbeddingJob) => { console.log( `[embedding-worker] stored content embeddings for post ${postId} (chunks=${chunks.length})` ); + invalidateCachedAnswers = true; + } + + if (invalidateCachedAnswers) { + try { + const removed = await deleteEmbeddingsByOwner(post.user_id); + console.log('[embedding-worker]', { + type: 'worker.ask_cache.invalidate', + ownerUserId: post.user_id, + removed, + }); + } catch (error) { + console.error('[embedding-worker]', { + type: 'worker.ask_cache.invalidate_failed', + ownerUserId: post.user_id, + message: (error as Error)?.message ?? 'unknown', + }); + } } };