源码级别解析 · 源码解析 · SDK架构 · 工程实践
2026-04-21 | 每日技术深度解读
支持任何LLM应用,与LangChain深度集成
支持多云部署,满足企业合规要求
高并发架构,支持大规模应用场景
# Install (or upgrade) the LangSmith Python package.
pip install -U langsmith
# Enable tracing for processes started from this shell.
export LANGSMITH_TRACING=true
# API key; `ls_...` is a truncated placeholder, not a usable value.
export LANGSMITH_API_KEY=ls_...
# Workspace identifier; `<workspace-id>` is a placeholder to substitute before running.
export LANGSMITH_WORKSPACE_ID=<workspace-id>
环境变量配置,支持组织级密钥管理
npm install langsmith
// Enable tracing; environment values are strings, hence "true" rather than a boolean.
process.env.LANGSMITH_TRACING = "true"
// API endpoint the SDK should talk to.
process.env.LANGSMITH_ENDPOINT = "https://api.smith.langchain.com"
// API key; "<YOUR-API-KEY>" is a placeholder to substitute.
process.env.LANGSMITH_API_KEY = "<YOUR-API-KEY>"
TypeScript支持,ES模块规范
函数级别精确追踪,支持任意嵌套调用
from langsmith import traceable
from langsmith.wrappers import wrap_openai
# `openai` is referenced below, so it has to be imported in this snippet.
import openai

# OpenAI client passed through langsmith's `wrap_openai` wrapper.
client = wrap_openai(openai.Client())


@traceable
def chat_completion(messages, model="gpt-3.5-turbo"):
    """Request a chat completion through the wrapped OpenAI client.

    Args:
        messages: Chat messages, forwarded unchanged to the API call.
        model: Model name; defaults to ``"gpt-3.5-turbo"``.

    Returns:
        Whatever ``client.chat.completions.create`` returns.
    """
    return client.chat.completions.create(
        messages=messages,
        model=model,
    )
自动记录输入输出,支持token统计
完整的调用链路追踪
from langsmith.run_trees import RunTree
# Root run of the trace.
parent_run = RunTree(
    name="My Chat Bot",
    run_type="chain",
    inputs={"text": "Hello world"},
)
parent_run.post()

# Child run nested under the root run.
child_run = parent_run.create_child(
    name="OpenAI LLM",
    run_type="llm",
    inputs={"messages": [...]},
)
child_run.post()

# A posted run stays open until it is ended and patched, so close the child
# first and then the parent. The `...` values are placeholders, like the
# `inputs` placeholder above.
child_run.end(outputs={"output": ...})
child_run.patch()
parent_run.end(outputs={"output": ...})
parent_run.patch()
支持手动创建和管理追踪节点
无侵入式集成,保持原有API兼容性
class LangSmithOpenAI:
    """Thin tracing wrapper that mirrors ``client.chat.completions.create``.

    A dotted name is not a legal Python method name, so the nested
    ``chat.completions.create`` path is exposed through namespace objects
    built in ``__init__`` instead.
    """

    def __init__(self, client):
        """Store the wrapped client and expose ``self.chat.completions.create``.

        Args:
            client: Object providing ``chat.completions.create(**kwargs)``.
        """
        import types

        self.client = client
        completions = types.SimpleNamespace(create=self._create_chat_completion)
        self.chat = types.SimpleNamespace(completions=completions)

    def _create_chat_completion(self, **kwargs):
        """Call the wrapped client and record the call as an LLM run.

        Args:
            **kwargs: Forwarded unchanged to the wrapped client.

        Returns:
            The result returned by the wrapped client.

        Raises:
            Exception: Any error from the wrapped call is recorded on the run
                and re-raised.
        """
        # Create the LLM run node.
        llm_run = RunTree(
            name="openai.chat",
            run_type="llm",
            inputs=kwargs,
        )
        llm_run.post()
        try:
            result = self.client.chat.completions.create(**kwargs)
            llm_run.end(outputs=dict(result))
            return result
        except Exception as e:
            llm_run.end(error=str(e))
            raise
        finally:
            # Patch on both the success and the failure path; previously the
            # patch was skipped whenever the call raised.
            llm_run.patch()
