源码级别解析 · 从AutoGen到下一代多智能体框架的演进之路
2026-05-08 | 每日技术深度解读
深入源码级别的技术解析,涵盖AG2从底层到高层的完整架构
AG2旨在为构建AI智能体和促进多智能体协作解决任务提供强大的编程框架
AG2提供了从基础智能体到复杂多智能体系统的完整解决方案
# 安装AG2 (Windows/Linux)
pip install ag2[openai]
# Mac系统
pip install 'ag2[openai]'
# 验证安装
python -c "import autogen; print('AG2 installed successfully')"
AG2要求Python版本>=3.10,支持模块化安装,可根据需要选择额外的依赖包
[
{
"model": "gpt-5",
"api_key": "<your_openai_api_key_here>",
"base_url": "https://api.openai.com/v1"
},
{
"model": "claude-3-5-sonnet-20241022",
"api_key": "<your_anthropic_api_key_here>",
"base_url": "https://api.anthropic.com"
}
]
推荐使用OAI_CONFIG_LIST文件存储API密钥,确保添加到.gitignore避免意外提交
from autogen import AssistantAgent, UserProxyAgent, LLMConfig
# 加载LLM配置
llm_config = LLMConfig.from_json(path="OAI_CONFIG_LIST")
# 创建助手智能体
assistant = AssistantAgent(
"assistant",
llm_config=llm_config,
system_message="You are a helpful AI assistant."
)
# 创建用户代理智能体
user_proxy = UserProxyAgent(
"user_proxy",
code_execution_config={"work_dir": "coding", "use_docker": False}
)
# 运行智能体任务
user_proxy.run(
assistant,
message="Summarize the main differences between Python lists and tuples."
).process()
这是最基础的AG2使用示例,展示了Agent和UserProxyAgent的协作模式
AG2采用分层架构设计,每一层都提供清晰的接口和扩展点
AG2的核心架构围绕Agent类展开,通过事件驱动的中间件链实现灵活的功能扩展
Beta框架是AG2未来的发展方向,提供了更强大和智能化的能力
Agent类是多智能体系统的基础,提供了完整的智能体生命周期管理
class Agent(Generic[TResult]):
def __init__(
self,
name: str,
prompt: PromptType = (),
config: ModelConfig = None,
tools: Iterable[Tool] = (),
middleware: Iterable[MiddlewareFactory] = (),
observers: Iterable[Observer] = (),
knowledge: KnowledgeConfig = None,
tasks: TaskConfig = False,
assembly: Iterable[AssemblyPolicy] = (),
)
async def ask(
self,
*msg: str | Input,
stream: Stream = None,
tools: Iterable[Tool] = (),
middleware: Iterable[MiddlewareFactory] = (),
observers: Iterable[Observer] = (),
response_schema: ResponseProto = None,
) -> AgentReply
Agent类提供了丰富的初始化选项和异步ask方法用于与智能体交互
AgentReply提供了强大的响应处理能力,支持异步操作和类型安全
class BaseMiddleware:
def on_turn(self, turn_func: AgentTurn) -> AgentTurn:
"""包装turn执行逻辑"""
pass
def on_llm_call(self, llm_func: LLMCall) -> LLMCall:
"""包装LLM调用逻辑"""
pass
# 使用示例
def logging_middleware(event: BaseEvent, context: Context) -> BaseMiddleware:
return LoggingMiddleware(event, context)
中间件系统允许在LLM调用前后插入自定义逻辑,实现横切关注点
观察者模式实现了对Agent行为的监听和分析,便于调试和扩展
事件驱动架构是AG2的核心,确保了系统的灵活性和可扩展性
装配策略智能管理对话上下文,提供多种上下文保持策略
@dataclass
class KnowledgeConfig:
"""知识存储相关的Agent参数"""
store: KnowledgeStore
compact: CompactStrategy | None = None
compact_trigger: CompactTrigger | None = None
aggregate: AggregateStrategy | None = None
aggregate_trigger: AggregateTrigger | None = None
bootstrap: StoreBootstrap | None = None
知识存储系统支持智能体的长期记忆、信息压缩和智能检索
任务配置系统提供了灵活的子智能体创建和工具管理机制
# 运行单个子任务
@tool(name="run_subtask", description="Spawn isolated subtask agent")
async def run_subtask(task: str, ctx: Context) -> str:
return await agent._spawn_subtask(task, ctx)
# 运行多个子任务
@tool(name="run_subtasks", description="Run multiple subtasks in parallel")
async def run_subtasks(ctx: Context, tasks: list[str], parallel: bool = True) -> str:
if parallel:
results = await asyncio.gather(
*(agent._spawn_subtask(t, ctx) for t in tasks),
return_exceptions=True
)
else:
results = [await agent._spawn_subtask(t, ctx) for t in tasks]
子智能体工具支持独立的任务执行和并行处理能力
压缩策略有效管理长对话历史,提高处理效率和上下文管理
聚合策略支持多轮对话结果的智能处理和长期保存
AG2提供了全面的工具生态系统,满足各种开发需求
内置工具提供了开发过程中常用的核心功能支持
集成了多种主流搜索引擎,提供强大的信息检索能力
class Skill:
"""AI技能的定义与实现"""
def __init__(
self,
name: str,
description: str,
tool: Tool,
dependencies: dict[str, Any] = None
)
def execute(self, context: Context, **kwargs) -> Any:
"""执行技能"""
pass
# 技能工具使用
skills_tool = SkillsTool(skills=[skill1, skill2, skill3])
技能系统提供了AI能力的模块化管理和智能调用机制
ToolExecutor负责工具的智能调用和执行管理
# 基本工具定义
@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
# 带schema的工具定义
@tool(
name="advanced_calc",
description="Advanced calculator with functions",
schema={
"type": "object",
"properties": {
"expression": {"type": "string"},
"precision": {"type": "integer", "default": 2}
}
}
)
def advanced_calc(expression: str, precision: int = 2) -> str:
"""高级计算器"""
result = eval(expression)
return f"Result: {result:.{precision}f}"
@tool装饰器简化了工具定义,支持自动类型检查和文档生成
class ConversableAdapter(ConversableAgent):
def __init__(self, agent: Agent) -> None:
super().__init__(agent.name)
self.__agent = agent
self.__conversation: AgentReply | None = None
async def a_generate_conversable_reply(
self,
messages: list[dict[str, Any]],
sender: ConversableAgent | None = None,
config: OpenAIWrapper | None = None,
) -> tuple[bool, dict[str, Any] | None]:
# 将新API调用转换为beta Agent调用
request = messages[-1]["content"]
r = await self.__agent.ask(request)
return True, r.response.to_api()
ConversableAdapter确保了新旧API的兼容性,便于平滑迁移
AG2支持多种智能体对话模式,满足不同协作场景需求
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat import run_group_chat
from autogen.agentchat.group.patterns import AutoPattern
# 定义多个智能体
llm_config = LLMConfig.from_json(path="OAI_CONFIG_LIST")
coder = ConversableAgent(
name="coder",
system_message="You are a Python developer. Write clean, efficient code.",
llm_config=llm_config,
)
reviewer = ConversableAgent(
name="reviewer",
system_message="You are a code reviewer. Analyze code and suggest improvements.",
llm_config=llm_config,
)
# 运行群组对话
response = run_group_chat(
pattern=AutoPattern(),
agents=[coder, reviewer],
messages="Write a function to calculate fibonacci numbers and review it.",
max_turns=8
)
response.process()
群组对话模式允许多个智能体协作完成复杂任务
嵌套对话模式支持复杂任务的分层处理和智能委托
# 顺序对话实现
from autogen import ConversableAgent, GroupChat
# 创建顺序对话组
sequential_chat = GroupChat(
agents=[planner, developer, tester],
messages="Create a web application for task management.",
speaker_selection_method="round_robin",
max_consecutive_speaker=1
)
# 按顺序执行对话
for i in range(5): # 5轮对话
agent = sequential_chat.select_speaker()
response = agent.generate_reply(messages=sequential_chat.messages)
sequential_chat.add_message(response, agent.name)
顺序对话模式适用于需要明确步骤和逻辑顺序的任务
HITL模式确保重要决策有人类参与,提高可靠性和安全性
# 人类钩子函数定义
def custom_hitl_hook(interactive_message: str) -> str:
"""自定义人类交互逻辑"""
print(f"🤖 AI需要人类输入: {interactive_message}")
while True:
user_input = input("请输入你的响应: ")
if user_input.lower() in ['quit', 'exit']:
return "TERMINATE"
return user_input
# 应用HITL钩子
agent = Agent(
"assistant",
config=model_config,
hitl_hook=custom_hitl_hook
)
HITL中间件实现了人机交互的标准化流程
代码执行环境支持多种语言的在线运行和测试
# 沙箱代码执行
from autogen.beta.tools.code import SandboxCodeTool
# 创建代码执行工具
code_tool = SandboxCodeTool(
work_dir="/tmp/code_execution",
timeout=30,
use_docker=True, # 使用Docker容器
allowed_packages=["numpy", "pandas", "matplotlib"]
)
# 执行Python代码
result = await code_tool.execute(
"import numpy as np\narr = np.array([1, 2, 3, 4, 5])\nresult = arr.mean()\nprint(f'Mean: {result}')"
)
print(f"执行结果: {result}")
沙箱代码工具提供了安全隔离的代码执行环境
网络策略确保了智能体访问网络的安全性和合规性
# Web搜索工具
from autogen.beta.tools.builtin import WebSearchTool, WebFetchTool
# 搜索工具
search_tool = WebSearchTool(
engine="duckduckgo",
max_results=5
)
# 抓取工具
fetch_tool = WebFetchTool(
max_size=10 * 1024 * 1024, # 10MB限制
timeout=30
)
# 使用示例
search_results = await search_tool.search("最新AI技术趋势")
fetched_content = await fetch_tool.fetch("https://example.com/article")
Web工具提供了强大的信息检索和内容获取能力
文件系统工具集提供了完整的文件操作能力
# 本地Shell工具
from autogen.beta.tools.shell import LocalShellTool
shell_tool = LocalShellTool(
work_dir="/tmp",
timeout=60,
allowed_commands=["ls", "cp", "mv", "rm", "mkdir"],
blocked_commands=["rm -rf", "sudo", "chmod 777"]
)
# 执行shell命令
result = await shell_tool.execute("ls -la")
print(f"命令结果: {result}")
# 容器环境Shell工具
container_shell = ContainerReferenceEnvironment(image="python:3.9")
container_result = await container_shell.execute("python --version")
Shell工具提供了安全的命令行执行能力,支持本地和容器环境
配置管理系统提供了灵活的配置选项和依赖注入机制
# 自定义错误处理中间件
class ErrorHandlingMiddleware(BaseMiddleware):
def __init__(self, max_retries=3, backoff_factor=2):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
async def on_llm_call(self, llm_func: LLMCall) -> LLMCall:
"""带重试的LLM调用"""
async def retry_wrapper(*args, **kwargs):
for attempt in range(self.max_retries):
try:
return await llm_func(*args, **kwargs)
except Exception as e:
if attempt == self.max_retries - 1:
raise e
wait_time = self.backoff_factor ** attempt
await asyncio.sleep(wait_time)
return retry_wrapper
AG2提供了完善的错误处理和重试机制,确保系统的稳定性
日志系统为调试和性能优化提供了全面的可观测性支持
# 响应schema定义
from pydantic import BaseModel, Field
class AnalysisResult(BaseModel):
"""分析结果模型"""
summary: str = Field(..., description="分析摘要")
sentiment: str = Field(..., description="情感分析结果")
keywords: list[str] = Field(default=[], description="关键词列表")
confidence: float = Field(..., ge=0.0, le=1.0, description="置信度")
# 使用schema验证
agent = Agent(
"analyzer",
response_schema=AnalysisResult,
config=model_config
)
# 自动验证响应
response = await agent.ask("分析这段文本的情感...", response_schema=AnalysisResult)
响应验证确保了输出的类型安全性和结构完整性
AG2提供了多种性能优化策略,满足高并发和低延迟需求
遵循最佳实践确保AG2应用的可维护性和可扩展性
# 单元测试示例
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_agent_ask():
# Mock LLM响应
mock_response = AsyncMock()
mock_response.content = "Test response"
agent = Agent(
"test_agent",
config=MockModelConfig(response=mock_response)
)
response = await agent.ask("Test message")
assert response.body == "Test response"
# 集成测试示例
@pytest.mark.asyncio
async def test_multi_agent_interaction():
# 创建测试智能体
agent1 = Agent("agent1", config=mock_config)
agent2 = Agent("agent2", config=mock_config)
# 测试对话交互
response = await agent1.ask("Hello", stream=MemoryStream())
assert response is not None
全面的测试策略确保AG2应用的质量和稳定性
AG2支持多种部署模式,适应不同的生产环境需求
AG2将继续发展,提供更加强大和易用的智能体开发平台
AG2提供了平滑的迁移路径,确保现有应用的兼容性
AG2拥有活跃的社区和丰富的生态资源,为开发者提供全方位支持
AG2代表了AI智能体开发的新范式,为构建复杂的AI应用提供了强大平台
感谢阅读!
访问 https://atcfu.com/ai-articles/ag2-agentos/ 回顾本文