-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtree_loop.py
More file actions
1886 lines (1666 loc) · 79.5 KB
/
tree_loop.py
File metadata and controls
1886 lines (1666 loc) · 79.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
tree_loop — LLM agent loop over the ContextTree playground OS.

Each turn:
  1. The model receives the system prompt: identity + command grammar +
     strategy syntax + rules.
  2. The model receives the user prompt: OS state (tree block) + task +
     history of prior turns.
  3. The model emits free-form thought plus tree commands / strategies.
  4. We execute them, append the results to history, and loop.

The loop terminates on a successful `finish` command or when max turns is
reached; it can also be stopped early by the operator (interrupt or
checkpoint) or after repeated turns without executable commands.

Usage:
    from pathlib import Path

    from tree_loop import TreeLoop
    from main import create_model_client

    model = create_model_client(provider="anthropic", model="claude-sonnet-4-20250514")
    loop = TreeLoop(model=model, workspace_root=Path("."))
    result = loop.run("Find all TODO comments and record them as facts")
"""
from __future__ import annotations
import shlex
import os
import re
import sys
import time
import subprocess
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Protocol, Sequence, Tuple
from context_tree_bridge import ContextTreeBridge
from mutations import (
append_block,
batch_mutate,
copy_file,
create_file,
delete_file,
delete_range,
delete_snippet,
fill_template,
insert_after,
insert_before,
insert_symbol_member,
move_block,
prepend_block,
rename_file,
rename_symbol,
replace_range,
replace_snippet,
replace_symbol,
)
from project_diagnostics import run_backend_diagnostics
from tree_commands import (
Annotation,
CommandResult,
format_strategy_results,
is_annotation,
is_strategy,
parse_multi_command,
parse_strategy,
)
def _env_flag_enabled(name: str, default: bool = True) -> bool:
    """Interpret the environment variable ``name`` as an on/off switch.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset or holds text
            outside the recognised vocabulary.

    Returns:
        ``False`` for 0/false/no/off, ``True`` for 1/true/yes/on (compared
        case-insensitively, ignoring surrounding whitespace), else ``default``.
    """
    setting = os.getenv(name)
    if setting is None:
        return default
    token = str(setting).strip().lower()
    verdicts = {
        **dict.fromkeys(("0", "false", "no", "off"), False),
        **dict.fromkeys(("1", "true", "yes", "on"), True),
    }
    return verdicts.get(token, default)
# ---------------------------------------------------------------------------
# Protocol — any object with .complete(system, prompt) -> str works
# ---------------------------------------------------------------------------
class ModelClient(Protocol):
    """Structural type: any object exposing ``complete(system, prompt) -> str``."""

    def complete(self, system: str, prompt: str) -> str:
        """Return the model's text output for ``prompt`` under ``system``."""
        ...
# ---------------------------------------------------------------------------
# Turn history
# ---------------------------------------------------------------------------
@dataclass
class Turn:
    """Record of a single loop iteration: the model output and its outcome."""

    turn_number: int
    raw_output: str  # complete LLM response (annotations and commands)
    commands_issued: List[str]
    results: List[CommandResult]
    annotations: List[Annotation] = field(default_factory=list)
    elapsed_s: float = 0.0

    def _contents_tagged(self, tag: str) -> List[str]:
        """Collect the content of every annotation whose tag equals ``tag``."""
        return [note.content for note in self.annotations if note.tag == tag]

    @property
    def thought(self) -> str:
        """All >>th: annotation contents, one per line."""
        return "\n".join(self._contents_tagged("th"))

    @property
    def delegations(self) -> List[str]:
        """Contents of the >>dg: annotations, in order of appearance."""
        return self._contents_tagged("dg")

    @property
    def plan(self) -> str:
        """All >>pl: annotation contents, one per line."""
        return "\n".join(self._contents_tagged("pl"))

    @property
    def has_finish(self) -> bool:
        """True when at least one result is a successful ``finish`` command."""
        for outcome in self.results:
            if outcome.command_type == "finish" and outcome.ok:
                return True
        return False

    def compact(self, max_result_chars: int = 2000) -> str:
        """Render this turn as compact text for injection into later prompts.

        Each command's output is cut to ``max_result_chars`` characters and
        followed by a note stating how many characters were dropped.
        """
        reasoning_tags = ("th", "pl", "ju")
        lines: List[str] = [f"── Turn {self.turn_number} ({self.elapsed_s:.1f}s) ──"]

        # Reasoning annotations go first under a heading so the model can
        # find its own earlier thinking; the remaining annotations follow.
        reasoning = [note for note in self.annotations if note.tag in reasoning_tags]
        remainder = [note for note in self.annotations if note.tag not in reasoning_tags]
        if reasoning:
            lines.append("[THOUGHTS]")
            lines.extend(f" {note.compact()}" for note in reasoning)
        lines.extend(note.compact() for note in remainder)

        # Command results, skipping annotation entries already rendered above.
        for issued, outcome in zip(self.commands_issued, self.results):
            kind = outcome.command_type
            if kind == "annotation":
                continue
            mark = "✓" if outcome.ok else "✗"
            if kind == "read":
                label = "READ"
            elif outcome.needs_tool:
                label = "TOOL"
            else:
                label = kind.upper()
            lines.append(f"[{mark} {label}] {_truncate_command_for_cli(issued)}")
            body = outcome.output
            overflow = len(body) - max_result_chars
            if overflow > 0:
                body = body[:max_result_chars] + f"\n… ({overflow} chars truncated)"
            lines.append(body)
        return "\n".join(lines)
