🤖 AG2

开源AI智能体操作系统深度解析

源码级别解析 · 从AutoGen到下一代多智能体框架的演进之路
2026-05-08 | 每日技术深度解读

目录概览

本次分享将涵盖AG2的完整架构与技术深度
  • 📚 AG2项目背景与演进历程
  • 🏗️ 核心架构设计与组件解析
  • 🤖 Agent类深度剖析
  • 🔄 Beta框架革新特性
  • 🔧 工具系统与扩展机制
  • 👥 多智能体协调模式
  • 📚 知识存储与记忆管理
  • 🎯 任务分发与子智能体
  • 👨‍💬 人机交互与HITL工作流
  • 🚀 高级特性与最佳实践

深入源码级别的技术解析,涵盖AG2从底层到高层的完整架构

项目背景与演进

从AutoGen到AG2的进化之路
  • 🌟 AG2是AutoGen的演进版本
  • 🔧 完全开源,多组织协作维护
  • 📈 AG2 Beta成为v1.0的官方版本
  • 🎯 目标是简化智能体AI的开发与研究
  • 🏗️ 提供多智能体协作的编程框架

AG2旨在为构建AI智能体和促进多智能体协作解决任务提供强大的编程框架

核心特性概览

AG2的核心能力与创新特性
  • 💬 多智能体对话与协作
  • 🛠️ 丰富的工具支持与扩展
  • 🔄 自主与人机混合工作流
  • 📋 多智能体对话模式
  • 🎨 灵活的中间件与观察者模式
  • 🔍 知识存储与智能检索
  • 🎯 任务分发与子智能体管理

AG2提供了从基础智能体到复杂多智能体系统的完整解决方案

安装与环境配置

快速上手AG2开发环境
# 安装AG2 (Windows/Linux)
pip install ag2[openai]

# Mac系统
pip install 'ag2[openai]'

# 验证安装
python -c "import autogen; print('AG2 installed successfully')"

AG2要求Python版本>=3.10,支持模块化安装,可根据需要选择额外的依赖包

API密钥配置

安全配置大语言模型API密钥
[
  {
    "model": "gpt-5",
    "api_key": "<your_openai_api_key_here>",
    "base_url": "https://api.openai.com/v1"
  },
  {
    "model": "claude-3-5-sonnet-20241022",
    "api_key": "<your_anthropic_api_key_here>",
    "base_url": "https://api.anthropic.com"
  }
]

推荐使用OAI_CONFIG_LIST文件存储API密钥,确保添加到.gitignore避免意外提交

第一个智能体示例

快速创建和运行你的第一个AG2智能体
from autogen import AssistantAgent, UserProxyAgent, LLMConfig

# 加载LLM配置
llm_config = LLMConfig.from_json(path="OAI_CONFIG_LIST")

# 创建助手智能体
assistant = AssistantAgent(
    "assistant", 
    llm_config=llm_config,
    system_message="You are a helpful AI assistant."
)

# 创建用户代理智能体
user_proxy = UserProxyAgent(
    "user_proxy", 
    code_execution_config={"work_dir": "coding", "use_docker": False}
)

# 运行智能体任务
user_proxy.run(
    assistant, 
    message="Summarize the main differences between Python lists and tuples."
).process()

这是最基础的AG2使用示例,展示了Agent和UserProxyAgent的协作模式

框架架构概览

AG2的整体架构层次
  • 🏗️ 基础Agent层:核心智能体基类
  • 🎯 对话层:ConversableAdapter与对话管理
  • 🔧 工具层:丰富的工具生态系统
  • 📡 事件层:事件驱动与流处理
  • 🔄 中间件层:可扩展的中间件链
  • 👁️ 观察者层:事件监听与响应
  • 💾 存储层:知识存储与持久化

AG2采用分层架构设计,每一层都提供清晰的接口和扩展点

AG2架构图

Agent (核心) ├── ConversableAdapter (兼容层) ├── Middleware (中间件链) ├── Observers (观察者) ├── Tools (工具系统) ├── KnowledgeStore (知识存储) └── TaskSpawning (任务分发) Event Stream → Message Bus → Event Processing

