🚀 LangServe 部署服务框架

LangChain 应用生产级部署源码深度解析

基于 Python SDK 源码分析
2026-03-23 | 技术深度解读

📑 目录

第一部分:基础架构

  • LangServe 简介
  • 核心架构概览
  • API 端点设计
  • 核心组件关系

第二部分:核心类解析

  • add_routes 函数
  • _EndpointConfiguration
  • APIHandler 类

第三部分:核心机制

  • Validation 模块
  • Serialization 模块
  • Config 处理机制
  • Feedback Token 机制

第四部分:实践指南

  • 设计模式
  • 性能优化
  • 最佳实践
  • 生产部署

🚀 LangServe 简介

LangServe 是 LangChain 官方提供的部署框架,让开发者能够将 LangChain Runnable 快速部署为生产级 REST API。

核心特性

  • 自动生成 API 端点
  • 支持同步/流式响应
  • 内置 Playground UI
  • OpenAPI 文档自动生成
  • LangSmith 追踪集成

API 端点类型

  • invoke - 单次调用
  • batch - 批量调用
  • stream - 流式输出
  • stream_log - 带日志流式
  • stream_events - 事件流

Schema 端点

  • input_schema
  • output_schema
  • config_schema

🏗️ 核心架构概览

┌─────────────────────────────────────────────────────┐
│                 LangServe 架构                       │
├─────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────┐    │
│  │              FastAPI Application            │    │
│  │  add_routes(app, runnable, path="/chain")   │    │
│  └─────────────────────────────────────────────┘    │
│                       ↓                             │
│  ┌─────────────────────────────────────────────┐    │
│  │              APIHandler                     │    │
│  │  • 请求验证与解析                           │    │
│  │  • Config 处理                              │    │
│  │  • 调用 Runnable                             │    │
│  │  • 响应序列化                                │    │
│  └─────────────────────────────────────────────┘    │
│           ↓              ↓              ↓           │
│  ┌──────────────┐ ┌──────────────┐ ┌────────────┐  │
│  │  Validation  │ │ Serialization│ │  Callbacks │  │
│  │  请求/响应    │ │  LangChain   │ │  事件聚合   │  │
│  │  模型生成    │ │  对象编解码   │ │  回调处理   │  │
│  └──────────────┘ └──────────────┘ └────────────┘  │
│                       ↓                             │
│  ┌─────────────────────────────────────────────┐    │
│  │           LangChain Runnable                │    │
│  │  • Chain / LLM / Agent / Retriever         │    │
│  └─────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────┘

🔌 API 端点设计

POST /{path}/invoke          # 单次调用
POST /{path}/batch           # 批量调用
POST /{path}/stream          # 流式输出 (SSE)
POST /{path}/stream_log      # 带中间步骤日志的流式
POST /{path}/stream_events   # 事件流 (Beta)

GET /{path}/input_schema     # 输入 JSON Schema
GET /{path}/output_schema    # 输出 JSON Schema
GET /{path}/config_schema    # 配置 JSON Schema

GET /{path}/playground/      # 交互式 Playground UI
POST /{path}/feedback        # LangSmith 反馈
POST /{path}/public_trace_link # 公开追踪链接

# Config Hash 变体 (用于 Playground 分享链接)
POST /{path}/c/{config_hash}/invoke
POST /{path}/c/{config_hash}/batch
...

🔗 核心组件关系

组件 文件 职责
add_routes server.py 注册 FastAPI 路由
APIHandler api_handler.py 处理 API 请求
Validation validation.py 动态 Pydantic 模型
Serialization serialization.py 对象序列化/反序列化
Callbacks callbacks.py 事件聚合
Playground playground.py 交互式 UI

💡 设计理念

1. 零配置部署

  • 自动推断输入/输出类型
  • 自动生成 OpenAPI 文档
  • 内置 Playground UI

2. 类型安全

  • Pydantic 模型验证
  • 动态模型生成
  • Schema 端点暴露

3. 流式优先

  • SSE (Server-Sent Events)
  • 实时输出反馈
  • 中间步骤可见

4. 可观测性

  • LangSmith 深度集成
  • Callback 事件聚合
  • Feedback Token 机制

📊 与其他框架对比

特性 LangServe FastAPI 原生 Flask + LangChain
自动路由生成 ✅ add_routes ❌ 手动 ❌ 手动
流式响应 ✅ 内置 SSE ⚠️ 需要实现 ⚠️ 需要实现
类型验证 ✅ 自动推断 ✅ Pydantic ⚠️ 手动
Playground UI ✅ 内置 ❌ 无 ❌ 无
LangSmith 集成 ✅ 原生 ❌ 需配置 ❌ 需配置
OpenAPI 文档 ✅ 自动生成 ✅ 自动生成 ⚠️ 需插件

