源码深度解读
2026-04-02 | SuperAGI Framework
第一部分:基础概念
第二部分:核心模型
第三部分:核心实现
第四部分:进阶
SuperAGI是一个开发者优先的开源自主智能体框架,旨在让开发者快速构建、管理和运行有用的自主智能体。支持并发智能体执行、工具扩展、GUI界面等功能。
核心理念
主要特性
┌─────────────────────────────────────────────────────────┐
│ SuperAGI Framework │
├─────────────────────────────────────────────────────────┤
│ Agent │ AgentWorkflow │ Tools │ Configuration │
├─────────────────────────────────────────────────────────┤
│ ExecutionManager │ MemoryStorage │ EventSystem │
├─────────────────────────────────────────────────────────┤
│ Database: PostgreSQL │ Cache: Redis │ GUI: Next.js │
├─────────────────────────────────────────────────────────┤
│ VectorDB: Qdrant │ Models: OpenAI │ Tools: Extensible │
└─────────────────────────────────────────────────────────┘
SuperAGI采用微服务架构:智能体层 → 工作流层 → 执行层 → 存储层
| 层级 | 职责 | 核心模块 |
|---|---|---|
| 智能体层 | AI智能体定义 | Agent, AgentTemplate |
| 工作流层 | 任务编排 | AgentWorkflow, Task |
| 工具层 | 能力扩展 | BaseTool, ToolRegistry |
| 执行层 | 任务执行 | ExecutionManager, Celery |
| 存储层 | 数据持久化 | PostgreSQL, Redis, VectorDB |
2023 v0.1.0:初始版本,基础智能体功能,简单工具支持
2023 v0.5.0:引入工作流系统,任务编排能力,GUI界面
2024 v1.0.0:多智能体协作,向量数据库集成,生产级部署
关键突破:从单一智能体发展到多智能体协作系统,支持复杂业务场景
| 特性 | SuperAGI | AutoGen | LangChain |
|---|---|---|---|
| 部署方式 | 自托管/Docker | 云服务 | 库集成 |
| 多智能体 | ✅ 原生支持 | ✅ 原生支持 | ⚠️ 需扩展 |
| 工具系统 | ✅ 可扩展工具 | ✅ 丰富工具 | ✅ 生态工具 |
| 持久化 | ✅ 完整存储 | ⚠️ 基础存储 | ❌ 无持久化 |
| GUI界面 | ✅ 原生支持 | ❌ 无GUI | ❌ 无GUI |
智能体核心
工具与扩展
# superagi/models/agent.py
class Agent(DBBaseModel):
"""Represents an agent entity."""
__tablename__ = 'agents'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String)
project_id = Column(Integer)
description = Column(String)
agent_workflow_id = Column(Integer)
is_deleted = Column(Boolean, default=False)
def fetch_configuration(cls, session, agent_id: int):
"""Fetches the configuration of an agent."""
agent = session.query(Agent).filter_by(id=agent_id).first()
# 获取智能体配置逻辑
Agent模型是SuperAGI的核心,定义了智能体的基本属性和行为
# Agent Workflow 实现
class AgentWorkflow:
"""智能体工作流编排"""
def __init__(self, name, steps=None):
self.name = name
self.steps = steps or []
self.current_step = 0
self.status = "created"
def execute_step(self, step, context):
"""执行单个工作流步骤"""
try:
# 解析和执行工具调用
if hasattr(step, 'tool_calls'):
for tool_call in step.tool_calls:
result = self.execute_tool(tool_call, context)
context.update(result)
# 更新步骤状态
step.status = "completed"
self.current_step += 1
except Exception as e:
step.status = "failed"
raise e
Agent Workflow 实现智能体的任务编排和步骤执行逻辑
# superagi/tools/base_tool.py
class BaseTool:
"""工具基类"""
def __init__(self, name, description):
self.name = name
self.description = description
self.schema = self._create_schema()
def _create_schema(self):
"""创建工具参数模式"""
return create_model(
f"{self.name}Input",
**self._get_parameters()
)
def execute(self, **kwargs):
"""执行工具逻辑"""
raise NotImplementedError
async def aexecute(self, **kwargs):
"""异步执行工具逻辑"""
raise NotImplementedError
工具系统特点:基于Pydantic的参数验证,支持同步/异步执行,可扩展的工具生态
# 执行管理器架构
class ExecutionManager:
"""任务执行管理器"""
def __init__(self):
self.celery_app = Celery('superagi')
self.task_queue = asyncio.Queue()
self.active_tasks = {}
async def execute_workflow(self, workflow):
"""执行工作流"""
try:
for step in workflow.steps:
task = await self._execute_step(step)
self.active_tasks[step.id] = task
await task
except Exception as e:
await self._handle_error(e)
async def _execute_step(self, step):
"""执行单个步骤"""
return asyncio.create_task(self._process_step(step))
执行管理器负责协调任务的执行,支持并发和错误处理
短期存储
长期存储
存储策略:采用多级存储架构,保证数据持久化的同时提供高性能访问
# 工具基类设计
from abc import abstractmethod
from typing import Dict, Any, Optional
class BaseTool(BaseModel):
"""工具基类"""
name: str
description: str
config: Optional[Dict[str, Any]] = None
@abstractmethod
def execute(self, **kwargs) -> Dict[str, Any]:
"""执行工具"""
pass
@abstractmethod
def validate_input(self, **kwargs) -> bool:
"""验证输入参数"""
pass
def before_execute(self, **kwargs):
"""执行前处理"""
pass
def after_execute(self, result: Dict[str, Any]):
"""执行后处理"""
pass
工具基类定义了标准化的工具接口,支持自定义验证和生命周期管理
# 事件系统设计
class EventSystem:
"""事件驱动系统"""
def __init__(self):
self.event_handlers = defaultdict(list)
self.event_queue = asyncio.Queue()
def register_handler(self, event_type, handler):
"""注册事件处理器"""
self.event_handlers[event_type].append(handler)
async def emit(self, event_type, data):
"""触发事件"""
event = Event(event_type, data)
await self.event_queue.put(event)
async def process_events(self):
"""处理事件队列"""
while True:
event = await self.event_queue.get()
for handler in self.event_handlers[event.type]:
await handler(event)
事件机制:支持异步事件处理,提供松耦合的系统架构
# docker-compose.yaml
version: '3.8'
services:
backend:
build: .
depends_on:
- super__redis
- super__postgres
command: ["/app/entrypoint.sh"]
celery:
build: .
depends_on:
- super__redis
- super__postgres
command: ["/app/entrypoint_celery.sh"]
gui:
build:
context: ./gui
networks:
- super_network
super__redis:
image: "redis/redis-stack-server:latest"
super__postgres:
image: "docker.io/library/postgres:15"
Docker Compose实现的多服务部署,支持扩展和高可用性
PostgreSQL 数据库
Redis 缓存
数据一致性:采用事务管理,保证数据完整性和一致性
# 缓存策略
class CacheManager:
"""缓存管理器"""
def __init__(self):
self.redis = RedisClient()
self.local_cache = {}
self.cache_policy = {}
async def get(self, key):
"""获取缓存"""
# 1. 检查本地缓存
if key in self.local_cache:
return self.local_cache[key]
# 2. 检查Redis缓存
value = await self.redis.get(key)
if value:
self.local_cache[key] = value
return value
return None
async def set(self, key, value, ttl=None):
"""设置缓存"""
await self.redis.setex(key, ttl or 3600, value)
self.local_cache[key] = value
多级缓存架构,支持本地缓存和分布式缓存,提高系统性能
前端技术栈
核心功能
设计理念:直观易用的用户界面,降低智能体开发门槛
# 多智能体协作架构
class MultiAgentSystem:
"""多智能体协作系统"""
def __init__(self):
self.agents = {}
self.coordination = AgentCoordination()
self.communication = MessageBus()
async def orchestrate_agents(self, task):
"""编排多个智能体执行任务"""
agents = self._select_agents(task)
workflow = self._create_collaboration_workflow(agents)
for step in workflow.steps:
agent = self.agents[step.agent_id]
await agent.execute(step, task.context)
async def communicate_agents(self, from_agent, to_agent, message):
"""智能体间通信"""
await self.communication.send(
from_agent=from_agent,
to_agent=to_agent,
message=message
)
支持复杂的智能体协作,提供消息传递和任务编排能力
计算优化
存储优化
监控指标:响应时间、吞吐量、错误率、资源利用率
智能体设计原则
工具开发规范
电商客户服务系统
数据分析平台
SuperAGI 核心优势
未来发展方向
🤖 SuperAGI - 让AI智能体开发变得简单