源码级别解析 · 源码深度解析 · 多智能体协作 · SOP标准化
2026-05-06 | 每日技术深度解读
MetaGPT将一个需求输入转化为用户故事/竞争分析/需求/数据结构/API/文档等完整输出
内部包含产品经理、架构师、项目经理、工程师等角色,提供完整的软件开发流程
CLI提供了简单易用的接口,支持多种参数配置
from metagpt.software_company import generate_repo
from metagpt.utils.project_repo import ProjectRepo
# 生成项目仓库
repo: ProjectRepo = generate_repo("Create a 2048 game")
print(repo) # 输出项目结构
# 使用数据分析师
import asyncio
from metagpt.roles.di.data_interpreter import DataInterpreter
async def main():
di = DataInterpreter()
await di.run("Run data analysis on sklearn Iris dataset, include a plot")
asyncio.run(main())
MetaGPT既可作为CLI工具使用,也可作为Python库集成到其他项目中
Team类实现了智能体团队的全生命周期管理
class Team(BaseModel):
"""
Team: 拥有一个或多个角色(智能体),SOP(标准操作程序),
以及用于即时通讯的环境,致力于任何多智能体活动,
例如协作编写可执行代码。
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
env: Optional[Environment] = None
investment: float = Field(default=10.0)
idea: str = Field(default="")
use_mgx: bool = Field(default=True)
def hire(self, roles: list[Role]):
"""招聘角色进行协作"""
self.env.add_roles(roles)
def invest(self, investment: float):
"""投资公司,超出最大预算时抛出NoMoneyException"""
self.investment = investment
self.cost_manager.max_budget = investment
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
"""运行公司直到目标轮次或没钱"""
if idea:
self.run_project(idea=idea, send_to=send_to)
while n_round > 0:
if self.env.is_idle:
break
n_round -= 1
self._check_balance()
await self.env.run()
self.env.archive(auto_archive)
Team类提供了智能体团队的核心管理功能,包括招聘、投资、运行等
环境层是智能体间通信的基础,负责消息的传递和路由
class Environment:
"""环境:智能体间消息传递的基础设施"""
def __init__(self, context: Context = None):
self.context = context or Context()
self.roles: dict[str, Role] = {}
self.history: list[Message] = []
self._is_idle = True
def add_roles(self, roles: list[Role]):
"""添加角色到环境"""
for role in roles:
self.roles[role.name] = role
role.env = self
def publish_message(self, message: Message, send_to: str = ""):
"""发布消息到环境"""
self.history.append(message)
# 路由消息到相应的角色
async def run(self):
"""运行环境,处理消息"""
# 检查是否有待处理消息
# 激活相应的角色
# 处理角色响应
Environment类实现了智能体间的消息传递和状态管理
Role类为所有智能体提供了统一的基础架构
class Role(BaseModel):
"""角色:能够观察环境并执行动作的智能体"""
name: str = ""
profile: str = ""
goal: str = ""
constraints: str = ""
env: Optional[Environment] = None
memories: list[Memory] = []
def __init__(self, **data):
super().__init__(**data)
self.rc = RoleContext()
self.actions: dict[str, Action] = {}
def _observe(self, obs: list[Message]) -> int:
"""观察环境中的消息"""
count = 0
for msg in obs:
if self._can_handle(msg):
self.memories.append(msg)
count += 1
return count
async def _react(self) -> Message:
"""响应环境消息"""
if not self.rc.todo:
await self._think()
return await self._act()
async def _act(self) -> Message:
"""执行动作"""
action = self.actions.get(self.rc.todo_action)
if action:
return await action.run(self.rc.todo)
return Message(content="No action to execute")
Role类实现了智能体的核心行为模式:观察-思考-行动
产品经理是软件开发流程的起点,负责将用户需求转化为产品需求文档
class ProductManager(RoleZero):
"""产品经理:负责产品开发和管理的角色"""
name: str = "Alice"
profile: str = "Product Manager"
goal: str = "Create a Product Requirement Document or market research/competitive product research."
constraints: str = "utilize the same language as the user requirements for seamless communication"
tools: list[str] = ["RoleZero", "Browser", "Editor", "SearchEnhancedQA"]
todo_action: str = "WritePRD"
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
if self.use_fixed_sop:
self.enable_memory = False
self.set_actions([PrepareDocuments(send_to=self), WritePRD])
self._watch([UserRequirement, PrepareDocuments])
self.rc.react_mode = RoleReactMode.BY_ORDER
ProductManager支持固定SOP模式,按照预定顺序执行动作
架构师是技术决策的核心,确保系统的可扩展性和可维护性
class Architect(Role):
"""架构师:负责系统架构设计的角色"""
name: str = "Bob"
profile: str = "Architect"
goal: str = "Design a system architecture based on product requirements"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([DesignArchitecture, DocumentArchitecture])
self._watch([UserRequirement, WritePRD])
async def _think(self) -> bool:
"""决定下一步行动"""
if not self.rc.todo:
# 检查是否有新的产品需求
recent_messages = self.env.get_recent_messages(n=5)
if any(msg.content_type == "user_requirement" for msg in recent_messages):
self.rc.todo = "DesignArchitecture"
return bool(self.rc.todo)
架构师通过观察产品需求文档来触发架构设计动作
工程师角色负责将设计转化为实际可运行的代码
class Engineer2(Role):
"""新一代工程师:支持多文件编程和代码审查"""
name: str = "Charlie"
profile: str = "Engineer2"
goal: str = "Write clean, maintainable, and well-tested code"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([WriteCode, ReviewCode, RunTests])
self._watch([DesignArchitecture, WriteCode])
async def _act(self) -> Message:
"""执行代码编写动作"""
if self.rc.todo == "WriteCode":
# 根据架构设计文档编写代码
design_doc = self._get_design_document()
code_files = await self._generate_code(design_doc)
return Message(content=f"Generated code: {len(code_files)} files")
elif self.rc.todo == "ReviewCode":
# 代码审查
review_result = await self._review_code()
return Message(content=f"Code review result: {review_result}")
Engineer2支持完整的开发流程,包括编写、审查和测试
数据分析师角色扩展了MetaGPT在数据分析领域的能力
class DataInterpreter(Role):
"""数据解释器:复杂数据分析角色"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([AnalyzeData, VisualizeData, GenerateReport])
async def run(self, instruction: str):
"""运行数据分析任务"""
# 解析用户指令
task = self._parse_instruction(instruction)
if task.type == "analysis":
result = await self._analyze_data(task.data)
elif task.type == "visualization":
result = await self._visualize_data(task.data)
return Message(content=result)
DataInterpreter支持自然语言指令驱动的数据分析
动作系统是MetaGPT执行具体任务的框架
class Action(BaseModel):
"""动作:可执行的任务单元"""
name: str = ""
context: Optional[Context] = None
async def run(self, context: Context, **kwargs) -> Message:
"""执行动作"""
raise NotImplementedError
def _get_relevant_memories(self, query: str, n: int = 5) -> list[Memory]:
"""获取相关记忆"""
# 使用向量相似度搜索相关记忆
embeddings = [self._embed_memory(mem) for mem in self.context.memories]
query_embedding = self._embed_text(query)
similarities = [self._cosine_sim(query_embedding, emb) for emb in embeddings]
# 返回最相关的记忆
top_indices = np.argsort(similarities)[-n:]
return [self.context.memories[i] for i in top_indices]
Action类提供了动作执行的基础功能,包括记忆检索和上下文管理
这些动作构成了MetaGPT的核心执行流程
class WritePRD(Action):
"""编写产品需求文档"""
name: str = "WritePRD"
async def run(self, context: Context, requirement: str) -> Message:
"""生成产品需求文档"""
# 获取相关市场信息和竞品分析
market_research = await self._conduct_market_research(requirement)
competitive_analysis = await self._analyze_competitors(requirement)
# 生成PRD内容
prd_content = self._generate_prd_content(
requirement=requirement,
market_research=market_research,
competitive_analysis=competitive_analysis
)
# 保存PRD文档
await self._save_prd(prd_content)
return Message(content=prd_content, sent_to="Architect")
WritePRD动作整合了市场调研、竞品分析和产品规划
工具系统扩展了MetaGPT的能力边界,使其能够与外部系统交互
class Browser:
"""浏览器工具:支持网页浏览和信息抓取"""
def __init__(self):
self.driver = webdriver.Chrome()
async def search(self, query: str) -> str:
"""搜索网页信息"""
self.driver.get(f"https://www.google.com/search?q={query}")
results = []
for result in self.driver.find_elements(By.CSS_SELECTOR, '.g'):
title = result.find_element(By.CSS_SELECTOR, 'h3').text
link = result.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
results.append({"title": title, "link": link})
return results
async def visit_page(self, url: str) -> str:
"""访问页面并提取内容"""
self.driver.get(url)
content = self.driver.find_element(By.CSS_SELECTOR, 'body').text
return content
Browser工具提供了强大的网页信息获取能力
记忆系统是智能体保持上下文连续性的关键
class Memory(BaseModel):
"""记忆:存储智能体的经验和知识"""
content: str
metadata: dict = {}
embedding: list[float] = []
timestamp: datetime = Field(default_factory=datetime.now)
def embed(self):
"""生成记忆的向量表示"""
if not self.embedding:
self.embedding = self._generate_embedding(self.content)
return self.embedding
@staticmethod
def search(memories: list['Memory'], query: str, n: int = 5) -> list['Memory']:
"""搜索相关记忆"""
query_embedding = EmbeddingService.embed(query)
similarities = []
for memory in memories:
mem_embedding = memory.embed()
similarity = cosine_similarity(query_embedding, mem_embedding)
similarities.append((memory, similarity))
# 按相似度排序返回前n个记忆
similarities.sort(key=lambda x: x[1], reverse=True)
return [mem for mem, sim in similarities[:n]]
Memory系统支持高效的语义搜索和记忆管理
配置系统提供了灵活的系统配置管理
llm:
api_type: "openai"
model: "gpt-4-turbo"
base_url: "https://api.openai.com/v1"
api_key: "YOUR_API_KEY"
project:
name: "my_project"
path: "/workspace/my_project"
team:
investment: 3.0
n_round: 5
code_review: true
配置文件支持模块化的配置管理
状态管理使得MetaGPT能够处理长期项目并支持中断恢复
def serialize(self, stg_path: Path = None):
"""序列化团队状态"""
stg_path = SERDESER_PATH.joinpath("team") if stg_path is None else stg_path
team_info_path = stg_path.joinpath("team.json")
serialized_data = self.model_dump()
serialized_data["context"] = self.env.context.serialize()
write_json_file(team_info_path, serialized_data)
@classmethod
def deserialize(cls, stg_path: Path, context: Context = None) -> "Team":
"""反序列化团队状态"""
team_info_path = stg_path.joinpath("team.json")
if not team_info_path.exists():
raise FileNotFoundError("team.json not exist")
team_info: dict = read_json_file(team_info_path)
ctx = context or Context()
ctx.deserialize(team_info.pop("context", None))
team = Team(**team_info, context=ctx)
return team
序列化系统支持完整的项目状态保存和恢复
SOP系统确保了开发流程的标准化和可重复性
成本管理系统确保项目的经济可行性
class CostManager:
"""成本管理器:跟踪和控制项目成本"""
def __init__(self, max_budget: float = 10.0):
self.max_budget = max_budget
self.total_cost = 0.0
self.token_usage = {
"input": 0,
"output": 0
}
def add_cost(self, cost: float):
"""添加成本"""
self.total_cost += cost
if self.total_cost > self.max_budget:
raise NoMoneyException(
self.total_cost,
f"Insufficient funds: {self.max_budget}"
)
成本管理系统提供了详细的成本跟踪和控制
消息系统是智能体间协作的基础
class Message(BaseModel):
"""消息:智能体间通信的载体"""
content: str
sent_by: str = ""
send_to: str = ""
content_type: str = "text"
metadata: dict = {}
timestamp: datetime = Field(default_factory=datetime.now)
def __str__(self):
return f"Message(from={self.sent_by}, to={self.send_to}, type={self.content_type})"
Message类提供了完整的消息管理和通信功能
文件系统管理MetaGPT生成项目的完整文件结构
class ProjectRepo:
"""项目仓库:管理项目文件结构"""
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.structure = {}
def create_structure(self, design_document: dict):
"""根据设计文档创建项目结构"""
# 创建根目录
self.project_path.mkdir(parents=True, exist_ok=True)
# 创建基本目录结构
directories = ["src", "tests", "docs", "config", "data"]
for dir_name in directories:
(self.project_path / dir_name).mkdir(exist_ok=True)
ProjectRepo提供了完整的项目文件生成和管理功能
测试系统确保生成代码的质量和可靠性
class TestGenerator:
"""测试用例生成器"""
def generate_tests(self, code_files: dict) -> dict:
"""生成测试文件"""
test_files = {}
for file_path, code in code_files.items():
if file_path.endswith(".py"):
# 生成对应的测试文件
test_code = self._generate_test_code(code, file_path)
test_files[f"tests/test_{file_path.split('/')[-1]}"] = test_code
return test_files
测试系统支持自动化的测试用例生成
部署系统将MetaGPT生成项目部署到生产环境
监控系统确保项目的稳定运行
扩展机制使MetaGPT能够适应各种不同的应用场景
MetaGPT实现了从需求到上线的完整软件开发流程
async def develop_software(idea: str, investment: float = 3.0, n_round: int = 5):
"""完整的软件开发流程"""
# 1. 初始化团队
from metagpt.config2 import config
from metagpt.context import Context
from metagpt.roles import (
TeamLeader, ProductManager, Architect, Engineer2, DataAnalyst
)
from metagpt.team import Team
config.update_via_cli()
ctx = Context(config=config)
company = Team(context=ctx)
company.hire([
TeamLeader(),
ProductManager(),
Architect(),
Engineer2(),
DataAnalyst(),
])
# 2. 投资并启动项目
company.invest(investment)
# 3. 运行开发流程
project_path = await company.run(n_round=n_round, idea=idea)
return project_path
这个示例展示了完整的MetaGPT软件开发流程
MetaGPT在数据分析领域也有强大的应用能力
合理的配置能够显著提高项目质量和效率
合适的角色配置是项目成功的关键
成本控制确保项目的经济可行性
性能优化提高系统的响应速度和效率
完善的错误处理确保系统的稳定性
恢复机制确保系统的可靠性和容错性
MetaGPT正在不断发展和完善
强大的社区和生态是MetaGPT持续发展的重要保障
丰富的学习资源帮助用户快速上手
MetaGPT代表了AI软件开发的未来方向
感谢阅读!
访问 https://atcfu.com/ai-articles/metagpt-multi-agent-framework/ 回顾本文