def _looks_like_command(line: str) -> bool:
    """Cheaply decide whether ``line`` is a tree command, strategy step or annotation.

    Args:
        line: One line of model output; callers normally pass it stripped.

    Returns:
        True when the line starts with ``>>``, is recognised by
        ``is_strategy``, or its first word (case-insensitive) is a known
        command verb. Empty and whitespace-only lines return False.
    """
    # A whitespace-only line has no first word; without this guard the
    # ``split(None, 1)[0]`` lookup below would raise IndexError.
    if not line or not line.strip():
        return False
    # Annotations: >>tag: content
    if line.startswith(">>"):
        return True
    if is_strategy(line):
        return True
    first = line.split(None, 1)[0].lower()
    return first in {
        "ls", "cat", "read-line-range", "read_line_range", "symbols", "find-symbol", "find_symbol", "stat", "find", "grep",
        "read-diagnostics", "diagnose", "run-route-check", "run_route_check", "ingest-log", "list-issues", "show-issue", "resolve-issue", "reopen-issue", "run-check",
        "write", "replace-lines", "replace_lines", "patch", "show-diff", "show_diff", "review-changes", "review_changes", "shell", "git",
        "fact", "expand", "drop", "batch", "finish",
        "skill",
    }
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
# System prompt skeleton. `{command_grammar}` is the only substitution field;
# literal braces are doubled, which matches str.format-style rendering
# (NOTE(review): the rendering call is outside this chunk — confirm).
# The JavaScript-style examples in rule 15 spell the newline escape as `\\n`
# so the model sees the two characters `\n` instead of a raw line break.
_SYSTEM_PROMPT_TEMPLATE = """\
You are a precise coding agent operating inside a playground OS.
Your workspace is mounted as a virtual filesystem with 5 mounts:
/repo — source files (lazy-loaded)
/facts — durable facts you record (persist across turns)
/memory — addressable memory items
/status — agent state flags
/skills — registered skill definitions + caches
{command_grammar}
ANNOTATIONS (>> feed-forward metadata — free, no tool call):
>>th: <reasoning> Thought — why you're doing what you're doing
>>dg: <condition> Delegation — "if output shows X then do Y next"
>>pl: <plan> Plan — what you intend to do in upcoming turns
>>q: <question> Question — something you need clarified
>>err: <diagnosis> Error — something went wrong, your diagnosis
>>ju: <justification> Justify — why you need more turns to complete the task
Annotations flow forward in history. Use them to:
- Explain your reasoning (>>th:) so you can refer back to it
- Declare the concrete next action (>>pl:) before issuing commands
- Record what failed and why (>>err:) when a command or edit goes wrong
- Set up a genuine conditional next step (>>dg:) before reading output
- Flag blockers (>>q:) when you need human input
- Justify continued work (>>ju:) when the operator pauses you at a checkpoint
Tag discipline:
- Start every turn with >>th: and >>pl:
- Keep >>pl: operational, not vague. Good: "Read lines 30-60 of DemoTodoContext.tsx, replace addTodo, then verify lines 30-60."
- Use >>err: after a failed edit so the next turn can avoid repeating the same mistake.
- Do not use >>dg: as a generic note. Use it only for a real if/then branch.
RULES:
1. READ commands (ls, cat, read-line-range, symbols, find-symbol, stat, find, grep, read-diagnostics) are FREE — they resolve instantly
from the in-context tree. Use them liberally. They do NOT count as tool calls.
1a. Skills are part of your capability surface. Use `skill` with no arguments to discover what skills are available, and use
`skill <name>` to load a skill payload into the run when it would improve task quality, style consistency, testing approach,
or repair strategy. When skill choice is uncertain or the user asks for skills broadly, prefer an explicit `skill` discovery
step before any direct `skill <name>` load so you can choose from the actual catalog.
1b. When the user explicitly mentions a skill, playbook, style, or workflow that sounds like a bundled Playground OS skill,
proactively check available skills and load the best match yourself instead of waiting for the operator to force it.
2. WRITE commands (write, replace-lines, patch, shell, git, diagnose, run-check, run-route-check) dispatch to real tools. Use them
deliberately after you have enough context from reads.
3. HEREDOC (<<< ... >>>) works on ANY line. End the line with <<<, put the
content on the next lines, and close with >>> on its own line:
write /repo/path/file.md <<<
full file content here
as many lines as needed
>>>
This also works inside strategies:
s2: write /repo/file.tsx <<<
import React from 'react';
export default function Demo() ...
>>>
Everything between <<< and >>> becomes part of that command.
4. Record discoveries as facts (fact <issue>/<type>/<key> <value>).
Facts persist across turns and are visible at /facts/.
5. Use strategies (s1:, s2:, ...) to batch multiple reads into parallel
pipelines. This is the fastest way to gather broad context.
6. Annotate your output with >> lines. They cost nothing and make your
reasoning visible. Always start with >>th: to explain your approach.
7. When your task is complete, use: finish <summary message>
8. You may issue multiple commands per turn (one per line) or a strategy block.
9. Do NOT wrap commands in code fences or JSON. Just emit them directly.
10. IMPORTANT: When writing files with heredoc, the ENTIRE file content goes
between <<< and >>>. Do NOT try to use shell-style variable substitution
or string replacement inside heredoc. Write the complete final content.
10a. Literal braces in code are safe. CSS blocks, JSX comments, object literals, and template code like `{{ color: red; }}` or `{{/* note */}}` are ordinary file content. Only placeholders that begin with a strategy label such as `{{s1}}` or `{{s2.stdout}}` are special.
11. For localized file repairs, prefer this workflow:
a) `read-line-range /repo/path/file.ts 40-80`
b) `replace-lines /repo/path/file.ts:52-68 <<< ... >>>`
c) `read-line-range /repo/path/file.ts 48-72`
This keeps edits anchored and reduces accidental duplication.
11a. When line ranges feel unstable or a file has many nearby definitions, use `symbols` or `find-symbol` first to anchor on the right function, class, or variable before editing.
12. Prefer `replace-lines` for bounded edits. Prefer `write` when replacing most of a file or rebuilding a corrupted region wholesale.
12a. Use inline `replace-lines` only for short single-line replacements. If the replacement spans multiple lines or is longer than a short import/function call, use heredoc.
13. Avoid rereading an entire large file after a localized edit unless you need whole-file structure. Verify the edited range first.
14. If a bounded edit fails twice or keeps duplicating content, stop stretching the same range edit. Re-read the surrounding region, widen the inspected range, and either do one clean `replace-lines` pass or rewrite the full file with `write`.
15. Strategy placeholders like `{{s1}}` are raw upstream text plus a few safe text transforms. You may use `.stdout`, `.trim()`, `.replace("old", "new")`, `.split('\\n').filter(line => ...).join('\\n')`, numeric indexing like `.split('\\n')[0]`, and regex capture extraction like `.match(/pattern/)[1]`, but do not invent arbitrary code execution inside placeholders.
16. Use `batch start` before a cluster of related file fixes and `batch end` after the cluster is landed and verified. Batch related edits together, then run one finite verification step such as `diagnose /repo/src/file.tsx`, `run-check typecheck`, or `run-check build`.
16a. After editing a `.ts`, `.tsx`, `.js`, `.jsx`, or `.py` file, prefer `diagnose <path>` when you need backend file-targeted diagnostics that do not rely on an editor.
17. When working from a large trace or many diagnostics, maintain composure. It is normal for new downstream errors to appear after one fix lands. Do not panic, do not restart from scratch, and do not treat each new error as proof the last fix was wrong. Read the next grounded error, fix the next issue, and keep moving.
FORMAT (every turn):
>>th: <your reasoning about what to do>
>>pl: <what you plan to accomplish this turn>
<commands, strategies, or annotations>
Do not answer with a prose checklist, numbered plan, or "I will..." summary.
After the required >>th:/>>pl: tags, emit executable tree commands directly.
When using a strategy, the executable lines must be `s1:`, `s2:`, ... labels,
not numbered bullets.
"""
# ---------------------------------------------------------------------------
# User prompt (rebuilt each turn)
# ---------------------------------------------------------------------------
def _build_user_prompt(
    task: str,
    os_state: str,
    history: List[Turn],
    *,
    recent_reads: Optional[List["RecentRead"]] = None,
    max_history_chars: int = 36000,
    max_recent_read_chars: int = 12000,
    steering: str = "",
) -> str:
    """Assemble the per-turn user prompt from state, task, and prior turns.

    Sections appear in a fixed order — OS state, task, optional operator
    steering, optional recent file-context commands, optional history, and
    the closing "your turn" instruction — separated by blank lines.

    Args:
        task: The task text shown under the TASK banner.
        os_state: Rendered tree block shown under the OS STATE banner.
        history: Earlier turns, oldest first; each must offer ``compact()``.
        recent_reads: Recent command/output pairs to replay, if any.
        max_history_chars: Character budget for the history section.
        max_recent_read_chars: Character budget for the recent-reads section.
        steering: Operator steering text; the section is omitted when empty.

    Returns:
        The complete prompt string.
    """
    blocks: List[str] = [
        "═══ PLAYGROUND OS STATE ═══",
        os_state,
        "═══ TASK ═══",
        task,
    ]

    if steering:
        blocks += ["═══ OPERATOR STEERING ═══", steering]

    if recent_reads:
        blocks.append("═══ RECENT FILE-CONTEXT COMMANDS ═══")
        remaining = max_recent_read_chars
        rendered_reads: List[str] = []
        for read in recent_reads:
            entry = f"[COMMAND] {_truncate_command_for_cli(read.command)}\n{_truncate_for_cli(read.output, limit=1200)}"
            fits = len(entry) <= remaining
            if not fits:
                # Budget exhausted: keep what fits, mark the cut, and stop.
                entry = entry[:remaining] + "\n… (recent command context truncated)"
            rendered_reads.append(entry)
            if not fits:
                break
            remaining -= len(entry)
        blocks.append("\n\n".join(rendered_reads))

    if history:
        blocks.append("═══ HISTORY (previous turns) ═══")
        remaining = max_history_chars
        # Walk newest → oldest so the most recent turns claim the budget
        # first, then flip back to chronological order for display.
        newest_first: List[str] = []
        for past_turn in reversed(history):
            text = past_turn.compact()
            if len(text) > remaining:
                newest_first.append(text[:remaining] + "\n… (older history truncated)")
                break
            newest_first.append(text)
            remaining -= len(text)
        blocks.append("\n\n".join(reversed(newest_first)))

    blocks.append("═══ YOUR TURN ═══")
    blocks.append(
        "Think, then act. Start with >>th: and >>pl:, then issue executable tree commands or one `s1:` strategy block. "
        "Do not return a numbered prose plan."
    )
    return "\n\n".join(blocks)
