diff --git a/opencane/agent/loop.py b/opencane/agent/loop.py index 202a99781d..c0365937d2 100644 --- a/opencane/agent/loop.py +++ b/opencane/agent/loop.py @@ -1065,7 +1065,7 @@ async def process_direct( blocked_tool_names: set[str] | None = None, require_tool_use: bool = False, message_metadata: dict[str, Any] | None = None, - ) -> str: + ) -> OutboundMessage | None: """ Process a message directly (for CLI or cron usage). @@ -1078,7 +1078,7 @@ async def process_direct( message_metadata: Optional per-message metadata for context injection. Returns: - The agent's response. + The agent's outbound response envelope. """ await self._connect_mcp() msg = InboundMessage( @@ -1096,7 +1096,7 @@ async def process_direct( blocked_tool_names=blocked_tool_names, require_tool_use=require_tool_use, ) - return response.content if response else "" + return response async def list_registered_tools(self, *, connect_mcp: bool = True) -> list[str]: """List current registered tool names, optionally connecting MCP first.""" diff --git a/opencane/api/digital_task_service.py b/opencane/api/digital_task_service.py index 4479b2de0a..668d833c61 100644 --- a/opencane/api/digital_task_service.py +++ b/opencane/api/digital_task_service.py @@ -67,7 +67,7 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]: # Stage 1: explicit MCP-only execution. if mcp_tools: mcp_prompt = _build_mcp_prompt(goal) - mcp_output = await agent_loop.process_direct( + mcp_resp = await agent_loop.process_direct( mcp_prompt, session_key=f"hardware:{session_id}:digital", channel="hardware", @@ -75,6 +75,7 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]: allowed_tool_names=set(mcp_tools), require_tool_use=True, ) + mcp_output = _direct_output_text(mcp_resp) if not _should_fallback_from_mcp(mcp_output): return { "text": str(mcp_output or "").strip(), @@ -84,7 +85,7 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]: # Stage 2: explicit web/exec fallback with MCP still available. fallback_prompt = _build_fallback_prompt(goal) - fallback_output = await agent_loop.process_direct( + fallback_resp = await agent_loop.process_direct( fallback_prompt, session_key=f"hardware:{session_id}:digital", channel="hardware", @@ -92,8 +93,9 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]: allowed_tool_names=fallback_tools, require_tool_use=True, ) + fallback_output = _direct_output_text(fallback_resp) if str(fallback_output or "").strip() == _NO_TOOL_USED: - fallback_output = await agent_loop.process_direct( + fallback_resp = await agent_loop.process_direct( goal, session_key=f"hardware:{session_id}:digital", channel="hardware", @@ -101,6 +103,7 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]: allowed_tool_names=fallback_tools, require_tool_use=False, ) + fallback_output = _direct_output_text(fallback_resp) return { "text": str(fallback_output or "").strip(), "execution_path": "web_exec_fallback", @@ -634,6 +637,17 @@ def _normalize_executor_result(result: Any) -> tuple[str, dict[str, Any]]: return str(result or ""), {} +def _direct_output_text(value: Any) -> str: + if value is None: + return "" + if isinstance(value, str): + return value + content = getattr(value, "content", None) + if isinstance(content, str): + return content + return str(content or "") + + def _build_mcp_prompt(goal: str) -> str: return ( "你在执行一个数字盲道代操作任务。要求:\n" diff --git a/opencane/cli/commands.py b/opencane/cli/commands.py index 46478f7c20..f2710d71fb 100644 --- a/opencane/cli/commands.py +++ b/opencane/cli/commands.py @@ -758,12 +758,13 @@ def gateway( # Set cron callback (needs agent) async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent.""" - response = await agent.process_direct( + resp = await agent.process_direct( job.payload.message, session_key=f"cron:{job.id}", channel=job.payload.channel or "cli", chat_id=job.payload.to or "direct", ) + response = resp.content if resp else "" if job.payload.deliver and job.payload.to: from opencane.bus.events import OutboundMessage await bus.publish_outbound(OutboundMessage( @@ -777,7 +778,8 @@ async def on_cron_job(job: CronJob) -> str | None: # Create heartbeat service async def on_heartbeat(prompt: str) -> str: """Execute heartbeat through the agent.""" - return await agent.process_direct(prompt, session_key="heartbeat") + resp = await agent.process_direct(prompt, session_key="heartbeat") + return resp.content if resp else "" heartbeat = HeartbeatService( workspace=config.workspace_path, @@ -894,7 +896,7 @@ async def run_once(): try: with _thinking_ctx(): response = await agent_loop.process_direct(message, session_id) - _print_agent_response(response, render_markdown=markdown) + _print_agent_response(response.content if response else "", render_markdown=markdown) finally: await agent_loop.close_mcp() if lifelog_service: @@ -930,7 +932,7 @@ async def run_interactive(): with _thinking_ctx(): response = await agent_loop.process_direct(user_input, session_id) - _print_agent_response(response, render_markdown=markdown) + _print_agent_response(response.content if response else "", render_markdown=markdown) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") @@ -1325,12 +1327,13 @@ def cron_run( result_holder: list[str] = [] async def on_job(job) -> str | None: # type: ignore[no-untyped-def] - response = await agent_loop.process_direct( + resp = await agent_loop.process_direct( job.payload.message, session_key=f"cron:{job.id}", channel=job.payload.channel or "cli", chat_id=job.payload.to or "direct", ) + response = resp.content if resp else "" result_holder.append(response) return response diff --git a/opencane/hardware/runtime/connection.py b/opencane/hardware/runtime/connection.py index e0025144fe..5d469fe9bb 100644 --- a/opencane/hardware/runtime/connection.py +++ b/opencane/hardware/runtime/connection.py @@ -580,7 +580,7 @@ async def _process_listen_stop( policy_context=policy_context, ) agent_started_ms = int(time.time() * 1000) - response = await self.agent_loop.process_direct( + response_raw = await self.agent_loop.process_direct( transcript, session_key=f"hardware:{session.device_id}:{session.session_id}", channel="hardware", @@ -589,6 +589,7 @@ async def _process_listen_stop( blocked_tool_names=tool_denylist, message_metadata={"runtime_context": runtime_context}, ) + response = _direct_output_text(response_raw) agent_latency_ms = max(0, int(time.time() * 1000) - agent_started_ms) await self._send_tts_text( session, @@ -1633,6 +1634,17 @@ def _to_float(value: Any, default: float) -> float: return float(default) +def _direct_output_text(value: Any) -> str: + if value is None: + return "" + if isinstance(value, str): + return value + content = getattr(value, "content", None) + if isinstance(content, str): + return content + return str(content or "") + + def _to_int(value: Any, default: int) -> int: try: return int(value) diff --git a/tests/test_agent_loop_interim_retry.py b/tests/test_agent_loop_interim_retry.py index 35f476b2c1..1a4aa1e532 100644 --- a/tests/test_agent_loop_interim_retry.py +++ b/tests/test_agent_loop_interim_retry.py @@ -117,7 +117,8 @@ async def test_agent_loop_retries_interim_text_then_executes_tool(tmp_path: Path outbound = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert outbound.content == "Tool progress update" - assert result == "Done after tool call." + assert result is not None + assert result.content == "Done after tool call." assert provider.calls == 3 @@ -134,7 +135,8 @@ async def test_agent_loop_retries_interim_text_only_once(tmp_path: Path) -> None chat_id="chat-2", ) - assert result == "second final" + assert result is not None + assert result.content == "second final" assert provider.calls == 2 @@ -152,7 +154,8 @@ async def test_agent_loop_skips_interim_retry_without_available_tools(tmp_path: allowed_tool_names=set(), ) - assert result == "first response" + assert result is not None + assert result.content == "first response" assert provider.calls == 1 @@ -169,7 +172,8 @@ async def test_agent_loop_falls_back_to_interim_when_retry_returns_empty(tmp_pat chat_id="chat-4", ) - assert result == "first interim answer" + assert result is not None + assert result.content == "first interim answer" assert provider.calls == 2 @@ -186,5 +190,5 @@ async def test_agent_loop_does_not_use_interim_fallback_after_tool_usage(tmp_pat chat_id="chat-5", ) - assert result == "" + assert result is None assert provider.calls == 3 diff --git a/tests/test_agent_loop_memory_layered.py b/tests/test_agent_loop_memory_layered.py index 304d111aff..1871aa1885 100644 --- a/tests/test_agent_loop_memory_layered.py +++ b/tests/test_agent_loop_memory_layered.py @@ -43,7 +43,8 @@ async def test_agent_loop_process_direct_records_layered_memory(tmp_path: Path) channel="cli", chat_id="chat-layered", ) - assert result == "收到" + assert result is not None + assert result.content == "收到" memory_dir = tmp_path / "memory" assert (memory_dir / "PROFILE.json").exists() diff --git a/tests/test_agent_loop_safety.py b/tests/test_agent_loop_safety.py index 5860a44099..4718845463 100644 --- a/tests/test_agent_loop_safety.py +++ b/tests/test_agent_loop_safety.py @@ -143,7 +143,8 @@ async def test_agent_loop_applies_safety_for_non_hardware_channel(tmp_path: Path channel="cli", chat_id="chat-1", ) - assert result.startswith("safe:") + assert result is not None + assert result.content.startswith("safe:") assert any( str(event.get("event_type")) == "safety_policy" and str((event.get("payload") or {}).get("source")) == "agent_reply" @@ -166,7 +167,8 @@ async def test_agent_loop_skips_safety_for_hardware_channel(tmp_path: Path) -> N channel="hardware", chat_id="device-1", ) - assert result == "请继续直行" + assert result is not None + assert result.content == "请继续直行" @pytest.mark.asyncio @@ -186,7 +188,8 @@ async def test_agent_loop_no_tool_used_token_not_modified(tmp_path: Path) -> Non allowed_tool_names={"web_search"}, require_tool_use=True, ) - assert result == "NO_TOOL_USED" + assert result is not None + assert result.content == "NO_TOOL_USED" @pytest.mark.asyncio @@ -206,7 +209,7 @@ async def test_agent_loop_message_tool_outbound_is_safety_filtered(tmp_path: Pat channel="cli", chat_id="chat-99", ) - assert result == "" + assert result is None outbound = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert outbound.channel == "cli" assert outbound.chat_id == "chat-99" @@ -242,7 +245,8 @@ async def test_agent_loop_process_direct_injects_runtime_context_block(tmp_path: } }, ) - assert result == "ok" + assert result is not None + assert result.content == "ok" assert provider.last_messages system_prompt = str(provider.last_messages[0].get("content") or "") assert "Device Runtime Context" in system_prompt diff --git a/tests/test_cron_run_command.py b/tests/test_cron_run_command.py index 294e4818b1..88318bc121 100644 --- a/tests/test_cron_run_command.py +++ b/tests/test_cron_run_command.py @@ -5,6 +5,7 @@ from typer.testing import CliRunner +from opencane.bus.events import OutboundMessage from opencane.cli.commands import app from opencane.config.schema import Config @@ -28,7 +29,7 @@ async def process_direct( channel: str = "cli", chat_id: str = "direct", **kwargs, # type: ignore[no-untyped-def] - ) -> str: + ) -> OutboundMessage: del kwargs self.process_calls.append( { @@ -38,7 +39,7 @@ async def process_direct( "chat_id": chat_id, } ) - return "agent-response" + return OutboundMessage(channel=channel, chat_id=chat_id, content="agent-response") async def close_mcp(self) -> None: self.closed = True diff --git a/tests/test_tool_domain_manager.py b/tests/test_tool_domain_manager.py index e7f5a8e5cf..c542e62747 100644 --- a/tests/test_tool_domain_manager.py +++ b/tests/test_tool_domain_manager.py @@ -207,7 +207,8 @@ async def test_spawn_tool_is_guarded_by_max_calls_per_turn(tmp_path: Path) -> No channel="cli", chat_id="chat-guard", ) - assert result == "done" + assert result is not None + assert result.content == "done" assert fake_spawn.calls == 1 @@ -253,7 +254,8 @@ async def test_explicit_allowlist_still_respects_channel_policy(tmp_path: Path) chat_id="device-1", allowed_tool_names={"exec"}, ) - assert result == "done" + assert result is not None + assert result.content == "done" assert fake_exec.calls == 0