Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ target

*.local.json
.env

.obsidian
_*.md
23 changes: 12 additions & 11 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,18 @@ cargo fmt --check # format check

## How to Add a New Channel Provider

1. Create `src/channel/<name>.rs` with a struct that implements `ChannelProvider`.
2. Implement `start(&self, tx, self_arc, shutdown)` — run the polling or webhook loop, check `SecurityGate`, stamp each message with `MessageContext`, send on `tx`. Exit when `shutdown.cancelled()` resolves. Use `self_arc` (not `self`) inside any closures that need to reference the provider.
3. Implement `send_response(&self, chat_id, response)` — deliver each `ResponseChunk` to the platform.
4. Implement `resolve_users(&self, users)` — convert `AllowedUser` entries to platform-native ID strings for `SecurityGate`.
5. Stamp `MessageContext` at ingestion: `workspace: Arc<WorkspaceHandle>`, `provider: self_arc.clone()`, `output_config: Arc::new(effective_output_config(global, channel_cfg))`.
6. Add `pub mod <name>;` in `src/channel/mod.rs`.
7. Add the kind string to `KNOWN_CHANNELS` in `src/config.rs`.
8. Add construction and startup logic in the channel match block in `src/main.rs`.
9. Add `warn_misplaced_fields()` entries in `src/config.rs` for any platform-specific config fields.
10. Add tests in `src/tests/channel/<name>_test.rs` and wire with `#[path = ...]` in the source file.
11. Update the channels table in `README.md` and add a field reference section in `docs/configuration.md`.
1. Create `src/channel/<name>.rs` with a struct that implements `ChannelProvider` and `ChannelProviderFactory`.
2. Implement `ChannelProviderFactory::create(ch_config, workspace, global_output)` — validate provider-specific config fields, build a temporary provider with a dummy `SecurityGate` to call `resolve_users`, then build the real provider with the resolved gate and effective output config.
3. Implement `start(&self, tx, self_arc, shutdown)` — run the polling or webhook loop, check `SecurityGate`, stamp each message with `MessageContext`, send on `tx`. Exit when `shutdown.cancelled()` resolves. Use `self_arc` (not `self`) inside any closures that need to reference the provider.
4. Implement `send_response(&self, chat_id, response)` — deliver each `ResponseChunk` to the platform.
5. Implement `resolve_users(&self, users)` — convert `AllowedUser` entries to platform-native ID strings for `SecurityGate`.
6. Stamp `MessageContext` at ingestion: `workspace: Arc<WorkspaceHandle>`, `provider: self_arc.clone()`, `output_config: Arc::new(effective_output_config(global, channel_cfg))`.
7. Add `pub mod <name>;` in `src/channel/mod.rs`.
8. Add the kind string to `KNOWN_CHANNELS` in `src/config.rs`.
9. Add a match arm for the new kind in `build()` in `src/channel/mod.rs`.
10. Add `warn_misplaced_fields()` entries in `src/config.rs` for any platform-specific config fields.
11. Add tests in `src/tests/channel/<name>_test.rs` and wire with `#[path = ...]` in the source file.
12. Update the channels table in `README.md` and add a field reference section in `docs/configuration.md`.

## Do Not

Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ Lightweight Rust daemon that bridges messaging platforms to local AI CLI tools.
```
┌──────────────┐
│ Telegram │──┐
├──────────────┤ │ ┌─────────────────────────────────────────┐ ┌──────────────┐
│ WhatsApp │──┼────▶│ RustifyMyClaw │────▶│ claude │
├──────────────┤ │ │ Security → Router → Executor → Format │◀───│ codex │
│ Slack │──┘ └─────────────────────────────────────────┘ │ gemini │
└──────────────┘ └──────────────┘
├──────────────┤ │ ┌─────────────────────────────────────────┐ ┌──────────────┐
│ WhatsApp │──┼────▶│ RustifyMyClaw │────▶ │ claude │
├──────────────┤ │ │ Security → Router → Executor → Format │◀─── │ codex │
│ Slack │──┘ └─────────────────────────────────────────┘ │ gemini │
└──────────────┘ └──────────────┘
```