AG2的核心架构围绕Agent类展开,通过事件驱动的中间件链实现灵活的功能扩展

Beta框架架构

下一代AG2 Beta的设计理念
  • 🌊 流式处理与事件驱动架构
  • 🎯 优化的智能体生命周期管理
  • 📝 类型安全的响应验证
  • 🔄 改进的错误处理与重试机制
  • 🎨 灵活的配置与注入系统
  • 📊 增强的可观测性与调试能力

Beta框架是AG2未来的发展方向,提供了更强大和智能化的能力

Agent类深度剖析

Agent类的核心设计与实现
  • 🏗️ Agent是AG2的核心构建块
  • 🔄 运行模型循环,调用工具
  • 📡 处理中间件和观察者事件
  • 🎯 可选的harness原语运行
  • 🔧 支持装配、压缩、聚合等高级功能

Agent类是多智能体系统的基础,提供了完整的智能体生命周期管理

Agent类核心方法

Agent的关键API接口
class Agent(Generic[TResult]):
    def __init__(
        self,
        name: str,
        prompt: PromptType = (),
        config: ModelConfig = None,
        tools: Iterable[Tool] = (),
        middleware: Iterable[MiddlewareFactory] = (),
        observers: Iterable[Observer] = (),
        knowledge: KnowledgeConfig = None,
        tasks: TaskConfig = False,
        assembly: Iterable[AssemblyPolicy] = (),
    )
    
    async def ask(
        self,
        *msg: str | Input,
        stream: Stream = None,
        tools: Iterable[Tool] = (),
        middleware: Iterable[MiddlewareFactory] = (),
        observers: Iterable[Observer] = (),
        response_schema: ResponseProto = None,
    ) -> AgentReply

Agent类提供了丰富的初始化选项和异步ask方法用于与智能体交互

AgentReply机制

AgentReply的异步处理与响应验证
  • 📝 响应内容解析与验证
  • 🔄 异步内容获取与重试机制
  • 📋 历史记录与上下文管理
  • 🎯 结构化响应验证支持
  • 📁 文件处理与二进制结果

AgentReply提供了强大的响应处理能力,支持异步操作和类型安全

中间件系统

可扩展的中间件架构
class BaseMiddleware:
    def on_turn(self, turn_func: AgentTurn) -> AgentTurn:
        """包装turn执行逻辑"""
        pass
    
    def on_llm_call(self, llm_func: LLMCall) -> LLMCall:
        """包装LLM调用逻辑"""
        pass

# 使用示例
def logging_middleware(event: BaseEvent, context: Context) -> BaseMiddleware:
    return LoggingMiddleware(event, context)

中间件系统允许在LLM调用前后插入自定义逻辑,实现横切关注点

观察者模式

事件监听与响应机制
  • 👁️ Observer用于监听Agent事件
  • 🎯 支持条件监听和回调函数
  • 📊 实时事件处理与分析
  • 🔄 支持链式观察者注册
  • 📝 详细的事件生命周期管理

观察者模式实现了对Agent行为的监听和分析,便于调试和扩展

事件驱动架构

基于事件流的处理机制
  • 📡 MemoryStream:内存事件流
  • 🌊 Stream:通用事件流接口
  • 🔄 事件订阅与发布机制
  • 🎯 异步事件处理与锁机制
  • 📊 事件历史与回放能力

事件驱动架构是AG2的核心,确保了系统的灵活性和可扩展性

装配策略(Assembly Policies)

对话上下文的智能管理
  • 🎯 ConversationPolicy:对话策略
  • 📱 SlidingWindow:滑动窗口策略
  • ⚠️ AlertPolicy:告警策略
  • 🔧 AssemblerMiddleware:装配中间件
  • 🔄 _HaltCheckMiddleware:停止检查

装配策略智能管理对话上下文,提供多种上下文保持策略

知识存储系统

智能体的长期记忆管理
@dataclass
class KnowledgeConfig:
    """知识存储相关的Agent参数"""
    store: KnowledgeStore
    compact: CompactStrategy | None = None
    compact_trigger: CompactTrigger | None = None
    aggregate: AggregateStrategy | None = None
    aggregate_trigger: AggregateTrigger | None = None
    bootstrap: StoreBootstrap | None = None

