基于OpenAI官方文档与实现
2026-03-29 | AI技术深度解读
第一部分:基础概念
第二部分:核心实现
第三部分:SDK 实现
第四部分:进阶特性
OpenAI Function Calling(现称为 Tools)允许模型返回结构化的函数调用,而不是纯文本响应,使AI能够与外部系统、API和业务逻辑进行交互。
核心理念
应用场景
Function Calling 架构采用分层设计,确保类型安全、可扩展和高性能。
设计原则:类型安全、高性能、可扩展、容错性强
Function Calling 经历了多次重要演进,从简单的函数调用到现代化的工具系统。
| 版本 | 时间 | 主要变化 |
|---|---|---|
| Initial Release | 2023-06 | functions / function_call 参数 |
| Tools API | 2023-11 | tools / tool_choice 替换 functions |
| Structured Outputs | 2024-09 | strict: true 模式 |
| Enhanced SDK | 2025 | 类型安全、流式响应 |
工具定义 (Tool Definition)
- type: "function"
- function.name - 工具名称
- function.description - 描述
- function.parameters - JSON Schema
调用机制
执行流程
关键特性
JSON Schema 是 Function Calling 的核心,定义了工具参数的结构、类型和约束。
{
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "工具的名称"
},
"parameters": {
"type": "object",
"properties": {
"type": { "type": "string" },
"description": { "type": "string" },
"properties": {
"param1": {
"type": "string",
"description": "参数1描述"
}
},
"required": ["param1"]
}
}
}
}
支持的数据类型:string, number, boolean, array, object
工具定义采用装饰器模式,使工具注册既灵活又类型安全。
from openai import OpenAI
import json
# Basic tool definition: one function tool whose arguments are described
# by a JSON Schema object.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_stock_price",
            "description": "获取股票价格",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "股票代码"
                    }
                },
                # The ticker symbol is the only argument and is mandatory.
                "required": ["symbol"]
            }
        }
    }
]
工具特性:类型安全、描述清晰、参数验证、必填字段
关键步骤
质量控制
解析与执行层负责将结构化的工具调用转换为实际的函数执行。
def execute_tool_call(tool_call):
    """Run the function requested by a model tool call and wrap its result.

    Args:
        tool_call: Object exposing ``id``, ``function.name`` and
            ``function.arguments`` (a JSON-encoded string).

    Returns:
        A ``{"tool_call_id", "role": "tool", "content"}`` message dict, or
        whatever ``handle_error(tool_call, exc)`` returns if anything fails
        (bad JSON, unknown tool, exception inside the tool).
    """
    import json  # local import keeps this snippet self-contained

    try:
        function_name = tool_call.function.name
        # An empty argument string means "no arguments".
        arguments = json.loads(tool_call.function.arguments or "{}")

        # The registry class in this document exposes get_tool(); fall back
        # to get_function() for registries that use the older name.
        lookup = getattr(tool_registry, "get_tool", None) or tool_registry.get_function
        function = lookup(function_name)
        if function is None:
            # Fail with a clear message instead of "'NoneType' is not callable".
            raise LookupError(f"Unknown tool: {function_name}")

        result = function(**arguments)

        # Strings pass through unchanged; anything else is serialized as
        # JSON so the model does not receive a Python repr.
        if isinstance(result, str):
            content = result
        else:
            content = json.dumps(result, ensure_ascii=False, default=str)

        return {
            "tool_call_id": tool_call.id,
            "role": "tool",
            "content": content
        }
    except Exception as e:
        return handle_error(tool_call, e)
执行特性:异常处理、结果缓存、超时控制、重试机制
工具选择基于语义匹配和上下文分析,确保选择最适合的工具来满足用户需求。
匹配策略:语义相似度、上下文相关性、功能匹配度、历史使用数据
参数验证确保类型安全和数据完整性,防止无效或危险的参数传递。
def validate_parameters(schema, parameters):
    """Check *parameters* against the top level of a JSON Schema object.

    Only ``required`` and the ``type`` of each declared property are checked;
    nested schemas are not descended into.

    Args:
        schema: JSON Schema dict with optional ``required``/``properties``.
        parameters: Mapping of argument name to value.

    Returns:
        A list of human-readable error strings; empty when valid.
    """
    errors = [
        f"Missing required parameter: {required}"
        for required in schema.get("required", [])
        if required not in parameters
    ]

    for param_name, param_schema in schema.get("properties", {}).items():
        if param_name not in parameters:
            continue
        if not validate_type(parameters[param_name], param_schema.get("type")):
            errors.append(f"Invalid type for {param_name}")
    return errors


def validate_type(value, expected_type):
    """Return True if *value* matches the JSON Schema type *expected_type*.

    ``expected_type`` may be a type name or a list of names (union). Unknown
    or missing type names are accepted, as before.
    """
    if isinstance(expected_type, (list, tuple)):
        return any(validate_type(value, item) for item in expected_type)

    # bool is a subclass of int in Python but is a distinct JSON type.
    is_bool = isinstance(value, bool)
    if expected_type == "string":
        return isinstance(value, str)
    if expected_type == "number":
        return isinstance(value, (int, float)) and not is_bool
    if expected_type == "integer":
        return isinstance(value, int) and not is_bool
    if expected_type == "boolean":
        return is_bool
    if expected_type == "array":
        return isinstance(value, list)
    if expected_type == "object":
        return isinstance(value, dict)
    if expected_type == "null":
        return value is None
    return True
响应处理层负责处理函数执行结果,并将其格式化为模型可以理解的格式。
响应格式
{"role": "tool", "tool_call_id": "...", "content": "..."}
处理策略
基础的 Function Calling 示例,演示如何定义工具和处理函数调用。
import openai
import json
import os

# Initialize the client. Prefer the key from the environment over a
# hard-coded secret; the literal placeholder is kept only as a fallback.
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"))

# Tool definition: one function that takes a required city name.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取指定城市的天气信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称"
                    }
                },
                "required": ["city"]
            }
        }
    }
]

# Keep the conversation in a list so tool results can be appended to it.
messages = [{
    "role": "user",
    "content": "北京今天天气怎么样?"
}]

# First request: the model may answer directly or ask for tool calls.
response = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools
)

# Handle tool calls.
message = response.choices[0].message
if message.tool_calls:
    # The assistant turn that requested the tools must precede their results.
    messages.append(message)
    for tool_call in message.tool_calls:
        if tool_call.function.name == "get_weather":
            args = json.loads(tool_call.function.arguments)
            city = args["city"]
            weather_data = get_weather_data(city)
            content = json.dumps(weather_data, ensure_ascii=False)
        else:
            # Every tool call needs a reply, even one we cannot serve.
            content = json.dumps(
                {"error": f"Unknown tool: {tool_call.function.name}"}
            )
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": content
        })

    # Second request: let the model turn the tool output into an answer.
    final_response = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        tools=tools
    )
高级 Function Calling 示例,包含并行调用、错误处理和结果聚合。
async def advanced_function_calling():
    """Issue one chat request per tool concurrently and aggregate tool results.

    Returns:
        Dict mapping each called function name to the value returned by
        ``execute_function``. A later call to the same function overwrites
        an earlier one.
    """
    import asyncio
    import json

    # Define several tools.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_stock_price",
                "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}}
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_news",
                "parameters": {"type": "object", "properties": {"topic": {"type": "string"}}}
            }
        }
    ]

    # `client` is the synchronous client: create() blocks and returns a
    # response, not an awaitable. Run each call in a worker thread so the
    # requests actually overlap and gather() receives awaitables.
    tasks = [
        asyncio.to_thread(
            client.chat.completions.create,
            model="gpt-4",
            messages=[{"role": "user", "content": "分析苹果股票和科技新闻"}],
            tools=[tool]
        )
        for tool in tools
    ]
    responses = await asyncio.gather(*tasks)

    # Aggregate results.
    aggregated_data = {}
    for response in responses:
        # tool_calls is None when the model answered in plain text.
        for tool_call in response.choices[0].message.tool_calls or []:
            function_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)
            aggregated_data[function_name] = execute_function(function_name, args)
    return aggregated_data
OpenAI Python SDK 提供了完整的工具支持,使 Function Calling 使用变得简单直观。
from openai import OpenAI
from pydantic import BaseModel, Field
import json
# Pydantic models describe each tool's arguments; their generated JSON
# Schema is used as the tool's `parameters`.
class StockPriceRequest(BaseModel):
    symbol: str = Field(description="股票代码,如 AAPL, MSFT")


class WeatherRequest(BaseModel):
    city: str = Field(description="城市名称")


# Tool registration
def register_tools():
    """Return the tool definitions, one per request model."""

    def function_tool(name, description, model):
        # Wrap a model's schema in the function-tool envelope.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": model.model_json_schema()
            }
        }

    return [
        function_tool("get_stock_price", "获取实时股票价格", StockPriceRequest),
        function_tool("get_weather", "获取城市天气信息", WeatherRequest),
    ]
# Decorator pattern
def tool_call_decorator(function):
    """Wrap *function* so that it reports failures instead of raising.

    The wrapper returns ``{"success": True, "data": result}`` on success and
    ``{"success": False, "error": str(exc)}`` when *function* raises an
    ``Exception``. The wrapped function's name and docstring are preserved.
    """
    import functools

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            result = function(*args, **kwargs)
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "data": result}

    return wrapper
JavaScript/TypeScript SDK 提供了现代化的工具支持,支持类型安全和异步处理。
import OpenAI from 'openai';
// Initialize the client; the API key is taken from the OPENAI_API_KEY
// environment variable rather than being hard-coded.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Tool argument types.
// Arguments accepted by the stock-price tool.
interface StockPriceParams {
  symbol: string;
}

// Arguments accepted by the weather tool.
interface WeatherParams {
  city: string;
}
// Tool definitions.
// The explicit annotation keeps `type: 'function'` as a literal type; an
// un-annotated array widens it to `string`, which is not assignable to the
// SDK's tool type when passed to `chat.completions.create`.
const tools: OpenAI.Chat.Completions.ChatCompletionTool[] = [
  {
    type: 'function',
    function: {
      name: 'getStockPrice',
      description: '获取实时股票价格',
      parameters: {
        type: 'object',
        properties: {
          symbol: {
            type: 'string',
            description: '股票代码,如 AAPL, MSFT'
          }
        },
        required: ['symbol']
      }
    }
  },
  {
    type: 'function',
    function: {
      name: 'getWeather',
      description: '获取城市天气信息',
      parameters: {
        type: 'object',
        properties: {
          city: {
            type: 'string',
            description: '城市名称'
          }
        },
        required: ['city']
      }
    }
  }
];
// Tool dispatcher: routes a tool name to its implementation and rejects
// names it does not know.
async function executeTool(toolName: string, params: any): Promise<any> {
  if (toolName === 'getStockPrice') {
    return await fetchStockPrice(params.symbol);
  }
  if (toolName === 'getWeather') {
    return await fetchWeather(params.city);
  }
  throw new Error(`Unknown tool: ${toolName}`);
}
TypeScript 提供了完整的类型安全支持,确保 Function Calling 的类型正确性。
// Tool type definition.
// T is the tool's argument object; each key of T gets a schema entry in
// `properties`, and `required` may only name keys of T.
interface Tool<T extends Record<string, any>> {
  type: 'function';
  function: {
    name: string;
    description: string;
    parameters: {
      type: 'object';
      properties: {
        // One { type, description } entry per argument name.
        [K in keyof T]: {
          type: string;
          description: string;
        };
      };
      required: Array<keyof T>;
    };
  };
}
// Tool call type: one function invocation requested by the model.
interface ToolCall {
  id: string;
  type: 'function';
  function: {
    name: string;
    // Arguments arrive as a JSON-encoded string and must be parsed.
    arguments: string; // JSON string
  };
}
// Response type: a chat message as exchanged with the model.
interface ChatMessage {
  role: 'user' | 'assistant' | 'tool';
  content?: string;
  // Present on assistant messages that request tool invocations.
  tool_calls?: ToolCall[];
  // Required on `tool` messages: the id of the ToolCall being answered.
  tool_call_id?: string;
}
// Tool registry: maps tool names to async handlers and executes tool calls.
class ToolRegistry {
  // Handlers are stored with `any` params; registerTool keeps the
  // caller-side parameter type checked.
  private tools: Map<string, (params: any) => Promise<any>> = new Map();

  /** Register (or replace) the handler for `name`. */
  registerTool<T>(name: string, fn: (params: T) => Promise<any>) {
    this.tools.set(name, fn);
  }

  /**
   * Parse the call's JSON arguments and invoke the registered handler.
   * Throws if the tool is unknown or the arguments are not valid JSON.
   */
  async executeTool(toolCall: ToolCall): Promise<any> {
    const name = toolCall.function.name;
    const fn = this.tools.get(name);
    if (!fn) {
      throw new Error(`Tool not found: ${name}`);
    }

    let params: unknown;
    try {
      // An empty argument string means "no arguments".
      params = JSON.parse(toolCall.function.arguments || '{}');
    } catch (err) {
      // Report which tool received malformed arguments.
      throw new Error(
        `Invalid JSON arguments for tool ${name}: ${(err as Error).message}`
      );
    }
    return await fn(params);
  }
}
工具注册机制采用工厂模式,使工具管理既灵活又易于维护。
from typing import Callable, Optional