🔧 add_routes 函数

def add_routes(
    app: Union[FastAPI, APIRouter],
    runnable: Runnable,
    *,
    path: str = "",
    input_type: Union[Type, Literal["auto"], BaseModel] = "auto",
    output_type: Union[Type, Literal["auto"], BaseModel] = "auto",
    config_keys: Sequence[str] = ("configurable",),
    per_req_config_modifier: Optional[PerRequestConfigModifier] = None,
    enable_feedback_endpoint: bool = False,
    enable_public_trace_link_endpoint: bool = False,
    disabled_endpoints: Optional[Sequence[EndpointName]] = None,
    enabled_endpoints: Optional[Sequence[EndpointName]] = None,
    playground_type: Literal["default", "chat"] = "default",
    serializer: Optional[Serializer] = None,
) -> None:
    """Register the routes on the given FastAPI app or APIRouter."""

🔧 _EndpointConfiguration

class _EndpointConfiguration:
    """Logic for enabling/disabling endpoints."""
    
    def __init__(
        self,
        *,
        enabled_endpoints: Optional[Sequence[EndpointName]] = None,
        disabled_endpoints: Optional[Sequence[EndpointName]] = None,
        enable_feedback_endpoint: bool = False,
        enable_public_trace_link_endpoint: bool = False,
    ) -> None:
        # 验证:不能同时指定 enabled 和 disabled
        if enabled_endpoints and disabled_endpoints:
            raise ValueError(
                'Cannot specify both "enabled_endpoints" and "disabled_endpoints".'
            )
        
        # 计算每个端点的启用状态
        self.is_invoke_enabled = ...
        self.is_batch_enabled = ...
        self.is_stream_enabled = ...
        self.is_stream_log_enabled = ...
        self.is_stream_events_enabled = ...
        self.is_playground_enabled = ...
        # ... 更多端点状态

🔧 路由注册逻辑

def add_routes(app, runnable, path="", ...):
    # 1. 创建端点配置
    endpoint_configuration = _EndpointConfiguration(
        enabled_endpoints=enabled_endpoints,
        disabled_endpoints=disabled_endpoints,
    )
    
    # 2. 创建 APIHandler
    api_handler = APIHandler(
        runnable,
        path=path,
        input_type=input_type,
        output_type=output_type,
        config_keys=config_keys,
        ...
    )
    
    # 3. 注册 invoke 端点
    if endpoint_configuration.is_invoke_enabled:
        @app.post(f"{namespace}/invoke")
        async def invoke(request: Request) -> Response:
            return await api_handler.invoke(request)
    
    # 4. 注册 batch 端点
    if endpoint_configuration.is_batch_enabled:
        @app.post(f"{namespace}/batch")
        async def batch(request: Request) -> Response:
            return await api_handler.batch(request)
    
    # ... 更多端点注册

⚡ invoke 端点

# 实际处理端点 (不显示在文档中)
@app.post(f"{namespace}/invoke", include_in_schema=False)
async def invoke(request: Request) -> Response:
    """Handle a request."""
    return await api_handler.invoke(request)

# 文档端点 (用于 OpenAPI 生成)
async def _invoke_docs(
    invoke_request: Annotated[InvokeRequest, Body()],
    config_hash: str = "",
) -> InvokeResponse:
    """Invoke the runnable with the given input and config."""
    raise AssertionError("This endpoint should not be reachable.")

invoke_docs = app.post(
    f"{namespace}/invoke",
    response_model=api_handler.InvokeResponse,
    tags=route_tags,
)(_invoke_docs)

⚡ batch 端点

if endpoint_configuration.is_batch_enabled:
    @app.post(f"{namespace}/batch", include_in_schema=False)
    async def batch(request: Request) -> Response:
        return await api_handler.batch(request)
    
    # 支持 Config Hash 变体
    if endpoint_configuration.is_config_hash_enabled:
        @app.post(namespace + "/c/{config_hash}/batch")
        async def batch_with_config(
            request: Request, config_hash: str = ""
        ) -> Response:
            return await api_handler.batch(request, config_hash=config_hash)

# 批量请求格式
{
    "inputs": ["input1", "input2", "input3"],
    "config": {"tags": ["batch"]}  # 或 [{"tags": ["a"]}, {"tags": ["b"]}]
}

🌊 stream 端点

if endpoint_configuration.is_stream_enabled:
    @app.post(f"{namespace}/stream", include_in_schema=False)
    async def stream(request: Request) -> EventSourceResponse:
        """Handle a request."""
        return await api_handler.stream(request)

# SSE 事件格式
# 数据事件
{"event": "data", "data": {"content": "Hello"}}

# 元数据事件
{"event": "metadata", "data": "{\"run_id\": \"xxx\"}"}