知识存储系统支持智能体的长期记忆、信息压缩和智能检索

任务配置系统

子智能体任务分发机制
  • 🎯 TaskConfig管理任务参数
  • 🔄 支持工具包含/排除策略
  • 📋 额外工具注入能力
  • 🔧 自定义LLM配置
  • 👥 智能体继承与隔离

任务配置系统提供了灵活的子智能体创建和工具管理机制

子智能体工具

run_subtask与run_subtasks详解
# 运行单个子任务
@tool(name="run_subtask", description="Spawn isolated subtask agent")
async def run_subtask(task: str, ctx: Context) -> str:
    return await agent._spawn_subtask(task, ctx)

# 运行多个子任务
@tool(name="run_subtasks", description="Run multiple subtasks in parallel")
async def run_subtasks(ctx: Context, tasks: list[str], parallel: bool = True) -> str:
    if parallel:
        results = await asyncio.gather(
            *(agent._spawn_subtask(t, ctx) for t in tasks),
            return_exceptions=True
        )
    else:
        results = [await agent._spawn_subtask(t, ctx) for t in tasks]

子智能体工具支持独立的任务执行和并行处理能力

压缩策略

对话历史的智能压缩管理
  • 📝 历史记录的压缩与存储
  • 🎯 触发条件配置
  • 📊 CompactStrategy实现
  • 🔄 CompactTrigger管理
  • 💾 智能摘要生成

压缩策略有效管理长对话历史,提高处理效率和上下文管理

聚合策略

多轮对话的智能聚合
  • 📈 对话结果的智能聚合
  • 🎯 聚合触发器配置
  • 📊 AggregateStrategy实现
  • 🔄 AggregateTrigger管理
  • 💾 持久化存储支持

聚合策略支持多轮对话结果的智能处理和长期保存

工具生态系统

AG2丰富的工具生态
  • 🔧 builtin:内置工具集
  • 🌐 search:搜索工具集
  • ⚙️ shell:命令行工具
  • 💻 code:代码执行工具
  • 🎨 skills:技能工具
  • 📁 filesystem:文件系统工具
  • 🔗 toolkits:工具集成包

AG2提供了全面的工具生态系统,满足各种开发需求

内置工具详解

核心工具功能介绍
  • 📝 MemoryTool:内存管理
  • 🔧 ShellTool:命令执行
  • 🌐 WebSearchTool:网络搜索
  • 📥 WebFetchTool:网页抓取
  • 🖼️ ImageGenerationTool:图像生成
  • 🐳 MCPServerTool:MCP服务器
  • 🔒 NetworkPolicy:网络策略

内置工具提供了开发过程中常用的核心功能支持

搜索工具集成

多种搜索引擎集成
  • 🦆 DuckDuckSearchTool:DuckDuckGo搜索
  • 🔍 ExaToolkit:Exa智能搜索
  • 🤔 PerplexitySearchToolkit:Perplexity搜索
  • 🎯 TavilySearchTool:Tavily搜索
  • 🧠 SkillSearchToolkit:技能搜索

集成了多种主流搜索引擎,提供强大的信息检索能力

技能系统

AI技能的管理与使用
class Skill:
    """AI技能的定义与实现"""
    def __init__(
        self,
        name: str,
        description: str,
        tool: Tool,
        dependencies: dict[str, Any] = None
    )
    
    def execute(self, context: Context, **kwargs) -> Any:
        """执行技能"""
        pass

# 技能工具使用
skills_tool = SkillsTool(skills=[skill1, skill2, skill3])

技能系统提供了AI能力的模块化管理和智能调用机制

工具执行器

ToolExecutor的执行机制
  • 🔄 工具调用与执行
  • 🎯 工具发现与匹配
  • 📊 执行状态管理
  • 🔄 错误处理与重试
  • 📝 执行日志记录

ToolExecutor负责工具的智能调用和执行管理

工具装饰器

@tool装饰器的使用
# 基本工具定义
@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

# 带schema的工具定义
@tool(
    name="advanced_calc",
    description="Advanced calculator with functions",
    schema={
        "type": "object",
        "properties": {
            "expression": {"type": "string"},
            "precision": {"type": "integer", "default": 2}
        }
    }
)
def advanced_calc(expression: str, precision: int = 2) -> str:
    """高级计算器"""
    result = eval(expression)
    return f"Result: {result:.{precision}f}"

