feat: context compressor #4322
Conversation
Co-authored-by: kawayiYokami <289104862@qq.com>
Hey - I've found 7 issues, and left some high level feedback:
- In `ContextTruncator.truncate_by_turns`, the slice `(-(keep_most_recent_turns - dequeue_turns + 1) * 2 :)` will behave unexpectedly or even drop almost everything when `keep_most_recent_turns <= dequeue_turns` or is very small; consider clamping `dequeue_turns` relative to `keep_most_recent_turns` and adding explicit guards/tests for these edge cases.
- The default LLM compression instruction string is duplicated in both the config default and `LLMSummaryCompressor`; consider centralizing this default (e.g., a shared constant) so future edits don't drift between backend logic and configuration.
- The `TokenCounter._estimate_tokens` heuristic (0.6/0.3 per character) is quite ad-hoc and may diverge significantly from actual model tokenization; if this value is used to gate compression behavior, consider making the heuristic configurable per provider/model or clearly constraining its use to non-critical, approximate decisions (see the sketch below).
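To make the third point concrete, here is a minimal sketch of a per-provider/model-configurable heuristic. The rate table, constructor signature, and CJK-only character test are illustrative assumptions, not the PR's actual `TokenCounter` API:

```python
# Hypothetical: per-model character-to-token rates, falling back to the PR's
# 0.6 / 0.3 defaults when a model has no tuned entry.
DEFAULT_RATES: dict[str, tuple[float, float]] = {
    "default": (0.6, 0.3),
    # "some-model": (0.55, 0.25),  # example of a tuned override
}


class TokenCounter:
    def __init__(self, model: str = "default"):
        self.cjk_rate, self.other_rate = DEFAULT_RATES.get(
            model, DEFAULT_RATES["default"]
        )

    def _estimate_tokens(self, text: str) -> int:
        # Rough split: CJK ideographs vs. everything else.
        cjk = sum(1 for ch in text if "\u4e00" <= ch <= "\u9fff")
        return int(cjk * self.cjk_rate + (len(text) - cjk) * self.other_rate)
```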
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `ContextTruncator.truncate_by_turns`, the slice `(-(keep_most_recent_turns - dequeue_turns + 1) * 2 :)` will behave unexpectedly or even drop almost everything when `keep_most_recent_turns <= dequeue_turns` or very small; consider clamping `dequeue_turns` relative to `keep_most_recent_turns` and adding explicit guards/tests for these edge cases.
- The default LLM compression instruction string is duplicated in both the config default and `LLMSummaryCompressor`; consider centralizing this default (e.g., a shared constant) so future edits don’t drift between backend logic and configuration.
- The `TokenCounter._estimate_tokens` heuristic (0.6/0.3 per-character) is quite ad‑hoc and may diverge significantly from actual model tokenization; if this value is used to gate compression behavior, consider making the heuristic configurable per-provider/model or clearly constraining its use to non-critical, approximate decisions.
## Individual Comments
### Comment 1
<location> `astrbot/core/agent/context/truncator.py:22-31` </location>
<code_context>
+ def truncate_by_turns(
</code_context>
<issue_to_address>
**issue (bug_risk):** truncate_by_turns semantics are unclear for small/zero keep_most_recent_turns and can lead to surprising slices
The interaction between `keep_most_recent_turns` and `dequeue_turns` is difficult to reason about and behaves oddly for small values. For instance:
- `keep_most_recent_turns == 0` and `dequeue_turns == 1` makes `(keep_most_recent_turns - dequeue_turns + 1) * 2` equal 0, so `messages[-0:]` returns the full list (no truncation).
- `keep_most_recent_turns < dequeue_turns` yields a negative slice index, dropping more than `dequeue_turns` turns.
A clearer approach would be to first compute the exact number of messages to keep based on turn counting, slice once from the end using that count, and then adjust to message boundaries. This would eliminate the negative/zero slice edge cases and make the behavior easier to reason about in all call sites.
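Concretely, plugging the two cases above into the slice expression shows the problem:

```python
msgs = ["u1", "a1", "u2", "a2", "u3", "a3"]  # 3 turns, 2 messages per turn

# Case 1: keep_most_recent_turns=0, dequeue_turns=1
start = -(0 - 1 + 1) * 2   # == 0
print(msgs[start:])        # msgs[-0:] is msgs[0:] -> all 6 messages, nothing dropped

# Case 2: keep_most_recent_turns=1, dequeue_turns=3
start = -(1 - 3 + 1) * 2   # == 2; the sign flips and it now indexes from the front
print(msgs[start:])        # msgs[2:] -> 2 turns kept, not the 1 turn requested
```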
</issue_to_address>
### Comment 2
<location> `astrbot/core/agent/context/truncator.py:68-77` </location>
<code_context>
+
+ return self.fix_messages(truncated_contexts)
+
+ def truncate_by_halving(
+ self,
+ messages: list[Message],
+ ) -> list[Message]:
+ """Halving strategy: drop 50% of the messages."""
+ if len(messages) <= 2:
+ return messages
+
+ first_non_system = 0
+ for i, msg in enumerate(messages):
+ if msg.role != "system":
+ first_non_system = i
+ break
+
+ messages_to_delete = (len(messages) - first_non_system) // 2
+
+ result = messages[:first_non_system]
+ result.extend(messages[first_non_system + messages_to_delete :])
+
+ index = next(
+ (i for i, item in enumerate(result) if item.role == "user"),
+ None,
+ )
+ if index is not None:
+ result = result[index:]
+
+ return self.fix_messages(result)
</code_context>
<issue_to_address>
**issue (bug_risk):** Halving truncation can drop the initial system message and earlier instructions
In `truncate_by_halving`, you first preserve leading system messages, but then you re-slice `result` from the first `user` role:
```python
index = next((i for i, item in enumerate(result) if item.role == "user"), None)
if index is not None:
result = result[index:]
```
This can drop the preserved system prompt and any other non-user instructions before the first user message, effectively discarding configuration rather than just halving the conversational history. It would be safer to always keep the initial system message (as in `truncate_by_turns`) and apply halving only to the non-system portion, then prepend the system message without applying the `user`-based slicing to it.
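A minimal sketch of that shape, assuming the PR's `Message` type and `fix_messages` helper (everything else is illustrative):

```python
def truncate_by_halving(self, messages: list[Message]) -> list[Message]:
    if len(messages) <= 2:
        return messages
    # Keep the leading system prefix out of the halving entirely.
    prefix_len = next(
        (i for i, m in enumerate(messages) if m.role != "system"), len(messages)
    )
    system_prefix, rest = messages[:prefix_len], messages[prefix_len:]
    rest = rest[len(rest) // 2 :]  # drop the older half of the conversation
    # Re-align to a user turn within the non-system portion only.
    start = next((i for i, m in enumerate(rest) if m.role == "user"), 0)
    return self.fix_messages(system_prefix + rest[start:])
```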
</issue_to_address>
### Comment 3
<location> `astrbot/core/agent/context/compressor.py:42-51` </location>
<code_context>
+ return messages
+
+
+class TruncateByTurnsCompressor(ContextCompressor):
+ """Truncate by turns compressor implementation.
+ Truncates the message list by removing older turns.
+ """
+
+ def __init__(self, truncate_turns: int = 1):
+ """Initialize the truncate by turns compressor.
+
+ Args:
+ truncate_turns: The number of turns to remove when truncating (default: 1).
+ """
+ self.truncate_turns = truncate_turns
+
+ async def compress(self, messages: list[Message]) -> list[Message]:
+ truncator = ContextTruncator()
+ truncated_messages = truncator.truncate_by_turns(
+ messages,
+ keep_most_recent_turns=0,
</code_context>
<issue_to_address>
**issue (bug_risk):** Using keep_most_recent_turns=0 in TruncateByTurnsCompressor may not actually drop any history
`truncate_by_turns` has guard clauses and slice logic that, for `keep_most_recent_turns=0`, can frequently return `messages` unchanged (and in some cases preserve the full list; see prior comment). That means this compressor may not actually remove `truncate_turns` worth of history. Consider either adding a dedicated "drop N oldest turns" operation that doesn’t depend on `keep_most_recent_turns`, or passing a nonzero `keep_most_recent_turns` that matches the intended retained window.
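One possible shape for the dedicated operation, as a sketch on `ContextTruncator`; it assumes the PR's two-messages-per-turn convention and `Message` type:

```python
def drop_oldest_turns(self, messages: list[Message], turns: int) -> list[Message]:
    """Drop the N oldest turns, independent of keep_most_recent_turns."""
    if turns <= 0 or not messages:
        return messages
    system_prefix = messages[:1] if messages[0].role == "system" else []
    non_system = messages[len(system_prefix) :]
    remaining = non_system[min(turns * 2, len(non_system)) :]  # 2 messages per turn
    return self.fix_messages(system_prefix + remaining)
```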
</issue_to_address>
### Comment 4
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:57` </location>
<code_context>
self.req = request
self.streaming = kwargs.get("streaming", False)
+
+ # enforce max turns, will discard older turns when exceeded BEFORE compression
+ # -1 means no limit
+ self.enforce_max_turns = kwargs.get("enforce_max_turns", -1)
</code_context>
<issue_to_address>
**issue (complexity):** Consider consolidating all truncation and compression handling into a single, config-driven ContextManager.process call so the runner no longer manages context details directly.
You can reduce the added complexity by making `ContextManager` the single entry point for *all* context mutation (turn truncation + compression), and by centralizing configuration and error handling there.
### 1. Unify truncation + compression into one `ContextManager.process` call
Right now `step` coordinates two flows:
```python
# step()
if self.enforce_max_turns != -1:
try:
truncated_messages = self.context_truncator.truncate_by_turns(
self.run_context.messages,
keep_most_recent_turns=self.enforce_max_turns,
dequeue_turns=self.truncate_turns,
)
self.run_context.messages = truncated_messages
except Exception as e:
logger.error(...)
try:
await self.do_context_compress()
except Exception as e:
logger.error(...)
```
You can push this into `ContextManager` so the runner just does:
```python
# step()
try:
self.run_context.messages = await self.context_manager.process(
self.run_context.messages
)
except Exception as e:
logger.error("Error during context processing: %s", e, exc_info=True)
```
And remove `ContextTruncator` and `do_context_compress` from the runner entirely.
### 2. Centralize configuration into a single context config
Instead of keeping half the knobs in the runner and half in `ContextManager`, define a small config object and pass that once:
```python
# context/config.py
from dataclasses import dataclass
from typing import Optional
from astrbot.core.provider.provider import Provider
@dataclass
class ContextConfig:
max_context_tokens: int
enforce_max_turns: int # -1 means no limit
truncate_turns: int
llm_compress_instruction: Optional[str]
llm_compress_keep_recent: int
llm_compress_provider: Optional[Provider]
```
Use it in `reset`:
```python
# in __init__/reset of the runner
from ..context.config import ContextConfig
self.context_config = ContextConfig(
max_context_tokens=provider.provider_config.get("max_context_tokens", 0),
enforce_max_turns=kwargs.get("enforce_max_turns", -1),
truncate_turns=kwargs.get("truncate_turns", 1),
llm_compress_instruction=kwargs.get("llm_compress_instruction"),
llm_compress_keep_recent=kwargs.get("llm_compress_keep_recent", 0),
llm_compress_provider=kwargs.get("llm_compress_provider"),
)
self.context_manager = ContextManager(self.context_config)
```
Now `ToolLoopAgentRunner` doesn’t need to know about `truncate_by_turns` or compression internals.
### 3. Move truncation + compression + error handling inside `ContextManager`
Implement the combined logic inside `ContextManager.process` so behavior stays the same but is locally coherent:
```python
# context/manager.py
class ContextManager:
def __init__(self, config: ContextConfig) -> None:
self.config = config
self._truncator = ContextTruncator()
# existing token counter/compressor init...
async def process(self, messages: list[Message]) -> list[Message]:
try:
result = messages
# 1. turn-based truncation
if self.config.enforce_max_turns != -1:
result = self._truncator.truncate_by_turns(
result,
keep_most_recent_turns=self.config.enforce_max_turns,
dequeue_turns=self.config.truncate_turns,
)
# 2. token-based compression (existing logic)
result = await self._compress_if_needed(result)
return result
except Exception as e:
logger.error("Error during context processing: %s", e, exc_info=True)
# best-effort: return whatever we have (or fall back to original)
return messages
```
Then delete the runner’s:
```python
self.context_truncator = ContextTruncator()
...
async def do_context_compress(self): ...
...
# and the two separate try/except blocks in step()
```
This keeps all existing functionality (max-turn enforcement, turn dequeueing, compression) but:
- The runner has a single `context_manager.process(...)` call.
- Configuration is in one place (`ContextConfig`).
- Error handling for context logic is localized in `ContextManager`.
</issue_to_address>
### Comment 5
<location> `astrbot/core/agent/context/manager.py:14` </location>
<code_context>
+ from astrbot.core.provider.provider import Provider
+
+
+class ContextManager:
+ """Context compression manager."""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the compression and token-check logic directly into `process` to avoid extra helper methods and make the control flow easier to follow in one place.
You can simplify `ContextManager` by inlining the linear compression flow into `process` and removing the two helper methods. This keeps all behavior (threshold check, compression, halving) intact but reduces indirection and mental overhead.
### Simplify `process` and drop `_initial_token_check` / `_run_compression`
```python
class ContextManager:
COMPRESSION_THRESHOLD = 0.82
async def process(self, messages: list[Message]) -> list[Message]:
"""Process the messages, compressing/truncating if needed."""
if self.max_context_tokens <= 0 or not messages:
return messages
# Initial token check
total_tokens = self.token_counter.count_tokens(messages)
usage_rate = total_tokens / self.max_context_tokens
logger.debug(
"ContextManager: total tokens = %s, max_context_tokens = %s",
total_tokens,
self.max_context_tokens,
)
if usage_rate <= self.COMPRESSION_THRESHOLD:
return messages
# First pass: primary compression strategy
messages = await self.compressor.compress(messages)
# Second pass: double-check and halve if still too large
tokens_after = self.token_counter.count_tokens(messages)
if tokens_after / self.max_context_tokens > self.COMPRESSION_THRESHOLD:
messages = self.truncator.truncate_by_halving(messages)
return messages
```
Then you can safely remove `_initial_token_check` and `_run_compression`. The behavior is identical, but the control flow is now a single, easy-to-follow pipeline:
1. Check limit → early return.
2. Count tokens → threshold check.
3. Compress via selected strategy.
4. Recount and optionally halve.
</issue_to_address>
### Comment 6
<location> `astrbot/core/agent/context/compressor.py:14` </location>
<code_context>
+from ..context.truncator import ContextTruncator
+
+
+class ContextCompressor(ABC):
+ """
+ Abstract base class for context compressors.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the compressor design by replacing the ABC with a callable/Protocol, inlining the no-op default, and extracting message-splitting from LLM calls to separate concerns and reduce indirection.
You can keep all functionality while reducing the abstraction surface a bit.
### 1. Replace the ABC with a lightweight Protocol / callable
If all compressors are known and wired up in code (no dynamic plugin loading), an ABC is heavier than needed. A `Protocol` or even a plain callable type keeps the “strategy” concept without the base-class indirection.
For example:
```python
# compressor.py
from typing import Protocol, TYPE_CHECKING, Awaitable, Callable
if TYPE_CHECKING:
from ..message import Message
class ContextCompressor(Protocol):
async def __call__(self, messages: list["Message"]) -> list["Message"]:
...
```
Then compressors can just be callables:
```python
from dataclasses import dataclass
from ..context.truncator import ContextTruncator
@dataclass
class TruncateByTurnsCompressor:
truncate_turns: int = 1
async def __call__(self, messages: list[Message]) -> list[Message]:
truncator = ContextTruncator()
return truncator.truncate_by_turns(
messages,
keep_most_recent_turns=0,
dequeue_turns=self.truncate_turns,
)
```
Usage in `ContextManager` becomes simpler as well:
```python
self._compressor: ContextCompressor = TruncateByTurnsCompressor(truncate_turns=1)
compressed = await self._compressor(messages)
```
This keeps polymorphism but removes the abstract base class and its extra indirection.
### 2. Drop `DefaultCompressor` in favor of a trivial callable
If the default behavior is “no-op compression” and `DefaultCompressor` is only encoding `return messages`, you can avoid another named class:
```python
from typing import Awaitable, Callable
NoopCompressor: ContextCompressor = lambda messages: _noop(messages)
async def _noop(messages: list[Message]) -> list[Message]:
return messages
```
Or just pass `_noop` wherever you currently pass `DefaultCompressor()`. This preserves behavior without an extra type.
### 3. Split message-slicing from LLM invocation in `LLMSummaryCompressor`
`LLMSummaryCompressor` currently does both message partitioning and the provider call. You can keep the class but make the logic easier to follow/test by extracting a pure helper for the slicing:
```python
def _split_history(
messages: list[Message], keep_recent: int
) -> tuple[Message | None, list[Message], list[Message]]:
system_msg = messages[0] if messages and messages[0].role == "system" else None
start_idx = 1 if system_msg else 0
    # Guard keep_recent == 0: messages[start_idx:-0] would be empty, silently skipping summarization.
    messages_to_summarize = (
        messages[start_idx : -keep_recent] if keep_recent > 0 else messages[start_idx:]
    )
recent_messages = messages[-keep_recent:] if keep_recent > 0 else []
return system_msg, messages_to_summarize, recent_messages
```
Then `compress` becomes more linear:
```python
async def compress(self, messages: list[Message]) -> list[Message]:
if len(messages) <= self.keep_recent + 1:
return messages
system_msg, messages_to_summarize, recent_messages = _split_history(
messages, self.keep_recent
)
if not messages_to_summarize:
return messages
instruction_message = Message(role="user", content=self.instruction_text)
llm_payload = messages_to_summarize + [instruction_message]
try:
response = await self.provider.text_chat(contexts=llm_payload)
summary_content = response.completion_text
except Exception as e:
logger.error(f"Failed to generate summary: {e}")
return messages
result: list[Message] = []
if system_msg:
result.append(system_msg)
result.append(
Message(
role="system",
content=f"History conversation summary: {summary_content}",
)
)
result.extend(recent_messages)
return result
```
Behavior remains identical, but concerns (partitioning vs. LLM call) are clearer and easier to reason about.
</issue_to_address>
### Comment 7
<location> `astrbot/core/agent/context/truncator.py:4` </location>
<code_context>
+from ..message import Message
+
+
+class ContextTruncator:
+ """Context truncator."""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared sanitization helpers and clarifying the turn-based calculations so both truncation methods reuse the same repair logic and are easier to read and maintain.
You can keep the current behavior while reducing complexity and duplication by:
1. **Extracting shared “sanitize” logic** (first-user trimming + `fix_messages`)
2. **Making the turn math explicit in `truncate_by_turns`**
3. **Reusing helpers in both truncation strategies**
### 1. Centralize the “fix” / sanitize behavior
Both methods:
- strip to the first `user` message, and
- call `fix_messages`.
That can be made a single, named step so callers know truncation is separate from “context repair”:
```python
class ContextTruncator:
def _strip_before_first_user(self, messages: list[Message]) -> list[Message]:
index = next(
(i for i, m in enumerate(messages) if m.role == "user"),
None,
)
if index is not None and index > 0:
return messages[index:]
return messages
def sanitize(self, messages: list[Message]) -> list[Message]:
"""Normalize context order and repair tool segments."""
messages = self._strip_before_first_user(messages)
return self.fix_messages(messages)
def fix_messages(self, messages: list[Message]) -> list[Message]:
fixed_messages: list[Message] = []
for message in messages:
if message.role == "tool":
if len(fixed_messages) < 2:
fixed_messages = []
else:
fixed_messages.append(message)
else:
fixed_messages.append(message)
return fixed_messages
```
Then both truncators just call `sanitize(...)` at the end, making the responsibility more explicit.
### 2. Simplify and document `truncate_by_turns` math
You can keep the logic but make the turn-based calculation more readable:
```python
def truncate_by_turns(
self,
messages: list[Message],
keep_most_recent_turns: int,
dequeue_turns: int = 1,
) -> list[Message]:
if keep_most_recent_turns == -1:
return messages
if not messages:
return messages
system_message: Message | None = None
non_system = messages
if messages[0].role == "system":
system_message = messages[0]
non_system = messages[1:]
total_turns = len(non_system) // 2
if total_turns <= keep_most_recent_turns:
return messages
# Drop oldest turns but ensure we drop at least `dequeue_turns`
turns_to_keep = keep_most_recent_turns
turns_to_drop = max(total_turns - turns_to_keep, dequeue_turns)
start_index = turns_to_drop * 2 # 2 messages per turn
truncated = non_system[start_index:]
if system_message is not None:
truncated = [system_message] + truncated
return self.sanitize(truncated)
```
This keeps the same behavior (“keep N recent turns, drop at least `dequeue_turns`”) but avoids the opaque `-(keep_most_recent_turns - dequeue_turns + 1) * 2` expression and ties everything to explicit turn counts.
### 3. Reuse helpers in `truncate_by_halving`
With `sanitize` in place, `truncate_by_halving` becomes shorter and less duplicated:
```python
def truncate_by_halving(self, messages: list[Message]) -> list[Message]:
if len(messages) <= 2:
return messages
first_non_system = next(
(i for i, m in enumerate(messages) if m.role != "system"),
len(messages),
)
messages_to_delete = (len(messages) - first_non_system) // 2
kept_prefix = messages[:first_non_system]
kept_suffix = messages[first_non_system + messages_to_delete :]
result = kept_prefix + kept_suffix
return self.sanitize(result)
```
This:
- keeps all current behavior,
- consolidates the structural “repair” logic,
- and makes the truncation strategies easier to understand and maintain.
</issue_to_address>
Pull request overview
This PR introduces a comprehensive context management and compression system for the AstrBot agent framework to prevent context overflow during long conversations. The implementation provides two strategies: truncation by conversation turns and LLM-based intelligent summarization, with automatic token counting to trigger compression when usage exceeds 82% of the model's context window.
Key Changes:
- Added pluggable context management architecture with `ContextManager`, `ContextCompressor`, `ContextTruncator`, and `TokenCounter` modules
- Integrated LLM-based summarization that can compress old conversation history while preserving recent messages
- Extended provider configuration to include `max_context_tokens` with automatic inference from model metadata
- Relocated context management settings from "Others" to a dedicated "Truncate and Compress" section in the UI
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| `astrbot/core/agent/context/manager.py` | Implements the main `ContextManager` orchestrating token checking and compression strategies |
| `astrbot/core/agent/context/compressor.py` | Provides the `LLMSummaryCompressor` and `TruncateByTurnsCompressor` implementations |
| `astrbot/core/agent/context/truncator.py` | Handles message list truncation by turns or halving strategies |
| `astrbot/core/agent/context/token_counter.py` | Estimates token usage using heuristic calculations for Chinese and other text |
| `astrbot/core/agent/runners/tool_loop_agent_runner.py` | Integrates context management into the agent execution loop |
| `astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py` | Wires compression parameters from config to the agent runner and injects model context limits |
| `astrbot/core/config/default.py` | Adds configuration schema for context compression strategies and LLM compression parameters |
| `dashboard/src/i18n/locales/zh-CN/features/config-metadata.json` | Moves context settings to the new "truncate_and_compress" section with Chinese labels |
| `dashboard/src/i18n/locales/en-US/features/config-metadata.json` | Moves context settings to the new "truncate_and_compress" section with English labels |
| `dashboard/src/composables/useProviderSources.ts` | Automatically populates `max_context_tokens` from model metadata when creating providers |
| `dashboard/src/components/shared/ConfigItemRenderer.vue` | Adjusts flex layout for slider/text field controls |
- Implemented a full test suite for `ContextManager` covering initialization, message processing, token-based compression, and error handling.
- Added tests for `ContextTruncator` focusing on message fixing, truncation by turns, dropping oldest turns, and halving.
- Ensured that both test suites validate edge cases and maintain expected behavior with various message types, including system and tool messages.
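For reference, a sketch of the kind of boundary tests Comment 1 asks for. The import paths follow the diff's module layout (`..message` inside `astrbot/core/agent/context/` is assumed to resolve to `astrbot.core.agent.message`), `Message(role=..., content=...)` construction is taken from the review snippets, and the assertions encode the expected behavior that the current slice logic would fail:

```python
from astrbot.core.agent.context.truncator import ContextTruncator
from astrbot.core.agent.message import Message  # assumed path, from `..message` in the diff


def make_turns(n: int) -> list[Message]:
    """Build n user/assistant turns (two messages per turn)."""
    msgs: list[Message] = []
    for i in range(n):
        msgs.append(Message(role="user", content=f"q{i}"))
        msgs.append(Message(role="assistant", content=f"a{i}"))
    return msgs


def test_keep_zero_recent_turns_still_drops_history():
    # Guards against the msgs[-0:] no-op described in Comment 1.
    result = ContextTruncator().truncate_by_turns(
        make_turns(3), keep_most_recent_turns=0, dequeue_turns=1
    )
    assert len(result) < 6


def test_keep_less_than_dequeue_keeps_requested_window():
    # keep_most_recent_turns < dequeue_turns must not over- or under-drop.
    result = ContextTruncator().truncate_by_turns(
        make_turns(5), keep_most_recent_turns=1, dequeue_turns=3
    )
    assert result and len(result) <= 2
```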
LGTM
closes: #4148
related to PR: #4178
Co-authored-by: @kawayiYokami
TODOs:
Modifications
Screenshots or Test Results
Checklist
- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.
Summary by Sourcery
Introduce a configurable context management and compression system for local agents, including LLM-based summarization when context limits are reached.
New Features:
- Add a `ContextManager` that keeps conversation history within the token limit via pluggable compressors (truncation by turns and LLM-based summarization).
- Add a `max_context_tokens` provider setting, auto-populated in the backend and dashboard when model metadata is available.
Enhancements:
- Wire `ContextManager` and `ContextTruncator` into the processing pipeline.