Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/dialog-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ FastAPI service that powers synchronous chat + audio flows. It now accepts raw a
- `POST /chat/stream` – existing text SSE endpoint.
- `POST /chat/audio` – accepts base64 audio payloads, runs ASR, returns JSON transcript/reply.
- `POST /chat/audio/stream` – SSE stream that emits `asr-partial`, `asr-final`, `text-delta`, and `done` events.
- `POST /chat/vision` – accepts base64-encoded images plus optional prompts for multimodal reasoning.
- `POST /tts/mock` – helper for synchronous TTS testing (requires `SYNC_TTS_STREAMING=true`).

### Example (Sync Audio)
Expand Down Expand Up @@ -45,6 +46,7 @@ Use any SSE client (curl `-N`, Postman, or VS Code REST client) to hit `/chat/au
| `ASR_WHISPER_CACHE_DIR` | Optional model cache path | unset |
| `SYNC_TTS_STREAMING` | Enable `/tts/mock` audio push | `false` |
| `ENABLE_ASYNC_EXT` | Enables outbox + analytics events | `false` |
| `VISION_MAX_BYTES` | Max accepted image payload size in bytes | `4194304` |
| `OUTPUT_INGEST_WS_URL` | Output handler WS endpoint | `ws://localhost:8002/ws/ingest/tts` |

## Dependencies
Expand Down
81 changes: 81 additions & 0 deletions services/dialog-engine/src/dialog_engine/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
logger = logging.getLogger(__name__)
SYNC_TTS_STREAMING = os.getenv("SYNC_TTS_STREAMING", "false").lower() in {"1", "true", "yes", "on"}
ENABLE_ASYNC_EXT = os.getenv("ENABLE_ASYNC_EXT", "false").lower() in {"1", "true", "yes", "on"}
VISION_MAX_BYTES = int(os.getenv("VISION_MAX_BYTES", 4 * 1024 * 1024))
_flush_task = None

try:
Expand Down Expand Up @@ -431,6 +432,86 @@ async def event_generator() -> AsyncGenerator[bytes, None]:
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)


@app.post("/chat/vision")
async def chat_vision(request: Request) -> JSONResponse:
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="invalid json")

session_id = str(body.get("sessionId") or "default")
raw_image = body.get("image")
if not isinstance(raw_image, str) or not raw_image.strip():
raise HTTPException(status_code=400, detail="image required")

try:
image_bytes = base64.b64decode(raw_image, validate=True)
except (binascii.Error, TypeError):
raise HTTPException(status_code=400, detail="invalid image encoding")

if not image_bytes:
raise HTTPException(status_code=400, detail="image required")
if len(image_bytes) > VISION_MAX_BYTES:
raise HTTPException(status_code=413, detail="image payload too large")

prompt_raw = body.get("prompt")
prompt = prompt_raw.strip() if isinstance(prompt_raw, str) else None
mime_type_raw = body.get("mimeType")
mime_type = (
mime_type_raw.strip()
if isinstance(mime_type_raw, str) and mime_type_raw.strip()
else "image/png"
)

meta_raw = body.get("meta")
meta = dict(meta_raw) if isinstance(meta_raw, dict) else {}
meta.setdefault("input_mode", "image")

image_b64 = base64.b64encode(image_bytes).decode("ascii")

user_turn = "[图片输入]"
if prompt:
user_turn = f"[图片输入]\n提示: {prompt}"
await chat_service.remember_turn(session_id=session_id, role="user", content=user_turn)

try:
result = await chat_service.describe_image(
session_id=session_id,
image_b64=image_b64,
prompt=prompt,
mime_type=mime_type,
meta=meta,
)
except HTTPException:
raise
except Exception as exc: # pragma: no cover - guard downstream failures
logger.exception("chat.vision.failed", extra={"sessionId": session_id})
raise HTTPException(status_code=502, detail="vision_failed") from exc

reply_text = str(result.get("reply", ""))
prompt_text = str(result.get("prompt") or (prompt or ""))
stats = result.get("stats") or {}

await chat_service.remember_turn(session_id=session_id, role="assistant", content=reply_text)

response_payload = {
"sessionId": session_id,
"prompt": prompt_text,
"reply": reply_text,
"stats": stats,
}

_emit_async_events(
session_id=session_id,
body=body,
transcript=user_turn,
reply_text=reply_text,
stats=stats,
)