# 错误事件
{"event": "error", "data": "{\"status_code\": 500, \"message\": \"...\"}"}

# 结束事件
{"event": "end"}

📋 stream_log 端点

if endpoint_configuration.is_stream_log_enabled:
    @app.post(f"{namespace}/stream_log", include_in_schema=False)
    async def stream_log(request: Request) -> EventSourceResponse:
        return await api_handler.stream_log(request)

# 请求格式
{
    "input": "your input",
    "config": {},
    "include_names": ["Retriever", "LLM"],  # 过滤包含的名称
    "include_types": ["llm", "chain"],       # 过滤包含的类型
    "include_tags": ["important"],           # 过滤包含的标签
    "exclude_names": ["Debug"],              # 排除的名称
    "exclude_types": [],                     # 排除的类型
    "exclude_tags": []                       # 排除的标签
}

# 用途:查看中间步骤、调试链执行过程

🎯 stream_events 端点

if has_astream_events and endpoint_configuration.is_stream_events_enabled:
    @app.post(f"{namespace}/stream_events", include_in_schema=False)
    async def stream_events(request: Request) -> EventSourceResponse:
        return await api_handler.astream_events(request)

# Beta 功能 (需要 langchain-core >= 0.1.14)
# 支持更细粒度的事件流

# 请求格式
{
    "input": "your input",
    "config": {},
    "include_names": [],
    "include_types": [],
    "include_tags": [],
    "exclude_names": [],
    "exclude_types": [],
    "exclude_tags": []
}

# astream_events_version: "v1" | "v2" (默认 v2)

🔧 APIHandler 类

class APIHandler:
    """Implementation of the various API endpoints for a runnable server."""
    
    def __init__(
        self,
        runnable: Runnable,
        *,
        path: str,
        prefix: str = "",
        input_type: Union[Type, Literal["auto"], BaseModel] = "auto",
        output_type: Union[Type, Literal["auto"], BaseModel] = "auto",
        config_keys: Sequence[str] = ("configurable",),
        include_callback_events: bool = False,
        enable_feedback_endpoint: bool = False,
        token_feedback_config: Optional[TokenFeedbackConfig] = None,
        per_req_config_modifier: Optional[PerRequestConfigModifier] = None,
        serializer: Optional[Serializer] = None,
    ): ...

🔧 APIHandler 初始化

def __init__(self, runnable, ...):
    # 1. 验证依赖
    if importlib.util.find_spec("sse_starlette") is None:
        raise ImportError("sse_starlette must be installed")
    
    # 2. 配置验证
    if "run_name" in config_keys:
        raise ValueError("Cannot configure run_name")
    
    # 3. 初始化 LangSmith Client (如果需要)
    self._langsmith_client = (
        ls_client.Client()
        if tracing_is_enabled() and (enable_feedback_endpoint or token_feedback_enabled)
        else None
    )
    
    # 4. 解析输入/输出类型
    input_type_ = _resolve_model(runnable.get_input_schema(), "Input", namespace)
    output_type_ = _resolve_model(runnable.get_output_schema(), "Output", namespace)
    
    # 5. 创建请求/响应模型
    self._InvokeRequest = create_invoke_request_model(...)
    self._InvokeResponse = create_invoke_response_model(...)

⚙️ _get_config_and_input

async def _get_config_and_input(
    self,
    request: Request,
    config_hash: str,
    *,
    endpoint: Optional[str] = None,
    server_config: Optional[RunnableConfig] = None,
) -> Tuple[RunnableConfig, Any]:
    """Extract the config and input from the request."""
    
    # 1. 解析 JSON body
    body = await request.json()
    body = InvokeRequestShallowValidator.model_validate(body)
    
    # 2. 合并 config (hash + body config)
    user_provided_config = await _unpack_request_config(
        config_hash,
        body.config,
        config_keys=self._config_keys,
        model=self._ConfigPayload,
        request=request,
        per_req_config_modifier=self._per_req_config_modifier,
    )
    
    # 3. 更新默认配置
    config = _update_config_with_defaults(
        self._run_name, user_provided_config, request, endpoint=endpoint
    )
    
    # 4. 验证并解包输入
    schema = self._runnable.with_config(config).input_schema
    input_ = schema.model_validate(body.input)
    return config, _unpack_input(input_)

⚡ invoke 方法

async def invoke(self, request: Request, config_hash: str = "") -> Response:
    # 1. 获取 config 和 input
    config, input_ = await self._get_config_and_input(request, config_hash)
    run_id = config["run_id"]
    
    # 2. 添加事件聚合回调
    event_aggregator = AsyncEventAggregatorCallback()
    _add_callbacks(config, [event_aggregator])
    
    # 3. 执行 runnable
    invoke_coro = self._runnable.ainvoke(input_, config=config)
    
    # 4. 如果启用 token feedback,并行创建 feedback token
    if self._token_feedback_enabled:
        output, feedback_token = await asyncio.gather(
            invoke_coro, 
            feedback_coro
        )
    else:
        output = await invoke_coro
    
    # 5. 返回响应
    return _json_encode_response(
        self._InvokeResponse(
            output=self._serializer.dumpd(output),
            callback_events=callback_events,
            metadata=InvokeResponseMetadata(run_id=run_id, ...),
        )
    )