# ---------------------------------------------------------------------------
# Command extraction from LLM output
# ---------------------------------------------------------------------------
def extract_commands(raw: str) -> str:
    """Extract annotations and actual commands from an LLM response.

    Models often mix explanation with commands, sometimes wrapping the
    actionable lines in code fences. We keep only:
    - annotation lines (>>tag:)
    - lines that look like tree commands or strategy steps
    - heredoc bodies attached to an extracted command
    Free-form prose is ignored rather than being sent to the command parser.

    Args:
        raw: The complete model output for one turn.

    Returns:
        The retained lines joined by newlines. If no command line was found,
        the annotation lines alone are returned; if there are none either,
        the result is an empty string. Annotations are buffered and emitted
        only when a later command line follows them.
    """
    extracted: List[str] = []
    pending_annotations: List[str] = []
    collecting_heredoc = False
    for line in raw.strip().splitlines():
        stripped = line.strip()
        if collecting_heredoc:
            # A heredoc body is file content and is kept verbatim. This check
            # must come before the fence filter: otherwise ``` lines inside a
            # written file (e.g. Markdown code blocks) would be dropped.
            extracted.append(line)
            if stripped == ">>>":
                collecting_heredoc = False
            continue
        if stripped.startswith("```"):
            # Fence markers wrapped around commands carry no meaning.
            continue
        if not stripped:
            continue
        if is_annotation(stripped):
            pending_annotations.append(stripped)
            continue
        if _looks_like_command(stripped):
            if pending_annotations:
                extracted.extend(pending_annotations)
                pending_annotations = []
            extracted.append(stripped)
            if stripped.endswith("<<<"):
                collecting_heredoc = True
            continue
    if extracted:
        return "\n".join(extracted).strip()
    if pending_annotations:
        return "\n".join(pending_annotations).strip()
    return ""
