Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/tools/feishu-channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tool:
app_id: cli_xxx
app_secret: xxx
transport: ws
streaming: true
```

## 最小示例
Expand Down
93 changes: 90 additions & 3 deletions tests/test_feishu_channel_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
from veadk.extensions.feishu_channel import FeishuChannelExtension


@pytest.fixture
def anyio_backend():
return "asyncio"


class FakeChannel:
def __init__(self):
self.handlers = {}
Expand All @@ -31,6 +36,25 @@ async def send(self, chat_id, body, options=None):
self.sent_messages.append((chat_id, body, options))


class FakeStreamController:
def __init__(self):
self.chunks = []

async def append(self, chunk):
self.chunks.append(chunk)


class FakeStreamChannel(FakeChannel):
def __init__(self):
super().__init__()
self.stream_calls = []

async def stream(self, chat_id, spec, options=None):
controller = FakeStreamController()
await spec["markdown"](controller)
self.stream_calls.append((chat_id, controller.chunks, options))


class FakeRunner:
def __init__(self):
self.calls = []
Expand All @@ -46,6 +70,46 @@ async def run(self, messages, user_id="", session_id="", **kwargs):
return f"echo:{messages}"


class FakeStreamingMemory:
def __init__(self):
self.sessions = []
self.session_service = object()

async def create_session(self, app_name, user_id, session_id):
self.sessions.append(
{"app_name": app_name, "user_id": user_id, "session_id": session_id}
)
return True


class FakeStreamingRunner:
def __init__(self):
self.app_name = "stream_app"
self.short_term_memory = FakeStreamingMemory()
self.run_async_calls = []

async def run_async(self, user_id, session_id, new_message, run_config=None):
self.run_async_calls.append(
{
"user_id": user_id,
"session_id": session_id,
"new_message": new_message,
"run_config": run_config,
}
)
yield SimpleNamespace(
content=SimpleNamespace(
parts=[
SimpleNamespace(text="hel", thought=False),
SimpleNamespace(text="thinking", thought=True),
]
)
)
yield SimpleNamespace(
content=SimpleNamespace(parts=[SimpleNamespace(text="lo", thought=False)])
)


def build_message(**overrides):
message = SimpleNamespace(
id="om_001",
Expand Down Expand Up @@ -73,7 +137,7 @@ def build_message(**overrides):
return message


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_extension_uses_union_id_and_thread_id():
runner = FakeRunner()
channel = FakeChannel()
Expand Down Expand Up @@ -102,7 +166,7 @@ async def test_extension_uses_union_id_and_thread_id():
]


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_extension_falls_back_to_chat_id_when_thread_missing():
runner = FakeRunner()
channel = FakeChannel()
Expand All @@ -118,7 +182,7 @@ async def test_extension_falls_back_to_chat_id_when_thread_missing():
assert runner.calls[0]["session_id"] == "oc_chat"


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_extension_ignores_empty_message_by_default():
runner = FakeRunner()
channel = FakeChannel()
Expand All @@ -130,3 +194,26 @@ async def test_extension_ignores_empty_message_by_default():

assert runner.calls == []
assert channel.sent_messages == []


@pytest.mark.anyio
async def test_extension_streaming_uses_markdown_producer_controller():
runner = FakeStreamingRunner()
channel = FakeStreamChannel()
extension = FeishuChannelExtension(
runner=runner,
channel=channel,
streaming=True,
)

await extension._on_message(build_message())

assert runner.short_term_memory.sessions == [
{
"app_name": "stream_app",
"user_id": "on_union",
"session_id": "oc_chat",
}
]
assert len(runner.run_async_calls) == 1
assert channel.stream_calls == [("oc_chat", ["hel", "lo"], {"reply_to": "om_001"})]
81 changes: 69 additions & 12 deletions veadk/extensions/feishu_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
reply_in_thread: bool = True,
ignore_empty_messages: bool = True,
channel_kwargs: dict[str, Any] | None = None,
streaming: bool = False,
) -> None:
self.runner = runner
self.session_id_factory = session_id_factory or self.default_session_id_factory
Expand All @@ -92,6 +93,10 @@ def __init__(
self.response_formatter = response_formatter or self.default_response_formatter
self.reply_in_thread = reply_in_thread
self.ignore_empty_messages = ignore_empty_messages
self.streaming = (
streaming
or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true"
)

if channel is not None:
self.channel = channel
Expand Down Expand Up @@ -167,29 +172,81 @@ async def _on_message(self, message: Any) -> None:

context = self.build_message_context(message=message, text=text)

send_options = {}
if self.reply_in_thread and context.message_id:
send_options["reply_to"] = context.message_id

if self.message_handler is not None:
response_text = await self._maybe_await(self.message_handler(context))
if not response_text:
return

await self._maybe_await(
self.channel.send(
context.chat_id,
self.response_formatter(str(response_text)),
send_options,
)
)
elif getattr(self, "streaming", False) and hasattr(self.channel, "stream"):
from google.adk.agents import RunConfig
from google.adk.agents.run_config import StreamingMode
from veadk.config import getenv
from veadk.runner import _convert_messages

if self.runner.short_term_memory:
await self.runner.short_term_memory.create_session(
app_name=self.runner.app_name,
user_id=context.user_id,
session_id=context.session_id,
)

converted_messages = _convert_messages(
context.text, self.runner.app_name, context.user_id, context.session_id
)

run_config = RunConfig(
streaming_mode=StreamingMode.SSE,
max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)),
)

async def stream_to_feishu(stream):
for converted_message in converted_messages:
async for event in self.runner.run_async(
user_id=context.user_id,
session_id=context.session_id,
new_message=converted_message,
run_config=run_config,
):
if event.content and event.content.parts:
for part in event.content.parts:
if not getattr(part, "thought", False) and part.text:
await stream.append(part.text)

await self._maybe_await(
self.channel.stream(
context.chat_id,
{"markdown": stream_to_feishu},
send_options,
)
)
else:
response_text = await self.runner.run(
messages=context.text,
user_id=context.user_id,
session_id=context.session_id,
)

if not response_text:
return

send_options = {}
if self.reply_in_thread and context.message_id:
send_options["reply_to"] = context.message_id
if not response_text:
return

await self._maybe_await(
self.channel.send(
context.chat_id,
self.response_formatter(str(response_text)),
send_options,
await self._maybe_await(
self.channel.send(
context.chat_id,
self.response_formatter(str(response_text)),
send_options,
)
)
)

def build_message_context(
self, message: Any, text: str | None = None
Expand Down
Loading