@tool装饰器简化了工具定义,支持自动类型检查和文档生成

ConversableAdapter

新旧API的桥梁
class ConversableAdapter(ConversableAgent):
    def __init__(self, agent: Agent) -> None:
        super().__init__(agent.name)
        self.__agent = agent
        self.__conversation: AgentReply | None = None
        
    async def a_generate_conversable_reply(
        self,
        messages: list[dict[str, Any]],
        sender: ConversableAgent | None = None,
        config: OpenAIWrapper | None = None,
    ) -> tuple[bool, dict[str, Any] | None]:
        # 将新API调用转换为beta Agent调用
        request = messages[-1]["content"]
        r = await self.__agent.ask(request)
        return True, r.response.to_api()

ConversableAdapter确保了新旧API的兼容性,便于平滑迁移

多智能体对话模式

智能体间的协作策略
  • 🔄 Swarm模式:群体协作
  • 👥 Group Chat模式:群组对话
  • 🧩 Nested Chat模式:嵌套对话
  • 📋 Sequential Chat模式:顺序对话
  • 🎯 自定义对话模式
  • 🤝 人类参与对话模式

AG2支持多种智能体对话模式,满足不同协作场景需求

群组对话实现

多智能体协作的高级模式
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat import run_group_chat
from autogen.agentchat.group.patterns import AutoPattern

# 定义多个智能体
llm_config = LLMConfig.from_json(path="OAI_CONFIG_LIST")

coder = ConversableAgent(
    name="coder",
    system_message="You are a Python developer. Write clean, efficient code.",
    llm_config=llm_config,
)

reviewer = ConversableAgent(
    name="reviewer", 
    system_message="You are a code reviewer. Analyze code and suggest improvements.",
    llm_config=llm_config,
)

# 运行群组对话
response = run_group_chat(
    pattern=AutoPattern(),
    agents=[coder, reviewer],
    messages="Write a function to calculate fibonacci numbers and review it.",
    max_turns=8
)
response.process()

群组对话模式允许多个智能体协作完成复杂任务

嵌套对话模式

智能体任务的分层处理
  • 🧩 主任务与子任务的层次结构
  • 🔄 智能体间任务委托
  • 📋 任务结果汇总与整合
  • 🎯 智能的角色分配
  • 🔧 错误处理与回退机制

嵌套对话模式支持复杂任务的分层处理和智能委托

顺序对话模式

线性流程的智能体协作
# 顺序对话实现
from autogen import ConversableAgent, GroupChat

# 创建顺序对话组
sequential_chat = GroupChat(
    agents=[planner, developer, tester],
    messages="Create a web application for task management.",
    speaker_selection_method="round_robin",
    max_consecutive_speaker=1
)

# 按顺序执行对话
for i in range(5):  # 5轮对话
    agent = sequential_chat.select_speaker()
    response = agent.generate_reply(messages=sequential_chat.messages)
    sequential_chat.add_message(response, agent.name)

顺序对话模式适用于需要明确步骤和逻辑顺序的任务

人类参与循环(HITL)

人机协作的智能工作流
  • 👨‍💬 人类输入与AI响应的循环
  • ⏸️ 暂停与继续机制
  • 🎯 关键决策点的人类干预
  • 📋 审批与确认流程
  • 🔄 实时反馈与调整

HITL模式确保重要决策有人类参与,提高可靠性和安全性

HITL中间件实现

人类交互的技术实现
# 人类钩子函数定义
def custom_hitl_hook(interactive_message: str) -> str:
    """自定义人类交互逻辑"""
    print(f"🤖 AI需要人类输入: {interactive_message}")
    
    while True:
        user_input = input("请输入你的响应: ")
        if user_input.lower() in ['quit', 'exit']:
            return "TERMINATE"
        return user_input

# 应用HITL钩子
agent = Agent(
    "assistant",
    config=model_config,
    hitl_hook=custom_hitl_hook
)

HITL中间件实现了人机交互的标准化流程