## Why this exists
Expand Down
35 changes: 25 additions & 10 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ RustifyMyClaw is a Rust daemon that bridges messaging platforms to local AI CLI
```
┌──────────────┐
│ Telegram │──┐
├──────────────┤ │ ┌─────────────────────────────────────────┐ ┌──────────────┐
│ WhatsApp │──┼────▶│ RustifyMyClaw │────▶│ claude │
├──────────────┤ │ │ Security → Router → Executor → Format │◀────│ codex │
│ Slack │──┘ └─────────────────────────────────────────┘ │ gemini │
└──────────────┘ ▲ └──────────────┘
├──────────────┤ │ ┌─────────────────────────────────────────┐ ┌──────────────┐
│ WhatsApp │──┼───│ RustifyMyClaw │────▶ │ claude │
├──────────────┤ │ │ Security → Router → Executor → Format │◀──── │ codex │
│ Slack │──┘ └─────────────────────────────────────────┘ │ gemini │
└──────────────┘ ▲ └──────────────┘
~/.rustifymyclaw/config.yaml
```
Expand Down Expand Up @@ -56,11 +56,13 @@ A message from Telegram to a response back:
graph TD
main --> config
main --> router
main --> channel_telegram[channel/telegram]
main --> channel_whatsapp[channel/whatsapp]
main --> channel_slack[channel/slack]
main --> channel_mod[channel/mod — ChannelProvider + factory]
main --> config_reload

channel_mod --> channel_telegram[channel/telegram]
channel_mod --> channel_whatsapp[channel/whatsapp]
channel_mod --> channel_slack[channel/slack]

router --> command
router --> session
router --> executor
Expand Down Expand Up @@ -95,6 +97,7 @@ graph TD
| Channel `start()` signature | `&self` + separate `self_arc: Arc<dyn ChannelProvider>` argument | Polling closures need owned captures. A borrow doesn't live long enough; passing `Arc` explicitly avoids self-referential struct construction. |
| Backend instantiation | One instance per distinct backend name, stored in `HashMap<String, Arc<dyn CliBackend>>` | No duplicate allocations when multiple workspaces share a backend. |
| Config hot-reload | Rate limits apply immediately; all other changes require restart | Channel connections and security gates are constructed once at startup. Hot-patching them adds complexity without much operational value. |
| Channel construction | `ChannelProviderFactory` trait + `channel::build()` dispatch | Each provider owns its config validation and two-phase `SecurityGate` construction. `main.rs` calls a single factory function per channel. |

## Extension Points