⚡ batch 方法

async def batch(self, request: Request, config_hash: str = "") -> Response:
    body = BatchRequestShallowValidator.model_validate(await request.json())
    
    # 1. 处理 config (单个或列表)
    if isinstance(body.config, list):
        if len(body.config) != len(body.inputs):
            raise HTTPException(422, "Number of configs must match inputs")
        configs = [await _unpack_request_config(...) for config in body.config]
    else:
        configs = await _unpack_request_config(...)
    
    # 2. 为每个输入添加回调聚合器
    aggregators = [AsyncEventAggregatorCallback() for _ in range(len(inputs))]
    
    # 3. 批量执行
    output = await self._runnable.abatch(inputs, config=final_configs)
    
    # 4. 返回响应
    return _json_encode_response(
        self._BatchResponse(
            output=self._serializer.dumpd(output),
            callback_events=callback_events,
            metadata=BatchResponseMetadata(run_ids=run_ids, ...),
        )
    )

🌊 stream 方法

async def stream(self, request: Request, config_hash: str = "") -> EventSourceResponse:
    config, input_ = await self._get_config_and_input(request, config_hash)
    
    async def _stream() -> AsyncIterator[dict]:
        try:
            # 添加回调
            config_w_callbacks = config.copy()
            event_aggregator = AsyncEventAggregatorCallback()
            _add_callbacks(config_w_callbacks, [event_aggregator])
            
            async for chunk in self._runnable.astream(input_, config=config_w_callbacks):
                # 发送元数据事件 (首次)
                if not has_sent_metadata:
                    yield _create_metadata_event(run_id, feedback_key, feedback_token)
                    has_sent_metadata = True
                
                # 发送数据事件
                yield {
                    "data": self._serializer.dumps(chunk).decode("utf-8"),
                    "event": "data",
                }
            
            yield {"event": "end"}
        except BaseException:
            yield {"event": "error", "data": json.dumps({...})}
            raise
    
    return EventSourceResponse(_stream())

📋 stream_log 方法

async def stream_log(self, request: Request, config_hash: str = "") -> EventSourceResponse:
    config, input_ = await self._get_config_and_input(request, config_hash)
    
    # 解析 stream_log 参数
    body = await request.json()
    stream_log_request = StreamLogParameters(**body)
    
    async def _stream_log() -> AsyncIterator[dict]:
        has_sent_metadata = False
        async for chunk in self._runnable.astream_log(
            input_,
            config=config,
            diff=True,
            include_names=stream_log_request.include_names,
            include_types=stream_log_request.include_types,
            include_tags=stream_log_request.include_tags,
            exclude_names=stream_log_request.exclude_names,
            exclude_types=stream_log_request.exclude_types,
            exclude_tags=stream_log_request.exclude_tags,
        ):
            # 发送 patch 事件
            yield {"data": chunk.json(), "event": "data"}
        
        yield {"event": "end"}
    
    return EventSourceResponse(_stream_log())

✅ Validation 模块

"""Code to dynamically create pydantic models for validating requests and responses."""

# 类型别名
Validator = Union[Type[BaseModel], type]

# 核心函数
def create_invoke_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_batch_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_log_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_events_request_model(namespace, input_type, config) -> Type[BaseModel]

def create_invoke_response_model(namespace, output_type, include_callbacks) -> Type[BaseModel]
def create_batch_response_model(namespace, output_type, include_callbacks) -> Type[BaseModel]

# 浅验证器 (用于初步解析)
class InvokeRequestShallowValidator(BaseModel)
class BatchRequestShallowValidator(BaseModel)

✅ 请求模型创建

def create_invoke_request_model(
    namespace: str,
    input_type: Validator,
    config: Type[BaseModel],
) -> Type[BaseModel]:
    """Create a pydantic model for the invoke request."""
    invoke_request_type = create_model(
        f"{namespace}InvokeRequest",
        input=(input_type, Field(..., description="The input to the runnable.")),
        config=(
            config,
            Field(
                default_factory=dict,
                description="Subset of RunnableConfig object in LangChain.",
            ),
        ),
        kwargs=(dict, Field(default_factory=dict)),
    )
    invoke_request_type.model_rebuild()
    return invoke_request_type

# 生成的模型示例
class MyChainInvokeRequest(BaseModel):
    input: str
    config: dict = {}
    kwargs: dict = {}

