From b4849122d54ac0286f5a5236fd5ad88464c4e435 Mon Sep 17 00:00:00 2001 From: "ziyan.li" Date: Fri, 22 May 2026 14:21:25 +0800 Subject: [PATCH 1/2] feat(feishu-channel): add streaming response support for feishu channel 1. add streaming config parameter to extension init and env fallback 2. add fake stream test utilities and new streaming test case 3. refactor message handling logic to support both sync and stream modes 4. update docs to include streaming configuration option --- docs/docs/tools/feishu-channel.md | 1 + tests/test_feishu_channel_extension.py | 95 +++++++++++++++++++++++++- veadk/extensions/feishu_channel.py | 78 +++++++++++++++++---- 3 files changed, 159 insertions(+), 15 deletions(-) diff --git a/docs/docs/tools/feishu-channel.md b/docs/docs/tools/feishu-channel.md index f395d8ec..b36bc2ff 100644 --- a/docs/docs/tools/feishu-channel.md +++ b/docs/docs/tools/feishu-channel.md @@ -38,6 +38,7 @@ tool: app_id: cli_xxx app_secret: xxx transport: ws + streaming: true ``` ## 最小示例 diff --git a/tests/test_feishu_channel_extension.py b/tests/test_feishu_channel_extension.py index 0f059ed2..a98ffd74 100644 --- a/tests/test_feishu_channel_extension.py +++ b/tests/test_feishu_channel_extension.py @@ -19,6 +19,11 @@ from veadk.extensions.feishu_channel import FeishuChannelExtension +@pytest.fixture +def anyio_backend(): + return "asyncio" + + class FakeChannel: def __init__(self): self.handlers = {} @@ -31,6 +36,25 @@ async def send(self, chat_id, body, options=None): self.sent_messages.append((chat_id, body, options)) +class FakeStreamController: + def __init__(self): + self.chunks = [] + + async def append(self, chunk): + self.chunks.append(chunk) + + +class FakeStreamChannel(FakeChannel): + def __init__(self): + super().__init__() + self.stream_calls = [] + + async def stream(self, chat_id, spec, options=None): + controller = FakeStreamController() + await spec["markdown"](controller) + self.stream_calls.append((chat_id, controller.chunks, options)) + + class FakeRunner: def __init__(self): self.calls = [] @@ -46,6 +70,46 @@ async def run(self, messages, user_id="", session_id="", **kwargs): return f"echo:{messages}" +class FakeStreamingMemory: + def __init__(self): + self.sessions = [] + self.session_service = object() + + async def create_session(self, app_name, user_id, session_id): + self.sessions.append( + {"app_name": app_name, "user_id": user_id, "session_id": session_id} + ) + return True + + +class FakeStreamingRunner: + def __init__(self): + self.app_name = "stream_app" + self.short_term_memory = FakeStreamingMemory() + self.run_async_calls = [] + + async def run_async(self, user_id, session_id, new_message, run_config=None): + self.run_async_calls.append( + { + "user_id": user_id, + "session_id": session_id, + "new_message": new_message, + "run_config": run_config, + } + ) + yield SimpleNamespace( + content=SimpleNamespace( + parts=[ + SimpleNamespace(text="hel", thought=False), + SimpleNamespace(text="thinking", thought=True), + ] + ) + ) + yield SimpleNamespace( + content=SimpleNamespace(parts=[SimpleNamespace(text="lo", thought=False)]) + ) + + def build_message(**overrides): message = SimpleNamespace( id="om_001", @@ -73,7 +137,7 @@ def build_message(**overrides): return message -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_uses_union_id_and_thread_id(): runner = FakeRunner() channel = FakeChannel() @@ -102,7 +166,7 @@ async def test_extension_uses_union_id_and_thread_id(): ] -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_falls_back_to_chat_id_when_thread_missing(): runner = FakeRunner() channel = FakeChannel() @@ -118,7 +182,7 @@ async def test_extension_falls_back_to_chat_id_when_thread_missing(): assert runner.calls[0]["session_id"] == "oc_chat" -@pytest.mark.asyncio +@pytest.mark.anyio async def test_extension_ignores_empty_message_by_default(): runner = FakeRunner() channel = FakeChannel() @@ -130,3 +194,28 @@ async def test_extension_ignores_empty_message_by_default(): assert runner.calls == [] assert channel.sent_messages == [] + + +@pytest.mark.anyio +async def test_extension_streaming_uses_markdown_producer_controller(): + runner = FakeStreamingRunner() + channel = FakeStreamChannel() + extension = FeishuChannelExtension( + runner=runner, + channel=channel, + streaming=True, + ) + + await extension._on_message(build_message()) + + assert runner.short_term_memory.sessions == [ + { + "app_name": "stream_app", + "user_id": "on_union", + "session_id": "oc_chat", + } + ] + assert len(runner.run_async_calls) == 1 + assert channel.stream_calls == [ + ("oc_chat", ["hel", "lo"], {"reply_to": "om_001"}) + ] diff --git a/veadk/extensions/feishu_channel.py b/veadk/extensions/feishu_channel.py index d0c7b8c9..60985630 100644 --- a/veadk/extensions/feishu_channel.py +++ b/veadk/extensions/feishu_channel.py @@ -84,6 +84,7 @@ def __init__( reply_in_thread: bool = True, ignore_empty_messages: bool = True, channel_kwargs: dict[str, Any] | None = None, + streaming: bool = False, ) -> None: self.runner = runner self.session_id_factory = session_id_factory or self.default_session_id_factory @@ -92,6 +93,7 @@ def __init__( self.response_formatter = response_formatter or self.default_response_formatter self.reply_in_thread = reply_in_thread self.ignore_empty_messages = ignore_empty_messages + self.streaming = streaming or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true" if channel is not None: self.channel = channel @@ -167,8 +169,64 @@ async def _on_message(self, message: Any) -> None: context = self.build_message_context(message=message, text=text) + send_options = {} + if self.reply_in_thread and context.message_id: + send_options["reply_to"] = context.message_id + if self.message_handler is not None: response_text = await self._maybe_await(self.message_handler(context)) + if not response_text: + return + + await self._maybe_await( + self.channel.send( + context.chat_id, + self.response_formatter(str(response_text)), + send_options, + ) + ) + elif getattr(self, "streaming", False) and hasattr(self.channel, "stream"): + from google.adk.agents import RunConfig + from google.adk.agents.run_config import StreamingMode + from veadk.config import getenv + from veadk.runner import _convert_messages + + if self.runner.short_term_memory: + await self.runner.short_term_memory.create_session( + app_name=self.runner.app_name, + user_id=context.user_id, + session_id=context.session_id + ) + + converted_messages = _convert_messages( + context.text, self.runner.app_name, context.user_id, context.session_id + ) + + run_config = RunConfig( + streaming_mode=StreamingMode.SSE, + max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)), + ) + + async def stream_to_feishu(stream): + for converted_message in converted_messages: + async for event in self.runner.run_async( + user_id=context.user_id, + session_id=context.session_id, + new_message=converted_message, + run_config=run_config, + ): + if event.content and event.content.parts: + for part in event.content.parts: + if not getattr(part, "thought", False) and part.text: + await stream.append(part.text) + + await self._maybe_await( + self.channel.stream( + context.chat_id, + {"markdown": stream_to_feishu}, + send_options, + ) + ) else: response_text = await self.runner.run( messages=context.text, @@ -176,20 +234,16 @@ async def _on_message(self, message: Any) -> None: session_id=context.session_id, ) - if not response_text: - return - - send_options = {} - if self.reply_in_thread and context.message_id: - send_options["reply_to"] = context.message_id + if not response_text: + return - await self._maybe_await( - self.channel.send( - context.chat_id, - self.response_formatter(str(response_text)), - send_options, + await self._maybe_await( + self.channel.send( + context.chat_id, + self.response_formatter(str(response_text)), + send_options, + ) ) - ) def build_message_context( self, message: Any, text: str | None = None From 98c1c2ab2a2463ae8216ee53f149fc0ed67f8a7c Mon Sep 17 00:00:00 2001 From: "ziyan.li" Date: Fri, 22 May 2026 15:42:46 +0800 Subject: [PATCH 2/2] style: run pre-commit ruff-format --- tests/test_feishu_channel_extension.py | 4 +--- veadk/extensions/feishu_channel.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/test_feishu_channel_extension.py b/tests/test_feishu_channel_extension.py index a98ffd74..74cc3e41 100644 --- a/tests/test_feishu_channel_extension.py +++ b/tests/test_feishu_channel_extension.py @@ -216,6 +216,4 @@ async def test_extension_streaming_uses_markdown_producer_controller(): } ] assert len(runner.run_async_calls) == 1 - assert channel.stream_calls == [ - ("oc_chat", ["hel", "lo"], {"reply_to": "om_001"}) - ] + assert channel.stream_calls == [("oc_chat", ["hel", "lo"], {"reply_to": "om_001"})] diff --git a/veadk/extensions/feishu_channel.py b/veadk/extensions/feishu_channel.py index 60985630..30e70228 100644 --- a/veadk/extensions/feishu_channel.py +++ b/veadk/extensions/feishu_channel.py @@ -93,7 +93,10 @@ def __init__( self.response_formatter = response_formatter or self.default_response_formatter self.reply_in_thread = reply_in_thread self.ignore_empty_messages = ignore_empty_messages - self.streaming = streaming or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true" + self.streaming = ( + streaming + or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true" + ) if channel is not None: self.channel = channel @@ -193,11 +196,11 @@ async def _on_message(self, message: Any) -> None: if self.runner.short_term_memory: await self.runner.short_term_memory.create_session( - app_name=self.runner.app_name, - user_id=context.user_id, - session_id=context.session_id + app_name=self.runner.app_name, + user_id=context.user_id, + session_id=context.session_id, ) - + converted_messages = _convert_messages( context.text, self.runner.app_name, context.user_id, context.session_id ) @@ -219,7 +222,7 @@ async def stream_to_feishu(stream): for part in event.content.parts: if not getattr(part, "thought", False) and part.text: await stream.append(part.text) - + await self._maybe_await( self.channel.stream( context.chat_id,