Multi-agent teams: parallel processing, agent-to-agent communication, and request tracking #163
dpbmaverick98 wants to merge 33 commits into TinyAGI:main
Conversation
What:
- Add three new tables: conversations, conversation_responses, conversation_pending_agents
- Add 11 new functions for conversation state management
- Follow existing SQLite patterns (WAL mode, transactions, indexes)
Why:
Previously, all conversation state was stored only in memory (Map<string, Conversation>). If the queue-processor crashed or was restarted during a team conversation, all active conversation state was lost. Agents would continue processing their messages, but the conversation would never complete because the pending counter and response aggregation were gone.
This change persists conversation state to SQLite, enabling:
1. Restart recovery - conversations can be resumed after a crash
2. State inspection - active conversations can be queried via API
3. Debuggability - conversation history is preserved
Assumptions:
- Conversations are short-lived (minutes, not days), so we don't need to persist the full Conversation object (Sets, Maps). We persist the minimal state needed to reconstruct it: counters, IDs, and responses.
- Files referenced in conversations are not persisted (they're ephemeral).
- The existing in-memory conversations Map is still used for fast access during normal operation; the DB is the source of truth for recovery.
Pattern compliance:
- Uses transaction().immediate() for atomic operations (like claimNextMessage)
- Uses INSERT OR REPLACE for upserts
- Uses ON DELETE CASCADE for cleanup
- Follows existing naming conventions and timestamp formats
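For illustration, the three tables described above might be declared roughly like this. This is a sketch only: the column names and indexes are assumptions based on the commit message, not the actual schema in db.ts.

```typescript
// Hypothetical DDL for the three conversation tables described above.
// Column names are illustrative; the real schema in db.ts may differ.
const conversationSchema = `
  CREATE TABLE IF NOT EXISTS conversations (
    id TEXT PRIMARY KEY,
    team_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    total_messages INTEGER NOT NULL DEFAULT 0,
    pending INTEGER NOT NULL DEFAULT 0,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS conversation_responses (
    conversation_id TEXT NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    agent_id TEXT NOT NULL,
    response TEXT NOT NULL,
    created_at INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS conversation_pending_agents (
    conversation_id TEXT NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    agent_id TEXT NOT NULL
  );
  CREATE INDEX IF NOT EXISTS idx_conversations_status
    ON conversations(status, updated_at);
`;
```

The ON DELETE CASCADE clauses match the stated cleanup pattern: deleting a conversation row removes its responses and pending-agent rows automatically.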
…very
What:
- Remove the agentProcessingChains Map that enforced sequential processing per agent
- Refactor processMessage to use a fire-and-forget pattern for invokeAgent
- Add handleSimpleResponse and handleTeamResponse async handlers
- Add handleTeamError for error recovery in team contexts
- Add startup recovery logic to load active conversations from the DB
- Add a conversation pruning maintenance interval
Why:
Previously, the queue-processor used a Promise chain per agent (agentProcessingChains) to ensure messages were processed sequentially. This caused the "freeze" problem: if agent A was processing a long request (e.g., a 30s Claude API call), no other messages to agent A could be processed until it completed.
This change makes invokeAgent fire-and-forget:
1. processMessage starts invokeAgent and returns immediately
2. The response is handled asynchronously by handleSimpleResponse/handleTeamResponse
3. Multiple messages to the same agent can be in flight simultaneously
4. The queue processor never blocks on slow API calls
Additionally, conversation state is now persisted to SQLite (from the previous commit) and recovered on startup. If the queue-processor restarts during a team conversation, it resumes where it left off instead of losing all state.
Assumptions:
- invokeAgent is idempotent enough that reprocessing after a crash is safe
- The DB transaction in decrementPendingInDb prevents race conditions
- The in-memory conversations Map is still used for fast access; the DB is for recovery
- Fire-and-forget is acceptable because we have retry logic via the dead letter queue
Breaking changes:
- Removed the per-agent sequential processing guarantee. Previously, messages to the same agent were guaranteed to process sequentially; now they process concurrently. This is the desired behavior (no freezing), but it means agents must handle concurrent requests if they share state.
Pattern compliance:
- Uses async/await for response handlers (cleaner than callbacks)
- Uses DB functions from the previous commit for persistence
- Maintains existing event emission for observability
- Preserves all existing error handling and logging
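In outline, the fire-and-forget refactor means processMessage starts the agent call but does not await it. A minimal sketch of the control flow (the handler names follow the commit message; the real signatures in queue-processor.ts differ):

```typescript
type Message = { id: string; agentId: string; text: string; teamContext?: unknown };

// Fire-and-forget: kick off the agent call and return immediately, so the
// queue loop can claim the next message without waiting on slow API calls.
function processMessage(
  msg: Message,
  invokeAgent: (msg: Message) => Promise<string>,
  handleResponse: (msg: Message, response: string) => void,
  handleError: (msg: Message, err: unknown) => void,
): void {
  invokeAgent(msg)
    .then((response) => handleResponse(msg, response))
    .catch((err) => handleError(msg, err));
  // No await here: multiple messages to the same agent can be in flight.
}
```

The trade-off noted under "Breaking changes" is visible in the sketch: nothing serializes two calls for the same agentId anymore.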
What:
- Add src/lib/signals.ts with file-based signaling system
- Modify enqueueResponse to signal channel when response is ready
- Update Discord, Telegram, and WhatsApp clients to use push notifications
- Add 10-second fallback polling for reliability
Why:
Previously, channel clients polled /api/responses/pending every 1-2 seconds.
This caused unnecessary latency (average 0.5-1s delay) and wasted CPU/IO on
both the client and server.
This change implements push notifications via file system:
1. When enqueueResponse is called, it writes a signal file (.tinyclaw/signals/{channel}.ready)
2. Channel clients use fs.watch() to get notified immediately
3. Response latency drops from ~1s to near-zero
4. Fallback polling every 10s catches any missed signals
Assumptions:
- File system watch (fs.watch) is reliable enough for this use case
- Signal files are cleaned up after processing to prevent duplicate triggers
- 10-second fallback is acceptable for missed signals (rare)
- All three channel clients (Discord, Telegram, WhatsApp) are on the same machine
Trade-offs:
- File-based signaling only works for local processes (same machine)
- If we need distributed deployment later, this would need to be replaced
with something like Redis pub/sub or NATS
- File system watches can be unreliable on some platforms (we have fallback)
Pattern compliance:
- Uses existing TINYCLAW_HOME for signal directory
- Follows existing error handling patterns
- Maintains backward compatibility (polling still works)
- Clean shutdown with unwatch() on SIGINT/SIGTERM
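A sketch of the signal primitive described above. The directory layout (`{channel}.ready` files) follows the commit message; the directory parameter is an assumption — the real signals.ts resolves it from TINYCLAW_HOME.

```typescript
import * as fs from 'fs';
import * as path from 'path';

// enqueueResponse touches a per-channel file; channel clients watch the
// directory with fs.watch() and wake up immediately instead of polling.
function signalChannel(signalsDir: string, channel: string): void {
  fs.mkdirSync(signalsDir, { recursive: true });
  fs.writeFileSync(path.join(signalsDir, `${channel}.ready`), String(Date.now()));
}

function watchSignals(
  signalsDir: string,
  onSignal: (channel: string) => void,
): fs.FSWatcher {
  fs.mkdirSync(signalsDir, { recursive: true });
  return fs.watch(signalsDir, (_event, filename) => {
    if (filename?.endsWith('.ready')) {
      onSignal(filename.replace(/\.ready$/, ''));
    }
  });
}
```

Because fs.watch can miss events on some platforms, the client side keeps the 10-second fallback poll described above as a safety net.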
…ing guarantee
What:
- Make emitEvent() async to allow awaiting event listener completion
- Update the EventListener type to support async listeners
- Add await to all emitEvent() calls in queue-processor.ts:
  - response_ready (handleSimpleResponse)
  - chain_handoff (handleTeamResponse)
  - team_chain_start (processMessage)
- Make completeConversation() async and await the team_chain_end emission
- Wrap conversation recovery in an async recoverConversations() function
- Move startup logging into an async IIFE to properly await emitEvent
Why:
The visualizer relies on event ordering: chain_step_start → chain_step_done → response_ready. Without await, events could be emitted in order but processed out of order due to async listener scheduling.
This was a critical issue found in the NATS implementation (missing awaits on publishEvent calls). The same pattern existed here: emitEvent was fire-and-forget, so the visualizer could receive events out of sequence under high concurrency.
By awaiting emitEvent, we guarantee:
1. Events are processed by listeners before execution continues
2. The visualizer sees events in the correct order
3. SSE clients receive events sequentially
Assumptions:
- Event listeners are fast enough that awaiting them won't block processing
- The slight overhead of await is acceptable for ordering guarantees
- Listeners that need to be fire-and-forget should internally queue work
Breaking changes:
- emitEvent() now returns Promise<void> instead of void
- completeConversation() now returns Promise<void>
- Code using these functions must now await them
Pattern compliance:
- Matches the fix applied in the NATS branch (adding awaits to publishEvent)
- Uses async/await consistently throughout the codebase
- Maintains error handling (try/catch around await)
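The core of the change can be sketched as follows — an emitter whose emit returns a Promise, so the caller can await listener completion before emitting the next event. This is a simplified shape; the real EventListener type and registration API may differ.

```typescript
type EventListener = (type: string, data: unknown) => void | Promise<void>;

const listeners: EventListener[] = [];

function onEvent(listener: EventListener): void {
  listeners.push(listener);
}

// emitEvent now returns Promise<void>; awaiting it guarantees every
// listener (e.g. the SSE fan-out for the visualizer) has finished
// before the caller emits the next event in the chain.
async function emitEvent(type: string, data: unknown): Promise<void> {
  for (const listener of listeners) {
    await listener(type, data);
  }
}
```

The ordering guarantee comes purely from the caller's await: `await emitEvent('chain_step_start', …)` cannot complete until all listeners have processed it.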
What:
- Add next_retry_at column to the messages table for scheduling retries
- Update failMessage() to calculate exponential backoff with jitter
- Update claimNextMessage() to respect the next_retry_at timestamp
- Add a migration for existing databases (ALTER TABLE)
Why:
Previously, failed messages were immediately retried (status reset to 'pending'). Under high load or during outages, this caused a "thundering herd" problem: all failed messages would retry simultaneously, overwhelming the system.
This change implements exponential backoff with jitter:
- Retry 1: ~100ms delay
- Retry 2: ~200ms delay
- Retry 3: ~400ms delay
- Retry 4: ~800ms delay
- Retry 5: ~1600ms delay (capped at 30s)
Plus 0-100ms of random jitter to spread out retries and prevent synchronized retry storms.
Assumptions:
- Messages that fail temporarily (rate limits, network blips) will succeed after a short delay
- Spreading retries over time is better than immediate retry
- 5 retries with exponential backoff is sufficient for transient failures
Implementation details:
- The ORDER BY clause prioritizes messages without next_retry_at (new messages)
- Then orders by next_retry_at to process the earliest scheduled first
- Messages with a future next_retry_at are skipped until their time comes
Pattern compliance:
- Uses the same transaction pattern as claimNextMessage for atomicity
- Maintains backward compatibility (next_retry_at is nullable)
- Follows existing logging conventions
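The schedule above reduces to a one-line formula. A sketch of the delay calculation (constant names are assumptions; failMessage() in the PR computes this inline or similarly):

```typescript
// Exponential backoff matching the schedule above:
// retry 1 -> ~100ms, retry 2 -> ~200ms, ..., capped at 30s, plus 0-100ms jitter.
const BASE_DELAY_MS = 100;
const MAX_DELAY_MS = 30_000;
const JITTER_MS = 100;

function retryDelayMs(retryCount: number): number {
  const exponential = Math.min(BASE_DELAY_MS * 2 ** (retryCount - 1), MAX_DELAY_MS);
  // Random jitter spreads retries out and prevents synchronized retry storms.
  return exponential + Math.floor(Math.random() * JITTER_MS);
}

// failMessage() would then schedule:
//   next_retry_at = Date.now() + retryDelayMs(retryCount)
```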
What:
- Add src/lib/heartbeat.ts with heartbeat read/write functions
- Queue-processor writes a heartbeat every 5 seconds with timestamp, pid, uptime
- Channel clients check heartbeat staleness in the fallback polling loop
- Atomic file write (temp + rename) to prevent corruption
- Clean shutdown removes the heartbeat file
Why:
File-based signaling (signals.ts) has no way to detect when the queue-processor crashes. If the queue-processor dies:
- Signal files stop being written (but clients don't know)
- Clients keep watching, unaware of the crash
- 10-second fallback polling continues but never gets new responses
With heartbeat monitoring:
- Channel clients detect a stale heartbeat (default: 15s threshold)
- A warning is logged when the queue-processor may have crashed
- Users can see the issue and restart the service
This is simpler than NATS's consumer iterator monitoring but achieves the same goal: detecting when the message processor is unhealthy.
Assumptions:
- A 5-second heartbeat interval is frequent enough for detection
- A 15-second staleness threshold (3 missed heartbeats) is reasonable
- File system timestamps are accurate enough for health checks
- Channel clients should log warnings but not auto-restart (user decision)
Pattern compliance:
- Uses the same TINYCLAW_HOME directory as other state files
- Follows existing error handling (log and continue)
- Atomic write pattern prevents corrupted heartbeat files
- Cleanup on SIGINT/SIGTERM for graceful shutdown
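A sketch of the two halves described above — the atomic write (temp file + rename) on the queue-processor side, and the pure staleness check on the client side. Field names are assumptions; heartbeat.ts may record additional metadata.

```typescript
import * as fs from 'fs';

// Writer side: write to a temp file, then rename into place. On POSIX
// filesystems rename is atomic, so readers never see a half-written file.
function writeHeartbeat(file: string): void {
  const payload = JSON.stringify({
    timestamp: Date.now(),
    pid: process.pid,
    uptime: process.uptime(),
  });
  fs.writeFileSync(`${file}.tmp`, payload);
  fs.renameSync(`${file}.tmp`, file);
}

// Reader side: stale if more than threshold has elapsed since the last
// heartbeat (15s default = 3 missed 5-second heartbeats).
function isHeartbeatStale(lastTimestampMs: number, nowMs: number, thresholdMs = 15_000): boolean {
  return nowMs - lastTimestampMs > thresholdMs;
}
```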
What:
- Add MAX_MESSAGE_SIZE constant (1MB - Claude API limit)
- Add validateMessage() function to check message size
- Validate the message before both invokeAgent calls (simple and team contexts)
- Fail the message immediately with a clear error if it is too large
Why:
Previously, messages larger than 1MB would be sent to the Claude API, which would reject them with an error. The error would trigger retry logic, wasting resources on a message that can never succeed.
With validation:
1. Message size is checked before any API call
2. Oversized messages fail immediately (no retry)
3. A clear error message is logged for debugging
4. Prevents wasted API calls and retry cycles
Assumptions:
- 1MB is the appropriate limit for the Claude API
- Message size is the primary validation needed (other validations may be added)
- Failing immediately is better than retrying oversized messages
Pattern compliance:
- Uses existing failMessage() for consistency
- Logs the error with context for debugging
- Returns early (guard clause pattern)
- Non-breaking change (new validation, no API changes)
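A minimal sketch of the guard described above (the return type and error wording are assumptions; the PR's validateMessage() may differ):

```typescript
// 1MB cap, per the Claude API limit cited above. Returns an error string
// (so the caller can fail the message immediately, without retry) or null.
const MAX_MESSAGE_SIZE = 1024 * 1024;

function validateMessage(message: string): string | null {
  // Measure bytes, not characters: multi-byte UTF-8 counts against the limit.
  const bytes = Buffer.byteLength(message, 'utf8');
  if (bytes > MAX_MESSAGE_SIZE) {
    return `Message too large: ${bytes} bytes (max ${MAX_MESSAGE_SIZE}); failing without retry`;
  }
  return null;
}
```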
… with conversation lock
**What**
Added withConversationLock() to handleTeamResponse() and handleTeamError() to prevent
race conditions when multiple agents finish simultaneously for the same conversation.
**Why (Critical Race Condition)**
When Agent A and Agent B both complete for same conversation concurrently:
- Both call handleTeamResponse(conv) with shared conv object reference
- Both modify conv.totalMessages and conv.pending without synchronization (the read-modify-write is not atomic)
- Both can reach if (newPending === 0) and call completeConversation(conv) twice
- Results in: lost updates, duplicate completion events, corrupted state
Example timeline:
Agent A finishes → handleTeamResponse starts
- persistResponse(conv_id, agentA, responseA)
- conv.totalMessages++ (read=5, write=6)
Agent B finishes → handleTeamResponse starts (same conv reference)
- persistResponse(conv_id, agentB, responseB)
- conv.totalMessages++ (read=5, write=6) ← Lost Agent A's increment!
Result: conv.totalMessages = 6 instead of 7. Conversation state corrupted.
**Solution**
Wrapped function body with withConversationLock(conv.id) which:
- Serializes updates: only one agent modifies conv at a time
- Prevents concurrent modifications to same conversation
- Ensures only one agent reaches completion check
**Similar Fix Applied To**
- handleTeamResponse(): Wraps entire response handling logic
- handleTeamError(): Same pattern for error handling
**Assumptions**
1. Fire-and-forget pattern is maintained (invoke is still async)
2. Lock overhead acceptable (milliseconds per conversation)
3. Conversation objects exist long enough for all agents to complete
4. Lock gracefully handles conversation deletion by cleanup in conversation.ts
**Testing Considerations**
- Test with 3+ agents finishing within milliseconds of each other
- Verify team_chain_end event emitted exactly once
- Check conversation state consistency in database
- Monitor for deadlocks (lock implementation has timeout handling)
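One common way to build such a per-conversation lock is a promise chain keyed by conversation id. This is a sketch under that assumption; the PR's withConversationLock() also includes the timeout handling mentioned above, which is omitted here.

```typescript
// Per-key async mutex: critical sections for the same conversation id run
// one at a time; sections for different ids run concurrently.
const locks = new Map<string, Promise<unknown>>();

function withConversationLock<T>(id: string, fn: () => Promise<T>): Promise<T> {
  const prev = locks.get(id) ?? Promise.resolve();
  // Chain this critical section after any in-flight one for the same id.
  const run = prev.then(fn);
  // Store a rejection-swallowing tail so a failing section doesn't
  // block later callers for the same conversation.
  locks.set(id, run.catch(() => {}));
  return run;
}
```

With this in place, the timeline in the commit message cannot occur: Agent B's read of conv state happens only after Agent A's section has committed its writes.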
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…or handling
**What**
Changed clearSignal() from check-then-delete pattern to try-delete with selective
error handling. Now ignores ENOENT errors when file is already deleted.
**Why (Race Condition)**
Previous implementation used check-then-delete pattern:
```typescript
if (fs.existsSync(signalFile)) {
  fs.unlinkSync(signalFile); // TOCTOU: file can be deleted between check and delete
}
```
This creates a Time-Of-Check-Time-Of-Use (TOCTOU) race condition:
1. Process A checks: file exists
2. Process B checks: file exists
3. Process A deletes file
4. Process B tries to delete: ENOENT error
5. Error not caught, may propagate and crash
Additionally, fs.existsSync can be slow on high-latency filesystems.
**Solution**
Direct try-delete approach with selective error handling:
```typescript
try {
  fs.unlinkSync(signalFile);
} catch (error: any) {
  if (error?.code !== 'ENOENT') {
    throw error; // Re-throw unexpected errors
  }
  // Ignore ENOENT: normal when another process deleted the file first
}
```
Benefits:
- Atomic delete operation (no TOCTOU window)
- Faster (one syscall instead of two)
- Graceful: ignores benign ENOENT
- Still fails on real errors (permissions, disk full, etc.)
**When This Occurs**
When multiple channel clients process responses simultaneously:
- Telegram client calls clearSignal('telegram')
- WhatsApp client calls clearSignal('whatsapp')
- If same signal file, both try to delete → first succeeds, second gets ENOENT
Current likelihood: Low (different channels have different files) but possible
if signal file corruption or manual cleanup happens concurrently.
**Assumptions**
1. ENOENT is expected and benign (file already deleted)
2. Other errors (EACCES, EIO) should propagate and fail loudly
3. fs.unlinkSync is atomic (POSIX guarantee)
4. Process has correct permissions to delete signal files
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…resilience
**What**
Added recoverStaleConversations() function to detect and recover conversations
that are stuck in 'active' state, marking them as 'completed' so they can be
purged and don't cause memory leaks.
Called on startup and periodically (every 5 minutes) during normal operation.
**Why**
Conversations can become stuck in 'active' state if:
1. queue-processor crashes while agents are processing
2. Network failure prevents agent response from being persisted
3. Bug in agent handler prevents proper completion
4. Database corruption in conversation_pending_agents table
Without recovery:
- In-memory conversations Map grows unbounded
- Stuck conversations never emit team_chain_end event
- Users see conversation as "in progress" forever
- Memory leak: conversations never garbage collected
With recovery:
- Conversations marked as 'completed' after 30 min of inactivity
- Allows pruneOldConversations() to delete them
- Prevents memory leaks and orphaned conversations
- Teams can be retried by user if truly needed
**Implementation Details**
```typescript
export function recoverStaleConversations(staleThresholdMs = 30 * 60 * 1000): number {
  const cutoff = Date.now() - staleThresholdMs;
  return getDb().prepare(`
    UPDATE conversations
    SET status = 'completed'
    WHERE status = 'active' AND updated_at < ?
  `).run(cutoff).changes;
}
```
**Assumptions**
1. 30-minute threshold is reasonable for detecting stuck conversations
2. Marking as 'completed' is safe (responses already persisted to DB)
3. Periodic recovery (every 5 min) catches stuck conversations quickly
4. Users can retry conversation if legitimate work was interrupted
**Trade-offs**
- Possible data loss if agent is legitimately processing for 30+ min
(Mitigation: user can retry conversation, which is rare use case)
- Memory will grow to peak of ~30 min of stuck conversations
(Acceptable: periodic pruning cleans them up)
**Testing Considerations**
- Verify conversations marked as completed can be queried
- Check team_chain_end event emitted when recovery completes conversation
- Monitor logs for false positives (legitimate long-running conversations)
- Test crash scenarios to verify recovery works
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…documentation
**What**
Fixed two issues in recoverStaleConversations():
1. Don't update updated_at when marking as completed (keeps the original timestamp)
2. Enhanced documentation explaining why team_chain_end is NOT emitted
**Why Issue 1: Pruning Timestamp Reset**
Previous code:
```sql
UPDATE conversations
SET status = 'completed', updated_at = ?  -- WRONG: resets timestamp
WHERE status = 'active' AND updated_at < ?
```
Problem timeline:
- T=0: Conversation starts, updated_at = T0
- T=30min: Conversation gets stuck (no updates)
- T=30min: Recovery runs, marks completed, sets updated_at = T30
- T=30min+24h: pruneOldConversations() looks for updated_at < 24h ago
- Result: conversation not pruned until T=30min+24h (stays in DB 24+ hours)
Better approach:
```sql
UPDATE conversations
SET status = 'completed'  -- Keep the original updated_at timestamp
WHERE status = 'active' AND updated_at < ?
```
Now pruning works correctly:
- Stale conversation marked completed at T=30min
- Original updated_at = T0 (30+ min ago)
- pruneOldConversations() deletes it once updated_at < 24h ago
**Why Issue 2: Missing team_chain_end Event**
Recovery completion is NOT a natural completion:
- Natural completion: all agents finish, responses are aggregated, the user gets a result
- Stale recovery: the conversation was abandoned after a crash; responses may be incomplete
Implications:
- The visualizer won't show recovery as "completed" (correct - it's artificial)
- Events are not sent (prevents false positives in monitoring)
- Users understand recovery = lost work, not success
Alternative considered: emit team_chain_end with a recovery flag
- Rejected: would confuse the visualizer and monitoring
- Recovery should be silent cleanup, not broadcast as completion
**Assumptions**
1. Keeping the original updated_at is correct behavior (allows proper pruning)
2. Silent recovery is acceptable (users can retry if needed)
3. The 30-minute stale threshold correctly identifies stuck conversations
4. Not emitting events prevents false positives in event-based systems
**Testing**
Verify:
1. Stale conversation marked as completed
2. updated_at timestamp NOT changed (still ~30min old)
3. pruneOldConversations() deletes it after 24h from the original time
4. No team_chain_end event in logs for recovered conversations
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…covery visibility
What:
- Reduce stale conversation threshold from 30min to 10min (Gap 1)
- Add getStaleConversations() to get details of stuck conversations
- Add WARN-level logging with team/conversation details on recovery (Gap 2)
- Emit crash_recovery event for visualizer/monitoring (Gap 2)
Why:
Gap 1 (Slow Detection): 30min threshold meant users could lose up to 30 minutes
of work if agent crashed. 10min reduces data loss window while still giving
slow agents reasonable grace period.
Gap 2 (Silent Recovery): Recovery was invisible (INFO level). Ops couldn't
tell if completion was normal or crash recovery. WARN logs + events provide
visibility for monitoring and alerting.
Implementation:
- getStaleConversations() returns {id, teamId, duration} for each stuck conv
- Startup recovery: WARN log with 🔴 CRASH RECOVERY prefix + event per conv
- Periodic recovery: WARN log with 🔴 PERIODIC RECOVERY prefix + events
- Events include conversationId, teamId, stuckForMs, recoveredAt/source
Assumptions:
- 10min is acceptable grace period for slow agents (2x NATS heartbeat)
- WARN level is appropriate for crash recovery (not ERROR since it's expected)
- Events emitted before actual recovery (state change happens after logging)
Risk: None (logging only, no behavior change)
Testing: Kill agent mid-processing, verify WARN logs + events after 10min
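The staleness check itself reduces to a timestamp comparison. A sketch of what getStaleConversations() might compute — the row shape and the {id, teamId, stuckForMs} result are assumptions based on the event fields listed above:

```typescript
type ConversationRow = { id: string; teamId: string; updatedAt: number };
type StaleConversation = { id: string; teamId: string; stuckForMs: number };

// Default threshold reduced from 30min to 10min per Gap 1 above.
function findStaleConversations(
  rows: ConversationRow[],
  nowMs: number,
  thresholdMs = 10 * 60 * 1000,
): StaleConversation[] {
  return rows
    .filter((row) => nowMs - row.updatedAt > thresholdMs)
    .map((row) => ({
      id: row.id,
      teamId: row.teamId,
      stuckForMs: nowMs - row.updatedAt,
    }));
}
```

Each returned entry feeds one WARN log line and one crash_recovery event, giving ops the per-conversation detail described under Gap 2.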
What:
- Add backup.sh script for daily automated SQLite backups
- Backups stored in ~/.tinyclaw/backups/ with 7-day retention
- Add database integrity check (PRAGMA integrity_check) on startup
- Copy WAL files if present (WAL mode consistency)
- Verify the backup is readable before considering it valid
Why:
The SQLite database is a single point of failure. Without backups, corruption or accidental deletion means total data loss. With backups, the worst case is losing the last 24 hours of conversation state (acceptable for production use).
Usage:
./backup.sh                    # Manual backup
crontab -e                     # Add to cron for daily backups
0 2 * * * /path/to/backup.sh   # Daily at 2 AM
Recovery:
cp ~/.tinyclaw/backups/tinyclaw_YYYYMMDD_HHMMSS.db ~/.tinyclaw/tinyclaw.db
rm ~/.tinyclaw/tinyclaw.db-wal ~/.tinyclaw/tinyclaw.db-shm 2>/dev/null || true
Assumptions:
- 7-day retention is sufficient for debugging corruption causes
- Daily backups are frequent enough (conversations are recoverable)
- Storage is cheap (~1-5 MB per backup, 7 backups = ~35 MB max)
- Manual recovery is acceptable (rare event, ops can handle)
Risk: None (additive, no behavior changes)
Testing: Run backup.sh, verify 7 daily backups exist and are readable
What:
- Add outstanding_requests table with request_id, conversation_id, from_agent, to_agent
- Add status field: pending | acked | responded | failed | escalated
- Add deadline tracking: ack_deadline (5s default), response_deadline (5min default)
- Add retry tracking: retry_count, max_retries (default 3)
- Add 11 functions for request lifecycle management
Why:
This implements a primitive request-reply pattern with timeouts to solve the "ping pong" message drop problem. When agent A asks agent B to do something:
1. Create an outstanding request with deadlines
2. Agent B must ACK (acknowledge) within the timeout
3. Agent B must RESPOND with a result within the timeout
4. If deadlines expire → retry or escalate
This is how distributed systems worked before fancy protocols - just timeouts and retries at the application level.
Functions added:
- createOutstandingRequest() - Create a new request when a handoff happens
- acknowledgeRequest() - Agent B confirms receipt
- respondToRequest() - Agent B provides the result
- failRequest() - Mark permanent failure
- escalateRequest() - Escalate to a human
- getRequestsNeedingRetry() - Find expired pending requests
- getRequestsNeedingEscalation() - Find expired acked requests
- incrementRequestRetry() - Retry with a new deadline
- getRequest() - Look up by ID
- getPendingRequestsForConversation() - Get all pending requests for a conversation
- pruneOldRequests() - Clean up old completed requests
Assumptions:
- A 5-second ACK timeout is reasonable for agent processing
- A 5-minute response timeout balances speed vs complex tasks
- 3 retries before escalation is sufficient
- SQLite is fast enough for this tracking (no separate service needed)
Pattern compliance:
- Uses the same SQLite patterns (WAL, transactions, indexes)
- Foreign key to the conversations table with CASCADE delete
- Timestamps in milliseconds (consistent with the rest of the codebase)
- Debug/Warn logging for observability
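A sketch of the request record and its construction, with the defaults stated above (5s ACK, 5min response, 3 retries). The TypeScript shape is an assumption — the PR stores these as SQLite columns, not an in-memory object:

```typescript
type RequestStatus = 'pending' | 'acked' | 'responded' | 'failed' | 'escalated';

interface OutstandingRequest {
  requestId: string;
  conversationId: string;
  fromAgent: string;
  toAgent: string;
  status: RequestStatus;
  ackDeadline: number;      // ms epoch; must be ACKed by this time
  responseDeadline: number; // ms epoch; must be responded to by this time
  retryCount: number;
  maxRetries: number;
}

function createOutstandingRequest(
  requestId: string,
  conversationId: string,
  fromAgent: string,
  toAgent: string,
  nowMs = Date.now(),
): OutstandingRequest {
  return {
    requestId, conversationId, fromAgent, toAgent,
    status: 'pending',
    ackDeadline: nowMs + 5_000,           // 5s ACK default
    responseDeadline: nowMs + 5 * 60_000, // 5min response default
    retryCount: 0,
    maxRetries: 3,
  };
}
```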
… handoffs
What:
- Modify enqueueInternalMessage() to create an outstanding request when agent A asks agent B to do something
- Include request_id in the message payload as a [REQUEST:xxx] prefix
- Import createOutstandingRequest from db.ts
Why:
This is the integration point that actually uses the outstanding_requests table. Previously, agent handoffs were fire-and-forget - there was no tracking of whether the agent responds. Now we create a request with an ACK deadline (5s) and a response deadline (5min). The request_id in the message allows the receiving agent to acknowledge and respond.
Assumptions:
- Agents can parse the [REQUEST:xxx] prefix from messages
- A 5-second ACK timeout is enough for an agent to receive and parse
- A 5-minute response timeout is enough for an agent to process the task
Breaking changes:
- Internal messages now include a [REQUEST:xxx] prefix
- Backward compatible (old agents can ignore the prefix)
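The prefix convention above can be sketched as a pair of helpers — one to embed the request id, one to extract it on the receiving side. These helper names are hypothetical; the PR may inline this logic:

```typescript
// The request id travels inside the message text as a [REQUEST:xxx] prefix,
// so old agents can ignore it while new ones extract it and ACK.
function addRequestPrefix(requestId: string, text: string): string {
  return `[REQUEST:${requestId}] ${text}`;
}

function extractRequestId(text: string): { requestId: string | null; body: string } {
  const match = text.match(/^\[REQUEST:([^\]]+)\]\s*/);
  if (!match) return { requestId: null, body: text }; // legacy message, no tracking
  return { requestId: match[1], body: text.slice(match[0].length) };
}
```

Because the prefix is plain text at the start of the message, backward compatibility falls out naturally: an agent that doesn't know about requests just sees a slightly longer message.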
What:
- Add checkRequestTimeouts() function to detect expired ACK and response deadlines
- Import outstanding request functions from db.ts
- Extract and acknowledge request_id from messages when an agent receives them
- Add request_escalated event for monitoring
- Integrate timeout checking into periodic maintenance (every 5 min)
Why:
This completes the request-reply pattern implementation:
1. When agent A sends a message to agent B, a request is created with deadlines
2. When agent B receives the message, the request is acknowledged (ACK)
3. If no ACK within 5s → retry with an extended deadline
4. If no response within 5min → escalate to a human
This prevents the ping-pong drop problem by:
- Detecting when agent B doesn't receive the message (no ACK)
- Detecting when agent B receives but doesn't respond (timeout)
- Escalating instead of silently dropping
Assumptions:
- The request ID is in the format [REQUEST:xxx] at the start of the message
- Agents can still process messages with the prefix (or we strip it)
- A 5-minute check interval is frequent enough for timeouts
- Escalation is logged and emitted as an event for monitoring
Breaking changes:
- None - this is additive monitoring on top of the existing flow
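The decision logic inside checkRequestTimeouts() can be sketched as a pure classification over each request's status and deadlines (the function name and return values here are illustrative, not the PR's API):

```typescript
type TimeoutAction = 'retry' | 'escalate' | 'none';

// Sketch of the timeout checker's per-request decision, following the
// lifecycle above: a pending request past its ACK deadline retries until
// maxRetries is exhausted; an acked request past its response deadline
// escalates to a human.
function classifyTimeout(
  req: {
    status: string;
    ackDeadline: number;
    responseDeadline: number;
    retryCount: number;
    maxRetries: number;
  },
  nowMs: number,
): TimeoutAction {
  if (req.status === 'pending' && nowMs > req.ackDeadline) {
    return req.retryCount < req.maxRetries ? 'retry' : 'escalate';
  }
  if (req.status === 'acked' && nowMs > req.responseDeadline) {
    return 'escalate';
  }
  return 'none';
}
```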
What:
- Import getPendingRequestsForConversation from db.ts
- In handleTeamResponse, check whether an agent's response completes an outstanding request
- If a matching request is found (same conversation, agent was the target, status=acked), call respondToRequest to mark it complete
Why:
The request-reply pattern requires the response to be tracked. Previously we:
1. Created a request when agent A mentioned agent B
2. Acknowledged it when agent B received the message
3. But never marked it complete when agent B responded
This meant requests stayed in the 'acked' state forever, eventually escalating even though the agent actually responded. Now when agent B responds, we find the matching request and mark it complete.
Assumptions:
- The agent responds within the same conversation
- Only one pending request per agent per conversation (find() returns the first)
- Response content is stored in the request record for audit
Risk: Low - additive check, doesn't change response handling
Delete a duplicate log('INFO', ...) line.
It was introduced when adding the request ACK handling code.
Risk: None (deleting duplicate)
Testing: Verify log appears once per message
…Request
Previously:
- acknowledgeRequest checked ack_deadline >= now
- respondToRequest checked response_deadline >= now
Problem: If an ACK/response arrived 1ms after the deadline, it silently failed. The timeout checker would retry, but the agent had already processed it.
Fix: Remove deadline checks from the write path. Let the timeout checker handle expired requests. Accept valid work even if it is slightly late.
Also added better logging for already-acked/already-responded cases.
Risk: Low - the timeout checker still runs; it just won't reject late-but-valid ACKs
Testing: Send a request, wait for the deadline, verify the ACK is still accepted
…e agent
Previously: find() returned the first match, potentially the wrong request.
Now:
- getPendingRequestsForConversation() orders by created_at ASC (FIFO)
- filter() returns all acked requests for the agent
- All matching requests are marked as responded
If agent B responds, it is responding to everything it was asked.
Risk: Low - marking more requests complete is safer than marking the wrong one
Testing: Have agent A mention agent B twice, verify both are marked complete
… checks
Changes:
- Bump max_retries from 3 to 5 for request retries (gives recoverStaleMessages more time)
- Add pruneOldRequests() to the maintenance loop (hourly cleanup)
- Run checkRequestTimeouts every 30s instead of every 5min (faster failure detection)
- Separate timeout checks from the main maintenance interval
Why:
- 3 retries was too aggressive given the 5min check interval
- Old requests never got cleaned up (memory leak)
- The 5min check interval meant a 5-10min delay in detecting ACK timeouts
Risk: Low - additive maintenance, conservative retry bump
Testing: Verify timeout checks run every 30s and old requests are pruned after 24h
What:
- Add docs/AGENT_COMMUNICATION_PROTOCOL.md with full protocol documentation
- Document the database schema, state machine, and API reference
- Document integration points (enqueue, processing, response, timeout)
- Add a configuration reference, monitoring guide, and troubleshooting section
- Add a design decisions section explaining the primitive approach vs A2A/ACP
- Update README.md to reference the new documentation
Why:
The request-reply protocol is a significant architectural addition. Without documentation, future maintainers won't understand:
- Why the outstanding_requests table exists
- How the timeout/escalation flow works
- When to use which API function
- How to debug issues
This documentation ensures the knowledge persists.
Assumptions:
- Documentation should be comprehensive enough for new team members
- Code examples should be copy-pasteable
- Design decisions should be explained (not just what, but why)
Risk: None (documentation only)
Testing: Verify the markdown renders correctly and links work
… ordering
The visualizer relies on event ordering. Without await, chain_step_start can race with chain_step_done, causing the UI to show stale state.
Risk: None (consistent with other awaited emitEvent calls)
Testing: Verify the visualizer shows the correct agent processing state
Previously: When agent B errored, the outstanding request stayed in the acked state and would eventually escalate via the timeout checker.
Now: When agent B errors, matching requests are proactively marked as failed. This gives failRequest() a caller and provides a cleaner audit trail.
Risk: Low - additive, marks state faster
Testing: Trigger an agent error, verify the request is marked failed, not escalated
- Remove unused getRequest import
- Move pruneOldRequests from the 5-min to the hourly interval (consistent with other prunes)
Risk: None (cleanup only)
Testing: Verify it builds, no runtime changes
- Add error handling section documenting failRequest() usage
- Update pruneOldRequests interval from 5min to 1 hour
- Add failRequest() to the API reference
- Update integration points to include handleTeamError
Risk: None (documentation only)
- Change repo from TinyAGI/tinyclaw to dpbmaverick98/tinyclaw
- Change default branch from main to sql-experiment
- Add --branch flag to git clone so it clones the correct branch
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Agents receiving teammate messages now see explicit instructions to respond using [@sender: reply] syntax, preventing responses from going directly to the user instead of back to the requesting agent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Greptile Summary
This PR introduces multi-agent teamwork capabilities to TinyClaw, replacing the sequential NATS-based orchestration with a parallel, SQLite-backed queue processor. The key improvements are non-blocking parallel agent processing (fire-and-forget invocation), agent-to-agent request tracking, and crash recovery.
Confidence Score: 2/5
Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant QP as Queue Processor
    participant DB as SQLite DB
    participant Leader as Leader Agent
    participant TeamAgent as Team Agent(s)
    participant Channel as Channel Client (Discord/TG/WA)
    User->>QP: Message via channel API
    QP->>DB: claimNextMessage()
    QP->>DB: emitEvent('chain_step_start')
    QP-->>Leader: invokeAgent() [fire-and-forget]
    QP->>QP: return (non-blocking)
    Leader-->>QP: response (async .then)
    QP->>DB: persistResponse()
    QP->>DB: createOutstandingRequest() [REQUEST:id prefix]
    QP->>DB: enqueueMessage() for each [@teammate: msg]
    QP->>DB: emitEvent('chain_handoff')
    Note over QP,DB: Parallel: all team agents process simultaneously
    QP->>DB: claimNextMessage() per agent
    QP-->>TeamAgent: invokeAgent() [fire-and-forget, REQUEST:id in msg]
    QP->>DB: acknowledgeRequest(requestId)
    TeamAgent-->>QP: response (async .then)
    QP->>DB: respondToRequest()
    QP->>DB: decrementPendingInDb()
    alt All branches complete (pending=0)
        QP->>DB: markConversationCompleted()
        QP->>DB: enqueueResponse() → signalChannel()
        DB-->>Channel: fs.watch() fires (push notification)
        Channel->>DB: checkOutgoingQueue()
        Channel->>User: Deliver response
    else More branches pending
        QP->>DB: persistConversation()
    end
    Note over QP: Every 30s: checkRequestTimeouts()
    QP->>DB: getRequestsNeedingRetry() / getRequestsNeedingEscalation()
    QP->>DB: incrementRequestRetry() or escalateRequest()
```
await emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null });

// --- No team context: simple response to user ---
if (!teamContext) {
    let finalResponse = response.trim();

    // Detect files
    const outboundFilesSet = new Set<string>();
    collectFiles(finalResponse, outboundFilesSet);
    const outboundFiles = Array.from(outboundFilesSet);
    if (outboundFiles.length > 0) {
        finalResponse = finalResponse.replace(/\[send_file:\s*[^\]]+\]/g, '').trim();
    }

    // Validate message size before invoking agent
    const validationError = validateMessage(message);
    if (validationError) {
        log('ERROR', `Message validation failed: ${validationError}`);
        failMessage(dbMsg.id, validationError);
        return;
    }

    // Run outgoing hooks
    const { text: hookedResponse, metadata } = await runOutgoingHooks(finalResponse, { channel, sender, messageId, originalMessage: rawMessage });

    // Handle long responses — send as file attachment
    const { message: responseMessage, files: allFiles } = handleLongResponse(hookedResponse, outboundFiles);

    enqueueResponse({
        channel,
        sender,
        senderId: dbMsg.sender_id ?? undefined,
        message: responseMessage,
        originalMessage: rawMessage,
        messageId,
        agent: agentId,
        files: allFiles.length > 0 ? allFiles : undefined,
        metadata: Object.keys(metadata).length > 0 ? metadata : undefined,
    });

    log('INFO', `✓ Response ready [${channel}] ${sender} via agent:${agentId} (${finalResponse.length} chars)`);
    emitEvent('response_ready', { channel, sender, agentId, responseLength: finalResponse.length, responseText: finalResponse, messageId });

// Fire-and-forget: don't await invokeAgent
invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams)
    .then(response => {
        return handleSimpleResponse(dbMsg, agentId, response);
    })
    .catch(error => {
        const provider = agent.provider || 'anthropic';
        const providerLabel = provider === 'openai' ? 'Codex' : provider === 'opencode' ? 'OpenCode' : 'Claude';
        log('ERROR', `${providerLabel} error (agent: ${agentId}): ${(error as Error).message}`);
        return handleSimpleResponse(dbMsg, agentId, "Sorry, I encountered an error processing your request. Please check the queue logs.");
    });

dbCompleteMessage(dbMsg.id);
// Return immediately - don't block queue
return;
chain_step_done event is never emitted
chain_step_start is emitted at line 449, but chain_step_done — which was present in the original code — is never emitted in the new fire-and-forget architecture. Neither handleSimpleResponse (lines 80–137) nor handleTeamResponse (lines 143–237) emit this event.
The PR description explicitly states: "Visualizer now receives events in correct order: chain_step_start → chain_step_done → team_chain_end" — but chain_step_done has been completely dropped from the refactored flow. Any visualizer or plugin listening for this event will never fire, breaking the step completion visibility.
chain_step_done should be emitted inside handleSimpleResponse and handleTeamResponse after the agent response is processed, for example:
await emitEvent('chain_step_done', { agentId, agentName: agent.name, responseLength: response.length, responseText: response });
scripts/remote-install.sh
Outdated
GITHUB_REPO="dpbmaverick98/tinyclaw"
DEFAULT_BRANCH="sql-experiment"
Install script points to author's personal fork
After merging into TinyAGI/tinyclaw, the installer will still clone from dpbmaverick98/tinyclaw on branch sql-experiment instead of the upstream repo and main branch. Any user following the one-liner install in the README or PR description will be installed from the fork, not the official repo.
These values need to be reverted before merging:
- GITHUB_REPO="dpbmaverick98/tinyclaw"
- DEFAULT_BRANCH="sql-experiment"
+ GITHUB_REPO="TinyAGI/tinyclaw"
+ DEFAULT_BRANCH="main"
staleConvDetails.forEach(conv => {
    log('WARN', `  - Team ${conv.teamId}, Conv ${conv.id}: stuck for ${(conv.duration / 60000).toFixed(1)} min`);
    emitEvent('crash_recovery', {
        conversationId: conv.id,
        teamId: conv.teamId,
        stuckForMs: conv.duration,
        recoveredAt: new Date().toISOString(),
    });
});
emitEvent called without await in synchronous startup and periodic interval contexts
The PR explicitly states it "fixed 5 missing await on emitEvent() calls", but there are still three places where emitEvent (now async) is called without await, creating fire-and-forget promises:
- Lines 616–621 — startup crash-recovery forEach loop (shown here)
- Lines 719–725 — inside checkRequestTimeouts(), after escalateRequest()
- Lines 738–744 — inside the setInterval periodic maintenance callback
Because emitEvent is now async, unhandled rejections from event listeners won't be caught, and event ordering is not guaranteed at these call sites. The forEach callbacks should be converted to for...of loops to allow await, and the setInterval callbacks should be wrapped in async () => { ... } IIFEs.
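A minimal sketch of both suggested fixes, assuming `emitEvent(name, payload)` returns a Promise (the `emitEvent` stub, `Conv` type, and `startMaintenanceTimer` name here are illustrative stand-ins, not the real implementations):

```typescript
type Conv = { id: string; teamId: string; duration: number };

const emitted: string[] = [];

// Stand-in for the real async event bus; records event names so ordering is visible.
async function emitEvent(name: string, _payload: unknown): Promise<void> {
  emitted.push(name);
}

// Before: staleConvDetails.forEach(conv => { emitEvent('crash_recovery', ...); });
// After: a for...of loop, so each emission can be awaited and rejections propagate.
async function recoverStaleConversations(staleConvDetails: Conv[]): Promise<void> {
  for (const conv of staleConvDetails) {
    await emitEvent('crash_recovery', {
      conversationId: conv.id,
      teamId: conv.teamId,
      stuckForMs: conv.duration,
      recoveredAt: new Date().toISOString(),
    });
  }
}

// For setInterval, wrap the async body in an IIFE with an explicit .catch so a
// failing emission cannot become an unhandled rejection.
function startMaintenanceTimer(): ReturnType<typeof setInterval> {
  return setInterval(() => {
    (async () => {
      await emitEvent('maintenance_tick', {});
    })().catch(err => console.error('maintenance tick failed:', err));
  }, 30_000);
}
```

With the `for...of` version, a listener that throws surfaces at the call site instead of being silently swallowed, and events are guaranteed to be emitted in conversation order.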
The chain_step_done event was dropped during the fire-and-forget refactor, breaking the visualizer which listens for it to mark agents as "done". Added emission in both handleSimpleResponse and handleTeamResponse. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Changed GITHUB_REPO from personal fork to TinyAGI/tinyclaw and DEFAULT_BRANCH from sql-experiment to main. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents unhandled promise rejections from silently swallowing errors in fire-and-forget event emissions (message_received, agent_routed, crash_recovery, request_escalated). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sequential cp of .db + .db-wal + .db-shm is not safe for a live WAL-mode database. Replaced with sqlite3 .backup which guarantees a consistent snapshot. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Schema declared DEFAULT 3 but createOutstandingRequest() always inserts 5. Updated schema to match the actual value and docs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Close as duplicate of #168

Summary
TinyClaw now supports real multi-agent teamwork — agents can work in parallel, talk to each other, chain tasks back and forth, and never drop a message. This turns TinyClaw from "multiple agents that take turns" into "a team that actually collaborates."
What's New
1. Non-blocking parallel agent processing
Previously, messages were processed sequentially — if one agent took 30 seconds, everything else waited. Now every agent processes independently on its own event loop tick.
Send a message to a 5-agent team → all 5 start working simultaneously → responses aggregate as they come in. No head-of-line blocking.
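A hypothetical minimal model of the dispatch pattern (names mirror the PR, but the bodies are stand-ins — a real `invokeAgent` calls the provider API):

```typescript
const order: string[] = [];

function invokeAgent(agentId: string, delayMs: number): Promise<string> {
  order.push(`start:${agentId}`);
  return new Promise(resolve =>
    setTimeout(() => {
      order.push(`done:${agentId}`);
      resolve(`${agentId} result`);
    }, delayMs),
  );
}

function processMessage(agentId: string, delayMs: number): void {
  // Fire-and-forget: attach handlers and return immediately, so the queue
  // loop can claim the next message without waiting for this one.
  invokeAgent(agentId, delayMs)
    .then(response => { order.push(`handled:${response}`); })
    .catch(err => console.error(`agent ${agentId} failed:`, err));
}

// A 50ms "slow" agent no longer delays the start of a 10ms "fast" agent:
processMessage('slow-agent', 50);
processMessage('fast-agent', 10);
```

Both `start:` entries land before either `done:`, which is exactly the head-of-line-blocking fix: dispatch is decoupled from completion.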
2. Agent-to-agent communication
Agents can now talk to each other mid-conversation using the
[@teammate: message]syntax:User: "Build me a landing page and make sure it's accessible"
@leader receives message
→ [@coder: Build the landing page with these specs...]
→ [@Reviewer: Once coder is done, review for accessibility...]
@coder works on it → [@leader: Done, here's what I built...]
@Reviewer checks it → [@leader: Found 3 issues...]
@leader aggregates → responds to user with the full picture
Each handoff is a real routed message — the receiving agent gets full context and can respond back.
3. Agent-to-agent ping-pong (back-and-forth chains)
Agents can have multi-turn conversations with each other before responding to the user:
@sam: "[@wit: what's 2+2?]"
@wit: "[@sam: 4]"
@sam: "[@wit: now multiply by 3]"
@wit: "[@sam: 12]"
@sam → responds to user: "The answer is 12"
The system routes each message to the right agent with explicit instructions on how to respond back. Every internal message includes:
[Message from teammate @sam — respond using [@sam: your reply]]:
So agents always know WHO to reply to and the exact syntax to use.
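A sketch of the routing helpers, assuming the directive grammar is exactly `[@agent: message]` with no nested brackets; the function names are illustrative, not taken from the codebase:

```typescript
// Extract every [@agent: message] directive from an agent's response.
function parseDirectives(text: string): Array<{ agent: string; message: string }> {
  const directives: Array<{ agent: string; message: string }> = [];
  const pattern = /\[@([\w-]+):\s*([^\]]+)\]/g;
  let match: RegExpExecArray | null;
  while ((match = pattern.exec(text)) !== null) {
    directives.push({ agent: match[1], message: match[2].trim() });
  }
  return directives;
}

// Every routed internal message carries the reply instruction quoted above,
// so the receiver knows exactly who to answer and with what syntax.
function teammatePrefix(sender: string, message: string): string {
  return `[Message from teammate @${sender} — respond using [@${sender}: your reply]]: ${message}`;
}
```

Note the simple `[^\]]+` body means a reply containing `]` would need escaping or a smarter parser; this sketch only covers the flat case shown in the examples.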
4. Request-reply protocol with timeout tracking
Every agent-to-agent handoff is now tracked with deadlines: overdue requests are retried, and requests that exhaust their retries are escalated.
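Illustrative decision logic for the timeout checker — the field names are assumptions, though the maximum of 5 retries matches the schema default discussed later in this thread:

```typescript
type OutstandingRequest = {
  id: string;
  deadlineMs: number;  // epoch ms by which a response was due
  retryCount: number;
  maxRetries: number;  // 5 per the schema
};

type TimeoutAction = 'wait' | 'retry' | 'escalate';

// Run every 30s over all acked-but-unanswered requests.
function checkRequest(req: OutstandingRequest, nowMs: number): TimeoutAction {
  if (nowMs < req.deadlineMs) return 'wait';           // still within deadline
  if (req.retryCount < req.maxRetries) return 'retry'; // re-deliver the request
  return 'escalate';                                   // surface to leader/user
}
```

This is what guarantees "no silent drops": every request terminates in a response, a retry, or an explicit escalation.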
5. Conversation persistence and crash recovery
Team conversations survive crashes. If the process dies mid-conversation:
6. Push-based response delivery
Responses reach channel clients (Discord/Telegram/WhatsApp) in microseconds via file-based push notifications instead of polling. When an agent finishes → signal file written → channel client picks it
up instantly.
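A sketch of the signal-file handshake; the real directory is `~/.tinyclaw/signals` per the PR, but a temp dir is used here so the example is self-contained:

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

const signalsDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tinyclaw-signals-'));

// Producer side: touch {channel}.ready after enqueueing a response.
function signalChannel(channel: string): void {
  fs.writeFileSync(path.join(signalsDir, `${channel}.ready`), String(Date.now()));
}

// Consumer side: a channel client clears the signal, then drains its outgoing
// queue. In the real system this runs from an fs.watch(signalsDir, ...) callback
// rather than a polling loop, which is where the low latency comes from.
function consumeSignal(channel: string): boolean {
  const signalPath = path.join(signalsDir, `${channel}.ready`);
  if (!fs.existsSync(signalPath)) return false;
  fs.unlinkSync(signalPath);
  return true;
}
```

One design note: because the signal is a mere "wake up" marker rather than a payload, a missed or coalesced `fs.watch` event is harmless — the client re-checks the queue on its next wake.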
7. Message validation
Messages are validated before sending to agents (1MB size limit). Bad messages fail fast instead of wasting an agent invocation and timing out.
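A hypothetical shape for the validator — only the 1MB limit comes from the PR text; the error strings and empty-message check are assumptions:

```typescript
const MAX_MESSAGE_BYTES = 1024 * 1024; // 1MB

// Returns null if valid, or a human-readable reason if not; the caller fails
// the message immediately instead of burning an agent invocation.
function validateMessage(message: string): string | null {
  const bytes = Buffer.byteLength(message, 'utf8');
  if (bytes === 0) return 'Message is empty';
  if (bytes > MAX_MESSAGE_BYTES) {
    return `Message too large: ${bytes} bytes (limit: ${MAX_MESSAGE_BYTES})`;
  }
  return null;
}
```

Measuring `Buffer.byteLength` rather than `message.length` matters here: multi-byte UTF-8 characters can push a string over a byte limit even when its character count looks safe.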
8. Heartbeat monitoring
Queue processor writes a heartbeat every 5 seconds. Channel clients can detect crashes in ~15 seconds and surface it to users instead of silently hanging.
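A sketch of the staleness check implied by the numbers above — a beat every 5s, a crash flagged after roughly three missed beats (~15s); the field names are assumed:

```typescript
const HEARTBEAT_INTERVAL_MS = 5_000;
const STALE_AFTER_MS = 3 * HEARTBEAT_INTERVAL_MS; // ~15s detection window

type Heartbeat = { timestampMs: number; pid: number; uptimeMs: number };

// A channel client reads heartbeat.json and compares against the clock.
function isProcessorDown(heartbeat: Heartbeat | null, nowMs: number): boolean {
  if (heartbeat === null) return true; // no heartbeat file: never started or wiped
  return nowMs - heartbeat.timestampMs > STALE_AFTER_MS;
}
```

Using a multiple of the write interval as the threshold tolerates a single delayed or lost write without raising a false alarm.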
SQLite gives us ACID transactions, crash recovery, and queryable state — all in-process, zero infrastructure.
How agent-to-agent communication works end-to-end
"[@coder: build the feature] [@tester: prepare test cases]"
If any agent in the chain fails or times out, the system catches it — no silent drops.
What Changed
1. New SQLite Database Layer (src/lib/db.ts)
2. Queue Processor (src/queue-processor.ts)
Replaces orchestrator.ts + nats/agent-consumer.ts + nats/client-consumer.ts.
3. Agent Communication Protocol (docs/AGENT_COMMUNICATION_PROTOCOL.md)
Request-reply pattern for agent-to-agent handoffs: [REQUEST:id] prefix on internal messages for tracking.
4. Push Notifications (src/lib/signals.ts)
Replaces HTTP polling with file-based signaling: ~/.tinyclaw/signals/{channel}.ready + fs.watch() → microsecond-level latency.
5. Heartbeat Monitoring (src/lib/heartbeat.ts)
Writes heartbeat.json every 5 seconds (timestamp, PID, uptime).
6. Database Backup Strategy (backup.sh)
7. Agent Response Routing Fix (AGENTS.md + queue-processor.ts)
Internal messages are prefixed with [Message from teammate @sam — respond using [@sam: your reply]] so agents use the [@agent: reply] syntax.
8. Event Ordering Guarantees
await on emitEvent() calls in the team conversation flow: chain_step_start → chain_step_done → team_chain_end.
Test Plan
@sam ask wit to do 2+2 and report back, then multiply by 2
One-liner Install
curl -fsSL https://raw.githubusercontent.com/dpbmaverick98/tinyclaw/sql-experiment/scripts/remote-install.sh | bash
🤖 Generated with Claude Code