代码执行环境

安全的多语言代码执行
  • 🐍 Python代码执行
  • 🔧 容器化安全环境
  • ⏱️ 超时控制与资源限制
  • 📊 执行结果捕获与分析
  • 🔄 异步执行支持

代码执行环境支持多种语言的在线运行和测试

沙箱代码工具

SandboxCodeTool的安全机制
# 沙箱代码执行
from autogen.beta.tools.code import SandboxCodeTool

# 创建代码执行工具
code_tool = SandboxCodeTool(
    work_dir="/tmp/code_execution",
    timeout=30,
    use_docker=True,  # 使用Docker容器
    allowed_packages=["numpy", "pandas", "matplotlib"]
)

# 执行Python代码
result = await code_tool.execute(
    "import numpy as np\narr = np.array([1, 2, 3, 4, 5])\nresult = arr.mean()\nprint(f'Mean: {result}')"
)
print(f"执行结果: {result}")

沙箱代码工具提供了安全隔离的代码执行环境

网络策略与安全

网络访问的安全控制
  • 🔒 NetworkPolicy:网络访问策略
  • 🌐 白名单与黑名单机制
  • 📡 域名访问控制
  • 🛡️ 安全验证与认证
  • 📊 访问日志与监控

网络策略确保了智能体访问网络的安全性和合规性

Web搜索与抓取

智能信息获取工具
# Web搜索工具
from autogen.beta.tools.builtin import WebSearchTool, WebFetchTool

# 搜索工具
search_tool = WebSearchTool(
    engine="duckduckgo",
    max_results=5
)

# 抓取工具
fetch_tool = WebFetchTool(
    max_size=10 * 1024 * 1024,  # 10MB限制
    timeout=30
)

# 使用示例
search_results = await search_tool.search("最新AI技术趋势")
fetched_content = await fetch_tool.fetch("https://example.com/article")

Web工具提供了强大的信息检索和内容获取能力

文件系统工具集

完整的文件操作支持
  • 📁 文件读写操作
  • 📂 目录遍历与管理
  • 🔍 文件搜索与过滤
  • 📊 文件属性查询
  • 🔄 文件监控与变化检测

文件系统工具集提供了完整的文件操作能力

Shell工具集成

命令行工具的无缝集成
# 本地Shell工具
from autogen.beta.tools.shell import LocalShellTool

shell_tool = LocalShellTool(
    work_dir="/tmp",
    timeout=60,
    allowed_commands=["ls", "cp", "mv", "rm", "mkdir"],
    blocked_commands=["rm -rf", "sudo", "chmod 777"]
)

# 执行shell命令
result = await shell_tool.execute("ls -la")
print(f"命令结果: {result}")

# 容器环境Shell工具
container_shell = ContainerReferenceEnvironment(image="python:3.9")
container_result = await container_shell.execute("python --version")

Shell工具提供了安全的命令行执行能力,支持本地和容器环境

配置管理系统

灵活的配置与注入
  • 🎯 ModelConfig:模型配置
  • 🔧 LLMClient:客户端配置
  • 📊 依赖注入系统
  • 🔄 上下文变量管理
  • 🎨 环境变量支持

配置管理系统提供了灵活的配置选项和依赖注入机制

错误处理与重试

健壮的错误处理机制
# 自定义错误处理中间件
class ErrorHandlingMiddleware(BaseMiddleware):
    def __init__(self, max_retries=3, backoff_factor=2):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
    
    async def on_llm_call(self, llm_func: LLMCall) -> LLMCall:
        """带重试的LLM调用"""
        async def retry_wrapper(*args, **kwargs):
            for attempt in range(self.max_retries):
                try:
                    return await llm_func(*args, **kwargs)
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        raise e
                    wait_time = self.backoff_factor ** attempt
                    await asyncio.sleep(wait_time)
        return retry_wrapper

AG2提供了完善的错误处理和重试机制,确保系统的稳定性

日志与监控

系统可观测性支持
  • 📝 详细的事件日志
  • 📊 性能指标收集
  • 🔧 可配置的日志级别
  • 📁 日志文件管理
  • 🔄 实时监控支持

日志系统为调试和性能优化提供了全面的可观测性支持