class ToolRegistry:
    """In-memory registry mapping tool names to callables and JSON Schemas."""

    def __init__(self):
        self.tools = {}
        self.schemas = {}

    def register_tool(self, name: str, func: Callable, schema: dict):
        """Register (or replace) the tool *name* with its callable and schema."""
        self.tools[name] = func
        self.schemas[name] = schema

    def get_tool(self, name: str) -> Optional[Callable]:
        """Return the callable registered as *name*, or None if unknown."""
        return self.tools.get(name)

    # Alias for callers that look tools up via get_function().
    get_function = get_tool

    def get_schema(self, name: str) -> Optional[dict]:
        """Return the schema registered for *name*, or None if unknown."""
        return self.schemas.get(name)

    def list_tools(self) -> list:
        """Return the registered tool names in registration order."""
        return list(self.tools)
# Decorator-based registration
def tool(name: str, description: str):
    """Return a decorator that registers a function as the tool *name*.

    The decorator generates a schema via ``generate_schema``, stores the
    function in ``tool_registry`` and tags it with a ``_tool_info`` dict
    (``name``, ``description``, ``schema``). The function itself is returned
    unchanged.
    """
    from typing import Callable

    def decorator(func: Callable):
        schema = generate_schema(func, description)
        tool_registry.register_tool(name, func, schema)
        # Plugin discovery recognises tools by this attribute.
        func._tool_info = {
            "name": name,
            "description": description,
            "schema": schema,
        }
        return func

    return decorator
# Usage example
@tool("calculate", "执行数学计算")
def calculate(expression: str) -> float:
    """Evaluate an arithmetic expression without executing arbitrary code.

    Only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // % **`` are accepted. The expression typically
    comes from model output, so ``eval`` must not be used here.

    Raises:
        ValueError: If the expression is malformed, uses anything outside
            the allowed grammar, or fails arithmetically (e.g. division by
            zero).
    """
    import ast
    import operator

    def bounded_pow(base, exponent):
        # Guard against expressions such as 9**9**9 that would hang.
        if abs(exponent) > 1000:
            raise ValueError("Invalid expression")
        return operator.pow(base, exponent)

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: bounded_pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def evaluate(node):
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is an int subclass; reject it along with strings etc.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        elif isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](
                evaluate(node.left), evaluate(node.right)
            )
        elif isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError("Invalid expression")

    try:
        return evaluate(ast.parse(expression, mode="eval"))
    except (SyntaxError, TypeError, ArithmeticError, RecursionError) as exc:
        raise ValueError("Invalid expression") from exc
@tool("search", "搜索信息")
def search(query: str) -> str:
    """Web search tool (placeholder, not implemented)."""
    # TODO: implement the search logic. As written this returns None,
    # which does not match the `str` return annotation.
    pass
动态工具加载支持运行时工具发现和加载,提高系统的灵活性和可扩展性。
class DynamicToolLoader:
    """Discover and load tool functions from plugin directories at runtime.

    A tool is any module attribute that carries a ``_tool_info`` attribute
    (a dict with at least a ``name`` key).
    """

    def __init__(self):
        self.plugin_directories = []
        self.loaded_tools = {}

    def add_plugin_directory(self, directory: str):
        """Add a directory to search for ``*.py`` plugin files."""
        self.plugin_directories.append(directory)

    def discover_tools(self):
        """Import every plugin file and return the tools found in them.

        Files are processed in sorted order. A plugin that cannot be
        imported is logged and skipped so it does not hide the others.
        Importing a plugin executes its top-level code.
        """
        import importlib.util
        import logging
        from pathlib import Path

        log = logging.getLogger(__name__)
        tools = []
        for directory in self.plugin_directories:
            for file_path in sorted(Path(directory).glob("*.py")):
                spec = importlib.util.spec_from_file_location(
                    file_path.stem, file_path
                )
                # spec_from_file_location returns None if no loader applies.
                if spec is None or spec.loader is None:
                    continue
                module = importlib.util.module_from_spec(spec)
                try:
                    spec.loader.exec_module(module)
                except Exception:
                    log.exception("Failed to import plugin %s", file_path)
                    continue

                # Collect attributes tagged as tools.
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if hasattr(attr, '_tool_info'):
                        tools.append(attr)
        return tools

    def load_tool(self, tool_func):
        """Register *tool_func* under its ``_tool_info['name']``; return the info."""
        tool_info = tool_func._tool_info
        self.loaded_tools[tool_info['name']] = tool_func
        return tool_info

    def unload_tool(self, name: str):
        """Remove the tool *name*; unknown names are ignored."""
        self.loaded_tools.pop(name, None)
错误处理采用分层设计,确保系统在遇到错误时能够优雅降级并提供有用的错误信息。
错误类型
处理策略
重试机制确保系统在遇到临时故障时能够自动恢复,提高系统的可靠性。
import asyncio
import time
from typing import Callable, Any
class RetryHandler:
    """Retry a callable with exponential backoff.

    Args:
        max_retries: Number of retries after the first attempt. Values below
            zero are treated as zero (a single attempt).
        delay: Base delay in seconds; attempt *k* (0-based) waits
            ``delay * 2**k`` before the next try.
    """

    def __init__(self, max_retries: int = 3, delay: float = 1.0):
        self.max_retries = max_retries
        self.delay = delay

    def _attempts(self) -> int:
        # Always make at least one attempt, even if max_retries is negative.
        return max(self.max_retries, 0) + 1

    def _backoff(self, attempt: int) -> float:
        # Exponential backoff: delay, 2*delay, 4*delay, ...
        return self.delay * (2 ** attempt)

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)``, retrying on any ``Exception``.

        Raises:
            Exception: The error from the final attempt, with its traceback.
        """
        attempts = self._attempts()
        for attempt in range(attempts):
            try:
                return await func(*args, **kwargs)
            except Exception:
                if attempt == attempts - 1:
                    raise
                await asyncio.sleep(self._backoff(attempt))

    def execute_with_retry_sync(self, func: Callable, *args, **kwargs) -> Any:
        """Synchronous counterpart of :meth:`execute_with_retry`."""
        attempts = self._attempts()
        for attempt in range(attempts):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == attempts - 1:
                    raise
                time.sleep(self._backoff(attempt))
# Usage example
async def safe_tool_execution(tool_name: str, params: dict):
    """Run a tool with up to three retries; return an error dict on failure."""
    handler = RetryHandler(max_retries=3, delay=1.0)
    try:
        outcome = await handler.execute_with_retry(execute_tool, tool_name, params)
    except Exception as e:
        logger.error(f"Tool {tool_name} failed after retries: {e}")
        return {"error": "Tool execution failed", "details": str(e)}
    return outcome
并行调用处理允许多个工具同时执行,显著提高系统的吞吐量和响应速度。
import asyncio
from typing import List, Dict, Any
import concurrent.futures
class ParallelToolExecutor:
    """Execute several tool calls concurrently on a thread pool."""

    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    async def execute_parallel(self, tool_calls: List[Dict]) -> List[Dict]:
        """Run all *tool_calls* concurrently and report each outcome.

        Args:
            tool_calls: Dicts with ``id`` and ``function`` (``name`` plus
                JSON-encoded ``arguments``).

        Returns:
            One dict per call, in input order: ``tool_call_id``, ``success``
            and either ``result`` or ``error``.
        """
        results = await asyncio.gather(
            *(self._execute_single_tool(tool_call) for tool_call in tool_calls),
            return_exceptions=True,
        )

        processed_results = []
        for tool_call, result in zip(tool_calls, results):
            # gather() may also hand back BaseException (e.g. cancellation).
            if isinstance(result, BaseException):
                processed_results.append({
                    "tool_call_id": tool_call["id"],
                    "error": str(result),
                    "success": False
                })
            else:
                processed_results.append({
                    "tool_call_id": tool_call["id"],
                    "result": result,
                    "success": True
                })
        return processed_results

    async def _execute_single_tool(self, tool_call: Dict) -> Any:
        """Parse one call's arguments and run the tool on the thread pool."""
        import json  # local import: this snippet does not import json

        tool_name = tool_call["function"]["name"]
        params = json.loads(tool_call["function"]["arguments"])

        # get_running_loop() is the supported way to get the loop in a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            execute_tool_sync,
            tool_name,
            params
        )

    def shutdown(self):
        """Shut down the underlying thread pool."""
        self.executor.shutdown()
# Usage example
async def process_multiple_requests(requests: List[Dict]):
    """Run *requests* on a three-worker executor and always shut it down."""
    pool = ParallelToolExecutor(max_workers=3)
    try:
        return await pool.execute_parallel(requests)
    finally:
        pool.shutdown()
工具链编排允许将多个工具组合成复杂的业务流程,实现更强大的自动化能力。
class ToolChainOrchestrator:
    """Run named chains of tools in dependency order."""

    def __init__(self):
        self.chains = {}
        self.execution_history = []

    def register_chain(self, name: str, tools: List[str], dependencies: Dict):
        """Register a chain.

        Args:
            name: Chain identifier.
            tools: Tool names belonging to the chain.
            dependencies: Maps a tool name to the tools that must run first.
        """
        self.chains[name] = {
            "tools": tools,
            "dependencies": dependencies,
            "status": "registered"
        }

    async def execute_chain(self, chain_name: str, input_data: Dict) -> Dict:
        """Execute every tool of a chain and return results keyed by tool name.

        Dict results are merged into the working context for later tools.
        A failing tool does not stop the chain; its entry holds whatever
        ``_handle_tool_error`` returns.

        Raises:
            ValueError: If the chain is unknown or has a dependency cycle.
        """
        if chain_name not in self.chains:
            raise ValueError(f"Chain {chain_name} not found")
        chain = self.chains[chain_name]
        results = {}
        # Work on a copy so the caller's dict is not mutated.
        context = dict(input_data)

        for tool_name in self._resolve_dependencies(chain):
            params = self._prepare_tool_params(tool_name, context, results)
            try:
                result = await execute_tool_async(tool_name, params)
            except Exception as e:
                results[tool_name] = await self._handle_tool_error(tool_name, params, e)
                continue
            results[tool_name] = result
            # Only mapping results can extend the shared context.
            if isinstance(result, dict):
                context.update(result)
        return results

    def _prepare_tool_params(self, tool_name: str, input_data: Dict, results: Dict) -> Dict:
        """Build the parameters for *tool_name*.

        The default passes a copy of the accumulated context; override to
        select or rename fields per tool.
        """
        return dict(input_data)

    async def _handle_tool_error(self, tool_name: str, params: Dict, error: Exception) -> Dict:
        """Convert a tool failure into a result entry; override to add retries."""
        return {"error": str(error), "tool": tool_name, "success": False}

    def _resolve_dependencies(self, chain: Dict) -> List[str]:
        """Return the chain's tools in dependency-first order.

        Depth-first topological sort; dependencies not listed in
        ``chain["tools"]`` are included as well.

        Raises:
            ValueError: If the dependencies contain a cycle.
        """
        visited = set()
        in_progress = set()  # tools on the current DFS path
        order = []

        def visit(tool_name):
            if tool_name in in_progress:
                raise ValueError("Circular dependency detected")
            if tool_name in visited:
                return
            in_progress.add(tool_name)
            for dep in chain["dependencies"].get(tool_name, []):
                visit(dep)
            in_progress.remove(tool_name)
            visited.add(tool_name)
            order.append(tool_name)

        for tool_name in chain["tools"]:
            visit(tool_name)
        return order
Structured Outputs 是 Function Calling 的重要增强,确保返回的 JSON 严格符合指定的 Schema。
核心特性
strict: true 模式
使用场景
{
  "type": "function",
  "function": {
    "name": "generate_report",
    "description": "生成标准格式报告",
    "strict": true,
    "parameters": {
      "type": "object",
      "properties": {
        "title": {
          "type": "string",
          "description": "报告标题"
        },
        "sections": {
          "type": "array",
          "items": {
            "type": "object",
            "properties": {
              "name": {"type": "string"},
              "content": {"type": "string"}
            },
            "required": ["name", "content"],
            "additionalProperties": false
          }
        },
        "metadata": {
          "type": "object",
          "properties": {
            "author": {"type": "string"},
            "created_at": {"type": ["string", "null"], "format": "date-time"}
          },
          "required": ["author", "created_at"],
          "additionalProperties": false
        }
      },
      "required": ["title", "sections", "metadata"],
      "additionalProperties": false
    }
  }
}
严格模式确保输出的类型安全性和数据完整性,防止无效或意外的数据格式。
from typing import Any