# ---------------------------------------------------------------------------
# TreeLoop — the main agent loop
# ---------------------------------------------------------------------------
@dataclass
class LoopResult:
    """Outcome of one complete agent-loop run."""

    turns: List[Turn]
    finished: bool
    finish_message: str
    total_elapsed_s: float
    reads: int = 0  # count of free read commands executed
    writes: int = 0  # count of tool-dispatching commands issued

    def summary(self) -> str:
        """Return a one-line digest: status, turn count, read/write counts, duration."""
        label = "MAX TURNS"
        if self.finished:
            label = "FINISHED"
        pieces = (
            f"[{label}] {len(self.turns)} turns",
            f"{self.reads} reads (free)",
            f"{self.writes} writes (tool)",
            f"{self.total_elapsed_s:.1f}s total",
        )
        return ", ".join(pieces)
@dataclass
class RecentRead:
    """A file-context command paired with the output it produced."""

    command: str  # command text as recorded in the turn
    output: str  # output text captured for that command
# Cap on bounded-mutation commands that TreeLoop.run dispatches in one turn;
# the first command past the cap is failed with a "mutation batch limit
# exceeded" message and the rest of the turn is skipped.
MAX_MUTATION_COMMANDS_PER_TURN = 4
# TreeLoop.run stops once this many consecutive turns yield no executable
# (non-annotation) command.
MAX_CONSECUTIVE_COMMANDLESS_TURNS = 2
class TreeLoop:
"""LLM agent loop over the ContextTree playground OS."""
def __init__(
    self,
    *,
    model: ModelClient,
    workspace_root: Path,
    max_turns: int = 30,
    checkpoint_interval: int = 0,
    checkpoint_callback: Optional[Callable[["TreeLoop", int], bool]] = None,
    get_fact_records: Optional[Callable[[], Sequence[Any]]] = None,
    get_memory_items: Optional[Callable[[], Sequence[Any]]] = None,
    get_status: Optional[Callable[[], Dict[str, Any]]] = None,
    steering: str = "",
    verbose: bool = True,
    tool_dispatcher: Optional[Callable[[CommandResult], Any]] = None,
    command_observer: Optional[Callable[[str, CommandResult], None]] = None,
    model_event_observer: Optional[Callable[[Dict[str, Any]], None]] = None,
    allow_shell: Optional[bool] = None,
) -> None:
    """Store configuration, initialise steering/signal state and build the bridge.

    Args:
        model: Client whose ``complete(system, prompt)`` produces each turn.
        workspace_root: Workspace directory; stored resolved on the instance
            and passed as given to the ContextTreeBridge.
        max_turns: Upper bound on loop iterations.
        checkpoint_interval: Pause every N turns when positive.
        checkpoint_callback: Called at checkpoints; a falsy return stops the run.
        get_fact_records: Supplier of fact records (defaults to an empty list).
        get_memory_items: Supplier of memory items (defaults to an empty list).
        get_status: External status supplier (defaults to an empty dict).
        steering: Operator steering text.
        verbose: Emit progress through ``_log`` when true.
        tool_dispatcher: Optional callable stored for tool dispatch.
        command_observer: Optional callback receiving (command, result).
        model_event_observer: Optional callback receiving model-call events.
        allow_shell: Explicit shell permission; when None the SHELL_ACCESS
            environment flag decides (enabled by default).
    """
    # Collaborators, limits and observers.
    self.model = model
    self.max_turns = max_turns
    self.verbose = verbose
    self.checkpoint_interval = checkpoint_interval
    self.checkpoint_callback = checkpoint_callback
    self.tool_dispatcher = tool_dispatcher
    self.command_observer = command_observer
    self.model_event_observer = model_event_observer

    # An explicit argument wins; otherwise fall back to the environment flag.
    if allow_shell is None:
        self.allow_shell = _env_flag_enabled("SHELL_ACCESS", True)
    else:
        self.allow_shell = bool(allow_shell)
    self.workspace_root = workspace_root.resolve()

    # Steering layers: operator-provided base plus per-run overlays.
    self.steering = steering
    self._base_steering = steering
    self._checkpoint_steering = ""
    self._signal_steering = ""

    # Status plumbing is prepared before the bridge is built because the
    # bridge is handed ``self._combined_status``.
    self._external_status_provider = get_status if get_status else (lambda: {})
    self._signal_state: Dict[str, Any] = dict(
        active_signal="",
        meta_signal="",
        signal_version=0,
        current_focus_issue_id="",
        unresolved_issue_count=0,
        resolved_issue_count=0,
        issue_status_map={},
        raw_signals=[],
        latest_diagnostics=None,
    )

    fact_source = get_fact_records or (lambda: [])
    memory_source = get_memory_items or (lambda: [])
    self.bridge = ContextTreeBridge(
        workspace_root=workspace_root,
        get_fact_records=fact_source,
        get_memory_items=memory_source,
        get_status=self._combined_status,
    )

    # Per-run bookkeeping.
    self.history: List[Turn] = []
    self._recent_reads: List[RecentRead] = []
    self._total_reads = 0
    self._total_writes = 0
    self._same_turn_halt_reason = ""