✅ 响应模型创建

def create_invoke_response_model(
    namespace: str,
    output_type: Validator,
    include_callbacks: bool,
) -> Type[BaseModel]:
    fields = {
        "output": (output_type, Field(..., description="The output of the invocation.")),
        "metadata": (InvokeResponseMetadata, Field(..., description="Metadata about the response")),
    }
    
    if include_callbacks:
        fields["callback_events"] = (
            List[CallbackEvent],
            Field(..., description="Callback events generated by the server side."),
        )
    
    invoke_response_type = create_model(
        f"{namespace}InvokeResponse",
        __base__=InvokeBaseResponse,
        **fields,
    )
    return invoke_response_type

# 生成的模型示例
class MyChainInvokeResponse(InvokeBaseResponse):
    output: str
    metadata: InvokeResponseMetadata
    callback_events: List[CallbackEvent]  # 如果启用

✅ Callback 事件验证

# Callback 事件类型定义
class OnChainStart(BaseCallback):
    type: Literal["on_chain_start"] = "on_chain_start"
    inputs: Any

class OnChainEnd(BaseCallback):
    type: Literal["on_chain_end"] = "on_chain_end"
    outputs: Any

class OnChainError(BaseCallback):
    type: Literal["on_chain_error"] = "on_chain_error"
    error: Error

class OnLLMStart(BaseCallback):
    type: Literal["on_llm_start"] = "on_llm_start"
    prompts: List[str]

class OnToolStart(BaseCallback):
    type: Literal["on_tool_start"] = "on_tool_start"
    input_str: str

# 联合类型
CallbackEvent = RootModel[
    Union[OnChainStart, OnChainEnd, OnChainError, OnLLMStart, OnToolStart, ...]
]

📦 Serialization 模块

"""Serialization for well known objects and callback events."""

# 支持的 LangChain 对象类型
WellKnownLCObject = RootModel[
    Annotated[
        Union[
            Annotated[AIMessage, Tag(tag="ai")],
            Annotated[HumanMessage, Tag(tag="human")],
            Annotated[ChatMessage, Tag(tag="chat")],
            Annotated[SystemMessage, Tag(tag="system")],
            Annotated[ToolMessage, Tag(tag="tool")],
            Annotated[Document, Tag(tag="Document")],
            Annotated[AgentAction, Tag(tag="AgentAction")],
            Annotated[AgentFinish, Tag(tag="AgentFinish")],
            Annotated[ChatGeneration, Tag(tag="ChatGeneration")],
            Annotated[LLMResult, Tag(tag="LLMResult")],
            ...
        ],
        Field(discriminator=Discriminator(_get_type)),
    ]
]

📦 WellKnownLCSerializer

class Serializer(abc.ABC):
    def dumpd(self, obj: Any) -> Any:
        """Convert to JSON serializable object."""
        return orjson.loads(self.dumps(obj))
    
    def loads(self, s: bytes) -> Any:
        """Load from JSON string."""
        return self.loadd(orjson.loads(s))
    
    @abc.abstractmethod
    def dumps(self, obj: Any) -> bytes:
        """Dump to JSON byte string."""
    
    @abc.abstractmethod
    def loadd(self, obj: Any) -> Any:
        """Load from python object into well known object."""


class WellKnownLCSerializer(Serializer):
    def dumps(self, obj: Any) -> bytes:
        return orjson.dumps(obj, default=default)
    
    def loadd(self, obj: Any) -> Any:
        return _decode_lc_objects(obj)

📦 对象编解码

def _decode_lc_objects(value: Any) -> Any:
    """Decode the value into well known LangChain objects."""
    if isinstance(value, dict):
        # 递归解码嵌套字典
        v = {key: _decode_lc_objects(v) for key, v in value.items()}
        
        # 尝试解析为 WellKnownLCObject
        try:
            obj = WellKnownLCObject.model_validate(v)
            return obj.root
        except (ValidationError, ValueError, TypeError):
            return v  # 不是已知对象,返回原始字典
    
    elif isinstance(value, list):
        return [_decode_lc_objects(item) for item in value]
    
    else:
        return value

# 示例:解码 AI Message
{"type": "ai", "content": "Hello"} → AIMessage(content="Hello")

⚙️ Config 处理机制

LangServe 提供了多层 Config 合并机制,确保灵活性和安全性。

# Config 优先级(从低到高)
1. 客户端发送的 config (body.config)
2. Config Hash (URL 中的 config_hash)
3. Server Config (server_config 参数)
4. Per-Request Config Modifier (per_req_config_modifier)
5. 默认 Config (LangServe 内部设置)