核心追踪逻辑实现
零配置自动启用追踪功能
from langchain_core.runnables import chain
from langsmith import traceable
@traceable
def process_data(data: dict) -> dict:
    """Return a dict whose ``"processed"`` key holds ``data["input"]``."""
    payload = data["input"]
    return {"processed": payload}
@chain
def data_chain(input_data: dict) -> dict:
    """Delegate to ``process_data`` and return its result."""
    result = process_data(input_data)
    return result
装饰器自动识别chain类型
基于TypeScript,支持现代JavaScript生态
/** Configuration object accepted by the RunTree constructor. */
interface RunTreeConfig {
  /** Run name. */
  name: string;
  /** Run category; limited to the three literals listed here. */
  run_type: "llm" | "chain" | "tool";
  /** Input payload, keyed by string. */
  inputs: Record<string, any>;
  /** Optional output payload. */
  outputs?: Record<string, any>;
  /** Optional error text. */
  error?: string;
  /** Optional serialized payload; its meaning is not shown in this snippet. */
  serialized?: Record<string, any>;
}
/** Holds a run configuration; `postRun` is a placeholder for the API submission. */
class RunTree {
  // Parameter property: declares and assigns the private `config` field.
  constructor(private config: RunTreeConfig) {}

  /** Placeholder: the body is intentionally empty in this sketch. */
  async postRun(): Promise<void> {
    // Submit the run data to the API server.
  }
}
核心数据结构定义
完整的类型安全支持
/** Shape of a run record. */
interface Run {
  /** Run identifier. */
  id: string;
  /** Run name. */
  name: string;
  /** Run category; see RunType. */
  run_type: RunType;
  /** Input payload, keyed by string. */
  inputs: Record<string, any>;
  /** Optional output payload. */
  outputs?: Record<string, any>;
  /** Optional error text. */
  error?: string;
  /** Optional nested runs. */
  traces?: Run[];
  /** Optional events; the Event type is not declared in this snippet. */
  events?: Event[];
}
/** Allowed run categories. */
type RunType = "llm" | "chain" | "tool" | "retriever";
运行时数据结构
兼容原生OpenAI SDK
/**
 * Return an object shaped like the given OpenAI client whose
 * `chat.completions.create` records each call on a RunTree.
 *
 * @param openai Client to wrap.
 * @returns A spread copy of `openai` with `chat.completions.create` replaced.
 */
export function wrapOpenAI(openai: OpenAI): OpenAI {
  const completions = openai.chat.completions;
  // Bind the original method to its owner. The previous
  // `originalChat.call(params, options)` passed `params` as `this` and
  // `options` as the first argument.
  const originalCreate = completions.create.bind(completions);

  return {
    ...openai,
    chat: {
      ...openai.chat,
      completions: {
        ...completions,
        create: async (params, options) => {
          // Create the LLM run node.
          const llmRun = new RunTree({
            name: "openai.chat.completions",
            run_type: "llm",
            inputs: params
          });

          try {
            const result = await originalCreate(params, options);
            llmRun.end({ outputs: result });
            return result;
          } catch (error) {
            // A thrown value is not guaranteed to be an Error instance.
            const message = error instanceof Error ? error.message : String(error);
            llmRun.end({ error: message });
            throw error;
          } finally {
            await llmRun.patchRun();
          }
        }
      }
    }
  };
}
核心包装逻辑实现
支持Vercel Edge Functions
import { traceable } from "langsmith/traceable";
// Traced edge handler: forwards the `text` field of the JSON request body to
// the chat completion API and returns the first choice's content.
const handler = traceable(
  async function (req: NextRequest) {
    // The request body is a stream, so parse the JSON payload before reading
    // `text`; `req.body.text` is always undefined.
    const { text } = await req.json();

    const openai = wrapOpenAI(new OpenAI());
    const completion = await openai.chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ content: text, role: "user" }]
    });

    return NextResponse.json({
      text: completion.choices[0].message.content
    });
  },
  {
    name: "Next.js Edge Handler"
  }
);
Serverless环境下的追踪实现
支持多种评测维度和算法
from langsmith.evaluation import StringEvaluator
def jaccard_similarity(output: str, answer: str) -> float:
    """Return the Jaccard similarity of the two strings' character sets.

    Both strings are lower-cased and reduced to the set of characters they
    contain. The result is ``|A & B| / |A | B|``, or ``0`` when both sets
    are empty.
    """
    left = set(output.lower())
    right = set(answer.lower())
    combined = left | right
    if not combined:
        return 0
    return len(left & right) / len(combined)
