源码深度解读
2026-03-29 | BabyAGI Autonomous Agent Framework
第一部分:基础概念
第二部分:核心架构
第三部分:核心实现
第四部分:进阶
BabyAGI 是一个基于任务驱动的自主智能体框架,通过函数化的方式构建能够自我构建的AI智能体。
项目定位
核心特性
┌─────────────────────────────────────────────────────────┐
│ BabyAGI │
├─────────────────────────────────────────────────────────┤
│ babyagi/__init__.py │ Flask App │ Dashboard │
├─────────────────────────────────────────────────────────┤
│ Functionz Framework │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────┐ │
│ │ Functionz Core │ │ DB Router │ │ Logger │ │
│ │ │ │ │ │ │ │
│ │ +register_func │ │ +get_function │ │ +track │ │
│ │ +execute_func │ │ +add_key │ │ +log │ │
│ │ +load_pack │ │ +get_keys │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────┤
│ Function Execution Engine │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ FunctionExecutor│ │ FunctionRegistrar│ │
│ │ │ │ │ │
│ │ +_resolve_deps │ │ +register_func │ │
│ │ +_create_wrapper│ │ +update_func │ │
│ │ +execute │ │ +add_function │ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ Dashboard & API │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Web Interface │ │ REST API │ │
│ │ │ │ │ │
│ │ +Function Mgmt │ │ +POST /api │ │
│ │ +Execution Log │ │ +GET /status │ │
│ │ +Dependency Viz │ │ +PUT /update │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────┘
BabyAGI 采用分层架构:Web层 → Framework层 → Engine层 → DB层
| 层级 | 职责 | 核心模块 |
|---|---|---|
| Web 层 | 用户接口 | Flask App, Dashboard UI |
| Framework 层 | 框架核心 | Functionz, API Blueprint |
| Engine 层 | 执行引擎 | FunctionExecutor, Registrar |
| DB 层 | 数据存储 | DBRouter, Key Management |
2023年3月:原始 BabyAGI 发布,引入任务规划作为自主智能体的构建方法
2024年9月:项目被归档,迁移到 babyagi_archive 仓库(快照版本)
最新版本:实验性的 Functionz 框架,基于函数的智能体构建方法
核心理念:通过函数框架构建能够自我构建的智能体,实现最小可构建系统
Functionz 框架
Web Dashboard
# babyagi/__init__.py
from flask import Flask, g
from .functionz.core.framework import Functionz
from .dashboard import create_dashboard
from .api import create_api_blueprint
import os
import importlib.util
import traceback
import sys
# 单例模式的 Functionz 实例
_func_instance = Functionz()
def get_func_instance():
return _func_instance
def create_app(dashboard_route='/dashboard'):
app = Flask(__name__)
# 移除前导斜杠,避免重复
if dashboard_route.startswith('/'):
dashboard_route = dashboard_route[1:]
# 创建并注册 Dashboard blueprint
dashboard_blueprint = create_dashboard(_func_instance, dashboard_route)
# 创建并注册 API blueprint
api_blueprint = create_api_blueprint()
# 注册 blueprint
app.register_blueprint(dashboard_blueprint, url_prefix=f'/{dashboard_route}')
app.register_blueprint(api_blueprint) # 挂载在 '/api'
# 配置 Dashboard 路由
app.config['DASHBOARD_ROUTE'] = dashboard_route
# 确保在请求上下文中可访问 Functionz 实例
@app.before_request
def set_functionz():
g.functionz = _func_instance
g.dashboard_route = dashboard_route
return app
基础架构采用单例模式 + Flask Blueprint 的设计,确保全局状态一致性
依赖类型:外部库依赖、函数间依赖、密钥依赖
# 函数注册时的依赖定义
@babyagi.register_function(
imports=["math", "requests"], # 外部库依赖
dependencies=["circle_area", "fetch_data"], # 函数间依赖
key_dependencies=["openai_api_key"], # 密钥依赖
metadata={
"description": "计算圆柱体积,依赖 circle_area 函数"
}
)
def cylinder_volume(radius, height):
import math
area = circle_area(radius) # 自动解析依赖
return area * height
自动解析:框架自动处理依赖的加载、执行和缓存
class FunctionExecutor:
def __init__(self, python_func):
self.python_func = python_func
def execute(self, function_name, *args, **kwargs):
start_time = datetime.now()
executed_functions = []
log_id = None
try:
# 记录执行开始
executed_functions.append(function_name)
function_version = self.python_func.db.get_function(function_name)
# 解析依赖
self._resolve_dependencies(function_version, local_scope, log_id, executed_functions)
# 执行主函数
result = exec(function_version['code'], local_scope)
return result
except Exception as e:
# 错误处理和日志记录
logger.error(f"执行函数 {function_name} 失败: {str(e)}")
raise
执行流程:依赖解析 → 函数执行 → 结果返回 → 错误处理
# babyagi/__init__.py 续
# 函数注册装饰器
def register_function(*args, **kwargs):
def wrapper(func):
try:
_func_instance.register_function(*args, **kwargs)(func)
setattr(sys.modules[__name__], func.__name__, func)
#print(f"Function '{func.__name__}' registered successfully.")
except Exception as e:
print(f"Error registering function '{func.__name__}': {e}")
traceback.print_exc()
return func
return wrapper
# 函数包加载器
def load_functions(pack_name_or_path):
#print(f"Attempting to load function pack: {pack_name_or_path}")
if os.path.exists(pack_name_or_path):
try:
spec = importlib.util.spec_from_file_location("custom_pack", pack_name_or_path)
module = importlib.util.module_from_spec(spec)
# 设置模块函数
module.func = _func_instance
spec.loader.exec_module(module)
except Exception as e:
logger.error(f"Error loading function pack {pack_name_or_path}: {str(e)}")
traceback.print_exc()
智能体应用:通过装饰器模式,将普通 Python 函数转换为 BabyAGI 管理的函数
Dashboard 功能:Web 界面管理函数、查看执行、处理依赖
函数管理
执行监控
# API 设计概览
# 函数管理
POST /api/functions # 注册新函数
PUT /api/functions/{id} # 更新函数
GET /api/functions # 获取所有函数
DELETE /api/functions/{id} # 删除函数
# 函数执行
POST /api/execute # 执行函数
GET /api/results/{id} # 获取执行结果
# 依赖管理
GET /api/dependencies # 获取依赖关系
POST /api/resolve # 解析依赖
# 触发器管理
POST /api/triggers # 添加触发器
GET /api/triggers # 获取触发器
DELETE /api/triggers/{id} # 删除触发器
# 密钥管理
POST /api/keys # 添加密钥
GET /api/keys # 获取所有密钥
DELETE /api/keys/{id} # 删除密钥
RESTful API 设计,支持完整的 CRUD 操作和特殊功能
# packages/reactivity/src/framework.py
class Functionz:
def __init__(self, db_type='local', **db_kwargs):
self.db = DBRouter(db_type, **db_kwargs) # 数据库路由
self.executor = FunctionExecutor(self) # 执行引擎
self.registrar = FunctionRegistrar(self) # 注册器
# 函数执行
def execute_function(self, function_name: str, *args, **kwargs):
return self.executor.execute(function_name, *args, **kwargs)
def __getattr__(self, name):
if self.db.get_function(name):
return lambda *args, **kwargs: self.executor.execute(name, *args, **kwargs)
raise AttributeError(f"'PythonFunc' object has no attribute '{name}'")
# 函数管理
def get_function(self, name: str):
return self.db.get_function(name)
def get_function_versions(self, name: str):
return self.db.get_function_versions(name)
def activate_function_version(self, name: str, version: int) -> None:
self.db.activate_function_version(name, version)
# 函数注册(暴露注册器方法)
def register_function(self, *args, **kwargs):
return self.registrar.register_function(*args, **kwargs)
def update_function(self, *args, **kwargs):
return self.registrar.update_function(*args, **kwargs)
def add_function(self, *args, **kwargs):
return self.registrar.add_function(*args, **kwargs)
| 接口类型 | 方法 | 功能 |
|---|---|---|
| 执行接口 | execute_function() | 执行指定函数 |
| 查询接口 | get_function(), get_all_functions() | 查询函数信息 |
| 管理接口 | register_function(), update_function() | 函数生命周期管理 |
| 版本接口 | get_function_versions(), activate_function_version() | 版本控制管理 |
# 标记接口
interface Target {
[ReactiveFlags.SKIP]?: boolean // 跳过响应式
[ReactiveFlags.IS_REACTIVE]?: boolean // 响应式标记
[ReactiveFlags.IS_READONLY]?: boolean // 只读标记
[ReactiveFlags.IS_SHALLOW]?: boolean // 浅层标记
[ReactiveFlags.RAW]?: any // 原始对象引用
}
# 函数状态枚举
enum FunctionState {
ACTIVE = 1, // 活跃状态
INACTIVE = 2, // 非活跃状态
ARCHIVED = 3, // 归档状态
}
# 执行状态枚举
enum ExecutionState {
PENDING = 0, // 等待执行
RUNNING = 1, // 执行中
COMPLETED = 2, // 执行完成
FAILED = 3, // 执行失败
}
设计特点:通过枚举和接口定义,确保类型安全和状态一致性
# 数据库抽象层,支持多种存储后端
class DBRouter:
def __init__(self, db_type='local', **db_kwargs):
self.db_type = db_type
self.db_kwargs = db_kwargs
# 根据类型初始化不同的存储后端
if db_type == 'local':
self.storage = LocalStorage(**db_kwargs)
elif db_type == 'sqlite':
self.storage = SQLiteStorage(**db_kwargs)
elif db_type == 'memory':
self.storage = MemoryStorage(**db_kwargs)
else:
raise ValueError(f"Unsupported db_type: {db_type}")
# 函数存储
def get_function(self, name: str):
return self.storage.get_function(name)
def get_function_versions(self, name: str):
return self.storage.get_function_versions(name)
def add_function(self, function_data):
return self.storage.add_function(function_data)
# 依赖管理
def get_function_imports(self, name: str):
return self.storage.get_function_imports(name)
def get_function_dependencies(self, name: str):
return self.storage.get_function_dependencies(name)
# 密钥管理
def add_secret_key(self, key_name: str, key_value: str):
return self.storage.add_secret_key(key_name, key_value)
def get_all_secret_keys(self):
return self.storage.get_all_secret_keys()
# 函数注册器,处理函数的注册、更新、版本管理
class FunctionRegistrar:
def __init__(self, python_func):
self.python_func = python_func
self.function_registry = {}
def register_function(self, *args, **kwargs):
def wrapper(func):
try:
# 创建函数元数据
function_metadata = self._create_function_metadata(func, *args, **kwargs)
# 存储函数
function_data = {
'name': func.__name__,
'code': self._get_function_code(func),
'metadata': function_metadata,
'version': 1,
'created_at': datetime.now(),
'state': FunctionState.ACTIVE
}
# 添加到注册表和数据库
self.function_registry[func.__name__] = function_data
self.python_func.db.add_function(function_data)
# 设置全局函数引用
setattr(sys.modules[__name__], func.__name__, func)
return func
except Exception as e:
logger.error(f"注册函数 {func.__name__} 失败: {str(e)}")
raise
return wrapper
def _create_function_metadata(self, func, *args, **kwargs):
metadata = {
'description': kwargs.get('metadata', {}).get('description', ''),
'imports': kwargs.get('imports', []),
'dependencies': kwargs.get('dependencies', []),
'key_dependencies': kwargs.get('key_dependencies', []),
'tags': kwargs.get('tags', []),
'author': kwargs.get('author', ''),
'category': kwargs.get('category', 'general')
}
return metadata
# 核心执行器,负责函数的实际执行和依赖解析
class FunctionExecutor:
def __init__(self, python_func):
self.python_func = python_func
def execute(self, function_name: str, *args, **kwargs):
start_time = datetime.now()
executed_functions = []
log_id = None
try:
# 获取函数数据
function_version = self.python_func.db.get_function(function_name)
if not function_version:
raise ValueError(f"Function '{function_name}' not found")
# 创建本地作用域
local_scope = {}
# 解析依赖
self._resolve_dependencies(
function_version,
local_scope,
log_id,
executed_functions
)
# 执行函数
exec(function_version['code'], local_scope)
# 获取执行结果
result = local_scope.get(function_name)(*args, **kwargs)
# 记录执行日志
self._log_execution(
function_name,
result,
start_time,
executed_functions
)
return result
except Exception as e:
# 错误处理
logger.error(f"执行函数 {function_name} 失败: {str(e)}")
raise
# 外部依赖安装
def _install_external_dependency(self, package_name: str, imp_name: str):
try:
# 尝试导入
return importlib.import_module(imp_name)
except ImportError:
# 如果不存在,自动安装
subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
return importlib.import_module(package_name)
# 依赖解析核心逻辑
def _resolve_dependencies(self, function_version: Dict[str, Any], local_scope: Dict[str, Any],
parent_log_id: Optional[int], executed_functions: List[str],
visited: Optional[set] = None) -> None:
if visited is None:
visited = set()
function_name = function_version['name']
function_imports = self.python_func.db.get_function_imports(function_name)
# 1. 解析外部库依赖
for imp in function_imports:
lib_name = imp['lib'] if imp['lib'] else imp['name']
if lib_name not in local_scope:
module = self._install_external_dependency(lib_name, imp['name'])
local_scope[imp['name']] = module
# 2. 解析函数依赖
for dep_name in function_version.get('dependencies', []):
if dep_name not in local_scope and dep_name not in visited:
visited.add(dep_name)
dep_data = self.python_func.db.get_function(dep_name)
if not dep_data:
raise ValueError(f"Dependency '{dep_name}' not found")
# 递归解析依赖
self._resolve_dependencies(dep_data, local_scope, parent_log_id, executed_functions, visited)
# 执行依赖函数并包装
exec(dep_data['code'], local_scope)
if dep_name in local_scope:
dep_func = local_scope[dep_name]
local_scope[dep_name] = self._create_function_wrapper(dep_func, dep_name, parent_log_id, executed_functions)
解析策略:深度优先搜索,循环依赖检测,自动安装依赖
# 依赖解析算法
def _resolve_dependencies(self, function_version, local_scope, log_id, executed_functions, visited=None):
visited = visited or set()
function_name = function_version['name']
# 检测循环依赖
if function_name in visited:
raise ValueError(f"检测到循环依赖: {function_name}")
# 标记已访问
visited.add(function_name)
# 1. 解析外部库依赖
for import_spec in function_version.get('imports', []):
if import_spec['name'] not in local_scope:
# 自动安装并导入
module = self._install_external_dependency(import_spec['lib'], import_spec['name'])
local_scope[import_spec['name']] = module
# 2. 递归解析函数依赖
for dep_name in function_version.get('dependencies', []):
if dep_name not in local_scope:
dep_data = self.python_func.db.get_function(dep_name)
if not dep_data:
raise ValueError(f"依赖函数 {dep_name} 不存在")
# 递归解析
self._resolve_dependencies(dep_data, local_scope, log_id, executed_functions, visited)
# 执行依赖函数并包装
exec(dep_data['code'], local_scope)
if dep_name in local_scope:
local_scope[dep_name] = self._create_function_wrapper(
local_scope[dep_name], dep_name, log_id, executed_functions
)
# 3. 解析密钥依赖
for key_dep in function_version.get('key_dependencies', []):
key_value = self.python_func.db.get_secret_key(key_dep)
local_scope[key_dep] = key_value
# 创建函数包装器,支持日志记录、错误处理、依赖传播
def _create_function_wrapper(self, func: callable, func_name: str, parent_log_id: int, executed_functions: List[str]):
def wrapper(*args, **kwargs):
return self.execute(func_name, *args, executed_functions=executed_functions, parent_log_id=parent_log_id, **kwargs)
# 保留原始函数的元数据
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
wrapper.__module__ = func.__module__
# 添加 BabyAGI 特定的属性
wrapper._babyagi_wrapper = True
wrapper._original_func = func
wrapper._func_name = func_name
return wrapper
# 包装器的优势:
# 1. 日志记录:自动记录函数调用
# 2. 错误处理:统一的错误处理机制
# 3. 依赖传播:正确传递执行上下文
# 4. 性能监控:自动记录执行时间
# 5. 调试支持:保留原始函数引用
包装器模式:在不修改原始函数的情况下,为其添加额外的功能和行为
# 支持从文件路径加载函数包
def load_function_pack(self, pack_name: str):
packs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'packs')
pack_path = os.path.join(packs_dir, pack_name + '.py')
if not os.path.exists(pack_path):
logger.error(f"Function pack '{pack_name}' not found.")
return
self._load_module_from_path(pack_path, pack_name)
def load_functions_from_file(self, file_path: str):
if not os.path.exists(file_path):
logger.error(f"File '{file_path}' not found.")
return
module_name = os.path.splitext(os.path.basename(file_path))[0]
self._load_module_from_path(file_path, module_name)
def _load_module_from_path(self, path: str, module_name: str):
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
module.func = self # 注入 Functionz 实例
original_sys_path = sys.path[:]
try:
# 添加项目路径到 sys.path
babyagi_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
if babyagi_root not in sys.path:
sys.path.insert(0, babyagi_root)
# 执行模块
spec.loader.exec_module(module)
logger.info(f"Loaded module '{module_name}' from '{path}'")
except Exception as e:
logger.error(f"Error loading module '{module_name}' from '{path}': {str(e)}")
import traceback
logger.error(traceback.format_exc())
finally:
# 恢复原始 sys.path
sys.path = original_sys_path
触发器机制:当一个函数发生变化时,自动触发其他相关函数的执行
# 触发器添加
def add_trigger(self, triggered_function_name, triggering_function_name=None):
self.db.add_trigger(triggered_function_name, triggering_function_name)
# 获取函数的触发器
def get_triggers_for_function(self, function_name: str) -> List[str]:
function_data = self.get_function(function_name)
return function_data.get('triggers', [])
# 执行时自动触发
def execute_function(self, function_name: str, *args, **kwargs):
# 执行主函数
result = self.executor.execute(function_name, *args, **kwargs)
# 触发相关函数
triggers = self.get_triggers_for_function(function_name)
for trigger_name in triggers:
try:
self.execute_function(trigger_name, result)
except Exception as e:
logger.error(f"Trigger {trigger_name} failed: {str(e)}")
return result
# 综合日志记录系统,跟踪所有函数活动
class LoggingSystem:
def __init__(self):
self.logger = logging.getLogger('babyagi')
def track_function_execution(self, function_name, args, kwargs, start_time, result, end_time):
log_entry = {
'function': function_name,
'args': args,
'kwargs': kwargs,
'start_time': start_time,
'end_time': end_time,
'duration': (end_time - start_time).total_seconds(),
'result': str(result)[:1000], # 限制长度
'status': 'success'
}
# 记录到数据库
self._save_to_database(log_entry)
# 实时显示在 Dashboard
self._log_to_dashboard(log_entry)
def track_dependency_resolution(self, function_name, dependency_name, resolved_value):
log_entry = {
'type': 'dependency',
'function': function_name,
'dependency': dependency_name,
'resolved': bool(resolved_value),
'timestamp': datetime.now()
}
self._save_to_database(log_entry)
def track_error(self, function_name, error, stack_trace):
log_entry = {
'function': function_name,
'error': str(error),
'stack_trace': stack_trace,
'timestamp': datetime.now(),
'status': 'failed'
}
self._save_to_database(log_entry)
# 注册器的核心功能实现
class FunctionRegistrar:
def __init__(self, python_func):
self.python_func = python_func
self.function_registry = {}
self.version_counter = {}
def register_function(self, *args, **kwargs):
def wrapper(func):
function_name = func.__name__
# 创建函数数据
function_data = {
'name': function_name,
'code': self._extract_function_code(func),
'metadata': self._build_metadata(func, *args, **kwargs),
'version': self._get_next_version(function_name),
'created_at': datetime.now(),
'updated_at': datetime.now(),
'state': FunctionState.ACTIVE,
'triggers': kwargs.get('triggers', [])
}
# 存储到注册表
self.function_registry[function_name] = function_data
# 存储到数据库
self.python_func.db.add_function(function_data)
# 设置全局函数引用
self._set_global_function(func, function_data)
logger.info(f"Function '{function_name}' registered successfully")
return func
return wrapper
def _extract_function_code(self, func):
# 获取函数的源代码
try:
lines = inspect.getsourcelines(func)
code = ''.join(lines[0])
return code
except Exception as e:
logger.warning(f"Could not extract source code for {func.__name__}: {str(e)}")
return f"# Source code not available for {func.__name__}"
# 完整的函数注册流程
def register_function(self, *args, **kwargs):
def wrapper(func):
function_name = func.__name__
# 1. 参数验证
self._validate_registration_params(func, *args, **kwargs)
# 2. 元数据构建
metadata = self._build_function_metadata(func, *args, **kwargs)
# 3. 代码提取
source_code = self._extract_source_code(func)
# 4. 依赖解析
dependencies = self._resolve_registration_dependencies(kwargs)
# 5. 函数数据创建
function_data = {
'name': function_name,
'code': source_code,
'metadata': metadata,
'dependencies': dependencies,
'version': self._get_next_version(function_name),
'created_at': datetime.now(),
'state': FunctionState.ACTIVE
}
# 6. 存储到数据库
self.python_func.db.add_function(function_data)
# 7. 设置全局引用
setattr(sys.modules[__name__], function_name, func)
# 8. 触发注册事件
self._trigger_registration_event(function_data)
return func
return wrapper
关键步骤:参数验证 → 元数据构建 → 代码提取 → 依赖解析 → 数据存储 → 全局设置 → 事件触发
版本控制:每次更新都会创建新版本,支持回滚到历史版本
# 函数更新机制
def update_function(self, function_name: str, new_code: str, new_metadata: dict = None):
# 1. 获取当前函数
current_function = self.python_func.db.get_function(function_name)
if not current_function:
raise ValueError(f"Function '{function_name}' not found")
# 2. 创建新版本
new_version = {
'name': function_name,
'code': new_code,
'metadata': new_metadata or current_function['metadata'],
'version': current_function['version'] + 1,
'created_at': datetime.now(),
'updated_at': datetime.now(),
'state': FunctionState.ACTIVE,
'previous_version': current_function['version']
}
# 3. 存储新版本
self.python_func.db.add_function(new_version)
# 4. 标记旧版本为非活跃
current_function['state'] = FunctionState.INACTIVE
self.python_func.db.update_function(current_function)
# 5. 重新加载函数
self._reload_function(function_name, new_version)
# 6. 触发更新事件
self._trigger_update_event(function_name, new_version)
return new_version
# Dashboard 核心 - 基于 Flask Blueprint
def create_dashboard(functionz_instance, dashboard_route):
dashboard = Blueprint('dashboard', __name__, template_folder='templates', static_folder='static')
@dashboard.route('/')
def index():
return render_template('index.html',
functions=functionz_instance.get_all_functions(),
dashboard_route=dashboard_route)
@dashboard.route('/functions')
def functions_page():
return render_template('functions.html',
functions=functionz_instance.get_all_functions(),
dashboard_route=dashboard_route)
@dashboard.route('/execute')
def execute_page():
return render_template('execute.html',
functions=functionz_instance.get_all_functions(),
dashboard_route=dashboard_route)
@dashboard.route('/api/functions', methods=['GET'])
def api_get_functions():
functions = functionz_instance.get_all_functions()
return jsonify({'functions': functions})
@dashboard.route('/api/execute', methods=['POST'])
def api_execute_function():
data = request.json
function_name = data.get('function_name')
args = data.get('args', [])
kwargs = data.get('kwargs', {})
try:
result = functionz_instance.execute_function(function_name, *args, **kwargs)
return jsonify({'success': True, 'result': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
return dashboard
# 完整的 Flask 应用创建和配置
def create_app(dashboard_route='/dashboard'):
app = Flask(__name__,
template_folder=os.path.join(os.path.dirname(__file__), 'dashboard', 'templates'),
static_folder=os.path.join(os.path.dirname(__file__), 'dashboard', 'static'))
# 配置
app.config['SECRET_KEY'] = 'babyagi-secret-key'
app.config['DASHBOARD_ROUTE'] = dashboard_route
# 注册蓝图
dashboard_blueprint = create_dashboard(_func_instance, dashboard_route)
api_blueprint = create_api_blueprint()
app.register_blueprint(dashboard_blueprint, url_prefix=f'/{dashboard_route}')
app.register_blueprint(api_blueprint, url_prefix='/api')
# 全局上下文
@app.before_request
def set_globals():
g.functionz = _func_instance
g.dashboard_route = dashboard_route
# 错误处理
@app.errorhandler(404)
def not_found(error):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_error(error):
return render_template('500.html'), 500
return app
# 应用启动
if __name__ == "__main__":
app = create_app()
app.run(host='0.0.0.0', port=8080, debug=True)
应用特点:模块化设计、蓝图集成、错误处理、全局状态管理
RESTful 设计:遵循 REST 原则,使用标准的 HTTP 方法和状态码
设计原则
安全考虑
# 函数注册 API
POST /api/functions
Content-Type: application/json
{
"function_name": "calculate_circle_area",
"code": "import math\ndef calculate_circle_area(radius):\n return math.pi * radius ** 2",
"metadata": {
"description": "计算圆的面积",
"imports": ["math"],
"dependencies": [],
"tags": ["geometry", "math"]
}
}
Response:
{
"success": true,
"function_id": "uuid-12345",
"message": "Function registered successfully"
}
# 获取函数列表
GET /api/functions
Response:
{
"success": true,
"functions": [
{
"name": "calculate_circle_area",
"version": 1,
"metadata": {...}
}
]
}
# 函数执行 API
POST /api/execute
Content-Type: application/json
{
"function_name": "calculate_cylinder_volume",
"args": [5, 10],
"kwargs": {},
"context": {
"user_id": "user-123",
"session_id": "session-456"
}
}
Response:
{
"success": true,
"execution_id": "exec-789",
"result": 785.3981633974483,
"execution_time": 0.023,
"executed_functions": ["calculate_cylinder_volume", "calculate_circle_area"]
}
# 获取执行结果
GET /api/results/exec-789
Response:
{
"success": true,
"execution": {
"execution_id": "exec-789",
"function_name": "calculate_cylinder_volume",
"args": [5, 10],
"result": 785.3981633974483,
"start_time": "2026-03-29T16:30:00Z",
"end_time": "2026-03-29T16:30:00.023Z",
"status": "completed"
}
}
# 获取依赖关系
GET /api/dependencies/calculate_cylinder_volume
Response:
{
"success": true,
"dependencies": {
"imports": ["math"],
"functions": ["calculate_circle_area"],
"keys": ["openai_api_key"]
}
}
# 解析依赖
POST /api/resolve
Content-Type: application/json
{
"function_names": ["calculate_cylinder_volume", "calculate_circle_area"]
}
Response:
{
"success": true,
"resolution": {
"calculate_cylinder_volume": {
"status": "resolved",
"missing": [],
"required": ["math", "calculate_circle_area"]
},
"calculate_circle_area": {
"status": "resolved",
"missing": [],
"required": ["math"]
}
}
}
# 添加触发器
POST /api/triggers
Content-Type: application/json
{
"triggered_function": "generate_report",
"triggering_function": "calculate_cylinder_volume",
"conditions": {
"result_greater_than": 0
}
}
Response:
{
"success": true,
"trigger_id": "trigger-123",
"message": "Trigger created successfully"
}
# 获取触发器
GET /api/triggers
Response:
{
"success": true,
"triggers": [
{
"id": "trigger-123",
"triggered_function": "generate_report",
"triggering_function": "calculate_cylinder_volume",
"conditions": {...}
}
]
}
# 删除触发器
DELETE /api/triggers/trigger-123
Response:
{
"success": true,
"message": "Trigger deleted successfully"
}
函数式编程在 BabyAGI 中的应用,强调纯函数、不可变数据和函数组合
# 纯函数示例 - 无副作用,相同输入产生相同输出
@babyagi.register_function()
def calculate_circle_area(radius):
"""纯函数:计算圆的面积"""
import math
if radius <= 0:
raise ValueError("Radius must be positive")
return math.pi * radius ** 2
# 高阶函数 - 函数作为参数或返回值
@babyagi.register_function()
def apply_operation(numbers, operation):
"""高阶函数:对数字列表应用操作"""
return [operation(num) for num in numbers]
# 函数组合
@babyagi.register_function()
def calculate_cylinder_volume(radius, height):
"""函数组合:组合多个函数"""
import math
area = calculate_circle_area(radius) # 复用已有函数
return area * height
依赖注入通过函数参数显式声明依赖,而不是在函数内部创建依赖
# 传统方式 - 在函数内部创建依赖
@babyagi.register_function()
def process_data():
# 硬编码依赖
api_client = APIClient()
database = Database()
return api_client.fetch_and_process_data(database)
# 依赖注入方式 - 通过参数声明依赖
@babyagi.register_function(
dependencies=["api_client", "database"] # 声明依赖
)
def process_data_injected(api_client, database):
# 依赖通过参数注入,易于测试和替换
return api_client.fetch_and_process_data(database)
# 自动依赖解析
@babyagi.register_function(
dependencies=["process_data_injected"],
key_dependencies=["openai_api_key"]
)
def generate_report(data, api_key):
# 框架自动解析和注入依赖
processed_data = process_data_injected(data)
# 使用 API 密钥
return generate_summary(processed_data, api_key)
观察者模式用于函数间的通知机制,当一个函数状态变化时通知相关函数
# 观察者模式实现
class FunctionObserver:
def __init__(self):
self.observers = {}
def register_observer(self, function_name, observer_function):
if function_name not in self.observers:
self.observers[function_name] = []
self.observers[function_name].append(observer_function)
def notify_observers(self, function_name, data):
if function_name in self.observers:
for observer in self.observers[function_name]:
try:
observer(data)
except Exception as e:
logger.error(f"Observer {observer.__name__} failed: {str(e)}")
# 使用示例
def on_function_updated(function_name, new_code):
"""观察者:函数更新时的回调"""
print(f"Function {function_name} was updated")
# 触发重新加载或测试
# 注册观察者
observer = FunctionObserver()
observer.register_observer("calculate_circle_area", on_function_updated)
┌──────────────────────────────────────────────────────────┐
│ 函数注册数据流 │
├──────────────────────────────────────────────────────────┤
│ 1. 装饰器调用 │
│ @babyagi.register_function( │
│ imports=["math"], │
│ dependencies=["circle_area"] │
│ ) │
│ │
│ 2. 函数包装 │
│ └→ 创建 wrapper 函数 │
│ └→ 提取函数源代码 │
│ └→ 构建元数据 │
│ │
│ 3. 依赖检查 │
│ └→ 外部库验证 (math) │
│ └→ 函数依赖检查 (circle_area) │
│ └→ 循环依赖检测 │
│ │
│ 4. 数据存储 │
│ └→ FunctionData 对象创建 │
│ └→ 版本号分配 │
│ └→ 数据库存储 │
│ └→ 全局函数设置 │
│ │
│ 5. 事件触发 │
│ └→ 注册完成事件 │
│ └→ 观察者通知 │
│ └→ Dashboard 更新 │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 函数执行数据流 │
├──────────────────────────────────────────────────────────┤
│ 1. 执行请求 │
│ └→ babyagi.execute_function("func", *args, **kwargs) │
│ │
│ 2. 函数查找 │
│ └→ 数据库查询函数信息 │
│ └→ 获取函数代码和元数据 │
│ └→ 版本验证 │
│ │
│ 3. 依赖解析 │
│ └→ 外部库安装 (import math) │
│ └→ 函数依赖执行 (circle_area) │
│ └→ 递归依赖解析 │
│ └→ 本地作用域构建 │
│ │
│ 4. 函数执行 │
│ └→ exec(function_code, local_scope) │
│ └→ 函数调用 │
│ └→ 结果获取 │
│ └→ 错误处理 │
│ │
│ 5. 结果返回 │
│ └→ 执行日志记录 │
│ └→ 触发器执行 │
│ └→ 结果返回给调用者 │
│ └→ Dashboard 更新 │
└──────────────────────────────────────────────────────────┘
# 依赖解析的深度优先搜索算法
def resolve_dependencies_dfs(function_name, visited=None, path=None):
visited = visited or set()
path = path or []
# 检测循环依赖
if function_name in visited:
raise ValueError(f"循环依赖检测: {' -> '.join(path + [function_name])}")
# 标记已访问
visited.add(function_name)
path.append(function_name)
try:
# 获取函数信息
function = get_function(function_name)
# 解析外部库依赖
for import_spec in function.get('imports', []):
if import_spec['name'] not in sys.modules:
install_import(import_spec)
# 递归解析函数依赖
for dep_name in function.get('dependencies', []):
if dep_name not in executed_functions:
resolve_dependencies_dfs(dep_name, visited, path)
return path
finally:
# 回溯
visited.remove(function_name)
path.pop()
# 执行顺序示例
# calculate_cylinder_volume → calculate_circle_area → math
# 依赖树:
# └─ calculate_cylinder_volume
# └─ calculate_circle_area
# └─ math
触发机制:当一个函数执行完成或发生变化时,自动执行相关函数
# 触发器管理系统
class TriggerSystem:
def __init__(self):
self.triggers = {} # {triggered_function: [triggering_functions]}
def add_trigger(self, triggered_function, triggering_function=None):
if triggered_function not in self.triggers:
self.triggers[triggered_function] = []
if triggering_function and triggering_function not in self.triggers[triggered_function]:
self.triggers[triggered_function].append(triggering_function)
def execute_triggers(self, function_name, result):
"""当函数执行完成后触发相关函数"""
if function_name in self.triggers:
for trigger_func in self.triggers[function_name]:
try:
# 传递结果给触发函数
trigger_func(result)
except Exception as e:
logger.error(f"Trigger {trigger_func} failed: {str(e)}")
def auto_trigger_on_update(self, function_name, new_code):
"""当函数代码更新时自动触发"""
self.add_trigger("regenerate_tests", function_name)
self.add_trigger("update_documentation", function_name)
# 自动触发
self.execute_triggers(function_name, {"code": new_code})
# 使用示例
@babyagi.register_function(
triggers=["generate_tests", "update_docs"]
)
def calculate_circle_area(radius):
return math.pi * radius ** 2
缓存策略:避免重复执行相同参数的函数,提高执行效率
# 函数缓存实现
class FunctionCache:
def __init__(self):
self.cache = {} # {(function_name, args_hash): result}
self.max_size = 1000
self.hit_count = 0
self.miss_count = 0
def get(self, function_name, args, kwargs):
"""获取缓存结果"""
key = self._make_key(function_name, args, kwargs)
if key in self.cache:
self.hit_count += 1
return self.cache[key]
self.miss_count += 1
return None
def set(self, function_name, args, kwargs, result):
"""设置缓存结果"""
key = self._make_key(function_name, args, kwargs)
# LRU 淘汰策略
if len(self.cache) >= self.max_size:
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = result
def _make_key(self, function_name, args, kwargs):
"""生成缓存键"""
import hashlib
key_str = f"{function_name}:{str(args)}:{str(kwargs)}"
return hashlib.md5(key_str.encode()).hexdigest()
# 带缓存的函数执行
def execute_with_cache(self, function_name, *args, **kwargs):
# 检查缓存
cached_result = self.cache.get(function_name, args, kwargs)
if cached_result is not None:
return cached_result
# 执行函数
result = self.executor.execute(function_name, *args, **kwargs)
# 缓存结果
self.cache.set(function_name, args, kwargs, result)
return result
懒加载:只在需要时才加载和执行函数,减少启动时间和内存使用
# 懒加载函数包装器
class LazyFunctionWrapper:
def __init__(self, function_name, load_func):
self.function_name = function_name
self.load_func = load_func
self._actual_function = None
self._loaded = False
def __call__(self, *args, **kwargs):
if not self._loaded:
self._actual_function = self.load_func()
self._loaded = True
return self._actual_function(*args, **kwargs)
def is_loaded(self):
return self._loaded
# 懒加载函数注册
def register_lazy_function(self, function_name, load_path):
def lazy_wrapper(*args, **kwargs):
# 第一次调用时才加载
if not hasattr(lazy_wrapper, '_actual_function'):
# 从文件加载函数
spec = importlib.util.spec_from_file_location(function_name, load_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
lazy_wrapper._actual_function = getattr(module, function_name)
return lazy_wrapper._actual_function(*args, **kwargs)
setattr(sys.modules[__name__], function_name, lazy_wrapper)
# 使用示例
@babyagi.register_function(
lazy=True, # 标记为懒加载
load_path="/path/to/heavy_function.py"
)
def heavy_computation(data):
# 第一次调用时才加载这个函数
return process_heavy_data(data)
异步执行:支持函数的异步执行,提高并发性能和响应速度
# 异步函数执行器
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncFunctionExecutor:
def __init__(self, max_workers=4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.loop = asyncio.get_event_loop()
async def execute_async(self, function_name, *args, **kwargs):
"""异步执行函数"""
# 在线程池中同步执行
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
self.executor,
self.executor.execute,
function_name, args, kwargs
)
return result
async def execute_batch_async(self, function_calls):
"""批量异步执行多个函数"""
tasks = []
for call in function_calls:
task = self.execute_async(*call)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def execute_chain_async(self, function_chain):
"""异步执行函数链"""
results = {}
for function_name, args, kwargs in function_chain:
result = await self.execute_async(function_name, *args, **kwargs)
results[function_name] = result
return results
# 异步函数注册
def register_async_function(self, function_name):
def async_wrapper(*args, **kwargs):
return self.async_executor.execute_async(function_name, *args, **kwargs)
return async_wrapper
函数设计
依赖管理
性能优化
安全考虑
1. 循环依赖
# ❌ 错误:循环依赖
@babyagi.register_function(dependencies=["func_b"])
def func_a():
return func_b()
@babyagi.register_function(dependencies=["func_a"])
def func_b():
return func_a() # 循环调用
2. 副作用
# ❌ 错误:函数有副作用
@babyagi.register_function()
def process_data():
global_state = modify_global() # 修改全局状态
return process(global_state)
# ✅ 正确:使用参数传递
@babyagi.register_function()
def process_data(input_data):
return process(input_data) # 不修改外部状态
# 1. 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# 2. 使用调试装饰器
@babyagi.register_function(debug=True)
def debug_function():
"""启用调试模式,记录详细执行信息"""
import inspect
frame = inspect.currentframe()
logger.debug(f"Function called from: {frame.f_back.f_code.co_name}")
return result
# 3. 依赖可视化
def visualize_dependencies(function_name):
"""可视化函数依赖关系"""
dependencies = get_function_dependencies(function_name)
print(f"Function: {function_name}")
print("Dependencies:")
for dep in dependencies:
print(f" - {dep}")
# 4. 性能分析
@babyagi.register_function(profile=True)
def profiled_function():
"""性能分析装饰器"""
import time
start = time.time()
result = expensive_operation()
end = time.time()
logger.info(f"Function executed in {end - start:.3f}s")
return result
# 5. 错误处理增强
@babyagi.register_function(robust=True)
def robust_function(data):
"""增强错误处理"""
try:
return process_data(data)
except Exception as e:
logger.error(f"Function failed: {str(e)}")
return fallback_result
官方文档
相关技术
相似项目
学术研究
核心要点
架构亮点
BabyAGI 是一个创新的函数化智能体框架,通过 Functionz 实现了函数的自动管理、依赖解析和触发执行,为构建自主智能体提供了强大的基础架构。
项目地址
https://github.com/yoheinakajima/babyagi