🤖 BabyAGI 任务驱动智能体

基于 Functionz 的自主智能体架构源码深度解析

源码深度解读
2026-03-29 | BabyAGI Autonomous Agent Framework

📑 目录

第一部分:基础概念

  • BabyAGI 项目简介
  • 核心架构设计
  • 项目演进历史

第二部分:核心架构

  • Functionz 框架核心
  • Flask Web 应用
  • Dashboard 系统
  • API 接口设计

第三部分:核心实现

  • 关键类与接口
  • 函数执行引擎
  • 依赖管理系统
  • 触发器机制

第四部分:进阶

  • 设计模式应用
  • 数据流与时序图
  • 性能优化策略
  • 最佳实践与调试

🤖 BabyAGI 项目简介

BabyAGI 是一个基于任务驱动的自主智能体框架,通过函数化的方式构建能够自我构建的AI智能体。

项目定位

  • 实验性的自主智能体框架
  • 任务驱动的AI工作流
  • 函数式编程范式
  • Web Dashboard 管理

核心特性

  • Functionz 函数管理框架
  • 依赖自动解析
  • 触发器自动执行
  • Web 可视化管理

🏗️ 核心架构

┌─────────────────────────────────────────────────────────┐
│                    BabyAGI                              │
├─────────────────────────────────────────────────────────┤
│  babyagi/__init__.py  │  Flask App  │  Dashboard      │
├─────────────────────────────────────────────────────────┤
│  Functionz Framework                                      │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────┐ │
│  │ Functionz Core  │  │ DB Router       │  │ Logger  │ │
│  │                 │  │                 │  │         │ │
│  │ +register_func  │  │ +get_function   │  │ +track  │ │
│  │ +execute_func   │  │ +add_key        │  │ +log    │ │
│  │ +load_pack      │  │ +get_keys       │  │         │ │
│  └─────────────────┘  └─────────────────┘  └─────────┘ │
├─────────────────────────────────────────────────────────┤
│  Function Execution Engine                                │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │ FunctionExecutor│  │ FunctionRegistrar│                │
│  │                 │  │                 │                │
│  │ +_resolve_deps  │  │ +register_func  │                │
│  │ +_create_wrapper│  │ +update_func    │                │
│  │ +execute        │  │ +add_function   │                │
│  └─────────────────┘  └─────────────────┘                │
├─────────────────────────────────────────────────────────┤
│  Dashboard & API                                         │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │ Web Interface   │  │ REST API        │                │
│  │                 │  │                 │                │
│  │ +Function Mgmt  │  │ +POST /api      │                │
│  │ +Execution Log  │  │ +GET /status    │                │
│  │ +Dependency Viz │  │ +PUT /update    │                │
│  └─────────────────┘  └─────────────────┘                │
└─────────────────────────────────────────────────────────┘

BabyAGI 采用分层架构:Web层 → Framework层 → Engine层 → DB层

📊 架构分层详解

层级 职责 核心模块
Web 层 用户接口 Flask App, Dashboard UI
Framework 层 框架核心 Functionz, API Blueprint
Engine 层 执行引擎 FunctionExecutor, Registrar
DB 层 数据存储 DBRouter, Key Management

📜 BabyAGI 演进历史

2023年3月:原始 BabyAGI 发布,引入任务规划作为自主智能体的构建方法

2024年9月:项目被归档,迁移到 babyagi_archive 仓库(快照版本)

最新版本:实验性的 Functionz 框架,基于函数的智能体构建方法

核心理念:通过函数框架构建能够自我构建的智能体,实现最小可构建系统

🎯 核心概念总览

Functionz 框架

  • 函数注册 - @babyagi.register_function()
  • 依赖管理 - 自动解析依赖关系
  • 版本控制 - 函数版本管理
  • 触发器 - 函数间自动触发

Web Dashboard

  • 函数管理 - 可视化函数管理
  • 执行日志 - 完整执行记录
  • 密钥管理 - 安全的密钥存储
  • 依赖可视化 - 依赖关系图

🔧 基础架构设计

# babyagi/__init__.py

from flask import Flask, g
from .functionz.core.framework import Functionz
from .dashboard import create_dashboard
from .api import create_api_blueprint
import os
import importlib.util
import traceback
import sys

# 单例模式的 Functionz 实例
_func_instance = Functionz()

def get_func_instance():
    return _func_instance

def create_app(dashboard_route='/dashboard'):
    app = Flask(__name__)

    # 移除前导斜杠,避免重复
    if dashboard_route.startswith('/'):
        dashboard_route = dashboard_route[1:]

    # 创建并注册 Dashboard blueprint
    dashboard_blueprint = create_dashboard(_func_instance, dashboard_route)

    # 创建并注册 API blueprint  
    api_blueprint = create_api_blueprint()

    # 注册 blueprint
    app.register_blueprint(dashboard_blueprint, url_prefix=f'/{dashboard_route}')
    app.register_blueprint(api_blueprint)  # 挂载在 '/api'

    # 配置 Dashboard 路由
    app.config['DASHBOARD_ROUTE'] = dashboard_route

    # 确保在请求上下文中可访问 Functionz 实例
    @app.before_request
    def set_functionz():
        g.functionz = _func_instance
        g.dashboard_route = dashboard_route

    return app

基础架构采用单例模式 + Flask Blueprint 的设计,确保全局状态一致性

🔗 依赖管理系统

依赖类型:外部库依赖、函数间依赖、密钥依赖

# 函数注册时的依赖定义
@babyagi.register_function(
    imports=["math", "requests"],          # 外部库依赖
    dependencies=["circle_area", "fetch_data"],  # 函数间依赖  
    key_dependencies=["openai_api_key"],   # 密钥依赖
    metadata={
        "description": "计算圆柱体积,依赖 circle_area 函数"
    }
)
def cylinder_volume(radius, height):
    import math
    area = circle_area(radius)  # 自动解析依赖
    return area * height

自动解析:框架自动处理依赖的加载、执行和缓存

⚡ Function 执行引擎