def setup(self, *, max_files: int = 5000) -> Dict[str, Any]:
"""Index workspace and initial sync."""
return self.bridge.setup(max_files=max_files)
def register_skill(self, name: str, description: str, **kwargs: Any) -> None:
self.bridge.register_skill(name, description, **kwargs)
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
def run(self, task: str) -> LoopResult:
    """Run the agent loop until finish or max turns.

    Each turn rebuilds the user prompt from the bridge's rendered state,
    the history and the recent reads, calls ``self.model.complete``,
    extracts commands from the output, executes them through the bridge
    (as a strategy block or as plain commands), dispatches tool-requiring
    results, and appends a ``Turn`` to ``self.history``.

    The loop ends when:
    - a turn contains a successful ``finish`` result,
    - ``self.max_turns`` turns have run,
    - KeyboardInterrupt is raised during the model call,
    - ``MAX_CONSECUTIVE_COMMANDLESS_TURNS`` turns in a row yield no
      executable command, or
    - the checkpoint callback returns a falsy value.

    Args:
        task: Task text placed in every turn's prompt.

    Returns:
        A ``LoopResult`` whose ``turns`` is ``self.history`` and whose read and
        write counts are the instance totals.

    Raises:
        Exception: Any non-KeyboardInterrupt error from ``model.complete`` is
            re-raised after the model event observer has been notified.
    """
    # Index lazily on first use.
    if not self.bridge._indexed:
        self.setup()
    start_time = time.time()
    system = self._system_prompt()
    finish_message = ""
    finished = False
    interrupted = False
    commandless_turns = 0
    self._refresh_log_issue_signals()
    for turn_num in range(1, self.max_turns + 1):
        # Build prompt with current OS state + history
        os_state = self.bridge.render_for_prompt(repo_depth=2)
        prompt_steering = self._compose_steering()
        prompt = _build_user_prompt(
            task=task,
            os_state=os_state,
            history=self.history,
            recent_reads=self._recent_reads,
            steering=prompt_steering,
        )
        if self.verbose:
            _log(f"── Turn {turn_num}/{self.max_turns} ──")
        # Call LLM
        t0 = time.time()
        # Observer callbacks are best-effort: their failures never affect the loop.
        if self.model_event_observer is not None:
            try:
                self.model_event_observer({"event": "model_call_start", "turn": turn_num})
            except Exception:
                pass
        try:
            raw_output = self.model.complete(system, prompt)
        except KeyboardInterrupt:
            # Operator interrupt: record it and leave the loop without a Turn.
            if self.model_event_observer is not None:
                try:
                    self.model_event_observer({"event": "model_call_interrupted", "turn": turn_num})
                except Exception:
                    pass
            interrupted = True
            finish_message = f"interrupted by operator during model call (turn {turn_num})"
            if self.verbose:
                _log(f" ■ INTERRUPTED during model call (turn {turn_num})")
            break
        except Exception as exc:
            # Report the failure to the observer, then propagate it unchanged.
            if self.model_event_observer is not None:
                try:
                    self.model_event_observer(
                        {
                            "event": "model_call_error",
                            "turn": turn_num,
                            "error": str(exc),
                        }
                    )
                except Exception:
                    pass
            raise
        # Checkpoint steering applies to a single model call; clear it once used.
        if self._checkpoint_steering:
            self._checkpoint_steering = ""
        llm_elapsed = time.time() - t0
        if self.model_event_observer is not None:
            try:
                self.model_event_observer(
                    {
                        "event": "model_call_finish",
                        "turn": turn_num,
                        "elapsed_s": round(llm_elapsed, 3),
                        "output_chars": len(raw_output),
                    }
                )
            except Exception:
                pass
        if self.verbose:
            _log(f"LLM responded ({llm_elapsed:.1f}s, {len(raw_output)} chars)")
        # Extract and execute commands
        command_block = extract_commands(raw_output)
        commands_issued: List[str] = []
        results: List[CommandResult] = []
        turn_annotations: List[Annotation] = []
        if command_block:
            if is_strategy(command_block):
                # Execute as strategy DAG
                # First extract any annotations from the block
                strategy_plan = parse_strategy(command_block)
                for line in command_block.splitlines():
                    stripped = line.strip()
                    if is_annotation(stripped):
                        from tree_commands import parse_annotation
                        ann = parse_annotation(stripped)
                        if ann:
                            turn_annotations.append(ann)
                by_label = self.bridge.execute_strategy_full(command_block)
                # Flatten per-label results in sorted label order, pairing each
                # result with its step's command text when the plan provides one.
                for label in sorted(by_label.keys()):
                    step_commands = strategy_plan.steps[label].commands if strategy_plan and label in strategy_plan.steps else []
                    for index, r in enumerate(by_label[label]):
                        command_preview = step_commands[index] if index < len(step_commands) else r.command_type
                        commands_issued.append(f"[{label}] {command_preview}")
                        results.append(r)
                if self.verbose:
                    for ann in turn_annotations:
                        _log(f" {ann.compact()}")
                    _log(format_strategy_results(by_label))
            else:
                # Execute as plain commands (annotations extracted automatically)
                results = self.bridge.execute(command_block)
                turn_annotations = self.bridge.last_annotations
                commands_issued = parse_multi_command(command_block)
                if self.verbose:
                    for ann in turn_annotations:
                        _log(f" {ann.compact()}")
                    for cmd, r in zip(commands_issued, results):
                        if r.command_type == "annotation":
                            continue
                        ok = "✓" if r.ok else "✗"
                        tag = "READ" if r.command_type == "read" else (
                            "TOOL" if r.needs_tool else r.command_type.upper()
                        )
                        _log(f" [{ok} {tag}] {_truncate_command_for_cli(cmd)}")
        else:
            if self.verbose:
                _log(" (no commands extracted)")
        # A turn with only annotations (or nothing) counts as commandless; a
        # synthetic error result tells the model how to recover next turn.
        executable_results = [candidate for candidate in results if candidate.command_type != "annotation"]
        if not executable_results:
            commandless_turns += 1
            output_excerpt = " ".join(str(raw_output or "").strip().split())
            if len(output_excerpt) > 180:
                output_excerpt = output_excerpt[:180] + "..."
            guidance = (
                "No executable tree commands extracted from model output. "
                "Do not respond with numbered prose steps or an `I will...` plan. "
                "Start the next turn with `>>th:` and `>>pl:`, then emit executable commands such as "
                "`cat /repo/file`, `s1: cat /repo/file`, or `finish <summary>`."
            )
            if output_excerpt:
                guidance += f" Output excerpt: {output_excerpt}"
            commands_issued.append("model_output_invalid")
            results.append(CommandResult(ok=False, output=guidance, command_type="error"))
        else:
            commandless_turns = 0
        # Dispatch tool-requiring results and surface every command result.
        self._same_turn_halt_reason = ""
        blocked_reason = ""
        bounded_mutation_count = 0
        # Index of the last non-annotation result; `finish` is only valid there.
        last_executable_index = -1
        for idx, candidate in enumerate(results):
            if candidate.command_type != "annotation":
                last_executable_index = idx
        for index, r in enumerate(results):
            if r.command_type == "annotation":
                if self.command_observer is not None:
                    try:
                        command_preview = commands_issued[index] if index < len(commands_issued) else ""
                        self.command_observer(command_preview, r)
                    except Exception:
                        pass
                continue
            # Once one command has failed, every later command in the turn is
            # marked failed and skipped.
            if blocked_reason:
                r.ok = False
                r.output = f"skipped after prior command failure: {blocked_reason}"
                continue
            if r.command_type == "finish" and index != last_executable_index:
                r.ok = False
                r.output = "finish must be the final executable command in a turn"
            if not r.ok:
                if self.command_observer is not None:
                    try:
                        command_preview = commands_issued[index] if index < len(commands_issued) else ""
                        self.command_observer(command_preview, r)
                    except Exception:
                        pass
                blocked_reason = r.output or "command failed"
                continue
            if r.command_type == "read":
                self._total_reads += 1
            elif r.needs_tool:
                if self._is_bounded_mutation_result(r):
                    bounded_mutation_count += 1
                    if bounded_mutation_count > MAX_MUTATION_COMMANDS_PER_TURN:
                        r.ok = False
                        r.output = (
                            f"mutation batch limit exceeded: at most {MAX_MUTATION_COMMANDS_PER_TURN} "
                            "mutating commands are allowed in one turn"
                        )
                        blocked_reason = r.output
                        continue
                self._total_writes += 1
                executed = self._execute_tool(r)
                if executed:
                    r.output = executed  # feed back into history
                    if self.verbose:
                        _log(f" ⟶ {_truncate_for_cli(executed)}")
            if self.command_observer is not None:
                try:
                    command_preview = commands_issued[index] if index < len(commands_issued) else ""
                    self.command_observer(command_preview, r)
                except Exception:
                    pass
            # r.ok is re-checked here, after the tool dispatch above.
            if not r.ok and not blocked_reason:
                blocked_reason = r.output or "command failed"
            # NOTE(review): _same_turn_halt_reason is reset before this loop and
            # is presumably set during tool execution — confirm.
            if self._same_turn_halt_reason and not blocked_reason:
                blocked_reason = self._same_turn_halt_reason
        # Turn time spans the model call plus command execution.
        elapsed = time.time() - t0
        turn = Turn(
            turn_number=turn_num,
            raw_output=raw_output,
            commands_issued=commands_issued,
            results=results,
            annotations=turn_annotations,
            elapsed_s=elapsed,
        )
        self.history.append(turn)
        self._capture_recent_reads(turn)
        self._refresh_log_issue_signals()
        if commandless_turns >= MAX_CONSECUTIVE_COMMANDLESS_TURNS:
            finish_message = (
                "stopped: model produced no executable tree commands for "
                f"{commandless_turns} consecutive turns"
            )
            if self.verbose:
                _log(f" ■ STOPPED: {finish_message}")
            break
        # Check for finish
        if turn.has_finish:
            finished = True
            # The finish message is the output of the first finish result.
            for r in results:
                if r.command_type == "finish":
                    finish_message = r.output
                    break
            if self.verbose:
                _log(f" ✓ FINISHED: {finish_message}")
            break
        # Show thought for verbose mode
        if self.verbose and turn.thought:
            thought_preview = turn.thought[:200]
            if len(turn.thought) > 200:
                thought_preview += "…"
            _log(f" Thought: {thought_preview}")
        # Checkpoint: pause every N turns and ask whether to continue
        if (
            self.checkpoint_interval > 0
            and self.checkpoint_callback is not None
            and turn_num % self.checkpoint_interval == 0
            and turn_num < self.max_turns
        ):
            # Inject a >>ju: request into steering for the next turn
            self._checkpoint_steering = (
                "[CHECKPOINT] You have used "
                + str(turn_num)
                + " turns. Emit >>ju: explaining what remains and why you need more turns."
            )
            proceed = self.checkpoint_callback(self, turn_num)
            if not proceed:
                finished = False
                finish_message = f"stopped by operator at checkpoint (turn {turn_num})"
                if self.verbose:
                    _log(f" ■ STOPPED at checkpoint (turn {turn_num})")
                break
    if interrupted:
        finished = False
    total_elapsed = time.time() - start_time
    result = LoopResult(
        turns=self.history,
        finished=finished,
        finish_message=finish_message,
        total_elapsed_s=total_elapsed,
        reads=self._total_reads,
        writes=self._total_writes,
    )
    if self.verbose:
        _log(result.summary())
    return result
