🔍 LangSmith 源码深度解析

LLM应用调试与评测平台架构剖析

源码级别解析 · 源码解析 · SDK架构 · 工程实践
2026-04-21 | 每日技术深度解读

LangSmith 平台概述

框架无关的LLM应用开发平台
  • ✅ 调试与追踪
  • ✅ 评测与测试
  • ✅ 部署与监控
  • ✅ 协作与版本管理

支持任何LLM应用,与LangChain深度集成

核心架构设计

分层架构 + 微服务设计
  • 🏗️ SDK层:Python & JavaScript
  • 🔌 API层:RESTful接口
  • 🗄️ 存储层:Trace与Dataset
  • 👥 用户层:Web界面

支持多云部署,满足企业合规要求

技术栈分析

现代化技术栈
  • 前端:React + TypeScript
  • 后端:Python FastAPI
  • 数据库:PostgreSQL + Redis
  • 消息队列:Celery + RabbitMQ

高并发架构,支持大规模应用场景

Python SDK 安装配置

pip install -U langsmith

export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=ls_...
export LANGSMITH_WORKSPACE_ID=<workspace-id>

环境变量配置,支持组织级密钥管理

JavaScript SDK 配置

npm install langsmith

process.env.LANGSMITH_TRACING = "true"
process.env.LANGSMITH_ENDPOINT = "https://api.smith.langchain.com"
process.env.LANGSMITH_API_KEY = "<YOUR-API-KEY>"

TypeScript支持,ES模块规范

追踪机制核心原理

装饰器 + RunTree架构
  • 🎯 @traceable装饰器
  • 🌳 RunTree父子关系
  • 🔄 自动嵌套追踪
  • 📊 Token使用统计

函数级别精确追踪,支持任意嵌套调用

基础追踪示例

from langsmith import traceable
from langsmith.wrappers import wrap_openai

client = wrap_openai(openai.Client())

@traceable
def chat_completion(messages, model="gpt-3.5-turbo"):
    return client.chat.completions.create(
        messages=messages,
        model=model
    )

自动记录输入输出,支持token统计

RunTree 数据结构

运行时追踪树
  • name:组件名称
  • run_type:类型标识
  • inputs:输入数据
  • outputs:输出结果
  • error:错误信息

完整的调用链路追踪

RunTree 手动追踪

from langsmith.run_trees import RunTree

parent_run = RunTree(
    name="My Chat Bot",
    run_type="chain",
    inputs={"text": "Hello world"}
)
parent_run.post()

child_run = parent_run.create_child(
    name="OpenAI LLM",
    run_type="llm",
    inputs={"messages": [...]}
)
child_run.post()

支持手动创建和管理的追踪节点

OpenAI 集成机制

Wrapper模式实现
  • 🔄 自动拦截API调用
  • 📝 记录请求参数
  • 📊 统计Token使用
  • 🔍 追踪响应数据

无侵入式集成,保持原有API兼容性

OpenAI包装器实现

class LangSmithOpenAI:
    def __init__(self, client):
        self.client = client
    
    def chat.completions.create(self, **kwargs):
        # 创建LLM运行节点
        llm_run = RunTree(
            name="openai.chat",
            run_type="llm",
            inputs=kwargs
        )
        llm_run.post()
        
        try:
            result = self.client.chat.completions.create(**kwargs)
            llm_run.end(outputs=dict(result))
        except Exception as e:
            llm_run.end(error=str(e))
            raise
        
        llm_run.patch()
        return result

核心追踪逻辑实现

LangChain 深度集成

原生支持无缝集成
  • 🔗 自动识别组件类型
  • 🌳 链式调用追踪
  • 📈 性能指标收集
  • 🔧 自定义扩展点

零配置自动启用追踪功能

LangChain 链式追踪

from langchain_core.runnables import chain
from langsmith import traceable

@traceable
def process_data(data: dict) -> dict:
    return {"processed": data["input"]}

@chain
def data_chain(input_data: dict) -> dict:
    return process_data(input_data)

装饰器自动识别chain类型

JavaScript SDK 架构

模块化设计
  • 📦 核心客户端
  • 🔌 包装器系统
  • 🎯 追踪装饰器
  • 📊 评测接口

基于TypeScript,支持现代JavaScript生态

JavaScript RunTree 实现

interface RunTreeConfig {
  name: string;
  run_type: "llm" | "chain" | "tool";
  inputs: Record<string, any>;
  outputs?: Record<string, any>;
  error?: string;
  serialized?: Record<string, any>;
}