class FunctionExecutor:
    def __init__(self, python_func):
        self.python_func = python_func

    def execute(self, function_name, *args, **kwargs):
        start_time = datetime.now()
        executed_functions = []
        log_id = None
        
        try:
            # 记录执行开始
            executed_functions.append(function_name)
            function_version = self.python_func.db.get_function(function_name)
            
            # 解析依赖
            self._resolve_dependencies(function_version, local_scope, log_id, executed_functions)
            
            # 执行主函数
            result = exec(function_version['code'], local_scope)
            return result
            
        except Exception as e:
            # 错误处理和日志记录
            logger.error(f"执行函数 {function_name} 失败: {str(e)}")
            raise

执行流程:依赖解析 → 函数执行 → 结果返回 → 错误处理

🌐 Flask Web 应用

# babyagi/__init__.py 续

# 函数注册装饰器
def register_function(*args, **kwargs):
    def wrapper(func):
        try:
            _func_instance.register_function(*args, **kwargs)(func)
            setattr(sys.modules[__name__], func.__name__, func)
            #print(f"Function '{func.__name__}' registered successfully.")
        except Exception as e:
            print(f"Error registering function '{func.__name__}': {e}")
            traceback.print_exc()
        return func
    return wrapper

# 函数包加载器
def load_functions(pack_name_or_path):
    #print(f"Attempting to load function pack: {pack_name_or_path}")
    if os.path.exists(pack_name_or_path):
        try:
            spec = importlib.util.spec_from_file_location("custom_pack", pack_name_or_path)
            module = importlib.util.module_from_spec(spec)
            
            # 设置模块函数
            module.func = _func_instance
            spec.loader.exec_module(module)
            
        except Exception as e:
            logger.error(f"Error loading function pack {pack_name_or_path}: {str(e)}")
            traceback.print_exc()

智能体应用:通过装饰器模式,将普通 Python 函数转换为 BabyAGI 管理的函数

📊 Dashboard 管理系统

Dashboard 功能:Web 界面管理函数、查看执行、处理依赖

函数管理

  • 注册新函数
  • 更新现有函数
  • 版本控制
  • 函数删除

执行监控

  • 实时执行日志
  • 依赖关系可视化
  • 性能指标
  • 错误追踪

🔌 REST API 接口

# API 设计概览

# 函数管理
POST   /api/functions       # 注册新函数
PUT    /api/functions/{id}  # 更新函数
GET    /api/functions       # 获取所有函数
DELETE /api/functions/{id} # 删除函数

# 函数执行
POST   /api/execute        # 执行函数
GET    /api/results/{id}   # 获取执行结果

# 依赖管理
GET    /api/dependencies   # 获取依赖关系
POST   /api/resolve       # 解析依赖

# 触发器管理
POST   /api/triggers       # 添加触发器
GET    /api/triggers       # 获取触发器
DELETE /api/triggers/{id} # 删除触发器

# 密钥管理
POST   /api/keys          # 添加密钥
GET    /api/keys          # 获取所有密钥
DELETE /api/keys/{id}     # 删除密钥

RESTful API 设计,支持完整的 CRUD 操作和特殊功能

🔧 Functionz 核心类

# packages/reactivity/src/framework.py

class Functionz:
    def __init__(self, db_type='local', **db_kwargs):
        self.db = DBRouter(db_type, **db_kwargs)        # 数据库路由
        self.executor = FunctionExecutor(self)           # 执行引擎
        self.registrar = FunctionRegistrar(self)         # 注册器

    # 函数执行
    def execute_function(self, function_name: str, *args, **kwargs):
        return self.executor.execute(function_name, *args, **kwargs)

    def __getattr__(self, name):
        if self.db.get_function(name):
            return lambda *args, **kwargs: self.executor.execute(name, *args, **kwargs)
        raise AttributeError(f"'PythonFunc' object has no attribute '{name}'")

    # 函数管理
    def get_function(self, name: str):
        return self.db.get_function(name)

    def get_function_versions(self, name: str):
        return self.db.get_function_versions(name)

    def activate_function_version(self, name: str, version: int) -> None:
        self.db.activate_function_version(name, version)

    # 函数注册(暴露注册器方法)
    def register_function(self, *args, **kwargs):
        return self.registrar.register_function(*args, **kwargs)

    def update_function(self, *args, **kwargs):
        return self.registrar.update_function(*args, **kwargs)

    def add_function(self, *args, **kwargs):
        return self.registrar.add_function(*args, **kwargs)

🎯 Functionz 接口设计

接口类型 方法 功能
执行接口 execute_function() 执行指定函数
查询接口 get_function(), get_all_functions() 查询函数信息
管理接口 register_function(), update_function() 函数生命周期管理
版本接口 get_function_versions(), activate_function_version() 版本控制管理

🚩 Functionz 枚举定义

# 标记接口
interface Target {
  [ReactiveFlags.SKIP]?: boolean      // 跳过响应式
  [ReactiveFlags.IS_REACTIVE]?: boolean // 响应式标记
  [ReactiveFlags.IS_READONLY]?: boolean // 只读标记
  [ReactiveFlags.IS_SHALLOW]?: boolean  // 浅层标记
  [ReactiveFlags.RAW]?: any            // 原始对象引用
}

# 函数状态枚举
enum FunctionState {
  ACTIVE = 1,       // 活跃状态
  INACTIVE = 2,     // 非活跃状态
  ARCHIVED = 3,     // 归档状态
}

# 执行状态枚举
enum ExecutionState {
  PENDING = 0,      // 等待执行
  RUNNING = 1,      // 执行中
  COMPLETED = 2,    // 执行完成
  FAILED = 3,       // 执行失败
}

设计特点:通过枚举和接口定义,确保类型安全和状态一致性

💾 数据库路由 (DBRouter)

