基于源码深度解析
2026-03-27 | AI Agent Framework
第一部分:项目概览
第二部分:核心类解析
第三部分:数据流与时序
第四部分:设计与实践
🧠
分层内存
🔄
自动内存管理
🛠️
工具驱动内存
LLM 通过函数调用主动管理自己的内存,而非被动接受固定上下文。
| 特性 | MemGPT v1 | Letta v2 |
|---|---|---|
| 内存模型 | 简单的分层 | Block-based + File Blocks |
| 存储 | SQLite | PostgreSQL + 向量DB |
| API | 无 | REST + Python/TS SDK |
| 多智能体 | 不支持 | 原生支持 |
| 工具系统 | 硬编码 | 动态工具 + MCP |
| 部署 | 本地 CLI | Server + Cloud |
| 模型支持 | OpenAI only | 多模型 (模型无关) |
class BaseAgent(ABC):
"""
Abstract class for all agents.
Only one interface is required: step.
"""
@abstractmethod
def step(
self,
input_messages: List[MessageCreate],
) -> LettaUsageStatistics:
"""
Top-level event message handler
for the agent.
"""
raise NotImplementedError
class Agent(BaseAgent):
def __init__(
self,
interface: Optional[Union[AgentInterface,
StreamingRefreshCLIInterface]],
agent_state: AgentState,
user: User,
first_message_verify_mono: bool = True,
mcp_clients: Optional[Dict[str,
AsyncBaseMCPClient]] = None,
save_last_response: bool = False,
):
# 核心初始化组件
self.agent_state = agent_state
self.user = user
# 工具规则求解器
self.tool_rules_solver = ToolRulesSolver(
tool_rules=agent_state.tool_rules
)
# 模型配置
self.model = self.agent_state.llm_config.model
self.supports_structured_output = \
check_supports_structured_output(model=self.model)
# 状态管理器(Service Layer)
self.block_manager = BlockManager()
self.message_manager = MessageManager()
self.agent_manager = AgentManager()
self.passage_manager = PassageManager()
# 遥测
self.telemetry_manager = TelemetryManager() \
if settings.llm_api_logging \
else NoopTelemetryManager()
# Block 的核心属性
class Block:
id: str # 唯一标识
label: str # "persona", "human" 等
value: str # 内存内容(纯文本)
read_only: bool # 是否只读
limit: int # token 上限
# 使用示例
persona_block = Block(
label="persona",
value="I am a helpful assistant.",
read_only=False,
limit=2000
)
| 类型 | Label | 说明 | 读写 |
|---|---|---|---|
| 🧑 人设记忆 | persona | Agent 的性格和行为指南 | 可读写 |
| 👤 用户记忆 | human | 关于用户的信息 | 可读写 |
| 📝 工作草稿 | scratchpad | 临时思考和规划 | 可读写 |
| 🔒 系统提示 | system | 核心指令(通常只读) | 只读 |
| 📁 文件块 | file_blocks | 附加的文档内容 | 可读写 |
class AgentState:
id: str # Agent ID
name: str # Agent 名称
agent_type: str # Agent 类型
memory: Memory # 内存对象
llm_config: LLMConfig # LLM 配置
embedding_config: EmbeddingConfig
tool_rules: List[ToolRule] # 工具规则
message_ids: List[str] # 上下文消息 ID
response_format: ResponseFormat
timezone: str # 时区
# metadata
created_at: datetime
updated_at: datetime
AgentState 是 Agent 的完整快照,包含运行所需的所有信息。
# Memory.compile() 将所有 Block 编译为
# system prompt 的一部分
# 输入(多个 Block)
memory = Memory(
blocks=[
Block(label="persona", value="You are..."),
Block(label="human", value="User name is..."),
Block(label="scratchpad", value="Current plan..."),
]
)
# 输出(编译后的 prompt 片段)
compiled = memory.compile()
# → "\n<persona>\nYou are...\n</persona>
# \n<human>\nUser name is...\n</human>
# \n<scratchpad>\nCurrent plan...\n</scratchpad>"
compile() 用 XML 标签包裹每个 Block,便于 LLM 区分不同记忆类型。
class PromptGenerator:
"""生成完整的 system prompt"""
def generate_system_prompt(
self,
agent_state: AgentState,
) -> str:
# 1. 加载基础系统提示
system_prompt = self._load_base_prompt()
# 2. 编译内存块到 prompt
memory_str = agent_state.memory.compile()
system_prompt += memory_str
# 3. 添加工具描述
tools_str = self._format_tools(agent_state.tools)
system_prompt += tools_str
# 4. 添加内存管理指令
system_prompt += self._memory_instructions()
return system_prompt
@trace_method
def step(
self,
input_messages: List[MessageCreate],
) -> LettaUsageStatistics:
# 重建 system prompt(含最新内存)
self.agent_state = \
self.agent_manager.rebuild_system_prompt(
agent_id=self.agent_state.id,
actor=self.user,
)
# 获取上下文中的消息
messages = self.agent_manager.get_in_context_messages(
agent_id=self.agent_state.id,
actor=self.user,
)
# 追加新消息
messages.extend(new_messages)
@trace_method
def _get_ai_reply(self, ...):
# 1. 检查上下文窗口
token_count = num_tokens_from_messages(messages)
if token_count > self.agent_state.llm_config.context_window:
raise ContextWindowExceededError(...)
# 2. 准备工具列表
functions_list = self._prepare_tools()
functions_list = self._runtime_override_tool_json_schema(
functions_list
)
# 3. 调用 LLM API
response = create(
model=self.model,
messages=messages,
tools=functions_list,
)
return response
🧠 内存工具
💬 通信工具
📝 块编辑工具
Agent 通过函数调用主动操作自己的记忆,实现自我改进。
class ToolExecutionSandbox:
"""工具执行沙盒环境"""
def execute_tool(
self,
tool: Tool,
function_args: dict,
agent_state: AgentState,
) -> ToolExecutionResult:
# 1. 参数校验和强制转换
validated_args = coerce_dict_args_by_annotations(
function_args,
tool.source_code,
)
# 2. 在沙盒中执行
result = self._run_in_sandbox(
tool.source_code,
validated_args,
env=self._build_env(agent_state),
)
return ToolExecutionResult(
output=result,
status="success",
)
内存更新后,system prompt 自动重建,LLM 在下一轮对话中就能使用新记忆。
def update_memory_if_changed(
self, new_memory: Memory
) -> bool:
system_message = self.message_manager \
.get_message_by_id(
message_id=self.agent_state.message_ids[0],
actor=self.user,
)
if new_memory.compile() not in \
system_message.content[0].text:
# 内存有变化,更新 DB
for label in new_memory.list_block_labels():
updated = new_memory.get_block(label).value
if updated != self.agent_state.memory \
.get_block(label).value:
self.block_manager.update_block(
block_id=...,
block_update=BlockUpdate(value=updated),
)
# 重建 system prompt
self.agent_state = \
self.agent_manager.rebuild_system_prompt(
agent_id=self.agent_state.id,
)
return True
return False
# 检查上下文溢出
token_count = num_tokens_from_messages(messages)
context_limit = self.agent_state.llm_config \
.context_window
if token_count > context_limit:
# 触发摘要压缩
summarized = summarize_messages(messages)
def summarize_messages(
messages: List[Message],
model: str,
token_limit: int,
) -> List[Message]:
"""将旧消息压缩为摘要"""
# 1. 计算需要保留的消息数量
cutoff = calculate_summarizer_cutoff(
messages, token_limit
)
# 2. 分离需要摘要和需要保留的消息
to_summarize = messages[:cutoff]
to_keep = messages[cutoff:]
# 3. 调用 LLM 生成摘要
summary = create(
model=model,
messages=[
{"role": "system",
"content": "Summarize this conversation..."},
*format_messages(to_summarize),
],
)
# 4. 用摘要替换旧消息
return [summary_message] + to_keep
# 心跳触发流程
def heartbeat(self):
# 发送内部心跳消息
heartbeat_msg = get_heartbeat()
# Agent 像处理用户消息一样处理心跳
self.step(input_messages=[heartbeat_msg])
# Agent 可以在心跳中:
# - 整理归档记忆
# - 反思对话历史
# - 规划下一步行动
class ToolRulesSolver:
"""工具调用规则求解器"""
def __init__(self, tool_rules):
self.tool_rules = tool_rules
self.call_history = []
def register_tool_call(self, tool_name: str):
"""记录一次工具调用"""
self.call_history.append(tool_name)
def should_allow_tool(
self, tool_name: str
) -> bool:
"""根据规则判断是否允许调用"""
for rule in self.tool_rules:
if rule.matches(tool_name, self.call_history):
return rule.allow
return True # 默认允许
ToolRulesSolver 实现了类似 Linux iptables 的链式规则匹配。
class BlockManager:
"""内存块的 CRUD 管理"""
def get_block_by_id(self, block_id, actor):
# 从数据库加载 Block
return self.orm.get(Block, block_id)
def update_block(
self, block_id, block_update, actor
):
# 更新 Block 内容并持久化
block = self.get_block_by_id(block_id, actor)
if block.read_only:
raise ValueError("Block is read-only!")
block.value = block_update.value
self.orm.update(block)
return block
def create_block(self, block_create, actor):
# 创建新 Block
block = Block(**block_create.dict())
self.orm.add(block)
return block
class MessageManager:
"""消息持久化管理"""
def get_message_by_id(self, message_id, actor):
return self.orm.get(Message, message_id)
def create_message(self, message, actor):
msg = Message(**message.dict())
self.orm.add(msg)
return msg
def get_in_context_messages(
self, agent_id, actor
) -> List[Message]:
"""获取当前在上下文中的消息"""
agent = self.agent_manager.get_agent(
agent_id
)
return [self.get_message_by_id(mid, actor)
for mid in agent.message_ids]
class AgentInterface(ABC):
"""Agent 输出接口抽象"""
@abstractmethod
def internal_monologue(self, msg):
"""显示 Agent 内心独白"""
pass
@abstractmethod
def assistant_message(self, msg):
"""显示 Agent 回复消息"""
pass
@abstractmethod
def function_message(self, msg, **kwargs):
"""显示函数调用结果"""
pass
# 实现: CLIInterface, StreamingInterface,
# APIInterface, DiscordInterface 等
接口模式让同一个 Agent 可以在不同环境中运行(CLI、API、Discord Bot)。
class LLMClient:
"""统一的 LLM 调用接口"""
def __init__(self):
# 支持多个 Provider
self.providers = {
ProviderType.OPENAI: OpenAIProvider(),
ProviderType.ANTHROPIC: AnthropicProvider(),
ProviderType.LOCAL: LocalProvider(),
}
def call(
self,
model: str,
messages: List[Message],
tools: Optional[List] = None,
) -> ChatCompletionResponse:
"""根据模型名路由到对应 Provider"""
provider = self._resolve_provider(model)
return provider.chat(model, messages, tools)
LLMClient 实现了模型无关的抽象,同一套代码可以运行在不同的 LLM 上。
# 不同 LLM Provider
class LLMProvider(ABC):
@abstractmethod
def chat(self, model, messages):
pass
class OpenAIProvider(LLMProvider):
def chat(self, model, messages):
return openai.ChatCompletion.create(
model=model, messages=messages
)
class AnthropicProvider(LLMProvider):
def chat(self, model, messages):
return anthropic.messages.create(
model=model, messages=messages
)
# 不同输出接口
class AgentInterface(ABC):
def internal_monologue(self, msg): ...
def assistant_message(self, msg): ...
class CLIInterface(AgentInterface):
def assistant_message(self, msg):
print(msg)
class DiscordInterface(AgentInterface):
def assistant_message(self, msg):
channel.send(msg)
# Agent 通过 Interface 发出事件
class Agent:
def step(self, input_messages):
# ... 处理逻辑 ...
# 发出内心独白事件
self.interface.internal_monologue(thought)
# 发出工具调用事件
self.interface.function_message(
f"Ran {func}({args})"
)
# 发出回复事件
self.interface.assistant_message(reply)
# 不同 Interface 实现决定事件去向:
# CLIInterface → print 到终端
# StreamingInterface → SSE 推送
# DiscordInterface → 发送到频道
Agent 不关心输出方式,Interface 决定事件如何处理。
class ToolExecutionSandbox:
def execute_tool(self, tool, args, agent_state):
# 1. 参数校验
validated = coerce_dict_args_by_annotations(
args, tool.source_code
)
# 2. 构建受限环境
env = self._build_env(agent_state)
# 3. 超时执行
result = timeout(
self._run(tool.source_code, validated, env),
timeout_seconds=30,
)
return ToolExecutionResult(output=result)
# 增量摘要策略
def calculate_summarizer_cutoff(
messages, token_limit
):
"""计算哪些消息需要摘要"""
total = 0
for i, msg in enumerate(reversed(messages)):
total += count_tokens(msg)
if total > token_limit * 0.8:
return len(messages) - i - 1
return len(messages)
# 异步 MCP 客户端
class AsyncBaseMCPClient(ABC):
@abstractmethod
async def call_tool(
self, tool_name, args
) -> str:
pass
@abstractmethod
async def list_tools(self) -> List:
pass
# 记忆检索流程
class PassageManager:
def search(
self,
query: str,
agent_id: str,
limit: int = 10,
) -> List[Passage]:
"""向量检索相关记忆"""
# 1. 将查询转为 embedding
query_embedding = self.embed(query)
# 2. 在向量 DB 中搜索
results = self.vector_db.search(
query_embedding,
filter={"agent_id": agent_id},
limit=limit,
)
return results
| 场景 | 指标 | 表现 |
|---|---|---|
| 短对话 (<10 轮) | 首 token 延迟 | ~500ms |
| 长对话 (100+ 轮) | 上下文管理 | 自动压缩,延迟稳定 |
| 记忆检索 | top-k 查询 | <100ms |
| 多工具并行 | 吞吐量 | 3-5x 串行 |
| 内存持久化 | 写延迟 | <50ms |
MCP 协议
Model Context Protocol,连接外部工具和数据源
多智能体
原生支持 Agent 间通信和协作
Letta Cloud
托管服务,无需本地部署
letta/agent.py — Agent 核心逻辑 (89KB)letta/schemas/memory.py — 内存模型定义letta/prompts/prompt_generator.py — Prompt 生成letta/services/ — 服务层🤖
MemGPT/Letta = 给 LLM 装上了"操作系统"
让 AI 智能体拥有持久记忆、自主管理、持续学习的能力