class StrictModeValidator:
    """Validate data against a JSON-Schema-like dict.

    Supported keywords: ``type``, ``required``, ``properties``, ``items``,
    ``enum``, ``pattern``, ``minimum``, ``maximum``. Unknown fields are
    rejected when the object schema sets ``"strict": True`` or
    ``"additionalProperties": False``.
    """

    def __init__(self):
        # Maps a schema type name to its (value, schema) validator.
        self.validators = {
            "string": self._validate_string,
            "number": self._validate_number,
            "integer": self._validate_integer,
            "boolean": self._validate_boolean,
            "array": self._validate_array,
            "object": self._validate_object
        }

    def validate_strict(self, schema: dict, data: Any) -> dict:
        """Validate *data* as an object and return it unchanged.

        Raises:
            ValueError: On a non-object value, a missing required field, an
                unexpected field (strict schemas only) or an invalid value.
        """
        if not isinstance(data, dict):
            raise ValueError("Expected object type")

        # Required fields.
        for field in schema.get("required", []):
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        # Extra fields are only an error for strict/closed schemas.
        properties = schema.get("properties", {})
        closed = schema.get("strict", False) or schema.get("additionalProperties") is False
        if closed:
            for field in data:
                if field not in properties:
                    raise ValueError(f"Unexpected field: {field}")

        # Validate each declared field that is present.
        for field, field_schema in properties.items():
            if field in data:
                self._validate_field(field, field_schema, data[field])
        return data

    def _validate_field(self, field: str, schema: dict, value: Any):
        """Validate one value against its schema; errors are prefixed with *field*."""
        import re

        validator = self.validators.get(schema.get("type"))
        if validator:
            try:
                validator(value, schema)
            except ValueError as exc:
                raise ValueError(f"{field}: {exc}") from exc

        # Enumerated values.
        if "enum" in schema and value not in schema["enum"]:
            raise ValueError(f"Invalid value for {field}: {value}")

        # JSON Schema patterns are not implicitly anchored, hence search().
        if "pattern" in schema and not re.search(schema["pattern"], str(value)):
            raise ValueError(f"Pattern mismatch for {field}: {value}")

    @staticmethod
    def _check_range(value: Any, schema: dict):
        """Enforce the inclusive ``minimum``/``maximum`` keywords."""
        if "minimum" in schema and value < schema["minimum"]:
            raise ValueError(f"Value {value} is below minimum {schema['minimum']}")
        if "maximum" in schema and value > schema["maximum"]:
            raise ValueError(f"Value {value} is above maximum {schema['maximum']}")

    def _validate_string(self, value: Any, schema: dict):
        """Require a str."""
        if not isinstance(value, str):
            raise ValueError("Expected string type")

    def _validate_number(self, value: Any, schema: dict):
        """Require an int or float (bool excluded) within range."""
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError("Expected number type")
        self._check_range(value, schema)

    def _validate_integer(self, value: Any, schema: dict):
        """Require an int (bool excluded) within range."""
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("Expected integer type")
        self._check_range(value, schema)

    def _validate_boolean(self, value: Any, schema: dict):
        """Require a bool."""
        if not isinstance(value, bool):
            raise ValueError("Expected boolean type")

    def _validate_array(self, value: Any, schema: dict):
        """Require a list and validate each element against ``items``."""
        if not isinstance(value, list):
            raise ValueError("Expected array type")
        item_schema = schema.get("items")
        if item_schema:
            for index, item in enumerate(value):
                self._validate_field(f"[{index}]", item_schema, item)

    def _validate_object(self, value: Any, schema: dict):
        """Require a dict and validate it recursively as an object schema."""
        self.validate_strict(schema, value)
# Usage example: validate a small record against a strict object schema.
validator = StrictModeValidator()
schema = {
    "type": "object",
    # With "strict" set, validate_strict rejects fields not listed in "properties".
    "strict": True,
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number", "minimum": 0}
    },
    "required": ["name", "age"]
}
data = {"name": "John", "age": 25}
# validate_strict returns the data it was given or raises ValueError.
validated_data = validator.validate_strict(schema, data)
JSON Schema 验证确保参数的结构和内容符合预期,提供强大的数据验证能力。
验证规则
高级特性
类型安全保证确保在编译时就能发现类型错误,提供更好的开发体验和系统可靠性。
// Runtime type checking: validates untyped data against a JSON-Schema-like
// description and returns it typed as T.
class TypeValidator<T> {
  private schema: any;

  constructor(schema: any) {
    this.schema = schema;
  }

  /** Validate `data` against the schema; throws Error on any mismatch. */
  validate(data: any): T {
    return this._validateRecursive(data, this.schema);
  }

  /** Dispatch on the schema's type. Unknown types pass through unchanged. */
  private _validateRecursive(data: any, schema: any): any {
    // Shorthand: the schema is just a primitive type name.
    if (typeof schema === 'string') {
      return this._validatePrimitive(data, schema);
    }
    switch (schema?.type) {
      case 'object':
        return this._validateObject(data, schema);
      case 'array':
        return this._validateArray(data, schema);
      case 'string':
      case 'number':
      case 'integer':
      case 'boolean':
      case 'null':
        return this._validatePrimitive(data, schema.type);
      default:
        return data;
    }
  }

  /** Check a primitive value against a JSON Schema type name. */
  private _validatePrimitive(data: any, type: string): any {
    let ok: boolean;
    switch (type) {
      case 'string':
        ok = typeof data === 'string';
        break;
      case 'number':
        ok = typeof data === 'number' && Number.isFinite(data);
        break;
      case 'integer':
        ok = Number.isInteger(data);
        break;
      case 'boolean':
        ok = typeof data === 'boolean';
        break;
      case 'null':
        ok = data === null;
        break;
      default:
        ok = true;
    }
    if (!ok) {
      throw new Error(`Expected ${type}`);
    }
    return data;
  }

  /** Require an array and validate each element against `schema.items`. */
  private _validateArray(data: any, schema: any): any {
    if (!Array.isArray(data)) {
      throw new Error('Expected array');
    }
    if (!schema.items) {
      return data;
    }
    return data.map((item: any) => this._validateRecursive(item, schema.items));
  }

  /**
   * Require a plain object, check required fields and validate declared
   * properties. Undeclared properties are dropped from the result, or
   * rejected when `additionalProperties` is false.
   */
  private _validateObject(data: any, schema: any): any {
    // typeof [] is 'object', so arrays must be excluded explicitly.
    if (typeof data !== 'object' || data === null || Array.isArray(data)) {
      throw new Error('Expected object');
    }
    const properties = schema.properties ?? {};
    const result: any = {};

    // Required fields.
    for (const required of schema.required || []) {
      if (!(required in data)) {
        throw new Error(`Missing required field: ${required}`);
      }
      result[required] = data[required];
    }

    // Declared properties; own-key check avoids matching prototype members.
    for (const [key, value] of Object.entries(data)) {
      if (Object.prototype.hasOwnProperty.call(properties, key)) {
        result[key] = this._validateRecursive(value, properties[key]);
      } else if (schema.additionalProperties === false) {
        throw new Error(`Unexpected property: ${key}`);
      }
    }
    return result;
  }
}
// Generic tool definition: T is the tool's parameter object type.
interface Tool<T extends Record<string, any>> {
  name: string;
  description: string;
  // Runtime validator for incoming parameters.
  schema: TypeValidator<T>;
  // Implementation; receives parameters typed as T.
  execute: (params: T) => Promise<any>;
}
// Usage example: a weather tool whose only parameter is a required city.
const weatherTool: Tool<{city: string}> = {
  name: 'getWeather',
  description: '获取天气信息',
  schema: new TypeValidator({
    type: 'object',
    properties: {
      city: { type: 'string' }
    },
    required: ['city']
  }),
  execute: async ({city}) => {
    // Placeholder for the real weather lookup; returns fixed sample data.
    return { temperature: 25, city };
  }
};
数据流设计采用响应式编程模式,确保数据处理的管道化和可观测性。
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Any
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class DataFlow(Generic[T]):
    """A payload of type ``T`` plus bookkeeping about where it came from."""

    data: T  # the payload itself
    metadata: dict  # free-form annotations
    timestamp: float
    source: str
class DataPipeline:
    """Sequential async pipeline with monitoring callbacks."""

    def __init__(self):
        self.steps = []
        self.monitoring_callbacks = []

    def add_step(self, name: str, processor: Callable):
        """Append a named async processor; returns self for chaining."""
        self.steps.append({'name': name, 'processor': processor})
        return self

    def add_monitoring(self, callback: Callable):
        """Append an async ``callback(label, payload)``; returns self."""
        self.monitoring_callbacks.append(callback)
        return self

    async def _notify(self, label: str, payload: Any):
        # Invoke every monitoring callback in registration order.
        for monitor in self.monitoring_callbacks:
            await monitor(label, payload)

    async def process(self, input_data: Any) -> AsyncGenerator[Any, None]:
        """Feed *input_data* through the steps, yielding each step's output.

        When a step (or a monitor) raises, an error dict with ``error``,
        ``step`` and ``input`` is reported to the monitors under
        ``"<step>_error"``, yielded, and processing stops.
        """
        payload = input_data
        for entry in self.steps:
            label = entry['name']
            run_step = entry['processor']
            try:
                payload = await run_step(payload)
                await self._notify(label, payload)
                yield payload
            except Exception as e:
                failure = {
                    'error': str(e),
                    'step': label,
                    'input': payload
                }
                await self._notify(f"{label}_error", failure)
                yield failure
                return
# Usage example
async def weather_data_pipeline():
    """Build a four-step weather pipeline and print each intermediate result."""
    # Steps run in the order they are added; monitors see every step output.
    pipeline = (
        DataPipeline()
        .add_step("validate", validate_weather_request)
        .add_step("transform", transform_weather_params)
        .add_step("execute", execute_weather_api)
        .add_step("format", format_weather_response)
        .add_monitoring(log_step_execution)
        .add_monitoring(track_metrics)
    )

    request = {"city": "Beijing", "units": "celsius"}
    async for result in pipeline.process(request):
        print(f"Processing result: {result}")
        # Stop consuming as soon as a step reports an error.
        if "error" in result:
            break
时序图展示了 Function Calling 的完整执行流程,清晰地展示了各个组件之间的交互。
关键时序特征:异步处理、并行调用、错误传播、结果聚合
UML 类图展示了 Function Calling 系统的主要类和关系,体现了系统的整体架构。
状态转换图展示了 Function Calling 的执行状态变化,有助于理解系统的生命周期。
状态说明
错误状态
性能优化采用多层次的策略,确保 Function Calling 在高并发场景下的稳定性能。
缓存策略
并发优化
内存优化
网络优化
缓存机制显著提高重复请求的响应速度,减少外部 API 调用和网络开销。
import time
from functools import lru_cache
from typing import Dict, Any
import hashlib
class CacheManager:
    """In-memory cache for tool results with a TTL and LRU-style eviction.

    Args:
        ttl: Seconds an entry stays valid.
        max_size: Maximum number of entries; values <= 0 disable caching.
    """

    def __init__(self, ttl: int = 300, max_size: int = 1000):
        self.ttl = ttl  # Time to live in seconds
        self.max_size = max_size
        self.cache = {}
        self.access_times = {}

    def generate_key(self, tool_name: str, params: Dict) -> str:
        """Return a stable hash key for a tool name and its parameters.

        Parameters are serialized as JSON with sorted keys at every nesting
        level, so dict ordering never changes the key. Values JSON cannot
        encode fall back to ``repr``.
        """
        import json

        param_str = json.dumps(params, sort_keys=True, default=repr, ensure_ascii=False)
        key_str = f"{tool_name}:{param_str}"
        # md5 is used as a fingerprint only, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

    def _discard(self, key: str):
        # Remove an entry and its access timestamp if present.
        self.cache.pop(key, None)
        self.access_times.pop(key, None)

    def get(self, tool_name: str, params: Dict) -> Any:
        """Return the cached value, or None on a miss or expired entry."""
        key = self.generate_key(tool_name, params)
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry['timestamp'] < self.ttl:
            self.access_times[key] = time.time()
            return entry['data']
        # Expired: drop it.
        self._discard(key)
        return None

    def set(self, tool_name: str, params: Dict, data: Any):
        """Store *data*, evicting least-recently-used entries when full."""
        if self.max_size <= 0:
            return
        key = self.generate_key(tool_name, params)

        # Replacing an existing key does not grow the cache, so only evict
        # when a new key would exceed the limit.
        if key not in self.cache:
            while self.cache and len(self.cache) >= self.max_size:
                oldest_key = min(self.access_times, key=self.access_times.get)
                self._discard(oldest_key)

        now = time.time()
        self.cache[key] = {
            'data': data,
            'timestamp': now,
            'tool_name': tool_name,
            'params': params
        }
        self.access_times[key] = now

    def clear(self):
        """Remove every entry."""
        self.cache.clear()
        self.access_times.clear()
# Decorator-based caching
def cached_tool(ttl: int = 300):
    """Return a decorator that caches an async tool's results for *ttl* seconds.

    The cache key covers every argument, positional or keyword, after
    binding to the function's signature, so ``f("AAPL")`` and
    ``f(symbol="AAPL")`` share one entry while ``f("MSFT")`` gets its own.
    A result of ``None`` is never served from the cache, because the cache
    reports misses as ``None``.
    """
    import functools
    import inspect

    cache_manager = CacheManager(ttl=ttl)

    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Normalize the call into a name -> value mapping.
            bound = signature.bind(*args, **kwargs)
            bound.apply_defaults()
            call_params = dict(bound.arguments)

            cached_result = cache_manager.get(func.__name__, call_params)
            if cached_result is not None:
                return cached_result

            result = await func(*args, **kwargs)
            cache_manager.set(func.__name__, call_params, result)
            return result

        return wrapper

    return decorator
# Usage example
@cached_tool(ttl=60)  # cache results for 60 seconds
async def get_stock_price(symbol: str) -> Dict:
    """Fetch a stock price via ``fetch_stock_api``, with caching."""
    # The actual API call; only reached when the cache has no fresh entry.
    return await fetch_stock_api(symbol)