# 关键函数
async def _unpack_request_config(...) -> Dict[str, Any]:
    """合并所有 config 源"""
    config = merge_configs(*config_dicts)
    projected_config = {k: config[k] for k in config_keys if k in config}
    
    if server_config:
        projected_config = merge_configs(projected_config, server_config)
    
    if per_req_config_modifier:
        projected_config = await per_req_config_modifier(projected_config, request)
    
    return projected_config

⚙️ _unpack_request_config

async def _unpack_request_config(
    *client_sent_configs: Union[BaseModel, Mapping, str],
    config_keys: Sequence[str],
    model: Type[BaseModel],
    request: Request,
    per_req_config_modifier: Optional[PerRequestConfigModifier],
    server_config: Optional[RunnableConfig],
) -> Dict[str, Any]:
    # 1. 解析所有客户端发送的 config
    config_dicts = []
    for config in client_sent_configs:
        if isinstance(config, str):
            # Config Hash: LZString 解压
            config_dicts.append(model(**_config_from_hash(config)).model_dump())
        elif isinstance(config, BaseModel):
            config_dicts.append(config.model_dump())
        elif isinstance(config, Mapping):
            config_dicts.append(model(**config).model_dump())
    
    # 2. 合并 configs (last writer wins)
    config = merge_configs(*config_dicts)
    
    # 3. 投影只允许的 keys
    projected_config = {k: config[k] for k in config_keys if k in config}
    
    # 4. 合并 server config
    if server_config:
        projected_config = merge_configs(projected_config, server_config)
    
    # 5. 应用 per-request modifier
    if per_req_config_modifier:
        projected_config = await per_req_config_modifier(projected_config, request)
    
    return projected_config

⚙️ _update_config_with_defaults

def _update_config_with_defaults(
    run_name: str,
    incoming_config: RunnableConfig,
    request: Request,
    *,
    endpoint: Optional[str] = None,
) -> RunnableConfig:
    """Set up some baseline configuration for the underlying runnable."""
    
    # 不可覆盖的默认配置
    non_overridable_default_config: RunnableConfig = {
        "metadata": {
            "__useragent": request.headers.get("user-agent"),
            "__langserve_version": __version__,
            "__langserve_endpoint": endpoint,  # 如果指定
        },
        "run_name": run_name,  # 设置为 path
    }
    
    # 合并 (last writer wins)
    config = merge_configs(
        overridable_default_config,
        incoming_config,
        non_overridable_default_config,
    )
    
    # 生成 run_id (如果未设置)
    if "run_id" not in config or config["run_id"] is None:
        config["run_id"] = str(uuid.uuid4())
    
    return config

🎫 Feedback Token 机制

class TokenFeedbackConfig(TypedDict):
    """Token feedback configuration."""
    key_configs: List[PerKeyFeedbackConfig]

# 初始化时创建 LangSmith Client
self._langsmith_client = (
    ls_client.Client()
    if tracing_is_enabled() and token_feedback_enabled
    else None
)

# invoke 时并行创建 feedback token
if self._token_feedback_enabled:
    feedback_key = self._token_feedback_config["key_configs"][0]["key"]
    feedback_coro = run_in_executor(
        None,
        self._langsmith_client.create_presigned_feedback_token,
        run_id,
        feedback_key,
    )
    output, feedback_token = await asyncio.gather(invoke_coro, feedback_coro)

# 返回 token 给客户端
metadata=InvokeResponseMetadata(
    run_id=run_id,
    feedback_tokens=[FeedbackToken(
        key=feedback_key,
        token_url=feedback_token.url,
        expires_at=feedback_token.expires_at.isoformat(),
    )],
)

🎮 Playground 功能

# 注册 playground 端点
if endpoint_configuration.is_playground_enabled:
    playground = app.get(
        namespace + "/playground/{file_path:path}",
        dependencies=dependencies,
        include_in_schema=False,
    )(api_handler.playground)
    
    # 支持 Config Hash 变体
    if endpoint_configuration.is_config_hash_enabled:
        app.get(
            namespace + "/c/{config_hash}/playground/{file_path:path}",
            ...
        )(playground)

# Playground 类型
playground_type: Literal["default", "chat"] = "default"

# default: 支持更多输入/输出类型
# chat: UX 优化为聊天交互

📐 设计模式应用

工厂模式

  • create_invoke_request_model
  • create_batch_request_model
  • 动态 Pydantic 模型生成

策略模式

  • Serializer 抽象基类
  • WellKnownLCSerializer 实现
  • 自定义序列化器支持

模板方法模式

  • APIHandler 统一流程
  • _get_config_and_input
  • 响应构建逻辑

观察者模式

  • AsyncEventAggregatorCallback
  • Callback 事件聚合
  • LangSmith 追踪

📊 架构 UML