类型安全与验证

强类型响应验证
# 响应schema定义
from pydantic import BaseModel, Field

class AnalysisResult(BaseModel):
    """分析结果模型"""
    summary: str = Field(..., description="分析摘要")
    sentiment: str = Field(..., description="情感分析结果")
    keywords: list[str] = Field(default=[], description="关键词列表")
    confidence: float = Field(..., ge=0.0, le=1.0, description="置信度")

# 使用schema验证
agent = Agent(
    "analyzer",
    response_schema=AnalysisResult,
    config=model_config
)

# 自动验证响应
response = await agent.ask("分析这段文本的情感...", response_schema=AnalysisResult)

响应验证确保了输出的类型安全性和结构完整性

性能优化策略

AG2的性能调优指南
  • ⚡ 流式处理减少延迟
  • 🔄 并发执行提升吞吐量
  • 💾 智能缓存策略
  • 📎 资源池管理
  • 🎯 异步操作优化

AG2提供了多种性能优化策略,满足高并发和低延迟需求

最佳实践

AG2开发的最佳实践
  • 🏗️ 合理的模块化设计
  • 🔧 适当的中间件使用
  • 📝 清晰的错误处理
  • 🎯 合理的超时配置
  • 📊 充分的日志记录
  • 🔄 异步编程规范
  • 🧪 完整的测试覆盖

遵循最佳实践确保AG2应用的可维护性和可扩展性

测试策略

AG2应用的测试方法
# 单元测试示例
import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_agent_ask():
    # Mock LLM响应
    mock_response = AsyncMock()
    mock_response.content = "Test response"
    
    agent = Agent(
        "test_agent",
        config=MockModelConfig(response=mock_response)
    )
    
    response = await agent.ask("Test message")
    assert response.body == "Test response"

# 集成测试示例
@pytest.mark.asyncio
async def test_multi_agent_interaction():
    # 创建测试智能体
    agent1 = Agent("agent1", config=mock_config)
    agent2 = Agent("agent2", config=mock_config)
    
    # 测试对话交互
    response = await agent1.ask("Hello", stream=MemoryStream())
    assert response is not None

全面的测试策略确保AG2应用的质量和稳定性

部署与扩展

AG2的生产环境部署
  • 🏗️ 容器化部署策略
  • 📊 负载均衡与扩展
  • 🔄 高可用性配置
  • 📁 持久化存储方案
  • 🔒 安全部署实践
  • 📈 监控与告警系统

AG2支持多种部署模式,适应不同的生产环境需求

未来路线图

AG2的发展方向
  • 🚀 v1.0正式版本发布
  • 📈 性能持续优化
  • 🔧 更多工具集成
  • 🌐 云原生支持
  • 🤖 模型插件化
  • 🎯 企业级功能增强

AG2将继续发展,提供更加强大和易用的智能体开发平台

迁移指南

从AutoGen到AG2的迁移
  • 🔄 API兼容性说明
  • 📋 迁移检查清单
  • 🔧 配置文件转换
  • 🎯 Beta框架推荐
  • 📚 迁移工具支持

AG2提供了平滑的迁移路径,确保现有应用的兼容性

社区与生态

AG2的开发者生态
  • 👥 活跃的开源社区
  • 📚 丰富的文档资源
  • 🎯 示例项目集合
  • 🔧 插件生态系统
  • 📈 持续的功能更新

AG2拥有活跃的社区和丰富的生态资源,为开发者提供全方位支持

总结与展望

AG2的技术价值与应用前景
  • 🏗️ 强大的智能体构建框架
  • 🔧 丰富的工具生态系统
  • 👥 灵活的多智能体协作
  • 🚀 高性能的事件驱动架构
  • 📈 广泛的应用前景

AG2代表了AI智能体开发的新范式,为构建复杂的AI应用提供了强大平台

参考资料

  • AG2 GitHub: https://github.com/ag2ai/ag2
  • 官方文档: https://docs.ag2.ai
  • Discord社区: https://discord.gg/ag2ai
  • 示例项目: https://github.com/ag2ai/build-with-ag2

感谢阅读!
访问 https://atcfu.com/ai-articles/ag2-agentos/ 回顾本文