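Source-Level Walkthrough · Architecture Analysis · 2026 Best Practices
2026-06-01 | Daily Technical Deep Dive
Based on the latest Vercel AI SDK v6 release, covering enterprise application development
Modular design that supports incremental adoption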
import { generateText } from 'ai';
// Use the Vercel AI Gateway
const result = await generateText({
model: 'anthropic/claude-opus-4.6',
prompt: 'What is an AI agent?',
maxOutputTokens: 500,
temperature: 0.7
});
console.log(result.text);
The simplest form of AI text generation, with access to 25+ providers through the AI Gateway
import { generateText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { openai } from '@ai-sdk/openai';
// Use Anthropic directly
const anthropicResult = await generateText({
model: anthropic('claude-3-5-sonnet-20241022'),
prompt: "Explain TypeScript's type system"
});
// Use OpenAI directly
const openaiResult = await generateText({
model: openai('gpt-4-turbo'),
prompt: "Explain React's virtual DOM"
});
Connects directly to each AI provider, with no Gateway required
A unified interface, so switching providers does not require rewriting code
import { generateText, Output } from 'ai';
import { z } from 'zod';
const { output } = await generateText({
model: 'openai/gpt-4-turbo',
output: Output.object({
schema: z.object({
recipe: z.object({
name: z.string(),
ingredients: z.array(
z.object({
name: z.string(),
amount: z.string(),
unit: z.string().optional()
})
),
steps: z.array(z.string()),
cookingTime: z.number(),
difficulty: z.enum(['easy', 'medium', 'hard'])
})
})
}),
prompt: 'Generate a detailed recipe for Chinese Kung Pao chicken'
});
console.log(output.recipe);
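Type-safe structured data extraction with Zod
v6 is the most significant release since the framework launched
Layered architecture with clearly separated module responsibilities
LLM-based agents that can carry out complex tasks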
import { ToolLoopAgent } from 'ai';
import { openai } from '@ai-sdk/openai';
const imageGenerationAgent = new ToolLoopAgent({
model: 'openai/gpt-4-turbo',
instructions: 'You are a professional image generation assistant',
tools: {
generateImage: openai.tools.imageGeneration({
size: '1024x1024',
quality: 'hd',
style: 'vivid'
}),
saveImage: {
description: 'Save the generated image to local storage',
parameters: {
type: 'object',
properties: {
imageData: { type: 'string', description: 'Base64-encoded image data' },
filename: { type: 'string', description: 'File name' }
}
}
}
}
});
Creating an agent that can generate and save images
import { ToolLoopAgent } from 'ai';
import { openai } from '@ai-sdk/openai';
const sandboxAgent = new ToolLoopAgent({
model: 'openai/gpt-4-turbo',
instructions: 'You are an assistant with access to a shell environment',
tools: {
shell: openai.tools.localShell({
execute: async ({ action }) => {
const [cmd, ...args] = action.command;
const sandbox = await getSandbox(); // Vercel Sandbox
const command = await sandbox.runCommand({ cmd, args });
return { output: await command.stdout() };
},
maxCommands: 10,
timeout: 30000
}),
filesystem: openai.tools.localFilesystem()
}
});
A sandboxed agent with file system operations and shell command execution
A tool system that supports breaking complex tasks into steps
import { z } from 'zod';
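// Define the tool
const WeatherTool = {
description: 'Get the weather forecast for a given city',
// AI SDK v5 and later name the tool schema `inputSchema` (formerly `parameters`)
inputSchema: z.object({
city: z.string().min(1, 'City name must not be empty'),
unit: z.enum(['celsius', 'fahrenheit']).default('celsius'),
days: z.number().min(1).max(7).default(1)
})
};
// Assumption: the API key is read from an environment variable with this hypothetical name
const API_KEY = process.env.WEATHER_API_KEY ?? '';
// Implement the tool function
async function getWeather(params: z.infer<typeof WeatherTool.inputSchema>) {
const response = await fetch(
`https://api.weatherapi.com/v1/forecast.json?key=${API_KEY}&q=${encodeURIComponent(params.city)}&days=${params.days}`
);
if (!response.ok) {
throw new Error(`Weather request failed with status ${response.status}`);
}
return response.json();
}
// Register with the AI SDK
const weatherTool = {
...WeatherTool,
execute: getWeather
};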
Strict type definitions and parameter validation with Zod
A chat interface with tool-call visualization
'use client';
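import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';
import { useState } from 'react';
export default function ChatInterface() {
const { messages, status, sendMessage } = useChat({
// In AI SDK v5 and later the endpoint is configured through a transport
transport: new DefaultChatTransport({ api: '/api/chat' }),
// Initial messages use the parts-based UIMessage format
messages: [
{
id: 'welcome',
role: 'assistant',
parts: [{ type: 'text', text: "Hello! I'm your AI assistant" }]
}
]
});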
const [input, setInput] = useState('');
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (input.trim()) {
sendMessage({ text: input });
setInput('');
}
};
return (
<div className="chat-container">
<div className="messages">
{messages.map((message) => (
<div
key={message.id}
className={`message ${message.role}`}
>
<strong>{message.role}:</strong>
{message.parts.map((part, index) =>
part.type === 'text' ? <span key={index}>{part.text}</span> : null
)}
</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
disabled={status !== 'ready'}
placeholder="Type a message..."
/>
<button
type="submit"
disabled={status !== 'ready' || !input.trim()}
>
Send
</button>
</form>
</div>
);
}
A React chat interface with real-time conversation
'use client';
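import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';
import ImageGenerationView from '@/components/image-generation-view';
// Simplified, hand-written message shape; a real project would more likely
// derive this type from the agent definition on the server
interface ImageGenerationMessage {
role: 'user' | 'assistant';
parts: Array<{
type: 'text' | 'tool-generateImage';
text?: string;
invocation?: any;
}>;
}
export default function ImageChat() {
const { messages, status, sendMessage } = useChat<ImageGenerationMessage>({
// In AI SDK v5 and later the endpoint is configured through a transport
transport: new DefaultChatTransport({ api: '/api/image-generation' })
});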
return (
<div>
{messages.map((message) => (
<div key={message.id}>
<strong>{message.role}: </strong>
{message.parts.map((part, index) => {
switch (part.type) {
case 'text':
return <div key={index}>{part.text}</div>;
case 'tool-generateImage':
return <ImageGenerationView key={index} invocation={part.invocation} />;
}
})}
</div>
))}
{/* Input form goes here */}
</div>
);
}
A chat interface with tool-call visualization
An intuitive tool-calling experience
import { openai } from '@ai-sdk/openai';
import { UIToolInvocation } from 'ai';
interface ImageGenerationProps {
invocation: UIToolInvocation<
ReturnType<typeof openai.tools.imageGeneration>
>;
}
export default function ImageGenerationView({
invocation
}: ImageGenerationProps) {
switch (invocation.state) {
case 'input-available':
return (
<div className="tool-invocation">
<div className="loading">Generating image...</div>
<div className="spinner"></div>
</div>
);
case 'output-available':
return (
<div className="tool-invocation">
<h4>Generated image:</h4>
<img
src={`data:image/png;base64,${invocation.output.result}`}
alt="Generated image"
className="generated-image"
/>
<div className="image-info">
<p>Size: {invocation.output.size}</p>
<p>Quality: {invocation.output.quality}</p>
</div>
</div>
);
case 'output-error':
return (
<div className="tool-invocation error">
<p>Generation failed: {invocation.errorText}</p>
</div>
);
default:
return null;
}
}
An image generation component
A Next.js API route
import { openai } from '@ai-sdk/openai';
import { ToolLoopAgent, InferAgentUIMessage } from 'ai';
import { createAgentUIStreamResponse } from 'ai';
// Create the image generation agent
const imageGenerationAgent = new ToolLoopAgent({
model: 'openai/gpt-4-turbo',
tools: {
generateImage: openai.tools.imageGeneration({
partialImages: 3,
size: '1024x1024'
})
}
});
type ImageAgentMessage = InferAgentUIMessage<
typeof imageGenerationAgent
>;
export async function POST(req: Request) {
try {
const { messages } = await req.json();
// Create the streaming response
const response = createAgentUIStreamResponse({
agent: imageGenerationAgent,
messages,
onToolCall: (toolCall) => {
console.log('Tool call:', toolCall);
},
onError: (error) => {
console.error('Agent error:', error);
}
});
return response;
} catch (error) {
console.error('API error:', error);
return new Response(
JSON.stringify({ error: 'An error occurred while processing the request' }),
{
status: 500,
headers: { 'Content-Type': 'application/json' }
}
);
}
}
An API implementation with streaming responses
Specialized agents for specific scenarios
import { ToolLoopAgent } from 'ai';
import { openai } from '@ai-sdk/openai';
const dataAnalysisAgent = new ToolLoopAgent({
model: 'openai/gpt-4-turbo',
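instructions: `You are a professional data analysis assistant.
You can read CSV files, run statistical analyses, and generate charts.
Use pandas for data processing and matplotlib/seaborn for visualization.`,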
tools: {
loadCSV: {
description: 'Load a CSV file and show basic information',
parameters: {
type: 'object',
properties: {
filename: { type: 'string' }
}
}
},
analyzeData: {
description: 'Run a statistical analysis',
parameters: {
type: 'object',
properties: {
data: { type: 'string' },
analysisType: {
type: 'string',
enum: ['descriptive', 'correlation', 'regression']
}
}
}
},
generateChart: {
description: 'Generate a data visualization chart',
parameters: {
type: 'object',
properties: {
data: { type: 'string' },
chartType: {
type: 'string',
enum: ['line', 'bar', 'scatter', 'histogram']
}
}
}
}
}
});
An agent dedicated to data analysis
Modular architecture built on TypeScript
// packages/core/src/index.ts
export * from './generation';
export * from './providers';
export * from './tools';
export * from './agent';
export * from './utils';
// packages/providers/openai/src/index.ts
export * from './openai-provider';
export * from './openai-tools';
export * from './openai-models';
// packages/ui/react/src/index.ts
export * from './chat';
export * from './hooks';
export * from './components';
A clear module export structure
// Core Provider interface
interface Provider {
name: string;
models: Model[];
generateText(params: GenerationParams): Promise<GenerationResult>;
generateImage?(params: ImageGenerationParams): Promise<ImageGenerationResult>;
}
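// OpenAI provider implementation
class OpenAIProvider implements Provider {
name = 'openai';
models: Model[] = []; // required by the Provider interface; left empty in this sketch
constructor(private apiKey: string) {}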
async generateText(params: GenerationParams) {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: params.model,
messages: [{ role: 'user', content: params.prompt }],
max_tokens: params.maxTokens,
temperature: params.temperature
})
});
if (!response.ok) {
throw new Error(`OpenAI request failed with status ${response.status}`);
}
return response.json();
}
}
A unified provider abstraction layer
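Model strings such as 'openai/gpt-4-turbo' imply a lookup step from the prefix to a provider. The following is a minimal sketch of that idea, not the SDK's internal implementation: `ProviderRegistry`, `TextProvider`, and `EchoProvider` are hypothetical names introduced for illustration, and the echo provider stands in for a real network call so the example runs offline.

```typescript
// Hypothetical, simplified types for illustration; not the AI SDK's real interfaces.
interface TextParams { model: string; prompt: string; }
interface TextResult { text: string; }
interface TextProvider {
  name: string;
  generateText(params: TextParams): Promise<TextResult>;
}

class ProviderRegistry {
  private providers = new Map<string, TextProvider>();

  register(provider: TextProvider): void {
    this.providers.set(provider.name, provider);
  }

  // Splits "provider/model" at the first slash and looks up the provider.
  resolve(modelId: string): { provider: TextProvider; model: string } {
    const slash = modelId.indexOf('/');
    if (slash <= 0 || slash === modelId.length - 1) {
      throw new Error(`Expected "provider/model", got "${modelId}"`);
    }
    const name = modelId.slice(0, slash);
    const provider = this.providers.get(name);
    if (!provider) {
      throw new Error(`Unknown provider "${name}"`);
    }
    return { provider, model: modelId.slice(slash + 1) };
  }
}

// Stand-in provider that echoes its input instead of calling a remote API.
class EchoProvider implements TextProvider {
  constructor(public name: string) {}
  async generateText(params: TextParams): Promise<TextResult> {
    return { text: `[${this.name}:${params.model}] ${params.prompt}` };
  }
}

const registry = new ProviderRegistry();
registry.register(new EchoProvider('openai'));
registry.register(new EchoProvider('anthropic'));
```

Calling `registry.resolve('anthropic/claude-opus-4.6')` returns the registered provider together with the bare model name, so calling code never needs to branch on the provider.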
A type-safety system
import { z } from 'zod';
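// Strictly typed tool parameters
const CalculatorTool = {
name: 'calculator',
description: 'Perform a mathematical calculation',
parameters: z.object({
expression: z.string().min(1, 'Expression must not be empty'),
precision: z.number().min(1).max(10).default(6)
}).refine(
(data) => {
// Allow only digits, whitespace, arithmetic operators, parentheses and dots,
// because the expression is evaluated as code below
if (!/^[\d\s+\-*\/().]+$/.test(data.expression)) {
return false;
}
// Check that the expression actually evaluates
try {
new Function(`return ${data.expression}`)();
return true;
} catch {
return false;
}
},
{ message: 'Invalid mathematical expression' }
)
};
type CalculatorParams = z.infer<typeof CalculatorTool.parameters>;
// Type-safe execution function; expects params that already passed the schema above
async function executeCalculation(params: CalculatorParams): Promise<number> {
try {
const result = new Function(`return ${params.expression}`)();
return Number(result.toFixed(params.precision));
} catch (error) {
throw new Error(`Calculation error: ${error instanceof Error ? error.message : 'Unknown error'}`);
}
}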
Strict type validation with Zod
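Evaluating model-supplied text with `new Function` runs it as code, so a dedicated parser is a safer option for a calculator tool. The sketch below is one possible replacement, assuming only `+`, `-`, `*`, `/`, parentheses, unary minus, and decimal numbers are needed; `evaluateArithmetic` is a hypothetical name and this is a different technique from the code above, namely a small recursive-descent parser.

```typescript
// Evaluates + - * / with parentheses and unary minus; rejects anything else.
function evaluateArithmetic(expression: string): number {
  const src = expression;
  let pos = 0;

  // Skips whitespace and returns the next character without consuming it.
  function peek(): string | undefined {
    while (pos < src.length && /\s/.test(src[pos])) pos++;
    return src[pos];
  }

  // expression := term (('+' | '-') term)*
  function parseExpression(): number {
    let value = parseTerm();
    let op = peek();
    while (op === '+' || op === '-') {
      pos++;
      const rhs = parseTerm();
      value = op === '+' ? value + rhs : value - rhs;
      op = peek();
    }
    return value;
  }

  // term := factor (('*' | '/') factor)*
  function parseTerm(): number {
    let value = parseFactor();
    let op = peek();
    while (op === '*' || op === '/') {
      pos++;
      const rhs = parseFactor();
      value = op === '*' ? value * rhs : value / rhs;
      op = peek();
    }
    return value;
  }

  // factor := '-' factor | '(' expression ')' | number
  function parseFactor(): number {
    const ch = peek();
    if (ch === '-') {
      pos++;
      return -parseFactor();
    }
    if (ch === '(') {
      pos++;
      const value = parseExpression();
      if (peek() !== ')') throw new Error('Missing closing parenthesis');
      pos++;
      return value;
    }
    const match = /^\d+(\.\d+)?/.exec(src.slice(pos));
    if (!match) throw new Error(`Unexpected token at position ${pos}`);
    pos += match[0].length;
    return Number(match[0]);
  }

  const result = parseExpression();
  if (peek() !== undefined) throw new Error(`Unexpected token at position ${pos}`);
  if (!Number.isFinite(result)) throw new Error('Result is not a finite number');
  return result;
}
```

Because identifiers never parse, an input such as `process.exit(1)` is rejected with an error instead of being executed.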
Performance optimization best practices for production
// A simple in-memory cache
class ResponseCache {
private cache = new Map<string, Promise<any>>();
async get(key: string, factory: () => Promise<any>): Promise<any> {
if (this.cache.has(key)) {
return this.cache.get(key);
}
const promise = factory();
this.cache.set(key, promise);
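// Remove the entry only if it is still the one stored by this call
const evict = () => {
if (this.cache.get(key) === promise) {
this.cache.delete(key);
}
};
setTimeout(evict, 5 * 60 * 1000); // expire after 5 minutes
promise.catch(evict); // do not keep failed requests cached
return promise;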
}
}
// Concurrency limiter (a higher-order function rather than a decorator)
function concurrencyLimit(maxConcurrent: number) {
let running = 0;
const queue: Array<() => Promise<any>> = [];
return function <T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const run = async () => {
try {
running++;
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
} finally {
running--;
if (queue.length > 0) {
// Start the next queued task (the original re-ran the current one)
const next = queue.shift()!;
next();
}
}
};
if (running < maxConcurrent) {
run();
} else {
queue.push(run);
}
});
};
}
Caching and concurrency control for production
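One way to check a limiter like the one above is to record the peak number of tasks in flight. The sketch below is a small self-contained variant rather than the function above; `createLimiter` and `measurePeak` are hypothetical names, and the 10 ms sleep stands in for a model request.

```typescript
// Minimal limiter: at most `maxConcurrent` tasks are in flight at once.
function createLimiter(maxConcurrent: number) {
  let running = 0;
  const waiting: Array<() => void> = [];

  return async function limit<T>(task: () => Promise<T>): Promise<T> {
    if (running >= maxConcurrent) {
      // Wait until a finishing task hands its slot over to this caller.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      running++;
    }
    try {
      return await task();
    } finally {
      const next = waiting.shift();
      if (next) {
        next(); // pass the slot directly to the next waiter
      } else {
        running--; // nobody is waiting, so free the slot
      }
    }
  };
}

// Runs five short tasks through a limiter of 2 and reports the observed peak.
async function measurePeak(): Promise<{ peak: number; results: number[] }> {
  const limit = createLimiter(2);
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
  let active = 0;
  let peak = 0;
  const results = await Promise.all(
    [1, 2, 3, 4, 5].map((n) =>
      limit(async () => {
        active++;
        peak = Math.max(peak, active);
        await sleep(10);
        active--;
        return n * 2;
      })
    )
  );
  return { peak, results };
}
```

Handing the slot straight to the next waiter, instead of decrementing and re-incrementing the counter, keeps a newly arriving call from slipping in between and exceeding the limit.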
Error handling and debugging
// Error type definitions
enum ErrorType {
ProviderError = 'PROVIDER_ERROR',
ValidationError = 'VALIDATION_ERROR',
TimeoutError = 'TIMEOUT_ERROR',
RateLimitError = 'RATE_LIMIT_ERROR'
}
class AIError extends Error {
constructor(
public type: ErrorType,
message: string,
public details?: any,
public retryable: boolean = false
) {
super(message);
this.name = 'AIError';
}
}
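// Small helper: resolve after the given number of milliseconds
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
// Error-handling wrapper with a bounded number of retries
async function handleAIRequest<T>(
requestFn: () => Promise<T>,
retriesLeft: number = 3
): Promise<T> {
try {
return await requestFn();
} catch (error) {
if (error instanceof AIError) {
// Handle specific error types
switch (error.type) {
case ErrorType.RateLimitError:
if (retriesLeft <= 0) throw error;
await delay(1000); // wait 1 second before retrying
return handleAIRequest(requestFn, retriesLeft - 1);
case ErrorType.TimeoutError:
throw new AIError(
ErrorType.TimeoutError,
'Request timed out, please retry',
undefined,
true
);
default:
throw error;
}
}
throw error;
}
}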
Error handling with a retry mechanism
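A fixed one-second wait can be replaced with exponential backoff so repeated rate-limit errors space retries further apart. The following is a minimal sketch under that assumption; `retryWithBackoff`, `backoffDelay`, and `RetryOptions` are hypothetical names, and the `sleep` option exists only so the timing can be replaced in tests.

```typescript
interface RetryOptions {
  maxAttempts: number;   // total attempts, including the first call
  baseDelayMs: number;   // delay before the first retry
  maxDelayMs: number;    // upper bound for any single delay
  sleep?: (ms: number) => Promise<void>;
}

// Delay before retry number `attempt` (1-based): doubles each time, up to a cap.
function backoffDelay(attempt: number, baseDelayMs: number, maxDelayMs: number): number {
  return Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1));
}

async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  options: RetryOptions
): Promise<T> {
  const sleep =
    options.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Give up when attempts are exhausted or the error is not worth retrying.
      if (attempt >= options.maxAttempts || !isRetryable(error)) {
        throw error;
      }
      await sleep(backoffDelay(attempt, options.baseDelayMs, options.maxDelayMs));
    }
  }
}
```

With `baseDelayMs: 100` and `maxDelayMs: 1000`, the waits are 100 ms, 200 ms, 400 ms, 800 ms, and then 1000 ms for every later retry.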
Test coverage and quality assurance
import { describe, it, expect, beforeEach } from 'vitest';
import { ToolLoopAgent } from 'ai';
import { openai } from '@ai-sdk/openai';
describe('ToolLoopAgent', () => {
let agent: ToolLoopAgent;
beforeEach(() => {
agent = new ToolLoopAgent({
model: 'openai/gpt-4-turbo',
tools: {
mockTool: {
description: 'Test tool',
parameters: {
type: 'object',
properties: {
input: { type: 'string' }
}
},
execute: async ({ input }) => ({
result: `Processed: ${input}`
})
}
}
});
});
it('should handle user input correctly', async () => {
const result = await agent.generate({ prompt: 'Use mockTool to test this input' });
expect(result.text).toContain('Processed');
});
it('should validate tool parameters', async () => {
await expect(
agent.generate({ prompt: 'Use mockTool without any parameters' })
).rejects.toThrow('Missing required parameter');
});
});
Unit tests for the agent
Production deployment best practices
# Dockerfile for AI SDK Application
FROM node:22-alpine
WORKDIR /app
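# Install all dependencies (the build step needs devDependencies)
COPY package*.json ./
RUN npm ci
# Copy application code
COPY . .
# Build the application, then drop devDependencies
RUN npm run build && npm prune --omit=dev
# Expose the port
EXPOSE 3000
# Health check (node:alpine ships BusyBox wget but not curl)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD wget -q --spider http://localhost:3000/health || exit 1
# Start the application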
CMD ["npm", "start"]
A production-grade Docker container configuration
Monitoring and logging
import { performance } from 'perf_hooks';
// Monitoring middleware
function monitoringMiddleware() {
return async (req: any, res: any, next: any) => {
const start = performance.now();
// Observe the response once it has been sent
res.on('finish', () => {
const duration = performance.now() - start;
// Record metrics
console.log({
method: req.method,
url: req.url,
statusCode: res.statusCode,
duration: duration.toFixed(2) + 'ms',
timestamp: new Date().toISOString()
});
// Slow-request alert; this is where metrics would be forwarded to a monitoring system
if (duration > 1000) {
console.warn(`Slow request detected: ${req.method} ${req.url} took ${duration.toFixed(2)}ms`);
}
});
next();
};
}
A request monitoring implementation
Enterprise-grade security measures
import { z } from 'zod';
// Input validation middleware
const validateInput = (schema: z.ZodSchema) => {
return (req: any, res: any, next: any) => {
try {
const validated = schema.parse(req.body);
req.body = validated;
next();
} catch (error) {
if (error instanceof z.ZodError) {
return res.status(400).json({
error: 'Input validation failed',
details: error.issues.map(e => ({
field: e.path.join('.'),
message: e.message
}))
});
}
next(error);
}
};
};
// Rate limiting middleware
const rateLimit = (maxRequests: number, windowMs: number) => {
const requests = new Map<string, number[]>();
return (req: any, res: any, next: any) => {
const clientId = req.ip || req.socket?.remoteAddress || 'unknown';
const now = Date.now();
if (!requests.has(clientId)) {
requests.set(clientId, []);
}
const clientRequests = requests.get(clientId)!;
// Drop timestamps that fall outside the window
const validRequests = clientRequests.filter(
timestamp => now - timestamp < windowMs
);
if (validRequests.length >= maxRequests) {
return res.status(429).json({
error: 'Too many requests',
retryAfter: Math.ceil((windowMs - (now - validRequests[0])) / 1000)
});
}
validRequests.push(now);
requests.set(clientId, validRequests);
next();
};
};
Input validation and rate limiting
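The limiter above keeps a list of timestamps per client, which grows with traffic. A token bucket is a common alternative that stores only two numbers per client. The sketch below illustrates that different technique; `TokenBucket` is a hypothetical name and the injectable `now` function is an assumption made so the behavior can be checked without waiting.

```typescript
// Token bucket: holds up to `capacity` tokens and refills continuously over time.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number,
    private refillPerSecond: number,
    private now: () => number = () => Date.now()
  ) {
    this.tokens = capacity;
    this.lastRefill = this.now();
  }

  // Returns true and consumes one token if the request is allowed.
  tryRemove(): boolean {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = current;
    if (this.tokens < 1) {
      return false;
    }
    this.tokens -= 1;
    return true;
  }
}
```

A bucket created with `new TokenBucket(2, 1)` allows a burst of two requests and then one more per second; in a middleware, one bucket per client ID would replace the timestamp array.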
AI applications in real-world scenarios
Microservice architecture for large AI applications
// Service discovery and registration
class ServiceRegistry {
private services = new Map<string, Service[]>();
register(service: Service) {
if (!this.services.has(service.name)) {
this.services.set(service.name, []);
}
const services = this.services.get(service.name)!;
services.push(service);
// Periodic health check
setInterval(() => {
this.healthCheck(service);
}, 30000);
}
getHealthyService(serviceName: string): Service | null {
const services = this.services.get(serviceName) || [];
const healthyServices = services.filter(s => s.healthy);
if (healthyServices.length === 0) {
return null;
}
// Random selection among healthy instances (not round-robin)
const index = Math.floor(Math.random() * healthyServices.length);
return healthyServices[index];
}
private async healthCheck(service: Service) {
try {
const response = await fetch(`${service.url}/health`);
service.healthy = response.ok;
} catch {
service.healthy = false;
}
}
}
// Load balancer
class LoadBalancer {
constructor(private registry: ServiceRegistry) {}
async routeRequest(serviceName: string, request: any) {
const service = this.registry.getHealthyService(serviceName);
if (!service) {
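throw new Error('No service instance available');
}
// CircuitBreaker is not defined in this excerpt. A breaker created per request keeps no
// failure history, so in practice one instance per service would be cached and reused.
const circuitBreaker = new CircuitBreaker(service.url);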
return circuitBreaker.execute(() =>
fetch(service.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request)
})
);
}
}
An enterprise-grade microservice architecture
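The load balancer above relies on a `CircuitBreaker` class that the excerpt does not show. The sketch below is one possible shape for it, not the author's implementation: `SimpleCircuitBreaker`, its constructor parameters, and the injectable `now` function are all assumptions. It opens after a number of consecutive failures, rejects calls while open, and allows a single trial call once the reset timeout has passed.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class SimpleCircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = 'closed';

  constructor(
    private failureThreshold: number,
    private resetTimeoutMs: number,
    private now: () => number = () => Date.now()
  ) {}

  getState(): BreakerState {
    return this.state;
  }

  async execute<T>(action: () => Promise<T>): Promise<T> {
    let trial = false;
    if (this.state === 'open') {
      // Fail fast until the reset timeout has elapsed.
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error('Circuit is open');
      }
      this.state = 'half-open'; // allow one trial call
      trial = true;
    }
    try {
      const result = await action();
      this.failures = 0;
      this.state = 'closed';
      return result;
    } catch (error) {
      this.failures++;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (trial || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}
```

For the breaker to be useful, the same instance has to be reused across requests to one service, for example by keeping a `Map` from service URL to breaker.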
Performance testing and monitoring
import { performance } from 'perf_hooks';
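// Shape of a single measurement, inferred from how results are recorded below
interface PerformanceResult {
userId: number;
requestId: number;
success: boolean;
duration: number;
statusCode?: number;
error?: string;
timestamp: string;
}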
class PerformanceTester {
async runBenchmark(config: {
endpoint: string;
concurrentUsers: number;
requestsPerUser: number;
payloads: any[];
}) {
// Run all simulated users in parallel so the load is actually concurrent
const perUser = await Promise.all(
Array.from({ length: config.concurrentUsers }, (_, user) =>
this.runUserLoad(user, config.endpoint, config.requestsPerUser, config.payloads)
)
);
const results = perUser.flat();
return this.analyzeResults(results);
}
private async runUserLoad(
userId: number,
endpoint: string,
requestCount: number,
payloads: any[]
): Promise<PerformanceResult[]> {
const results = [];
for (let i = 0; i < requestCount; i++) {
const payload = payloads[i % payloads.length];
const start = performance.now();
try {
const response = await fetch(endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
const duration = performance.now() - start;
results.push({
userId,
requestId: i,
success: response.ok,
duration,
statusCode: response.status,
timestamp: new Date().toISOString()
});
} catch (error) {
const duration = performance.now() - start;
results.push({
userId,
requestId: i,
success: false,
duration,
error: error instanceof Error ? error.message : 'Unknown error',
timestamp: new Date().toISOString()
});
}
// Simulate user think time
await this.delay(Math.random() * 1000 + 500);
}
return results;
}
private analyzeResults(results: PerformanceResult[]) {
const successful = results.filter(r => r.success);
const failed = results.filter(r => !r.success);
const durations = successful.map(r => r.duration);
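// Guard against an empty sample so the statistics are not NaN or Infinity
const hasSamples = durations.length > 0;
const avgDuration = hasSamples ? durations.reduce((a, b) => a + b, 0) / durations.length : 0;
const minDuration = hasSamples ? Math.min(...durations) : 0;
const maxDuration = hasSamples ? Math.max(...durations) : 0;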
return {
totalRequests: results.length,
successfulRequests: successful.length,
failedRequests: failed.length,
successRate: (successful.length / results.length) * 100,
avgResponseTime: avgDuration,
minResponseTime: minDuration,
maxResponseTime: maxDuration,
p95ResponseTime: this.percentile(durations, 95),
p99ResponseTime: this.percentile(durations, 99),
errors: failed.map(f => f.error)
};
}
private percentile(values: number[], p: number): number {
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil((p / 100) * sorted.length) - 1;
return sorted[index];
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
A performance benchmark implementation
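The `percentile` method above uses the nearest-rank rule: sort the samples, then take the value at rank ceil(p / 100 × n). A short worked example may make the rule concrete; `nearestRankPercentile` and the ten sample latencies are made up for illustration.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
function nearestRankPercentile(values: number[], p: number): number {
  if (values.length === 0) {
    throw new Error('No samples');
  }
  if (p <= 0 || p > 100) {
    throw new Error('p must be in (0, 100]');
  }
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length); // 1-based rank
  return sorted[rank - 1];
}

// Ten hypothetical response times in milliseconds.
const sampleLatencies = [120, 80, 300, 95, 110, 450, 105, 90, 100, 130];
// Sorted: 80, 90, 95, 100, 105, 110, 120, 130, 300, 450
const p50 = nearestRankPercentile(sampleLatencies, 50); // rank ceil(5.0) = 5
const p95 = nearestRankPercentile(sampleLatencies, 95); // rank ceil(9.5) = 10
```

Here the median is 105 ms while p95 is 450 ms, which shows why tail percentiles are reported alongside the average: two slow requests barely move the median but dominate the tail.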
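Future directions for AI technology
A strong open-source ecosystem
A full set of learning resources
A learning path from theory to practice
Best practices for enterprise AI application development
Thanks for reading!
Visit https://atcfu.com/ai-articles/vercel-ai-sdk/ to revisit this article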