Expand All @@ -114,7 +117,7 @@ Add a match arm to `build()` in `src/backend/mod.rs` and the name to `KNOWN_BACK

### Adding a new channel provider

See the step-by-step checklist in `CLAUDE.md`. The key interface is `ChannelProvider` in `src/channel/mod.rs`:
See the step-by-step checklist in `CLAUDE.md`. The key interfaces are `ChannelProvider` and `ChannelProviderFactory` in `src/channel/mod.rs`:

```rust
pub trait ChannelProvider: Send + Sync {
Expand All @@ -127,9 +130,21 @@ pub trait ChannelProvider: Send + Sync {
async fn send_response(&self, chat_id: &ChatId, response: FormattedResponse) -> Result<()>;
async fn resolve_users(&self, users: &[AllowedUser]) -> Result<HashSet<String>>;
}

pub trait ChannelProviderFactory: ChannelProvider + Sized {
async fn create(
ch_config: &ChannelConfig,
workspace: Arc<RwLock<WorkspaceHandle>>,
global_output: &Arc<OutputConfig>,
) -> Result<Arc<dyn ChannelProvider>>;
}
```

The `self_arc` parameter exists because polling closures need owned captures of the provider. Pass it through to any closure that stamps `MessageContext`.
`ChannelProviderFactory::create()` encapsulates the two-phase construction pattern: build a temporary provider with a dummy `SecurityGate` to call `resolve_users()`, then build the real provider with the resolved gate and effective output config. Each provider validates its own config fields inside `create()`.

The `self_arc` parameter on `start()` exists because polling closures need owned captures of the provider. Pass it through to any closure that stamps `MessageContext`.

`channel::build()` dispatches by kind string and is the single entry point called from `main.rs`.

### Adding a new command

Expand Down
142 changes: 98 additions & 44 deletions src/channel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,98 @@
use std::collections::HashSet;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::types::{AllowedUser, ChatId, FormattedResponse, InboundMessage};

pub mod slack;
pub mod telegram;
pub mod whatsapp;

/// Abstraction over a messaging platform.
///
/// Each platform implements this trait to normalize its API behind a common interface.
/// The executor and router never touch platform-specific types.
#[async_trait]
pub trait ChannelProvider: Send + Sync {
/// Start receiving messages and forward them to `tx`.
///
/// `self_arc` is the same `Arc` that will be embedded in each `MessageContext` so
/// the router can call `send_response` on the originating provider. Passing it here
/// avoids self-referential struct construction in the provider.
///
/// `shutdown` is cancelled when the daemon is stopping. The provider must exit
/// promptly when `shutdown.cancelled()` resolves.
///
/// This method runs indefinitely (polling loop). Spawn it as a Tokio task.
async fn start(
&self,
tx: mpsc::Sender<InboundMessage>,
self_arc: Arc<dyn ChannelProvider>,
shutdown: CancellationToken,
) -> Result<()>;

/// Send a formatted response back to the originating chat.
async fn send_response(&self, chat_id: &ChatId, response: FormattedResponse) -> Result<()>;

/// Resolve the `AllowedUser` list for this channel into a set of platform-native
/// user ID strings suitable for `SecurityGate` comparison.
async fn resolve_users(&self, users: &[AllowedUser]) -> Result<HashSet<String>>;
}
use std::collections::HashSet;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use tokio::sync::{mpsc, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::config::{ChannelConfig, OutputConfig};
use crate::types::{AllowedUser, ChatId, FormattedResponse, InboundMessage, WorkspaceHandle};

pub mod slack;
pub mod telegram;
pub mod whatsapp;

/// Abstraction over a messaging platform.
///
/// Each platform implements this trait to normalize its API behind a common interface.
/// The executor and router never touch platform-specific types.
#[async_trait]
pub trait ChannelProvider: Send + Sync {
/// Start receiving messages and forward them to `tx`.
///
/// `self_arc` is the same `Arc` that will be embedded in each `MessageContext` so
/// the router can call `send_response` on the originating provider. Passing it here
/// avoids self-referential struct construction in the provider.
///
/// `shutdown` is cancelled when the daemon is stopping. The provider must exit
/// promptly when `shutdown.cancelled()` resolves.
///
/// This method runs indefinitely (polling loop). Spawn it as a Tokio task.
async fn start(
&self,
tx: mpsc::Sender<InboundMessage>,
self_arc: Arc<dyn ChannelProvider>,
shutdown: CancellationToken,
) -> Result<()>;

/// Send a formatted response back to the originating chat.
async fn send_response(&self, chat_id: &ChatId, response: FormattedResponse) -> Result<()>;

/// Resolve the `AllowedUser` list for this channel into a set of platform-native
/// user ID strings suitable for `SecurityGate` comparison.
async fn resolve_users(&self, users: &[AllowedUser]) -> Result<HashSet<String>>;
}

/// Factory trait for constructing a [`ChannelProvider`] from configuration.
///
/// Each provider implements this to encapsulate its own config-field validation,
/// user resolution, and two-phase [`SecurityGate`] construction. The companion
/// [`build`] function dispatches to the correct implementation by channel kind.
#[async_trait]
pub trait ChannelProviderFactory: ChannelProvider + Sized {
/// Build a fully-initialised provider from a channel config block.
///
/// Implementations should:
/// 1. Validate provider-specific fields in `ch_config`.
/// 2. Construct a temporary instance with a dummy `SecurityGate` to call `resolve_users`.
/// 3. Build the real instance with the resolved gate and effective output config.
async fn create(
ch_config: &ChannelConfig,
workspace: Arc<RwLock<WorkspaceHandle>>,
global_output: &Arc<OutputConfig>,
) -> Result<Arc<dyn ChannelProvider>>;
}

/// Construct a [`ChannelProvider`] for the given channel config block.
///
/// This is the single entry point that `main.rs` calls for every configured channel.
pub async fn build(
ch_config: &ChannelConfig,
workspace_name: &str,
workspace: Arc<RwLock<WorkspaceHandle>>,
global_output: &Arc<OutputConfig>,
) -> Result<Arc<dyn ChannelProvider>> {
let provider: Arc<dyn ChannelProvider> = match ch_config.kind.as_str() {
"telegram" => telegram::TelegramProvider::create(ch_config, workspace, global_output).await,
"whatsapp" => whatsapp::WhatsAppProvider::create(ch_config, workspace, global_output).await,
"slack" => slack::SlackProvider::create(ch_config, workspace, global_output).await,
other => bail!("channel kind `{other}` is not implemented"),
}
.with_context(|| {
format!(
"workspace `{workspace_name}`: failed to build `{}` channel",
ch_config.kind
)
})?;

info!(
workspace = workspace_name,
kind = ch_config.kind,
bot_name = ch_config.bot_name.as_deref().unwrap_or("(unnamed)"),
"channel registered"
);

Ok(provider)
}
41 changes: 39 additions & 2 deletions src/channel/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use tokio_util::sync::CancellationToken;

use crate::channel::ChannelProvider;
use crate::config::OutputConfig;
use crate::channel::{ChannelProvider, ChannelProviderFactory};
use crate::config::{self, ChannelConfig, OutputConfig};
use crate::security::SecurityGate;
use crate::types::{
AllowedUser, ChannelKind, ChatId, FormattedResponse, InboundMessage, MessageContext,
Expand Down Expand Up @@ -121,7 +121,44 @@ impl SlackProvider {
thread_map: RwLock::new(HashMap::new()),
}
}
}

#[async_trait]
impl ChannelProviderFactory for SlackProvider {
async fn create(
ch_config: &ChannelConfig,
workspace: Arc<RwLock<WorkspaceHandle>>,
global_output: &Arc<OutputConfig>,
) -> Result<Arc<dyn ChannelProvider>> {
let app_token = ch_config
.app_token
.clone()
.context("slack channel requires `app_token` (xapp-…)")?;
let use_threads = ch_config.use_threads.unwrap_or(false);

let tmp = Self::new(
ch_config.token.clone(),
app_token.clone(),
use_threads,
SecurityGate::new(Default::default()),
Arc::clone(&workspace),
Arc::clone(global_output),
);
let resolved = tmp.resolve_users(&ch_config.allowed_users).await?;
let gate = SecurityGate::new(resolved);
let effective_output = Arc::new(config::effective_output_config(global_output, ch_config));
Ok(Arc::new(Self::new(
ch_config.token.clone(),
app_token,
use_threads,
gate,
workspace,
effective_output,
)))
}
}

impl SlackProvider {
/// Open a Socket Mode WebSocket connection and return its URL.
async fn open_socket_connection(&self) -> Result<String> {
let resp: SocketModeConnectResponse = self
Expand Down
29 changes: 27 additions & 2 deletions src/channel/telegram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use crate::channel::ChannelProvider;
use crate::config::OutputConfig;
use crate::channel::{ChannelProvider, ChannelProviderFactory};
use crate::config::{self, ChannelConfig, OutputConfig};
use crate::security::SecurityGate;
use crate::types::{
AllowedUser, ChannelKind, ChatId, FormattedResponse, InboundMessage, MessageContext,
Expand Down Expand Up @@ -45,6 +45,31 @@ impl TelegramProvider {
}
}

#[async_trait]
impl ChannelProviderFactory for TelegramProvider {
async fn create(
ch_config: &ChannelConfig,
workspace: Arc<RwLock<WorkspaceHandle>>,
global_output: &Arc<OutputConfig>,
) -> Result<Arc<dyn ChannelProvider>> {
let tmp = Self::new(
ch_config.token.clone(),
SecurityGate::new(Default::default()),
Arc::clone(&workspace),
Arc::clone(global_output),
);
let resolved = tmp.resolve_users(&ch_config.allowed_users).await?;
let gate = SecurityGate::new(resolved);
let effective_output = Arc::new(config::effective_output_config(global_output, ch_config));
Ok(Arc::new(Self::new(
ch_config.token.clone(),
gate,
workspace,
effective_output,
)))
}
}

#[async_trait]
impl ChannelProvider for TelegramProvider {
async fn start(
Expand Down
Loading
Loading