diff --git a/docs/docs/tools/feishu-channel.md b/docs/docs/tools/feishu-channel.md index f395d8ec..b36bc2ff 100644 --- a/docs/docs/tools/feishu-channel.md +++ b/docs/docs/tools/feishu-channel.md @@ -38,6 +38,7 @@ tool: app_id: cli_xxx app_secret: xxx transport: ws + streaming: true ``` ## 最小示例 diff --git a/tests/test_feishu_channel_extension.py b/tests/test_feishu_channel_extension.py index 0f059ed2..74cc3e41 100644 --- a/tests/test_feishu_channel_extension.py +++ b/tests/test_feishu_channel_extension.py @@ -19,6 +19,11 @@ from veadk.extensions.feishu_channel import FeishuChannelExtension +@pytest.fixture +def anyio_backend(): + return "asyncio" + + class FakeChannel: def __init__(self): self.handlers = {} @@ -31,6 +36,25 @@ async def send(self, chat_id, body, options=None): self.sent_messages.append((chat_id, body, options)) +class FakeStreamController: + def __init__(self): + self.chunks = [] + + async def append(self, chunk): + self.chunks.append(chunk) + + +class FakeStreamChannel(FakeChannel): + def __init__(self): + super().__init__() + self.stream_calls = [] + + async def stream(self, chat_id, spec, options=None): + controller = FakeStreamController() + await spec["markdown"](controller) + self.stream_calls.append((chat_id, controller.chunks, options)) + + class FakeRunner: def __init__(self): self.calls = [] @@ -46,6 +70,46 @@ async def run(self, messages, user_id="", session_id="", **kwargs): return f"echo:{messages}" +class FakeStreamingMemory: + def __init__(self): + self.sessions = [] + self.session_service = object() + + async def create_session(self, app_name, user_id, session_id): + self.sessions.append( + {"app_name": app_name, "user_id": user_id, "session_id": session_id} + ) + return True + + +class FakeStreamingRunner: + def __init__(self): + self.app_name = "stream_app" + self.short_term_memory = FakeStreamingMemory() + self.run_async_calls = [] + + async def run_async(self, user_id, session_id, new_message, run_config=None): + self.run_async_calls.append( + { + "user_id": user_id, + "session_id": session_id, + "new_message": new_message, + "run_config": run_config, + } + ) + yield SimpleNamespace( + content=SimpleNamespace( + parts=[ + SimpleNamespace(text="hel", thought=False), + SimpleNamespace(text="thinking", thought=True), + ] + ) + ) + yield SimpleNamespace( + content=SimpleNamespace(parts=[SimpleNamespace(text="lo", thought=False)]) + ) + + def build_message(**overrides): message = SimpleNamespace( id="om_001", @@ -73,7 +137,7 @@ def build_message(**overrides): return message -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_uses_union_id_and_thread_id(): runner = FakeRunner() channel = FakeChannel() @@ -102,7 +166,7 @@ async def test_extension_uses_union_id_and_thread_id(): ] -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_falls_back_to_chat_id_when_thread_missing(): runner = FakeRunner() channel = FakeChannel() @@ -118,7 +182,7 @@ async def test_extension_falls_back_to_chat_id_when_thread_missing(): assert runner.calls[0]["session_id"] == "oc_chat" -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_ignores_empty_message_by_default(): runner = FakeRunner() channel = FakeChannel() @@ -130,3 +194,26 @@ async def test_extension_ignores_empty_message_by_default(): assert runner.calls == [] assert channel.sent_messages == [] + + +@pytest.mark.anyio +async def test_extension_streaming_uses_markdown_producer_controller(): + runner = FakeStreamingRunner() + channel = FakeStreamChannel() + extension = FeishuChannelExtension( + runner=runner, + channel=channel, + streaming=True, + ) + + await extension._on_message(build_message()) + + assert runner.short_term_memory.sessions == [ + { + "app_name": "stream_app", + "user_id": "on_union", + "session_id": "oc_chat", + } + ] + assert len(runner.run_async_calls) == 1 + assert channel.stream_calls == [("oc_chat", ["hel", "lo"], {"reply_to": "om_001"})] diff --git a/veadk/extensions/feishu_channel.py b/veadk/extensions/feishu_channel.py index d0c7b8c9..30e70228 100644 --- a/veadk/extensions/feishu_channel.py +++ b/veadk/extensions/feishu_channel.py @@ -84,6 +84,7 @@ def __init__( reply_in_thread: bool = True, ignore_empty_messages: bool = True, channel_kwargs: dict[str, Any] | None = None, + streaming: bool = False, ) -> None: self.runner = runner self.session_id_factory = session_id_factory or self.default_session_id_factory @@ -92,6 +93,10 @@ def __init__( self.response_formatter = response_formatter or self.default_response_formatter self.reply_in_thread = reply_in_thread self.ignore_empty_messages = ignore_empty_messages + self.streaming = ( + streaming + or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true" + ) if channel is not None: self.channel = channel @@ -167,8 +172,64 @@ async def _on_message(self, message: Any) -> None: context = self.build_message_context(message=message, text=text) + send_options = {} + if self.reply_in_thread and context.message_id: + send_options["reply_to"] = context.message_id + if self.message_handler is not None: response_text = await self._maybe_await(self.message_handler(context)) + if not response_text: + return + + await self._maybe_await( + self.channel.send( + context.chat_id, + self.response_formatter(str(response_text)), + send_options, + ) + ) + elif getattr(self, "streaming", False) and hasattr(self.channel, "stream"): + from google.adk.agents import RunConfig + from google.adk.agents.run_config import StreamingMode + from veadk.config import getenv + from veadk.runner import _convert_messages + + if self.runner.short_term_memory: + await self.runner.short_term_memory.create_session( + app_name=self.runner.app_name, + user_id=context.user_id, + session_id=context.session_id, + ) + + converted_messages = _convert_messages( + context.text, self.runner.app_name, context.user_id, context.session_id + ) + + run_config = RunConfig( + streaming_mode=StreamingMode.SSE, + max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)), + ) + + async def stream_to_feishu(stream): + for converted_message in converted_messages: + async for event in self.runner.run_async( + user_id=context.user_id, + session_id=context.session_id, + new_message=converted_message, + run_config=run_config, + ): + if event.content and event.content.parts: + for part in event.content.parts: + if not getattr(part, "thought", False) and part.text: + await stream.append(part.text) + + await self._maybe_await( + self.channel.stream( + context.chat_id, + {"markdown": stream_to_feishu}, + send_options, + ) + ) else: response_text = await self.runner.run( messages=context.text, @@ -176,20 +237,16 @@ async def _on_message(self, message: Any) -> None: session_id=context.session_id, ) - if not response_text: - return - - send_options = {} - if self.reply_in_thread and context.message_id: - send_options["reply_to"] = context.message_id + if not response_text: + return - await self._maybe_await( - self.channel.send( - context.chat_id, - self.response_formatter(str(response_text)), - send_options, + await self._maybe_await( + self.channel.send( + context.chat_id, + self.response_formatter(str(response_text)), + send_options, + ) ) - ) def build_message_context( self, message: Any, text: str | None = None