# 数据库抽象层,支持多种存储后端
class DBRouter:
    def __init__(self, db_type='local', **db_kwargs):
        self.db_type = db_type
        self.db_kwargs = db_kwargs
        
        # 根据类型初始化不同的存储后端
        if db_type == 'local':
            self.storage = LocalStorage(**db_kwargs)
        elif db_type == 'sqlite':
            self.storage = SQLiteStorage(**db_kwargs)
        elif db_type == 'memory':
            self.storage = MemoryStorage(**db_kwargs)
        else:
            raise ValueError(f"Unsupported db_type: {db_type}")

    # 函数存储
    def get_function(self, name: str):
        return self.storage.get_function(name)
    
    def get_function_versions(self, name: str):
        return self.storage.get_function_versions(name)
    
    def add_function(self, function_data):
        return self.storage.add_function(function_data)

    # 依赖管理
    def get_function_imports(self, name: str):
        return self.storage.get_function_imports(name)
    
    def get_function_dependencies(self, name: str):
        return self.storage.get_function_dependencies(name)

    # 密钥管理
    def add_secret_key(self, key_name: str, key_value: str):
        return self.storage.add_secret_key(key_name, key_value)
    
    def get_all_secret_keys(self):
        return self.storage.get_all_secret_keys()

📝 Function Registrar

# 函数注册器,处理函数的注册、更新、版本管理
class FunctionRegistrar:
    def __init__(self, python_func):
        self.python_func = python_func
        self.function_registry = {}

    def register_function(self, *args, **kwargs):
        def wrapper(func):
            try:
                # 创建函数元数据
                function_metadata = self._create_function_metadata(func, *args, **kwargs)
                
                # 存储函数
                function_data = {
                    'name': func.__name__,
                    'code': self._get_function_code(func),
                    'metadata': function_metadata,
                    'version': 1,
                    'created_at': datetime.now(),
                    'state': FunctionState.ACTIVE
                }
                
                # 添加到注册表和数据库
                self.function_registry[func.__name__] = function_data
                self.python_func.db.add_function(function_data)
                
                # 设置全局函数引用
                setattr(sys.modules[__name__], func.__name__, func)
                
                return func
                
            except Exception as e:
                logger.error(f"注册函数 {func.__name__} 失败: {str(e)}")
                raise
                
        return wrapper

    def _create_function_metadata(self, func, *args, **kwargs):
        metadata = {
            'description': kwargs.get('metadata', {}).get('description', ''),
            'imports': kwargs.get('imports', []),
            'dependencies': kwargs.get('dependencies', []),
            'key_dependencies': kwargs.get('key_dependencies', []),
            'tags': kwargs.get('tags', []),
            'author': kwargs.get('author', ''),
            'category': kwargs.get('category', 'general')
        }
        return metadata

⚡ Function Executor

# 核心执行器,负责函数的实际执行和依赖解析
class FunctionExecutor:
    def __init__(self, python_func):
        self.python_func = python_func

    def execute(self, function_name: str, *args, **kwargs):
        start_time = datetime.now()
        executed_functions = []
        log_id = None
        
        try:
            # 获取函数数据
            function_version = self.python_func.db.get_function(function_name)
            if not function_version:
                raise ValueError(f"Function '{function_name}' not found")
            
            # 创建本地作用域
            local_scope = {}
            
            # 解析依赖
            self._resolve_dependencies(
                function_version, 
                local_scope, 
                log_id, 
                executed_functions
            )
            
            # 执行函数
            exec(function_version['code'], local_scope)
            
            # 获取执行结果
            result = local_scope.get(function_name)(*args, **kwargs)
            
            # 记录执行日志
            self._log_execution(
                function_name, 
                result, 
                start_time, 
                executed_functions
            )
            
            return result
            
        except Exception as e:
            # 错误处理
            logger.error(f"执行函数 {function_name} 失败: {str(e)}")
            raise

🔧 执行器核心方法

# 外部依赖安装
def _install_external_dependency(self, package_name: str, imp_name: str):
    try:
        # 尝试导入
        return importlib.import_module(imp_name)
    except ImportError:
        # 如果不存在,自动安装
        subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
        return importlib.import_module(package_name)

# 依赖解析核心逻辑
def _resolve_dependencies(self, function_version: Dict[str, Any], local_scope: Dict[str, Any],
                         parent_log_id: Optional[int], executed_functions: List[str],
                         visited: Optional[set] = None) -> None:
    if visited is None:
        visited = set()

    function_name = function_version['name']
    function_imports = self.python_func.db.get_function_imports(function_name)

    # 1. 解析外部库依赖
    for imp in function_imports:
        lib_name = imp['lib'] if imp['lib'] else imp['name']
        if lib_name not in local_scope:
            module = self._install_external_dependency(lib_name, imp['name'])
            local_scope[imp['name']] = module

    # 2. 解析函数依赖
    for dep_name in function_version.get('dependencies', []):
        if dep_name not in local_scope and dep_name not in visited:
            visited.add(dep_name)
            dep_data = self.python_func.db.get_function(dep_name)
            if not dep_data:
                raise ValueError(f"Dependency '{dep_name}' not found")
            
            # 递归解析依赖
            self._resolve_dependencies(dep_data, local_scope, parent_log_id, executed_functions, visited)
            
            # 执行依赖函数并包装
            exec(dep_data['code'], local_scope)
            if dep_name in local_scope:
                dep_func = local_scope[dep_name]
                local_scope[dep_name] = self._create_function_wrapper(dep_func, dep_name, parent_log_id, executed_functions)

🔍 依赖解析深度解析

解析策略:深度优先搜索,循环依赖检测,自动安装依赖

# 依赖解析算法
def _resolve_dependencies(self, function_version, local_scope, log_id, executed_functions, visited=None):
    visited = visited or set()
    function_name = function_version['name']
    
    # 检测循环依赖
    if function_name in visited:
        raise ValueError(f"检测到循环依赖: {function_name}")
    
    # 标记已访问
    visited.add(function_name)
    
    # 1. 解析外部库依赖
    for import_spec in function_version.get('imports', []):
        if import_spec['name'] not in local_scope:
            # 自动安装并导入
            module = self._install_external_dependency(import_spec['lib'], import_spec['name'])
            local_scope[import_spec['name']] = module
    
    # 2. 递归解析函数依赖
    for dep_name in function_version.get('dependencies', []):
        if dep_name not in local_scope:
            dep_data = self.python_func.db.get_function(dep_name)
            if not dep_data:
                raise ValueError(f"依赖函数 {dep_name} 不存在")
            
            # 递归解析
            self._resolve_dependencies(dep_data, local_scope, log_id, executed_functions, visited)
            
            # 执行依赖函数并包装
            exec(dep_data['code'], local_scope)
            if dep_name in local_scope:
                local_scope[dep_name] = self._create_function_wrapper(
                    local_scope[dep_name], dep_name, log_id, executed_functions
                )
    
    # 3. 解析密钥依赖
    for key_dep in function_version.get('key_dependencies', []):
        key_value = self.python_func.db.get_secret_key(key_dep)
        local_scope[key_dep] = key_value