return JSONResponse(response_payload)


@app.post("/tts/mock")
async def tts_mock(request: Request, background: BackgroundTasks):
"""M2: Trigger a mock TTS stream to Output's ingest WS for testing.
Expand Down
130 changes: 130 additions & 0 deletions services/dialog-engine/src/dialog_engine/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,74 @@ async def stream_reply(
):
yield delta

async def describe_image(
    self,
    session_id: str,
    *,
    image_b64: str,
    prompt: str | None,
    mime_type: str | None,
    meta: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
    """Produce a reply about an image, preferring the LLM with a mock fallback.

    Returns a dict with keys "reply" (the generated text), "prompt" (the
    effective prompt, defaulted when the caller supplied none), and "stats"
    (chat source / token / TTFT metrics for this request).
    """
    meta = meta or {}
    raw_prompt = (prompt or "").strip()
    # Default to a generic "please describe this picture" prompt when none given.
    prompt_text = raw_prompt or "请描述这张图片。"
    lang = str(meta.get("lang") or "zh")

    self._reset_metrics()
    context_turns: List[MemoryTurn] = []
    ltm_snippets: List[str] = []

    def _chat_stats() -> Dict[str, Any]:
        # Snapshot of per-request metrics; call only after last_* fields are set.
        return {
            "chat": {
                "source": self.last_source,
                "tokens": self.last_token_count,
                "ttft_ms": self.last_ttft_ms,
            }
        }

    if self._settings.llm.enabled:
        context_turns = await self._fetch_short_term_context(session_id=session_id)
        ltm_snippets = await self._fetch_ltm_snippets(
            session_id=session_id,
            user_text=prompt_text,
            meta=meta,
        )
        self._log_context_info(len(context_turns), len(ltm_snippets))
        try:
            reply_text = await self._generate_vision_reply(
                session_id=session_id,
                prompt_text=prompt_text,
                image_b64=image_b64,
                mime_type=mime_type or "image/png",
                meta=meta,
                context=context_turns,
                ltm_snippets=ltm_snippets,
            )
            self.last_source = "llm"
            self.last_error = None
            self.last_ttft_ms = None  # non-streaming call: no time-to-first-token
            self.last_token_count = self._estimate_tokens(reply_text)
            return {"reply": reply_text, "prompt": prompt_text, "stats": _chat_stats()}
        except LLMNotConfiguredError as exc:
            self.last_error = "llm_not_configured"
            self._log_llm_fallback(reason=str(exc))
        except Exception as exc:  # pragma: no cover - defensive catch
            self.last_error = exc.__class__.__name__
            self._log_llm_fallback(reason=repr(exc))

    # Mock fallback: LLM disabled, not configured, or failed above.
    reply_text = self._craft_image_reply(raw_prompt, lang)
    self.last_source = "mock"
    self.last_ttft_ms = None
    self.last_token_count = self._estimate_tokens(reply_text)
    return {"reply": reply_text, "prompt": prompt_text, "stats": _chat_stats()}

async def _stream_llm(
self,
*,
Expand Down Expand Up @@ -118,6 +186,44 @@ async def _stream_mock(
await asyncio.sleep(0.02 + random.random() * 0.03)
yield word + (" " if not word.endswith("\n") else "")

async def _generate_vision_reply(
    self,
    *,
    session_id: str,
    prompt_text: str,
    image_b64: str,
    mime_type: str,
    meta: Dict[str, Any],
    context: List[MemoryTurn],
    ltm_snippets: List[str],
) -> str:
    """Send a multimodal (text + inline image) completion request and return the reply text."""
    llm = await self._ensure_llm_client()

    # Build the regular chat scaffolding first (system prompt, history, LTM).
    history = self._compose_messages(
        user_text=prompt_text,
        meta=meta,
        context=context,
        ltm_snippets=ltm_snippets,
    )

    # Multimodal user content: optional text part followed by the data-URL image.
    parts: list[Dict[str, Any]] = (
        [{"type": "text", "text": prompt_text}] if prompt_text else []
    )
    parts.append(
        {
            "type": "image_url",
            "image_url": {
                "url": f"data:{mime_type};base64,{image_b64}",
            },
        }
    )

    multimodal_turn = {"role": "user", "content": parts}
    if history:
        # Swap the plain-text user turn composed above for the multimodal one.
        history[-1] = multimodal_turn
    else:  # pragma: no cover - defensive path
        history = [multimodal_turn]

    return await llm.generate_vision_reply(
        history,
        extra_options={"extra_headers": {"x-session-id": session_id}},
    )

async def remember_turn(self, session_id: str, *, role: str, content: str) -> None:
if not content or not content.strip():
return
Expand Down Expand Up @@ -277,3 +383,27 @@ def _craft_reply(self, user_text: str, lang: str) -> str:
f"You said: '{user_text.strip()}'. That sounds interesting! I'm here to chat whenever you like. "
"Feel free to share what you're up to!"
)

def _craft_image_reply(self, prompt_text: str, lang: str) -> str:
display_prompt = prompt_text.strip() if prompt_text else ""
if lang.lower().startswith("zh"):
if display_prompt:
return (
f"这张图片听起来很有意思!虽然我暂时看不到实际画面,"
f"但根据你的提示「{display_prompt}」我可以和你一起展开想象。"
"要不要再告诉我一些细节?"
)
return (
"这张图片看起来很有意思!虽然我暂时无法直接看到内容,"
"但如果你描述更多细节,我会和你一起展开想象。"
)
if display_prompt:
return (
"That picture sounds fascinating! I can't see it directly right now, "
f"but with your hint \"{display_prompt}\" we can imagine it together. "
"Feel free to share more details!"
)
return (
"That picture sounds fascinating! I can't view it directly, but if you describe a few more details "
"we can imagine it together."
)
70 changes: 69 additions & 1 deletion services/dialog-engine/src/dialog_engine/llm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

logger = logging.getLogger(__name__)

ChatMessage = Dict[str, str]
ChatMessage = Dict[str, Any]


class LLMNotConfiguredError(RuntimeError):
Expand Down Expand Up @@ -153,5 +153,73 @@ async def stream_chat(

raise RuntimeError("LLM streaming failed after retries") from last_error

async def generate_vision_reply(
    self,
    messages: Sequence[ChatMessage],
    *,
    model: Optional[str] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    timeout: Optional[float] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> str:
    """Issue a non-streaming chat completion for multimodal prompts.

    Retries up to ``cfg.retry_limit`` extra times with linear backoff; an
    empty completion counts as a failure and is retried as well.
    """
    cfg = self._llm_cfg

    # Per-call overrides fall back to configured defaults.
    request_params: Dict[str, Any] = {
        "model": model or cfg.model,
        "messages": list(messages),
        "temperature": cfg.temperature if temperature is None else temperature,
        "max_tokens": cfg.max_tokens if max_tokens is None else max_tokens,
        "top_p": cfg.top_p if top_p is None else top_p,
        "frequency_penalty": cfg.frequency_penalty,
        "presence_penalty": cfg.presence_penalty,
        "timeout": cfg.timeout if timeout is None else timeout,
    }
    if extra_options:
        request_params.update(extra_options)

    total_attempts = cfg.retry_limit + 1
    last_error: Exception | None = None

    for attempt in range(1, total_attempts + 1):
        try:
            response = await self._client.chat.completions.create(**request_params)
            logger.info(
                "llm.vision.complete",
                extra={
                    "model": request_params.get("model"),
                    "attempt": attempt,
                    "max_tokens": request_params.get("max_tokens"),
                },
            )
            # Return the first choice carrying non-empty string content.
            for choice in getattr(response, "choices", []):
                message = getattr(choice, "message", None)
                if not message:
                    continue
                content = getattr(message, "content", None)
                if isinstance(content, str) and content.strip():
                    return content
            logger.warning(
                "llm.vision.empty",
                extra={"model": request_params.get("model"), "attempt": attempt},
            )
            # Treat an empty completion like a transient failure so it retries.
            raise RuntimeError("vision_no_content")
        except Exception as exc:  # pragma: no cover - defensive catch
            last_error = exc
            logger.warning(
                "llm.vision.error",
                extra={"attempt": attempt, "model": request_params.get("model"), "error": repr(exc)},
            )
            if attempt >= total_attempts:
                break
            await asyncio.sleep(cfg.retry_backoff_seconds * attempt)

    raise RuntimeError("LLM vision completion failed after retries") from last_error


__all__ = ["OpenAIChatClient", "LLMNotConfiguredError", "LLMStreamEmptyError", "ChatMessage"]
Loading
Loading