源码级别解析 · 全栈架构 · 源码剖析 · 实战指南
2026-06-02 | 每日技术深度解读
GitHub 上的明星项目,🌟 10k+ stars
类型安全的端到端通信架构
重新定义 AI 智能体的交互模式
完整的数据流向和组件交互关系
异步处理 + 实时响应的双重模式
// ChatWindow 组件 - 实时对话界面
const ChatWindow: React.FC<ChatWindowProps> = ({ agentId }) => {
const [messages, setMessages] = useState<Message[]>([]);
const [inputValue, setInputValue] = useState('');
// 使用 tRPC 进行类型安全通信
const sendMessage = trpc.agent.sendMessage.useMutation({
onSuccess: (newMessage) => {
setMessages(prev => [...prev, newMessage]);
},
onError: (error) => {
console.error('发送消息失败:', error);
}
});
return (
<div className="chat-container">
{messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
<InputField
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
onSend={() => sendMessage.mutate({
agentId,
content: inputValue
})}
/>
</div>
);
};
基于 tRPC 的类型安全前端通信架构
// next/src/server/api/root.ts - tRPC 路由配置
import { t } from './trpc';
import { z } from 'zod';
export const appRouter = t.router({
// 智能体相关 API
agent: t.router({
create: t.procedure
.input(z.object({
name: z.string(),
objective: z.string(),
model: z.enum(['gpt-4', 'claude-3', 'gemini-pro']),
}))
.mutation(async ({ input }) => {
// 创建新智能体
const agent = await prisma.agent.create({
data: {
name: input.name,
objective: input.objective,
model: input.model,
status: 'CREATED'
}
});
return agent;
}),
sendMessage: t.procedure
.input(z.object({
agentId: z.string(),
content: z.string(),
}))
.mutation(async ({ input }) => {
// 处理用户消息并发送至后端
const response = await fetch('http://localhost:8000/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
agentId: input.agentId,
message: input.content
})
});
return response.json();
}),
}),
});
类型安全的 API 路由定义和验证
支持热插拔的智能体管理模式
| 状态 | 描述 | 触发条件 | 操作权限 |
|---|---|---|---|
| CREATED | 已创建,等待启动 | 用户点击启动 | 编辑配置 |
| RUNNING | 运行中,执行任务 | 启动成功 | 暂停/终止 |
| PAUSED | 已暂停,保存状态 | 用户点击暂停 | 恢复/终止 |
| COMPLETED | 目标完成,停止运行 | 目标达成 | 查看历史 |
| FAILED | 执行失败,需要干预 | 多次重试失败 | 重新配置/终止 |
| TERMINATED | 已终止,资源释放 | 用户主动终止 | 查看日志 |
# platform/reworkd_platform/agents/manager.py
class AgentManager:
def __init__(self):
self.agents = {}
self.event_bus = EventBus()
async def create_agent(self, agent_config: AgentConfig) -> Agent:
"""创建新的智能体实例"""
agent = Agent(
id=str(uuid.uuid4()),
config=agent_config,
state=AgentState.CREATED
)
# 初始化智能体
await agent.initialize()
# 注册到管理器
self.agents[agent.id] = agent
# 发布创建事件
await self.event_bus.publish(
AgentEvent(
type='AGENT_CREATED',
agent_id=agent.id,
data=agent.config.dict()
)
)
return agent
async def start_agent(self, agent_id: str) -> bool:
"""启动指定智能体"""
agent = self.agents.get(agent_id)
if not agent:
raise AgentNotFoundError(f"Agent {agent_id} not found")
await agent.start()
await self.event_bus.publish(
AgentEvent(type='AGENT_STARTED', agent_id=agent_id)
)
return True
基于事件驱动的智能体管理架构
松耦合的组件设计,支持水平扩展
完整的智能体状态转换逻辑
# platform/reworkd_platform/web/api/agent.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
router = APIRouter()
class AgentRequest(BaseModel):
name: str
objective: str
model: str = "gpt-4"
class AgentResponse(BaseModel):
id: str
name: str
status: str
created_at: datetime
@router.post("/agents", response_model=AgentResponse)
async def create_agent(request: AgentRequest):
"""创建新智能体"""
try:
# 验证模型配置
if request.model not in AVAILABLE_MODELS:
raise HTTPException(
status_code=400,
detail=f"Model {request.model} not available"
)
# 创建智能体
agent = await agent_manager.create_agent(
AgentConfig(
name=request.name,
objective=request.objective,
model=request.model
)
)
return AgentResponse(
id=agent.id,
name=agent.name,
status=agent.state.value,
created_at=agent.created_at
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
RESTful API 设计和错误处理
统一的 LLM 抽象层,支持热插拔
# platform/reworkd_platform/llm/base.py
from abc import ABC, abstractmethod
from typing import List, Optional
class LLMProvider(ABC):
"""LLM 提供者抽象基类"""
@abstractmethod
async def generate_response(
self,
messages: List[dict],
temperature: float = 0.7,
max_tokens: int = 1000
) -> str:
"""生成响应"""
pass
@abstractmethod
async def validate_config(self) -> bool:
"""验证配置"""
pass
class OpenAIProvider(LLMProvider):
"""OpenAI 提供者实现"""
def __init__(self, api_key: str, model: str = "gpt-4"):
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
async def generate_response(self, messages: List[dict], **kwargs) -> str:
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
**kwargs
)
return response.choices[0].message.content
class ClaudeProvider(LLMProvider):
"""Anthropic Claude 提供者实现"""
def __init__(self, api_key: str, model: str = "claude-3-sonnet-20240229"):
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
async def generate_response(self, messages: List[dict], **kwargs) -> str:
response = await self.client.messages.create(
model=self.model,
messages=messages,
**kwargs
)
return content[0].text
class LLMManager:
"""LLM 管理器"""
def __init__(self):
self.providers = {}
def register_provider(self, name: str, provider: LLMProvider):
"""注册 LLM 提供者"""
self.providers[name] = provider
async def get_response(self, provider_name: str, messages: List[dict], **kwargs) -> str:
"""获取响应"""
provider = self.providers.get(provider_name)
if not provider:
raise ValueError(f"Provider {provider_name} not found")
return await provider.generate_response(messages, **kwargs)
策略模式实现多模型支持
基于强化学习的智能决策系统
# platform/reworkd_platform/agents/executor.py
class AgentExecutor:
"""智能体执行器"""
def __init__(self, agent: Agent, llm_manager: LLMManager):
self.agent = agent
self.llm_manager = llm_manager
self.task_queue = AsyncQueue()
self.is_running = False
async def start(self):
"""启动执行器"""
self.is_running = True
# 启动执行循环
asyncio.create_task(self._execution_loop())
# 开始目标分析
await self._analyze_objective()
async def _execution_loop(self):
"""主执行循环"""
while self.is_running:
try:
# 获取下一个任务
task = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
# 执行任务
await self._execute_task(task)
except asyncio.TimeoutError:
continue
except Exception as e:
await self._handle_error(e)
async def _analyze_objective(self):
"""分析目标并生成任务"""
objective = self.agent.config.objective
# 使用 LLM 分析目标
prompt = f"分析以下目标并分解为可执行任务:\n{objective}"
response = await self.llm_manager.get_response(
self.agent.config.model,
[{"role": "user", "content": prompt}]
)
# 解析任务列表
tasks = self._parse_tasks(response)
# 添加到任务队列
for task in tasks:
await self.task_queue.put(task)
异步任务队列和执行循环
基于大语言模型的目标理解能力
完整的任务生命周期管理
# platform/reworkd_platform/agents/handler.py
class MessageHandler:
"""消息处理器"""
def __init__(self, agent: Agent):
self.agent = agent
self.message_history = []
async def process_user_message(self, message: str) -> str:
"""处理用户消息"""
# 添加到对话历史
self.message_history.append({
"role": "user",
"content": message,
"timestamp": datetime.utcnow()
})
# 构建提示词
prompt = self._build_prompt()
# 获取 LLM 响应
response = await self._get_llm_response(prompt)
# 添加到对话历史
self.message_history.append({
"role": "assistant",
"content": response,
"timestamp": datetime.utcnow()
})
return response
def _build_prompt(self) -> str:
"""构建提示词"""
# 系统提示
system_prompt = f"你是一个名为 {self.agent.name} 的 AI 智能体。\n"
system_prompt += f"你的目标是:{self.agent.config.objective}\n\n"
system_prompt += "请按照以下格式回应:\n"
system_prompt += "1. 思考:分析当前情况\n"
system_prompt += "2. 行动:决定下一步行动\n"
system_prompt += "3. 反思:总结结果和经验\n\n"
# 对话历史
conversation = ""
for msg in self.message_history[-10:]: # 只保留最近 10 条
conversation += f"{msg['role']}: {msg['content']}\n"
return system_prompt + conversation
async def _get_llm_response(self, prompt: str) -> str:
"""获取 LLM 响应"""
messages = [{"role": "user", "content": prompt}]
return await self.agent.llm_manager.get_response(
self.agent.config.model,
messages
)
基于上下文的消息处理系统
支持长对话上下文的理解能力
# platform/reworkd_platform/db/models.py
from sqlalchemy import Column, String, DateTime, Text, JSON, Enum
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
from enum import Enum as PyEnum
class AgentState(PyEnum):
CREATED = "CREATED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
TERMINATED = "TERMINATED"
Base = declarative_base()
class AgentModel(Base):
__tablename__ = "agents"
id = Column(String, primary_key=True)
name = Column(String, nullable=False)
objective = Column(Text, nullable=False)
model = Column(String, nullable=False)
state = Column(Enum(AgentState), default=AgentState.CREATED)
config = Column(JSON) # 额外配置参数
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
messages = relationship("MessageModel", back_populates="agent")
tasks = relationship("TaskModel", back_populates="agent")
class MessageModel(Base):
__tablename__ = "messages"
id = Column(String, primary_key=True)
agent_id = Column(String, ForeignKey("agents.id"))
role = Column(String, nullable=False) # user/assistant
content = Column(Text, nullable=False)
metadata = Column(JSON) # 消息元数据
timestamp = Column(DateTime, default=datetime.utcnow)
# 关联关系
agent = relationship("AgentModel", back_populates="messages")
class TaskModel(Base):
__tablename__ = "tasks"
id = Column(String, primary_key=True)
agent_id = Column(String, ForeignKey("agents.id"))
name = Column(String, nullable=False)
description = Column(Text)
status = Column(String, default="PENDING") # PENDING/RUNNING/COMPLETED/FAILED
result = Column(Text) # 任务结果
error = Column(Text) # 错误信息
created_at = Column(DateTime, default=datetime.utcnow)
completed_at = Column(DateTime)
# 关联关系
agent = relationship("AgentModel", back_populates="tasks")
完整的数据库模型设计
基于 PostgreSQL 的强一致性保证
| 表名 | 索引字段 | 索引类型 | 使用场景 |
|---|---|---|---|
| agents | id (主键) | B-Tree | 快速查找智能体 |
| agents | state | B-Tree | 按状态筛选智能体 |
| agents | created_at | B-Tree | 时间范围查询 |
| messages | agent_id | B-Tree | 获取智能体消息 |
| messages | timestamp | B-Tree | 时间排序 |
| tasks | agent_id | B-Tree | 任务查询 |
| tasks | status | B-Tree | 状态筛选 |
| tasks | created_at | B-Tree | 创建时间排序 |
// next/src/server/db/schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model Agent {
id String @id @default(cuid())
name String
objective String
model String
state AgentState @default(CREATED)
config Json?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关联关系
messages Message[]
tasks Task[]
@@map("agents")
}
model Message {
id String @id @default(cuid())
agentId String
role String // user, assistant, system
content String
metadata Json?
timestamp DateTime @default(now())
// 关联关系
agent Agent @relation(fields: [agentId], references: [id], onDelete: Cascade)
@@map("messages")
}
model Task {
id String @id @default(cuid())
agentId String
name String
description String?
status String @default("PENDING")
result String?
error String?
createdAt DateTime @default(now())
completedAt DateTime?
// 关联关系
agent Agent @relation(fields: [agentId], references: [id], onDelete: Cascade)
@@map("tasks")
}
enum AgentState {
CREATED
RUNNING
PAUSED
COMPLETED
FAILED
TERMINATED
}
类型安全的数据库模型定义
企业级安全标准实现
// next/src/server/auth/index.ts
import NextAuth from "next-auth";
import CredentialsProvider from "next-auth/providers/credentials";
import GoogleProvider from "next-auth/providers/google";
import { prisma } from "../db";
export const authOptions = {
providers: [
CredentialsProvider({
name: "credentials",
credentials: {
email: { label: "Email", type: "email" },
password: { label: "Password", type: "password" }
},
async authorize(credentials) {
// 验证用户凭据
const user = await prisma.user.findUnique({
where: {
email: credentials.email
}
});
if (user && await verifyPassword(credentials.password, user.password)) {
return {
id: user.id,
email: user.email,
name: user.name,
role: user.role
};
}
return null;
}
}),
GoogleProvider({
clientId: process.env.GOOGLE_CLIENT_ID!,
clientSecret: process.env.GOOGLE_CLIENT_SECRET!,
}),
],
session: {
strategy: "jwt" as const,
maxAge: 30 * 24 * 60 * 60, // 30 days
},
callbacks: {
async jwt({ token, user }) {
if (user) {
token.role = user.role;
}
return token;
},
async session({ session, token }) {
if (session.user) {
session.user.id = token.sub as string;
session.user.role = token.role as string;
}
return session;
},
},
pages: {
signIn: "/signin",
signOut: "/signout",
},
};
export default NextAuth(authOptions);
完整的认证流程配置
生产级 API 网关实现
// next/src/server/middleware.ts
import { NextRequest, NextResponse } from "next/server";
import { getToken } from "next-auth/jwt";
// 限流配置
const RATE_LIMITS = {
"/api/agents": {
windowMs: 60 * 1000, // 1 minute
max: 100, // 100 requests per minute
},
"/api/chat": {
windowMs: 60 * 1000,
max: 1000, // 1000 requests per minute
},
};
// 请求限流中间件
async function rateLimit(request: NextRequest, limit: number) {
const ip = request.ip || "unknown";
const key = `rate_limit:${ip}:${request.nextUrl.pathname}`;
const current = await redis.get(key);
const count = current ? parseInt(current) + 1 : 1;
if (count > limit) {
return NextResponse.json(
{ error: "Rate limit exceeded" },
{ status: 429 }
);
}
await redis.setex(key, 60, count.toString());
return null;
}
// 认证中间件
async function authMiddleware(request: NextRequest) {
const token = await getToken({ req: request });
if (!token) {
return NextResponse.json(
{ error: "Unauthorized" },
{ status: 401 }
);
}
return null;
}
export async function middleware(request: NextRequest) {
const pathname = request.nextUrl.pathname;
// API 路由处理
if (pathname.startsWith("/api/")) {
// 检查限流
const limitConfig = RATE_LIMITS[pathname as keyof typeof RATE_LIMITS];
if (limitConfig) {
const rateLimitResponse = await rateLimit(request, limitConfig.max);
if (rateLimitResponse) return rateLimitResponse;
}
// 需要认证的 API 路由
const protectedRoutes = ["/api/agents", "/api/chat", "/api/tasks"];
if (protectedRoutes.some(route => pathname.startsWith(route))) {
const authResponse = await authMiddleware(request);
if (authResponse) return authResponse;
}
}
return NextResponse.next();
}
生产级中间件实现
微服务架构实现
# docker-compose.yml
version: '3.8'
services:
# Next.js 前端
frontend:
build:
context: ./next
dockerfile: Dockerfile
ports:
- "3000:3000"
environment:
- NODE_ENV=production
- NEXT_PUBLIC_API_URL=http://localhost:8000
depends_on:
- backend
networks:
- agentgpt-network
# FastAPI 后端
backend:
build:
context: ./platform
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:password@db:5432/agentgpt
- REDIS_URL=redis://redis:6379
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
depends_on:
- db
- redis
networks:
- agentgpt-network
# PostgreSQL 数据库
db:
image: postgres:15-alpine
environment:
POSTGRES_DB: agentgpt
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- agentgpt-network
# Redis 缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- agentgpt-network
# Nginx 反向代理
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- frontend
- backend
networks:
- agentgpt-network
volumes:
postgres_data:
redis_data:
networks:
agentgpt-network:
driver: bridge
完整的服务编排配置
DevOps 完整可观测性栈
# platform/reworkd_platform/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# 定义监控指标
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
ACTIVE_AGENTS = Gauge(
'active_agents_count',
'Number of active agents'
)
AGENT_TASKS = Gauge(
'agent_tasks_total',
'Total agent tasks',
['status']
)
class MetricsCollector:
"""指标收集器"""
@staticmethod
def record_request(method: str, endpoint: str, status: int, duration: float):
"""记录 HTTP 请求指标"""
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)
@staticmethod
def update_active_agents(count: int):
"""更新活跃智能体数量"""
ACTIVE_AGENTS.set(count)
@staticmethod
def update_agent_tasks(status: str, count: int):
"""更新智能体任务数量"""
AGENT_TASKS.labels(status=status).set(count)
Prometheus 监控指标定义
生产级性能优化方案
# platform/reworkd_platform/utils/async_utils.py
import asyncio
from functools import wraps
from typing import Any, Callable, TypeVar
T = TypeVar('T')
class AsyncRateLimiter:
"""异步限流器"""
def __init__(self, max_calls: int, time_window: float):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
async def acquire(self) -> bool:
"""获取许可"""
now = asyncio.get_event_loop().time()
# 清理过期的调用记录
self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
if len(self.calls) >= self.max_calls:
return False
self.calls.append(now)
return True
def decorator(self, func: Callable[..., T]) -> Callable[..., T]:
"""装饰器工厂"""
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
if not await self.acquire():
raise Exception("Rate limit exceeded")
return await func(*args, **kwargs)
return wrapper
# 使用示例
rate_limiter = AsyncRateLimiter(max_calls=100, time_window=60.0)
@rate_limiter.decorator
async def api_call(endpoint: str, params: dict) -> dict:
"""API 调用函数"""
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, params=params) as response:
return await response.json()
高性能异步工具实现
多层次安全防护体系
# platform/reworkd_platform/security/middleware.py
import re
from fastapi import Request, HTTPException
from fastapi.security import APIKeyHeader
from typing import Optional
# API 密钥验证
api_key_header = APIKeyHeader(name="X-API-Key")
# 允许的域名列表
ALLOWED_DOMAINS = [
"localhost",
"127.0.0.1",
"agentgpt.reworkd.ai"
]
# SQL 注入模式
SQL_INJECTION_PATTERNS = [
r"(union|select|insert|delete|update|drop|create|alter|exec)",
r"(\*|;|--|\\/\\*|\\*\\/)",
r"(script|iframe|object|embed)",
r"(javascript:|vbscript:|data:)",
]
class SecurityMiddleware:
"""安全中间件"""
@staticmethod
async def validate_cors(request: Request):
"""验证 CORS 请求"""
origin = request.headers.get("origin")
if origin:
parsed = urlparse(origin)
if parsed.hostname not in ALLOWED_DOMAINS:
raise HTTPException(status_code=403, detail="CORS not allowed")
@staticmethod
async def validate_input(data: str) -> bool:
"""验证输入内容"""
for pattern in SQL_INJECTION_PATTERNS:
if re.search(pattern, data, re.IGNORECASE):
return False
return True
@staticmethod
async def sanitize_output(data: str) -> str:
"""清理输出内容"""
# 转义 HTML 特殊字符
data = data.replace("&", "&")
data = data.replace("<", "<")
data = ">", ">"
data = data.replace('"', """)
data = data.replace("'", "'")
return data
# 使用示例
@app.middleware("http")
async def security_middleware(request: Request, call_next):
# 验证 CORS
await SecurityMiddleware.validate_cors(request)
# 请求体验证
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
if not await SecurityMiddleware.validate_input(str(body)):
raise HTTPException(status_code=400, detail="Invalid input")
response = await call_next(request)
# 清理响应内容
if response.body:
response.body = await SecurityMiddleware.sanitize_output(str(response.body))
return response
全面的安全防护实现
支持大规模部署的架构设计
从单体到微服务的演进路径
CI/CD 集成的自动化测试
# tests/test_agent_manager.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from reworkd_platform.agents.manager import AgentManager
from reworkd_platform.agents.agent import Agent, AgentConfig
class TestAgentManager:
"""智能体管理器测试"""
@pytest.fixture
def mock_llm_manager(self):
"""模拟 LLM 管理器"""
llm_manager = MagicMock()
llm_manager.get_response = AsyncMock(return_value="Mock response")
return llm_manager
@pytest.fixture
def agent_manager(self, mock_llm_manager):
"""创建智能体管理器"""
return AgentManager(llm_manager=mock_llm_manager)
@pytest.mark.asyncio
async def test_create_agent(self, agent_manager):
"""测试创建智能体"""
config = AgentConfig(
name="Test Agent",
objective="Test objective",
model="gpt-4"
)
agent = await agent_manager.create_agent(config)
assert agent.name == "Test Agent"
assert agent.state == AgentState.CREATED
assert agent.config == config
@pytest.mark.asyncio
async def test_start_agent(self, agent_manager):
"""测试启动智能体"""
config = AgentConfig(
name="Test Agent",
objective="Test objective",
model="gpt-4"
)
agent = await agent_manager.create_agent(config)
await agent_manager.start_agent(agent.id)
assert agent.state == AgentState.RUNNING
@pytest.mark.asyncio
async def test_agent_error_handling(self, agent_manager):
"""测试智能体错误处理"""
with pytest.raises(AgentNotFoundError):
await agent_manager.start_agent("nonexistent-id")
完整的测试覆盖方案
DevOps 自动化流水线
.github/workflows/deploy.yml
name: Deploy to Production
on:
push:
branches: ["main"]
pull_request:
branches: ["main"]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install dependencies
run: |
cd next && npm ci
cd ../platform && poetry install
- name: Run tests
run: |
cd next && npm test
cd ../platform && poetry run pytest
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test
REDIS_URL: redis://localhost:6379
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build Docker images
run: |
docker build -t agentgpt-frontend:latest ./next
docker build -t agentgpt-backend:latest ./platform
- name: Push to registry
run: |
echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin
docker push agentgpt-frontend:latest
docker push agentgpt-backend:latest
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to production
uses: appleboy/[email protected]
with:
host: ${{ secrets.PRODUCTION_HOST }}
username: ${{ secrets.PRODUCTION_USER }}
key: ${{ secrets.PRODUCTION_KEY }}
script: |
docker-compose -f /opt/agentgpt/docker-compose.yml pull
docker-compose -f /opt/agentgpt/docker-compose.yml up -d
docker system prune -f
完整的 CI/CD 流水线配置
企业级项目开发标准
AI 智能体的发展方向
AgentGPT 架构设计的核心优势
感谢阅读!
访问 https://atcfu.com/ai-articles/agentgpt/ 回顾本文