源码级别解析 · 源码解析 · 架构设计 · 实战应用
2026-04-18 | 每日技术深度解读
CrewAI 是一个从零开始构建的框架,专注于多智能体协作
这三个要素共同塑造了智能体的个性和行为模式
智能体的核心架构和主要方法
from crewai import Agent
from crewai_tools import SerperDevTool
# 创建研究智能体
researcher = Agent(
role="AI技术研究员",
goal="研究最新的AI技术发展",
backstory="你是一位经验丰富的研究员,擅长发现和分析AI领域的最新动态",
tools=[SerperDevTool()],
verbose=True,
allow_delegation=False,
max_iter=20
)
# 创建分析智能体
analyst = Agent(
role="数据分析专家",
goal="基于研究结果创建详细的分析报告",
backstory="你是一位细致的分析师,擅长将复杂数据转化为清晰易懂的报告",
verbose=True,
allow_delegation=True,
max_iter=15
)
智能体的基本创建方法和配置参数
多智能体协作的任务执行流程
from crewai import Crew, Process
from crewai import Agent
# 创建研究团队
research_crew = Crew(
agents=[researcher, analyst],
tasks=[
research_task,
analysis_task
],
process=Process.sequential, # 顺序执行
verbose=True,
memory=True
)
# 创建游戏开发团队
game_crew = Crew(
agents=[
game_designer,
programmer,
tester,
reviewer
],
tasks=[
design_task,
coding_task,
testing_task,
review_task
],
process=Process.hierarchical, # 层次化执行
verbose=True
)
智能体团队的不同创建方式和流程配置
from crewai.flow import Flow, listen, start, router
from crewai import Crew, Agent, Task
from pydantic import BaseModel
class MarketState(BaseModel):
sentiment: str = "neutral"
confidence: float = 0.0
recommendations: list = []
class AnalysisFlow(Flow[MarketState]):
@start()
def fetch_market_data(self):
self.state.sentiment = "analyzing"
return {"sector": "tech", "timeframe": "1W"}
@listen(fetch_market_data)
def analyze_with_crew(self, market_data):
# 使用智能体团队进行分析
analyst_crew = Crew(
agents=[market_analyst],
tasks=[analysis_task]
)
return analyst_crew.kickoff(inputs=market_data)
@router(analyze_with_crew)
def determine_next_steps(self):
if self.state.confidence > 0.8:
return "high_confidence"
elif self.state.confidence > 0.5:
return "medium_confidence"
return "low_confidence"
@listen("high_confidence")
def execute_strategy(self):
# 执行策略
strategy_crew = Crew(
agents=[strategy_expert],
tasks=[strategy_task]
)
return strategy_crew.kickoff()
Flows 的高级功能和状态管理
CrewAI Game Builder Crew 的实际应用
# 游戏设计智能体
game_designer = Agent(
role="游戏设计师",
goal="设计有趣且平衡的游戏概念",
backstory="你是一位经验丰富的游戏设计师,擅长创造引人入胜的游戏体验",
tools=[GameDesignTools()],
verbose=True,
allow_delegation=True
)
# 程序员智能体
programmer = Agent(
role="Python程序员",
goal="将游戏设计转化为可执行的Python代码",
backstory="你是一位专业的Python游戏开发者,精通Pygame等游戏开发库",
tools=[CodeTools()],
verbose=True,
allow_delegation=False
)
# 测试员智能体
tester = Agent(
role="游戏测试员",
goal="测试游戏功能并报告问题",
backstory="你是一位细心的游戏测试员,擅长发现游戏中的问题和改进点",
verbose=True,
allow_delegation=False
)
# 评审员智能体
reviewer = Agent(
role="代码评审员",
goal="审查代码质量并提供优化建议",
backstory="你是一位资深的代码评审专家,注重代码质量和性能优化",
verbose=True,
allow_delegation=False
)
游戏开发中各智能体的角色配置
Screenplay Writer 的创意应用
## Keith:
Robert, I don't understand. You're opposed to the death penalty because of some misplaced sense of compassion for criminals?
## Robert:
No, Keith. It's about fairness and justice. We can't just take a life because someone has taken another.
## Keith:
But what about the families of the victims? Don't they deserve justice?
## Robert:
Of course they do, but the death penalty doesn't bring them back. It just perpetuates a cycle of violence.
新闻组对话转化为剧本格式的示例
营销策略制定的多智能体协作
招聘流程的智能体自动化
个性化旅行计划的智能体协作
class Agent:
def __init__(
self,
role: str,
goal: str,
backstory: str,
llm: Optional[LLM] = None,
tools: Optional[List[BaseTool]] = None,
allow_delegation: bool = False,
max_iter: int = 20,
verbose: bool = False,
memory: bool = True,
**kwargs
):
self.role = role
self.goal = goal
self.backstory = backstory
self.llm = llm or OpenAI()
self.tools = tools or []
self.allow_delegation = allow_delegation
self.max_iter = max_iter
self.verbose = verbose
self.memory = memory
def kickoff(self, task: Task) -> str:
"""执行任务的核心方法"""
# 构建提示
prompt = self._build_prompt(task)
# 调用LLM
response = self.llm.generate(prompt)
# 处理响应
return self._process_response(response)
def _build_prompt(self, task: Task) -> str:
"""构建智能体提示"""
prompt_template = f"""
Role: {self.role}
Goal: {self.goal}
Backstory: {self.backstory}
Task: {task.description}
Expected Output: {task.expected_output}
"""
return prompt_template
Agent类的核心实现逻辑
class Task:
def __init__(
self,
description: str,
expected_output: str,
agent: Agent,
context: Optional[str] = None,
callback: Optional[Callable] = None
):
self.description = description
self.expected_output = expected_output
self.agent = agent
self.context = context
self.callback = callback
def execute(self) -> str:
"""执行任务"""
try:
# 添加上下文
if self.context:
self.description += f"\n\nContext: {self.context}"
# 执行任务
result = self.agent.kickoff(self)
# 执行回调
if self.callback:
self.callback(result)
return result
except Exception as e:
# 错误处理和重试
return self._handle_error(e)
def _handle_error(self, error: Exception) -> str:
"""处理执行错误"""
if isinstance(error, MaxRetriesExceeded):
return f"Task failed after max retries: {error}"
else:
return f"Task execution failed: {error}"
Task类的执行流程和错误处理
class Agent:
def delegate_task(self, task: Task, target_agent: Agent) -> str:
"""委派任务给其他智能体"""
if not self.allow_delegation:
raise DelegationNotAllowedError("Agent is not allowed to delegate")
# 构建委派消息
delegation_message = f"""
I need your help with the following task:
Task: {task.description}
Expected Output: {task.expected_output}
Context: {task.context}
Please provide your expert assistance.
"""
# 发送委派请求
response = target_agent.kickoff(delegation_message)
# 记录委派历史
self._record_delegation(task, target_agent, response)
return response
def _record_delegation(self, task: Task, target_agent: Agent, response: str):
"""记录委派历史"""
delegation_record = {
'task': task.description,
'target_agent': target_agent.role,
'response': response,
'timestamp': datetime.now()
}
self.delegation_history.append(delegation_record)
智能体间的任务委派机制
class Memory:
def __init__(self, max_size: int = 1000):
self.short_term_memory = []
self.long_term_memory = []
self.max_size = max_size
def add_context(self, context: str, metadata: Optional[dict] = None):
"""添加上下文信息"""
memory_entry = {
'content': context,
'timestamp': datetime.now(),
'metadata': metadata or {}
}
# 添加到短期记忆
self.short_term_memory.append(memory_entry)
# 如果超过大小限制,移除最早的条目
if len(self.short_term_memory) > self.max_size:
self.short_term_memory.pop(0)
# 如果重要信息,添加到长期记忆
if self._is_important(context):
self.long_term_memory.append(memory_entry)
def retrieve_context(self, query: str, limit: int = 5) -> List[str]:
"""检索相关上下文"""
# 使用相似度检索
relevant_memories = self._search_similar(query)
# 返回最新的相关记忆
return [m['content'] for m in relevant_memories[:limit]]
def _is_important(self, context: str) -> bool:
"""判断信息是否重要"""
# 基于关键词和频率判断重要性
important_keywords = ['critical', 'important', 'decision', 'result']
return any(keyword in context.lower() for keyword in important_keywords)
记忆系统的核心实现
class ToolRegistry:
def __init__(self):
self.tools = {}
self.tool_cache = {}
def register_tool(self, name: str, tool: BaseTool):
"""注册工具"""
self.tools[name] = tool
def get_tool(self, name: str) -> BaseTool:
"""获取工具"""
if name not in self.tools:
raise ToolNotFoundError(f"Tool '{name}' not found")
return self.tools[name]
def use_tool(self, name: str, inputs: dict) -> str:
"""使用工具"""
# 检查缓存
cache_key = f"{name}_{hash(str(inputs))}"
if cache_key in self.tool_cache:
return self.tool_cache[cache_key]
# 调用工具
tool = self.get_tool(name)
result = tool.run(inputs)
# 缓存结果
self.tool_cache[cache_key] = result
return result
class Agent:
def use_tool(self, tool_name: str, inputs: dict) -> str:
"""使用指定工具"""
return self.tool_registry.use_tool(tool_name, inputs)
工具注册和使用系统的实现
class TaskExecutor:
def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
def execute_with_retry(self, task: Task) -> str:
"""带重试的任务执行"""
last_error = None
for attempt in range(self.max_retries):
try:
# 执行任务
result = task.execute()
return result
except Exception as e:
last_error = e
self._log_error(e, attempt)
# 计算重试延迟
delay = self._calculate_retry_delay(attempt)
time.sleep(delay)
# 所有重试都失败
raise MaxRetriesExceededError(
f"Task failed after {self.max_retries} retries. Last error: {last_error}"
)
def _calculate_retry_delay(self, attempt: int) -> float:
"""计算重试延迟"""
return self.backoff_factor ** attempt
def _log_error(self, error: Exception, attempt: int):
"""记录错误日志"""
logger.error(f"Task execution failed (attempt {attempt + 1}): {error}")
任务执行的重试和错误处理机制
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelExecutor:
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def execute_tasks_parallel(self, tasks: List[Task]) -> List[str]:
"""并行执行多个任务"""
# 创建异步任务
async_tasks = [
self._execute_task_async(task) for task in tasks
]
# 并行执行
results = await asyncio.gather(*async_tasks, return_exceptions=True)
# 处理结果
return self._process_results(results)
async def _execute_task_async(self, task: Task) -> str:
"""异步执行单个任务"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
task.execute
)
def _process_results(self, results: List) -> List[str]:
"""处理并行执行结果"""
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(f"Task {i} failed: {result}")
else:
processed_results.append(result)
return processed_results
并行执行和异步处理的实现
class CacheSystem:
def __init__(self, max_size: int = 1000):
self.cache = {}
self.max_size = max_size
self.access_count = {}
def get(self, key: str) -> Optional[str]:
"""获取缓存值"""
if key in self.cache:
# 更新访问计数
self.access_count[key] = self.access_count.get(key, 0) + 1
return self.cache[key]
return None
def set(self, key: str, value: str):
"""设置缓存值"""
# 如果缓存已满,移除最少使用的项
if len(self.cache) >= self.max_size:
self._evict_least_used()
self.cache[key] = value
self.access_count[key] = 1
def _evict_least_used(self):
"""移除最少使用的缓存项"""
if not self.access_count:
return
# 找到最少使用的键
least_used_key = min(
self.access_count.keys(),
key=lambda k: self.access_count[k]
)
# 移除缓存
del self.cache[least_used_key]
del self.access_count[least_used_key]
def clear(self):
"""清空缓存"""
self.cache.clear()
self.access_count.clear()
class TaskCache:
"""任务专用缓存"""
def __init__(self):
self.cache = CacheSystem()
def execute_cached(self, task: Task) -> str:
"""执行带缓存的任务"""
# 生成缓存键
cache_key = self._generate_cache_key(task)
# 检查缓存
cached_result = self.cache.get(cache_key)
if cached_result:
return cached_result
# 执行任务
result = task.execute()
# 缓存结果
self.cache.set(cache_key, result)
return result
缓存系统的核心实现
感谢阅读!
访问 https://atcfu.com/ai-articles/crewai-agent-roleplay/ 回顾本文