class JaccardEvaluator(StringEvaluator):
    """String evaluator that grades a run by character-set Jaccard similarity."""

    def __init__(self):
        super().__init__(evaluation_name="Jaccard")

    def grade_result(self, run_input, run_output, answer):
        """Score ``run_output`` against ``answer``.

        Returns:
            A dict with the numeric ``"score"`` and a ``"value"`` of
            ``"CORRECT"`` when the score exceeds 0.9, else ``"INCORRECT"``.
        """
        score = jaccard_similarity(run_output, answer)
        if score > 0.9:
            value = "CORRECT"
        else:
            value = "INCORRECT"
        return {"score": score, "value": value}
自定义评测算法实现
支持从运行记录自动生成数据集
from langsmith import Client
client = Client()

# Create the dataset that will hold the examples.
dataset = client.create_dataset(
    "Customer_Support_QA", description="客户支持问答数据集"
)

# Add one example per run returned by the filtered listing.
matching_runs = client.list_runs(
    project_name="support_bot",
    execution_order=1,
    error=False,
)
for run in matching_runs:
    client.create_example(
        inputs=run.inputs,
        outputs=run.outputs,
        dataset_id=dataset.id,
    )
自动化数据集生成流程
支持多种部署模式
from langsmith import Client
import os

# Project selection. NOTE(review): the SDK is expected to read the default
# project from the environment rather than from a constructor keyword;
# confirm against the installed langsmith version.
os.environ.setdefault("LANGSMITH_PROJECT", "my_project")

