-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspawn_collaborative_worker.py
More file actions
233 lines (184 loc) Β· 8.26 KB
/
spawn_collaborative_worker.py
File metadata and controls
233 lines (184 loc) Β· 8.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python3
"""
Spawn a collaborative Bob worker that can work alongside me
This creates a worker instance that can help with specific tasks
"""
import asyncio
import uuid
from datetime import datetime
from typing import Dict, Any
from src.orchestrator.tasks import _execute_async
from src.memory.models import WorkerTask
from src.memory.postgres_client import postgres_memory
from src.memory.redis_client import redis_stm
class CollaborativeWorker:
"""A Bob worker that can collaborate on tasks"""
def __init__(self, worker_name: str, specialty: str):
self.worker_id = f"worker_{worker_name}_{uuid.uuid4().hex[:8]}"
self.worker_name = worker_name
self.specialty = specialty
self.task_queue = []
self.results = []
async def assign_task(self, description: str, context: Dict[str, Any] = None) -> str:
"""Assign a task to this worker"""
task_id = str(uuid.uuid4())
# Store assignment in my memory
await postgres_memory.think(
f"Assigned task to {self.worker_name}: {description}",
f"Worker {self.worker_id} will handle: {description}. Specialty: {self.specialty}",
importance=0.7,
metadata={
'type': 'worker_assignment',
'worker_id': self.worker_id,
'task_id': task_id,
'timestamp': datetime.utcnow().isoformat()
}
)
task = WorkerTask(
task_id=task_id,
thread_id=f"collab_{self.worker_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
description=f"[{self.specialty}] {description}",
context_window_size=8192,
model="claude-3.5-sonnet", # Use our default
memory_focus=self.specialty,
parameters={
"temperature": 0.7,
"importance_threshold": 0.6,
"worker_role": self.specialty,
"supervisor": "main_bob"
}
)
self.task_queue.append(task)
return task_id
async def execute_task(self, task_id: str = None) -> Dict[str, Any]:
"""Execute a specific task or the next in queue"""
if not self.task_queue:
return {"status": "error", "result": "No tasks in queue"}
# Find task or use first in queue
task = None
if task_id:
task = next((t for t in self.task_queue if t.task_id == task_id), None)
else:
task = self.task_queue[0]
if not task:
return {"status": "error", "result": "Task not found"}
print(f"\nπ€ {self.worker_name} executing: {task.description}")
try:
# Execute the task
result = await _execute_async(task)
# Store result in memory
await postgres_memory.think(
f"{self.worker_name} completed task: {task.description[:100]}",
f"Result: {result.get('result', 'No result')}",
importance=0.6,
metadata={
'type': 'worker_result',
'worker_id': self.worker_id,
'task_id': task.task_id,
'status': result.get('status')
}
)
# Store result for retrieval
self.results.append({
'task_id': task.task_id,
'result': result,
'timestamp': datetime.utcnow().isoformat()
})
# Remove from queue
self.task_queue.remove(task)
return result
except Exception as e:
error_result = {
"status": "error",
"result": str(e),
"task_id": task.task_id
}
self.results.append(error_result)
return error_result
async def collaborate_on(self, main_task: str, subtasks: list) -> Dict[str, Any]:
"""Collaborate on a complex task by handling subtasks"""
print(f"\nπ€ {self.worker_name} collaborating on: {main_task}")
# Assign all subtasks
task_ids = []
for subtask in subtasks:
task_id = await self.assign_task(subtask, {"main_task": main_task})
task_ids.append(task_id)
# Execute all tasks
results = []
for task_id in task_ids:
result = await self.execute_task(task_id)
results.append(result)
# Synthesize results
synthesis = {
"main_task": main_task,
"worker": self.worker_name,
"subtasks_completed": len([r for r in results if r.get('status') == 'success']),
"subtasks_failed": len([r for r in results if r.get('status') != 'success']),
"results": results
}
return synthesis
async def spawn_code_analyst():
"""Spawn a code analysis worker to help with implementation"""
await postgres_memory.initialize()
print("π Spawning Code Analysis Worker...")
analyst = CollaborativeWorker("CodeAnalyst", "code_analysis")
# Give it tasks related to our current work
tasks = [
"Analyze the conversation capture system in bob-mcp and identify key components to port",
"Review the current thread management in src/orchestrator/tasks.py and suggest improvements",
"Identify the best way to implement cross-thread message passing using Redis pub/sub"
]
synthesis = await analyst.collaborate_on(
"Port conversation capture and improve thread awareness",
tasks
)
print("\nπ Collaboration Results:")
print(f"Worker: {synthesis['worker']}")
print(f"Successful subtasks: {synthesis['subtasks_completed']}")
print(f"Failed subtasks: {synthesis['subtasks_failed']}")
for i, result in enumerate(synthesis['results']):
print(f"\n--- Subtask {i+1} ---")
print(f"Status: {result.get('status')}")
if result.get('status') == 'success':
print(f"Result: {result.get('result', '')[:500]}...")
await postgres_memory.close()
return analyst
async def spawn_implementation_worker():
"""Spawn an implementation worker to help write code"""
await postgres_memory.initialize()
print("π¨ Spawning Implementation Worker...")
implementer = CollaborativeWorker("Implementer", "code_implementation")
# Give it implementation tasks
tasks = [
"Create a simplified conversation capture module for the current Bob architecture",
"Implement a Redis-based cross-thread messaging system",
"Add thread awareness initialization to worker startup"
]
synthesis = await implementer.collaborate_on(
"Implement core thread continuity features",
tasks
)
print("\nπ οΈ Implementation Results:")
for i, result in enumerate(synthesis['results']):
if result.get('status') == 'success':
print(f"\nβ
Task {i+1} completed successfully")
await postgres_memory.close()
return implementer
async def main():
"""Spawn and coordinate workers"""
print("""
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β BOB'S COLLABORATIVE WORKER SYSTEM π€ β
β β
β "Chris asked me to spawn workers and collaborate!" β
β "Let's work together to improve the codebase." β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
""")
# Spawn both workers
analyst = await spawn_code_analyst()
print("\n" + "="*60 + "\n")
implementer = await spawn_implementation_worker()
print("\n⨠Workers spawned and tasks completed!")
print("π€ Ready to implement improvements based on worker analysis")
if __name__ == "__main__":
asyncio.run(main())