Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/aionui-channel/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,22 @@ impl crate::stream_relay::ChannelSender for ChannelManager {
) -> Result<(), crate::error::ChannelError> {
self.edit_message(plugin_id, chat_id, message_id, message).await
}

async fn start_typing(&self, plugin_id: &str, chat_id: &str) {
if let Some(plugin) = self.plugins.get(plugin_id) {
plugin.start_typing(chat_id).await;
} else {
warn!(%plugin_id, %chat_id, "start_typing: plugin not found");
}
}

async fn stop_typing(&self, plugin_id: &str, chat_id: &str) {
if let Some(plugin) = self.plugins.get(plugin_id) {
plugin.stop_typing(chat_id).await;
} else {
warn!(%plugin_id, %chat_id, "stop_typing: plugin not found");
}
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions crates/aionui-channel/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ pub trait ChannelPlugin: Send + Sync {

/// The most recent error message, if status is `Error`.
fn last_error(&self) -> Option<&str>;

/// Start typing indicator for a chat. Default no-op.
async fn start_typing(&self, _chat_id: &str) {}

/// Stop typing indicator for a chat. Default no-op.
async fn stop_typing(&self, _chat_id: &str) {}
}

#[cfg(test)]
Expand Down
78 changes: 76 additions & 2 deletions crates/aionui-channel/src/plugins/weixin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use crate::constants::{WEIXIN_API_TIMEOUT, WEIXIN_POLL_TIMEOUT};
use crate::error::ChannelError;

use super::types::{
GetUpdatesRequest, GetUpdatesResponse, ILinkResponse, ITEM_TYPE_TEXT, QrCodeData, QrCodeStatusData,
SendMessageItem, SendMessageMsg, SendMessageRequest, SendTextItem,
GetConfigRequest, GetUpdatesRequest, GetUpdatesResponse, ILinkResponse, ITEM_TYPE_TEXT,
QrCodeData, QrCodeStatusData, SendMessageItem, SendMessageMsg, SendMessageRequest,
SendTextItem, SendTypingRequest,
};

/// HTTP client for the WeChat iLink Bot API.
Expand Down Expand Up @@ -213,6 +214,79 @@ impl WeixinApi {

Ok(())
}

/// Fetch bot config including typing_ticket.
///
/// `POST /ilink/bot/getconfig`
/// Requires `ilink_user_id` and optionally `context_token` (from incoming message).
pub async fn get_config(
&self,
ilink_user_id: &str,
context_token: Option<&str>,
) -> Result<String, ChannelError> {
let body = GetConfigRequest {
ilink_user_id: ilink_user_id.to_string(),
context_token: context_token.map(String::from),
};
let resp: serde_json::Value = self
.authenticated_post("ilink/bot/getconfig", &body, WEIXIN_API_TIMEOUT)
.await
.map_err(|e| ChannelError::PlatformApi(format!("getconfig failed: {e}")))?;

// Check for API-level errors
let ret = resp.get("ret").and_then(|v| v.as_i64()).unwrap_or(0);
if ret != 0 {
let errcode = resp.get("errcode").and_then(|v| v.as_i64()).unwrap_or(0);
let errmsg = resp
.get("errmsg")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
return Err(ChannelError::PlatformApi(format!(
"getconfig error: ret={ret}, errcode={errcode}, errmsg={errmsg}"
)));
}

resp.get("typing_ticket")
.and_then(|v| v.as_str())
.map(String::from)
.ok_or_else(|| ChannelError::PlatformApi("getconfig missing typing_ticket".into()))
}

/// Send or stop typing indicator.
///
/// `POST /ilink/bot/sendtyping`
/// status: 1 = start, 2 = stop
pub async fn send_typing(
&self,
ilink_user_id: &str,
typing_ticket: &str,
status: i32,
) -> Result<(), ChannelError> {
let body = SendTypingRequest {
ilink_user_id: ilink_user_id.to_string(),
typing_ticket: typing_ticket.to_string(),
status,
};
let resp: serde_json::Value = self
.authenticated_post("ilink/bot/sendtyping", &body, WEIXIN_API_TIMEOUT)
.await
.map_err(|e| ChannelError::PlatformApi(format!("sendtyping failed: {e}")))?;

// Check for API-level errors
let ret = resp.get("ret").and_then(|v| v.as_i64()).unwrap_or(0);
if ret != 0 {
let errcode = resp.get("errcode").and_then(|v| v.as_i64()).unwrap_or(0);
let errmsg = resp
.get("errmsg")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
return Err(ChannelError::PlatformApi(format!(
"sendtyping error: ret={ret}, errcode={errcode}, errmsg={errmsg}"
)));
}

Ok(())
}
}

#[cfg(test)]
Expand Down
49 changes: 48 additions & 1 deletion crates/aionui-channel/src/plugins/weixin/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::types::{
};

use super::api::WeixinApi;
use super::types::{ITEM_TYPE_TEXT, ITEM_TYPE_VOICE, WeixinRawItem, WeixinRawMessage};
use super::types::{ITEM_TYPE_TEXT, ITEM_TYPE_VOICE, TYPING_START, TYPING_STOP, WeixinRawItem, WeixinRawMessage};

/// Default base URL for the iLink Bot API.
const DEFAULT_BASE_URL: &str = "https://ilinkai.weixin.qq.com";
Expand All @@ -34,6 +34,7 @@ pub struct WeixinPlugin {
poll_handle: Option<JoinHandle<()>>,
shutdown_tx: Option<watch::Sender<bool>>,
context_tokens: Arc<DashMap<String, String>>,
typing_tickets: Arc<DashMap<String, String>>, // user_id → typing_ticket
}