# Keyword arguments are passed directly; wrapping them in `{...}` as before is
# a syntax error.
client = Client(
    # API endpoint
    api_url="https://api.smith.langchain.com",
    # Authentication
    api_key="ls_your_api_key",
    # Request timeout in milliseconds (30 s)
    timeout_ms=30_000,
)
# NOTE(review): `max_retries=6` and `max_concurrency=10` from the original
# sketch do not appear to be `Client` constructor keywords. Retry policy is
# presumably supplied through `retry_config`, and concurrency limits belong to
# the evaluation helpers. Confirm against the SDK reference before re-adding.
客户端连接配置
大规模应用场景优化
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncClient:
    """Submit run records from async code by off-loading to a thread pool."""

    def __init__(self, max_workers=10):
        """Create the worker pool.

        Args:
            max_workers: Size of the thread pool used for submissions.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Batching state; not used by the code shown here.
        self.batch_size = 50
        self.buffer = []

    async def submit_run(self, run):
        """Run ``_submit_run_sync(run)`` on the pool and await its result.

        Args:
            run: Run record handed to the synchronous submit routine.

        Returns:
            Whatever ``_submit_run_sync`` returns.
        """
        # Inside a coroutine the running loop is the one to use;
        # ``get_event_loop()`` is deprecated for this purpose.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self._submit_run_sync,
            run,
        )

    def _submit_run_sync(self, run):
        """Synchronous submit routine (placeholder; currently does nothing)."""
        # Batch-processing logic goes here.
        pass
异步处理架构实现
确保系统稳定性
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    wait=wait_exponential(multiplier=1, min=4, max=10),
    stop=stop_after_attempt(3),
)
def submit_run_with_retry(run):
    """POST ``run.to_dict()`` to ``{API_URL}/runs`` and return the JSON reply.

    The decorator is configured to stop after 3 attempts, with an exponential
    wait bounded between 4 and 10.

    Raises:
        requests.RequestException: Logged and re-raised on a failed request.
    """
    endpoint = f"{API_URL}/runs"
    try:
        reply = requests.post(endpoint, json=run.to_dict(), timeout=30)
        reply.raise_for_status()
        return reply.json()
    except requests.RequestException as exc:
        logger.error(f"运行提交失败: {exc}")
        raise
指数退避重试策略
提高系统响应速度
import time
from functools import lru_cache
class RunCache:
    """In-memory TTL cache for values plus an LRU cache for dataset schemas."""

    def __init__(self, ttl=300):
        """Create an empty cache.

        Args:
            ttl: Entry lifetime in seconds.
        """
        self.ttl = ttl
        self.cache = {}
        # Per-instance LRU. Decorating the method itself would key the shared
        # cache on ``self`` and keep every instance alive. The lambda defers
        # the lookup of ``_fetch_dataset_schema`` until the first call.
        self._schema_cache = lru_cache(maxsize=1000)(
            lambda dataset_id: self._fetch_dataset_schema(dataset_id)
        )

    def get(self, key):
        """Return the cached value for ``key``, or ``None`` if absent or expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry['timestamp'] < self.ttl:
            return entry['value']
        # Drop the stale entry so expired keys do not accumulate.
        self.cache.pop(key, None)
        return None

    def set(self, key, value):
        """Store ``value`` under ``key`` with the current time as its timestamp."""
        self.cache[key] = {
            'value': value,
            'timestamp': time.time(),
        }

    def get_schema(self, dataset_id):
        """Return the dataset schema, cached per instance (up to 1000 entries).

        ``_fetch_dataset_schema`` is not defined in this class as shown; it
        must be provided elsewhere, for example by a subclass.
        """
        return self._schema_cache(dataset_id)
多级缓存实现
提升用户体验
import asyncio
import websockets
class WebSocketServer:
    """Tracks connected clients and broadcasts messages to all of them."""

    def __init__(self):
        self.clients = set()

    async def register(self, websocket):
        """Add a client connection to the tracked set."""
        self.clients.add(websocket)

    async def unregister(self, websocket):
        """Remove a client connection; unknown connections are ignored."""
        self.clients.discard(websocket)

    async def broadcast(self, message):
        """Send ``message`` to every tracked client.

        Exceptions from individual sends are collected by ``gather`` rather
        than raised.
        """
        if not self.clients:
            return
        pending = [peer.send(message) for peer in self.clients]
        await asyncio.gather(*pending, return_exceptions=True)

    async def handle_run_update(self, run):
        """Broadcast a ``run_update`` message carrying ``run.to_dict()``."""
        payload = {
            "type": "run_update",
            "data": run.to_dict(),
        }
        await self.broadcast(json.dumps(payload))
实时通信实现
满足企业合规要求
灵活的认证策略
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader
from typing import Annotated
# Header scheme: the API key is read from the ``X-API-Key`` request header.
security = APIKeyHeader(name="X-API-Key")


def get_current_user(api_key: Annotated[str, Depends(security)]):
    """Resolve the user for ``api_key`` or reject the request.

    Raises:
        HTTPException: 401 when ``authenticate_api_key`` returns a falsy value.
    """
    user = authenticate_api_key(api_key)
    if user:
        return user
    raise HTTPException(
        detail="无效的API密钥",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )
def authenticate_api_key(api_key: str):
    """Return whatever ``user_service.get_by_api_key`` yields for ``api_key``."""
    # The lookup is delegated to the user service.
    matched_user = user_service.get_by_api_key(api_key)
    return matched_user
认证中间件实现
保护数据安全
from cryptography.fernet import Fernet
import base64
class DataEncryption:
    """Symmetric encryption helper built on a ``Fernet`` cipher."""

    def __init__(self, key):
        self.cipher = Fernet(key)

    @staticmethod
    def generate_key() -> bytes:
        """Return a freshly generated Fernet key."""
        return Fernet.generate_key()

    def encrypt(self, data: str) -> bytes:
        """Encode ``data`` to bytes and encrypt it."""
        raw = data.encode()
        return self.cipher.encrypt(raw)

    def decrypt(self, encrypted_data: bytes) -> str:
        """Decrypt ``encrypted_data`` and decode the result to ``str``."""
        raw = self.cipher.decrypt(encrypted_data)
        return raw.decode()

    def encrypt_sensitive_data(self, data: dict) -> dict:
        """Return a shallow copy of ``data`` with sensitive fields encrypted.

        The fields ``api_key``, ``password`` and ``token`` are, when present,
        converted with ``str()``, encrypted, and stored as decoded text.
        """
        result = data.copy()
        for name in ("api_key", "password", "token"):
            if name not in data:
                continue
            token = self.encrypt(str(data[name]))
            result[name] = token.decode()
        return result
基于 Fernet(AES-128-CBC + HMAC-SHA256)的对称加密实现
主动问题发现
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
# Metric definitions.
REQUEST_COUNT = Counter(
    name='langsmith_requests_total',
    documentation='总请求数',
    labelnames=['method', 'endpoint', 'status'],
)

REQUEST_DURATION = Histogram(
    name='langsmith_request_duration_seconds',
    documentation='请求处理时间',
    labelnames=['method', 'endpoint'],
)

ACTIVE_RUNS = Gauge(
    name='langsmith_active_runs',
    documentation='活跃运行数',
)
class MonitoringMiddleware:
    """ASGI middleware that records request count and duration metrics."""

    def __init__(self, app):
        """Wrap ``app`` and start the metrics HTTP server on port 8000."""
        self.app = app
        prometheus_client.start_http_server(8000)

    async def __call__(self, scope, receive, send):
        """Forward the call to the wrapped app, recording metrics for HTTP scopes.

        Scopes whose ``type`` is not ``"http"`` are passed through untouched:
        the metrics are labelled with ``scope["method"]``, which the previous
        version read unconditionally.
        """
        if scope.get("type") != "http":
            await self.app(scope, receive, send)
            return

        # Default to "error" so the label is always bound in ``finally``,
        # including when a BaseException such as task cancellation passes
        # through and an ``except Exception`` clause would not fire.
        status = "error"
        start_time = time.time()
        try:
            await self.app(scope, receive, send)
            status = "success"
        finally:
            duration = time.time() - start_time
            method = scope["method"]
            endpoint = scope["path"]
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=status,
            ).inc()
            REQUEST_DURATION.labels(
                method=method,
                endpoint=endpoint,
            ).observe(duration)