class RunTree {
  private config: RunTreeConfig;
  
  constructor(config: RunTreeConfig) {
    this.config = config;
  }
  
  async postRun(): Promise<void> {
    // 向API服务器提交运行数据
  }
}

核心数据结构定义

TypeScript 类型系统

严格的类型定义
  • 🎯 泛型约束
  • 🔧 接口定义
  • 📝 枚举类型
  • ✅ 可选链处理

完整的类型安全支持

类型定义示例

interface Run {
  id: string;
  name: string;
  run_type: RunType;
  inputs: Record<string, any>;
  outputs?: Record<string, any>;
  error?: string;
  traces?: Run[];
  events?: Event[];
}

type RunType = "llm" | "chain" | "tool" | "retriever";

运行时数据结构

OpenAI JavaScript 集成

API包装器实现
  • 🔄 自动拦截
  • 📝 参数记录
  • 📊 Token统计
  • 🔍 响应追踪

兼容原生OpenAI SDK

OpenAI 包装器实现

export function wrapOpenAI(openai: OpenAI): OpenAI {
  const originalChat = openai.chat.completions.create;
  
  return {
    ...openai,
    chat: {
      ...openai.chat,
      completions: {
        ...openai.chat.completions,
        create: async (params, options) => {
          // 创建LLM运行节点
          const llmRun = new RunTree({
            name: "openai.chat.completions",
            run_type: "llm",
            inputs: params
          });
          
          try {
            const result = await originalChat.call(params, options);
            llmRun.end({ outputs: result });
            return result;
          } catch (error) {
            llmRun.end({ error: error.message });
            throw error;
          } finally {
            await llmRun.patchRun();
          }
        }
      }
    }
  };
}

核心包装逻辑实现

Next.js Edge Runtime 集成

Serverless环境支持
  • ⚡ Edge Runtime适配
  • 🔄 自动追踪
  • 📊 性能监控
  • 🔧 错误处理

支持Vercel Edge Functions

Next.js Edge Runtime 示例

import { traceable } from "langsmith/traceable";

const handler = traceable(
  async function (req: NextRequest) {
    const openai = wrapOpenAI(new OpenAI());
    
    const completion = await openai.chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ content: req.body.text, role: "user" }]
    });
    
    return NextResponse.json({
      text: completion.choices[0].message.content
    });
  },
  {
    name: "Next.js Edge Handler"
  }
);

Serverless环境下的追踪实现

评测系统架构

多维度评测机制
  • 🎯 自定义评测函数
  • 📊 量化评分指标
  • 🔄 批量评测流程
  • 📈 结果统计分析

支持多种评测维度和算法

自定义评测实现

from langsmith.evaluation import StringEvaluator

def jaccard_similarity(output: str, answer: str) -> float:
    """计算两个字符串的Jaccard相似度"""
    pred_chars = set(output.lower())
    ans_chars = set(answer.lower())
    intersection = len(pred_chars.intersection(ans_chars))
    union = len(pred_chars.union(ans_chars))
    return intersection / union if union > 0 else 0

class JaccardEvaluator(StringEvaluator):
    def __init__(self):
        super().__init__(evaluation_name="Jaccard")
    
    def grade_result(self, run_input, run_output, answer):
        score = jaccard_similarity(run_output, answer)
        value = "CORRECT" if score > 0.9 else "INCORRECT"
        return {"score": score, "value": value}

自定义评测算法实现

数据集管理系统

版本化数据管理
  • 📁 数据集结构
  • 🔄 版本控制
  • 👥 协作编辑
  • 📊 数据导入导出

支持从运行记录自动生成数据集

数据集创建示例

from langsmith import Client

client = Client()

# 从运行记录创建数据集
dataset = client.create_dataset(
    "Customer_Support_QA",
    description="客户支持问答数据集"
)

# 批量添加示例
for run in client.list_runs(
    project_name="support_bot",
    execution_order=1,
    error=False
):
    client.create_example(
        inputs=run.inputs,
        outputs=run.outputs,
        dataset_id=dataset.id
    )

自动化数据集生成流程

部署系统架构

企业级部署方案
  • ☁️ 云服务部署
  • 🏠 自托管部署
  • 🔐 安全认证
  • 📊 监控告警

支持多种部署模式

客户端配置

from langsmith import Client

client = Client({
    # API端点配置
    api_url="https://api.smith.langchain.com",
    
    # 认证配置
    api_key="ls_your_api_key",
    
    # 项目配置
    project="my_project",
    
    # 重试配置
    max_retries=6,
    max_concurrency=10,
    
    # 超时配置
    timeout=30.0
})