🔗 函数包装器设计

# 创建函数包装器,支持日志记录、错误处理、依赖传播
def _create_function_wrapper(self, func: callable, func_name: str, parent_log_id: int, executed_functions: List[str]):
    def wrapper(*args, **kwargs):
        return self.execute(func_name, *args, executed_functions=executed_functions, parent_log_id=parent_log_id, **kwargs)
    
    # 保留原始函数的元数据
    wrapper.__name__ = func.__name__
    wrapper.__doc__ = func.__doc__
    wrapper.__module__ = func.__module__
    
    # 添加 BabyAGI 特定的属性
    wrapper._babyagi_wrapper = True
    wrapper._original_func = func
    wrapper._func_name = func_name
    
    return wrapper

# 包装器的优势:
# 1. 日志记录:自动记录函数调用
# 2. 错误处理:统一的错误处理机制
# 3. 依赖传播:正确传递执行上下文
# 4. 性能监控:自动记录执行时间
# 5. 调试支持:保留原始函数引用

包装器模式:在不修改原始函数的情况下,为其添加额外的功能和行为

📦 模块加载机制

# 支持从文件路径加载函数包
def load_function_pack(self, pack_name: str):
    packs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'packs')
    pack_path = os.path.join(packs_dir, pack_name + '.py')
    
    if not os.path.exists(pack_path):
        logger.error(f"Function pack '{pack_name}' not found.")
        return
    
    self._load_module_from_path(pack_path, pack_name)

def load_functions_from_file(self, file_path: str):
    if not os.path.exists(file_path):
        logger.error(f"File '{file_path}' not found.")
        return
    
    module_name = os.path.splitext(os.path.basename(file_path))[0]
    self._load_module_from_path(file_path, module_name)

