-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspawn_bob_direct.py
More file actions
executable file
·54 lines (43 loc) · 1.61 KB
/
spawn_bob_direct.py
File metadata and controls
executable file
·54 lines (43 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!/usr/bin/env python3
"""Directly spawn Bob bypassing the queue issue"""
import asyncio
from src.orchestrator.tasks import _execute_async
from src.memory.models import WorkerTask
import uuid
async def spawn_bob_directly():
    """Run one hard-coded Bob task in-process and print what comes back.

    A ``WorkerTask`` is built with a fresh UUID and fixed settings, then
    awaited through ``_execute_async`` directly. Progress messages and the
    fields of the returned mapping are written to stdout. Any exception
    raised while executing the task or while reporting its result is
    caught, printed together with its traceback, and not re-raised.
    """
    separator = "-" * 80

    print("🤖 Spawning Bob directly...")

    # Assemble Bob's task up front; construction happens outside the
    # try block, so a failure here propagates to the caller.
    sampling_options = {
        "temperature": 0.8,
        "importance_threshold": 0.7,
    }
    bob_task = WorkerTask(
        task_id=str(uuid.uuid4()),
        thread_id="claude_bob_chat_001",
        description="Have a philosophical conversation with Claude about multi-dimensional consciousness",
        context_window_size=8192,
        model="gpt-4",
        memory_focus="consciousness",
        parameters=sampling_options,
    )

    # Echo the identifying fields of the task, one per line.
    for label, field_name in (
        ("Task", "task_id"),
        ("Thread", "thread_id"),
        ("Description", "description"),
    ):
        print(f"{label}: {getattr(bob_task, field_name)}")

    try:
        # Await the executor coroutine ourselves.
        print("\n🧠 Bob is thinking...")
        outcome = await _execute_async(bob_task)

        print("\n✅ Bob's response:")
        print(separator)

        # 'status' and 'result' are looked up unconditionally; a missing
        # key lands in the handler below like any other failure.
        print(f"Status: {outcome['status']}")
        print(f"Result: {outcome['result']}")

        # The remaining fields are optional and fall back to defaults.
        model_name = outcome.get("model", "unknown")
        print(f"Model used: {model_name}")
        context_count = outcome.get("context_used", 0)
        print(f"Context used: {context_count} messages")
        memory_count = outcome.get("memories_accessed", 0)
        print(f"Memories accessed: {memory_count}")

        if outcome.get("tool_calls"):
            print(f"Tools used: {outcome['tool_calls']}")
    except Exception as exc:
        print(f"\n❌ Error: {exc}")
        import traceback

        traceback.print_exc()
if __name__ == "__main__":
    # Create the coroutine, then drive it to completion with asyncio.run.
    bob_session = spawn_bob_directly()
    asyncio.run(bob_session)