┌─────────────────────────────────────────────────────┐
│                   FastAPI App                       │
│  ┌─────────────────────────────────────────────┐    │
│  │         add_routes(app, runnable)           │    │
│  │  ┌─────────────────────────────────────┐    │    │
│  │  │     _EndpointConfiguration          │    │    │
│  │  └─────────────────────────────────────┘    │    │
│  │  ┌─────────────────────────────────────┐    │    │
│  │  │           APIHandler                │    │    │
│  │  │  ├─ invoke(request)                 │    │    │
│  │  │  ├─ batch(request)                  │    │    │
│  │  │  ├─ stream(request)                 │    │    │
│  │  │  └─ stream_log(request)             │    │    │
│  │  └─────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────┘    │
│              ↓            ↓            ↓            │
│  ┌──────────────┐ ┌──────────────┐ ┌────────────┐  │
│  │  Validation  │ │ Serialization│ │  Callbacks │  │
│  └──────────────┘ └──────────────┘ └────────────┘  │
│                       ↓                             │
│  ┌─────────────────────────────────────────────┐    │
│  │              LangChain Runnable             │    │
│  └─────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────┘

🔄 请求处理流程

HTTP Request → FastAPI Route
              ↓
    ┌─────────────────────────┐
    │ APIHandler._get_config_ │
    │ and_input()             │
    │ • 解析 JSON body        │
    │ • 验证 input/config     │
    │ • 合并 config           │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ Runnable.ainvoke/       │
    │ astream/abatch          │
    │ • 执行 LangChain chain  │
    │ • 触发 callbacks        │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ Serializer.dumpd()      │
    │ • 序列化输出            │
    │ • 处理 LangChain 对象   │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ JSONResponse / SSE      │
    │ • 返回 HTTP 响应        │
    └─────────────────────────┘

🌊 流式响应流程

HTTP Request → APIHandler.stream()
              ↓
    ┌─────────────────────────┐
    │ _get_config_and_input() │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ 创建 async generator    │
    │ _stream():              │
    │   async for chunk in    │
    │     runnable.astream(): │
    │       yield {           │
    │         "event":"data", │
    │         "data": chunk   │
    │       }                 │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ EventSourceResponse     │
    │ • text/event-stream     │
    │ • 实时推送事件          │
    └─────────────────────────┘
              ↓
         Client 接收 SSE 事件

📦 序列化流程

Python Object → Serializer.dumps()
              ↓
    ┌─────────────────────────┐
    │ orjson.dumps(obj,       │
    │   default=default)      │
    │ • 高性能 JSON 序列化    │
    │ • 处理 Pydantic 模型    │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ JSON Bytes              │
    │ {"type":"ai",           │
    │  "content":"Hello"}     │
    └───────────┬─────────────┘
                ↓
    ┌─────────────────────────┐
    │ HTTP Response Body      │
    └─────────────────────────┘

# 反序列化
JSON → orjson.loads() → _decode_lc_objects()
                              ↓
                    WellKnownLCObject.model_validate()
                              ↓
                         AIMessage(...)

⚡ 性能优化策略

1. orjson 高性能序列化

  • 比标准 json 快 3-10x
  • Rust 实现
  • 支持 bytes 输出

2. 模型缓存

  • _MODEL_REGISTRY
  • _SEEN_NAMES
  • 避免重复创建 Pydantic 模型

3. 并行执行

  • asyncio.gather
  • invoke + feedback token
  • batch 并发执行

4. 流式响应

  • SSE 实时推送
  • 减少首字节延迟
  • 更好的用户体验

❌ 错误处理机制

# 1. 请求验证错误
except ValidationError as e:
    raise RequestValidationError(e.errors(), body=body)

# 2. JSON 解析错误
except json.JSONDecodeError:
    raise RequestValidationError(errors=["Invalid JSON body"])

# 3. Config 验证错误
if "configurable" in config and "configurable" not in config_keys:
    raise HTTPException(
        422,
        "The config field `configurable` has been disallowed by the server."
    )

# 4. 流式响应错误处理
async def _stream():
    try:
        async for chunk in runnable.astream(...):
            yield {"event": "data", "data": ...}
        yield {"event": "end"}
    except BaseException:
        yield {
            "event": "error",
            "data": json.dumps({
                "status_code": 500,
                "message": "Internal Server Error"
            }),
        }
        raise

🔒 安全考虑

1. Config 投影

  • config_keys 白名单
  • 防止客户端覆盖敏感配置
  • 禁止配置 run_name

2. 错误信息脱敏

  • 不暴露内部错误详情
  • 统一返回 "Internal Server Error"
  • 敏感信息不进入 callback_events

3. 内部元数据隔离

  • __useragent
  • __langserve_version
  • 以 __ 开头的 key 不返回客户端

4. Public Trace Link 警告

  • ⚠️ 暴露内部状态
  • 仅用于调试
  • 生产环境谨慎使用

