源码级别解析 · 源码级别解析 · 多智能体协作系统
2026-04-27 | 每日技术深度解读
CrewAI已成为企业AI自动化的新标准
超过100,000开发者通过社区课程认证
```python
# CrewAI 核心理念
• 高层次抽象 + 精细底层控制
• 智能体自主协作
• 角色扮演专业化
• 任务并行执行
• 实时监控与调试
```
为任何场景量身定制的自主AI智能体
真正的多智能体协作,而非简单链式调用
清晰的职责分离,便于维护和扩展
class Agent(BaseAgent):
"""CrewAI智能体基础类"""
def __init__(
self,
role: str,
goal: str,
backstory: str,
tools: List[Tool] = None,
llm: BaseLLM = None,
max_rpm: int = None,
verbose: bool = False,
allow_delegation: bool = False,
**kwargs
):
self.role = role
self.goal = goal
self.backstory = backstory
self.tools = tools or []
self.llm = llm
self.max_rpm = max_rpm
self.verbose = verbose
self.allow_delegation = allow_delegation
self.memory = Memory()
self.context = Context()
每个智能体都有明确的角色、目标和背景故事
class Task(BaseTask):
"""任务定义类"""
def __init__(
self,
description: str,
agent: Agent,
expected_output: str,
context: List[str] = None,
tools: List[Tool] = None,
async_execution: bool = False,
**kwargs
):
self.description = description
self.agent = agent
self.expected_output = expected_output
self.context = context or []
self.tools = tools or []
self.async_execution = async_execution
self.status = TaskStatus.TODO
self.result = None
self.start_time = None
self.end_time = None
任务可以异步执行,支持复杂工作流
class Crew(BaseCrew):
"""智能体协作管理"""
def __init__(
self,
agents: List[Agent],
tasks: List[Task],
verbose: bool = False,
process: Process = None,
**kwargs
):
self.agents = agents
self.tasks = tasks
self.verbose = verbose
self.process = process or Process.sequential
self.process_log = []
self.results = {}
def kickoff(self) -> Dict[str, Any]:
"""启动多智能体协作"""
return self.process.execute(self.agents, self.tasks)
支持顺序和并行处理模式
角色扮演使智能体更加专业化
# 定义研究员角色
researcher = Agent(
role='Senior Research Analyst',
goal='Gather comprehensive information about AI frameworks',
backstory='You are an expert researcher with 10+ years in AI technology',
tools=[SearchTool(), DocumentAnalysisTool()],
verbose=True
)
# 定义作家角色
writer = Agent(
role='Technical Writer',
goal='Create comprehensive documentation about CrewAI',
backstory='You specialize in creating technical documentation for complex systems',
tools=[DocumentGeneratorTool()],
verbose=True
)
每个角色都有明确的职责和能力
复杂业务逻辑的完整支持
# 任务依赖关系
tasks = [
Task(
description="Research latest AI frameworks",
agent=researcher,
expected_output="Comprehensive research report"
),
Task(
description="Analyze CrewAI architecture",
agent=researcher,
expected_output="Technical architecture analysis",
context=["Research latest AI frameworks"]
),
Task(
description="Create documentation based on research",
agent=writer,
expected_output="Complete technical documentation",
context=["Research latest AI frameworks", "Analyze CrewAI architecture"]
)
]
任务间通过context传递信息
工具是智能体能力的延伸
class BaseTool(BaseModel):
"""工具基础类"""
name: str
description: str
args_schema: Optional[Type[BaseModel]] = None
def run(self, **kwargs) -> str:
"""执行工具逻辑"""
raise NotImplementedError
def _run(self, **kwargs) -> str:
"""同步执行"""
return self.run(**kwargs)
def _arun(self, **kwargs) -> str:
"""异步执行"""
return asyncio.to_thread(self.run, **kwargs)
# 示例:搜索工具
class SearchTool(BaseTool):
name = "web_search"
description = "Search the web for information"
def run(self, query: str) -> str:
# 实现搜索逻辑
return search_results(query)
支持同步和异步工具执行
真正的多智能体协作,而非简单串行
def execute_collaboration(self):
"""多智能体协作执行"""
# 1. 任务分配
task_assignments = self._assign_tasks_to_agents()
# 2. 并行执行
async_results = []
for agent, tasks in task_assignments.items():
results = await asyncio.gather(*[
self._execute_task_async(agent, task)
for task in tasks
])
async_results.extend(results)
# 3. 结果聚合
final_result = self._aggregate_results(async_results)
# 4. 质量检查
self._validate_results(final_result)
return final_result
异步并行执行提高效率
根据业务需求选择合适的执行模式
class Process:
"""执行进程控制"""
@classmethod
def sequential(cls, agents: List[Agent], tasks: List[Task]) -> Dict[str, Any]:
"""顺序执行"""
results = {}
for task in tasks:
result = task.agent.execute(task)
results[task.description] = result
return results
@classmethod
def parallel(cls, agents: List[Agent], tasks: List[Task]) -> Dict[str, Any]:
"""并行执行"""
async def execute_task_async(task):
return await task.agent.aexecute(task)
async_results = asyncio.gather(*[
execute_task_async(task) for task in tasks
])
return {task.description: result async for task, result in zip(tasks, async_results)}
@classmethod
def hierarchical(cls, agents: List[Agent], tasks: List[Task]) -> Dict[str, Any]:
"""分层执行"""
# 实现分层逻辑
pass
不同的进程类型适应不同的业务场景
记忆让智能体具有连续学习能力
class Memory(BaseModel):
"""智能体记忆系统"""
def __init__(self):
self.short_term = []
self.long_term = []
self.embeddings = {}
def add_memory(self, content: str, metadata: Dict = None):
"""添加记忆"""
memory_entry = {
'content': content,
'timestamp': datetime.now(),
'metadata': metadata or {}
}
self.short_term.append(memory_entry)
# 转移到长期记忆
if len(self.short_term) > 10:
self.long_term.extend(self.short_term[:5])
self.short_term = self.short_term[5:]
# 生成嵌入
self.embeddings[content] = self._generate_embedding(content)
def retrieve_memory(self, query: str, limit: int = 5) -> List[Dict]:
"""检索相关记忆"""
query_embedding = self._generate_embedding(query)
similarities = []
for memory in self.long_term + self.short_term:
similarity = cosine_similarity(query_embedding, self.embeddings[memory['content']])
similarities.append((memory, similarity))
similarities.sort(key=lambda x: x[1], reverse=True)
return [memory for memory, _ in similarities[:limit]]
支持记忆的语义搜索
上下文是智能体协作的基础
class Context(BaseModel):
"""上下文管理系统"""
def __init__(self):
self.shared_context = {}
self.agent_context = {}
self.task_context = {}
def update_shared(self, key: str, value: Any):
"""更新共享上下文"""
self.shared_context[key] = value
def update_agent(self, agent_id: str, key: str, value: Any):
"""更新智能体上下文"""
if agent_id not in self.agent_context:
self.agent_context[agent_id] = {}
self.agent_context[agent_id][key] = value
def update_task(self, task_id: str, key: str, value: Any):
"""更新任务上下文"""
if task_id not in self.task_context:
self.task_context[task_id] = {}
self.task_context[task_id][key] = value
def get_context(self, scope: str, key: str = None) -> Any:
"""获取上下文"""
if scope == 'shared':
return self.shared_context.get(key)
elif scope == 'agent':
return self.agent_context.get(key, {})
elif scope == 'task':
return self.task_context.get(key, {})
return None
细粒度的上下文管理
完善的错误处理保证系统稳定性
class CrewAIException(Exception):
"""CrewAI基础异常"""
pass
class TaskExecutionError(CrewAIException):
"""任务执行异常"""
def __init__(self, task: Task, error: Exception):
self.task = task
self.error = error
super().__init__(f"Task '{task.description}' failed: {str(error)}")
def safe_execute_task(agent: Agent, task: Task) -> Any:
"""安全执行任务"""
try:
result = agent.execute(task)
return result
except ToolExecutionError as e:
# 工具执行错误
logging.error(f"Tool execution failed: {e}")
return {"error": "tool_execution", "message": str(e)}
except AgentCommunicationError as e:
# 智能体通信错误
logging.error(f"Agent communication failed: {e}")
return {"error": "agent_communication", "message": str(e)}
except Exception as e:
# 其他错误
logging.error(f"Unexpected error: {e}")
return {"error": "unknown", "message": str(e)}
多层错误捕获和处理
性能是CrewAI的核心优势
class PerformanceOptimizer:
"""性能优化器"""
def __init__(self):
self.cache = {}
self.batch_size = 10
def batch_execute(self, tasks: List[Task]) -> List[Any]:
"""批量执行任务"""
# 按智能体分组
agent_groups = self._group_tasks_by_agent(tasks)
# 批量执行
results = []
for agent, agent_tasks in agent_groups.items():
batch_results = self._execute_batch(agent, agent_tasks)
results.extend(batch_results)
return results
def cache_result(self, key: str, result: Any):
"""缓存结果"""
self.cache[key] = {
'result': result,
'timestamp': datetime.now()
}
def get_cached_result(self, key: str) -> Optional[Any]:
"""获取缓存结果"""
cached = self.cache.get(key)
if cached and self._is_cache_valid(cached):
return cached['result']
return None
批量操作和智能缓存提高效率
CrewAI AMP Suite提供企业级解决方案
实时监控确保系统稳定运行
class MonitoringSystem:
"""监控系统"""
def __init__(self):
self.metrics = {}
self.logger = logging.getLogger(__name__)
def log_execution(self, agent: Agent, task: Task, start_time: datetime, end_time: datetime, result: Any):
"""记录执行信息"""
duration = (end_time - start_time).total_seconds()
metric = {
'agent': agent.role,
'task': task.description,
'duration': duration,
'success': isinstance(result, dict) and 'error' not in result,
'timestamp': start_time
}
self._update_metrics(metric)
def _update_metrics(self, metric: Dict):
"""更新指标"""
key = f"{metric['agent']}_{metric['task']}"
if key not in self.metrics:
self.metrics[key] = []
self.metrics[key].append(metric)
def get_performance_report(self) -> Dict[str, Any]:
"""生成性能报告"""
report = {}
for key, metrics in self.metrics.items():
report[key] = {
'avg_duration': sum(m['duration'] for m in metrics) / len(metrics),
'success_rate': sum(1 for m in metrics if m['success']) / len(metrics),
'total_executions': len(metrics)
}
return report
详细的性能监控和分析
Crew Control Plane提供统一的管理界面
内置安全措施确保系统安全
CrewAI已在多个领域成功应用
高质量内容的生产流程
# 内容创作流程
content_crew = Crew(
agents=[
Agent(role='Researcher', goal='Research topic X', ...),
Agent(role='Writer', goal='Write comprehensive article', ...),
Agent(role='Editor', goal='Edit and polish content', ...),
Agent(role='Designer', goal='Design layout and graphics', ...)
],
tasks=[
Task(description='Research topic X', agent=researcher, ...),
Task(description='Write article draft', agent=writer, ...),
Task(description='Edit and polish', agent=editor, ...),
Task(description='Design layout', agent=designer, ...)
]
)
# 执行内容创作
result = content_crew.kickoff()
完整的内容创作工作流
端到端的数据分析流程
提高客服效率和质量
提高软件开发效率
选择适合自己需求的框架
| 特性 | CrewAI | LangChain |
|---|---|---|
| 独立实现 | ✅ 完全独立 | ❌ 依赖众多 |
| 性能 | ⚡ 高性能 | 🐢 中等 |
| 多智能体 | ✅ 专门设计 | ✅ 支持但非专注 |
| 学习曲线 | 📈 简单 | 📊 中等 |
| 企业支持 | ✅ 企业级支持 | ⚠️ 社区支持 |
CrewAI在各方面都有优势
遵循最佳实践提高效率
# 最佳实践:智能体设计
def create_optimized_agents():
"""创建优化的智能体"""
# 1. 明确定义角色
researcher = Agent(
role='Senior Research Analyst',
goal='Provide comprehensive research',
backstory='10+ years in research',
tools=[SearchTool(), AnalysisTool()],
allow_delegation=False # 明确设置
)
# 2. 优化工具选择
researcher.tools = [
OptimizedSearchTool(), # 使用优化版本
FastAnalysisTool() # 使用快速版本
]
# 3. 设置适当的参数
researcher.max_rpm = 30 # 合理的速率限制
researcher.verbose = True # 启用详细日志
return researcher
# 最佳实践:任务拆分
def create_well_defined_tasks():
"""创建定义良好的任务"""
return [
Task(
description='Research specific topic',
expected_output='Comprehensive research report',
context=['Initial requirements'] # 明确上下文
),
Task(
description='Analyze research findings',
expected_output='Detailed analysis report',
context=['Research specific topic']
)
]
优化智能体和任务设计
合适的配置提高系统性能
提前预防问题的发生
持续创新保持领先
强大的社区支持
完善的学习资源帮助快速上手
from crewai import Agent, Task, Crew, Process
# 创建智能体
researcher = Agent(
role='Research Analyst',
goal='Research AI frameworks',
backstory='Expert researcher',
tools=[SearchTool()]
)
writer = Agent(
role='Technical Writer',
goal='Create documentation',
backstory='Expert writer',
tools=[DocumentTool()]
)
# 创建任务
research_task = Task(
description='Research latest AI frameworks',
agent=researcher,
expected_output='Research report'
)
write_task = Task(
description='Write comprehensive documentation',
agent=writer,
expected_output='Complete documentation',
context=['Research latest AI frameworks']
)
# 创建团队并执行
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential
)
# 执行任务
result = crew.kickoff()
print(result)
简单的开始,强大的能力
CrewAI是多智能体协作的未来
感谢阅读!
访问 https://atcfu.com/ai-articles/crewai/ 回顾本文