impl Default for WeixinPlugin {
Expand All @@ -46,6 +47,7 @@ impl Default for WeixinPlugin {
poll_handle: None,
shutdown_tx: None,
context_tokens: Arc::new(DashMap::new()),
typing_tickets: Arc::new(DashMap::new()),
}
}
}
Expand Down Expand Up @@ -147,6 +149,7 @@ impl ChannelPlugin for WeixinPlugin {

self.api = None;
self.context_tokens.clear();
self.typing_tickets.clear();
self.status = PluginStatus::Stopped;
info!("WeChat plugin stopped");
Ok(())
Expand Down Expand Up @@ -195,6 +198,50 @@ impl ChannelPlugin for WeixinPlugin {
fn last_error(&self) -> Option<&str> {
self.last_error.as_deref()
}

async fn start_typing(&self, chat_id: &str) {
let api = match &self.api {
Some(api) => api,
None => return,
};

// Fetch typing ticket if not cached
if !self.typing_tickets.contains_key(chat_id) {
let context_token = self.context_tokens.get(chat_id).map(|v| v.clone());
debug!(chat_id=%chat_id, has_context_token=context_token.is_some(), "Fetching typing ticket via get_config");
match api.get_config(chat_id, context_token.as_deref()).await {
Ok(ticket) => {
debug!(chat_id=%chat_id, ticket_len=ticket.len(), "Got typing ticket from get_config");
self.typing_tickets.insert(chat_id.to_string(), ticket);
}
Err(e) => {
warn!(chat_id=%chat_id, error=%e, "Failed to fetch typing ticket from WeChat get_config");
return;
}
}
}

if let Some(ticket) = self.typing_tickets.get(chat_id) {
if let Err(e) = api.send_typing(chat_id, &ticket, TYPING_START).await {
warn!(chat_id=%chat_id, error=%e, "Failed to send typing start to WeChat");
self.typing_tickets.remove(chat_id);
}
}
}

async fn stop_typing(&self, chat_id: &str) {
let api = match &self.api {
Some(api) => api,
None => return,
};

if let Some(ticket) = self.typing_tickets.get(chat_id) {
if let Err(e) = api.send_typing(chat_id, &ticket, TYPING_STOP).await {
warn!(chat_id=%chat_id, error=%e, "Failed to send typing stop to WeChat");
self.typing_tickets.remove(chat_id);
}
}
}
}

// ---------------------------------------------------------------------------
Expand Down
21 changes: 21 additions & 0 deletions crates/aionui-channel/src/plugins/weixin/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,27 @@ pub(crate) struct SendTextItem {
// SSE event payloads (frontend-facing — DO NOT CHANGE field names)
// ---------------------------------------------------------------------------

// --- typing ticket ---
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct GetConfigRequest {
pub ilink_user_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub context_token: Option<String>,
}

// --- sendtyping ---
pub(crate) const TYPING_START: i32 = 1;
pub(crate) const TYPING_STOP: i32 = 2;

#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct SendTypingRequest {
pub ilink_user_id: String,
pub typing_ticket: String,
pub status: i32,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SseQrEvent {
Expand Down
39 changes: 39 additions & 0 deletions crates/aionui-channel/src/stream_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ pub trait ChannelSender: Send + Sync {
message_id: &str,
message: UnifiedOutgoingMessage,
) -> Result<(), ChannelError>;

/// Start typing indicator for a chat. Default no-op.
async fn start_typing(&self, _plugin_id: &str, _chat_id: &str) {}

/// Stop typing indicator for a chat. Default no-op.
async fn stop_typing(&self, _plugin_id: &str, _chat_id: &str) {}
}

/// Relays agent stream events to an IM platform.
Expand Down Expand Up @@ -72,6 +78,19 @@ impl ChannelStreamRelay {
let mut text_buffer = String::new();
let mut has_content = false;

// Start continuous typing indicator — refreshes every 2s like Hermes
let keep_typing = {
let sender = Arc::clone(&self.sender);
let plugin_id = self.config.plugin_id.clone();
let chat_id = self.config.chat_id.clone();
tokio::spawn(async move {
loop {
sender.start_typing(&plugin_id, &chat_id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
})
};

loop {
match rx.recv().await {
Ok(event) => match ChannelMessageService::process_stream_event(&event) {
Expand Down Expand Up @@ -100,6 +119,14 @@ impl ChannelStreamRelay {
.send_message(&self.config.plugin_id, &self.config.chat_id, final_msg)
.await;
}

keep_typing.abort();

// Stop typing after sending message to avoid gap
self.sender
.stop_typing(&self.config.plugin_id, &self.config.chat_id)
.await;

info!(
plugin_id = %self.config.plugin_id,
chat_id = %self.config.chat_id,
Expand All @@ -126,6 +153,12 @@ impl ChannelStreamRelay {
.sender
.send_message(&self.config.plugin_id, &self.config.chat_id, error_msg)
.await;

keep_typing.abort();

self.sender
.stop_typing(&self.config.plugin_id, &self.config.chat_id)
.await;
break;
}
None => {}
Expand All @@ -139,6 +172,12 @@ impl ChannelStreamRelay {
.send_message(&self.config.plugin_id, &self.config.chat_id, final_msg)
.await;
}

keep_typing.abort();

self.sender
.stop_typing(&self.config.plugin_id, &self.config.chat_id)
.await;
break;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down