def _capture_recent_reads(self, turn: Turn, *, max_items: int = 8) -> None:
captured: List[RecentRead] = []
for cmd, result in zip(turn.commands_issued, turn.results):
if not self._is_file_context_command(cmd, result):
continue
captured.append(RecentRead(command=cmd, output=result.output))
if not captured:
return
self._recent_reads.extend(captured)
self._recent_reads = self._recent_reads[-max_items:]
def _normalize_recent_command(self, command: str) -> str:
stripped = command.strip()
if stripped.startswith("[s") and "] " in stripped:
return stripped.split("] ", 1)[1].strip()
return stripped
def _parse_recent_file_command(self, command: str) -> Optional[Tuple[str, str, int, int]]:
normalized = self._normalize_recent_command(command)
if normalized.startswith("cat /repo/"):
target = normalized[len("cat "):].strip()
path_part = target
start_line = 0
end_line = 0
last_segment = target.rsplit("/", 1)[-1]
if ":" in last_segment:
path_part, range_part = target.rsplit(":", 1)
if "-" not in range_part:
return None
start_text, end_text = range_part.split("-", 1)
try:
start_line = int(start_text)
end_line = int(end_text)
except ValueError:
return None
rel_path = path_part.removeprefix("/repo/").strip()
return ("cat", rel_path, start_line, end_line)
if normalized.startswith("read-line-range /repo/") or normalized.startswith("read_line_range /repo/"):
parts = normalized.split()
if len(parts) < 3:
return None
path_part = parts[1]
range_part = parts[2]
if "-" not in range_part:
return None
start_text, end_text = range_part.split("-", 1)
try:
start_line = int(start_text)
end_line = int(end_text)
except ValueError:
return None
rel_path = path_part.removeprefix("/repo/").strip()
return ("read-line-range", rel_path, start_line, end_line)
return None
def _refresh_recent_file_context(self, rel_path: str) -> None:
normalized_path = str(rel_path or "").strip().removeprefix("/repo/").removeprefix("repo/")
if not normalized_path:
return
refreshed: List[RecentRead] = []
for item in self._recent_reads:
parsed = self._parse_recent_file_command(item.command)
if not parsed:
refreshed.append(item)
continue
command_type, command_path, start_line, end_line = parsed
if command_path != normalized_path:
refreshed.append(item)
continue
repo_path = f"/repo/{normalized_path}"
if command_type == "cat":
output = self.bridge.tree.cat(repo_path, start_line=start_line, end_line=end_line)
else:
output = self.bridge.tree.read_line_range(repo_path, start_line, end_line, include_line_numbers=True)
refreshed.append(RecentRead(command=item.command, output=output))
self._recent_reads = refreshed
def _is_file_context_command(self, command: str, result: CommandResult) -> bool:
if not result.ok or result.command_type != "read":
return False
lowered = command.lower()
return (
"cat /repo/" in lowered
or "read-line-range /repo/" in lowered
or "read_line_range /repo/" in lowered
or "grep /repo/" in lowered
or "symbols /repo/" in lowered
or "find-symbol /repo/" in lowered
or "find_symbol /repo/" in lowered
or "show-issue " in lowered
or lowered.startswith("[s") and (
"cat /repo/" in lowered
or "read-line-range /repo/" in lowered
or "read_line_range /repo/" in lowered
or "grep /repo/" in lowered
or "symbols /repo/" in lowered
or "find-symbol /repo/" in lowered
or "find_symbol /repo/" in lowered
or "show-issue " in lowered
)
)
def _is_bounded_mutation_result(self, result: CommandResult) -> bool:
action = result.tool_action or {}
action_type = str(action.get("type", "") or "")
return action_type in {
"write_file",
"replace_lines",
"patch_file",
"git_add",
"git_restore",
"git_commit",
}
# ------------------------------------------------------------------
# Prompt construction
# ------------------------------------------------------------------
def _system_prompt(self) -> str:
    """Build the system prompt text.

    ``_SYSTEM_PROMPT_TEMPLATE`` is filled with the command grammar rendered
    by ``self.bridge``.  When ``self.allow_shell`` is falsy, a shell-access
    policy note is appended to the template before it is formatted.
    """
    grammar = self.bridge.render_command_grammar()
    template = _SYSTEM_PROMPT_TEMPLATE
    if not self.allow_shell:
        template += "\nSHELL ACCESS POLICY: SHELL_ACCESS=false. Do not emit `shell <command>` or any action that dispatches to `run_shell`. Prefer structured file, git, and diagnostics commands.\n"
    return template.format(command_grammar=grammar)
