🤖 OpenAI Function Calling

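Implementation Mechanisms: An In-Depth Source-Level Analysis

Based on official OpenAI documentation and implementations
2026-03-29 | AI Technology Deep Dive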

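📑 Contents

Part 1: Fundamentals

  • Introduction to Function Calling
  • Core Architecture Design
  • API Evolution History
  • Core Components

Part 2: Core Implementation

  • JSON Schema Definitions
  • Tool Definition Mechanism
  • Function Calling Flow
  • Parsing and Execution

Part 3: SDK Implementations

  • Python SDK Implementation
  • JavaScript SDK Implementation
  • TypeScript Type Definitions
  • Tool Registration Mechanism

Part 4: Advanced Features

  • Structured Outputs
  • Parallel Call Handling
  • Tool Chain Orchestration
  • Performance Optimization Strategies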

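🤖 Introduction to Function Calling

OpenAI Function Calling (now exposed through the tools API parameter) lets the model return a structured function call instead of a plain-text reply, so an application can connect the model to external systems, APIs, and business logic. The model never runs the function itself: it returns a function name plus JSON-encoded arguments, your code executes the function, and you send the result back to the model in a follow-up request.

Core Ideas

  • The model returns structured function calls (a name and JSON arguments) for your code to execute
  • Parameters are defined with JSON Schema
  • Arguments are validated and executed on the application side
  • One response can contain several tool calls (parallel function calling), which the application may run concurrently

Use Cases

  • API integration and data retrieval
  • Database operations and management
  • File processing and generation
  • Complex task orchestration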

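🏗️ Core Architecture Design

An application built on Function Calling is typically organized in layers, which keeps it type-safe, extensible, and fast. The layering below is a conceptual design for the client-side application, not OpenAI's internal architecture.

[Client Layer]
├── SDK (Python/JS/TypeScript)
└── HTTP API Interface
[Core Layer]
├── Tool Registry
├── Schema Validator
├── Call Router
└── Response Parser
[Execution Layer]
├── Function Executor
├── Parameter Binder
└── Result Handler
[External Layer]
├── APIs & Services
├── Databases
└── File Systems

Design principles: type safety, high performance, extensibility, fault tolerance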

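📚 API Evolution History

Function Calling has gone through several major revisions, evolving from simple function calls into a full tool system.

Version              Date      Main changes
Initial Release      2023-06   functions / function_call parameters
Tools API            2023-11   tools / tool_choice replace functions; parallel function calling
Structured Outputs   2024-08   strict: true mode
Enhanced SDK         2025      Type safety, streaming responses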
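To make the 2023-11 rename concrete, here is a minimal sketch that rewrites a legacy functions / function_call request into the tools / tool_choice form. The helper name `upgrade_legacy_request` is hypothetical; the field mapping follows the two documented request shapes (each function is wrapped in a {"type": "function"} tool, and a forced {"name": ...} choice becomes a forced function tool_choice).

```python
def upgrade_legacy_request(params: dict) -> dict:
    """Convert legacy `functions`/`function_call` fields to `tools`/`tool_choice`.

    Hypothetical helper: all other request fields are copied through unchanged.
    """
    upgraded = {k: v for k, v in params.items() if k not in ("functions", "function_call")}

    # Each legacy function definition is wrapped in a {"type": "function"} tool
    if "functions" in params:
        upgraded["tools"] = [{"type": "function", "function": f} for f in params["functions"]]

    # "auto" / "none" carry over as-is; {"name": ...} becomes a forced function choice
    if "function_call" in params:
        choice = params["function_call"]
        if isinstance(choice, dict):
            upgraded["tool_choice"] = {"type": "function", "function": {"name": choice["name"]}}
        else:
            upgraded["tool_choice"] = choice

    return upgraded


legacy = {
    "model": "gpt-4",
    "functions": [{"name": "get_weather", "parameters": {"type": "object", "properties": {}}}],
    "function_call": {"name": "get_weather"},
}
print(upgrade_legacy_request(legacy)["tool_choice"])
```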

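⚙️ Core Components

Tool Definition

  • type: "function"
  • function.name - tool name
  • function.description - what the tool does
  • function.parameters - JSON Schema for the arguments
  • function.strict - (optional) enables Structured Outputs for the arguments

Calling Mechanism (performed by the model)

  • Analyze the user input
  • Choose an appropriate tool
  • Generate the arguments as a JSON string
  • Return a structured tool call

Execution Flow (performed by the application)

  • Receive the tool call request
  • Validate the argument structure
  • Bind it to the actual function
  • Execute and return the result
  • Format the response

Key Features

  • Type-safe argument validation
  • Parallel function calls
  • Asynchronous result handling
  • Error recovery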

📋 JSON Schema Definitions

JSON Schema is at the heart of Function Calling: it defines the structure, types, and constraints of a tool's parameters. A function definition carries that schema in its parameters field:

{
  "name": "tool_name",
  "description": "What the tool does",
  "parameters": {
    "type": "object",
    "properties": {
      "param1": {
        "type": "string",
        "description": "Description of parameter 1"
      }
    },
    "required": ["param1"]
  }
}

Supported data types: string, number, integer, boolean, array, object, and null; enum restricts a value to a fixed set

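🔧 Tool Definition Mechanism

In the raw API, a tool is declared as a plain dictionary. Higher-level helpers such as registration decorators can generate these dictionaries, but what goes over the wire is always this structure.

# Basic tool definition
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_stock_price",
            "description": "Get the stock price",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "Stock ticker symbol"
                    }
                },
                "required": ["symbol"]
            }
        }
    }
]

Tool characteristics: typed parameters, clear descriptions, argument validation, required fields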

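🔄 Function Calling Flow

User Input → Model Analysis → Tool Selection → Parameter Generation
    ↓
Call Router → Schema Validation → Function Binding → Execution
    ↓
Result Processing → Response Formatting → Final Output

The first row happens inside the model, the second row runs in your application, and in the third row the tool results are sent back to the model, which writes the final answer.

Key Steps

  • Input parsing and understanding
  • Tool selection
  • Argument generation and validation
  • Function execution and error handling

Quality Control

  • Strict schema validation
  • Argument type checking
  • Result validation
  • Error boundary handling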
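The whole round trip can be sketched as a loop: call the model, execute any tool calls, append the results as role "tool" messages, and repeat until the model answers in plain text. This is a minimal sketch under stated assumptions: `run_tool_loop` is a hypothetical helper, `call_model` is an assumed callable that sends `messages` and `tools` to the Chat Completions endpoint and returns the response as a plain dict in the documented JSON shape (with the official Python SDK, something like `client.chat.completions.create(...).model_dump(exclude_none=True)`), and `functions` maps tool names to Python callables.

```python
import json
from typing import Any, Callable, Dict, List


def run_tool_loop(
    call_model: Callable[[List[dict], List[dict]], dict],
    messages: List[dict],
    tools: List[dict],
    functions: Dict[str, Callable[..., Any]],
    max_rounds: int = 5,
) -> str:
    """Repeat model call -> tool execution -> tool messages until a plain-text answer arrives."""
    for _ in range(max_rounds):
        response = call_model(messages, tools)
        message = response["choices"][0]["message"]
        tool_calls = message.get("tool_calls") or []

        # No tool calls means the model has produced its final answer
        if not tool_calls:
            return message.get("content") or ""

        # The assistant message carrying tool_calls must precede the tool results
        messages.append(message)
        for call in tool_calls:
            name = call["function"]["name"]
            if name in functions:
                args = json.loads(call["function"]["arguments"])
                result = functions[name](**args)
            else:
                result = {"error": f"Unknown tool: {name}"}
            # Every tool call needs a matching role="tool" message
            messages.append({"role": "tool", "tool_call_id": call["id"],
                             "content": json.dumps(result)})

    raise RuntimeError("No final answer within max_rounds")


# Usage with a scripted stand-in for the API (illustration only)
replies = iter([
    {"choices": [{"message": {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_1", "type": "function",
         "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'}}]}}]},
    {"choices": [{"message": {"role": "assistant", "content": "It is sunny in Beijing."}}]},
])
msgs = [{"role": "user", "content": "What's the weather in Beijing?"}]
answer = run_tool_loop(lambda m, t: next(replies), msgs, [],
                       {"get_weather": lambda city: {"city": city, "sky": "sunny"}})
print(answer)
```

The `max_rounds` cap is a design choice that keeps a model which keeps requesting tools from looping forever.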

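⚡ Parsing and Execution

The parsing and execution layer turns a structured tool call into an actual function invocation and wraps the outcome in a "tool" message.

import json

def execute_tool_call(tool_call, tool_registry):
    """Run one tool call from the model and build the matching "tool" message."""
    try:
        # Parse the call: the name is a string, the arguments are a JSON string
        function_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)

        # Look up the implementation registered under that name
        function = tool_registry.get_tool(function_name)
        if function is None:
            raise KeyError(f"Unknown tool: {function_name}")

        # Execute the function with the model-supplied arguments
        result = function(**arguments)

        # tool_call_id ties this result to the model's request
        return {
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": json.dumps(result, ensure_ascii=False, default=str)
        }
    except Exception as e:
        # Report the failure back to the model instead of crashing the loop
        return {
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": json.dumps({"error": str(e)})
        }

Execution concerns: exception handling, result caching, timeout control, retries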
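Timeout control is listed above but not shown. One way to add it, sketched here under the assumption that the tool is an ordinary blocking function, is to run it in a worker thread with `asyncio.to_thread` and bound the wait with `asyncio.wait_for`. The helper `run_with_timeout` and the `slow_lookup` tool are hypothetical. Note that the timeout only stops the caller from waiting; the worker thread is not killed and finishes in the background.

```python
import asyncio
import time
from typing import Any, Callable


async def run_with_timeout(func: Callable[..., Any], timeout: float, **kwargs: Any) -> dict:
    """Run a blocking tool in a thread; return an error payload if it takes too long."""
    try:
        result = await asyncio.wait_for(asyncio.to_thread(func, **kwargs), timeout=timeout)
        return {"ok": True, "result": result}
    except asyncio.TimeoutError:
        # Only the wait is cancelled; the thread keeps running until func returns
        return {"ok": False, "error": f"timed out after {timeout}s"}


def slow_lookup(symbol: str) -> dict:
    time.sleep(0.5)  # simulate a slow upstream API
    return {"symbol": symbol, "price": 100.0}


print(asyncio.run(run_with_timeout(slow_lookup, timeout=0.1, symbol="AAPL")))
```

Returning an error payload rather than raising lets the caller pass the timeout back to the model as an ordinary tool result.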

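🎯 Tool Selection

OpenAI does not publish the model's internal selection procedure. In practice the model picks a tool (or none) from the conversation context and each tool's name, description, and parameter schema, and the tool_choice request parameter can constrain that choice: "auto" (the default when tools are supplied), "none", "required", or one specific function. The pipeline below is a conceptual model of that decision, not a documented algorithm.

Semantic Analysis → Context Understanding → Tool Matching
    ↓
Scoring System → Best Tool Selection → Parameter Generation
    ↓
Confidence Check → Final Tool Call

What influences the choice: how clearly tool names and descriptions match the request, relevance to the conversation context, and the tool_choice setting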
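The four tool_choice forms can be captured in a small helper. `build_tool_choice` is a hypothetical convenience function; the values it returns are the documented tool_choice shapes for the Chat Completions API.

```python
from typing import Optional, Union


def build_tool_choice(mode: str, function_name: Optional[str] = None) -> Union[str, dict]:
    """Return a tool_choice value: "auto", "none", "required", or a forced function."""
    if mode in ("auto", "none", "required"):
        return mode
    if mode == "function":
        if not function_name:
            raise ValueError("function_name is required when forcing a specific tool")
        # Forces the model to call exactly this function
        return {"type": "function", "function": {"name": function_name}}
    raise ValueError(f"Unknown tool_choice mode: {mode}")


# e.g. client.chat.completions.create(..., tools=tools,
#          tool_choice=build_tool_choice("function", "get_weather"))
print(build_tool_choice("function", "get_weather"))
```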

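✅ Parameter Validation

Parameter validation enforces type safety and data integrity, rejecting invalid or dangerous arguments before they reach your functions. Unless strict mode is enabled, model-generated arguments are not guaranteed to match the schema, so check them before calling the function.

def validate_parameters(schema, parameters):
    """Return a list of error messages; an empty list means the arguments passed."""
    errors = []

    # Check required fields
    for required in schema.get("required", []):
        if required not in parameters:
            errors.append(f"Missing required parameter: {required}")

    # Check each supplied value against its declared type
    properties = schema.get("properties", {})
    for param_name, param_schema in properties.items():
        if param_name in parameters:
            value = parameters[param_name]
            expected_type = param_schema.get("type")

            if not validate_type(value, expected_type):
                errors.append(f"Invalid type for {param_name}: expected {expected_type}")

    return errors

def validate_type(value, expected_type):
    """Check one value against a JSON Schema type (or list of types)."""
    # A list such as ["string", "null"] passes if any member type matches
    if isinstance(expected_type, list):
        return any(validate_type(value, t) for t in expected_type)
    if expected_type == "string":
        return isinstance(value, str)
    # bool is a subclass of int in Python, so exclude it from the numeric types
    if expected_type == "integer":
        return isinstance(value, int) and not isinstance(value, bool)
    if expected_type == "number":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if expected_type == "boolean":
        return isinstance(value, bool)
    if expected_type == "array":
        return isinstance(value, list)
    if expected_type == "object":
        return isinstance(value, dict)
    if expected_type == "null":
        return value is None
    # Missing or unrecognized type: nothing to check
    return True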

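📤 Response Handling

The response handling layer processes function results and formats them into messages the model can consume.

Response Formats

  • Success: {"role": "tool", "tool_call_id": "...", "content": "..."}, where content is a string (typically serialized JSON)
  • Error: a tool message whose content carries the error message (include a stack trace only if it is safe to expose to the model)
  • Partial: streamed or chunked results

Handling Strategies

  • Result serialization
  • Error wrapping
  • Timeout handling
  • Retries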
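Serialization and error wrapping can share one helper that always yields a valid tool message. This sketch assumes JSON content and a hypothetical character cap (`max_chars`) so that a very large result does not flood the model's context; `to_tool_message` and the truncation marker format are illustrative choices, not part of the API.

```python
import json
from typing import Any, Optional


def to_tool_message(tool_call_id: str, result: Any = None,
                    error: Optional[BaseException] = None, max_chars: int = 4000) -> dict:
    """Build a role="tool" message from either a result or an exception."""
    if error is not None:
        payload = {"error": type(error).__name__, "message": str(error)}
    else:
        payload = {"result": result}

    # default=str keeps non-JSON values (datetimes, Decimals, ...) serializable
    content = json.dumps(payload, ensure_ascii=False, default=str)

    # Truncate oversized output and say so explicitly
    if len(content) > max_chars:
        content = content[:max_chars] + "...[truncated]"

    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}
```

Because the helper never raises on odd result types, the loop that sends tool results back always has something well-formed to send.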

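💻 Code Example: Basic

A basic Function Calling example showing how to define a tool, handle the model's tool call, and send the result back for a final answer.

import json
from openai import OpenAI

# Initialize the client (by default it reads the OPENAI_API_KEY environment variable)
client = OpenAI()

def get_weather_data(city):
    # Placeholder implementation; replace with a real weather API call
    return {"city": city, "condition": "sunny", "temperature_c": 25}

# Define the tool
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather information for the specified city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "City name"
                    }
                },
                "required": ["city"]
            }
        }
    }
]

messages = [{"role": "user", "content": "What's the weather like in Beijing today?"}]

# Send the request
response = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools
)
message = response.choices[0].message

# Handle tool calls
if message.tool_calls:
    # The assistant message containing tool_calls must stay in the history
    messages.append(message)
    for tool_call in message.tool_calls:
        if tool_call.function.name == "get_weather":
            args = json.loads(tool_call.function.arguments)
            weather_data = get_weather_data(args["city"])
            # Return the result, linked to the call through tool_call_id
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(weather_data)
            })

    # Second request: the model turns the tool result into a final answer
    final = client.chat.completions.create(model="gpt-4", messages=messages, tools=tools)
    print(final.choices[0].message.content)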

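💻 Code Example: Advanced

An advanced Function Calling example with parallel tool calls, error handling, and result aggregation. Parallel function calling means the model returns several tool_calls in a single response; the application then executes them concurrently.

import asyncio
import json
from openai import AsyncOpenAI

client = AsyncOpenAI()

# Hypothetical tool implementations; replace with real API calls
async def get_stock_price(symbol: str) -> dict:
    return {"symbol": symbol, "price": None}

async def get_news(topic: str) -> dict:
    return {"topic": topic, "headlines": []}

FUNCTIONS = {"get_stock_price": get_stock_price, "get_news": get_news}

async def advanced_function_calling():
    # Define several tools and offer them all in one request
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_stock_price",
                "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_news",
                "parameters": {"type": "object", "properties": {"topic": {"type": "string"}}, "required": ["topic"]}
            }
        }
    ]

    # One request; a model that supports parallel function calling
    # may return several tool_calls at once
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Analyze Apple's stock and the latest tech news"}],
        tools=tools
    )
    tool_calls = response.choices[0].message.tool_calls or []

    async def run(tool_call):
        args = json.loads(tool_call.function.arguments)
        return await FUNCTIONS[tool_call.function.name](**args)

    # Execute all returned calls concurrently; exceptions are returned, not raised
    outcomes = await asyncio.gather(*(run(tc) for tc in tool_calls), return_exceptions=True)

    # Aggregate by function name (a repeated call to the same function overwrites the earlier entry)
    aggregated_data = {}
    for tool_call, outcome in zip(tool_calls, outcomes):
        if isinstance(outcome, Exception):
            aggregated_data[tool_call.function.name] = {"error": str(outcome)}
        else:
            aggregated_data[tool_call.function.name] = outcome

    return aggregated_data

# asyncio.run(advanced_function_calling())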

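🐍 Python SDK Implementation

The OpenAI Python SDK accepts tool definitions as plain dictionaries, so schemas generated from Pydantic models can go straight into the parameters field. The same models can parse the model's arguments back, for example StockPriceRequest.model_validate_json(tool_call.function.arguments) in Pydantic v2.

import functools
from pydantic import BaseModel, Field

# Define tool argument structures with Pydantic
class StockPriceRequest(BaseModel):
    symbol: str = Field(description="Stock ticker symbol, e.g. AAPL, MSFT")

class WeatherRequest(BaseModel):
    city: str = Field(description="City name")

# Tool registration: model_json_schema() (Pydantic v2) generates each parameters schema
def register_tools():
    return [
        {
            "type": "function",
            "function": {
                "name": "get_stock_price",
                "description": "Get the real-time stock price",
                "parameters": StockPriceRequest.model_json_schema()
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get weather information for a city",
                "parameters": WeatherRequest.model_json_schema()
            }
        }
    ]

# Decorator that wraps a tool's outcome in a success/error envelope
def tool_call_decorator(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            result = function(*args, **kwargs)
            return {"success": True, "data": result}
        except Exception as e:
            return {"success": False, "error": str(e)}
    return wrapper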
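If Pydantic is not available, a similar parameters schema can be derived from a standard-library dataclass. This is a simplified sketch: `dataclass_to_parameters` is a hypothetical helper that maps only a few built-in types (anything else falls back to "string", an assumption), reads descriptions from field metadata, and treats fields without defaults as required.

```python
import dataclasses
from typing import get_type_hints

# Simplified mapping from Python types to JSON Schema types (an assumption; extend as needed)
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}


def dataclass_to_parameters(cls) -> dict:
    """Build a function-calling parameters schema from a dataclass."""
    hints = get_type_hints(cls)
    properties, required = {}, []
    for field in dataclasses.fields(cls):
        prop = {"type": _JSON_TYPES.get(hints[field.name], "string")}
        # Descriptions come from dataclass field metadata, if present
        if "description" in field.metadata:
            prop["description"] = field.metadata["description"]
        properties[field.name] = prop
        # A field with neither default nor default_factory is required
        if (field.default is dataclasses.MISSING
                and field.default_factory is dataclasses.MISSING):
            required.append(field.name)
    return {"type": "object", "properties": properties, "required": required}


@dataclasses.dataclass
class StockPriceRequest:
    symbol: str = dataclasses.field(metadata={"description": "Stock ticker symbol, e.g. AAPL"})
    days: int = 1


print(dataclass_to_parameters(StockPriceRequest))
```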

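🟨 JavaScript SDK Implementation

The official openai package for JavaScript/TypeScript supports tools with full type definitions and async/await. The example below is written in TypeScript.

import OpenAI from 'openai';

// Initialize the client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Argument types for each tool
interface StockPriceParams {
  symbol: string;
}

interface WeatherParams {
  city: string;
}

// Hypothetical data-access helpers; replace with real implementations
declare function fetchStockPrice(symbol: string): Promise<unknown>;
declare function fetchWeather(city: string): Promise<unknown>;

// Tool definitions; the annotation keeps type: 'function' as a literal type
const tools: OpenAI.Chat.Completions.ChatCompletionTool[] = [
  {
    type: 'function',
    function: {
      name: 'getStockPrice',
      description: 'Get the real-time stock price',
      parameters: {
        type: 'object',
        properties: {
          symbol: {
            type: 'string',
            description: 'Stock ticker symbol, e.g. AAPL, MSFT'
          }
        },
        required: ['symbol']
      }
    }
  },
  {
    type: 'function',
    function: {
      name: 'getWeather',
      description: 'Get weather information for a city',
      parameters: {
        type: 'object',
        properties: {
          city: {
            type: 'string',
            description: 'City name'
          }
        },
        required: ['city']
      }
    }
  }
];

// Tool dispatcher: maps a tool name to its implementation
async function executeTool(toolName: string, params: unknown): Promise<unknown> {
  switch (toolName) {
    case 'getStockPrice':
      return await fetchStockPrice((params as StockPriceParams).symbol);
    case 'getWeather':
      return await fetchWeather((params as WeatherParams).city);
    default:
      throw new Error(`Unknown tool: ${toolName}`);
  }
}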

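📝 TypeScript Type Definitions

TypeScript types can describe tool definitions, tool calls, and messages, catching shape errors in your own code at compile time. Tool call arguments still arrive as a JSON string and are only checked when parsed at runtime.

// Tool definition type; properties and required are tied to the argument type T
interface Tool<T extends Record<string, any>> {
  type: 'function';
  function: {
    name: string;
    description: string;
    parameters: {
      type: 'object';
      properties: {
        [K in keyof T]: {
          type: string;
          description: string;
        };
      };
      required: Array<keyof T>;
    };
  };
}

// Tool call type
interface ToolCall {
  id: string;
  type: 'function';
  function: {
    name: string;
    arguments: string; // JSON string produced by the model
  };
}

// Message type
interface ChatMessage {
  role: 'system' | 'user' | 'assistant' | 'tool';
  content?: string | null;   // null on assistant messages that only carry tool calls
  tool_calls?: ToolCall[];   // assistant messages only
  tool_call_id?: string;     // required on role: 'tool' messages
}

// Tool registry
class ToolRegistry {
  private tools = new Map<string, (params: any) => Promise<unknown>>();

  registerTool<T>(name: string, fn: (params: T) => Promise<unknown>) {
    this.tools.set(name, fn);
  }

  async executeTool(toolCall: ToolCall): Promise<unknown> {
    const fn = this.tools.get(toolCall.function.name);
    if (!fn) {
      throw new Error(`Tool not found: ${toolCall.function.name}`);
    }

    // arguments is a JSON string; parse it before invoking the tool
    const params = JSON.parse(toolCall.function.arguments);
    return await fn(params);
  }
}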

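🔧 Tool Registration Mechanism

Tool registration combines a registry with a decorator: the registry stores each function with its schema, and the decorator registers functions at definition time, which keeps tool management flexible and easy to maintain.

import ast
import inspect
import operator
from typing import Callable, Optional

class ToolRegistry:
    def __init__(self):
        self.tools = {}
        self.schemas = {}

    def register_tool(self, name: str, func: Callable, schema: dict):
        """Register a tool."""
        self.tools[name] = func
        self.schemas[name] = schema

    def get_tool(self, name: str) -> Optional[Callable]:
        """Return the tool function, or None if it is not registered."""
        return self.tools.get(name)

    def get_schema(self, name: str) -> Optional[dict]:
        """Return the tool schema."""
        return self.schemas.get(name)

    def list_tools(self) -> list:
        """List all registered tool names."""
        return list(self.tools.keys())

# Module-level registry used by the decorator
tool_registry = ToolRegistry()

# Simplified Python-type to JSON Schema mapping (unannotated parameters default to string)
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}

def generate_schema(name: str, func: Callable, description: str) -> dict:
    """Derive a minimal tool definition from the function signature."""
    properties, required = {}, []
    for param_name, param in inspect.signature(func).parameters.items():
        properties[param_name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(param_name)
    return {"name": name, "description": description,
            "parameters": {"type": "object", "properties": properties, "required": required}}

# Register with a decorator
def tool(name: str, description: str):
    def decorator(func: Callable):
        tool_registry.register_tool(name, func, generate_schema(name, func, description))
        return func
    return decorator

# Arithmetic operators the calculator accepts
_OPERATORS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
              ast.Div: operator.truediv, ast.Pow: operator.pow, ast.USub: operator.neg}

def _eval_node(node):
    # Walk the parsed expression, allowing only numbers and whitelisted operators
    # (very large exponents can still be slow; cap them in production)
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPERATORS:
        return _OPERATORS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _OPERATORS:
        return _OPERATORS[type(node.op)](_eval_node(node.operand))
    raise ValueError("Unsupported expression")

# Usage examples
@tool("calculate", "Evaluate a math expression")
def calculate(expression: str) -> float:
    """Safe arithmetic evaluation; eval() is avoided because it would run arbitrary code."""
    try:
        return float(_eval_node(ast.parse(expression, mode="eval").body))
    except (SyntaxError, ValueError, ZeroDivisionError, OverflowError) as exc:
        raise ValueError("Invalid expression") from exc

@tool("search", "Search for information")
def search(query: str) -> str:
    """Web search."""
    # Search logic goes here (placeholder)
    raise NotImplementedError("search backend not configured")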

🔄 Dynamic Tool Loading

Dynamic tool loading discovers and loads tools at runtime, making the system more flexible and extensible.

Plugin System → Tool Discovery → Schema Generation → Registration
    ↓
Hot Loading → Validation Testing → Integration
    ↓
Runtime Updates → Performance Monitoring

import glob
import importlib.util
import os

class DynamicToolLoader:
    def __init__(self):
        self.plugin_directories = []
        self.loaded_tools = {}

    def add_plugin_directory(self, directory: str):
        """Add a plugin directory."""
        self.plugin_directories.append(directory)

    def discover_tools(self):
        """Discover all available tools.

        Importing a plugin executes its code, so only load trusted directories.
        """
        tools = []

        for directory in self.plugin_directories:
            for file_path in glob.glob(os.path.join(directory, "*.py")):
                module_name = os.path.splitext(os.path.basename(file_path))[0]
                spec = importlib.util.spec_from_file_location(module_name, file_path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)

                # Tools are callables carrying a _tool_info dict set by the plugin's decorator
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if callable(attr) and hasattr(attr, "_tool_info"):
                        tools.append(attr)

        return tools

    def load_tool(self, tool_func):
        """Load a single tool."""
        tool_info = tool_func._tool_info
        self.loaded_tools[tool_info["name"]] = tool_func
        return tool_info

    def unload_tool(self, name: str):
        """Unload a tool (no-op if it is not loaded)."""
        self.loaded_tools.pop(name, None)
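The loader only recognizes callables that carry a `_tool_info` attribute, which the registration decorator `tool` shown earlier does not set. A plugin-side decorator could attach it as follows; `plugin_tool`, the keys inside `_tool_info`, and the plugin file path are assumptions chosen to match what `load_tool` reads (`name`).

```python
from typing import Callable


def plugin_tool(name: str, description: str):
    """Mark a function as a discoverable tool by attaching _tool_info metadata."""
    def decorator(func: Callable) -> Callable:
        func._tool_info = {"name": name, "description": description}
        return func
    return decorator


# Inside a plugin file, e.g. plugins/weather.py (hypothetical path):
@plugin_tool("get_weather", "Get weather information for a city")
def get_weather(city: str) -> dict:
    return {"city": city, "condition": "sunny"}
```

Keeping the metadata on the function itself means discovery needs nothing beyond importing the module and inspecting its attributes.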

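🚨 Error Handling Strategy

Error handling is layered so that the system degrades gracefully and returns useful error information when something fails.

Error Types

  • Argument validation errors
  • Function execution errors
  • Network connection errors
  • Timeouts
  • Permission errors

Handling Strategies

  • Error classification and wrapping
  • Error message formatting
  • Retries
  • Graceful degradation
  • Error logging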
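Classification is what makes the other strategies work: only transient failures are worth retrying, while validation and permission errors should go straight back to the model. A minimal sketch follows; `classify_error` is a hypothetical helper and the mapping from Python exception classes to categories is an assumption to adapt to your own tools.

```python
from typing import Tuple

# (exception types, category, retryable), checked in order; the mapping is an assumption.
# json.JSONDecodeError is a subclass of ValueError, so malformed arguments land in "validation".
_ERROR_RULES = [
    ((TypeError, KeyError, ValueError), "validation", False),
    ((TimeoutError,), "timeout", True),
    ((ConnectionError,), "network", True),
    ((PermissionError,), "permission", False),
]


def classify_error(exc: BaseException) -> Tuple[str, bool]:
    """Return (category, retryable) for an exception raised while running a tool."""
    for types, category, retryable in _ERROR_RULES:
        if isinstance(exc, types):
            return category, retryable
    return "execution", False
```

A retry loop can then retry only when `retryable` is True and report everything else to the model immediately.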

🔄 Retry Mechanism

Retries let the system recover automatically from transient failures, improving reliability. Limit retries to transient error types such as timeouts and connection errors: retrying a validation error only repeats the same failure.

import asyncio
import logging
import time
from typing import Any, Callable, Tuple, Type

logger = logging.getLogger(__name__)

class RetryHandler:
    def __init__(self, max_retries: int = 3, delay: float = 1.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,)):
        self.max_retries = max_retries
        self.delay = delay
        # Only these exception types are retried; anything else propagates immediately
        self.retry_on = retry_on

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Async execution with retries (func must be a coroutine function)."""
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except self.retry_on:
                if attempt == self.max_retries:
                    raise  # all retries failed: re-raise the last error
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(self.delay * (2 ** attempt))

    def execute_with_retry_sync(self, func: Callable, *args, **kwargs) -> Any:
        """Synchronous execution with retries."""
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except self.retry_on:
                if attempt == self.max_retries:
                    raise
                time.sleep(self.delay * (2 ** attempt))

# Usage example (execute_tool is assumed to be an async dispatcher: execute_tool(name, params))
async def safe_tool_execution(tool_name: str, params: dict):
    retry_handler = RetryHandler(max_retries=3, delay=1.0,
                                 retry_on=(TimeoutError, ConnectionError))

    try:
        return await retry_handler.execute_with_retry(
            execute_tool, tool_name, params
        )
    except Exception as e:
        logger.error(f"Tool {tool_name} failed after retries: {e}")
        return {"error": "Tool execution failed", "details": str(e)}
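A common refinement, not part of the handler above, is to cap the backoff and add random jitter so that many clients retrying at once do not hit the upstream API in lockstep. This sketch uses the "full jitter" variant (a random wait between 0 and the capped exponential delay); `backoff_delays` and its `cap` default are hypothetical, and passing a seeded `random.Random` makes the schedule reproducible in tests.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the wait before each retry: uniform(0, min(cap, base * 2**attempt))."""
    rng = rng or random.Random()
    return [rng.uniform(0, min(cap, base * (2 ** attempt))) for attempt in range(max_retries)]


print(backoff_delays(5, base=1.0, cap=8.0, rng=random.Random(42)))
```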

⚡ Parallel Call Handling

Parallel call handling runs several tool calls at the same time. For I/O-bound tools this can substantially improve throughput and response time, because total latency approaches that of the slowest call rather than the sum of all calls.

import asyncio
import concurrent.futures
import json
from typing import Any, Callable, Dict, List

class ParallelToolExecutor:
    def __init__(self, execute_fn: Callable[[str, dict], Any], max_workers: int = 5):
        # execute_fn(tool_name, params) is the application's synchronous dispatcher
        self.execute_fn = execute_fn
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    async def execute_parallel(self, tool_calls: List[Dict]) -> List[Dict]:
        """Execute several tool calls in parallel."""
        # Create one task per tool call
        tasks = [asyncio.create_task(self._execute_single_tool(tc)) for tc in tool_calls]

        # Run all tasks concurrently; exceptions are returned instead of raised
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Pair each outcome with its tool_call_id (gather preserves input order)
        processed_results = []
        for tool_call, result in zip(tool_calls, results):
            if isinstance(result, Exception):
                processed_results.append({
                    "tool_call_id": tool_call["id"],
                    "error": str(result),
                    "success": False
                })
            else:
                processed_results.append({
                    "tool_call_id": tool_call["id"],
                    "result": result,
                    "success": True
                })

        return processed_results

    async def _execute_single_tool(self, tool_call: Dict) -> Any:
        """Execute a single tool call."""
        tool_name = tool_call["function"]["name"]
        params = json.loads(tool_call["function"]["arguments"])

        # Run the blocking tool in the thread pool so the event loop stays responsive
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, self.execute_fn, tool_name, params)

    def shutdown(self):
        """Shut down the thread pool."""
        self.executor.shutdown()

# Usage example (execute_tool_sync is the app's synchronous dispatcher, assumed to exist)
async def process_multiple_requests(requests: List[Dict]):
    executor = ParallelToolExecutor(execute_tool_sync, max_workers=3)

    try:
        return await executor.execute_parallel(requests)
    finally:
        executor.shutdown()
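`ParallelToolExecutor` offloads blocking tools to a thread pool. When the tools are themselves async (for example, HTTP calls made with an async client), threads are unnecessary and an `asyncio.Semaphore` can cap how many run at once. A sketch, assuming each tool is an async callable looked up by name in a plain dict; `run_limited` is a hypothetical helper.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List


async def run_limited(tool_calls: List[dict],
                      functions: Dict[str, Callable[..., Awaitable[Any]]],
                      limit: int = 3) -> List[dict]:
    """Run async tool calls concurrently, never more than `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(call: dict) -> dict:
        async with semaphore:  # waits here while `limit` calls are already in flight
            try:
                args = json.loads(call["function"]["arguments"])
                result = await functions[call["function"]["name"]](**args)
                return {"tool_call_id": call["id"], "result": result, "success": True}
            except Exception as exc:
                return {"tool_call_id": call["id"], "error": str(exc), "success": False}

    # gather keeps results in the same order as tool_calls
    return await asyncio.gather(*(run_one(c) for c in tool_calls))
```

Catching exceptions inside `run_one` means one failing tool produces an error entry instead of cancelling the others.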

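🔗 Tool Chain Orchestration

Tool chain orchestration combines multiple tools into a multi-step workflow, in which each tool can consume the outputs of the tools it depends on.

Input → Tool Selection → Execution Chain → Result Aggregation
    ↓
Error Handling → Fallback Strategy → Output Generation
    ↓
Logging & Monitoring → Performance Optimization

from typing import Any, Awaitable, Callable, Dict, List

class ToolChainOrchestrator:
    def __init__(self, execute_tool_async: Callable[[str, Dict], Awaitable[Any]]):
        # execute_tool_async(name, params) is the application's async tool dispatcher
        self.execute_tool_async = execute_tool_async
        self.chains = {}
        self.execution_history = []

    def register_chain(self, name: str, tools: List[str], dependencies: Dict[str, List[str]]):
        """Register a tool chain; dependencies maps a tool to the tools it depends on."""
        self.chains[name] = {
            "tools": tools,
            "dependencies": dependencies,
            "status": "registered"
        }

    async def execute_chain(self, chain_name: str, input_data: Dict) -> Dict:
        """Execute a tool chain."""
        if chain_name not in self.chains:
            raise ValueError(f"Chain {chain_name} not found")

        chain = self.chains[chain_name]
        results = {}
        context = dict(input_data)  # copy so the caller's dict is not mutated

        # Order the tools so that every dependency runs first
        execution_order = self._resolve_dependencies(chain)

        for tool_name in execution_order:
            # Build this tool's parameters
            params = self._prepare_tool_params(tool_name, context, results)

            # Execute the tool
            try:
                result = await self.execute_tool_async(tool_name, params)
                results[tool_name] = result

                # Make dict outputs available to downstream tools
                if isinstance(result, dict):
                    context.update(result)

            except Exception as e:
                # Record the failure and continue with the remaining tools
                results[tool_name] = await self._handle_tool_error(tool_name, params, e)

        self.execution_history.append({"chain": chain_name, "results": results})
        return results

    def _prepare_tool_params(self, tool_name: str, context: Dict, results: Dict) -> Dict:
        """Default policy: pass the accumulated context; override for per-tool mapping."""
        return dict(context)

    async def _handle_tool_error(self, tool_name: str, params: Dict, error: Exception) -> Dict:
        """Default policy: capture the error (a retry or fallback could go here)."""
        return {"error": str(error), "tool": tool_name}

    def _resolve_dependencies(self, chain: Dict) -> List[str]:
        """Resolve tool dependencies."""
        # Depth-first topological sort
        visited = set()
        temp_visited = set()
        result = []

        def visit(tool_name):
            if tool_name in temp_visited:
                raise ValueError(f"Circular dependency detected at {tool_name}")

            if tool_name not in visited:
                temp_visited.add(tool_name)

                # Visit the tools this one depends on first
                for dep in chain["dependencies"].get(tool_name, []):
                    visit(dep)

                temp_visited.remove(tool_name)
                visited.add(tool_name)
                result.append(tool_name)

        for tool_name in chain["tools"]:
            if tool_name not in visited:
                visit(tool_name)

        return result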
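Since Python 3.9 the standard library's `graphlib` module provides the same ordering, including cycle detection, so the hand-written depth-first sort can be swapped for it. A sketch, assuming the same dependencies shape (tool name → list of tools it depends on); `resolve_order` and the tool names are illustrative.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def resolve_order(tools: List[str], dependencies: Dict[str, List[str]]) -> List[str]:
    """Return an execution order in which every tool runs after its dependencies."""
    sorter = TopologicalSorter()
    for tool in tools:
        # add(node, *predecessors): the predecessors must come first
        sorter.add(tool, *dependencies.get(tool, []))
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the cycle
        raise ValueError(f"Circular dependency detected: {exc.args[1]}") from exc


order = resolve_order(
    ["fetch_quote", "fetch_news", "summarize"],
    {"summarize": ["fetch_quote", "fetch_news"]},
)
print(order)
```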

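📊 Structured Outputs

Structured Outputs is a major enhancement to Function Calling: with strict mode on, the generated arguments are guaranteed to match the supplied JSON Schema.

Core Features

  • Enabled with "strict": true on the function definition (not inside parameters)
  • Constrained decoding: the model can only produce output that fits the schema
  • Every property must be listed in required; optional fields are expressed as a union with null, e.g. "type": ["string", "null"]
  • No extra properties: every object must set "additionalProperties": false
  • Enum values are respected

Use Cases

  • Standardizing data formats
  • Enforcing API contracts
  • Generating configuration files
  • Data validation

{
  "type": "function",
  "function": {
    "name": "generate_report",
    "description": "Generate a report in a standard format",
    "strict": true,
    "parameters": {
      "type": "object",
      "properties": {
        "title": {
          "type": "string",
          "description": "Report title"
        },
        "sections": {
          "type": "array",
          "items": {
            "type": "object",
            "properties": {
              "name": {"type": "string"},
              "content": {"type": "string"}
            },
            "required": ["name", "content"],
            "additionalProperties": false
          }
        },
        "metadata": {
          "type": "object",
          "properties": {
            "author": {"type": "string"},
            "created_at": {"type": ["string", "null"], "description": "ISO 8601 date-time, or null"}
          },
          "required": ["author", "created_at"],
          "additionalProperties": false
        }
      },
      "required": ["title", "sections", "metadata"],
      "additionalProperties": false
    }
  }
}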
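Because strict mode requires every object to list all of its properties as required and to forbid extra properties, an existing non-strict schema usually needs converting. A sketch of that conversion, assuming previously optional properties should become nullable; `to_strict_schema` is a hypothetical helper that only understands the type, properties, required, and items keywords (it does not handle anyOf, $ref, or schemas whose type is already a list).

```python
import copy
from typing import Any, Dict


def to_strict_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of schema adjusted to strict-mode rules; the input is not modified."""
    schema = copy.deepcopy(schema)

    if schema.get("type") == "object":
        properties = schema.get("properties", {})
        optional = set(properties) - set(schema.get("required", []))
        for name, prop in properties.items():
            prop = to_strict_schema(prop)  # convert nested objects/arrays first
            # Optional fields become required-but-nullable
            if name in optional and isinstance(prop.get("type"), str):
                prop["type"] = [prop["type"], "null"]
            properties[name] = prop
        schema["properties"] = properties
        schema["required"] = list(properties)
        schema["additionalProperties"] = False

    elif schema.get("type") == "array" and isinstance(schema.get("items"), dict):
        schema["items"] = to_strict_schema(schema["items"])

    return schema
```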

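⚙️ Strict Mode Configuration

Strict mode ensures type safety and data integrity for the output, preventing invalid or unexpected data formats. The client-side validator below applies the same rules locally (required fields, no undeclared properties when additionalProperties is false, types, enums, patterns), which is useful for non-strict tools and for data from other sources.

import re
from typing import Any

class StrictModeValidator:
    def __init__(self):
        # Dispatch table: JSON Schema type -> checker
        self.validators = {
            "string": self._validate_string,
            "number": self._validate_number,
            "integer": self._validate_integer,
            "boolean": self._validate_boolean,
            "array": self._validate_array,
            "object": self._validate_object
        }

    def validate_strict(self, schema: dict, data: Any) -> Any:
        """Validate data against schema, raising ValueError on the first violation."""
        self._validate_field("<root>", schema, data)
        return data

    def _validate_field(self, field: str, schema: dict, value: Any):
        """Validate a single value: type, then enum, then pattern."""
        validator = self.validators.get(schema.get("type"))
        if validator:
            validator(value, schema)

        # Check enum values
        if "enum" in schema and value not in schema["enum"]:
            raise ValueError(f"Invalid value for {field}: {value}")

        # Check the pattern (JSON Schema patterns are not anchored, so use re.search)
        if "pattern" in schema and not re.search(schema["pattern"], str(value)):
            raise ValueError(f"Pattern mismatch for {field}: {value}")

    def _validate_string(self, value: Any, schema: dict):
        if not isinstance(value, str):
            raise ValueError("Expected string type")

    def _validate_number(self, value: Any, schema: dict):
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError("Expected number type")
        if "minimum" in schema and value < schema["minimum"]:
            raise ValueError(f"Value {value} is below minimum {schema['minimum']}")
        if "maximum" in schema and value > schema["maximum"]:
            raise ValueError(f"Value {value} is above maximum {schema['maximum']}")

    def _validate_integer(self, value: Any, schema: dict):
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("Expected integer type")
        self._validate_number(value, schema)  # reuse the range checks

    def _validate_boolean(self, value: Any, schema: dict):
        if not isinstance(value, bool):
            raise ValueError("Expected boolean type")

    def _validate_array(self, value: Any, schema: dict):
        if not isinstance(value, list):
            raise ValueError("Expected array type")
        item_schema = schema.get("items")
        if item_schema:
            for i, item in enumerate(value):
                self._validate_field(f"[{i}]", item_schema, item)

    def _validate_object(self, value: Any, schema: dict):
        """Validate an object: required fields, extra fields, then each property."""
        if not isinstance(value, dict):
            raise ValueError("Expected object type")

        properties = schema.get("properties", {})

        # Check required fields
        for field in schema.get("required", []):
            if field not in value:
                raise ValueError(f"Missing required field: {field}")

        # Reject undeclared fields when additionalProperties is false (as strict mode requires)
        if schema.get("additionalProperties") is False:
            for field in value:
                if field not in properties:
                    raise ValueError(f"Unexpected field: {field}")

        # Validate each declared property
        for prop, prop_schema in properties.items():
            if prop in value:
                self._validate_field(prop, prop_schema, value[prop])

# Usage example
validator = StrictModeValidator()
schema = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number", "minimum": 0}
    },
    "required": ["name", "age"]
}

data = {"name": "John", "age": 25}
validated_data = validator.validate_strict(schema, data)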

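🔍 JSON Schema Validation

JSON Schema validation checks that arguments have the expected structure and content, providing powerful data validation.

Validation Rules

  • Type validation
  • Format validation
  • Range validation
  • Pattern matching
  • Dependencies

Advanced Features

  • Conditional validation
  • Array length limits
  • Object property validation
  • Custom validators
  • Localized error messages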
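Range limits, array length limits, and dependencies correspond to the JSON Schema keywords minimum/maximum, minItems/maxItems, and dependentRequired. Below is a small standard-library sketch that checks just those keywords on one object; `check_constraints` is a hypothetical helper, and a full validator library covers far more. Strict mode supports only a subset of JSON Schema, so checks like these may still be needed on the client.

```python
from typing import Any, Dict, List


def check_constraints(schema: Dict[str, Any], data: Dict[str, Any]) -> List[str]:
    """Check minimum/maximum, minItems/maxItems, and dependentRequired; return error messages."""
    errors = []
    for name, prop in schema.get("properties", {}).items():
        if name not in data:
            continue
        value = data[name]
        # Range checks for numbers
        if "minimum" in prop and value < prop["minimum"]:
            errors.append(f"{name} must be >= {prop['minimum']}")
        if "maximum" in prop and value > prop["maximum"]:
            errors.append(f"{name} must be <= {prop['maximum']}")
        # Length checks for arrays
        if "minItems" in prop and len(value) < prop["minItems"]:
            errors.append(f"{name} needs at least {prop['minItems']} item(s)")
        if "maxItems" in prop and len(value) > prop["maxItems"]:
            errors.append(f"{name} allows at most {prop['maxItems']} item(s)")

    # dependentRequired: when a key is present, the listed keys must be present too
    for key, needed in schema.get("dependentRequired", {}).items():
        if key in data:
            errors.extend(f"{dep} is required when {key} is present" for dep in needed if dep not in data)
    return errors


schema = {
    "type": "object",
    "properties": {
        "days": {"type": "integer", "minimum": 1, "maximum": 14},
        "cities": {"type": "array", "items": {"type": "string"}, "minItems": 1, "maxItems": 3},
        "units": {"type": "string"},
        "lang": {"type": "string"},
    },
    "dependentRequired": {"units": ["lang"]},
}
print(check_constraints(schema, {"days": 30, "cities": [], "units": "metric"}))
```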

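🛡️ Type Safety Guarantees

TypeScript types catch errors in your own code at compile time, but tool arguments are generated by the model and arrive as a JSON string, so they must also be checked at runtime. The validator below bridges the two: it checks data against a JSON Schema at runtime and returns it typed as T.

// Runtime type checking
class TypeValidator<T> {
    private schema: any;

    constructor(schema: any) {
        this.schema = schema;
    }

    validate(data: unknown): T {
        return this._validateRecursive(data, this.schema) as T;
    }

    private _validateRecursive(data: any, schema: any): any {
        switch (schema.type) {
            case 'object':
                return this._validateObject(data, schema);
            case 'array':
                return this._validateArray(data, schema);
            case 'string':
            case 'number':
            case 'integer':
            case 'boolean':
                return this._validatePrimitive(data, schema.type);
            default:
                // Other types are passed through unchecked
                return data;
        }
    }

    private _validatePrimitive(data: any, type: string): any {
        // JSON has no separate integer type: integers are numbers with no fractional part
        const ok = type === 'integer' ? Number.isInteger(data) : typeof data === type;
        if (!ok) {
            throw new Error(`Expected ${type}, got ${typeof data}`);
        }
        return data;
    }

    private _validateArray(data: any, schema: any): any[] {
        if (!Array.isArray(data)) {
            throw new Error('Expected array');
        }
        return schema.items
            ? data.map((item: unknown) => this._validateRecursive(item, schema.items))
            : data;
    }

    private _validateObject(data: any, schema: any): any {
        if (typeof data !== 'object' || data === null || Array.isArray(data)) {
            throw new Error('Expected object');
        }

        const properties = schema.properties ?? {};
        const result: any = {};

        // Check required fields
        for (const required of schema.required ?? []) {
            if (!(required in data)) {
                throw new Error(`Missing required field: ${required}`);
            }
        }

        // Validate declared properties; undeclared ones are dropped,
        // or rejected when additionalProperties is false
        for (const [key, value] of Object.entries(data)) {
            if (key in properties) {
                result[key] = this._validateRecursive(value, properties[key]);
            } else if (schema.additionalProperties === false) {
                throw new Error(`Unexpected property: ${key}`);
            }
        }

        return result;
    }
}

// Generic tool definition
interface Tool<T extends Record<string, any>> {
    name: string;
    description: string;
    schema: TypeValidator<T>;
    execute: (params: T) => Promise<any>;
}

// Usage example
const weatherTool: Tool<{ city: string }> = {
    name: 'getWeather',
    description: 'Get weather information',
    schema: new TypeValidator<{ city: string }>({
        type: 'object',
        properties: {
            city: { type: 'string' }
        },
        required: ['city']
    }),
    execute: async ({ city }) => {
        // Real weather lookup logic goes here
        return { temperature: 25, city };
    }
};

// Validate model arguments before executing:
// weatherTool.execute(weatherTool.schema.validate(JSON.parse(argumentsJson)))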

🌊 Data Flow Design

The data flow is modeled as a reactive-style pipeline of async steps, which keeps processing composable and observable: each step's output feeds the next step, and monitoring callbacks see every intermediate result.

Input Stream → Validation Stream → Transform Stream → Execution Stream
    ↓
Error Handling Stream → Retry Stream → Result Stream → Output Stream
    ↓
Monitoring Stream → Logging Stream → Metrics Stream

import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, Generic, TypeVar

T = TypeVar("T")

@dataclass
class DataFlow(Generic[T]):
    """Envelope for a value moving through the pipeline."""
    data: T
    metadata: dict
    timestamp: float
    source: str

class DataPipeline:
    def __init__(self):
        self.steps = []
        self.monitoring_callbacks = []

    def add_step(self, name: str, processor: Callable):
        """Add a processing step (processor must be an async callable)."""
        self.steps.append({
            'name': name,
            'processor': processor
        })
        return self

    def add_monitoring(self, callback: Callable):
        """Add a monitoring callback (must be an async callable)."""
        self.monitoring_callbacks.append(callback)
        return self

    async def process(self, input_data: Any) -> AsyncGenerator[Any, None]:
        """Run the data through each step, yielding every intermediate result."""
        current_data = input_data

        for step in self.steps:
            step_name = step['name']
            processor = step['processor']

            try:
                # Run the step; its output becomes the next step's input
                current_data = await processor(current_data)

                # Report the step's result to every monitor
                for callback in self.monitoring_callbacks:
                    await callback(step_name, current_data)

                yield current_data

            except Exception as e:
                # Error handling: report the failure and stop the pipeline
                error_result = {
                    'error': str(e),
                    'step': step_name,
                    'input': current_data
                }

                for callback in self.monitoring_callbacks:
                    await callback(f"{step_name}_error", error_result)

                yield error_result
                break

# Usage example (the step functions and monitoring callbacks are
# application-defined async callables, assumed to exist)
async def weather_data_pipeline():
    pipeline = DataPipeline()

    # Add processing steps
    pipeline.add_step("validate", validate_weather_request)
    pipeline.add_step("transform", transform_weather_params)
    pipeline.add_step("execute", execute_weather_api)
    pipeline.add_step("format", format_weather_response)

    # Add monitoring
    pipeline.add_monitoring(log_step_execution)
    pipeline.add_monitoring(track_metrics)

    # Process the data flow
    input_request = {"city": "Beijing", "units": "celsius"}

    async for result in pipeline.process(input_request):
        print(f"Processing result: {result}")
        if isinstance(result, dict) and "error" in result:
            break

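⏰ Sequence Diagram

The sequence below traces a complete Function Calling exchange and the interactions between components. Note that the tool runs in the client application, not on OpenAI's servers.

User → Client: request
Client → OpenAI API: messages + tool definitions
OpenAI API → LLM: analyze request + tools
LLM → OpenAI API: tool call decision
OpenAI API → Client: tool call response
Client → Tool Registry: execute tool
Tool Registry → Function: call implementation
Function → External Service: API call
External Service → Function: response data
Function → Tool Registry: result
Tool Registry → Client: tool result
Client → OpenAI API: tool result (role "tool" message)
OpenAI API → LLM: context + results
LLM → OpenAI API → Client → User: final response

Key timing characteristics: asynchronous processing, parallel calls, error propagation, result aggregation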

📊 UML Class Diagram

The UML class diagram shows the main classes of a Function Calling system and their relationships, reflecting the overall architecture.

               «interface» ToolRegistry
               + registerTool(name, func)
               + getTool(name)
               + executeTool(toolCall)
                          ▲
                          │
FunctionCaller                       ToolValidator
+ callTools(requests)                + validate(schema, data)
+ executeCall(toolCall)              + validateStrict()
+ handleErrors()
        ▲                                   │
        │                                   ▼
ToolExecutor                         SchemaGenerator
+ execute(func, params)              + generateSchema(func)
+ handleTimeout()                    + validateSchema()
+ retryOnFailure()

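🔄 State Transition Diagram

The state transition diagram shows how the execution state of a function call changes, which helps explain the system's lifecycle.

[Idle] → [Ready] → [Processing] → [Executing] → [Completed]
   ↑         ↓           ↓              ↓             ↓
   └── [Error] ← [Failed] ← [Timeout] ← [ValidationError]
          ↓
       [Retry] → [Processing]

States

  • Idle: idle
  • Ready: ready to execute
  • Processing: request being processed
  • Executing: tool running
  • Completed: finished

Error States

  • ValidationError: argument validation failed
  • Timeout: execution timed out
  • Failed: execution failed
  • Error: general error state
  • Retry: retrying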
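The diagram can be enforced in code with a transition table, so that an illegal jump (say, from Completed straight back to Executing) raises immediately. The table below is one reading of the diagram, an assumption rather than a specification: Processing may fail validation, Executing may time out or fail, every error state funnels into Error, and Error either retries or returns to Idle. `ToolCallState`, `ALLOWED`, and `ToolCallLifecycle` are hypothetical names.

```python
from enum import Enum


class ToolCallState(Enum):
    IDLE = "Idle"
    READY = "Ready"
    PROCESSING = "Processing"
    EXECUTING = "Executing"
    COMPLETED = "Completed"
    VALIDATION_ERROR = "ValidationError"
    TIMEOUT = "Timeout"
    FAILED = "Failed"
    ERROR = "Error"
    RETRY = "Retry"


S = ToolCallState
# One interpretation of the diagram (an assumption; adjust to your lifecycle)
ALLOWED = {
    S.IDLE: {S.READY},
    S.READY: {S.PROCESSING},
    S.PROCESSING: {S.EXECUTING, S.VALIDATION_ERROR},
    S.EXECUTING: {S.COMPLETED, S.TIMEOUT, S.FAILED},
    S.VALIDATION_ERROR: {S.ERROR},
    S.TIMEOUT: {S.ERROR},
    S.FAILED: {S.ERROR},
    S.ERROR: {S.RETRY, S.IDLE},
    S.RETRY: {S.PROCESSING},
    S.COMPLETED: {S.IDLE},
}


class ToolCallLifecycle:
    def __init__(self):
        self.state = S.IDLE
        self.history = [S.IDLE]

    def transition(self, new_state: ToolCallState) -> None:
        """Move to new_state, rejecting transitions the table does not allow."""
        if new_state not in ALLOWED[self.state]:
            raise ValueError(f"Illegal transition {self.state.value} -> {new_state.value}")
        self.state = new_state
        self.history.append(new_state)
```

Keeping the full history makes it easy to log how many retries a call needed before completing.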

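⚡ Performance Optimization Strategies

Performance work spans several layers to keep Function Calling stable under high concurrency.

Caching

  • Tool definition caching
  • Schema precompilation
  • Result caching
  • Session state caching

Concurrency

  • Connection pool management
  • Async I/O
  • Request batching
  • Load balancing

Memory

  • Object pooling
  • Memory reclamation
  • Serialization optimization
  • Data compression

Network

  • HTTP/2 support
  • Connection reuse
  • Request compression
  • CDN acceleration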

💾 缓存机制

缓存机制显著提高重复请求的响应速度,减少外部 API 调用和网络开销。

import functools
import hashlib
import inspect
import json
import time
from typing import Any, Dict

class CacheManager:
    def __init__(self, ttl: int = 300, max_size: int = 1000):
        self.ttl = ttl  # Time to live in seconds
        self.max_size = max_size
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.access_times: Dict[str, float] = {}

    def generate_key(self, tool_name: str, params: Dict) -> str:
        """Build a cache key from the tool name and its parameters."""
        # sort_keys=True makes the key independent of parameter order, including nested dicts
        param_str = json.dumps(params, sort_keys=True, default=str)
        key_str = f"{tool_name}:{param_str}"
        # MD5 serves only as a compact, non-cryptographic fingerprint here
        return hashlib.md5(key_str.encode()).hexdigest()

    def get(self, tool_name: str, params: Dict) -> Any:
        """Return the cached result, or None on a miss or an expired entry."""
        key = self.generate_key(tool_name, params)

        if key in self.cache:
            entry = self.cache[key]

            # Check whether the entry has expired
            if time.time() - entry['timestamp'] < self.ttl:
                self.access_times[key] = time.time()
                return entry['data']
            # Expired: evict it
            del self.cache[key]
            self.access_times.pop(key, None)

        return None

    def set(self, tool_name: str, params: Dict, data: Any):
        """Store a result, evicting the least recently used entry when full."""
        key = self.generate_key(tool_name, params)

        # Evict only when a new key is added to a full cache
        if key not in self.cache and len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

        # Store the new entry
        self.cache[key] = {
            'data': data,
            'timestamp': time.time(),
            'tool_name': tool_name,
            'params': params
        }
        self.access_times[key] = time.time()

    def clear(self):
        """Empty the cache."""
        self.cache.clear()
        self.access_times.clear()

# Decorator-based caching for async tools
def cached_tool(ttl: int = 300):
    """Cache an async tool's results for `ttl` seconds."""
    cache_manager = CacheManager(ttl=ttl)

    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Bind positional and keyword arguments to parameter names so that
            # get_stock_price("AAPL") and get_stock_price(symbol="AAPL") share one key
            bound = signature.bind(*args, **kwargs)
            bound.apply_defaults()
            params = dict(bound.arguments)

            # Check the cache (a cached None result is treated as a miss)
            cached_result = cache_manager.get(func.__name__, params)
            if cached_result is not None:
                return cached_result

            # Run the tool and cache its result
            result = await func(*args, **kwargs)
            cache_manager.set(func.__name__, params, result)

            return result
        return wrapper
    return decorator

# Usage example
@cached_tool(ttl=60)  # cache for 60 seconds
async def get_stock_price(symbol: str) -> Dict:
    """Fetch a stock price, with caching."""
    # fetch_stock_api stands in for the real API client
    return await fetch_stock_api(symbol)
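When the cache sits in front of model-issued tool calls, the parameters usually arrive as the JSON string in `tool_call.function.arguments`, and the model can emit the same arguments with different key order or whitespace. A small sketch, assuming the arguments are a JSON object, that normalizes the string before hashing so those variants hit the same cache entry; `tool_call_cache_key` and the `get_weather` tool are hypothetical names.

```python
import hashlib
import json


def tool_call_cache_key(tool_name: str, arguments_json: str) -> str:
    """Hash a tool call so semantically identical argument strings collide."""
    arguments = json.loads(arguments_json or "{}")
    # separators drops optional whitespace; sort_keys fixes key order at every depth
    canonical = json.dumps(arguments, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(f"{tool_name}:{canonical}".encode("utf-8")).hexdigest()


a = tool_call_cache_key("get_weather", '{"city": "Paris", "unit": "c"}')
b = tool_call_cache_key("get_weather", '{"unit":"c","city":"Paris"}')
print(a == b)  # True
```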

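📦 Batch Processing

Batch processing merges several requests and handles them together, reducing network overhead and increasing throughput.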

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

class BatchProcessor:
    def __init__(self, batch_size: int = 10, timeout: float = 30.0):
        self.batch_size = batch_size
        self.timeout = timeout  # seconds to wait before flushing a partial batch
        self.pending_requests: List[Dict] = []
        self.batch_timer = None
        self.executor = ThreadPoolExecutor(max_workers=5)

    async def add_request(self, request: Dict) -> asyncio.Future:
        """Queue a request and return a Future that resolves to its result."""
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        # Add to the pending queue
        self.pending_requests.append({
            'request': request,
            'future': future,
            'timestamp': loop.time()
        })

        if len(self.pending_requests) >= self.batch_size:
            # The batch is full: process it now
            await self._process_batch()
        elif self.batch_timer is None:
            # Otherwise make sure a partial batch is flushed after the timeout
            self.batch_timer = asyncio.create_task(self._timer_callback())

        return future

    async def _timer_callback(self):
        """Flush the pending batch once the timeout expires."""
        await asyncio.sleep(self.timeout)
        # Drop the handle first so _process_batch does not cancel this running task
        self.batch_timer = None
        await self._process_batch()

    async def _process_batch(self):
        """Process one batch of requests."""
        if not self.pending_requests:
            return

        # Snapshot and clear the queue
        batch = self.pending_requests.copy()
        self.pending_requests.clear()

        # Cancel the pending timer; this batch is being flushed now
        if self.batch_timer:
            self.batch_timer.cancel()
            self.batch_timer = None

        # Group by tool and process the groups in parallel
        group_items = list(self._group_by_tool(batch).items())
        group_results = await asyncio.gather(
            *(self._process_tool_batch(name, items) for name, items in group_items),
            return_exceptions=True
        )

        # Each group returns one result per request, in request order
        for (_, items), outcome in zip(group_items, group_results):
            for index, item in enumerate(items):
                future = item['future']
                if future.done():
                    continue
                if isinstance(outcome, Exception):
                    future.set_exception(outcome)
                else:
                    future.set_result(outcome[index])

    def _group_by_tool(self, batch: List[Dict]) -> Dict[str, List[Dict]]:
        """Group queued requests by tool name."""
        groups: Dict[str, List[Dict]] = {}
        for item in batch:
            tool_name = item['request'].get('tool_name')
            groups.setdefault(tool_name, []).append(item)
        return groups

    async def _process_tool_batch(self, tool_name: str, requests: List[Dict]) -> List[Any]:
        """Run all queued requests for one tool; returns one result per request."""
        params_list = [req['request']['params'] for req in requests]

        try:
            if tool_name == 'get_stock_price':
                return await self._batch_get_stock_prices(params_list)
            # Dispatch other batchable tools here
            raise ValueError(f"Unknown tool: {tool_name}")
        except Exception as e:
            raise RuntimeError(f"Batch processing failed for {tool_name}: {e}") from e

    async def _batch_get_stock_prices(self, params_list: List[Dict]) -> List[Dict]:
        """Fetch several stock prices concurrently on the thread pool."""
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(self.executor, self._sync_get_stock_price, params['symbol'])
            for params in params_list
        ]
        return list(await asyncio.gather(*tasks))

    def _sync_get_stock_price(self, symbol: str) -> Dict:
        """Blocking price lookup (placeholder for the real synchronous call)."""
        return {"symbol": symbol, "price": 100.0}  # sample data

    def shutdown(self):
        """Stop the timer and release the thread pool."""
        if self.batch_timer:
            self.batch_timer.cancel()
        self.executor.shutdown()

# Usage example
async def batch_example():
    processor = BatchProcessor(batch_size=5, timeout=10.0)

    try:
        # Queue several requests
        futures = []
        for i in range(10):
            request = {
                'tool_name': 'get_stock_price',
                'params': {'symbol': f'AAPL{i}'}
            }
            future = await processor.add_request(request)
            futures.append(future)

        # Wait for every result
        results = await asyncio.gather(*futures)
        print(f"Batch results: {len(results)} items")

    finally:
        processor.shutdown()
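For Function Calling specifically, the most common batch is the set of `tool_calls` the model returns in a single assistant message when it requests several calls at once. A sketch, assuming each call is a dict shaped like a Chat Completions tool call (`id`, `function.name`, `function.arguments`) and that tools are async functions in a hypothetical `TOOLS` dict: it runs the calls concurrently and builds one `role: "tool"` message per call, each echoing its `tool_call_id` so the model can match results to requests.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List


async def get_stock_price(symbol: str) -> Dict[str, Any]:
    """Hypothetical tool; the sleep stands in for a network call."""
    await asyncio.sleep(0.01)
    return {"symbol": symbol, "price": 100.0}


# Hypothetical tool table; a real application would build this from its registry
TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {"get_stock_price": get_stock_price}


async def run_tool_calls(tool_calls: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Execute parallel tool calls and return tool messages in the same order."""

    async def run_one(call: Dict[str, Any]) -> Dict[str, str]:
        name = call["function"]["name"]
        try:
            args = json.loads(call["function"]["arguments"] or "{}")
            content = json.dumps(await TOOLS[name](**args))
        except Exception as exc:  # report the failure to the model instead of crashing
            content = json.dumps({"error": f"{type(exc).__name__}: {exc}"})
        return {"role": "tool", "tool_call_id": call["id"], "content": content}

    return list(await asyncio.gather(*(run_one(c) for c in tool_calls)))


calls = [
    {"id": "call_1", "type": "function",
     "function": {"name": "get_stock_price", "arguments": '{"symbol": "AAPL"}'}},
    {"id": "call_2", "type": "function",
     "function": {"name": "get_stock_price", "arguments": '{"symbol": "MSFT"}'}},
]
messages = asyncio.run(run_tool_calls(calls))
print([m["tool_call_id"] for m in messages])  # ['call_1', 'call_2']
```

Returning errors as tool content, rather than raising, lets one failed call coexist with successful siblings in the same turn.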

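🌊 Streaming Response Handling

Streaming delivers data and partial results incrementally as they are produced, so users see output sooner instead of waiting for the complete result.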

import asyncio
from typing import AsyncGenerator, Dict, Any
import json

class StreamProcessor:
    def __init__(self):
        self.active_streams = {}
        self.buffer_size = 1024

    async def process_stream(self,
                           tool_name: str,
                           params: Dict,
                           callback = None) -> AsyncGenerator[Dict, None]:
        """Process a streaming tool call, yielding chunks as they arrive."""
        stream_id = f"{tool_name}_{id(params)}"
        self.active_streams[stream_id] = {
            'params': params,
            'callback': callback,
            'buffer': []
        }

        try:
            # Simulated streaming source
            async for chunk in self._simulate_stream_data(tool_name, params):
                # Buffer the chunk
                self.active_streams[stream_id]['buffer'].append(chunk)

                # Flush once the buffer reaches its size limit
                if len(self.active_streams[stream_id]['buffer']) >= self.buffer_size:
                    await self._flush_stream(stream_id)

                # Invoke the per-chunk callback
                if callback:
                    await callback(chunk)

                # Yield the chunk to the consumer
                yield chunk

        except Exception as e:
            # Report the failure as an error chunk
            error_chunk = {
                'type': 'error',
                'message': str(e),
                'tool_name': tool_name
            }
            yield error_chunk

        finally:
            # Flush any remaining buffered chunks and clean up
            await self._flush_stream(stream_id)
            del self.active_streams[stream_id]

    async def _simulate_stream_data(self, tool_name: str, params: Dict) -> AsyncGenerator[Dict, None]:
        """Generate simulated streaming data."""
        if tool_name == 'get_stock_data':
            # Simulated real-time stock quotes
            for i in range(10):
                await asyncio.sleep(0.5)  # simulated latency
                yield {
                    'type': 'data',
                    'symbol': params['symbol'],
                    'timestamp': asyncio.get_running_loop().time(),
                    'price': 100 + i * 5,
                    'volume': 1000 + i * 100
                }
        elif tool_name == 'process_large_file':
            # Simulated file-processing progress
            total_chunks = 20
            for i in range(total_chunks):
                await asyncio.sleep(0.2)
                progress = (i + 1) / total_chunks
                yield {
                    'type': 'progress',
                    'current': i + 1,
                    'total': total_chunks,
                    'progress': progress,
                    'chunk_data': f'Processed chunk {i + 1}'
                }

    async def _flush_stream(self, stream_id: str):
        """Flush the stream buffer."""
        if stream_id in self.active_streams:
            buffer = self.active_streams[stream_id]['buffer']
            if buffer:
                # Deliver the buffered chunks as one batch
                batch_chunk = {
                    'type': 'batch',
                    'data': buffer,
                    'count': len(buffer)
                }

                callback = self.active_streams[stream_id]['callback']
                if callback:
                    await callback(batch_chunk)

                # Empty the buffer
                self.active_streams[stream_id]['buffer'] = []

# Usage example
async def streaming_example():
    processor = StreamProcessor()

    async def data_callback(chunk: Dict):
        print(f"Received chunk: {chunk}")

    # Consume the stream
    async for chunk in processor.process_stream(
        'get_stock_data',
        {'symbol': 'AAPL'},
        data_callback
    ):
        print(f"Processed: {chunk}")

        # Stop on an error chunk
        if chunk.get('type') == 'error':
            break
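When the model itself streams a tool call, the Chat Completions API sends it in fragments: each chunk's `choices[0].delta.tool_calls` entries carry an `index`; the `id` and `function.name` normally arrive in the first fragment for that index; and `function.arguments` arrives as partial JSON strings that only parse once concatenated. A sketch of the accumulation, using plain dicts shaped like those deltas rather than SDK objects (the sample chunks are invented for illustration):

```python
import json
from typing import Any, Dict, Iterable, List


def accumulate_tool_calls(chunks: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge streamed tool-call deltas into complete calls, keyed by index."""
    calls: Dict[int, Dict[str, Any]] = {}
    for chunk in chunks:
        delta = chunk["choices"][0]["delta"]
        for part in delta.get("tool_calls") or []:
            call = calls.setdefault(part["index"], {"id": None, "name": "", "arguments": ""})
            if part.get("id"):
                call["id"] = part["id"]
            function = part.get("function") or {}
            if function.get("name"):
                call["name"] += function["name"]
            # Argument fragments are raw string pieces; never parse them one by one
            call["arguments"] += function.get("arguments") or ""
    # Parse only after the stream ends, when each arguments string is complete JSON
    return [
        {"id": c["id"], "name": c["name"], "arguments": json.loads(c["arguments"] or "{}")}
        for _, c in sorted(calls.items())
    ]


def delta(**tool_call: Any) -> Dict[str, Any]:
    """Build a fake chunk carrying one tool-call delta."""
    return {"choices": [{"delta": {"tool_calls": [tool_call]}}]}


stream = [
    delta(index=0, id="call_1", function={"name": "get_stock_data", "arguments": ""}),
    delta(index=0, function={"arguments": '{"sym'}),
    delta(index=0, function={"arguments": 'bol": "AAPL"}'}),
    {"choices": [{"delta": {}}]},  # final chunk with no tool-call content
]
print(accumulate_tool_calls(stream))
# [{'id': 'call_1', 'name': 'get_stock_data', 'arguments': {'symbol': 'AAPL'}}]
```

Keying by `index` rather than `id` matters because later fragments of the same call usually omit the `id`.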

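🔌 WebSocket Integration

WebSocket integration provides real-time, bidirectional communication: clients can register and call tools over a single connection, and results are pushed back as soon as they are ready.

Client                  ←→  WebSocket Server        ←→  Tool Registry
  ↓                           ↓                            ↓
Real-time Communication     Event Processing Queue       Function Execution
  ↓                           ↓                            ↓
State Management            Error Handling               Result Streaming
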
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Dict, Set

import websockets

@dataclass
class WebSocketMessage:
    type: str
    data: Dict
    client_id: str

class WebSocketToolServer:
    def __init__(self):
        self.clients: Set[str] = set()
        self.connections: Dict[str, Any] = {}          # client_id -> open websocket connection
        self.client_tools: Dict[str, Dict] = {}         # client_id -> {tool_name: metadata}
        self.subscriptions: Dict[str, Set[str]] = {}    # tool_name -> subscribed client_ids
        self.running_tasks: Set[asyncio.Task] = set()   # strong refs so tool tasks are not GC'd
        self.tool_queue = asyncio.Queue()
        self.result_cache = {}

    async def register_client(self, websocket):
        """Register a new client and handle its messages until it disconnects."""
        client_id = f"client_{id(websocket)}"
        self.clients.add(client_id)
        self.connections[client_id] = websocket
        self.client_tools[client_id] = {}

        # Send a welcome message
        await self.send_to_client(client_id, {
            'type': 'connected',
            'client_id': client_id,
            'message': 'Connected to tool server'
        })

        # Handle client messages
        try:
            async for message in websocket:
                await self.handle_message(client_id, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            # Clean up the connection, its tools, and its subscriptions
            self.clients.discard(client_id)
            self.connections.pop(client_id, None)
            self.client_tools.pop(client_id, None)
            for subscribers in self.subscriptions.values():
                subscribers.discard(client_id)

    async def handle_message(self, client_id: str, message: str):
        """Dispatch a client message by its type."""
        try:
            data = json.loads(message)
            msg_type = data.get('type')

            if msg_type == 'register_tool':
                await self.register_tool(client_id, data)
            elif msg_type == 'call_tool':
                await self.call_tool(client_id, data)
            elif msg_type == 'subscribe':
                await self.subscribe(client_id, data)
            else:
                await self.send_error(client_id, f"Unknown message type: {msg_type}")

        except json.JSONDecodeError:
            await self.send_error(client_id, "Invalid JSON format")
        except Exception as e:
            await self.send_error(client_id, f"Error processing message: {str(e)}")

    async def register_tool(self, client_id: str, data: Dict):
        """Register a tool on behalf of a client."""
        tool_name = data.get('tool_name')
        tool_schema = data.get('schema')

        if tool_name and tool_schema:
            self.client_tools[client_id][tool_name] = {
                'schema': tool_schema,
                'registered_at': asyncio.get_running_loop().time()
            }

            response = {
                'type': 'tool_registered',
                'tool_name': tool_name,
                'status': 'success'
            }
            await self.send_to_client(client_id, response)
        else:
            await self.send_error(client_id, "Missing tool_name or schema")

    async def subscribe(self, client_id: str, data: Dict):
        """Subscribe a client to every result produced by a tool."""
        tool_name = data.get('tool_name')
        if not tool_name:
            await self.send_error(client_id, "Missing tool_name")
            return
        self.subscriptions.setdefault(tool_name, set()).add(client_id)
        await self.send_to_client(client_id, {'type': 'subscribed', 'tool_name': tool_name})

    async def call_tool(self, client_id: str, data: Dict):
        """Invoke a tool registered by any connected client."""
        tool_name = data.get('tool_name')
        params = data.get('params', {})

        # Find the client that registered the tool
        owner = next((cid for cid, tools in self.client_tools.items() if tool_name in tools), None)
        if owner is None:
            await self.send_error(client_id, f"Tool {tool_name} not found")
            return

        # Run the tool in the background, keeping a reference until it finishes
        task = asyncio.create_task(self._execute_tool(owner, tool_name, params, client_id))
        self.running_tasks.add(task)
        task.add_done_callback(self.running_tasks.discard)

        # Acknowledge the call
        response = {
            'type': 'tool_call_started',
            'tool_name': tool_name,
            'message': 'Tool execution started'
        }
        await self.send_to_client(client_id, response)

    async def _execute_tool(self, owner_client: str, tool_name: str, params: Dict, caller_client: str):
        """Execute a tool call and report the outcome."""
        try:
            # Simulated tool execution
            result = await self._simulate_tool_execution(tool_name, params)

            # Send the result to the caller
            response = {
                'type': 'tool_result',
                'tool_name': tool_name,
                'result': result,
                'status': 'success'
            }
            await self.send_to_client(caller_client, response)

            # Notify all subscribers
            await self.notify_subscribers(tool_name, result)

        except Exception as e:
            error_response = {
                'type': 'tool_error',
                'tool_name': tool_name,
                'error': str(e),
                'status': 'failed'
            }
            await self.send_to_client(caller_client, error_response)

    async def _simulate_tool_execution(self, tool_name: str, params: Dict) -> Dict:
        """Simulated tool execution."""
        await asyncio.sleep(1)  # simulated execution time

        if tool_name == 'calculate':
            # WARNING: eval on client-supplied input allows arbitrary code execution;
            # replace it with a restricted arithmetic evaluator before real use
            return {
                'result': eval(params.get('expression', '0')),
                'expression': params.get('expression')
            }
        elif tool_name == 'search':
            return {
                'results': [f"Result for {params.get('query', '')}"],
                'query': params.get('query')
            }
        else:
            raise ValueError(f"Unknown tool: {tool_name}")

    async def send_to_client(self, client_id: str, message: Dict):
        """Send a JSON message to one client, if it is still connected."""
        websocket = self.connections.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send(json.dumps(message))
        except websockets.exceptions.ConnectionClosed:
            pass  # the client left mid-send; register_client performs the cleanup

    async def send_error(self, client_id: str, error: str):
        """Send an error message to one client."""
        await self.send_to_client(client_id, {'type': 'error', 'message': error})

    async def notify_subscribers(self, tool_name: str, result: Dict):
        """Push a tool result to every client subscribed to that tool."""
        notification = {
            'type': 'tool_update',
            'tool_name': tool_name,
            'result': result,
            'timestamp': asyncio.get_running_loop().time()
        }
        for client_id in list(self.subscriptions.get(tool_name, ())):
            await self.send_to_client(client_id, notification)

# Usage example
async def websocket_server():
    server = WebSocketToolServer()

    # Start the WebSocket server
    async with websockets.serve(server.register_client, "localhost", 8765):
        print("WebSocket server started on ws://localhost:8765")
        await asyncio.Future()  # run forever
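The `calculate` branch above passes the client-supplied expression to `eval`, which lets any connected client run arbitrary Python on the server. A safer sketch, assuming the tool only needs basic arithmetic on numbers: parse the expression with the standard `ast` module and evaluate only whitelisted node types. Exponentiation is deliberately left out because an input like `9**9**9` can exhaust CPU and memory. `safe_calculate` is a hypothetical replacement, not part of any library.

```python
import ast
import operator
from typing import Callable, Dict, Type, Union

Number = Union[int, float]

# Whitelisted operators; names, calls, attributes, and everything else are rejected
BINARY_OPS: Dict[Type[ast.operator], Callable[[Number, Number], Number]] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
UNARY_OPS: Dict[Type[ast.unaryop], Callable[[Number], Number]] = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


def safe_calculate(expression: str) -> Number:
    """Evaluate +, -, *, /, % and parentheses over numeric literals only."""

    def evaluate(node: ast.AST) -> Number:
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        # type() check rejects bool, which is a subclass of int
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in BINARY_OPS:
            return BINARY_OPS[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in UNARY_OPS:
            return UNARY_OPS[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return evaluate(ast.parse(expression, mode="eval"))


print(safe_calculate("2 * (3 + 4) - 1"))  # 13
```

Inside the server, `'result': safe_calculate(params.get('expression', '0'))` would take the place of the `eval` call; a `ValueError` from a rejected expression is already reported to the caller by `_execute_tool`'s exception handler.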

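🔗 Tool Composition Pattern

The composition pattern combines several tools into a new composite tool, so existing functionality can be assembled and reused flexibly.

ToolA + ToolB → CompositeTool
                      ↓
[Execute ToolA] → [Execute ToolB] → [Combine Results]
                                           ↓
          CompositeResult ← Aggregate ← Transform
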
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional

@dataclass
class ToolResult:
    tool_name: str
    data: Any
    metadata: Dict
    success: bool

# Hypothetical async tool registry; replace with the application's real dispatcher
TOOL_REGISTRY: Dict[str, Callable[[Dict], Awaitable[Any]]] = {}

async def execute_tool_async(tool_name: str, params: Dict) -> Any:
    """Look up a registered async tool and run it."""
    if tool_name not in TOOL_REGISTRY:
        raise KeyError(f"Unknown tool: {tool_name}")
    return await TOOL_REGISTRY[tool_name](params)

class CompositeTool:
    """Base class for composite tools."""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.sub_tools: List[str] = []
        self.combiner: Optional[Callable] = None

    def add_sub_tool(self, tool_name: str):
        """Add a sub-tool."""
        if tool_name not in self.sub_tools:
            self.sub_tools.append(tool_name)

    def set_combiner(self, combiner: Callable):
        """Set the function that merges sub-tool results."""
        self.combiner = combiner

    async def _run_sub_tool(self, tool_name: str, params: Dict) -> ToolResult:
        """Run one sub-tool and wrap the outcome in a ToolResult."""
        try:
            result = await execute_tool_async(tool_name, params)
            return ToolResult(
                tool_name=tool_name,
                data=result,
                metadata={'executed_at': asyncio.get_running_loop().time()},
                success=True
            )
        except Exception as e:
            return ToolResult(
                tool_name=tool_name,
                data=None,
                metadata={'error': str(e)},
                success=False
            )

    async def execute(self, params: Dict) -> Any:
        """Run all sub-tools in parallel, then combine their results."""
        if not self.combiner:
            raise ValueError("Combiner not set")

        outcomes = await asyncio.gather(
            *(self._run_sub_tool(name, params) for name in self.sub_tools)
        )
        results = {outcome.tool_name: outcome for outcome in outcomes}

        # Combine the results
        return await self.combiner(results, params)

# Predefined composite tools
class DataAnalysisComposite(CompositeTool):
    """Composite tool for data analysis."""
    def __init__(self):
        super().__init__(
            "data_analysis",
            "Combines data retrieval and analysis tools"
        )
        self.add_sub_tool("get_data")
        self.add_sub_tool("analyze_data")
        self.add_sub_tool("generate_report")

        self.set_combiner(self._combine_analysis_results)

    async def _combine_analysis_results(self, results: Dict, params: Dict) -> Dict:
        """Merge the analysis and report outputs."""
        analysis_result = None
        report_data = None

        # Pick out each sub-tool's result
        if "analyze_data" in results and results["analyze_data"].success:
            analysis_result = results["analyze_data"].data

        if "generate_report" in results and results["generate_report"].success:
            report_data = results["generate_report"].data

        return {
            "analysis": analysis_result,
            "report": report_data,
            "metadata": {
                "tools_executed": len([r for r in results.values() if r.success]),
                "total_tools": len(results)
            }
        }

class WorkflowTool(CompositeTool):
    """Workflow tool: runs steps in dependency order."""
    def __init__(self, name: str):
        super().__init__(name, f"Workflow: {name}")
        self.dependencies: Dict[str, List[str]] = {}

    def add_step(self, tool_name: str, depends_on: List[str] = None):
        """Add a step, optionally depending on other steps."""
        self.add_sub_tool(tool_name)
        if depends_on:
            self.dependencies[tool_name] = depends_on

    async def execute(self, params: Dict) -> Any:
        """Run steps in dependency order; skip steps whose dependencies failed."""
        results: Dict[str, ToolResult] = {}

        while len(results) < len(self.sub_tools):
            progressed = False
            for tool_name in self.sub_tools:
                if tool_name in results:
                    continue

                # Wait until every dependency has a result
                deps = self.dependencies.get(tool_name, [])
                if not all(dep in results for dep in deps):
                    continue

                progressed = True
                failed = [dep for dep in deps if not results[dep].success]
                if failed:
                    # A dependency failed, so record this step as skipped
                    results[tool_name] = ToolResult(
                        tool_name=tool_name,
                        data=None,
                        metadata={'skipped': True, 'failed_dependencies': failed},
                        success=False
                    )
                else:
                    results[tool_name] = await self._run_sub_tool(tool_name, params)

            if not progressed:
                # Remaining steps depend on unknown steps or on each other (a cycle)
                pending = [t for t in self.sub_tools if t not in results]
                raise ValueError(f"Unresolvable dependencies for steps: {pending}")

        return {
            "workflow_name": self.name,
            "steps": results,
            "summary": {
                "successful_steps": len([r for r in results.values() if r.success]),
                "failed_steps": len([r for r in results.values() if not r.success]),
                "total_steps": len(results)
            }
        }

# Usage example
async def composite_example():
    # Create the data-analysis composite tool
    data_analysis = DataAnalysisComposite()

    # Run the composite tool
    result = await data_analysis.execute({
        "query": "sales data 2024",
        "format": "detailed"
    })

    print(f"Analysis result: {result}")

    # Create a workflow tool
    workflow = WorkflowTool("data_pipeline")
    workflow.add_step("extract_data")
    workflow.add_step("transform_data", depends_on=["extract_data"])
    workflow.add_step("load_data", depends_on=["transform_data"])

    # Run the workflow
    workflow_result = await workflow.execute({
        "source": "database",
        "target": "data_warehouse"
    })

    print(f"Workflow result: {workflow_result}")
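For the model to call a composite or workflow tool, it has to be advertised like any other function in the request's `tools` array: `{"type": "function", "function": {"name", "description", "parameters"}}`. A sketch that builds that entry from a composite's name and description. The parameter schema (`query`, `format`) mirrors the example call above and is an assumption; `additionalProperties: false` with every property listed in `required` follows what `strict: true` mode expects. `composite_tool_definition` is a hypothetical helper.

```python
import json
from typing import Any, Dict


def composite_tool_definition(name: str, description: str,
                              parameters: Dict[str, Any], strict: bool = True) -> Dict[str, Any]:
    """Wrap a composite tool's metadata in the function-tool format."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
            "strict": strict,
        },
    }


# Assumed schema for the data_analysis composite
analysis_parameters = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "What data to analyze"},
        "format": {"type": "string", "enum": ["summary", "detailed"]},
    },
    "required": ["query", "format"],
    "additionalProperties": False,
}

tool = composite_tool_definition(
    "data_analysis", "Combines data retrieval and analysis tools", analysis_parameters
)
print(json.dumps(tool, indent=2))
```

From the model's point of view the composite is a single function; the fan-out to sub-tools stays an implementation detail of the application.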

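🏭 Pipeline Design

Pipeline design breaks a complex business process into separate processing stages, which keeps the flow modular and maintainable.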

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional

class PipelineStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

@dataclass
class PipelineStep:
    name: str
    processor: Callable[[Dict], Awaitable[Any]]  # async function: step input -> step result
    dependencies: List[str]
    retry_count: int = 0
    max_retries: int = 3
    timeout: float = 30.0  # seconds allowed per attempt

class DataPipeline:
    def __init__(self, name: str):
        self.name = name
        self.steps: Dict[str, PipelineStep] = {}
        self.execution_order: List[str] = []
        self.status = PipelineStatus.PENDING
        self.results: Dict[str, Any] = {}
        self.errors: Dict[str, Exception] = {}

    def add_step(self, name: str, processor: Callable[[Dict], Awaitable[Any]],
                 dependencies: Optional[List[str]] = None, **kwargs):
        """Add a step; its dependencies must already be registered."""
        if dependencies is None:
            dependencies = []

        missing = [dep for dep in dependencies if dep not in self.steps]
        if missing:
            raise ValueError(f"Step '{name}' depends on unknown steps: {missing}")

        self.steps[name] = PipelineStep(
            name=name,
            processor=processor,
            dependencies=dependencies,
            **kwargs
        )

        # Recompute the execution order
        self._update_execution_order()

    def _update_execution_order(self):
        """Recompute the execution order (depth-first topological sort)."""
        visited = set()
        temp_visited = set()  # steps on the current DFS path; revisiting one means a cycle
        order = []

        def visit(step_name: str):
            if step_name in temp_visited:
                raise ValueError(f"Circular dependency detected in pipeline {self.name}")

            if step_name not in visited:
                temp_visited.add(step_name)

                # Visit dependencies first so they are ordered before this step
                for dep in self.steps[step_name].dependencies:
                    visit(dep)

                temp_visited.remove(step_name)
                visited.add(step_name)
                order.append(step_name)

        for step_name in self.steps:
            if step_name not in visited:
                visit(step_name)

        self.execution_order = order

    async def execute(self, input_data: Dict) -> Dict:
        """Run all steps in order; each step sees the outputs of earlier steps."""
        self.status = PipelineStatus.RUNNING
        self.results = {}
        self.errors = {}
        # Work on a copy so the caller's dict is not mutated
        context = dict(input_data)

        try:
            for step_name in self.execution_order:
                step = self.steps[step_name]

                # Build this step's input
                step_input = self._prepare_step_input(step_name, context)

                # Run the step (with timeout and retries)
                step_result = await self._execute_step(step, step_input)

                # Store the result
                self.results[step_name] = step_result

                # Merge dict results into the shared context for later steps
                if isinstance(step_result, dict):
                    context.update(step_result)

            self.status = PipelineStatus.COMPLETED
            return self.results

        except Exception as e:
            self.status = PipelineStatus.FAILED
            self.errors["pipeline"] = e
            raise

    def _prepare_step_input(self, step_name: str, input_data: Dict) -> Dict:
        """Build a step's input from the shared context and its dependencies' results."""
        step_input = input_data.copy()

        # Expose each dependency's result under a _<dep>_result key
        for dep in self.steps[step_name].dependencies:
            if dep in self.results:
                step_input[f"_{dep}_result"] = self.results[dep]

        return step_input

    async def _execute_step(self, step: PipelineStep, input_data: Dict) -> Any:
        """Run one step with a per-attempt timeout and up to max_retries retries."""
        max_retries = step.max_retries
        retry_count = 0

        while retry_count <= max_retries:
            try:
                # Enforce the per-attempt timeout
                result = await asyncio.wait_for(
                    step.processor(input_data),
                    timeout=step.timeout
                )

                return result

            except asyncio.TimeoutError as e:
                retry_count += 1
                if retry_count > max_retries:
                    raise Exception(f"Step {step.name} timed out after {max_retries} retries") from e
                await asyncio.sleep(2 ** retry_count)  # exponential backoff

            except Exception as e:
                retry_count += 1
                if retry_count > max_retries:
                    raise Exception(f"Step {step.name} failed after {max_retries} retries: {str(e)}") from e
                await asyncio.sleep(1)  # fixed delay

        raise Exception(f"Step {step.name} exceeded max retries")

# Step functions for the predefined pipeline; they must exist before the class below uses them
async def validate_data(data: Dict) -> Dict:
    """Validation step."""
    if "data" not in data:
        raise ValueError("Missing data field")
    return {"validated": True, "validation_report": "Data passed validation"}

async def clean_data(data: Dict) -> Dict:
    """Cleaning step."""
    cleaned_data = data["data"]  # simplified cleaning logic
    return {"cleaned_data": cleaned_data}

async def transform_data(data: Dict) -> Dict:
    """Transformation step."""
    transformed = [x * 2 for x in data["cleaned_data"]]  # simple transformation
    return {"transformed_data": transformed}

async def analyze_data(data: Dict) -> Dict:
    """Analysis step."""
    values = data["transformed_data"]
    analysis = {
        "sum": sum(values),
        "average": sum(values) / len(values) if values else 0.0,
        "count": len(values)
    }
    return {"analysis": analysis}

async def store_data(data: Dict) -> Dict:
    """Storage step (simulated)."""
    return {"stored": True, "storage_id": f"store_{id(data)}"}

# Predefined pipeline template
class DataProcessingPipeline(DataPipeline):
    """Data-processing pipeline."""
    def __init__(self):
        super().__init__("data_processing")

        # Register the standard steps
        self.add_step("validate", validate_data, max_retries=2)
        self.add_step("clean", clean_data, dependencies=["validate"])
        self.add_step("transform", transform_data, dependencies=["clean"])
        self.add_step("analyze", analyze_data, dependencies=["transform"])
        self.add_step("store", store_data, dependencies=["analyze"])

    async def execute_with_monitoring(self, input_data: Dict, callbacks: List[Callable] = None) -> Dict:
        """Run the pipeline, then call each monitoring callback with the pipeline."""
        result = await self.execute(input_data)

        # Fire the monitoring callbacks
        for callback in callbacks or []:
            callback(self)

        return result

# Usage example
async def pipeline_example():
    # Create the data-processing pipeline
    pipeline = DataProcessingPipeline()

    # Run the pipeline
    input_data = {
        "data": [1, 2, 3, 4, 5],
        "metadata": {"source": "test", "timestamp": "2024-01-01"}
    }

    try:
        result = await pipeline.execute(input_data)
        print(f"Pipeline completed successfully: {result}")
    except Exception as e:
        print(f"Pipeline failed: {e}")
        print(f"Errors: {pipeline.errors}")
        print(f"Partial results: {pipeline.results}")
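The hand-written depth-first sort in `_update_execution_order` can also be expressed with the standard library's `graphlib.TopologicalSorter` (Python 3.9+), which raises `graphlib.CycleError` on cycles and, through `get_ready()`, exposes which steps are independent and could run concurrently. A sketch using the pipeline's step names as an assumed graph, plus a hypothetical `audit` step that also depends on `clean`:

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set

# step -> set of steps it depends on
graph: Dict[str, Set[str]] = {
    "validate": set(),
    "clean": {"validate"},
    "transform": {"clean"},
    "audit": {"clean"},  # hypothetical step, independent of transform
    "analyze": {"transform"},
    "store": {"analyze", "audit"},
}


def execution_levels(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into levels; steps within one level have no mutual dependencies."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


print(execution_levels(graph))
# [['validate'], ['clean'], ['audit', 'transform'], ['analyze'], ['store']]

try:
    execution_levels({"a": {"b"}, "b": {"a"}})
except CycleError as exc:
    print("cycle:", exc.args[1])
```

Each level could be handed to `asyncio.gather`, letting independent steps such as `audit` and `transform` run concurrently instead of strictly one after another.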

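🎭 Applying the Proxy Pattern

The proxy pattern puts a layer of indirection in front of tool calls, where concerns such as access control, caching, and logging can be added without changing the tools themselves.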

from typing import Dict, Any, Callable
from functools import wraps
import time
import logging

class ToolProxy:
    """工具代理基类"""
    def __init__(self, real_tool: Callable):
        self.real_tool = real_tool
        self.logger = logging.getLogger(f"proxy_{real_tool.__name__}")
    
    def __call__(self, *args, **kwargs):
        """代理调用"""
        return self.real_tool(*args, **kwargs)

class CachingToolProxy(ToolProxy):
    """缓存代理"""
    def __init__(self, real_tool: Callable, ttl: int = 300):
        super().__init__(real_tool)
        self.cache = {}
        self.ttl = ttl
    
    def __call__(self, *args, **kwargs):
        """带缓存的代理调用"""
        # 生成缓存键
        cache_key = f"{self.real_tool.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
        
        # 检查缓存
        if cache_key in self.cache:
            cached_data = self.cache[cache_key]
            if time.time() - cached_data['timestamp'] < self.ttl:
                self.logger.info(f"Cache hit for {cache_key}")
                return cached_data['data']
            else:
                # 过期,删除
                del self.cache[cache_key]
        
        # 执行真实工具
        self.logger.info(f"Executing tool {self.real_tool.__name__}")
        result = self.real_tool(*args, **kwargs)
        
        # 缓存结果
        self.cache[cache_key] = {
            'data': result,
            'timestamp': time.time()
        }
        
        return result

class LoggingToolProxy(ToolProxy):
    """Logging proxy."""
    def __init__(self, real_tool: Callable):
        super().__init__(real_tool)
        self.call_count = 0
    
    def __call__(self, *args, **kwargs):
        """Proxied call that logs arguments, duration, and outcome."""
        self.call_count += 1
        start_time = time.time()
        
        try:
            result = self.real_tool(*args, **kwargs)
            duration = time.time() - start_time
            
            self.logger.info(
                f"Tool {self.real_tool.__name__} executed successfully. "
                f"Call #{self.call_count}, Duration: {duration:.2f}s, "
                f"Args: {args}, Kwargs: {kwargs}"
            )
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            self.logger.error(
                f"Tool {self.real_tool.__name__} failed. "
                f"Call #{self.call_count}, Duration: {duration:.2f}s, "
                f"Error: {str(e)}"
            )
            raise

class AuthorizationToolProxy(ToolProxy):
    """Authorization proxy."""
    def __init__(self, real_tool: Callable, allowed_roles: list):
        super().__init__(real_tool)
        self.allowed_roles = allowed_roles
    
    def __call__(self, *args, **kwargs):
        """Proxied call guarded by a role check."""
        # Check the caller's role (defaults to 'guest' when none is passed)
        user_role = kwargs.get('user_role', 'guest')
        
        if user_role not in self.allowed_roles:
            raise PermissionError(f"Role '{user_role}' not allowed to use {self.real_tool.__name__}")
        
        self.logger.info(f"Authorized access to {self.real_tool.__name__} by {user_role}")
        return self.real_tool(*args, **kwargs)

class RetryToolProxy(ToolProxy):
    """Retry proxy."""
    def __init__(self, real_tool: Callable, max_retries: int = 3, delay: float = 1.0):
        super().__init__(real_tool)
        self.max_retries = max_retries
        self.delay = delay
    
    def __call__(self, *args, **kwargs):
        """Proxied call with retries and exponential backoff."""
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                if attempt > 0:
                    self.logger.info(f"Retry attempt {attempt} for {self.real_tool.__name__}")
                    time.sleep(self.delay * (2 ** (attempt - 1)))  # exponential backoff: delay, 2x, 4x, ...
                
                return self.real_tool(*args, **kwargs)
                
            except Exception as e:
                last_exception = e
                self.logger.warning(
                    f"Attempt {attempt + 1} failed for {self.real_tool.__name__}: {str(e)}"
                )
                
                if attempt == self.max_retries:
                    break
        
        # Every attempt failed
        self.logger.error(f"All {self.max_retries + 1} attempts failed for {self.real_tool.__name__}")
        raise last_exception

class MetricsToolProxy(ToolProxy):
    """Metrics proxy."""
    def __init__(self, real_tool: Callable):
        super().__init__(real_tool)
        self.execution_times = []
        self.success_count = 0
        self.failure_count = 0
    
    def __call__(self, *args, **kwargs):
        """Proxied call that records success/failure counts and durations."""
        start_time = time.time()
        
        try:
            result = self.real_tool(*args, **kwargs)
            duration = time.time() - start_time
            
            self.success_count += 1
            self.execution_times.append(duration)
            
            # Average duration over all successful calls
            avg_time = sum(self.execution_times) / len(self.execution_times)
            
            self.logger.info(
                f"Tool {self.real_tool.__name__} metrics: "
                f"Success: {self.success_count}, Failures: {self.failure_count}, "
                f"Avg Time: {avg_time:.2f}s, Current Time: {duration:.2f}s"
            )
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            self.failure_count += 1
            
            self.logger.error(
                f"Tool {self.real_tool.__name__} failed in {duration:.2f}s. "
                f"Total successes: {self.success_count}, failures: {self.failure_count}"
            )
            raise

# Proxy factory
def create_tool_proxy(real_tool: Callable, proxy_types: list) -> Callable:
    """Build a proxy chain; the first entry in proxy_types becomes the outermost proxy."""
    proxy = real_tool
    
    for proxy_type in reversed(proxy_types):  # wrap the innermost proxy first
        if proxy_type == "cache":
            proxy = CachingToolProxy(proxy)
        elif proxy_type == "logging":
            proxy = LoggingToolProxy(proxy)
        elif proxy_type == "authorization":
            proxy = AuthorizationToolProxy(proxy, allowed_roles=["admin", "user"])
        elif proxy_type == "retry":
            proxy = RetryToolProxy(proxy)
        elif proxy_type == "metrics":
            proxy = MetricsToolProxy(proxy)
    
    return proxy

# Usage example (plain function: none of the proxies are async)
def proxy_example():
    # The real tool
    def get_user_data(user_id: int) -> Dict:
        """Real tool that fetches user data."""
        if user_id == 1:
            return {"id": 1, "name": "Alice", "email": "alice@example.com"}
        else:
            raise ValueError(f"User {user_id} not found")
    
    # Build the proxy chain: cache is outermost, metrics innermost
    proxy = create_tool_proxy(
        get_user_data,
        ["cache", "logging", "retry", "metrics"]
    )
    
    try:
        # First call: executes the real function
        result1 = proxy(1)
        print(f"First call result: {result1}")
        
        # Second call: served from the cache
        result2 = proxy(1)
        print(f"Second call result: {result2}")
        
        # Failing call: retried 3 times with backoff (about 7 s of sleeping), then re-raised
        try:
            result3 = proxy(999)  # user does not exist
        except Exception as e:
            print(f"Expected error: {e}")
            
    except Exception as e:
        print(f"Error: {e}")
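`CachingToolProxy` only removes an entry when the same key is requested again after it expires, so its dictionary can grow without bound. A minimal sketch of a bounded variant, assuming hashable arguments and an in-process cache, combines the TTL with a least-recently-used size limit via `collections.OrderedDict`. The class name `BoundedCachingProxy` and the `max_entries` parameter are illustrative, not part of any library.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Tuple


class BoundedCachingProxy:
    """Caching proxy with a TTL and an LRU size limit (illustrative sketch)."""

    def __init__(self, real_tool: Callable[..., Any], ttl: float = 300.0, max_entries: int = 128):
        self.real_tool = real_tool
        self.ttl = ttl
        self.max_entries = max_entries
        # key -> (timestamp, result); dict order doubles as recency order
        self._cache: "OrderedDict[Hashable, Tuple[float, Any]]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(args: tuple, kwargs: dict) -> Hashable:
        # Assumes every argument is hashable (str, int, tuple, ...)
        return (args, tuple(sorted(kwargs.items())))

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        key = self._key(args, kwargs)
        now = time.monotonic()
        entry = self._cache.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            self._cache.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return entry[1]

        self.misses += 1
        result = self.real_tool(*args, **kwargs)
        self._cache[key] = (now, result)  # overwrites an expired entry, if any
        self._cache.move_to_end(key)
        # Evict least recently used entries beyond the size limit
        while len(self._cache) > self.max_entries:
            self._cache.popitem(last=False)
        return result
```

`hits / (hits + misses)` gives the cache hit rate, which is one of the monitoring metrics discussed later in this document.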

✨ Decorator Pattern

The Decorator pattern adds behavior to tool functions dynamically, enhancing a tool without modifying its original code.

from typing import Callable, Dict, Any, List, Optional
from functools import wraps
from dataclasses import dataclass
import asyncio  # needed by retry_decorator (asyncio.sleep)
import time
import logging

@dataclass
class ToolMetadata:
    name: str
    description: str
    version: str = "1.0"
    category: str = "general"
    tags: Optional[List[str]] = None

def tool(name: str = None, description: str = None, **metadata):
    """Tool decorator: attaches ToolMetadata to an async tool function."""
    def decorator(func: Callable):
        # Attach the metadata; @wraps below copies func.__dict__ (and with it
        # _tool_metadata) onto the wrapper
        func._tool_metadata = ToolMetadata(
            name=name or func.__name__,
            description=description or func.__doc__ or "No description",
            **metadata
        )
        
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)
        
        return wrapper
    return decorator

def timing_decorator(func: Callable):
    """Decorator that logs execution time."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start_time
            logging.info(f"Tool {func.__name__} executed in {duration:.2f}s")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logging.error(f"Tool {func.__name__} failed after {duration:.2f}s: {str(e)}")
            raise
    return wrapper

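def validation_decorator(schema: Dict):
    """Argument validation decorator (placeholder)."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Placeholder: arguments should be validated against `schema` here,
            # e.g. with a JSON Schema validator; this version passes them through
            return await func(*args, **kwargs)
        return wrapper
    return decorator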

def caching_decorator(ttl: int = 300):
    """Caching decorator."""
    # One cache per caching_decorator(...) call; keys include the function name
    cache = {}
    
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key
            cache_key = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
            
            # Return the cached result if it is still fresh
            if cache_key in cache:
                cached_data = cache[cache_key]
                if time.time() - cached_data['timestamp'] < ttl:
                    return cached_data['data']
                else:
                    del cache[cache_key]
            
            # Execute the function and cache the result
            result = await func(*args, **kwargs)
            cache[cache_key] = {
                'data': result,
                'timestamp': time.time()
            }
            
            return result
        return wrapper
    return decorator

def authorization_decorator(allowed_roles: List[str]):
    """Authorization decorator (reads user_role from keyword arguments)."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            user_role = kwargs.get('user_role', 'guest')
            if user_role not in allowed_roles:
                raise PermissionError(f"Access denied for role: {user_role}")
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

def retry_decorator(max_retries: int = 3, delay: float = 1.0):
    """Retry decorator with exponential backoff."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    if attempt > 0:
                        logging.info(f"Retry attempt {attempt} for {func.__name__}")
                        await asyncio.sleep(delay * (2 ** (attempt - 1)))
                    
                    return await func(*args, **kwargs)
                    
                except Exception as e:
                    last_exception = e
                    logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}")
            
            raise last_exception
        return wrapper
    return decorator

# Composing decorators (listed outermost first)
@tool(
    name="user_management",
    description="User management tool",
    version="2.0",
    category="admin",
    tags=["users", "management", "admin"]
)
@timing_decorator
@caching_decorator(ttl=60)
@validation_decorator({"type": "object", "properties": {"user_id": {"type": "integer"}}})
@authorization_decorator(["admin"])
@retry_decorator(max_retries=3)
async def create_user(user_id: int, user_data: Dict, user_role: str = "guest") -> Dict:
    """Tool function that creates a user; pass user_role as a keyword argument."""
    if user_id <= 0:
        raise ValueError("Invalid user ID")
    
    # Simulated user creation
    user = {
        "id": user_id,
        "name": user_data.get("name", f"User{user_id}"),
        "email": user_data.get("email", f"user{user_id}@example.com"),
        "created_at": time.time(),
        "role": user_role
    }
    
    logging.info(f"Created user {user_id} with role {user_role}")
    return user

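# Decorator factory
def create_tool_decorator(decorators: List[Callable]):
    """Compose a list of decorators into a single decorator."""
    def decorator(func: Callable):
        decorated_func = func
        
        # Apply in reverse so the first decorator in the list ends up outermost,
        # matching the order of stacked @ syntax
        for d in reversed(decorators):
            decorated_func = d(decorated_func)
        
        return decorated_func
    return decorator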

# Usage example
async def decorator_example():
    # A predefined decorator chain (the first entry ends up outermost)
    tool_decorators = [
        timing_decorator,
        caching_decorator(ttl=30),
        authorization_decorator(["admin", "user"]),
        retry_decorator(max_retries=2)
    ]
    
    @create_tool_decorator(tool_decorators)
    @tool(name="sample_tool", description="Sample tool")
    async def sample_function(data: str, user_role: str = "guest") -> str:
        return f"Processed: {data} by {user_role}"
    
    # Call the decorated function
    try:
        result = await sample_function("test data", user_role="admin")
        print(f"Result: {result}")
        
        # Second call with the same arguments: served from the cache
        result2 = await sample_function("test data", user_role="admin")
        print(f"Cached result: {result2}")
        
    except Exception as e:
        print(f"Error: {e}")
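`validation_decorator` above is only a placeholder. One way to fill it in is sketched below. It assumes flat `object` schemas and tools called with keyword arguments (as when the model's JSON arguments are parsed into a dict), and it checks required keys, primitive types, and `additionalProperties: false`. `check_arguments` and `validating` are hypothetical names. A full JSON Schema validator such as the `jsonschema` package covers far more of the specification.

```python
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, List

# JSON Schema primitive type names mapped to Python types (subset only)
_JSON_TYPES: Dict[str, tuple] = {
    "string": (str,),
    "integer": (int,),
    "number": (int, float),
    "boolean": (bool,),
    "array": (list,),
    "object": (dict,),
}


def check_arguments(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return error messages for a flat 'object' schema; nested schemas are not checked."""
    errors: List[str] = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        spec = properties.get(name)
        if spec is None:
            if schema.get("additionalProperties") is False:
                errors.append(f"unexpected argument: {name}")
            continue
        type_name = spec.get("type")
        expected = _JSON_TYPES.get(type_name) if isinstance(type_name, str) else None
        if expected is None:
            continue  # unknown or union type: not handled by this sketch
        # bool is a subclass of int in Python, so reject it for non-boolean types
        wrong_bool = isinstance(value, bool) and type_name != "boolean"
        if not isinstance(value, expected) or wrong_bool:
            errors.append(f"{name}: expected {type_name}, got {type(value).__name__}")
    return errors


def validating(schema: Dict[str, Any]):
    """Decorator: validate keyword arguments against `schema` before awaiting the tool."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(**kwargs):
            errors = check_arguments(schema, kwargs)
            if errors:
                raise ValueError("; ".join(errors))
            return await func(**kwargs)
        return wrapper
    return decorator


if __name__ == "__main__":
    user_schema = {
        "type": "object",
        "properties": {"user_id": {"type": "integer"}},
        "required": ["user_id"],
    }

    @validating(user_schema)
    async def get_user(user_id: int) -> Dict[str, Any]:
        return {"id": user_id}

    print(asyncio.run(get_user(user_id=1)))
```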

🎯 Command Pattern

The Command pattern wraps each tool call in a command object, so calls can be queued, cancelled, retried, and undone.

Command Interface → Concrete Commands
        ↓                  ↓
     Invoker → Receiver → Tool Function
        ↓
Command Queue → Execution → Result

from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import asyncio
import uuid
from enum import Enum

class CommandType(Enum):
    EXECUTE = "execute"
    CANCEL = "cancel"
    RETRY = "retry"
    UNDO = "undo"

class CommandStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Command:
    id: str
    type: CommandType
    tool_name: str
    parameters: Dict[str, Any]
    status: CommandStatus = CommandStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: Optional[float] = None
    executed_at: Optional[float] = None
    completed_at: Optional[float] = None
    retry_count: int = 0
    max_retries: int = 3

class CommandReceiver:
    """Command receiver: runs the actual tool functions."""
    def __init__(self):
        self.tools = {}
    
    def register_tool(self, name: str, func: callable):
        """Register a tool function."""
        self.tools[name] = func
    
    async def execute_tool(self, tool_name: str, params: Dict) -> Any:
        """Run a registered tool function."""
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        
        tool_func = self.tools[tool_name]
        return await tool_func(**params)

class CommandInvoker:
    """Command invoker: queues commands and executes them."""
    def __init__(self, receiver: CommandReceiver):
        self.receiver = receiver
        self.command_queue = asyncio.Queue()
        self.history: List[Command] = []
        # Every submitted command by id, so commands still in the queue can be cancelled
        self.commands: Dict[str, Command] = {}
        self.current_command: Optional[Command] = None
        self.is_running = False
        self.max_queue_size = 100
    
    async def execute_command(self, command: Command) -> Command:
        """Submit a command; it runs later in process_queue()."""
        if self.command_queue.qsize() >= self.max_queue_size:
            raise RuntimeError("Command queue is full")
        
        self.commands[command.id] = command
        await self.command_queue.put(command)
        return command
    
    async def execute_commands(self, commands: List[Command]) -> List[Command]:
        """Submit several commands in order."""
        results = []
        
        for command in commands:
            result = await self.execute_command(command)
            results.append(result)
        
        return results
    
    async def process_queue(self):
        """Process queued commands until stopped and the queue is drained."""
        self.is_running = True
        
        while self.is_running or not self.command_queue.empty():
            try:
                # Wait for the next command; the timeout lets the loop re-check is_running
                command = await asyncio.wait_for(
                    self.command_queue.get(),
                    timeout=1.0
                )
                
                # Skip commands that were cancelled while waiting in the queue
                if command.status == CommandStatus.CANCELLED:
                    continue
                
                await self._execute_single_command(command)
                
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Error processing command queue: {e}")
    
    async def _execute_single_command(self, command: Command):
        """Execute one command and update its status."""
        loop = asyncio.get_running_loop()
        self.current_command = command
        command.status = CommandStatus.EXECUTING
        command.executed_at = loop.time()
        
        try:
            # Dispatch on the command type
            if command.type == CommandType.EXECUTE:
                result = await self.receiver.execute_tool(
                    command.tool_name,
                    command.parameters
                )
                command.result = result
                command.status = CommandStatus.COMPLETED
                
            elif command.type == CommandType.CANCEL:
                command.status = CommandStatus.CANCELLED
                
            elif command.type == CommandType.RETRY:
                if command.retry_count < command.max_retries:
                    command.retry_count += 1
                    command.status = CommandStatus.PENDING
                    await self.command_queue.put(command)  # requeue
                else:
                    command.status = CommandStatus.FAILED
                    command.error = "Max retries exceeded"
            
            elif command.type == CommandType.UNDO:
                # Placeholder: a real undo needs an inverse operation for each tool
                command.status = CommandStatus.COMPLETED
                command.result = "Undo executed"
            
        except Exception as e:
            command.error = str(e)
            command.status = CommandStatus.FAILED
            
            # Requeue for another attempt while retries remain
            if command.retry_count < command.max_retries:
                command.retry_count += 1
                command.status = CommandStatus.PENDING
                await self.command_queue.put(command)
        
        finally:
            # Record only final outcomes; a requeued command is recorded when it finishes
            if command.status != CommandStatus.PENDING:
                command.completed_at = loop.time()
                self.history.append(command)
            self.current_command = None
    
    async def cancel_command(self, command_id: str) -> bool:
        """Cancel a command that is still waiting in the queue."""
        command = self.commands.get(command_id)
        if command is not None and command.status == CommandStatus.PENDING:
            command.status = CommandStatus.CANCELLED
            self.history.append(command)
            return True
        return False
    
    async def undo_command(self, command_id: str) -> bool:
        """Submit an UNDO command for a completed command."""
        for command in self.history:
            if command.id == command_id and command.status == CommandStatus.COMPLETED:
                undo_command = Command(
                    id=str(uuid.uuid4()),
                    type=CommandType.UNDO,
                    tool_name=command.tool_name,
                    parameters=command.parameters,
                    max_retries=0,
                    created_at=asyncio.get_running_loop().time()
                )
                await self.execute_command(undo_command)
                return True
        return False

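class ToolCommandManager:
    """Facade that ties the receiver and the invoker together."""
    def __init__(self):
        self.receiver = CommandReceiver()
        self.invoker = CommandInvoker(self.receiver)
    
    def register_tool(self, name: str, func: callable):
        """Register a tool function."""
        self.receiver.register_tool(name, func)
    
    async def create_command(self, tool_name: str, params: Dict, **kwargs) -> Command:
        """Create an EXECUTE command (not yet submitted)."""
        command = Command(
            id=str(uuid.uuid4()),
            type=CommandType.EXECUTE,
            tool_name=tool_name,
            parameters=params,
            max_retries=kwargs.get('max_retries', 3),
            created_at=asyncio.get_running_loop().time()
        )
        return command
    
    async def execute_tool(self, tool_name: str, params: Dict, **kwargs) -> Any:
        """Submit a command and wait for its final result (requires a running processor)."""
        command = await self.create_command(tool_name, params, **kwargs)
        await self.invoker.execute_command(command)
        # The processor runs in another task: poll until the command reaches a final state
        while command.status in (CommandStatus.PENDING, CommandStatus.EXECUTING):
            await asyncio.sleep(0.05)
        if command.status == CommandStatus.FAILED:
            raise RuntimeError(command.error)
        return command.result
    
    async def start_processor(self):
        """Start the queue processor (runs until stop_processor() is called)."""
        await self.invoker.process_queue()
    
    def stop_processor(self):
        """Stop the processor once the queue has drained."""
        self.invoker.is_running = False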

# Usage example
async def command_example():
    # Create the command manager
    manager = ToolCommandManager()
    
    # Tool functions
    async def get_weather(city: str) -> Dict:
        """Fetch weather information (fake data)."""
        await asyncio.sleep(1)  # simulated latency
        return {
            "city": city,
            "temperature": 20 + hash(city) % 10,
            "humidity": 60 + hash(city) % 20
        }
    
    async def send_notification(message: str) -> Dict:
        """Send a notification."""
        await asyncio.sleep(0.5)
        return {"sent": True, "message": message}
    
    manager.register_tool("get_weather", get_weather)
    manager.register_tool("send_notification", send_notification)
    
    # Start the command processor in the background
    processor_task = asyncio.create_task(manager.start_processor())
    
    try:
        # Create the commands
        commands = []
        
        # Weather query command
        weather_command = await manager.create_command(
            "get_weather",
            {"city": "Beijing"}
        )
        commands.append(weather_command)
        
        # Notification command
        notification_command = await manager.create_command(
            "send_notification",
            {"message": "Weather data retrieved"}
        )
        commands.append(notification_command)
        
        # Submit both commands
        await manager.invoker.execute_commands(commands)
        
        # Give the processor time to finish them
        await asyncio.sleep(3)
        
        # Inspect the results
        for command in commands:
            print(f"Command {command.id}: {command.status.value}")
            if command.result:
                print(f"Result: {command.result}")
        
    finally:
        # Stop the processor
        manager.stop_processor()
        await processor_task
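The `UNDO` branch in `_execute_single_command` only records a placeholder result. A real undo needs each tool to be paired with an inverse operation. The sketch below assumes every reversible tool registers an async `undo(params, result)` callback and keeps a LIFO stack of executed calls. `UndoableCommandRunner` and its methods are hypothetical and independent of the classes above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

DoFn = Callable[..., Awaitable[Any]]
UndoFn = Callable[[Dict[str, Any], Any], Awaitable[None]]


class UndoableCommandRunner:
    """Executes tools and keeps a LIFO stack so completed calls can be reversed."""

    def __init__(self) -> None:
        self.tools: Dict[str, Tuple[DoFn, Optional[UndoFn]]] = {}
        # (tool name, params, result) for every successfully executed call
        self.undo_stack: List[Tuple[str, Dict[str, Any], Any]] = []

    def register(self, name: str, do: DoFn, undo: Optional[UndoFn] = None) -> None:
        self.tools[name] = (do, undo)

    async def execute(self, name: str, params: Dict[str, Any]) -> Any:
        do, _ = self.tools[name]
        result = await do(**params)
        self.undo_stack.append((name, params, result))
        return result

    async def undo_last(self) -> bool:
        """Reverse the most recent call; returns False when there is nothing to undo."""
        if not self.undo_stack:
            return False
        name, params, result = self.undo_stack[-1]
        _, undo = self.tools[name]
        if undo is None:
            raise RuntimeError(f"Tool {name} has no inverse operation")
        await undo(params, result)
        self.undo_stack.pop()  # pop only after the undo succeeded
        return True


if __name__ == "__main__":
    users: Dict[int, str] = {}

    async def create_user(user_id: int, name: str) -> int:
        users[user_id] = name
        return user_id

    async def delete_user(params: Dict[str, Any], result: Any) -> None:
        users.pop(result, None)

    async def demo() -> None:
        runner = UndoableCommandRunner()
        runner.register("create_user", create_user, delete_user)
        await runner.execute("create_user", {"user_id": 1, "name": "Alice"})
        await runner.undo_last()
        print(users)  # {}

    asyncio.run(demo())
```

Popping only after a successful undo means a failed undo leaves the entry on the stack, so it can be retried.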

👁️ Observer Pattern

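The Observer pattern provides real-time monitoring of tool execution and event notification, so handlers can react to tool events without being coupled to the executor.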

from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import asyncio
import uuid
from enum import Enum

class EventType(Enum):
    TOOL_START = "tool_start"
    TOOL_COMPLETE = "tool_complete"
    TOOL_ERROR = "tool_error"
    TOOL_PROGRESS = "tool_progress"
    SYSTEM_EVENT = "system_event"

@dataclass
class Event:
    type: EventType
    tool_name: str
    data: Dict[str, Any]
    timestamp: float
    event_id: str

class Observer(ABC):
    """Observer interface."""
    @abstractmethod
    async def on_event(self, event: Event):
        """Handle an event."""
        pass

class ToolEventObserver(Observer):
    """Observer that prints every tool event."""
    async def on_event(self, event: Event):
        print(f"[{event.timestamp:.2f}] {event.type.value}: {event.tool_name}")
        if event.data:
            print(f"Data: {event.data}")

class LoggingObserver(Observer):
    """Logging observer."""
    async def on_event(self, event: Event):
        log_message = f"Event: {event.type.value} for {event.tool_name}"
        if event.data:
            log_message += f" with data: {event.data}"
        
        # A real system would send this to a logging backend instead of print
        print(f"[LOG] {log_message}")

class MetricsObserver(Observer):
    """Observer that aggregates tool metrics."""
    def __init__(self):
        self.metrics = {
            "total_events": 0,
            "tool_starts": 0,
            "tool_completions": 0,
            "tool_errors": 0,
            "total_duration": 0
        }
    
    async def on_event(self, event: Event):
        self.metrics["total_events"] += 1
        
        if event.type == EventType.TOOL_START:
            self.metrics["tool_starts"] += 1
            
        elif event.type == EventType.TOOL_COMPLETE:
            self.metrics["tool_completions"] += 1
            # Every event gets its own event_id, so START and COMPLETE cannot be
            # paired by id; use the duration the executor puts in the event data
            self.metrics["total_duration"] += event.data.get("duration", 0)
                
        elif event.type == EventType.TOOL_ERROR:
            self.metrics["tool_errors"] += 1
        
        # Print a snapshot every 10 events
        if self.metrics["total_events"] % 10 == 0:
            print(f"[METRICS] {self.metrics}")

class ObservableSubject:
    """Observable subject that observers subscribe to."""
    def __init__(self):
        self.observers: List[Observer] = []
        self.start_times = {}
    
    def attach(self, observer: Observer):
        """Attach an observer."""
        if observer not in self.observers:
            self.observers.append(observer)
    
    def detach(self, observer: Observer):
        """Detach an observer."""
        if observer in self.observers:
            self.observers.remove(observer)
    
    async def notify(self, event: Event):
        """Notify every observer; one failing observer does not stop the rest."""
        for observer in self.observers:
            try:
                await observer.on_event(event)
            except Exception as e:
                print(f"Error notifying observer: {e}")
    
    async def emit_event(self, event_type: EventType, tool_name: str, data: Dict = None):
        """Create an event, notify observers, and return the event id."""
        event = Event(
            type=event_type,
            tool_name=tool_name,
            data=data or {},
            timestamp=asyncio.get_running_loop().time(),
            event_id=str(uuid.uuid4())
        )
        
        await self.notify(event)
        return event.event_id

class ToolExecutor:
    """Tool executor (the observed component)."""
    def __init__(self):
        self.subject = ObservableSubject()
        self.tools = {}
    
    def register_tool(self, name: str, func: Callable):
        """Register a tool function."""
        self.tools[name] = func
    
    def attach_observer(self, observer: Observer):
        """Attach an observer."""
        self.subject.attach(observer)
    
    async def execute_tool(self, tool_name: str, params: Dict) -> Any:
        """Execute a tool and emit start, complete, and error events."""
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        
        # Measure duration locally; nothing else records when this call started
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        
        # Emit the start event
        start_event_id = await self.subject.emit_event(
            EventType.TOOL_START,
            tool_name,
            {"params": params}
        )
        
        try:
            # Run the tool function
            tool_func = self.tools[tool_name]
            result = await tool_func(**params)
            
            # Emit the completion event; start_event_id links it to the start event
            await self.subject.emit_event(
                EventType.TOOL_COMPLETE,
                tool_name,
                {
                    "result": result,
                    "duration": loop.time() - start_time,
                    "start_event_id": start_event_id
                }
            )
            
            return result
            
        except Exception as e:
            # Emit the error event
            await self.subject.emit_event(
                EventType.TOOL_ERROR,
                tool_name,
                {
                    "error": str(e),
                    "duration": loop.time() - start_time,
                    "start_event_id": start_event_id
                }
            )
            raise

class ProgressObserver(Observer):
    """Observer that prints progress events."""
    async def on_event(self, event: Event):
        if event.type == EventType.TOOL_PROGRESS:
            progress = event.data.get('progress', 0)
            tool_name = event.tool_name
            print(f"[PROGRESS] {tool_name}: {progress:.1%}")

class NotificationObserver(Observer):
    """Observer that forwards completion and failure messages to a notification service."""
    def __init__(self, notification_service: Callable):
        self.notification_service = notification_service
    
    async def on_event(self, event: Event):
        if event.type in [EventType.TOOL_COMPLETE, EventType.TOOL_ERROR]:
            message = f"Tool {event.tool_name} "
            if event.type == EventType.TOOL_COMPLETE:
                message += "completed successfully"
            else:
                message += f"failed: {event.data.get('error', 'Unknown error')}"
            
            await self.notification_service(message)

class ToolEventSystem:
    """Tool event system."""
    def __init__(self):
        self.executor = ToolExecutor()
        self.progress_tracker = {}
    
    def setup_observers(self):
        """Attach the observers."""
        # Basic observers
        self.executor.attach_observer(ToolEventObserver())
        self.executor.attach_observer(LoggingObserver())
        self.executor.attach_observer(MetricsObserver())
        
        # Progress observer
        self.executor.attach_observer(ProgressObserver())
        
        # Notification observer (stub service for the example)
        async def dummy_notification(message: str):
            print(f"[NOTIFICATION] {message}")
        
        self.executor.attach_observer(NotificationObserver(dummy_notification))
    
    def register_tools(self):
        """Register the tools."""
        async def process_file(file_path: str, progress_callback: Callable = None):
            """File-processing tool (simulated)."""
            total_steps = 100
            for i in range(total_steps):
                if progress_callback:
                    progress = (i + 1) / total_steps
                    await progress_callback(progress)
                await asyncio.sleep(0.01)  # simulated work
            
            return {"processed": True, "steps": total_steps}
        
        async def analyze_data(data: List[int]):
            """Data-analysis tool."""
            await asyncio.sleep(0.5)
            return {
                "sum": sum(data),
                "average": sum(data) / len(data),
                "count": len(data)
            }
        
        self.executor.register_tool("process_file", process_file)
        self.executor.register_tool("analyze_data", analyze_data)
    
    async def execute_with_progress_tracking(self, tool_name: str, params: Dict) -> Any:
        """Execute a tool that reports progress events."""
        event_id = str(uuid.uuid4())
        
        async def progress_callback(progress: float):
            await self.executor.subject.emit_event(
                EventType.TOOL_PROGRESS,
                tool_name,
                {"progress": progress, "event_id": event_id}
            )
        
        # Only process_file accepts a progress callback
        params_with_callback = params.copy()
        if tool_name == "process_file":
            params_with_callback["progress_callback"] = progress_callback
        
        return await self.executor.execute_tool(tool_name, params_with_callback)

# Usage example
async def observer_example():
    # Create the event system
    system = ToolEventSystem()
    system.setup_observers()
    system.register_tools()
    
    try:
        # Run a tool with progress tracking
        print("Processing file...")
        result = await system.execute_with_progress_tracking(
            "process_file",
            {"file_path": "large_file.txt"}
        )
        print(f"Processing result: {result}")
        
        # Run the data-analysis tool
        print("Analyzing data...")
        result = await system.executor.execute_tool(
            "analyze_data",
            {"data": [1, 2, 3, 4, 5]}
        )
        print(f"Analysis result: {result}")
        
    except Exception as e:
        print(f"Error: {e}")
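`ObservableSubject.notify` awaits observers one after another. A slow observer, such as one that sends a network notification, therefore delays every observer after it and the tool call itself. The sketch below shows concurrent notification with `asyncio.gather(..., return_exceptions=True)`. For brevity it models observers as plain async callables, which is an assumption, and `ConcurrentSubject` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Listener = Callable[[Dict[str, Any]], Awaitable[None]]


class ConcurrentSubject:
    """Notifies all listeners concurrently and collects their failures."""

    def __init__(self) -> None:
        self.listeners: List[Listener] = []
        self.errors: List[BaseException] = []

    def attach(self, listener: Listener) -> None:
        if listener not in self.listeners:
            self.listeners.append(listener)

    async def notify(self, event: Dict[str, Any]) -> None:
        # All listeners start at once; total time is roughly that of the slowest one
        results = await asyncio.gather(
            *(listener(event) for listener in self.listeners),
            return_exceptions=True,  # return exceptions as results instead of raising the first
        )
        self.errors.extend(r for r in results if isinstance(r, BaseException))
```

If observer order matters (for example, a logger that must run before a notifier), the sequential loop is the simpler choice.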

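📋 Best Practices

These practices keep Function Calling code reliable, maintainable, and extensible.

Design principles

  • Single responsibility: each tool does one thing
  • Type safety: use type definitions and validate inputs
  • Idempotency: calling a tool again with the same arguments has no additional effect
  • Error handling: handle and report exceptions thoroughly
  • Documentation: keep docs and comments clear and complete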
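Idempotency matters because the same tool call can be delivered twice, for example after a client retry. The minimal sketch below executes each key once and replays the stored result afterwards. It assumes an in-memory store and a caller-supplied idempotency key; the tool call's ID returned by the API is one candidate for that key. `IdempotentExecutor` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class IdempotentExecutor:
    """Runs each idempotency key at most once and replays the stored result."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def run(self, key: str, func: Callable[..., Awaitable[Any]], /, **kwargs: Any) -> Any:
        # Positional-only key/func so tool kwargs named "key" or "func" cannot clash
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:  # concurrent calls with the same key wait for the first one
            if key not in self._results:
                # Only successful results are stored, so a failed call can be retried
                self._results[key] = await func(**kwargs)
            return self._results[key]
```

This in-memory version only deduplicates within one process. Sharing keys across workers would need an external store.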

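Performance

  • Cache results that can safely be reused
  • Avoid unnecessary network calls
  • Batch and parallelize independent calls
  • Manage connection pools
  • Keep memory usage in check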
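Independent tool calls, such as several calls returned in one model response, can run in parallel. An unbounded fan-out, however, can exhaust connections or hit rate limits. The sketch below runs calls concurrently with `asyncio.gather` and caps concurrency with `asyncio.Semaphore`. It assumes each call's JSON arguments have already been parsed into a dict; `run_tool_calls` and the call-dict shape are illustrative.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Sequence


async def run_tool_calls(
    calls: Sequence[Dict[str, Any]],
    registry: Dict[str, Callable[..., Awaitable[Any]]],
    max_concurrency: int = 5,
) -> List[Any]:
    """Execute independent tool calls concurrently, at most max_concurrency at a time.

    Each call is a dict like {"name": ..., "arguments": {...}}; results keep input order,
    and a failed call yields its exception object instead of aborting the batch.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(call: Dict[str, Any]) -> Any:
        async with semaphore:  # waits while max_concurrency calls are in flight
            return await registry[call["name"]](**call["arguments"])

    return await asyncio.gather(*(run_one(c) for c in calls), return_exceptions=True)
```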

Security

  • Validate and sanitize inputs
  • Access control
  • Protect sensitive data
  • Prevent SQL injection
  • Prevent XSS
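SQL injection is a real risk when a tool builds queries from model-generated arguments. The sketch below uses the standard-library `sqlite3` driver with `?` placeholders, so the driver binds the values instead of concatenating them into the SQL text. The `orders` table and the `find_orders` tool are hypothetical.

```python
import sqlite3
from typing import Any, Dict, List


def find_orders(conn: sqlite3.Connection, customer: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Hypothetical tool: look up a customer's orders with a parameterized query."""
    # Model-generated arguments are untrusted input: validate before querying
    if isinstance(limit, bool) or not isinstance(limit, int) or not 1 <= limit <= 100:
        raise ValueError("limit must be an integer between 1 and 100")
    # '?' placeholders: values are bound by the driver, never spliced into the SQL string
    rows = conn.execute(
        "SELECT id, customer, total FROM orders WHERE customer = ? ORDER BY id LIMIT ?",
        (customer, limit),
    ).fetchall()
    return [{"id": r[0], "customer": r[1], "total": r[2]} for r in rows]
```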

Testing strategy

  • Unit test coverage
  • Integration tests
  • End-to-end tests
  • Performance tests
  • Security tests

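⚠️ Anti-Pattern Warnings

Avoid the following common Function Calling anti-patterns; they degrade code quality and system performance.

Design anti-patterns

  • God tool: a single tool that handles everything
  • Too many parameters: a function with more than 7 parameters
  • Tight coupling: strong dependencies between tools
  • Duplicated code: similar logic implemented more than once
  • Global state: relying on global variables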

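Performance anti-patterns

  • Synchronous blocking: using blocking I/O
  • Repeated uncached calls: requesting the same arguments over and over
  • Memory leaks: resources not released properly
  • Database connection leaks: connections not closed properly
  • Excessive concurrency: spawning too many concurrent tasks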
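A blocking call inside an `async` tool stalls the event loop and every other tool running on it. When only a synchronous client is available, one remedy is `asyncio.to_thread` (Python 3.9+), which runs the call in a worker thread. In this sketch, `blocking_lookup` stands in for a hypothetical synchronous client.

```python
import asyncio
import time
from typing import Any, Dict


def blocking_lookup(key: str) -> Dict[str, Any]:
    """Stand-in for a synchronous client call (hypothetical)."""
    time.sleep(0.2)  # simulates blocking network or disk I/O
    return {"key": key}


async def lookup_tool(key: str) -> Dict[str, Any]:
    # Run the blocking call in the default thread pool so the event loop stays responsive
    return await asyncio.to_thread(blocking_lookup, key)


async def run_five() -> float:
    """Run five lookups concurrently and return the elapsed wall time in seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(lookup_tool(f"k{i}") for i in range(5)))
    return time.perf_counter() - start


if __name__ == "__main__":
    # Calling blocking_lookup directly inside the async tool would serialize
    # the five calls (about 1.0 s in total)
    print(f"{asyncio.run(run_five()):.2f}s")
```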

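🔧 Debugging Tips

Debugging Function Calling requires a systematic approach to locate and fix problems quickly.

Debugging tools

  • Detailed logging
  • Performance monitoring
  • Breakpoint debugging
  • Unit tests
  • Integration tests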

Common problems

  • Argument validation failures
  • Network timeouts
  • Out-of-memory errors
  • Race conditions
  • Circular dependencies
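Argument validation failures and network timeouts are easier to handle when the tool wrapper turns them into structured results instead of letting them crash the loop. The model returns function arguments as a JSON-encoded string. The sketch below parses that string, enforces a timeout with `asyncio.wait_for`, and always returns a JSON string that could be sent back as the tool output. The error codes (`invalid_arguments`, `timeout`, `bad_arguments`) are made up for illustration.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def call_with_timeout(
    func: Callable[..., Awaitable[Any]], arguments_json: str, timeout: float
) -> str:
    """Run a tool and always return a JSON string usable as the tool's output."""
    try:
        arguments = json.loads(arguments_json)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": "invalid_arguments", "detail": str(exc)})
    try:
        result = await asyncio.wait_for(func(**arguments), timeout=timeout)
    except asyncio.TimeoutError:
        return json.dumps({"error": "timeout", "timeout_s": timeout})
    except TypeError as exc:
        # Raised for unknown/missing keyword arguments or non-object JSON; note that
        # a TypeError raised inside the tool itself also lands here in this sketch
        return json.dumps({"error": "bad_arguments", "detail": str(exc)})
    return json.dumps({"result": result})
```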

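Debugging strategy

  • Start simple, then add complexity
  • Isolate the failing component
  • Use test data
  • Monitor resource usage
  • Verify assumptions one at a time

Optimization suggestions

  • Code review
  • Profiling
  • Benchmarking
  • Stress testing
  • Refactoring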

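🔒 Security Considerations

Function Calling needs security measures at every layer to protect both the system and user data.

Input validation

  • Type checks
  • Range validation
  • Format validation
  • Length limits
  • Special-character filtering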
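The input checks listed above (type, range, format, length, special characters) can be applied per argument before a tool runs. The sketch below covers two hypothetical arguments, `city` and `days`. The ASCII-only allow-list for city names is an assumption and would reject names such as São Paulo, so a real tool would choose a pattern that fits its data.

```python
import re
from typing import Any

# Allow-list: a leading letter followed by letters, spaces, dots, apostrophes, hyphens
_CITY_PATTERN = re.compile(r"[A-Za-z][A-Za-z .'-]*")


def validate_city(value: Any, max_length: int = 64) -> str:
    """Type, length, and format checks for a hypothetical 'city' argument."""
    if not isinstance(value, str):
        raise TypeError("city must be a string")
    value = value.strip()
    if not 1 <= len(value) <= max_length:
        raise ValueError(f"city must be 1-{max_length} characters")
    if not _CITY_PATTERN.fullmatch(value):
        raise ValueError("city contains unsupported characters")
    return value


def validate_days(value: Any, low: int = 1, high: int = 14) -> int:
    """Range check for an integer argument; bool is rejected although it subclasses int."""
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("days must be an integer")
    if not low <= value <= high:
        raise ValueError(f"days must be between {low} and {high}")
    return value
```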

Output encoding

  • HTML escaping
  • URL encoding
  • JSON escaping
  • SQL parameterization
  • XSS prevention
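The right output encoding depends on where the value ends up. The sketch below uses only the standard library: `html.escape` for HTML, `urllib.parse.urlencode` and `quote` for URLs, and `json.dumps` for JSON. The helper names and the `example.com` URLs are illustrative.

```python
import html
import json
from urllib.parse import quote, urlencode


def render_title_html(title: str) -> str:
    # Escape <, >, & and quotes before embedding model- or user-controlled text in HTML
    return f"<h2>{html.escape(title)}</h2>"


def build_search_url(base: str, query: str) -> str:
    # urlencode percent-encodes query parameters (spaces become '+')
    return f"{base}?{urlencode({'q': query})}"


def build_path(segment: str) -> str:
    # safe="" also encodes '/', so a value cannot add extra path segments
    return "/files/" + quote(segment, safe="")


def tool_output_json(payload: dict) -> str:
    # json.dumps handles quoting and escaping; never assemble JSON by concatenation
    return json.dumps(payload, ensure_ascii=False)
```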

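Access control

  • Role-based access control (RBAC)
  • Principle of least privilege
  • Token validation
  • Session management
  • Audit logging

Data protection

  • Encryption of sensitive data
  • Transport security
  • Encryption at rest
  • Data masking
  • Backup and recovery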
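Data masking keeps sensitive values out of logs and tool payloads. The sketch below recursively redacts a hypothetical set of sensitive keys and partially masks email addresses. The key list and the regular expression are simplifying assumptions, not a complete PII detector.

```python
import re
from typing import Any

_EMAIL = re.compile(r"([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*(@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
SENSITIVE_KEYS = {"password", "api_key", "token", "ssn"}  # hypothetical field names


def mask_email(text: str) -> str:
    """Keep only the first character of the local part: alice@example.com -> a***@example.com."""
    return _EMAIL.sub(r"\1***\2", text)


def redact(value: Any) -> Any:
    """Recursively mask sensitive keys and email addresses before logging a payload."""
    if isinstance(value, dict):
        return {
            k: "***" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    if isinstance(value, str):
        return mask_email(value)
    return value
```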

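📊 Monitoring and Logging

Monitoring and logging are the foundation for operating a Function Calling system: they provide observability and the means to diagnose problems.

Metrics

  • Request rate and response time
  • Error rate and success rate
  • Resource usage
  • Cache hit rate
  • Concurrency and queue length

Alert rules

  • Error rate above a threshold
  • Response time too long
  • Resource usage too high
  • Queue backlog
  • Memory leaks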
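The metrics and alert rules above can be tracked per tool over a rolling window. The sketch below keeps a fixed-size window and computes nearest-rank p95 latency, error rate, and cache hit rate. The window size and the thresholds (5% errors, 2 s p95) are placeholder values, not recommendations from the source.

```python
import math
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List


@dataclass
class ToolMetrics:
    """Rolling-window metrics for one tool."""
    window: int = 100
    durations: Deque[float] = field(default_factory=deque)
    outcomes: Deque[bool] = field(default_factory=deque)  # True = success
    cache_hits: int = 0
    cache_lookups: int = 0

    def record(self, duration_s: float, ok: bool) -> None:
        self.durations.append(duration_s)
        self.outcomes.append(ok)
        if len(self.durations) > self.window:  # drop the oldest sample
            self.durations.popleft()
            self.outcomes.popleft()

    def error_rate(self) -> float:
        return 0.0 if not self.outcomes else 1 - sum(self.outcomes) / len(self.outcomes)

    def p95(self) -> float:
        if not self.durations:
            return 0.0
        ordered = sorted(self.durations)
        return ordered[max(0, math.ceil(0.95 * len(ordered)) - 1)]  # nearest-rank percentile

    def cache_hit_rate(self) -> float:
        return 0.0 if not self.cache_lookups else self.cache_hits / self.cache_lookups


def check_alerts(m: ToolMetrics, max_error_rate: float = 0.05, max_p95_s: float = 2.0) -> List[str]:
    """Return a message for each alert rule that is currently violated."""
    alerts = []
    if m.error_rate() > max_error_rate:
        alerts.append(f"error rate {m.error_rate():.1%} > {max_error_rate:.1%}")
    if m.p95() > max_p95_s:
        alerts.append(f"p95 latency {m.p95():.2f}s > {max_p95_s:.2f}s")
    return alerts
```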

Logging strategy

  • Structured logs
  • Request tracing
  • Error context
  • Performance analysis
  • Security events
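Structured logs with a request ID make it possible to trace every tool call made while handling one user request. The sketch below uses the standard `logging` module with a custom JSON formatter. `request_id` and `tool` are assumed field names passed through the `extra` argument.

```python
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Fields supplied via extra=...; missing ones are logged as null
            "request_id": getattr(record, "request_id", None),
            "tool": getattr(record, "tool", None),
        }
        if record.exc_info:
            entry["exc"] = self.formatException(record.exc_info)
        return json.dumps(entry, ensure_ascii=False)


def make_logger(name: str = "tools") -> logging.Logger:
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.handlers[:] = [handler]  # replace handlers so repeated calls don't duplicate output
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = make_logger()
    request_id = str(uuid.uuid4())  # one id per user request, reused for each tool call in it
    log.info("tool call started", extra={"request_id": request_id, "tool": "get_weather"})
```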

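Log levels

  • DEBUG: detailed debugging information
  • INFO: normal operation
  • WARN: warnings (WARNING in Python's logging module)
  • ERROR: errors
  • FATAL: critical errors (CRITICAL in Python's logging module)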

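🧪 Testing Strategy

A comprehensive testing strategy that covers every layer ensures the quality and reliability of Function Calling.

Unit tests

  • Tool function tests
  • Argument validation tests
  • Error handling tests
  • Caching logic tests
  • Tool composition tests

Integration tests

  • Tool chain tests
  • Database integration tests
  • API integration tests
  • Third-party service tests
  • Concurrency tests

Performance tests

  • Load testing
  • Stress testing
  • Benchmarking
  • Memory leak testing
  • Response time testing

Testing tools

  • pytest: Python testing framework
  • unittest: standard library testing framework
  • pytest-asyncio: async test support for pytest
  • mock: mock objects (unittest.mock in the standard library)
  • coverage: code coverage measurement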
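A unit test for an async tool can replace the tool's external dependency with `unittest.mock.AsyncMock` and run under `unittest.IsolatedAsyncioTestCase`; both have been in the standard library since Python 3.8. pytest with pytest-asyncio is an equally valid choice. The tool `get_weather_summary` and its injected `fetch` dependency are hypothetical.

```python
import unittest
from unittest.mock import AsyncMock


async def get_weather_summary(city: str, fetch) -> str:
    """Hypothetical tool under test; `fetch` is an injected async client call."""
    data = await fetch(city)
    return f"{city}: {data['temp']}°C"


class GetWeatherSummaryTest(unittest.IsolatedAsyncioTestCase):
    async def test_formats_temperature(self):
        fetch = AsyncMock(return_value={"temp": 21})
        self.assertEqual(await get_weather_summary("Paris", fetch), "Paris: 21°C")
        fetch.assert_awaited_once_with("Paris")

    async def test_propagates_client_errors(self):
        fetch = AsyncMock(side_effect=TimeoutError("upstream timeout"))
        with self.assertRaises(TimeoutError):
            await get_weather_summary("Paris", fetch)
```

Run it with `python -m unittest <module>`. Injecting the dependency as a parameter is what makes the mock possible without patching module globals.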

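🔌 Extension and Integration

A Function Calling system needs good extensibility and integration capabilities to keep up with changing requirements.

Extension points

  • Plugin system
  • Custom tool types
  • New schema types
  • Custom validators
  • Extended monitoring metrics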
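A plugin-style registry is one way to realize these extension points. Plugins register handlers, their parameter schemas, and optional custom validators. The registry then emits tool definitions in the `type` / `function.name` / `function.description` / `function.parameters` shape used for tool definitions. `ToolRegistry` is an illustrative class, not an OpenAI SDK API.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional

Handler = Callable[..., Awaitable[Any]]
Validator = Callable[[Dict[str, Any]], None]  # raises ValueError on bad input


@dataclass
class RegisteredTool:
    name: str
    description: str
    parameters: Dict[str, Any]  # JSON Schema for the arguments
    handler: Handler
    validators: List[Validator] = field(default_factory=list)


class ToolRegistry:
    """Plugin-style tool registry (illustrative; not an OpenAI SDK API)."""

    def __init__(self) -> None:
        self._tools: Dict[str, RegisteredTool] = {}

    def register(self, name: str, description: str, parameters: Dict[str, Any],
                 validators: Optional[List[Validator]] = None) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            if name in self._tools:
                raise ValueError(f"duplicate tool name: {name}")
            self._tools[name] = RegisteredTool(name, description, parameters, handler,
                                               list(validators or []))
            return handler
        return decorator

    def tool_definitions(self) -> List[Dict[str, Any]]:
        """Definitions shaped as {"type": "function", "function": {name, description, parameters}}."""
        return [
            {"type": "function",
             "function": {"name": t.name, "description": t.description, "parameters": t.parameters}}
            for t in self._tools.values()
        ]

    async def dispatch(self, name: str, arguments: Dict[str, Any]) -> Any:
        tool = self._tools[name]  # KeyError for unknown tools
        for validate in tool.validators:  # custom validators run before the handler
            validate(arguments)
        return await tool.handler(**arguments)


if __name__ == "__main__":
    registry = ToolRegistry()

    @registry.register("echo", "Echo a message",
                       {"type": "object", "properties": {"text": {"type": "string"}}, "required": ["text"]})
    async def echo(text: str) -> str:
        return text

    print(registry.tool_definitions())
    print(asyncio.run(registry.dispatch("echo", {"text": "hi"})))
```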

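Integration approaches

  • REST API integration
  • GraphQL integration
  • Message queue integration
  • Event-driven integration
  • Microservice integration

Compatibility

  • Backward compatibility across versions
  • Multi-language support
  • Cross-platform compatibility
  • Cloud service integration
  • Containerization support

Future directions

  • AI-driven tool recommendation
  • Automatic tool discovery
  • Dynamic tool generation
  • Edge computing support
  • Quantum computing integration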

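🚀 Summary and Outlook

OpenAI Function Calling has become a core technique for building AI applications, and it continues to evolve.

Key achievements

  • Type-safe function calls
  • Flexible tool definition mechanism
  • Robust error handling
  • High-performance parallel execution
  • Broad SDK support

Technical advantages

  • Lowers the barrier to building AI applications
  • Improves development efficiency
  • Increases system reliability
  • Supports complex business logic
  • Easy to maintain and extend

Future trends

  • Smarter tool selection
  • Automatic tool generation
  • Multimodal tool support
  • Edge computing optimization
  • Real-time processing

Directions of development

  • Stronger reasoning
  • Context awareness
  • Adaptive tools
  • Automated orchestration
  • A standardized ecosystem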