批处理优化将多个请求合并处理,减少网络开销和提高吞吐量。
import asyncio
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor
import json
class BatchProcessor:
    """Collect tool requests and execute them in batches.

    A batch is flushed when ``batch_size`` requests are pending or when
    ``timeout`` seconds have passed since the first pending request.

    Args:
        batch_size: Number of pending requests that triggers a flush.
        timeout: Maximum seconds a request waits before being flushed.
    """

    def __init__(self, batch_size: int = 10, timeout: float = 30.0):
        self.batch_size = batch_size
        self.timeout = timeout
        self.pending_requests = []
        self.batch_timer = None
        self.executor = ThreadPoolExecutor(max_workers=5)

    async def add_request(self, request: Dict) -> asyncio.Future:
        """Queue *request* and return a future for its individual result.

        Args:
            request: Dict with ``tool_name`` and ``params``.
        """
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self.pending_requests.append({
            'request': request,
            'future': future,
            'timestamp': loop.time()
        })

        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        elif self.batch_timer is None:
            self.batch_timer = asyncio.create_task(self._timer_callback())
        return future

    async def _timer_callback(self):
        """Flush the pending batch once the timeout elapses."""
        await asyncio.sleep(self.timeout)
        # Detach first: _process_batch() cancels the stored timer, and this
        # task must not cancel itself in the middle of the flush.
        self.batch_timer = None
        await self._process_batch()

    async def _process_batch(self):
        """Execute all pending requests and settle their futures."""
        if not self.pending_requests:
            return

        # Take ownership of the queue.
        batch = self.pending_requests.copy()
        self.pending_requests.clear()

        # Stop a still-waiting timer (never the task we are running in).
        timer, self.batch_timer = self.batch_timer, None
        if timer is not None and timer is not asyncio.current_task():
            timer.cancel()

        try:
            tool_groups = self._group_by_tool(batch)
            outcomes = await asyncio.gather(
                *(
                    self._process_tool_batch(tool_name, items)
                    for tool_name, items in tool_groups.items()
                ),
                return_exceptions=True,
            )
            # Each outcome belongs to one tool group and holds one result
            # per request of that group, in order.
            for items, outcome in zip(tool_groups.values(), outcomes):
                self._settle_group(items, outcome)
        except Exception as e:
            # Unexpected failure: fail whatever is still unresolved.
            for item in batch:
                if not item['future'].done():
                    item['future'].set_exception(e)

    @staticmethod
    def _settle_group(items: List[Dict], outcome: Any):
        """Resolve the futures of one tool group from its batch outcome."""
        if not isinstance(outcome, BaseException) and len(outcome) != len(items):
            outcome = RuntimeError(
                "Batch handler returned a mismatched number of results"
            )
        for index, item in enumerate(items):
            future = item['future']
            if future.done():
                continue
            if isinstance(outcome, BaseException):
                future.set_exception(outcome)
            else:
                future.set_result(outcome[index])

    def _group_by_tool(self, batch: List[Dict]) -> Dict[str, List[Dict]]:
        """Group queued items by their request's ``tool_name``."""
        groups = {}
        for item in batch:
            groups.setdefault(item['request'].get('tool_name'), []).append(item)
        return groups

    async def _process_tool_batch(self, tool_name: str, requests: List[Dict]) -> List[Any]:
        """Run all requests of one tool; returns one result per request.

        Raises:
            Exception: Wrapping any failure, including an unknown tool name.
        """
        params_list = [req['request']['params'] for req in requests]
        handlers = {
            'get_stock_price': self._batch_get_stock_prices,
            'get_weather': self._batch_get_weather,
            # other tools...
        }
        try:
            handler = handlers.get(tool_name)
            if handler is None:
                raise ValueError(f"Unknown tool: {tool_name}")
            return await handler(params_list)
        except Exception as e:
            raise Exception(f"Batch processing failed for {tool_name}: {e}") from e

    async def _run_on_executor(self, func, values: List[Any]) -> List[Any]:
        """Call ``func(value)`` for each value on the thread pool, in order."""
        loop = asyncio.get_running_loop()
        return await asyncio.gather(
            *(loop.run_in_executor(self.executor, func, value) for value in values)
        )

    async def _batch_get_stock_prices(self, params_list: List[Dict]) -> List[Dict]:
        """Fetch stock prices for every ``symbol`` in *params_list*."""
        symbols = [params['symbol'] for params in params_list]
        return await self._run_on_executor(self._sync_get_stock_price, symbols)

    async def _batch_get_weather(self, params_list: List[Dict]) -> List[Dict]:
        """Fetch weather for every ``city`` in *params_list*."""
        cities = [params['city'] for params in params_list]
        return await self._run_on_executor(self._sync_get_weather, cities)

    def _sync_get_stock_price(self, symbol: str) -> Dict:
        """Blocking stock lookup (placeholder)."""
        return {"symbol": symbol, "price": 100.0}  # sample data

    def _sync_get_weather(self, city: str) -> Dict:
        """Blocking weather lookup (placeholder)."""
        return {"city": city, "temperature": 25.0}  # sample data

    def shutdown(self):
        """Cancel the pending timer and shut down the thread pool."""
        if self.batch_timer:
            self.batch_timer.cancel()
        self.executor.shutdown()
# Usage example
async def batch_example():
    """Queue ten stock-price requests in batches of five and await them all."""
    processor = BatchProcessor(batch_size=5, timeout=10.0)
    try:
        # Each add_request() hands back the future for that request.
        futures = [
            await processor.add_request({
                'tool_name': 'get_stock_price',
                'params': {'symbol': f'AAPL{i}'}
            })
            for i in range(10)
        ]
        results = await asyncio.gather(*futures)
        print(f"Batch results: {len(results)} items")
    finally:
        processor.shutdown()
流式响应处理支持实时数据流和增量结果,提供更好的用户体验和性能。
import asyncio
from typing import AsyncGenerator, Dict, Any
import json
class StreamProcessor:
    """Yield chunks from a (simulated) streaming tool call.

    Chunks are also collected in a per-stream buffer that is flushed to the
    optional callback as a single ``batch`` chunk.
    """

    def __init__(self):
        self.active_streams = {}
        # Number of buffered chunks that triggers a flush.
        self.buffer_size = 1024

    async def process_stream(self,
                             tool_name: str,
                             params: Dict,
                             callback = None) -> AsyncGenerator[Dict, None]:
        """Stream chunks for one tool call.

        Args:
            tool_name: Name of the streaming tool.
            params: Tool parameters.
            callback: Optional async callable awaited with every chunk and
                with every flushed ``batch`` chunk.

        Yields:
            Each data chunk; on failure a final ``{'type': 'error', ...}``
            chunk instead of raising.
        """
        # The id is derived from the identity of `params`, so it is only
        # unique while that dict object is alive.
        stream_id = f"{tool_name}_{id(params)}"
        self.active_streams[stream_id] = {
            'params': params,
            'callback': callback,
            'buffer': []
        }
        try:
            # Simulated stream source.
            async for chunk in self._simulate_stream_data(tool_name, params):
                # Buffer the chunk.
                self.active_streams[stream_id]['buffer'].append(chunk)
                # Flush once the buffer reaches its limit.
                if len(self.active_streams[stream_id]['buffer']) >= self.buffer_size:
                    await self._flush_stream(stream_id)
                # NOTE(review): the callback gets each chunk here and again
                # inside the flushed batch — confirm this is intended.
                if callback:
                    await callback(chunk)
                # Hand the chunk to the consumer.
                yield chunk
        except Exception as e:
            # Report the failure as a chunk rather than raising.
            error_chunk = {
                'type': 'error',
                'message': str(e),
                'tool_name': tool_name
            }
            yield error_chunk
        finally:
            # Flush what is left and forget the stream.
            await self._flush_stream(stream_id)
            del self.active_streams[stream_id]

    async def _simulate_stream_data(self, tool_name: str, params: Dict) -> AsyncGenerator[Dict, None]:
        """Generate sample chunks; unknown tool names yield nothing."""
        if tool_name == 'get_stock_data':
            # Simulated real-time stock ticks.
            for i in range(10):
                await asyncio.sleep(0.5)  # simulated latency
                yield {
                    'type': 'data',
                    'symbol': params['symbol'],
                    'timestamp': asyncio.get_event_loop().time(),
                    'price': 100 + i * 5,
                    'volume': 1000 + i * 100
                }
        elif tool_name == 'process_large_file':
            # Simulated file-processing progress.
            total_chunks = 20
            for i in range(total_chunks):
                await asyncio.sleep(0.2)
                progress = (i + 1) / total_chunks
                yield {
                    'type': 'progress',
                    'current': i + 1,
                    'total': total_chunks,
                    'progress': progress,
                    'chunk_data': f'Processed chunk {i + 1}'
                }

    async def _flush_stream(self, stream_id: str):
        """Send the buffered chunks to the callback as one batch, then reset."""
        if stream_id in self.active_streams:
            buffer = self.active_streams[stream_id]['buffer']
            if buffer:
                # Wrap the buffered chunks in a single batch chunk.
                batch_chunk = {
                    'type': 'batch',
                    'data': buffer,
                    'count': len(buffer)
                }
                callback = self.active_streams[stream_id]['callback']
                if callback:
                    await callback(batch_chunk)
                # Start a fresh buffer; the sent list is left untouched.
                self.active_streams[stream_id]['buffer'] = []
# Usage example
async def streaming_example():
    """Consume the simulated stock stream and print every chunk."""
    processor = StreamProcessor()

    async def data_callback(chunk: Dict):
        # Awaited by the processor for each chunk and each flushed batch.
        print(f"Received chunk: {chunk}")

    # Consume the stream.
    async for chunk in processor.process_stream(
        'get_stock_data',
        {'symbol': 'AAPL'},
        data_callback
    ):
        print(f"Processed: {chunk}")
        # Stop on an error chunk.
        if chunk.get('type') == 'error':
            break
WebSocket 集成提供实时双向通信,支持实时工具调用和结果推送。
import asyncio
import json
import websockets
from typing import Dict, Set, Any
from dataclasses import dataclass
@dataclass
class WebSocketMessage:
    """Envelope for a message exchanged with a WebSocket client."""

    type: str  # message kind
    data: Dict  # message payload
    client_id: str  # id of the client the message belongs to