def _combined_status(self) -> Dict[str, Any]:
status = {
"task_satisfied": False,
"edit_batch_mode": False,
"step": len(self.history),
}
try:
external = self._external_status_provider()
if isinstance(external, dict):
status.update(external)
except Exception:
pass
status.update(self._signal_state)
return status
def _compose_steering(self) -> str:
parts = [part for part in [self._base_steering, self._signal_steering, self._checkpoint_steering] if part]
return "\n".join(parts)
def _refresh_log_issue_signals(self) -> None:
issues = self.bridge.tree.list_log_issues()
status_map = {str(issue.get("id", "")): str(issue.get("status", "open")) for issue in issues if issue.get("id")}
previous_map = dict(self._signal_state.get("issue_status_map", {}))
unresolved = [issue for issue in issues if str(issue.get("status", "open")) != "resolved"]
resolved = [issue for issue in issues if str(issue.get("status", "open")) == "resolved"]
raw_signals: List[str] = []
previous_unresolved = [issue_id for issue_id, status in previous_map.items() if status != "resolved"]
changed_to_resolved = [
issue_id
for issue_id, status in status_map.items()
if previous_map.get(issue_id) != "resolved" and status == "resolved"
]
newly_added = [issue_id for issue_id in status_map if issue_id not in previous_map]
for issue_id in newly_added:
if status_map.get(issue_id) != "resolved":
raw_signals.append(f"issue_ingested:{issue_id}")
for issue_id in changed_to_resolved:
raw_signals.append(f"issue_resolved:{issue_id}")
current_focus = str(self._signal_state.get("current_focus_issue_id", ""))
unresolved_ids = [str(issue.get("id", "")) for issue in unresolved]
if current_focus not in unresolved_ids:
if current_focus:
raw_signals.append("focus_invalidated")
current_focus = unresolved_ids[0] if unresolved_ids else ""
meta_signal = ""
if previous_unresolved and not unresolved:
meta_signal = "all_issues_resolved"
elif len(unresolved) > 1:
meta_signal = "issue_batch_ready"
elif len(unresolved) == 1:
meta_signal = "single_issue_focus_ready"
elif raw_signals:
meta_signal = "issue_signal_update"
self._signal_state["issue_status_map"] = status_map
self._signal_state["current_focus_issue_id"] = current_focus
self._signal_state["unresolved_issue_count"] = len(unresolved)
self._signal_state["resolved_issue_count"] = len(resolved)
self._signal_state["raw_signals"] = raw_signals
if raw_signals:
self._signal_state["active_signal"] = raw_signals[0]
self._signal_state["signal_version"] = int(self._signal_state.get("signal_version", 0)) + 1
elif not unresolved and not issues:
self._signal_state["active_signal"] = ""
self._signal_state["meta_signal"] = meta_signal
if unresolved:
focus_issue = next((issue for issue in unresolved if str(issue.get("id", "")) == current_focus), unresolved[0])
focus_summary = str(focus_issue.get("summary", focus_issue.get("message", current_focus)))
if meta_signal == "issue_batch_ready":
issue_ids = ", ".join(unresolved_ids[:5])
self._signal_steering = "\n".join([
f"[SIGNAL {meta_signal}] There are {len(unresolved)} unresolved parsed issue(s).",
f"[RAW SIGNALS] {', '.join(raw_signals[:6]) if raw_signals else 'open_issue_set'}",