def _load_module_from_path(self, path: str, module_name: str):
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    module.func = self  # 注入 Functionz 实例
    
    original_sys_path = sys.path[:]
    try:
        # 添加项目路径到 sys.path
        babyagi_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        if babyagi_root not in sys.path:
            sys.path.insert(0, babyagi_root)
        
        # 执行模块
        spec.loader.exec_module(module)
        logger.info(f"Loaded module '{module_name}' from '{path}'")
        
    except Exception as e:
        logger.error(f"Error loading module '{module_name}' from '{path}': {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        # 恢复原始 sys.path
        sys.path = original_sys_path

⚡ 触发器管理系统

触发器机制:当一个函数发生变化时,自动触发其他相关函数的执行

# 触发器添加
def add_trigger(self, triggered_function_name, triggering_function_name=None):
    self.db.add_trigger(triggered_function_name, triggering_function_name)

# 获取函数的触发器
def get_triggers_for_function(self, function_name: str) -> List[str]:
    function_data = self.get_function(function_name)
    return function_data.get('triggers', [])

# 执行时自动触发
def execute_function(self, function_name: str, *args, **kwargs):
    # 执行主函数
    result = self.executor.execute(function_name, *args, **kwargs)
    
    # 触发相关函数
    triggers = self.get_triggers_for_function(function_name)
    for trigger_name in triggers:
        try:
            self.execute_function(trigger_name, result)
        except Exception as e:
            logger.error(f"Trigger {trigger_name} failed: {str(e)}")
    
    return result

📝 完整日志系统

# 综合日志记录系统,跟踪所有函数活动
class LoggingSystem:
    def __init__(self):
        self.logger = logging.getLogger('babyagi')
        
    def track_function_execution(self, function_name, args, kwargs, start_time, result, end_time):
        log_entry = {
            'function': function_name,
            'args': args,
            'kwargs': kwargs,
            'start_time': start_time,
            'end_time': end_time,
            'duration': (end_time - start_time).total_seconds(),
            'result': str(result)[:1000],  # 限制长度
            'status': 'success'
        }
        
        # 记录到数据库
        self._save_to_database(log_entry)
        
        # 实时显示在 Dashboard
        self._log_to_dashboard(log_entry)
    
    def track_dependency_resolution(self, function_name, dependency_name, resolved_value):
        log_entry = {
            'type': 'dependency',
            'function': function_name,
            'dependency': dependency_name,
            'resolved': bool(resolved_value),
            'timestamp': datetime.now()
        }
        
        self._save_to_database(log_entry)
    
    def track_error(self, function_name, error, stack_trace):
        log_entry = {
            'function': function_name,
            'error': str(error),
            'stack_trace': stack_trace,
            'timestamp': datetime.now(),
            'status': 'failed'
        }
        
        self._save_to_database(log_entry)

📝 注册器核心实现

# 注册器的核心功能实现
class FunctionRegistrar:
    def __init__(self, python_func):
        self.python_func = python_func
        self.function_registry = {}
        self.version_counter = {}

    def register_function(self, *args, **kwargs):
        def wrapper(func):
            function_name = func.__name__
            
            # 创建函数数据
            function_data = {
                'name': function_name,
                'code': self._extract_function_code(func),
                'metadata': self._build_metadata(func, *args, **kwargs),
                'version': self._get_next_version(function_name),
                'created_at': datetime.now(),
                'updated_at': datetime.now(),
                'state': FunctionState.ACTIVE,
                'triggers': kwargs.get('triggers', [])
            }
            
            # 存储到注册表
            self.function_registry[function_name] = function_data
            
            # 存储到数据库
            self.python_func.db.add_function(function_data)
            
            # 设置全局函数引用
            self._set_global_function(func, function_data)
            
            logger.info(f"Function '{function_name}' registered successfully")
            return func
            
        return wrapper

    def _extract_function_code(self, func):
        # 获取函数的源代码
        try:
            lines = inspect.getsourcelines(func)
            code = ''.join(lines[0])
            return code
        except Exception as e:
            logger.warning(f"Could not extract source code for {func.__name__}: {str(e)}")
            return f"# Source code not available for {func.__name__}"

🔄 函数注册完整流程

# 完整的函数注册流程
def register_function(self, *args, **kwargs):
    def wrapper(func):
        function_name = func.__name__
        
        # 1. 参数验证
        self._validate_registration_params(func, *args, **kwargs)
        
        # 2. 元数据构建
        metadata = self._build_function_metadata(func, *args, **kwargs)
        
        # 3. 代码提取
        source_code = self._extract_source_code(func)
        
        # 4. 依赖解析
        dependencies = self._resolve_registration_dependencies(kwargs)
        
        # 5. 函数数据创建
        function_data = {
            'name': function_name,
            'code': source_code,
            'metadata': metadata,
            'dependencies': dependencies,
            'version': self._get_next_version(function_name),
            'created_at': datetime.now(),
            'state': FunctionState.ACTIVE
        }
        
        # 6. 存储到数据库
        self.python_func.db.add_function(function_data)
        
        # 7. 设置全局引用
        setattr(sys.modules[__name__], function_name, func)
        
        # 8. 触发注册事件
        self._trigger_registration_event(function_data)
        
        return func
    return wrapper

关键步骤:参数验证 → 元数据构建 → 代码提取 → 依赖解析 → 数据存储 → 全局设置 → 事件触发

🔄 函数更新机制

版本控制:每次更新都会创建新版本,支持回滚到历史版本

# 函数更新机制
def update_function(self, function_name: str, new_code: str, new_metadata: dict = None):
    # 1. 获取当前函数
    current_function = self.python_func.db.get_function(function_name)
    if not current_function:
        raise ValueError(f"Function '{function_name}' not found")
    
    # 2. 创建新版本
    new_version = {
        'name': function_name,
        'code': new_code,
        'metadata': new_metadata or current_function['metadata'],
        'version': current_function['version'] + 1,
        'created_at': datetime.now(),
        'updated_at': datetime.now(),
        'state': FunctionState.ACTIVE,
        'previous_version': current_function['version']
    }
    
    # 3. 存储新版本
    self.python_func.db.add_function(new_version)
    
    # 4. 标记旧版本为非活跃
    current_function['state'] = FunctionState.INACTIVE
    self.python_func.db.update_function(current_function)
    
    # 5. 重新加载函数
    self._reload_function(function_name, new_version)
    
    # 6. 触发更新事件
    self._trigger_update_event(function_name, new_version)
    
    return new_version

📊 Dashboard 核心实现

# Dashboard 核心 - 基于 Flask Blueprint
def create_dashboard(functionz_instance, dashboard_route):
    dashboard = Blueprint('dashboard', __name__, template_folder='templates', static_folder='static')
    
    @dashboard.route('/')
    def index():
        return render_template('index.html', 
                             functions=functionz_instance.get_all_functions(),
                             dashboard_route=dashboard_route)
    
    @dashboard.route('/functions')
    def functions_page():
        return render_template('functions.html',
                             functions=functionz_instance.get_all_functions(),
                             dashboard_route=dashboard_route)
    
    @dashboard.route('/execute')
    def execute_page():
        return render_template('execute.html',
                             functions=functionz_instance.get_all_functions(),
                             dashboard_route=dashboard_route)
    
    @dashboard.route('/api/functions', methods=['GET'])
    def api_get_functions():
        functions = functionz_instance.get_all_functions()
        return jsonify({'functions': functions})
    
    @dashboard.route('/api/execute', methods=['POST'])
    def api_execute_function():
        data = request.json
        function_name = data.get('function_name')
        args = data.get('args', [])
        kwargs = data.get('kwargs', {})
        
        try:
            result = functionz_instance.execute_function(function_name, *args, **kwargs)
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    return dashboard

🌐 Flask 应用集成

# 完整的 Flask 应用创建和配置
def create_app(dashboard_route='/dashboard'):
    app = Flask(__name__, 
                template_folder=os.path.join(os.path.dirname(__file__), 'dashboard', 'templates'),
                static_folder=os.path.join(os.path.dirname(__file__), 'dashboard', 'static'))
    
    # 配置
    app.config['SECRET_KEY'] = 'babyagi-secret-key'
    app.config['DASHBOARD_ROUTE'] = dashboard_route
    
    # 注册蓝图
    dashboard_blueprint = create_dashboard(_func_instance, dashboard_route)
    api_blueprint = create_api_blueprint()
    
    app.register_blueprint(dashboard_blueprint, url_prefix=f'/{dashboard_route}')
    app.register_blueprint(api_blueprint, url_prefix='/api')
    
    # 全局上下文
    @app.before_request
    def set_globals():
        g.functionz = _func_instance
        g.dashboard_route = dashboard_route
    
    # 错误处理
    @app.errorhandler(404)
    def not_found(error):
        return render_template('404.html'), 404
    
    @app.errorhandler(500)
    def internal_error(error):
        return render_template('500.html'), 500
    
    return app

# 应用启动
if __name__ == "__main__":
    app = create_app()
    app.run(host='0.0.0.0', port=8080, debug=True)

应用特点:模块化设计、蓝图集成、错误处理、全局状态管理

🔌 API 设计原则

RESTful 设计:遵循 REST 原则,使用标准的 HTTP 方法和状态码

设计原则

  • 统一接口
  • 无状态通信
  • 资源导向
  • 标准 HTTP 方法
  • 合适的 HTTP 状态码

安全考虑

  • 输入验证
  • 错误处理
  • 认证授权
  • 速率限制
  • 日志记录

📝 函数管理 API

# 函数注册 API
POST /api/functions
Content-Type: application/json

{
    "function_name": "calculate_circle_area",
    "code": "import math\ndef calculate_circle_area(radius):\n    return math.pi * radius ** 2",
    "metadata": {
        "description": "计算圆的面积",
        "imports": ["math"],
        "dependencies": [],
        "tags": ["geometry", "math"]
    }
}

Response:
{
    "success": true,
    "function_id": "uuid-12345",
    "message": "Function registered successfully"
}

# 获取函数列表
GET /api/functions

Response:
{
    "success": true,
    "functions": [
        {
            "name": "calculate_circle_area",
            "version": 1,
            "metadata": {...}
        }
    ]
}

⚡ 函数执行 API

# 函数执行 API
POST /api/execute
Content-Type: application/json

{
    "function_name": "calculate_cylinder_volume",
    "args": [5, 10],
    "kwargs": {},
    "context": {
        "user_id": "user-123",
        "session_id": "session-456"
    }
}

Response:
{
    "success": true,
    "execution_id": "exec-789",
    "result": 785.3981633974483,
    "execution_time": 0.023,
    "executed_functions": ["calculate_cylinder_volume", "calculate_circle_area"]
}

# 获取执行结果
GET /api/results/exec-789

Response:
{
    "success": true,
    "execution": {
        "execution_id": "exec-789",
        "function_name": "calculate_cylinder_volume",
        "args": [5, 10],
        "result": 785.3981633974483,
        "start_time": "2026-03-29T16:30:00Z",
        "end_time": "2026-03-29T16:30:00.023Z",
        "status": "completed"
    }
}

🔗 依赖管理 API

# 获取依赖关系
GET /api/dependencies/calculate_cylinder_volume

Response:
{
    "success": true,
    "dependencies": {
        "imports": ["math"],
        "functions": ["calculate_circle_area"],
        "keys": ["openai_api_key"]
    }
}

# 解析依赖
POST /api/resolve
Content-Type: application/json

{
    "function_names": ["calculate_cylinder_volume", "calculate_circle_area"]
}

Response:
{
    "success": true,
    "resolution": {
        "calculate_cylinder_volume": {
            "status": "resolved",
            "missing": [],
            "required": ["math", "calculate_circle_area"]
        },
        "calculate_circle_area": {
            "status": "resolved",
            "missing": [],
            "required": ["math"]
        }
    }
}

⚡ 触发器管理 API

# 添加触发器
POST /api/triggers
Content-Type: application/json

{
    "triggered_function": "generate_report",
    "triggering_function": "calculate_cylinder_volume",
    "conditions": {
        "result_greater_than": 0
    }
}

Response:
{
    "success": true,
    "trigger_id": "trigger-123",
    "message": "Trigger created successfully"
}

# 获取触发器
GET /api/triggers

Response:
{
    "success": true,
    "triggers": [
        {
            "id": "trigger-123",
            "triggered_function": "generate_report",
            "triggering_function": "calculate_cylinder_volume",
            "conditions": {...}
        }
    ]
}

# 删除触发器
DELETE /api/triggers/trigger-123

Response:
{
    "success": true,
    "message": "Trigger deleted successfully"
}

📐 设计模式 - 函数式编程

函数式编程在 BabyAGI 中的应用,强调纯函数、不可变数据和函数组合

# 纯函数示例 - 无副作用,相同输入产生相同输出
@babyagi.register_function()
def calculate_circle_area(radius):
    """纯函数:计算圆的面积"""
    import math
    if radius <= 0:
        raise ValueError("Radius must be positive")
    return math.pi * radius ** 2

# 高阶函数 - 函数作为参数或返回值
@babyagi.register_function()
def apply_operation(numbers, operation):
    """高阶函数:对数字列表应用操作"""
    return [operation(num) for num in numbers]

# 函数组合
@babyagi.register_function()
def calculate_cylinder_volume(radius, height):
    """函数组合:组合多个函数"""
    import math
    area = calculate_circle_area(radius)  # 复用已有函数
    return area * height

📐 设计模式 - 依赖注入

依赖注入通过函数参数显式声明依赖,而不是在函数内部创建依赖

# 传统方式 - 在函数内部创建依赖
@babyagi.register_function()
def process_data():
    # 硬编码依赖
    api_client = APIClient()
    database = Database()
    return api_client.fetch_and_process_data(database)

# 依赖注入方式 - 通过参数声明依赖
@babyagi.register_function(
    dependencies=["api_client", "database"]  # 声明依赖
)
def process_data_injected(api_client, database):
    # 依赖通过参数注入,易于测试和替换
    return api_client.fetch_and_process_data(database)

# 自动依赖解析
@babyagi.register_function(
    dependencies=["process_data_injected"],
    key_dependencies=["openai_api_key"]
)
def generate_report(data, api_key):
    # 框架自动解析和注入依赖
    processed_data = process_data_injected(data)
    # 使用 API 密钥
    return generate_summary(processed_data, api_key)

📐 设计模式 - 观察者模式

观察者模式用于函数间的通知机制,当一个函数状态变化时通知相关函数

# 观察者模式实现
class FunctionObserver:
    def __init__(self):
        self.observers = {}
    
    def register_observer(self, function_name, observer_function):
        if function_name not in self.observers:
            self.observers[function_name] = []
        self.observers[function_name].append(observer_function)
    
    def notify_observers(self, function_name, data):
        if function_name in self.observers:
            for observer in self.observers[function_name]:
                try:
                    observer(data)
                except Exception as e:
                    logger.error(f"Observer {observer.__name__} failed: {str(e)}")

# 使用示例
def on_function_updated(function_name, new_code):
    """观察者:函数更新时的回调"""
    print(f"Function {function_name} was updated")
    # 触发重新加载或测试

# 注册观察者
observer = FunctionObserver()
observer.register_observer("calculate_circle_area", on_function_updated)

🔄 数据流 - 函数注册流程

┌──────────────────────────────────────────────────────────┐
│                   函数注册数据流                           │
├──────────────────────────────────────────────────────────┤
│ 1. 装饰器调用                                           │
│    @babyagi.register_function(                          │
│        imports=["math"],                                │
│        dependencies=["circle_area"]                     │
│    )                                                    │
│                                                         │
│ 2. 函数包装                                             │
│    └→ 创建 wrapper 函数                                 │
│    └→ 提取函数源代码                                    │
│    └→ 构建元数据                                        │
│                                                         │
│ 3. 依赖检查                                             │
│    └→ 外部库验证 (math)                                 │
│    └→ 函数依赖检查 (circle_area)                        │
│    └→ 循环依赖检测                                      │
│                                                         │
│ 4. 数据存储                                             │
│    └→ FunctionData 对象创建                             │
│    └→ 版本号分配                                        │
│    └→ 数据库存储                                        │
│    └→ 全局函数设置                                      │
│                                                         │
│ 5. 事件触发                                             │
│    └→ 注册完成事件                                      │
│    └→ 观察者通知                                        │
│    └→ Dashboard 更新                                    │
└──────────────────────────────────────────────────────────┘

🔄 数据流 - 函数执行流程

┌──────────────────────────────────────────────────────────┐
│                   函数执行数据流                           │
├──────────────────────────────────────────────────────────┤
│ 1. 执行请求                                             │
│    └→ babyagi.execute_function("func", *args, **kwargs)  │
│                                                         │
│ 2. 函数查找                                             │
│    └→ 数据库查询函数信息                                │
│    └→ 获取函数代码和元数据                              │
│    └→ 版本验证                                          │
│                                                         │
│ 3. 依赖解析                                             │
│    └→ 外部库安装 (import math)                           │
│    └→ 函数依赖执行 (circle_area)                        │
│    └→ 递归依赖解析                                      │
│    └→ 本地作用域构建                                    │
│                                                         │
│ 4. 函数执行                                             │
│    └→ exec(function_code, local_scope)                   │
│    └→ 函数调用                                          │
│    └→ 结果获取                                          │
│    └→ 错误处理                                          │
│                                                         │
│ 5. 结果返回                                             │
│    └→ 执行日志记录                                      │
│    └→ 触发器执行                                        │
│    └→ 结果返回给调用者                                  │
│    └→ Dashboard 更新                                    │
└──────────────────────────────────────────────────────────┘

🔄 数据流 - 依赖解析流程

# 依赖解析的深度优先搜索算法
def resolve_dependencies_dfs(function_name, visited=None, path=None):
    visited = visited or set()
    path = path or []
    
    # 检测循环依赖
    if function_name in visited:
        raise ValueError(f"循环依赖检测: {' -> '.join(path + [function_name])}")
    
    # 标记已访问
    visited.add(function_name)
    path.append(function_name)
    
    try:
        # 获取函数信息
        function = get_function(function_name)
        
        # 解析外部库依赖
        for import_spec in function.get('imports', []):
            if import_spec['name'] not in sys.modules:
                install_import(import_spec)
        
        # 递归解析函数依赖
        for dep_name in function.get('dependencies', []):
            if dep_name not in executed_functions:
                resolve_dependencies_dfs(dep_name, visited, path)
        
        return path
        
    finally:
        # 回溯
        visited.remove(function_name)
        path.pop()

# 执行顺序示例
# calculate_cylinder_volume → calculate_circle_area → math
# 依赖树:
# └─ calculate_cylinder_volume
#    └─ calculate_circle_area
#       └─ math

⚡ 数据流 - 触发机制

触发机制:当一个函数执行完成或发生变化时,自动执行相关函数

# 触发器管理系统
class TriggerSystem:
    def __init__(self):
        self.triggers = {}  # {triggered_function: [triggering_functions]}
    
    def add_trigger(self, triggered_function, triggering_function=None):
        if triggered_function not in self.triggers:
            self.triggers[triggered_function] = []
        
        if triggering_function and triggering_function not in self.triggers[triggered_function]:
            self.triggers[triggered_function].append(triggering_function)
    
    def execute_triggers(self, function_name, result):
        """当函数执行完成后触发相关函数"""
        if function_name in self.triggers:
            for trigger_func in self.triggers[function_name]:
                try:
                    # 传递结果给触发函数
                    trigger_func(result)
                except Exception as e:
                    logger.error(f"Trigger {trigger_func} failed: {str(e)}")
    
    def auto_trigger_on_update(self, function_name, new_code):
        """当函数代码更新时自动触发"""
        self.add_trigger("regenerate_tests", function_name)
        self.add_trigger("update_documentation", function_name)
        
        # 自动触发
        self.execute_triggers(function_name, {"code": new_code})

# 使用示例
@babyagi.register_function(
    triggers=["generate_tests", "update_docs"]
)
def calculate_circle_area(radius):
    return math.pi * radius ** 2

🚀 性能优化 - 函数缓存机制

缓存策略:避免重复执行相同参数的函数,提高执行效率

# 函数缓存实现
class FunctionCache:
    def __init__(self):
        self.cache = {}  # {(function_name, args_hash): result}
        self.max_size = 1000
        self.hit_count = 0
        self.miss_count = 0
    
    def get(self, function_name, args, kwargs):
        """获取缓存结果"""
        key = self._make_key(function_name, args, kwargs)
        
        if key in self.cache:
            self.hit_count += 1
            return self.cache[key]
        
        self.miss_count += 1
        return None
    
    def set(self, function_name, args, kwargs, result):
        """设置缓存结果"""
        key = self._make_key(function_name, args, kwargs)
        
        # LRU 淘汰策略
        if len(self.cache) >= self.max_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        self.cache[key] = result
    
    def _make_key(self, function_name, args, kwargs):
        """生成缓存键"""
        import hashlib
        key_str = f"{function_name}:{str(args)}:{str(kwargs)}"
        return hashlib.md5(key_str.encode()).hexdigest()

# 带缓存的函数执行
def execute_with_cache(self, function_name, *args, **kwargs):
    # 检查缓存
    cached_result = self.cache.get(function_name, args, kwargs)
    if cached_result is not None:
        return cached_result
    
    # 执行函数
    result = self.executor.execute(function_name, *args, **kwargs)
    
    # 缓存结果
    self.cache.set(function_name, args, kwargs, result)
    return result

🚀 性能优化 - 懒加载策略

懒加载:只在需要时才加载和执行函数,减少启动时间和内存使用

# 懒加载函数包装器
class LazyFunctionWrapper:
    def __init__(self, function_name, load_func):
        self.function_name = function_name
        self.load_func = load_func
        self._actual_function = None
        self._loaded = False
    
    def __call__(self, *args, **kwargs):
        if not self._loaded:
            self._actual_function = self.load_func()
            self._loaded = True
        
        return self._actual_function(*args, **kwargs)
    
    def is_loaded(self):
        return self._loaded

# 懒加载函数注册
def register_lazy_function(self, function_name, load_path):
    def lazy_wrapper(*args, **kwargs):
        # 第一次调用时才加载
        if not hasattr(lazy_wrapper, '_actual_function'):
            # 从文件加载函数
            spec = importlib.util.spec_from_file_location(function_name, load_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            lazy_wrapper._actual_function = getattr(module, function_name)
        
        return lazy_wrapper._actual_function(*args, **kwargs)
    
    setattr(sys.modules[__name__], function_name, lazy_wrapper)

# 使用示例
@babyagi.register_function(
    lazy=True,  # 标记为懒加载
    load_path="/path/to/heavy_function.py"
)
def heavy_computation(data):
    # 第一次调用时才加载这个函数
    return process_heavy_data(data)

🚀 性能优化 - 异步处理

异步执行:支持函数的异步执行,提高并发性能和响应速度

# 异步函数执行器
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncFunctionExecutor:
    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.loop = asyncio.get_event_loop()
    
    async def execute_async(self, function_name, *args, **kwargs):
        """异步执行函数"""
        # 在线程池中同步执行
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            self.executor,
            self.executor.execute,
            function_name, args, kwargs
        )
        return result
    
    async def execute_batch_async(self, function_calls):
        """批量异步执行多个函数"""
        tasks = []
        for call in function_calls:
            task = self.execute_async(*call)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
    
    async def execute_chain_async(self, function_chain):
        """异步执行函数链"""
        results = {}
        for function_name, args, kwargs in function_chain:
            result = await self.execute_async(function_name, *args, **kwargs)
            results[function_name] = result
        
        return results

# 异步函数注册
def register_async_function(self, function_name):
    def async_wrapper(*args, **kwargs):
        return self.async_executor.execute_async(function_name, *args, **kwargs)
    
    return async_wrapper

✅ 最佳实践

函数设计

  • 使用纯函数,避免副作用
  • 明确声明依赖关系
  • 添加适当的错误处理
  • 提供清晰的文档和注释
  • 使用有意义的函数名

依赖管理

  • 按需导入外部库
  • 避免循环依赖
  • 使用版本控制
  • 定期清理无用依赖

性能优化

  • 对频繁调用的函数使用缓存
  • 对重函数使用懒加载
  • 合理使用异步处理
  • 避免不必要的依赖解析

安全考虑

  • 验证函数输入参数
  • 限制函数执行权限
  • 敏感数据使用密钥管理
  • 记录执行日志用于审计

⚠️ 常见陷阱

1. 循环依赖

# ❌ 错误:循环依赖
@babyagi.register_function(dependencies=["func_b"])
def func_a():
    return func_b()

@babyagi.register_function(dependencies=["func_a"])
def func_b():
    return func_a()  # 循环调用

2. 副作用

# ❌ 错误:函数有副作用
@babyagi.register_function()
def process_data():
    global_state = modify_global()  # 修改全局状态
    return process(global_state)

# ✅ 正确:使用参数传递
@babyagi.register_function()
def process_data(input_data):
    return process(input_data)  # 不修改外部状态

🔍 调试技巧

# 1. 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)

# 2. 使用调试装饰器
@babyagi.register_function(debug=True)
def debug_function():
    """启用调试模式,记录详细执行信息"""
    import inspect
    frame = inspect.currentframe()
    logger.debug(f"Function called from: {frame.f_back.f_code.co_name}")
    return result

# 3. 依赖可视化
def visualize_dependencies(function_name):
    """可视化函数依赖关系"""
    dependencies = get_function_dependencies(function_name)
    print(f"Function: {function_name}")
    print("Dependencies:")
    for dep in dependencies:
        print(f"  - {dep}")

# 4. 性能分析
@babyagi.register_function(profile=True)
def profiled_function():
    """性能分析装饰器"""
    import time
    start = time.time()
    result = expensive_operation()
    end = time.time()
    logger.info(f"Function executed in {end - start:.3f}s")
    return result

# 5. 错误处理增强
@babyagi.register_function(robust=True)
def robust_function(data):
    """增强错误处理"""
    try:
        return process_data(data)
    except Exception as e:
        logger.error(f"Function failed: {str(e)}")
        return fallback_result

📚 扩展阅读

官方文档

相关技术

  • Flask Web 框架
  • Python 装饰器模式
  • 依赖注入容器
  • 函数式编程范式
  • 观察者模式实现

相似项目

  • AutoGPT - 自主智能体
  • CrewAI - 多智能体协作
  • LangChain - LLM 链接
  • AutoGen - 多智能体框架

学术研究

  • 自主智能体架构
  • 任务规划算法
  • 函数式编程理论
  • 分布式系统设计

🎯 总结

核心要点

  • Functionz 函数管理框架
  • Flask Web 应用集成
  • 依赖解析 自动依赖管理
  • 触发器 函数间自动触发
  • Dashboard 可视化管理

架构亮点

  • 函数式编程范式
  • 依赖注入设计
  • 版本控制机制
  • Web 可视化管理
  • 自动依赖解析

BabyAGI 是一个创新的函数化智能体框架,通过 Functionz 实现了函数的自动管理、依赖解析和触发执行,为构建自主智能体提供了强大的基础架构。

🤖 感谢阅读

BabyAGI 任务驱动智能体源码深度解析

项目地址
https://github.com/yoheinakajima/babyagi

访问链接: https://atcfu.com/ai-articles/babyagi/