class WebSocketToolServer:
    """WebSocket server through which clients register, call and watch tools.

    Supported message types: ``register_tool``, ``call_tool`` and
    ``subscribe``. Tool execution itself is simulated.
    """

    def __init__(self):
        self.clients: Set[str] = set()
        self.client_tools: Dict[str, Dict] = {}
        self.tool_queue = asyncio.Queue()
        self.result_cache = {}
        # client_id -> live connection, used to deliver messages.
        self.connections: Dict[str, Any] = {}
        # tool_name -> ids of clients subscribed to its results.
        self.subscriptions: Dict[str, Set[str]] = {}
        # Strong references keep background tasks from being garbage-collected.
        self._tasks: Set[Any] = set()

    async def register_client(self, websocket: "websockets.WebSocketServerProtocol"):
        """Connection handler: greet the client and serve it until it disconnects."""
        client_id = f"client_{id(websocket)}"
        self.clients.add(client_id)
        self.client_tools[client_id] = {}
        self.connections[client_id] = websocket

        welcome_msg = {
            'type': 'connected',
            'client_id': client_id,
            'message': 'Connected to tool server'
        }
        try:
            # Sent inside the try so a failed greeting still gets cleaned up.
            await websocket.send(json.dumps(welcome_msg))
            async for message in websocket:
                await self.handle_message(client_id, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            self.clients.discard(client_id)
            self.client_tools.pop(client_id, None)
            self.connections.pop(client_id, None)
            for subscribers in self.subscriptions.values():
                subscribers.discard(client_id)

    async def handle_message(self, client_id: str, message: str):
        """Decode one client message and dispatch it by its ``type``."""
        try:
            data = json.loads(message)
            msg_type = data.get('type')
            if msg_type == 'register_tool':
                await self.register_tool(client_id, data)
            elif msg_type == 'call_tool':
                await self.call_tool(client_id, data)
            elif msg_type == 'subscribe':
                await self.subscribe(client_id, data)
            else:
                await self.send_error(client_id, f"Unknown message type: {msg_type}")
        except json.JSONDecodeError:
            await self.send_error(client_id, "Invalid JSON format")
        except Exception as e:
            await self.send_error(client_id, f"Error processing message: {str(e)}")

    async def register_tool(self, client_id: str, data: Dict):
        """Record a tool (``tool_name`` + ``schema``) offered by a client."""
        tool_name = data.get('tool_name')
        tool_schema = data.get('schema')
        if tool_name and tool_schema:
            self.client_tools[client_id][tool_name] = {
                'schema': tool_schema,
                'registered_at': asyncio.get_running_loop().time()
            }
            response = {
                'type': 'tool_registered',
                'tool_name': tool_name,
                'status': 'success'
            }
            await self.send_to_client(client_id, response)
        else:
            await self.send_error(client_id, "Missing tool_name or schema")

    async def subscribe(self, client_id: str, data: Dict):
        """Subscribe a client to result notifications for ``tool_name``."""
        tool_name = data.get('tool_name')
        if not tool_name:
            await self.send_error(client_id, "Missing tool_name")
            return
        self.subscriptions.setdefault(tool_name, set()).add(client_id)
        await self.send_to_client(client_id, {
            'type': 'subscribed',
            'tool_name': tool_name,
            'status': 'success'
        })

    async def call_tool(self, client_id: str, data: Dict):
        """Start a tool call in the background and confirm it to the caller."""
        tool_name = data.get('tool_name')
        params = data.get('params', {})

        # Find the first client that registered this tool.
        owner = next(
            (cid for cid, tools in self.client_tools.items() if tool_name in tools),
            None,
        )
        if owner is None:
            await self.send_error(client_id, f"Tool {tool_name} not found")
            return

        # Confirm before scheduling so "started" always precedes the result.
        await self.send_to_client(client_id, {
            'type': 'tool_call_started',
            'tool_name': tool_name,
            'message': 'Tool execution started'
        })
        task = asyncio.create_task(
            self._execute_tool(owner, tool_name, params, client_id)
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _execute_tool(self, owner_client: str, tool_name: str, params: Dict, caller_client: str):
        """Run the tool and deliver the result (or error) to the caller."""
        try:
            result = await self._simulate_tool_execution(tool_name, params)
            response = {
                'type': 'tool_result',
                'tool_name': tool_name,
                'result': result,
                'status': 'success'
            }
            await self.send_to_client(caller_client, response)
            await self.notify_subscribers(tool_name, result)
        except Exception as e:
            error_response = {
                'type': 'tool_error',
                'tool_name': tool_name,
                'error': str(e),
                'status': 'failed'
            }
            await self.send_to_client(caller_client, error_response)

    async def _simulate_tool_execution(self, tool_name: str, params: Dict) -> Dict:
        """Simulated tool implementations (demo only)."""
        await asyncio.sleep(1)  # simulated execution time
        if tool_name == 'calculate':
            # SECURITY: eval() on client-supplied input allows arbitrary code
            # execution. Replace with a restricted arithmetic evaluator
            # before exposing this server to untrusted clients.
            return {
                'result': eval(params.get('expression', '0')),
                'expression': params.get('expression')
            }
        elif tool_name == 'search':
            return {
                'results': [f"Result for {params.get('query', '')}"],
                'query': params.get('query')
            }
        else:
            raise ValueError(f"Unknown tool: {tool_name}")

    async def send_error(self, client_id: str, message: str):
        """Send an ``error`` message to a client."""
        await self.send_to_client(client_id, {'type': 'error', 'message': message})

    async def send_to_client(self, client_id: str, message: Dict):
        """Deliver *message* as JSON to the client's connection.

        When no connection is known for *client_id*, the message is printed
        instead.
        """
        connection = self.connections.get(client_id)
        if connection is None:
            print(f"Sending to {client_id}: {message}")
            return
        try:
            await connection.send(json.dumps(message))
        except websockets.exceptions.ConnectionClosed:
            # The peer went away; forget the dead connection.
            self.connections.pop(client_id, None)

    async def notify_subscribers(self, tool_name: str, result: Dict):
        """Push a ``tool_update`` notification to the tool's subscribers."""
        notification = {
            'type': 'tool_update',
            'tool_name': tool_name,
            'result': result,
            'timestamp': asyncio.get_running_loop().time()
        }
        print(f"Notifying subscribers about {tool_name}: {result}")
        # Iterate over a copy: sending may drop closed connections.
        for subscriber in list(self.subscriptions.get(tool_name, ())):
            await self.send_to_client(subscriber, notification)
# Usage example
async def websocket_server():
    """Start the tool server on ws://localhost:8765 and run until cancelled."""
    server = WebSocketToolServer()
    # register_client is the per-connection handler.
    async with websockets.serve(server.register_client, "localhost", 8765):
        print("WebSocket server started on ws://localhost:8765")
        await asyncio.Future()  # never completes: keeps the server running
工具组合模式允许将多个工具的功能组合成新的复合工具,实现功能的灵活组合和复用。
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
@dataclass
class ToolResult:
    """Outcome of a single sub-tool invocation."""

    # Name of the tool that produced this result.
    tool_name: str
    # Value returned by the tool; the code in this file sets None on failure.
    data: Any
    # Extra information; this file stores 'executed_at' or 'error' here.
    metadata: Dict
    # True when the tool returned without raising.
    success: bool
class CompositeTool:
    """Base class for a tool that fans one request out to several sub-tools.

    Every registered sub-tool receives the same ``params``. Their outcomes
    are collected as ``ToolResult`` objects and handed to an async
    ``combiner`` callable that produces the final value.
    """

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.sub_tools: List[str] = []
        self.combiner: Callable = None  # must be set before execute()

    def add_sub_tool(self, tool_name: str):
        """Register a sub-tool by name; duplicates are ignored."""
        if tool_name not in self.sub_tools:
            self.sub_tools.append(tool_name)

    def set_combiner(self, combiner: Callable):
        """Set the async callable ``combiner(results, params)``."""
        self.combiner = combiner

    async def _run_sub_tool(self, tool_name: str, params: Dict) -> "ToolResult":
        """Run one sub-tool and wrap its value or its error in a ToolResult."""
        try:
            data = await execute_tool_async(tool_name, params)
        except Exception as e:
            return ToolResult(
                tool_name=tool_name,
                data=None,
                metadata={'error': str(e)},
                success=False
            )
        return ToolResult(
            tool_name=tool_name,
            data=data,
            metadata={'executed_at': asyncio.get_event_loop().time()},
            success=True
        )

    async def execute(self, params: Dict) -> Any:
        """Run all sub-tools concurrently and combine their results.

        The original loop awaited each sub-tool in turn even though it was
        described as parallel; ``asyncio.gather`` now runs them concurrently.
        A failing sub-tool does not abort the others.

        Raises:
            ValueError: If no combiner has been set.
        """
        if not self.combiner:
            raise ValueError("Combiner not set")
        outcomes = await asyncio.gather(
            *(self._run_sub_tool(tool_name, params) for tool_name in self.sub_tools)
        )
        # gather() preserves argument order, so zip restores name -> result.
        results = dict(zip(self.sub_tools, outcomes))
        return await self.combiner(results, params)
# 预定义的组合工具
class DataAnalysisComposite(CompositeTool):
    """Composite tool bundling data retrieval, analysis and reporting."""

    def __init__(self):
        super().__init__(
            "data_analysis",
            "组合数据获取和分析工具"
        )
        for sub_tool in ("get_data", "analyze_data", "generate_report"):
            self.add_sub_tool(sub_tool)
        self.set_combiner(self._combine_analysis_results)

    async def _combine_analysis_results(self, results: Dict, params: Dict) -> Dict:
        """Merge the analysis and report outputs into one response dict.

        A sub-tool that is missing from ``results`` or that failed
        contributes ``None``.
        """

        def data_if_ok(key):
            # Only successful results contribute their data.
            if key in results and results[key].success:
                return results[key].data
            return None

        succeeded = sum(1 for outcome in results.values() if outcome.success)
        return {
            "analysis": data_if_ok("analyze_data"),
            "report": data_if_ok("generate_report"),
            "metadata": {
                "tools_executed": succeeded,
                "total_tools": len(results)
            }
        }
class WorkflowTool(CompositeTool):
    """Composite tool that runs its steps in dependency order."""

    def __init__(self, name: str):
        super().__init__(name, f"Workflow: {name}")
        self.steps = []
        self.dependencies = {}

    def add_step(self, tool_name: str, depends_on: List[str] = None):
        """Append a step, optionally naming the steps it depends on."""
        self.sub_tools.append(tool_name)
        if depends_on:
            self.dependencies[tool_name] = depends_on

    async def execute(self, params: Dict) -> Any:
        """Run every step once its dependencies have succeeded.

        A step whose dependency failed is recorded as a failed, skipped
        result instead of being retried forever. If a full pass makes no
        progress (unknown dependency, cycle, or a duplicated step name) the
        remaining steps are recorded as skipped and the loop ends. Without
        these two guards the ``while`` loop never terminated.

        Returns:
            A dict with the workflow name, per-step ``ToolResult`` objects
            and a success/failure summary.
        """
        results = {}
        completed = set()

        def skipped(tool_name, reason):
            return ToolResult(
                tool_name=tool_name,
                data=None,
                metadata={'error': reason, 'skipped': True},
                success=False
            )

        while len(completed) < len(self.sub_tools):
            progressed = False
            for tool_name in self.sub_tools:
                if tool_name in completed:
                    continue
                dependencies = self.dependencies.get(tool_name, [])
                failed_dep = next(
                    (dep for dep in dependencies
                     if dep in completed and not results[dep].success),
                    None,
                )
                if failed_dep is not None:
                    # The prerequisite can never succeed now; do not wait on it.
                    results[tool_name] = skipped(
                        tool_name, f"Skipped: dependency '{failed_dep}' failed"
                    )
                    completed.add(tool_name)
                    progressed = True
                    continue
                if any(dep not in completed for dep in dependencies):
                    continue  # prerequisites still pending
                try:
                    result = await execute_tool_async(tool_name, params)
                    results[tool_name] = ToolResult(
                        tool_name=tool_name,
                        data=result,
                        metadata={'executed_at': asyncio.get_event_loop().time()},
                        success=True
                    )
                except Exception as e:
                    results[tool_name] = ToolResult(
                        tool_name=tool_name,
                        data=None,
                        metadata={'error': str(e)},
                        success=False
                    )
                # Marked complete even on failure so dependants can be resolved.
                completed.add(tool_name)
                progressed = True
            if not progressed:
                for tool_name in self.sub_tools:
                    if tool_name not in completed:
                        results[tool_name] = skipped(
                            tool_name, "Skipped: dependencies can never be satisfied"
                        )
                        completed.add(tool_name)
                break
        return {
            "workflow_name": self.name,
            "steps": results,
            "summary": {
                "successful_steps": len([r for r in results.values() if r.success]),
                "failed_steps": len([r for r in results.values() if not r.success]),
                "total_steps": len(results)
            }
        }
# 使用示例
async def composite_example():
    """Demonstrate the data-analysis composite and a three-step workflow."""
    # Composite tool: every sub-tool receives the same parameters.
    analysis_params = {
        "query": "sales data 2024",
        "format": "detailed"
    }
    data_analysis = DataAnalysisComposite()
    result = await data_analysis.execute(analysis_params)
    print(f"Analysis result: {result}")

    # Workflow tool: extract -> transform -> load.
    workflow = WorkflowTool("data_pipeline")
    workflow.add_step("extract_data")
    workflow.add_step("transform_data", depends_on=["extract_data"])
    workflow.add_step("load_data", depends_on=["transform_data"])
    workflow_params = {
        "source": "database",
        "target": "data_warehouse"
    }
    workflow_result = await workflow.execute(workflow_params)
    print(f"Workflow result: {workflow_result}")
流水线设计将复杂的业务流程分解为多个处理阶段,实现流程的模块化和可维护性。
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from enum import Enum
import asyncio
class PipelineStatus(Enum):
    """Lifecycle states of a pipeline."""

    PENDING = "pending"      # initial state set in DataPipeline.__init__
    RUNNING = "running"      # set when execute() starts
    COMPLETED = "completed"  # set when every step has returned
    FAILED = "failed"        # set when a step raises
    RETRYING = "retrying"
@dataclass
class PipelineStep:
    """Configuration of one pipeline step."""

    # Unique step name within a pipeline.
    name: str
    # Async callable invoked with the step's input dict.
    processor: Callable
    # Names of steps that must run before this one.
    dependencies: List[str]
    retry_count: int = 0
    # Number of retries allowed after the first failed attempt.
    max_retries: int = 3
    # Per-attempt timeout in seconds.
    timeout: float = 30.0
class DataPipeline:
    """Runs named async steps in dependency order.

    Each step's processor receives a dict containing the pipeline input, the
    merged outputs of all earlier steps, and ``_<dep>_result`` entries for
    its declared dependencies.
    """

    def __init__(self, name: str):
        self.name = name
        self.steps: Dict[str, PipelineStep] = {}
        self.execution_order: List[str] = []
        self.status = PipelineStatus.PENDING
        self.results: Dict[str, Any] = {}
        self.errors: Dict[str, Exception] = {}

    def add_step(self, name: str, processor: Callable, dependencies: List[str] = None, **kwargs):
        """Register a step and recompute the execution order.

        Args:
            name: Step name.
            processor: Async callable taking the step input dict.
            dependencies: Names of already-registered steps to run first.
            **kwargs: Extra ``PipelineStep`` fields (``max_retries``, ``timeout``...).

        Raises:
            ValueError: The new step introduces a circular dependency.
            KeyError: A dependency has not been registered.

        If ordering fails the registration is rolled back, so a rejected
        step does not leave the pipeline unusable.
        """
        if dependencies is None:
            dependencies = []
        previous = self.steps.get(name)
        self.steps[name] = PipelineStep(
            name=name,
            processor=processor,
            dependencies=dependencies,
            **kwargs
        )
        try:
            self._update_execution_order()
        except Exception:
            if previous is None:
                del self.steps[name]
            else:
                self.steps[name] = previous
            raise

    def _update_execution_order(self):
        """Recompute ``execution_order`` with a depth-first topological sort."""
        visited = set()
        in_progress = set()  # steps on the current DFS path, for cycle detection
        order = []

        def visit(step_name: str):
            if step_name in in_progress:
                raise ValueError(f"Circular dependency detected in pipeline {self.name}")
            if step_name not in visited:
                in_progress.add(step_name)
                for dep in self.steps[step_name].dependencies:
                    visit(dep)
                in_progress.remove(step_name)
                visited.add(step_name)
                order.append(step_name)

        for step_name in self.steps:
            if step_name not in visited:
                visit(step_name)
        self.execution_order = order

    async def execute(self, input_data: Dict) -> Dict:
        """Run all steps in order and return the per-step results.

        Step outputs are merged into a private working copy, so the caller's
        ``input_data`` is no longer modified. A step result that is not a
        dict is stored in ``results`` but not merged.

        Raises:
            Exception: Re-raised from the failing step; ``status`` becomes
                FAILED and the error is stored under ``errors["pipeline"]``.
        """
        self.status = PipelineStatus.RUNNING
        self.results = {}
        self.errors = {}
        context = dict(input_data)
        try:
            for step_name in self.execution_order:
                step = self.steps[step_name]
                step_input = self._prepare_step_input(step_name, context)
                step_result = await self._execute_step(step, step_input)
                self.results[step_name] = step_result
                # Make this step's output visible to later steps.
                if isinstance(step_result, dict):
                    context.update(step_result)
            self.status = PipelineStatus.COMPLETED
            return self.results
        except Exception as e:
            self.status = PipelineStatus.FAILED
            self.errors["pipeline"] = e
            raise

    def _prepare_step_input(self, step_name: str, input_data: Dict) -> Dict:
        """Return a copy of ``input_data`` plus ``_<dep>_result`` entries."""
        step_input = input_data.copy()
        for dep in self.steps[step_name].dependencies:
            if dep in self.results:
                step_input[f"_{dep}_result"] = self.results[dep]
        return step_input

    async def _execute_step(self, step: "PipelineStep", input_data: Dict) -> Any:
        """Run one step with a per-attempt timeout and retries.

        Timeouts back off exponentially (2, 4, 8... seconds); other errors
        wait one second between attempts. The final exception is chained to
        the underlying error so its traceback is kept.
        """
        max_retries = step.max_retries
        retry_count = 0
        while retry_count <= max_retries:
            try:
                return await asyncio.wait_for(
                    step.processor(input_data),
                    timeout=step.timeout
                )
            except asyncio.TimeoutError as e:
                retry_count += 1
                if retry_count > max_retries:
                    raise Exception(f"Step {step.name} timeout after {max_retries} retries") from e
                await asyncio.sleep(2 ** retry_count)
            except Exception as e:
                retry_count += 1
                if retry_count > max_retries:
                    raise Exception(f"Step {step.name} failed after {max_retries} retries: {str(e)}") from e
                await asyncio.sleep(1)
        # Only reached when max_retries is negative and the loop never runs.
        raise Exception(f"Step {step.name} exceeded max retries")
# 预定义的流水线模板
class DataProcessingPipeline(DataPipeline):
    """Pipeline template: validate -> clean -> transform -> analyze -> store."""

    def __init__(self, processors: Dict[str, Callable] = None):
        """Build the five standard steps.

        Args:
            processors: Optional mapping with the keys ``validate``,
                ``clean``, ``transform``, ``analyze`` and ``store``. When
                omitted, the module-level functions ``validate_data``,
                ``clean_data``, ``transform_data``, ``analyze_data`` and
                ``store_data`` are used as before, so they must exist at
                module scope by the time the pipeline is constructed.
        """
        super().__init__("data_processing")
        if processors is None:
            processors = {
                "validate": validate_data,
                "clean": clean_data,
                "transform": transform_data,
                "analyze": analyze_data,
                "store": store_data,
            }
        self.add_step("validate", processors["validate"], max_retries=2)
        self.add_step("clean", processors["clean"], dependencies=["validate"])
        self.add_step("transform", processors["transform"], dependencies=["clean"])
        self.add_step("analyze", processors["analyze"], dependencies=["transform"])
        self.add_step("store", processors["store"], dependencies=["analyze"])

    async def execute_with_monitoring(self, input_data: Dict, callbacks: List[Callable] = None) -> Dict:
        """Run the pipeline, then call each monitoring callback with ``self``.

        The original kept the callback list in a local variable that nothing
        could populate, so no callback ever ran. Callers can now pass them.
        Callbacks run only when the pipeline finishes without raising.
        """
        result = await self.execute(input_data)
        for callback in callbacks or ():
            callback(self)
        return result
# 使用示例
async def pipeline_example():
    """Build and run a five-step data-processing pipeline.

    The step functions are local to this example, so they are defined first
    and registered on a ``DataPipeline`` directly. The original built a
    ``DataProcessingPipeline()`` before they existed, and that class looks
    them up as module globals, which raised NameError.
    """

    async def validate_data(data: Dict) -> Dict:
        """Validation step: require a ``data`` field."""
        if "data" not in data:
            raise ValueError("Missing data field")
        return {"validated": True, "validation_report": "Data passed validation"}

    async def clean_data(data: Dict) -> Dict:
        """Cleaning step (simplified: passes the data through)."""
        cleaned_data = data["data"]
        return {"cleaned_data": cleaned_data}

    async def transform_data(data: Dict) -> Dict:
        """Transformation step: double every value."""
        transformed = [x * 2 for x in data["cleaned_data"]]
        return {"transformed_data": transformed}

    async def analyze_data(data: Dict) -> Dict:
        """Analysis step: sum, average and count."""
        values = data["transformed_data"]
        analysis = {
            "sum": sum(values),
            "average": sum(values) / len(values),
            "count": len(values)
        }
        return {"analysis": analysis}

    async def store_data(data: Dict) -> Dict:
        """Storage step (simulated)."""
        return {"stored": True, "storage_id": f"store_{id(data)}"}

    # Same wiring as the DataProcessingPipeline template.
    pipeline = DataPipeline("data_processing")
    pipeline.add_step("validate", validate_data, max_retries=2)
    pipeline.add_step("clean", clean_data, dependencies=["validate"])
    pipeline.add_step("transform", transform_data, dependencies=["clean"])
    pipeline.add_step("analyze", analyze_data, dependencies=["transform"])
    pipeline.add_step("store", store_data, dependencies=["analyze"])

    input_data = {
        "data": [1, 2, 3, 4, 5],
        "metadata": {"source": "test", "timestamp": "2024-01-01"}
    }
    try:
        result = await pipeline.execute(input_data)
        print(f"Pipeline completed successfully: {result}")
    except Exception as e:
        print(f"Pipeline failed: {e}")
        print(f"Errors: {pipeline.errors}")
        print(f"Partial results: {pipeline.results}")
代理模式为工具调用提供间接访问层,实现访问控制、缓存、日志等功能。
from typing import Dict, Any, Callable
from functools import wraps
import time
import logging
class ToolProxy:
    """Base proxy that forwards calls to a wrapped tool.

    The proxy exposes a ``__name__`` of its own, taken from the wrapped
    callable, with the callable's class name as fallback. Proxy instances
    previously had no ``__name__``, so wrapping one proxy in another raised
    AttributeError as soon as the outer proxy read ``real_tool.__name__``.
    """

    def __init__(self, real_tool: Callable):
        self.real_tool = real_tool
        self.__name__ = getattr(real_tool, "__name__", type(real_tool).__name__)
        self.logger = logging.getLogger(f"proxy_{self.__name__}")

    def __call__(self, *args, **kwargs):
        """Forward the call unchanged to the wrapped tool."""
        return self.real_tool(*args, **kwargs)
class CachingToolProxy(ToolProxy):
    """Proxy that memoises results for ``ttl`` seconds."""

    def __init__(self, real_tool: Callable, ttl: int = 300):
        super().__init__(real_tool)
        self.cache = {}
        self.ttl = ttl

    def __call__(self, *args, **kwargs):
        """Return a fresh cached value if present, else call and cache.

        The cache key is built from the tool name and the string form of
        the positional and sorted keyword arguments.
        """
        cache_key = f"{self.real_tool.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
        entry = self.cache.get(cache_key)
        if entry is not None:
            if time.time() - entry['timestamp'] < self.ttl:
                self.logger.info(f"Cache hit for {cache_key}")
                return entry['data']
            # Entry is stale: drop it and fall through to a real call.
            del self.cache[cache_key]
        self.logger.info(f"Executing tool {self.real_tool.__name__}")
        fresh = self.real_tool(*args, **kwargs)
        self.cache[cache_key] = {
            'data': fresh,
            'timestamp': time.time()
        }
        return fresh
class LoggingToolProxy(ToolProxy):
    """Proxy that logs every call with its duration and outcome."""

    def __init__(self, real_tool: Callable):
        super().__init__(real_tool)
        self.call_count = 0

    def __call__(self, *args, **kwargs):
        """Call the tool, logging success at INFO and failure at ERROR.

        Exceptions from the tool are logged and re-raised unchanged.
        """
        self.call_count += 1
        began = time.time()
        try:
            outcome = self.real_tool(*args, **kwargs)
            duration = time.time() - began
            self.logger.info(
                f"Tool {self.real_tool.__name__} executed successfully. "
                f"Call #{self.call_count}, Duration: {duration:.2f}s, "
                f"Args: {args}, Kwargs: {kwargs}"
            )
            return outcome
        except Exception as e:
            duration = time.time() - began
            self.logger.error(
                f"Tool {self.real_tool.__name__} failed. "
                f"Call #{self.call_count}, Duration: {duration:.2f}s, "
                f"Error: {str(e)}"
            )
            raise
class AuthorizationToolProxy(ToolProxy):
    """Proxy that only lets whitelisted roles call the tool."""

    def __init__(self, real_tool: Callable, allowed_roles: list):
        super().__init__(real_tool)
        self.allowed_roles = allowed_roles

    def __call__(self, *args, **kwargs):
        """Check the ``user_role`` keyword argument, then forward the call.

        The role defaults to ``'guest'`` when the keyword is absent, and the
        keyword is passed on to the wrapped tool unchanged.

        Raises:
            PermissionError: The role is not in ``allowed_roles``.
        """
        role = kwargs.get('user_role', 'guest')
        permitted = role in self.allowed_roles
        if not permitted:
            raise PermissionError(f"Role '{role}' not allowed to use {self.real_tool.__name__}")
        self.logger.info(f"Authorized access to {self.real_tool.__name__} by {role}")
        return self.real_tool(*args, **kwargs)
class RetryToolProxy(ToolProxy):
    """Proxy that retries a failing tool with exponential backoff."""

    def __init__(self, real_tool: Callable, max_retries: int = 3, delay: float = 1.0):
        super().__init__(real_tool)
        self.max_retries = max_retries
        self.delay = delay

    def __call__(self, *args, **kwargs):
        """Call the tool up to ``max_retries + 1`` times.

        Before retry number k the proxy blocks for ``delay * 2**(k-1)``
        seconds. The last exception is re-raised once attempts run out.
        """
        last_exception = None
        attempt = 0
        while attempt <= self.max_retries:
            try:
                if attempt > 0:
                    self.logger.info(f"Retry attempt {attempt} for {self.real_tool.__name__}")
                    time.sleep(self.delay * (2 ** (attempt - 1)))
                return self.real_tool(*args, **kwargs)
            except Exception as e:
                last_exception = e
                self.logger.warning(
                    f"Attempt {attempt + 1} failed for {self.real_tool.__name__}: {str(e)}"
                )
            attempt += 1
        # Every attempt failed.
        self.logger.error(f"All {self.max_retries + 1} attempts failed for {self.real_tool.__name__}")
        raise last_exception
class MetricsToolProxy(ToolProxy):
    """Proxy that tracks success/failure counts and execution times."""

    def __init__(self, real_tool: Callable):
        super().__init__(real_tool)
        self.execution_times = []
        self.success_count = 0
        self.failure_count = 0

    def __call__(self, *args, **kwargs):
        """Call the tool and log running metrics.

        Only successful calls add to ``execution_times``. Failures are
        counted, logged and re-raised.
        """
        began = time.time()
        try:
            outcome = self.real_tool(*args, **kwargs)
            duration = time.time() - began
            self.success_count += 1
            self.execution_times.append(duration)
            samples = self.execution_times
            avg_time = sum(samples) / len(samples)
            self.logger.info(
                f"Tool {self.real_tool.__name__} metrics: "
                f"Success: {self.success_count}, Failures: {self.failure_count}, "
                f"Avg Time: {avg_time:.2f}s, Current Time: {duration:.2f}s"
            )
            return outcome
        except Exception as e:
            duration = time.time() - began
            self.failure_count += 1
            self.logger.error(
                f"Tool {self.real_tool.__name__} failed in {duration:.2f}s. "
                f"Total successes: {self.success_count}, failures: {self.failure_count}"
            )
            raise
# 代理工厂
def create_tool_proxy(real_tool: Callable, proxy_types: list) -> "ToolProxy":
    """Wrap ``real_tool`` in a chain of proxies.

    ``proxy_types`` lists proxy kinds from outermost to innermost
    (``"cache"``, ``"logging"``, ``"authorization"``, ``"retry"``,
    ``"metrics"``). Unknown kinds are ignored.

    Returns:
        The outermost proxy, or ``real_tool`` itself when nothing applied.
    """
    # Lambdas defer the class lookup until a kind is actually requested.
    builders = (
        ("cache", lambda tool: CachingToolProxy(tool)),
        ("logging", lambda tool: LoggingToolProxy(tool)),
        ("authorization", lambda tool: AuthorizationToolProxy(tool, allowed_roles=["admin", "user"])),
        ("retry", lambda tool: RetryToolProxy(tool)),
        ("metrics", lambda tool: MetricsToolProxy(tool)),
    )
    wrapped = real_tool
    # Apply innermost first so the first listed kind ends up outermost.
    for kind in reversed(proxy_types):
        build = next((factory for key, factory in builders if kind == key), None)
        if build is not None:
            wrapped = build(wrapped)
    return wrapped
# 使用示例
async def proxy_example():
    """Demonstrate a cache -> logging -> retry -> metrics proxy chain."""

    def get_user_data(user_id: int) -> Dict:
        """The real tool: return the record for user 1, fail otherwise."""
        if user_id == 1:
            # The published snippet had a scraper placeholder here instead of
            # an address; a neutral example address is used.
            return {"id": 1, "name": "Alice", "email": "alice@example.com"}
        else:
            raise ValueError(f"User {user_id} not found")

    proxy = create_tool_proxy(
        get_user_data,
        ["cache", "logging", "retry", "metrics"]
    )
    try:
        # First call runs the real function.
        result1 = proxy(1)
        print(f"First call result: {result1}")
        # Second call is served from the cache.
        result2 = proxy(1)
        print(f"Second call result: {result2}")
        # Unknown user: fails after the retry proxy gives up.
        try:
            proxy(999)
        except Exception as e:
            print(f"Expected error: {e}")
    except Exception as e:
        print(f"Error: {e}")
装饰器模式为工具函数动态添加功能,在不修改原有代码的情况下增强工具能力。
from typing import Callable, Dict, Any, List
from functools import wraps
import time
import logging
from dataclasses import dataclass
@dataclass
class ToolMetadata:
    """Descriptive metadata attached to a tool function."""

    # Tool name.
    name: str
    # Human-readable description.
    description: str
    version: str = "1.0"
    category: str = "general"
    # Optional labels; None when not provided.
    tags: List[str] = None
def tool(name: str = None, description: str = None, **metadata):
    """Decorator factory that attaches ``ToolMetadata`` to an async function.

    The metadata is stored on the function as ``_tool_metadata``;
    ``functools.wraps`` copies it onto the returned wrapper.

    Args:
        name: Tool name; defaults to the function's ``__name__``.
        description: Defaults to the docstring, then ``"No description"``.
        **metadata: Extra ``ToolMetadata`` fields (version, category, tags).
    """
    def decorator(func: Callable):
        resolved_name = name or func.__name__
        resolved_description = description or func.__doc__ or "No description"
        func._tool_metadata = ToolMetadata(
            name=resolved_name,
            description=resolved_description,
            **metadata
        )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Pure pass-through.
            return await func(*args, **kwargs)

        return wrapper

    return decorator
def timing_decorator(func: Callable):
    """Decorator that logs how long an async function took.

    Success is logged at INFO, failure at ERROR; exceptions are re-raised.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        began = time.time()
        try:
            outcome = await func(*args, **kwargs)
            elapsed = time.time() - began
            logging.info(f"Tool {func.__name__} executed in {elapsed:.2f}s")
            return outcome
        except Exception as e:
            elapsed = time.time() - began
            logging.error(f"Tool {func.__name__} failed after {elapsed:.2f}s: {str(e)}")
            raise

    return wrapper
def validation_decorator(schema: Dict):
    """Decorator factory intended to validate arguments against ``schema``.

    Placeholder: no validation is performed yet, the call is forwarded
    unchanged. A real implementation would apply JSON Schema validation.
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            outcome = await func(*args, **kwargs)
            return outcome

        return wrapper

    return decorator
def caching_decorator(ttl: int = 300):
    """Decorator factory that caches async results for ``ttl`` seconds.

    Each decorated function gets its own cache. The cache used to live at
    factory level, so applying one decorator object to two functions with
    the same ``__name__`` made them return each other's results.

    The key is built from the function name and the string form of the
    positional and sorted keyword arguments.
    """
    def decorator(func: Callable):
        cache = {}  # per-function store: key -> {'data', 'timestamp'}

        @wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
            cached_data = cache.get(cache_key)
            if cached_data is not None:
                if time.time() - cached_data['timestamp'] < ttl:
                    return cached_data['data']
                # Expired: drop it and recompute below.
                del cache[cache_key]
            result = await func(*args, **kwargs)
            cache[cache_key] = {
                'data': result,
                'timestamp': time.time()
            }
            return result

        return wrapper

    return decorator
def authorization_decorator(allowed_roles: List[str]):
    """Decorator factory restricting an async function to certain roles.

    The role is read from the ``user_role`` keyword argument and defaults
    to ``'guest'``.

    Raises (from the wrapper):
        PermissionError: The role is not in ``allowed_roles``.
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            role = kwargs.get('user_role', 'guest')
            if role in allowed_roles:
                return await func(*args, **kwargs)
            raise PermissionError(f"Access denied for role: {role}")

        return wrapper

    return decorator
def retry_decorator(max_retries: int = 3, delay: float = 1.0):
    """Decorator factory that retries an async function with backoff.

    The function is attempted up to ``max_retries + 1`` times. Before retry
    number k the wrapper sleeps ``delay * 2**(k-1)`` seconds. The last
    exception is re-raised when every attempt fails.
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            attempt = 0
            while attempt <= max_retries:
                try:
                    if attempt > 0:
                        logging.info(f"Retry attempt {attempt} for {func.__name__}")
                        await asyncio.sleep(delay * (2 ** (attempt - 1)))
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}")
                attempt += 1
            raise last_exception

        return wrapper

    return decorator
# 装饰器组合示例
@tool(
    name="user_management",
    description="用户管理工具",
    version="2.0",
    category="admin",
    tags=["users", "management", "admin"],
)  # this closing parenthesis was missing, which made the snippet a SyntaxError
@timing_decorator
@caching_decorator(ttl=60)
@validation_decorator({"type": "object", "properties": {"user_id": {"type": "integer"}}})
@authorization_decorator(["admin"])
@retry_decorator(max_retries=3)
async def create_user(user_id: int, user_data: Dict, user_role: str = "guest") -> Dict:
    """Create a (simulated) user record.

    Decorators apply bottom-up: retry is innermost, then authorization,
    validation, caching, timing and finally the tool metadata.

    Args:
        user_id: Positive user id.
        user_data: Optional ``name`` and ``email`` values.
        user_role: Role of the caller, stored on the record.

    Raises:
        ValueError: ``user_id`` is not positive.
    """
    if user_id <= 0:
        raise ValueError("Invalid user ID")
    # Simulated user creation.
    user = {
        "id": user_id,
        "name": user_data.get("name", f"User{user_id}"),
        "email": user_data.get("email", f"user{user_id}@example.com"),
        "created_at": time.time(),
        "role": user_role
    }
    logging.info(f"Created user {user_id} with role {user_role}")
    return user
# 装饰器工厂
def create_tool_decorator(decorators: List[Callable]):
    """Combine several decorators into one.

    The first decorator in the list ends up outermost, exactly as if the
    decorators had been stacked with ``@`` in list order.
    """
    def decorator(func: Callable):
        wrapped = func
        # Apply the last decorator first so the first one wraps everything.
        for apply in reversed(decorators):
            wrapped = apply(wrapped)
        return wrapped

    return decorator
# 使用示例
async def decorator_example():
    """Demonstrate a decorator chain built with ``create_tool_decorator``."""
    chain = create_tool_decorator([
        timing_decorator,
        caching_decorator(ttl=30),
        authorization_decorator(["admin", "user"]),
        retry_decorator(max_retries=2)
    ])

    @chain
    @tool(name="sample_tool", description="示例工具")
    async def sample_function(data: str, user_role: str = "guest") -> str:
        return f"Processed: {data} by {user_role}"

    try:
        result = await sample_function("test data", user_role="admin")
        print(f"Result: {result}")
        # Same arguments again: served from the cache.
        result2 = await sample_function("test data", user_role="admin")
        print(f"Cached result: {result2}")
    except Exception as e:
        print(f"Error: {e}")
命令模式将工具调用封装为命令对象,支持命令的排队、撤销和重做功能。
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import asyncio
import uuid
from enum import Enum
class CommandType(Enum):
    """Kinds of command understood by the invoker."""

    EXECUTE = "execute"  # run the named tool
    CANCEL = "cancel"
    RETRY = "retry"
    UNDO = "undo"
class CommandStatus(Enum):
    """Lifecycle states of a command."""

    PENDING = "pending"      # default status of a new Command
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class Command:
    """A queued request to run (or cancel, retry, undo) a tool."""

    # Unique id; the code in this file uses uuid4 strings.
    id: str
    type: CommandType
    # Name of the tool to invoke.
    tool_name: str
    # Keyword arguments for the tool.
    parameters: Dict[str, Any]
    status: CommandStatus = CommandStatus.PENDING
    # Tool return value once completed.
    result: Optional[Any] = None
    # Error text when execution failed.
    error: Optional[str] = None
    # Timestamps are event-loop clock values where this file sets them.
    created_at: float = None
    executed_at: Optional[float] = None
    completed_at: Optional[float] = None
    retry_count: int = 0
    max_retries: int = 3
class CommandReceiver:
    """Receiver in the command pattern: owns and runs the tool functions."""

    def __init__(self):
        self.tools = {}

    def register_tool(self, name: str, func: callable):
        """Register (or replace) the async function behind ``name``."""
        self.tools[name] = func

    async def execute_tool(self, tool_name: str, params: Dict) -> Any:
        """Await the named tool with ``params`` as keyword arguments.

        Raises:
            ValueError: No tool is registered under ``tool_name``.
        """
        try:
            handler = self.tools[tool_name]
        except KeyError:
            raise ValueError(f"Tool {tool_name} not found") from None
        return await handler(**params)
class CommandInvoker:
    """Invoker in the command pattern: queues commands and processes them.

    Changes from the original:
    * A command is appended to ``history`` once, when it reaches a final
      state, instead of once per retry attempt.
    * Commands waiting in the queue are tracked, so ``cancel_command`` can
      cancel them; a cancelled command is skipped when dequeued.
    * A stale ``error`` is cleared when a retried command finally succeeds.
    """

    def __init__(self, receiver: CommandReceiver):
        self.receiver = receiver
        self.command_queue = asyncio.Queue()
        self.history: List[Command] = []
        self.current_command: Optional[Command] = None
        self.is_running = False
        self.max_queue_size = 100
        # Commands currently waiting in the queue, keyed by command id.
        self._queued: Dict[str, Command] = {}

    async def execute_command(self, command: Command) -> Command:
        """Enqueue ``command`` for processing and return it.

        The command is not run here; ``process_queue`` runs it later.

        Raises:
            Exception: The queue already holds ``max_queue_size`` commands.
        """
        if self.command_queue.qsize() >= self.max_queue_size:
            raise Exception("Command queue is full")
        self._queued[command.id] = command
        await self.command_queue.put(command)
        return command

    async def execute_commands(self, commands: List[Command]) -> List[Command]:
        """Enqueue several commands in order and return them."""
        return [await self.execute_command(command) for command in commands]

    async def process_queue(self):
        """Process commands until stopped and the queue is drained.

        The one-second timeout on ``get`` lets the loop notice that
        ``is_running`` was cleared.
        """
        self.is_running = True
        while self.is_running or not self.command_queue.empty():
            try:
                command = await asyncio.wait_for(
                    self.command_queue.get(),
                    timeout=1.0
                )
                await self._execute_single_command(command)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Error processing command queue: {e}")

    async def _execute_single_command(self, command: Command):
        """Run one dequeued command and record its final state."""
        self._queued.pop(command.id, None)
        if command.status == CommandStatus.CANCELLED:
            # Cancelled while waiting in the queue: record it, do not run it.
            command.completed_at = asyncio.get_event_loop().time()
            self.history.append(command)
            return
        self.current_command = command
        command.status = CommandStatus.EXECUTING
        command.executed_at = asyncio.get_event_loop().time()
        requeue = False
        try:
            if command.type == CommandType.EXECUTE:
                command.result = await self.receiver.execute_tool(
                    command.tool_name,
                    command.parameters
                )
                command.error = None
                command.status = CommandStatus.COMPLETED
            elif command.type == CommandType.CANCEL:
                command.status = CommandStatus.CANCELLED
            elif command.type == CommandType.RETRY:
                if command.retry_count < command.max_retries:
                    command.retry_count += 1
                    requeue = True
                else:
                    command.status = CommandStatus.FAILED
                    command.error = "Max retries exceeded"
            elif command.type == CommandType.UNDO:
                # Placeholder undo logic.
                command.status = CommandStatus.COMPLETED
                command.result = "Undo executed"
        except Exception as e:
            command.error = str(e)
            command.status = CommandStatus.FAILED
            if command.retry_count < command.max_retries:
                command.retry_count += 1
                requeue = True
        finally:
            self.current_command = None
            if requeue:
                command.status = CommandStatus.PENDING
                self._queued[command.id] = command
                await self.command_queue.put(command)
            else:
                command.completed_at = asyncio.get_event_loop().time()
                self.history.append(command)

    async def cancel_command(self, command_id: str) -> bool:
        """Cancel a command that is still waiting in the queue.

        Returns:
            True if a pending command with that id was found and cancelled.
        """
        command = self._queued.get(command_id)
        if command is not None and command.status == CommandStatus.PENDING:
            command.status = CommandStatus.CANCELLED
            return True
        return False

    async def undo_command(self, command_id: str) -> bool:
        """Enqueue an UNDO command for a completed command in the history.

        Returns:
            True if a completed command with that id was found.
        """
        for command in self.history:
            if command.id == command_id and command.status == CommandStatus.COMPLETED:
                undo_command = Command(
                    id=str(uuid.uuid4()),
                    type=CommandType.UNDO,
                    tool_name=command.tool_name,
                    parameters=command.parameters,
                    max_retries=0
                )
                await self.execute_command(undo_command)
                return True
        return False
class ToolCommandManager:
    """Facade wiring a ``CommandReceiver`` to a ``CommandInvoker``."""

    def __init__(self):
        self.receiver = CommandReceiver()
        self.invoker = CommandInvoker(self.receiver)

    def register_tool(self, name: str, func: callable):
        """Register an async tool function on the receiver."""
        self.receiver.register_tool(name, func)

    async def create_command(self, tool_name: str, params: Dict, **kwargs) -> Command:
        """Build (but do not enqueue) an EXECUTE command.

        Args:
            tool_name: Tool to run.
            params: Keyword arguments for the tool.
            **kwargs: ``max_retries`` (default 3) is honoured.
        """
        command = Command(
            id=str(uuid.uuid4()),
            type=CommandType.EXECUTE,
            tool_name=tool_name,
            parameters=params,
            max_retries=kwargs.get('max_retries', 3),
            created_at=asyncio.get_event_loop().time()
        )
        return command

    async def execute_tool(self, tool_name: str, params: Dict, **kwargs) -> Any:
        """Enqueue a command, wait for it to finish and return its result.

        The original returned ``command.result`` straight after enqueueing,
        before the processor could run it, so the result was always None.
        This now polls until the command leaves PENDING/EXECUTING. The
        processor (``start_processor``) must be running in another task.

        Args:
            tool_name: Tool to run.
            params: Keyword arguments for the tool.
            **kwargs: ``max_retries`` as for ``create_command``; optional
                ``timeout`` in seconds to bound the wait.

        Raises:
            asyncio.TimeoutError: ``timeout`` elapsed before completion.
            RuntimeError: The command ended in the FAILED state.
        """
        command = await self.create_command(tool_name, params, **kwargs)
        await self.invoker.execute_command(command)
        clock = asyncio.get_event_loop().time
        timeout = kwargs.get('timeout')
        deadline = None if timeout is None else clock() + timeout
        while command.status in (CommandStatus.PENDING, CommandStatus.EXECUTING):
            if deadline is not None and clock() >= deadline:
                raise asyncio.TimeoutError(f"Command {command.id} did not finish in time")
            await asyncio.sleep(0.01)
        if command.status == CommandStatus.FAILED:
            raise RuntimeError(command.error)
        return command.result

    async def start_processor(self):
        """Run the invoker's queue loop until ``stop_processor`` is called."""
        await self.invoker.process_queue()

    def stop_processor(self):
        """Ask the queue loop to stop once the queue is drained."""
        self.invoker.is_running = False
# 使用示例
async def command_example():
    """Demonstrate queuing two commands and inspecting their outcome."""
    manager = ToolCommandManager()

    async def get_weather(city: str) -> Dict:
        """Return fake weather data for ``city`` after a short delay."""
        await asyncio.sleep(1)
        return {
            "city": city,
            "temperature": 20 + hash(city) % 10,
            "humidity": 60 + hash(city) % 20
        }

    async def send_notification(message: str) -> Dict:
        """Pretend to send a notification."""
        await asyncio.sleep(0.5)
        return {"sent": True, "message": message}

    manager.register_tool("get_weather", get_weather)
    manager.register_tool("send_notification", send_notification)

    # Run the queue processor in the background.
    processor_task = asyncio.create_task(manager.start_processor())
    try:
        weather_command = await manager.create_command(
            "get_weather",
            {"city": "Beijing"}
        )
        notification_command = await manager.create_command(
            "send_notification",
            {"message": "Weather data retrieved"}
        )
        commands = [weather_command, notification_command]
        await manager.invoker.execute_commands(commands)
        # Give the processor time to work through both commands.
        await asyncio.sleep(3)
        for command in commands:
            print(f"Command {command.id}: {command.status.value}")
            if command.result:
                print(f"Result: {command.result}")
    finally:
        manager.stop_processor()
        await processor_task
观察者模式实现工具执行的实时监控和事件通知,支持松耦合的响应机制。
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import asyncio
import uuid
from enum import Enum
class EventType(Enum):
    """Kinds of event emitted around tool execution."""

    TOOL_START = "tool_start"        # emitted before a tool runs
    TOOL_COMPLETE = "tool_complete"  # emitted after a tool returns
    TOOL_ERROR = "tool_error"        # emitted when a tool raises
    TOOL_PROGRESS = "tool_progress"
    SYSTEM_EVENT = "system_event"
@dataclass
class Event:
    """A single notification delivered to observers."""

    type: EventType
    # Tool the event refers to.
    tool_name: str
    # Event payload (params, result, error, progress...).
    data: Dict[str, Any]
    # Event-loop clock value at emission (see ObservableSubject.emit_event).
    timestamp: float
    # Fresh uuid4 string generated for each emitted event.
    event_id: str
class Observer(ABC):
    """Interface every event observer implements."""

    @abstractmethod
    async def on_event(self, event: Event):
        """Handle one event; called by the subject for every emitted event."""
        ...
class ToolEventObserver(Observer):
    """Observer that prints every event, plus its payload when non-empty."""

    async def on_event(self, event: Event):
        headline = f"[{event.timestamp:.2f}] {event.type.value}: {event.tool_name}"
        print(headline)
        if event.data:
            print(f"Data: {event.data}")
class LoggingObserver(Observer):
    """Observer that writes one ``[LOG]`` line per event."""

    async def on_event(self, event: Event):
        parts = [f"Event: {event.type.value} for {event.tool_name}"]
        if event.data:
            parts.append(f" with data: {event.data}")
        log_message = "".join(parts)
        # A real logging system should replace this print.
        print(f"[LOG] {log_message}")
class MetricsObserver(Observer):
    """Observer that keeps running counters about tool executions.

    Changes from the original:
    * ``start_times`` is created here; it was never initialised, so the
      first TOOL_START event raised AttributeError.
    * Every emitted event gets a fresh ``event_id``, so a completion can
      never share the id of its start event. The start is now looked up via
      ``event.data["start_event_id"]`` when the emitter supplies it,
      otherwise the emitter's own ``event.data["duration"]`` is used.
    """

    def __init__(self):
        self.metrics = {
            "total_events": 0,
            "tool_starts": 0,
            "tool_completions": 0,
            "tool_errors": 0,
            "total_duration": 0
        }
        # start event id -> timestamp of the TOOL_START event
        self.start_times = {}

    def _finished_duration(self, event: Event):
        """Return the run time reported by, or derivable from, ``event``."""
        start_id = event.data.get("start_event_id", event.event_id)
        # pop() so finished runs do not accumulate in start_times.
        started = self.start_times.pop(start_id, None)
        if started is not None:
            return event.timestamp - started
        reported = event.data.get("duration", 0)
        return reported if isinstance(reported, (int, float)) else 0

    async def on_event(self, event: Event):
        """Update the counters; print them after every tenth event."""
        self.metrics["total_events"] += 1
        if event.type == EventType.TOOL_START:
            self.metrics["tool_starts"] += 1
            self.start_times[event.event_id] = event.timestamp
        elif event.type == EventType.TOOL_COMPLETE:
            self.metrics["tool_completions"] += 1
            self.metrics["total_duration"] += self._finished_duration(event)
        elif event.type == EventType.TOOL_ERROR:
            self.metrics["tool_errors"] += 1
            # Failed runs are not timed, but their start entry is released.
            self.start_times.pop(event.data.get("start_event_id"), None)
        if self.metrics["total_events"] % 10 == 0:
            print(f"[METRICS] {self.metrics}")
class ObservableSubject:
    """Subject in the observer pattern: holds observers and emits events."""

    def __init__(self):
        self.observers: List[Observer] = []
        self.start_times = {}

    def attach(self, observer: Observer):
        """Add ``observer`` unless it is already attached."""
        already_attached = observer in self.observers
        if not already_attached:
            self.observers.append(observer)

    def detach(self, observer: Observer):
        """Remove ``observer`` if it is attached."""
        if observer in self.observers:
            self.observers.remove(observer)

    async def notify(self, event: Event):
        """Deliver ``event`` to each observer in turn.

        An exception from one observer is printed and does not stop
        delivery to the others.
        """
        for listener in self.observers:
            try:
                await listener.on_event(event)
            except Exception as e:
                print(f"Error notifying observer: {e}")

    async def emit_event(self, event_type: EventType, tool_name: str, data: Dict = None):
        """Create an event, notify all observers and return the event id.

        The event is stamped with the event-loop clock and a new uuid4 id.
        """
        payload = data or {}
        now = asyncio.get_event_loop().time()
        new_id = str(uuid.uuid4())
        event = Event(
            type=event_type,
            tool_name=tool_name,
            data=payload,
            timestamp=now,
            event_id=new_id
        )
        await self.notify(event)
        return event.event_id
class ToolExecutor:
    """Runs registered async tools and emits start/complete/error events."""

    def __init__(self):
        self.subject = ObservableSubject()
        self.tools = {}

    def register_tool(self, name: str, func: Callable):
        """Register (or replace) the async function behind ``name``."""
        self.tools[name] = func

    def attach_observer(self, observer: Observer):
        """Attach an observer to this executor's subject."""
        self.subject.attach(observer)

    async def execute_tool(self, tool_name: str, params: Dict) -> Any:
        """Run a tool and emit events describing the run.

        The duration used to be computed from
        ``self.subject.start_times[start_event_id]``, which nothing ever
        wrote, so every call ended in KeyError. The start time is now taken
        locally. Completion and error events also carry ``start_event_id``
        so observers can pair them with the start event.

        Raises:
            ValueError: ``tool_name`` is not registered.
            Exception: Whatever the tool raises, after a TOOL_ERROR event.
        """
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        clock = asyncio.get_event_loop().time
        started_at = clock()
        start_event_id = await self.subject.emit_event(
            EventType.TOOL_START,
            tool_name,
            {"params": params}
        )
        try:
            tool_func = self.tools[tool_name]
            result = await tool_func(**params)
        except Exception as e:
            await self.subject.emit_event(
                EventType.TOOL_ERROR,
                tool_name,
                {
                    "error": str(e),
                    "duration": clock() - started_at,
                    "start_event_id": start_event_id
                }
            )
            raise
        # Emitted outside the try so a successful run is never reported as an error.
        await self.subject.emit_event(
            EventType.TOOL_COMPLETE,
            tool_name,
            {
                "result": result,
                "duration": clock() - started_at,
                "start_event_id": start_event_id
            }
        )
        return result
class ProgressObserver(Observer):
    """Observer that prints progress events as a percentage."""

    async def on_event(self, event: Event):
        if event.type != EventType.TOOL_PROGRESS:
            return
        fraction = event.data.get('progress', 0)
        name = event.tool_name
        print(f"[PROGRESS] {name}: {fraction:.1%}")
class NotificationObserver(Observer):
    """Observer that reports tool completion or failure via a callback."""

    def __init__(self, notification_service: Callable):
        # Async callable taking the message text.
        self.notification_service = notification_service

    async def on_event(self, event: Event):
        if event.type not in [EventType.TOOL_COMPLETE, EventType.TOOL_ERROR]:
            return
        if event.type == EventType.TOOL_COMPLETE:
            outcome = "completed successfully"
        else:
            outcome = f"failed: {event.data.get('error', 'Unknown error')}"
        message = f"Tool {event.tool_name} " + outcome
        await self.notification_service(message)
class ToolEventSystem:
    """Bundles a ``ToolExecutor`` with demo observers and demo tools."""

    def __init__(self):
        self.executor = ToolExecutor()
        self.progress_tracker = {}

    def setup_observers(self):
        """Attach the console, logging, metrics, progress and notification observers."""
        for basic in (ToolEventObserver, LoggingObserver, MetricsObserver):
            self.executor.attach_observer(basic())
        self.executor.attach_observer(ProgressObserver())

        # Example notification sink that just prints the message.
        async def dummy_notification(message: str):
            print(f"[NOTIFICATION] {message}")

        self.executor.attach_observer(NotificationObserver(dummy_notification))

    def register_tools(self):
        """Register the ``process_file`` and ``analyze_data`` demo tools."""

        async def process_file(file_path: str, progress_callback: Callable = None):
            """Simulate processing a file in 100 steps, reporting progress."""
            total_steps = 100
            for step in range(1, total_steps + 1):
                if progress_callback:
                    await progress_callback(step / total_steps)
                await asyncio.sleep(0.01)  # simulated work
            return {"processed": True, "steps": total_steps}

        async def analyze_data(data: List[int]):
            """Return sum, average and count of ``data``."""
            await asyncio.sleep(0.5)
            total = sum(data)
            return {
                "sum": total,
                "average": total / len(data),
                "count": len(data)
            }

        self.executor.register_tool("process_file", process_file)
        self.executor.register_tool("analyze_data", analyze_data)

    async def execute_with_progress_tracking(self, tool_name: str, params: Dict) -> Any:
        """Run a tool, wiring in a progress callback for ``process_file``.

        The callback emits a TOOL_PROGRESS event for each progress report.
        """
        event_id = str(uuid.uuid4())

        async def progress_callback(progress: float):
            await self.executor.subject.emit_event(
                EventType.TOOL_PROGRESS,
                tool_name,
                {"progress": progress, "event_id": event_id}
            )

        # Copy so the caller's params dict is not modified.
        params_with_callback = params.copy()
        if tool_name == "process_file":
            params_with_callback["progress_callback"] = progress_callback
        return await self.executor.execute_tool(tool_name, params_with_callback)
# 使用示例
async def observer_example():
    """Demonstrate the event system with a progress-tracked and a plain tool."""
    system = ToolEventSystem()
    system.setup_observers()
    system.register_tools()
    try:
        # Tool that reports progress.
        print("Processing file...")
        file_params = {"file_path": "large_file.txt"}
        result = await system.execute_with_progress_tracking("process_file", file_params)
        print(f"Processing result: {result}")

        # Plain tool run directly through the executor.
        print("Analyzing data...")
        analysis_params = {"data": [1, 2, 3, 4, 5]}
        result = await system.executor.execute_tool("analyze_data", analysis_params)
        print(f"Analysis result: {result}")
    except Exception as e:
        print(f"Error: {e}")
Function Calling 的最佳实践确保代码质量和系统可靠性,提供可维护和可扩展的解决方案。
设计原则
性能考虑
安全性
测试策略
常见的 Function Calling 反模式需要避免,这些模式会导致代码质量和系统性能问题。
设计反模式
性能反模式
Function Calling 调试需要系统化的方法,快速定位和解决问题。
调试工具
常见问题
调试策略
优化建议
Function Calling 需要全面的安全考虑,保护系统和用户数据的安全。
输入验证
输出编码
权限控制
数据保护
监控和日志是 Function Calling 系统运行的基础,提供可观测性和问题诊断能力。
监控指标
告警规则
日志策略
日志级别
全面的测试策略确保 Function Calling 的质量和可靠性,覆盖各个层面。
单元测试
集成测试
性能测试
测试工具
Function Calling 系统需要良好的扩展性和集成能力,适应不断变化的需求。
扩展点
集成方式
兼容性
未来方向
OpenAI Function Calling 已经成为 AI 应用开发的核心技术,正在不断演进和发展。
核心成就
技术优势
未来趋势
发展方向