Prometheus监控集成
便于调试和审计
import logging
import json
from datetime import datetime
class StructuredLogger:
    """Logger that writes run records as JSON strings to a file and the console."""

    def __init__(self, name):
        """Configure the named logger with a file handler and a console handler.

        Args:
            name: Logger name passed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # ``getLogger`` returns the same object for the same name. Attaching
        # handlers again on a second construction would emit every record twice.
        if self.logger.handlers:
            return

        # Line format shared by both handlers.
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # File handler. Records are dumped with ``ensure_ascii=False``, so the
        # file encoding is pinned instead of relying on the platform default.
        file_handler = logging.FileHandler('langsmith.log', encoding='utf-8')
        file_handler.setFormatter(formatter)

        # Console handler.
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def log_run(self, run, level="info"):
        """Log one run as a JSON string.

        Args:
            run: Object exposing ``id``, ``name``, ``run_type`` and ``inputs``;
                ``outputs`` and ``error`` are optional and default to ``None``.
            level: Name of the logger method to call (for example ``"info"``).
        """
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "type": "run",
            "run_id": run.id,
            "name": run.name,
            "run_type": run.run_type,
            "inputs": run.inputs,
            "outputs": getattr(run, 'outputs', None),
            "error": getattr(run, 'error', None),
        }
        getattr(self.logger, level)(json.dumps(log_data, ensure_ascii=False))
结构化日志实现
大数据量场景优化
-- Index tuning for the runs table: one single-column index per filter column.
-- NOTE(review): idx_runs_project_name covers the leading column of the
-- composite index idx_runs_project_created_type below; confirm both are needed.
CREATE INDEX idx_runs_project_name ON runs(project_name);
CREATE INDEX idx_runs_created_at ON runs(created_at);
CREATE INDEX idx_runs_run_type ON runs(run_type);
CREATE INDEX idx_runs_execution_order ON runs(execution_order);
CREATE INDEX idx_runs_parent_run_id ON runs(parent_run_id);
-- Composite index for common queries.
CREATE INDEX idx_runs_project_created_type ON
    runs(project_name, created_at, run_type);
-- Indexes for the datasets table.
CREATE INDEX idx_datasets_name ON datasets(name);
CREATE INDEX idx_datasets_project_id ON datasets(project_id);
-- Indexes for the examples table.
CREATE INDEX idx_examples_dataset_id ON examples(dataset_id);
CREATE INDEX idx_examples_created_at ON examples(created_at);
数据库索引优化
提高系统性能
import redis
from redis import Redis
from datetime import timedelta
class RedisCache:
    """JSON cache for runs and datasets stored in Redis with a one-hour expiry."""

    def __init__(self, host='localhost', port=6379, db=0):
        """Connect to Redis.

        Args:
            host: Redis host name.
            port: Redis port.
            db: Redis database index.
        """
        self.redis = Redis(host=host, port=port, db=db)
        self.default_timeout = timedelta(hours=1)

    def get_run(self, run_id: str):
        """Return the cached run decoded from JSON, or ``None`` on a miss."""
        data = self.redis.get(f"run:{run_id}")
        if data:
            return json.loads(data)
        return None

    def set_run(self, run_id: str, run_data: dict):
        """Cache ``run_data`` as JSON under ``run:<run_id>`` with the default expiry."""
        self.redis.setex(
            f"run:{run_id}",
            self.default_timeout,
            json.dumps(run_data),
        )

    def get_dataset(self, dataset_id: str):
        """Return the dataset from the cache, loading and caching it on a miss.

        ``_load_dataset_from_db`` is not defined in this class as shown; it
        must be provided elsewhere, for example by a subclass.

        Returns:
            The dataset, or ``None`` when the loader returns a falsy value.
        """
        key = f"dataset:{dataset_id}"
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Cache miss: load from the database.
        dataset = self._load_dataset_from_db(dataset_id)
        if not dataset:
            return None
        self.redis.setex(
            key,
            self.default_timeout,
            json.dumps(dataset),
        )
        # Return the loaded object itself. It is serialized with ``json.dumps``
        # above, so it is not a JSON string and must not be fed to
        # ``json.loads`` as before.
        return dataset
Redis缓存实现
高可用架构
# Service configuration file.
# NOTE(review): the credentials and API_KEY_SECRET below are inline sample
# values; supply real values through an env file or a secret store.
services:
  api:
    image: langchain/langsmith-api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/langsmith
      - REDIS_URL=redis://redis:6379
      - API_KEY_SECRET=secret
    depends_on:
      - db
      - redis

  worker:
    image: langchain/langsmith-worker
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/langsmith
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: postgres:14
    environment:
      - POSTGRES_DB=langsmith
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

# Named volumes referenced by the db and redis services.
volumes:
  postgres_data:
  redis_data:
Docker Compose服务配置
生产级部署
---
# API service Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langsmith-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langsmith-api
  template:
    metadata:
      labels:
        app: langsmith-api
    spec:
      containers:
        - name: api
          image: langchain/langsmith-api:latest
          ports:
            - containerPort: 8000
          env:
            # Read from the `langsmith-secret` Secret, key `database-url`.
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: langsmith-secret
                  key: database-url
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
---
# Service exposing the API pods: port 80 forwards to container port 8000.
apiVersion: v1
kind: Service
metadata:
  name: langsmith-api-service
spec:
  selector:
    app: langsmith-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
K8s部署配置
性能优化依据
import time
import asyncio
import statistics
from concurrent.futures import ThreadPoolExecutor
class PerformanceTester:
    """Fire concurrent requests at a URL and print latency/throughput figures."""

    def __init__(self, url, max_concurrent=100):
        """Store the target and the concurrency ceiling.

        Args:
            url: Target URL.
            max_concurrent: Maximum number of requests in flight at once.
        """
        self.url = url
        self.max_concurrent = max_concurrent

    async def run_concurrent_requests(self, num_requests=1000):
        """Issue ``num_requests`` requests and print a summary.

        ``_make_api_request`` is not defined in this class as shown; it must
        be provided elsewhere, for example by a subclass.

        Args:
            num_requests: Total number of requests to issue.
        """
        start_time = time.time()
        # Enforce the configured ceiling; previously ``max_concurrent`` was
        # stored but never applied, so every request started at once.
        limiter = asyncio.Semaphore(max(1, self.max_concurrent))

        async def make_request(request_id):
            async with limiter:
                # Time only the request itself, not the wait for a slot.
                start = time.time()
                try:
                    response = await self._make_api_request()
                    end = time.time()
                    return {
                        'request_id': request_id,
                        'success': True,
                        'duration': end - start,
                        'status_code': response.status_code,
                    }
                except Exception as e:
                    end = time.time()
                    return {
                        'request_id': request_id,
                        'success': False,
                        'duration': end - start,
                        'error': str(e),
                    }

        # Create the concurrent tasks.
        tasks = [make_request(i) for i in range(num_requests)]
        results = await asyncio.gather(*tasks)

        total_time = time.time() - start_time
        self._analyze_results(results, total_time)

    def _analyze_results(self, results, total_time):
        """Print counts, throughput and latency statistics for ``results``.

        Args:
            results: Dicts as produced by ``run_concurrent_requests``.
            total_time: Wall-clock duration of the whole run, in seconds.
        """
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]

        print(f"✅ 成功请求数: {len(successful)}")
        print(f"❌ 失败请求数: {len(failed)}")
        print(f"⏱️ 总耗时: {total_time:.2f}s")

        # Guard the division: a zero elapsed time would otherwise raise.
        throughput = len(successful) / total_time if total_time > 0 else 0.0
        print(f"📊 吞吐量: {throughput:.2f} req/s")

        # Latency figures only exist when at least one request succeeded.
        if not successful:
            return
        durations = [r['duration'] for r in successful]
        avg_duration = statistics.mean(durations)
        max_duration = max(durations)
        min_duration = min(durations)
        print(f"📈 平均响应时间: {avg_duration*1000:.2f}ms")
        print(f"📉 最快响应时间: {min_duration*1000:.2f}ms")
        print(f"📉 最慢响应时间: {max_duration*1000:.2f}ms")
性能测试实现
提升开发效率
快速故障定位
满足定制需求
持续迭代优化
LangSmith作为LLM应用开发的基础设施
感谢阅读!
访问 https://atcfu.com/ai-articles/langsmith/ 回顾本文