✅ 最佳实践

类型定义

  • 使用 with_types() 指定输入/输出类型
  • 使用 Pydantic v2 BaseModel
  • 避免 pydantic.v1 模型

Config 管理

  • 使用 per_req_config_modifier 注入认证信息
  • 限制 config_keys 白名单
  • 使用 server_config 设置默认值

端点选择

  • 生产环境禁用 playground
  • 按需启用 feedback 端点
  • 使用 disabled_endpoints 控制暴露面

序列化

  • 优先使用内置序列化器
  • 复杂对象实现自定义 Serializer
  • 测试序列化/反序列化一致性

❌ 常见反模式

反模式 问题 正确做法
在 runnable 中保存状态 并发问题 使用 config 传递状态
使用 Pydantic v1 模型 序列化失败 使用 pydantic.BaseModel
生产环境开启 playground 安全风险 disabled_endpoints=["playground"]
忽略 config_keys 配置泄露 明确指定允许的 keys
过度使用 callback_events 性能下降 仅在调试时启用

🔍 调试技巧

# 1. 启用 callback_events
add_routes(
    app, 
    runnable,
    include_callback_events=True,  # 查看服务器端事件
)

# 2. 使用 stream_log 查看中间步骤
POST /chain/stream_log
{
    "input": "test",
    "include_names": ["*"]  # 包含所有步骤
}

# 3. 检查 Schema 端点
GET /chain/input_schema
GET /chain/output_schema
GET /chain/config_schema

# 4. LangSmith 追踪
# 设置环境变量
LANGSMITH_API_KEY=xxx
LANGSMITH_TRACING=true

# 5. 日志级别
import logging
logging.basicConfig(level=logging.DEBUG)

📊 LangSmith 集成

# 自动追踪
# 设置环境变量后自动启用
LANGSMITH_API_KEY=xxx
LANGSMITH_TRACING=true
LANGSMITH_PROJECT=my-project

# Feedback 端点
add_routes(
    app, runnable,
    enable_feedback_endpoint=True,  # 启用 feedback 端点
)

POST /chain/feedback
{
    "run_id": "xxx",
    "key": "correctness",
    "score": 0.9
}

# Token-based Feedback (更安全)
add_routes(
    app, runnable,
    token_feedback_config={"key_configs": [{"key": "correctness"}]},
)

# 客户端使用返回的 feedback_tokens 提交反馈

🚀 生产部署指南

# 1. 使用 Gunicorn + Uvicorn
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app

# 2. 禁用不必要的端点
add_routes(
    app, runnable,
    disabled_endpoints=["playground"],
    enable_feedback_endpoint=False,
    enable_public_trace_link_endpoint=False,
)

# 3. 添加认证
from fastapi import Depends, HTTPException, Header

async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(401)
    # 验证 token...

add_routes(app, runnable, dependencies=[Depends(verify_token)])

# 4. 限流
from slowapi import Limiter
limiter = Limiter(key_func=get_remote_address)

@app.post("/chain/invoke", dependencies=[Depends(limiter.limit("10/minute"))])

🔮 未来发展方向

短期

  • stream_events 稳定版
  • 更多 Playground 功能
  • 性能优化
  • 更好的类型推断

中期

  • WebSocket 支持
  • 批量流式响应
  • 更多 LangSmith 功能

长期

  • 多模型路由
  • 自动扩缩容
  • 边缘部署支持
  • gRPC 支持

社区趋势

  • 与 LangGraph 深度集成
  • 更多预构建模板
  • 监控仪表板

📚 总结与扩展阅读

核心要点:

  • add_routes 是入口,自动注册 FastAPI 路由
  • APIHandler 负责请求验证、Config 处理、调用 Runnable
  • Validation 模块动态生成 Pydantic 模型
  • Serialization 模块处理 LangChain 对象编解码
  • 流式响应使用 SSE (Server-Sent Events)
  • LangSmith 集成提供追踪和反馈能力

推荐资源:

  • 官方文档: https://python.langchain.com/docs/langserve
  • GitHub: https://github.com/langchain-ai/langserve
  • 示例: https://github.com/langchain-ai/langserve/tree/main/examples

💡 完整示例

from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes

# 1. 创建 chain
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser

# 2. 创建 FastAPI app
app = FastAPI(title="My LangChain Server")

# 3. 添加路由
add_routes(
    app,
    chain,
    path="/joke",
    enable_feedback_endpoint=True,
    disabled_endpoints=["playground"],  # 生产环境禁用
)

# 4. 运行
# uvicorn app:app --reload

# 访问
# POST /joke/invoke    {"input": {"topic": "cats"}}
# POST /joke/stream    {"input": {"topic": "cats"}}
# GET  /joke/input_schema
# GET  /joke/playground/