客户端连接配置

性能优化策略

高并发处理
  • 🏃 异步处理
  • 🔄 连接池管理
  • ⚡ 批量提交
  • 📊 内存管理

大规模应用场景优化

异步批量提交

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncClient:
    def __init__(self, max_workers=10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.batch_size = 50
        self.buffer = []
    
    async def submit_run(self, run):
        """异步提交运行记录"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self._submit_run_sync,
            run
        )
    
    def _submit_run_sync(self, run):
        """同步提交逻辑"""
        # 批量处理逻辑
        pass

异步处理架构实现

错误处理机制

优雅降级策略
  • 🔄 自动重试
  • 📝 错误日志
  • 🚨 告警通知
  • 🔧 降级服务

确保系统稳定性

错误重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def submit_run_with_retry(run):
    """带重试机制的运行提交"""
    try:
        response = requests.post(
            f"{API_URL}/runs",
            json=run.to_dict(),
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logger.error(f"运行提交失败: {e}")
        raise

指数退避重试策略

缓存策略

多层缓存架构
  • 🗃️ 内存缓存
  • 💾 磁盘缓存
  • 🔗 分布式缓存
  • ⏰ 过期策略

提高系统响应速度

缓存实现

import time
from functools import lru_cache

class RunCache:
    def __init__(self, ttl=300):
        self.ttl = ttl
        self.cache = {}
    
    def get(self, key):
        entry = self.cache.get(key)
        if entry and time.time() - entry['timestamp'] < self.ttl:
            return entry['value']
        return None
    
    def set(self, key, value):
        self.cache[key] = {
            'value': value,
            'timestamp': time.time()
        }
    
    @lru_cache(maxsize=1000)
    def get_schema(self, dataset_id):
        """缓存数据集模式"""
        return self._fetch_dataset_schema(dataset_id)

多级缓存实现

WebSocket 实时通信

实时数据推送
  • 🔄 实时状态更新
  • 📊 指标监控
  • 🔔 事件通知
  • 👥 协作功能

提升用户体验

WebSocket 服务器

import asyncio
import websockets

class WebSocketServer:
    def __init__(self):
        self.clients = set()
    
    async def register(self, websocket):
        """注册客户端连接"""
        self.clients.add(websocket)
        
    async def unregister(self, websocket):
        """取消注册客户端"""
        self.clients.discard(websocket)
    
    async def broadcast(self, message):
        """广播消息到所有客户端"""
        if self.clients:
            await asyncio.gather(
                *[client.send(message) for client in self.clients],
                return_exceptions=True
            )
    
    async def handle_run_update(self, run):
        """处理运行更新事件"""
        message = json.dumps({
            "type": "run_update",
            "data": run.to_dict()
        })
        await self.broadcast(message)

实时通信实现

安全架构设计

企业级安全策略
  • 🔐 API密钥管理
  • 👥 角色权限控制
  • 🔒 数据加密
  • 📋 审计日志

满足企业合规要求

API 认证机制

多认证方式支持
  • 🔑 API密钥认证
  • 👤 OAuth2.0
  • 🏢 企业SSO
  • 🔗 JWT令牌

灵活的认证策略

API认证中间件

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader
from typing import Annotated

security = APIKeyHeader(name="X-API-Key")

def get_current_user(api_key: Annotated[str, Depends(security)]):
    """API密钥认证"""
    user = authenticate_api_key(api_key)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的API密钥"
        )
    return user

def authenticate_api_key(api_key: str):
    """验证API密钥"""
    # 查询数据库验证密钥
    return user_service.get_by_api_key(api_key)

认证中间件实现

数据加密策略

端到端加密
  • 🔐 传输加密
  • 🗃️ 存储加密
  • 👥 敏感数据脱敏
  • 🔗 密钥轮换

保护数据安全

数据加密实现

from cryptography.fernet import Fernet
import base64

class DataEncryption:
    def __init__(self, key):
        self.cipher = Fernet(key)
    
    def encrypt(self, data: str) -> bytes:
        """加密数据"""
        return self.cipher.encrypt(data.encode())
    
    def decrypt(self, encrypted_data: bytes) -> str:
        """解密数据"""
        return self.cipher.decrypt(encrypted_data).decode()
    
    @staticmethod
    def generate_key() -> bytes:
        """生成加密密钥"""
        return Fernet.generate_key()
    
    def encrypt_sensitive_data(self, data: dict) -> dict:
        """加密敏感字段"""
        encrypted = data.copy()
        for field in ["api_key", "password", "token"]:
            if field in data:
                encrypted[field] = self.encrypt(str(data[field])).decode()
        return encrypted

AES加密实现

监控与告警系统

实时监控机制
  • 📊 性能指标
  • 🚨 异常检测
  • 📈 趋势分析
  • 🔔 告警规则

主动问题发现

监控指标收集

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

# 定义监控指标
REQUEST_COUNT = Counter(
    'langsmith_requests_total',
    '总请求数',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'langsmith_request_duration_seconds',
    '请求处理时间',
    ['method', 'endpoint']
)

ACTIVE_RUNS = Gauge(
    'langsmith_active_runs',
    '活跃运行数'
)

class MonitoringMiddleware:
    def __init__(self, app):
        self.app = app
        prometheus_client.start_http_server(8000)
    
    async def __call__(self, scope, receive, send):
        start_time = time.time()
        
        try:
            await self.app(scope, receive, send)
            status = "success"
        except Exception:
            status = "error"
            raise
        finally:
            duration = time.time() - start_time
            REQUEST_COUNT.labels(
                method=scope["method"],
                endpoint=scope["path"],
                status=status
            ).inc()
            REQUEST_DURATION.labels(
                method=scope["method"],
                endpoint=scope["path"]
            ).observe(duration)

Prometheus监控集成

日志系统架构

结构化日志
  • 📝 日志级别
  • 🔧 日志格式
  • 📊 日志聚合
  • 🔍 日志查询

便于调试和审计

日志配置

import logging
import json
from datetime import datetime

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # 结构化格式
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # 文件处理器
        file_handler = logging.FileHandler('langsmith.log')
        file_handler.setFormatter(formatter)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
    
    def log_run(self, run, level="info"):
        """记录运行数据"""
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "type": "run",
            "run_id": run.id,
            "name": run.name,
            "run_type": run.run_type,
            "inputs": run.inputs,
            "outputs": getattr(run, 'outputs', None),
            "error": getattr(run, 'error', None)
        }
        
        getattr(self.logger, level)(json.dumps(log_data, ensure_ascii=False))

结构化日志实现

数据库优化策略

高性能数据存储
  • 🗃️ 索引优化
  • 📊 查询优化
  • ⚡ 连接池
  • 🔄 读写分离

大数据量场景优化

数据库索引策略

-- 运行记录表索引优化
CREATE INDEX idx_runs_project_name ON runs(project_name);
CREATE INDEX idx_runs_created_at ON runs(created_at);
CREATE INDEX idx_runs_run_type ON runs(run_type);
CREATE INDEX idx_runs_execution_order ON runs(execution_order);
CREATE INDEX idx_runs_parent_run_id ON runs(parent_run_id);

-- 复合索引用于常用查询
CREATE INDEX idx_runs_project_created_type ON 
    runs(project_name, created_at, run_type);

-- 数据集表索引
CREATE INDEX idx_datasets_name ON datasets(name);
CREATE INDEX idx_datasets_project_id ON datasets(project_id);

-- 示例表索引
CREATE INDEX idx_examples_dataset_id ON examples(dataset_id);
CREATE INDEX idx_examples_created_at ON examples(created_at);

数据库索引优化

缓存策略优化

多级缓存架构
  • 🎯 缓存命中率
  • ⏰ 过期策略
  • 📦 缓存大小
  • 🔄 预热机制

提高系统性能

Redis 缓存配置

import redis
from redis import Redis
from datetime import timedelta

class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = Redis(host=host, port=port, db=db)
        self.default_timeout = timedelta(hours=1)
    
    def get_run(self, run_id: str):
        """获取运行记录"""
        key = f"run:{run_id}"
        data = self.redis.get(key)
        if data:
            return json.loads(data)
        return None
    
    def set_run(self, run_id: str, run_data: dict):
        """设置运行记录缓存"""
        key = f"run:{run_id}"
        self.redis.setex(
            key,
            self.default_timeout,
            json.dumps(run_data)
        )
    
    def get_dataset(self, dataset_id: str):
        """获取数据集缓存"""
        key = f"dataset:{dataset_id}"
        data = self.redis.get(key)
        if not data:
            # 缓存未命中,从数据库加载
            data = self._load_dataset_from_db(dataset_id)
            if data:
                self.redis.setex(
                    key,
                    self.default_timeout,
                    json.dumps(data)
                )
        return json.loads(data) if data else None

Redis缓存实现

微服务架构设计

服务解耦
  • 🏗️ 服务发现
  • 📊 负载均衡
  • 🔄 服务网关
  • 🔐 熔断降级

高可用架构

服务配置管理

# 服务配置文件
services:
  api:
    image: langchain/langsmith-api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/langsmith
      - REDIS_URL=redis://redis:6379
      - API_KEY_SECRET=secret
    depends_on:
      - db
      - redis
    
  worker:
    image: langchain/langsmith-worker
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/langsmith
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    
  db:
    image: postgres:14
    environment:
      - POSTGRES_DB=langsmith
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Docker Compose服务配置

Kubernetes 部署配置

容器编排
  • ⚡ 自动扩展
  • 🔄 滚动更新
  • 🔍 健康检查
  • 📊 资源限制

生产级部署

Kubernetes 部署文件

---
# API 服务部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langsmith-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langsmith-api
  template:
    metadata:
      labels:
        app: langsmith-api
    spec:
      containers:
      - name: api
        image: langchain/langsmith-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: langsmith-secret
              key: database-url
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10

---
# 服务暴露
apiVersion: v1
kind: Service
metadata:
  name: langsmith-api-service
spec:
  selector:
    app: langsmith-api
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

K8s部署配置

性能基准测试

性能指标验证
  • ⏱️ 响应时间
  • 📊 吞吐量
  • 💾 内存使用
  • 🔥 并发性能

性能优化依据

性能测试脚本

import time
import asyncio
import statistics
from concurrent.futures import ThreadPoolExecutor

class PerformanceTester:
    def __init__(self, url, max_concurrent=100):
        self.url = url
        self.max_concurrent = max_concurrent
    
    async def run_concurrent_requests(self, num_requests=1000):
        """并发性能测试"""
        start_time = time.time()
        
        async def make_request(request_id):
            start = time.time()
            try:
                response = await self._make_api_request()
                end = time.time()
                return {
                    'request_id': request_id,
                    'success': True,
                    'duration': end - start,
                    'status_code': response.status_code
                }
            except Exception as e:
                end = time.time()
                return {
                    'request_id': request_id,
                    'success': False,
                    'duration': end - start,
                    'error': str(e)
                }
        
        # 创建并发任务
        tasks = [make_request(i) for i in range(num_requests)]
        results = await asyncio.gather(*tasks)
        
        total_time = time.time() - start_time
        self._analyze_results(results, total_time)
    
    def _analyze_results(self, results, total_time):
        """分析测试结果"""
        successful = [r for r in results if r['success']
        failed = [r for r in results if not r['success']]
        
        if successful:
            durations = [r['duration'] for r in successful]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)
            min_duration = min(durations)
            
            print(f"✅ 成功请求数: {len(successful)}")
            print(f"❌ 失败请求数: {len(failed)}")
            print(f"⏱️ 总耗时: {total_time:.2f}s")
            print(f"📊 吞吐量: {len(successful)/total_time:.2f} req/s")
            print(f"📈 平均响应时间: {avg_duration*1000:.2f}ms")
            print(f"📉 最快响应时间: {min_duration*1000:.2f}ms")
            print(f"📉 最慢响应时间: {max_duration*1000:.2f}ms")

性能测试实现

最佳实践总结

开发建议
  • 🎯 合理设置项目
  • 📊 定期评测
  • 🔄 版本控制
  • 📝 文档维护

提升开发效率

常见问题解决

问题排查
  • 🔍 连接问题
  • ⏱️ 性能问题
  • 📊 数据不一致
  • 🔐 权限问题

快速故障定位

扩展功能开发

二次开发指南
  • 🔌 插件机制
  • 🎨 自定义UI
  • 📊 集成第三方
  • 🚀 功能扩展

满足定制需求

未来发展规划

路线图
  • 🤖 AI评测
  • 📊 增强分析
  • 🌐 多语言支持
  • 🔗 生态系统

持续迭代优化

总结与展望

核心价值
  • 🔍 调试效率提升
  • 📊 质量保障
  • 🚀 开发体验优化
  • 📈 业务价值

LangSmith作为LLM应用开发的基础设施

参考资料

  • LangSmith官方文档: https://docs.smith.langchain.com/
  • GitHub仓库: https://github.com/langchain-ai/langsmith-sdk
  • API参考: https://api.smith.langchain.com/docs

感谢阅读!
访问 https://atcfu.com/ai-articles/langsmith/ 回顾本文