Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions opencane/agent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ async def process_direct(
blocked_tool_names: set[str] | None = None,
require_tool_use: bool = False,
message_metadata: dict[str, Any] | None = None,
) -> str:
) -> OutboundMessage | None:
"""
Process a message directly (for CLI or cron usage).

Expand All @@ -1078,7 +1078,7 @@ async def process_direct(
message_metadata: Optional per-message metadata for context injection.

Returns:
The agent's response.
The agent's outbound response envelope.
"""
await self._connect_mcp()
msg = InboundMessage(
Expand All @@ -1096,7 +1096,7 @@ async def process_direct(
blocked_tool_names=blocked_tool_names,
require_tool_use=require_tool_use,
)
return response.content if response else ""
return response

async def list_registered_tools(self, *, connect_mcp: bool = True) -> list[str]:
"""List current registered tool names, optionally connecting MCP first."""
Expand Down
20 changes: 17 additions & 3 deletions opencane/api/digital_task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]:
# Stage 1: explicit MCP-only execution.
if mcp_tools:
mcp_prompt = _build_mcp_prompt(goal)
mcp_output = await agent_loop.process_direct(
mcp_resp = await agent_loop.process_direct(
mcp_prompt,
session_key=f"hardware:{session_id}:digital",
channel="hardware",
chat_id=session_id,
allowed_tool_names=set(mcp_tools),
require_tool_use=True,
)
mcp_output = _direct_output_text(mcp_resp)
if not _should_fallback_from_mcp(mcp_output):
return {
"text": str(mcp_output or "").strip(),
Expand All @@ -84,23 +85,25 @@ async def _executor(goal: str, session_id: str) -> dict[str, Any]:

# Stage 2: explicit web/exec fallback with MCP still available.
fallback_prompt = _build_fallback_prompt(goal)
fallback_output = await agent_loop.process_direct(
fallback_resp = await agent_loop.process_direct(
fallback_prompt,
session_key=f"hardware:{session_id}:digital",
channel="hardware",
chat_id=session_id,
allowed_tool_names=fallback_tools,
require_tool_use=True,
)
fallback_output = _direct_output_text(fallback_resp)
if str(fallback_output or "").strip() == _NO_TOOL_USED:
fallback_output = await agent_loop.process_direct(
fallback_resp = await agent_loop.process_direct(
goal,
session_key=f"hardware:{session_id}:digital",
channel="hardware",
chat_id=session_id,
allowed_tool_names=fallback_tools,
require_tool_use=False,
)
fallback_output = _direct_output_text(fallback_resp)
return {
"text": str(fallback_output or "").strip(),
"execution_path": "web_exec_fallback",
Expand Down Expand Up @@ -634,6 +637,17 @@ def _normalize_executor_result(result: Any) -> tuple[str, dict[str, Any]]:
return str(result or ""), {}


def _direct_output_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
content = getattr(value, "content", None)
if isinstance(content, str):
return content
return str(content or "")


def _build_mcp_prompt(goal: str) -> str:
return (
"你在执行一个数字盲道代操作任务。要求:\n"
Expand Down
13 changes: 8 additions & 5 deletions opencane/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,13 @@ def gateway(
# Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent."""
response = await agent.process_direct(
resp = await agent.process_direct(
job.payload.message,
session_key=f"cron:{job.id}",
channel=job.payload.channel or "cli",
chat_id=job.payload.to or "direct",
)
response = resp.content if resp else ""
if job.payload.deliver and job.payload.to:
from opencane.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage(
Expand All @@ -777,7 +778,8 @@ async def on_cron_job(job: CronJob) -> str | None:
# Create heartbeat service
async def on_heartbeat(prompt: str) -> str:
"""Execute heartbeat through the agent."""
return await agent.process_direct(prompt, session_key="heartbeat")
resp = await agent.process_direct(prompt, session_key="heartbeat")
return resp.content if resp else ""

heartbeat = HeartbeatService(
workspace=config.workspace_path,
Expand Down Expand Up @@ -894,7 +896,7 @@ async def run_once():
try:
with _thinking_ctx():
response = await agent_loop.process_direct(message, session_id)
_print_agent_response(response, render_markdown=markdown)
_print_agent_response(response.content if response else "", render_markdown=markdown)
finally:
await agent_loop.close_mcp()
if lifelog_service:
Expand Down Expand Up @@ -930,7 +932,7 @@ async def run_interactive():

with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id)
_print_agent_response(response, render_markdown=markdown)
_print_agent_response(response.content if response else "", render_markdown=markdown)
except KeyboardInterrupt:
_restore_terminal()
console.print("\nGoodbye!")
Expand Down Expand Up @@ -1325,12 +1327,13 @@ def cron_run(
result_holder: list[str] = []

async def on_job(job) -> str | None: # type: ignore[no-untyped-def]
response = await agent_loop.process_direct(
resp = await agent_loop.process_direct(
job.payload.message,
session_key=f"cron:{job.id}",
channel=job.payload.channel or "cli",
chat_id=job.payload.to or "direct",
)
response = resp.content if resp else ""
result_holder.append(response)
return response

Expand Down
14 changes: 13 additions & 1 deletion opencane/hardware/runtime/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ async def _process_listen_stop(
policy_context=policy_context,
)
agent_started_ms = int(time.time() * 1000)
response = await self.agent_loop.process_direct(
response_raw = await self.agent_loop.process_direct(
transcript,
session_key=f"hardware:{session.device_id}:{session.session_id}",
channel="hardware",
Expand All @@ -589,6 +589,7 @@ async def _process_listen_stop(
blocked_tool_names=tool_denylist,
message_metadata={"runtime_context": runtime_context},
)
response = _direct_output_text(response_raw)
agent_latency_ms = max(0, int(time.time() * 1000) - agent_started_ms)
await self._send_tts_text(
session,
Expand Down Expand Up @@ -1633,6 +1634,17 @@ def _to_float(value: Any, default: float) -> float:
return float(default)


def _direct_output_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
content = getattr(value, "content", None)
if isinstance(content, str):
return content
return str(content or "")


def _to_int(value: Any, default: int) -> int:
try:
return int(value)
Expand Down
14 changes: 9 additions & 5 deletions tests/test_agent_loop_interim_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ async def test_agent_loop_retries_interim_text_then_executes_tool(tmp_path: Path

outbound = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert outbound.content == "Tool progress update"
assert result == "Done after tool call."
assert result is not None
assert result.content == "Done after tool call."
assert provider.calls == 3


Expand All @@ -134,7 +135,8 @@ async def test_agent_loop_retries_interim_text_only_once(tmp_path: Path) -> None
chat_id="chat-2",
)

assert result == "second final"
assert result is not None
assert result.content == "second final"
assert provider.calls == 2


Expand All @@ -152,7 +154,8 @@ async def test_agent_loop_skips_interim_retry_without_available_tools(tmp_path:
allowed_tool_names=set(),
)

assert result == "first response"
assert result is not None
assert result.content == "first response"
assert provider.calls == 1


Expand All @@ -169,7 +172,8 @@ async def test_agent_loop_falls_back_to_interim_when_retry_returns_empty(tmp_pat
chat_id="chat-4",
)

assert result == "first interim answer"
assert result is not None
assert result.content == "first interim answer"
assert provider.calls == 2


Expand All @@ -186,5 +190,5 @@ async def test_agent_loop_does_not_use_interim_fallback_after_tool_usage(tmp_pat
chat_id="chat-5",
)

assert result == ""
assert result is None
assert provider.calls == 3
3 changes: 2 additions & 1 deletion tests/test_agent_loop_memory_layered.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ async def test_agent_loop_process_direct_records_layered_memory(tmp_path: Path)
channel="cli",
chat_id="chat-layered",
)
assert result == "收到"
assert result is not None
assert result.content == "收到"

memory_dir = tmp_path / "memory"
assert (memory_dir / "PROFILE.json").exists()
Expand Down
14 changes: 9 additions & 5 deletions tests/test_agent_loop_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ async def test_agent_loop_applies_safety_for_non_hardware_channel(tmp_path: Path
channel="cli",
chat_id="chat-1",
)
assert result.startswith("safe:")
assert result is not None
assert result.content.startswith("safe:")
assert any(
str(event.get("event_type")) == "safety_policy"
and str((event.get("payload") or {}).get("source")) == "agent_reply"
Expand All @@ -166,7 +167,8 @@ async def test_agent_loop_skips_safety_for_hardware_channel(tmp_path: Path) -> N
channel="hardware",
chat_id="device-1",
)
assert result == "请继续直行"
assert result is not None
assert result.content == "请继续直行"


@pytest.mark.asyncio
Expand All @@ -186,7 +188,8 @@ async def test_agent_loop_no_tool_used_token_not_modified(tmp_path: Path) -> Non
allowed_tool_names={"web_search"},
require_tool_use=True,
)
assert result == "NO_TOOL_USED"
assert result is not None
assert result.content == "NO_TOOL_USED"


@pytest.mark.asyncio
Expand All @@ -206,7 +209,7 @@ async def test_agent_loop_message_tool_outbound_is_safety_filtered(tmp_path: Pat
channel="cli",
chat_id="chat-99",
)
assert result == ""
assert result is None
outbound = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert outbound.channel == "cli"
assert outbound.chat_id == "chat-99"
Expand Down Expand Up @@ -242,7 +245,8 @@ async def test_agent_loop_process_direct_injects_runtime_context_block(tmp_path:
}
},
)
assert result == "ok"
assert result is not None
assert result.content == "ok"
assert provider.last_messages
system_prompt = str(provider.last_messages[0].get("content") or "")
assert "Device Runtime Context" in system_prompt
Expand Down
5 changes: 3 additions & 2 deletions tests/test_cron_run_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from typer.testing import CliRunner

from opencane.bus.events import OutboundMessage
from opencane.cli.commands import app
from opencane.config.schema import Config

Expand All @@ -28,7 +29,7 @@ async def process_direct(
channel: str = "cli",
chat_id: str = "direct",
**kwargs, # type: ignore[no-untyped-def]
) -> str:
) -> OutboundMessage:
del kwargs
self.process_calls.append(
{
Expand All @@ -38,7 +39,7 @@ async def process_direct(
"chat_id": chat_id,
}
)
return "agent-response"
return OutboundMessage(channel=channel, chat_id=chat_id, content="agent-response")

async def close_mcp(self) -> None:
self.closed = True
Expand Down
6 changes: 4 additions & 2 deletions tests/test_tool_domain_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ async def test_spawn_tool_is_guarded_by_max_calls_per_turn(tmp_path: Path) -> No
channel="cli",
chat_id="chat-guard",
)
assert result == "done"
assert result is not None
assert result.content == "done"
assert fake_spawn.calls == 1


Expand Down Expand Up @@ -253,7 +254,8 @@ async def test_explicit_allowlist_still_respects_channel_policy(tmp_path: Path)
chat_id="device-1",
allowed_tool_names={"exec"},
)
assert result == "done"
assert result is not None
assert result.content == "done"
assert fake_exec.calls == 0


Expand Down
Loading