基于 Python SDK 源码分析
2026-03-23 | 技术深度解读
第一部分:基础架构
第二部分:核心类解析
第三部分:核心机制
第四部分:实践指南
LangServe 是 LangChain 官方提供的部署框架,让开发者能够将 LangChain Runnable 快速部署为生产级 REST API。
核心特性
API 端点类型
Schema 端点
┌─────────────────────────────────────────────────────┐
│ LangServe 架构 │
├─────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────┐ │
│ │ FastAPI Application │ │
│ │ add_routes(app, runnable, path="/chain") │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ APIHandler │ │
│ │ • 请求验证与解析 │ │
│ │ • Config 处理 │ │
│ │ • 调用 Runnable │ │
│ │ • 响应序列化 │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │
│ │ Validation │ │ Serialization│ │ Callbacks │ │
│ │ 请求/响应 │ │ LangChain │ │ 事件聚合 │ │
│ │ 模型生成 │ │ 对象编解码 │ │ 回调处理 │ │
│ └──────────────┘ └──────────────┘ └────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ LangChain Runnable │ │
│ │ • Chain / LLM / Agent / Retriever │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
POST /{path}/invoke # 单次调用
POST /{path}/batch # 批量调用
POST /{path}/stream # 流式输出 (SSE)
POST /{path}/stream_log # 带中间步骤日志的流式
POST /{path}/stream_events # 事件流 (Beta)
GET /{path}/input_schema # 输入 JSON Schema
GET /{path}/output_schema # 输出 JSON Schema
GET /{path}/config_schema # 配置 JSON Schema
GET /{path}/playground/ # 交互式 Playground UI
POST /{path}/feedback # LangSmith 反馈
POST /{path}/public_trace_link # 公开追踪链接
# Config Hash 变体 (用于 Playground 分享链接)
POST /{path}/c/{config_hash}/invoke
POST /{path}/c/{config_hash}/batch
...
| 组件 | 文件 | 职责 |
|---|---|---|
| add_routes | server.py | 注册 FastAPI 路由 |
| APIHandler | api_handler.py | 处理 API 请求 |
| Validation | validation.py | 动态 Pydantic 模型 |
| Serialization | serialization.py | 对象序列化/反序列化 |
| Callbacks | callbacks.py | 事件聚合 |
| Playground | playground.py | 交互式 UI |
1. 零配置部署
2. 类型安全
3. 流式优先
4. 可观测性
| 特性 | LangServe | FastAPI 原生 | Flask + LangChain |
|---|---|---|---|
| 自动路由生成 | ✅ add_routes | ❌ 手动 | ❌ 手动 |
| 流式响应 | ✅ 内置 SSE | ⚠️ 需要实现 | ⚠️ 需要实现 |
| 类型验证 | ✅ 自动推断 | ✅ Pydantic | ⚠️ 手动 |
| Playground UI | ✅ 内置 | ❌ 无 | ❌ 无 |
| LangSmith 集成 | ✅ 原生 | ❌ 需配置 | ❌ 需配置 |
| OpenAPI 文档 | ✅ 自动生成 | ✅ 自动生成 | ⚠️ 需插件 |
def add_routes(
    app: Union[FastAPI, APIRouter],
    runnable: Runnable,
    *,
    path: str = "",
    input_type: Union[Type, Literal["auto"], BaseModel] = "auto",
    output_type: Union[Type, Literal["auto"], BaseModel] = "auto",
    config_keys: Sequence[str] = ("configurable",),
    include_callback_events: bool = False,
    per_req_config_modifier: Optional[PerRequestConfigModifier] = None,
    enable_feedback_endpoint: bool = False,
    token_feedback_config: Optional[TokenFeedbackConfig] = None,
    enable_public_trace_link_endpoint: bool = False,
    disabled_endpoints: Optional[Sequence[EndpointName]] = None,
    enabled_endpoints: Optional[Sequence[EndpointName]] = None,
    dependencies: Optional[Sequence[Any]] = None,
    playground_type: Literal["default", "chat"] = "default",
    astream_events_version: Literal["v1", "v2"] = "v2",
    serializer: Optional[Serializer] = None,
) -> None:
    """Register the routes on the given FastAPI app or APIRouter.

    Every option after ``runnable`` is keyword-only and has a default, so the
    minimal call is ``add_routes(app, runnable, path="/chain")``.

    Args:
        app: FastAPI application or APIRouter that receives the routes.
        runnable: the LangChain runnable to serve.
        path: URL prefix for the endpoints, e.g. ``"/chain"`` gives
            ``/chain/invoke``, ``/chain/stream``, ...
        input_type: input schema; ``"auto"`` derives it from the runnable.
        output_type: output schema; ``"auto"`` derives it from the runnable.
        config_keys: top-level config keys clients may set; other keys sent by
            the client are dropped. ``"run_name"`` is not allowed.
        include_callback_events: if True, responses include the server-side
            callback events (useful for debugging).
        per_req_config_modifier: optional async hook ``(config, request) ->
            config`` applied to each request's config.
        enable_feedback_endpoint: register ``POST {path}/feedback``.
        token_feedback_config: enable presigned LangSmith feedback tokens in
            responses, e.g. ``{"key_configs": [{"key": "correctness"}]}``.
        enable_public_trace_link_endpoint: register
            ``POST {path}/public_trace_link``.
        disabled_endpoints: endpoints to turn off (deny-list).
        enabled_endpoints: endpoints to turn on (allow-list); cannot be combined
            with ``disabled_endpoints``.
        dependencies: FastAPI dependencies (e.g. ``Depends(verify_token)``)
            attached to the generated routes.
        playground_type: ``"default"`` or ``"chat"`` playground UI flavour.
        astream_events_version: event schema version used by
            ``/stream_events`` (``"v1"`` or ``"v2"``).
        serializer: custom serializer for inputs/outputs; when None a default
            serializer is used.
    """
class _EndpointConfiguration:
    """Logic for enabling/disabling endpoints.

    Turns the allow-list / deny-list given to ``add_routes`` into one boolean
    flag per endpoint. The per-flag computation is elided (``...``) in this
    excerpt.
    """
    def __init__(
        self,
        *,
        enabled_endpoints: Optional[Sequence[EndpointName]] = None,
        disabled_endpoints: Optional[Sequence[EndpointName]] = None,
        enable_feedback_endpoint: bool = False,
        enable_public_trace_link_endpoint: bool = False,
    ) -> None:
        """Compute the enabled state of every endpoint.

        Raises:
            ValueError: if both ``enabled_endpoints`` and ``disabled_endpoints``
                are non-empty.
        """
        # Validation: an allow-list and a deny-list cannot both be given.
        # (Truthiness check, so empty lists do not trigger the error.)
        if enabled_endpoints and disabled_endpoints:
            raise ValueError(
                'Cannot specify both "enabled_endpoints" and "disabled_endpoints".'
            )
        # Compute the enabled state of each endpoint (logic elided here).
        self.is_invoke_enabled = ...
        self.is_batch_enabled = ...
        self.is_stream_enabled = ...
        self.is_stream_log_enabled = ...
        self.is_stream_events_enabled = ...
        self.is_playground_enabled = ...
        # ... more endpoint flags
def add_routes(app, runnable, path="", ...):
    """Simplified sketch of the body of ``add_routes``.

    Builds the endpoint configuration and an ``APIHandler``, then registers a
    thin FastAPI route per enabled endpoint that delegates to the handler.
    """
    # NOTE(review): ``namespace`` is not defined in this sketch; presumably it
    # is derived from ``path`` (e.g. "/chain") — confirm against server.py.
    # 1. Build the per-endpoint on/off configuration.
    endpoint_configuration = _EndpointConfiguration(
        enabled_endpoints=enabled_endpoints,
        disabled_endpoints=disabled_endpoints,
    )
    # 2. Create the APIHandler that implements the endpoint logic.
    api_handler = APIHandler(
        runnable,
        path=path,
        input_type=input_type,
        output_type=output_type,
        config_keys=config_keys,
        ...
    )
    # 3. Register the invoke endpoint.
    if endpoint_configuration.is_invoke_enabled:
        @app.post(f"{namespace}/invoke")
        async def invoke(request: Request) -> Response:
            return await api_handler.invoke(request)
    # 4. Register the batch endpoint.
    if endpoint_configuration.is_batch_enabled:
        @app.post(f"{namespace}/batch")
        async def batch(request: Request) -> Response:
            return await api_handler.batch(request)
    # ... more endpoint registrations
# Actual handler route (hidden from the OpenAPI docs via include_in_schema=False).
@app.post(f"{namespace}/invoke", include_in_schema=False)
async def invoke(request: Request) -> Response:
    """Handle a request."""
    return await api_handler.invoke(request)
# Documentation-only route, used to generate the OpenAPI schema.
# It carries the typed request/response models. Its body only raises: it is
# presumably never reached because the route registered first for the same
# path takes precedence.
async def _invoke_docs(
    invoke_request: Annotated[InvokeRequest, Body()],
    config_hash: str = "",
) -> InvokeResponse:
    """Invoke the runnable with the given input and config."""
    raise AssertionError("This endpoint should not be reachable.")
invoke_docs = app.post(
    f"{namespace}/invoke",
    response_model=api_handler.InvokeResponse,
    tags=route_tags,
)(_invoke_docs)
if endpoint_configuration.is_batch_enabled:
    @app.post(f"{namespace}/batch", include_in_schema=False)
    async def batch(request: Request) -> Response:
        return await api_handler.batch(request)
    # Config-hash variant: the config is encoded in the URL (/c/{config_hash}/).
    if endpoint_configuration.is_config_hash_enabled:
        @app.post(namespace + "/c/{config_hash}/batch")
        async def batch_with_config(
            request: Request, config_hash: str = ""
        ) -> Response:
            return await api_handler.batch(request, config_hash=config_hash)
# 批量请求格式
{
"inputs": ["input1", "input2", "input3"],
"config": {"tags": ["batch"]} # 或 [{"tags": ["a"]}, {"tags": ["b"]}]
}
# Streaming endpoint: delegates to APIHandler.stream, which returns an SSE response.
if endpoint_configuration.is_stream_enabled:
    @app.post(f"{namespace}/stream", include_in_schema=False)
    async def stream(request: Request) -> EventSourceResponse:
        """Handle a request."""
        return await api_handler.stream(request)
# SSE 事件格式
# 数据事件
{"event": "data", "data": {"content": "Hello"}}
# 元数据事件
{"event": "metadata", "data": "{\"run_id\": \"xxx\"}"}
# 错误事件
{"event": "error", "data": "{\"status_code\": 500, \"message\": \"...\"}"}
# 结束事件
{"event": "end"}
# Stream-log endpoint: delegates to APIHandler.stream_log (SSE of run-log patches).
if endpoint_configuration.is_stream_log_enabled:
    @app.post(f"{namespace}/stream_log", include_in_schema=False)
    async def stream_log(request: Request) -> EventSourceResponse:
        return await api_handler.stream_log(request)
# 请求格式
{
"input": "your input",
"config": {},
"include_names": ["Retriever", "LLM"], # 过滤包含的名称
"include_types": ["llm", "chain"], # 过滤包含的类型
"include_tags": ["important"], # 过滤包含的标签
"exclude_names": ["Debug"], # 排除的名称
"exclude_types": [], # 排除的类型
"exclude_tags": [] # 排除的标签
}
# 用途:查看中间步骤、调试链执行过程
# stream_events endpoint: registered only when has_astream_events is true
# (presumably: the installed langchain-core supports astream_events — confirm)
# and the endpoint is enabled.
if has_astream_events and endpoint_configuration.is_stream_events_enabled:
    @app.post(f"{namespace}/stream_events", include_in_schema=False)
    async def stream_events(request: Request) -> EventSourceResponse:
        return await api_handler.astream_events(request)
# Beta 功能 (需要 langchain-core >= 0.1.14)
# 支持更细粒度的事件流
# 请求格式
{
"input": "your input",
"config": {},
"include_names": [],
"include_types": [],
"include_tags": [],
"exclude_names": [],
"exclude_types": [],
"exclude_tags": []
}
# astream_events_version: "v1" | "v2" (默认 v2)
class APIHandler:
    """Implementation of the various API endpoints for a runnable server."""
    # Only the constructor signature is shown here.
    def __init__(
        self,
        runnable: Runnable,
        *,
        path: str,
        prefix: str = "",
        input_type: Union[Type, Literal["auto"], BaseModel] = "auto",
        output_type: Union[Type, Literal["auto"], BaseModel] = "auto",
        config_keys: Sequence[str] = ("configurable",),
        include_callback_events: bool = False,
        enable_feedback_endpoint: bool = False,
        token_feedback_config: Optional[TokenFeedbackConfig] = None,
        per_req_config_modifier: Optional[PerRequestConfigModifier] = None,
        serializer: Optional[Serializer] = None,
    ):
        """Create a handler serving ``runnable``'s endpoints.

        Args:
            runnable: the runnable to serve.
            path: route path the endpoints are mounted under.
            prefix: presumably an extra URL prefix (e.g. from an APIRouter) —
                confirm.
            input_type / output_type: ``"auto"`` or an explicit schema.
            config_keys: config keys clients may set (``"run_name"`` is
                rejected).
            include_callback_events: return server-side callback events.
            enable_feedback_endpoint: enable the LangSmith feedback endpoint.
            token_feedback_config: enable presigned feedback tokens.
            per_req_config_modifier: async ``(config, request) -> config`` hook.
            serializer: custom serializer; None selects the default.
        """
        ...
def __init__(self, runnable, ...):
    """Sketch of ``APIHandler.__init__`` (parameter list abbreviated).

    Checks dependencies and ``config_keys``, optionally creates a LangSmith
    client, resolves the input/output schemas and builds the request/response
    models used by the endpoints.

    Raises:
        ImportError: if ``sse_starlette`` is not installed.
        ValueError: if ``"run_name"`` is listed in ``config_keys``.
    """
    # 1. Check dependencies: SSE streaming requires sse_starlette.
    if importlib.util.find_spec("sse_starlette") is None:
        raise ImportError("sse_starlette must be installed")
    # 2. Validate config: clients may not set run_name (the server sets it).
    if "run_name" in config_keys:
        raise ValueError("Cannot configure run_name")
    # 3. Create a LangSmith client only when tracing is enabled and a feedback
    #    feature needs it; otherwise keep None.
    # NOTE(review): ``token_feedback_enabled`` and ``namespace`` are not defined
    # in this sketch.
    self._langsmith_client = (
        ls_client.Client()
        if tracing_is_enabled() and (enable_feedback_endpoint or token_feedback_enabled)
        else None
    )
    # 4. Resolve the input/output types.
    # NOTE(review): as written, the schemas always come from the runnable. The
    # ``input_type``/``output_type`` constructor arguments are not consulted
    # here; confirm the real implementation uses them when they are not "auto".
    input_type_ = _resolve_model(runnable.get_input_schema(), "Input", namespace)
    output_type_ = _resolve_model(runnable.get_output_schema(), "Output", namespace)
    # 5. Build the request/response models.
    self._InvokeRequest = create_invoke_request_model(...)
    self._InvokeResponse = create_invoke_response_model(...)
async def _get_config_and_input(
    self,
    request: Request,
    config_hash: str,
    *,
    endpoint: Optional[str] = None,
    server_config: Optional[RunnableConfig] = None,
) -> Tuple[RunnableConfig, Any]:
    """Extract the config and input from the request.

    Args:
        request: incoming HTTP request; its JSON body carries ``input`` and
            optionally ``config``.
        config_hash: config encoded in the ``/c/{config_hash}/`` URL variant,
            or ``""`` for the plain route.
        endpoint: endpoint name recorded in the run metadata.
        server_config: server-side config merged over the client-provided
            config.

    Returns:
        ``(config, input)``: the merged RunnableConfig and the validated,
        unpacked input.
    """
    # 1. Parse the JSON body and validate only its envelope (input/config);
    #    the input itself is validated against the runnable's schema below.
    body = await request.json()
    body = InvokeRequestShallowValidator.model_validate(body)
    # 2. Merge the configs: URL hash first, then body config (last writer
    #    wins), then server config, then the per-request modifier.
    user_provided_config = await _unpack_request_config(
        config_hash,
        body.config,
        config_keys=self._config_keys,
        model=self._ConfigPayload,
        request=request,
        per_req_config_modifier=self._per_req_config_modifier,
        # Required keyword of _unpack_request_config. Previously it was
        # accepted by this method but never forwarded.
        server_config=server_config,
    )
    # 3. Apply the LangServe defaults (run_name, metadata, run_id).
    config = _update_config_with_defaults(
        self._run_name, user_provided_config, request, endpoint=endpoint
    )
    # 4. Validate the input against the (config-dependent) input schema, then unpack it.
    schema = self._runnable.with_config(config).input_schema
    input_ = schema.model_validate(body.input)
    return config, _unpack_input(input_)
async def invoke(self, request: Request, config_hash: str = "") -> Response:
    """Invoke the runnable once and return its serialized output as JSON.

    Args:
        request: incoming HTTP request (JSON body with ``input`` / ``config``).
        config_hash: config from the ``/c/{config_hash}/`` route variant, or "".

    Returns:
        JSON response with ``output``, ``callback_events`` and ``metadata``
        (including the run id).
    """
    # 1. Get the merged config and the validated input.
    config, input_ = await self._get_config_and_input(request, config_hash)
    run_id = config["run_id"]
    # 2. Attach a callback aggregator to collect server-side events.
    event_aggregator = AsyncEventAggregatorCallback()
    _add_callbacks(config, [event_aggregator])
    # 3. Create (but do not yet await) the invocation coroutine.
    invoke_coro = self._runnable.ainvoke(input_, config=config)
    # 4. With token feedback enabled, await the invocation concurrently with
    #    the feedback-token request.
    # NOTE(review): ``feedback_coro`` and ``callback_events`` are built by code
    # omitted from this excerpt; ``feedback_token`` is not referenced in the
    # lines shown.
    if self._token_feedback_enabled:
        output, feedback_token = await asyncio.gather(
            invoke_coro,
            feedback_coro
        )
    else:
        output = await invoke_coro
    # 5. Build and return the response.
    return _json_encode_response(
        self._InvokeResponse(
            output=self._serializer.dumpd(output),
            callback_events=callback_events,
            metadata=InvokeResponseMetadata(run_id=run_id, ...),
        )
    )
async def batch(self, request: Request, config_hash: str = "") -> Response:
    """Run the runnable over a list of inputs and return all outputs.

    The body's ``config`` is either a single config shared by all inputs or a
    list with exactly one config per input.

    Raises:
        HTTPException: 422 when a config list's length differs from the number
            of inputs.
    """
    body = BatchRequestShallowValidator.model_validate(await request.json())
    # 1. Handle config: one shared config, or one config per input.
    if isinstance(body.config, list):
        if len(body.config) != len(body.inputs):
            raise HTTPException(422, "Number of configs must match inputs")
        configs = [await _unpack_request_config(...) for config in body.config]
    else:
        configs = await _unpack_request_config(...)
    # 2. One callback aggregator per input.
    # NOTE(review): ``inputs``, ``final_configs``, ``callback_events`` and
    # ``run_ids`` are not defined in this excerpt. ``inputs`` is presumably
    # ``body.inputs`` and ``final_configs`` the per-input configs derived from
    # ``configs`` with the aggregators attached — confirm.
    aggregators = [AsyncEventAggregatorCallback() for _ in range(len(inputs))]
    # 3. Run the batch.
    output = await self._runnable.abatch(inputs, config=final_configs)
    # 4. Build and return the response.
    return _json_encode_response(
        self._BatchResponse(
            output=self._serializer.dumpd(output),
            callback_events=callback_events,
            metadata=BatchResponseMetadata(run_ids=run_ids, ...),
        )
    )
async def stream(self, request: Request, config_hash: str = "") -> EventSourceResponse:
    """Stream the runnable's output to the client as server-sent events.

    Event sequence: one ``metadata`` event before the first chunk, one
    ``data`` event per chunk, then a final ``end`` event. If the runnable
    raises, an ``error`` event with a generic 500 payload is sent (the real
    message is not exposed) and the exception is re-raised.

    Args:
        request: incoming HTTP request (JSON body with ``input`` / ``config``).
        config_hash: config from the ``/c/{config_hash}/`` route variant, or "".
    """
    config, input_ = await self._get_config_and_input(request, config_hash)
    # The run id is set on the config by _get_config_and_input (same as invoke).
    run_id = config["run_id"]

    async def _stream() -> AsyncIterator[dict]:
        # Must be initialised before the loop: the first check below reads it.
        has_sent_metadata = False
        try:
            # Attach the aggregator to a (shallow) copy of the config.
            config_w_callbacks = config.copy()
            event_aggregator = AsyncEventAggregatorCallback()
            _add_callbacks(config_w_callbacks, [event_aggregator])
            async for chunk in self._runnable.astream(input_, config=config_w_callbacks):
                # Emit the metadata event once, before the first data chunk.
                if not has_sent_metadata:
                    # NOTE(review): feedback_key / feedback_token come from the
                    # token-feedback flow, which this excerpt omits.
                    yield _create_metadata_event(run_id, feedback_key, feedback_token)
                    has_sent_metadata = True
                # Emit the data event.
                yield {
                    "data": self._serializer.dumps(chunk).decode("utf-8"),
                    "event": "data",
                }
            yield {"event": "end"}
        except BaseException:
            # Explicit JSON payload: the former ``{...}`` placeholder is a set,
            # which json.dumps cannot encode (TypeError would mask the error).
            yield {
                "event": "error",
                "data": json.dumps(
                    {"status_code": 500, "message": "Internal Server Error"}
                ),
            }
            raise

    return EventSourceResponse(_stream())
async def stream_log(self, request: Request, config_hash: str = "") -> EventSourceResponse:
    """Stream the runnable's run log as JSON-patch server-sent events.

    With ``diff=True`` each ``data`` event carries ``{"ops": [...]}``, the
    patch operations of one ``RunLogPatch``. The ``include_*`` / ``exclude_*``
    filters are read from the request body. A final ``end`` event closes the
    stream. On failure an ``error`` event with a generic 500 payload is sent
    and the exception is re-raised, as in ``stream``.

    Args:
        request: incoming HTTP request (JSON body with ``input``, ``config``
            and optional filter fields).
        config_hash: config from the ``/c/{config_hash}/`` route variant, or "".
    """
    import json

    config, input_ = await self._get_config_and_input(request, config_hash)
    # Parse the stream_log filter parameters from the same JSON body.
    body = await request.json()
    stream_log_request = StreamLogParameters(**body)

    async def _stream_log() -> AsyncIterator[dict]:
        try:
            async for chunk in self._runnable.astream_log(
                input_,
                config=config,
                diff=True,
                include_names=stream_log_request.include_names,
                include_types=stream_log_request.include_types,
                include_tags=stream_log_request.include_tags,
                exclude_names=stream_log_request.exclude_names,
                exclude_types=stream_log_request.exclude_types,
                exclude_tags=stream_log_request.exclude_tags,
            ):
                # RunLogPatch is a plain container of ``ops`` with no .json()
                # method; serialize its ops with the configured serializer.
                yield {
                    "data": self._serializer.dumps({"ops": chunk.ops}).decode("utf-8"),
                    "event": "data",
                }
            yield {"event": "end"}
        except BaseException:
            # Same generic error event as stream(); the real message is not exposed.
            yield {
                "event": "error",
                "data": json.dumps(
                    {"status_code": 500, "message": "Internal Server Error"}
                ),
            }
            raise

    return EventSourceResponse(_stream_log())
"""Code to dynamically create pydantic models for validating requests and responses."""
# Type alias: a validator is either a pydantic model class or a plain type.
Validator = Union[Type[BaseModel], type]
# Core model factories (signatures only; this overview omits the bodies).
def create_invoke_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_batch_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_log_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_stream_events_request_model(namespace, input_type, config) -> Type[BaseModel]
def create_invoke_response_model(namespace, output_type, include_callbacks) -> Type[BaseModel]
def create_batch_response_model(namespace, output_type, include_callbacks) -> Type[BaseModel]
# Shallow validators: a first, envelope-level parse of the request body
# (the input itself is validated later against the runnable's schema).
class InvokeRequestShallowValidator(BaseModel)
class BatchRequestShallowValidator(BaseModel)
def create_invoke_request_model(
    namespace: str,
    input_type: Validator,
    config: Type[BaseModel],
) -> Type[BaseModel]:
    """Build the pydantic model that validates an invoke request body.

    The model is named ``f"{namespace}InvokeRequest"`` and has three fields:

    * ``input``: required, validated against ``input_type``;
    * ``config``: validated against ``config``; defaults to an empty dict;
    * ``kwargs``: free-form dict; defaults to an empty dict.
    """
    model_name = f"{namespace}InvokeRequest"
    input_field = (input_type, Field(..., description="The input to the runnable."))
    config_field = (
        config,
        Field(
            default_factory=dict,
            description="Subset of RunnableConfig object in LangChain.",
        ),
    )
    kwargs_field = (dict, Field(default_factory=dict))

    request_model = create_model(
        model_name,
        input=input_field,
        config=config_field,
        kwargs=kwargs_field,
    )
    # Resolve any forward references left in the generated model.
    request_model.model_rebuild()
    return request_model
# Example of a generated model: what create_invoke_request_model produces for
# namespace "MyChain" and a runnable whose input is a str.
class MyChainInvokeRequest(BaseModel):
    input: str
    config: dict = {}
    kwargs: dict = {}
def create_invoke_response_model(
    namespace: str,
    output_type: Validator,
    include_callbacks: bool,
) -> Type[BaseModel]:
    """Build the pydantic model describing an invoke response.

    The model is named ``f"{namespace}InvokeResponse"``, derives from
    ``InvokeBaseResponse`` and has required ``output`` (typed by
    ``output_type``) and ``metadata`` fields. When ``include_callbacks`` is
    true it also has a required ``callback_events`` list.
    """
    output_field = (output_type, Field(..., description="The output of the invocation."))
    metadata_field = (
        InvokeResponseMetadata,
        Field(..., description="Metadata about the response"),
    )
    fields = {"output": output_field, "metadata": metadata_field}

    # Expose callback_events in the schema only when the server returns them.
    if include_callbacks:
        fields["callback_events"] = (
            List[CallbackEvent],
            Field(..., description="Callback events generated by the server side."),
        )

    return create_model(
        f"{namespace}InvokeResponse",
        __base__=InvokeBaseResponse,
        **fields,
    )
# Example of a generated model: what create_invoke_response_model produces for
# namespace "MyChain" and a runnable whose output is a str.
class MyChainInvokeResponse(InvokeBaseResponse):
    output: str
    metadata: InvokeResponseMetadata
    callback_events: List[CallbackEvent]  # only when include_callbacks is True
# Callback event type definitions. Each class pins its ``type`` field to a
# Literal, so every serialized event records its kind.
class OnChainStart(BaseCallback):
    type: Literal["on_chain_start"] = "on_chain_start"
    inputs: Any
class OnChainEnd(BaseCallback):
    type: Literal["on_chain_end"] = "on_chain_end"
    outputs: Any
class OnChainError(BaseCallback):
    type: Literal["on_chain_error"] = "on_chain_error"
    error: Error
class OnLLMStart(BaseCallback):
    type: Literal["on_llm_start"] = "on_llm_start"
    prompts: List[str]
class OnToolStart(BaseCallback):
    type: Literal["on_tool_start"] = "on_tool_start"
    input_str: str
# Union of all callback event types.
# NOTE(review): the trailing ``...`` stands for the event classes omitted from
# this excerpt; it is not a valid Union member as literal Python.
CallbackEvent = RootModel[
    Union[OnChainStart, OnChainEnd, OnChainError, OnLLMStart, OnToolStart, ...]
]
"""Serialization for well known objects and callback events."""
# LangChain object types the serializer can rebuild from JSON.
# Each union member carries a Tag; the discriminator callable ``_get_type``
# chooses the member for an incoming value (presumably from its "type" field,
# e.g. "ai" -> AIMessage — confirm against _get_type).
WellKnownLCObject = RootModel[
    Annotated[
        Union[
            Annotated[AIMessage, Tag(tag="ai")],
            Annotated[HumanMessage, Tag(tag="human")],
            Annotated[ChatMessage, Tag(tag="chat")],
            Annotated[SystemMessage, Tag(tag="system")],
            Annotated[ToolMessage, Tag(tag="tool")],
            Annotated[Document, Tag(tag="Document")],
            Annotated[AgentAction, Tag(tag="AgentAction")],
            Annotated[AgentFinish, Tag(tag="AgentFinish")],
            Annotated[ChatGeneration, Tag(tag="ChatGeneration")],
            Annotated[LLMResult, Tag(tag="LLMResult")],
            ...  # remaining member types omitted in this excerpt
        ],
        Field(discriminator=Discriminator(_get_type)),
    ]
]
class Serializer(abc.ABC):
    """Abstract JSON (de)serializer for LangChain objects.

    Subclasses implement ``dumps`` (object -> JSON bytes) and ``loadd``
    (parsed JSON -> well-known object); ``dumpd`` and ``loads`` are built on
    top of those two and orjson.
    """

    def dumpd(self, obj: Any) -> Any:
        """Convert to JSON serializable object."""
        encoded = self.dumps(obj)
        return orjson.loads(encoded)

    def loads(self, s: bytes) -> Any:
        """Load from JSON string."""
        parsed = orjson.loads(s)
        return self.loadd(parsed)

    @abc.abstractmethod
    def dumps(self, obj: Any) -> bytes:
        """Dump to JSON byte string."""

    @abc.abstractmethod
    def loadd(self, obj: Any) -> Any:
        """Load from python object into well known object."""
class WellKnownLCSerializer(Serializer):
    """Serializer that encodes with orjson and decodes well-known LangChain objects."""

    def loadd(self, obj: Any) -> Any:
        # Rebuild known LangChain objects (messages, documents, ...) from plain JSON data.
        return _decode_lc_objects(obj)

    def dumps(self, obj: Any) -> bytes:
        # ``default`` is orjson's fallback encoder for types it cannot handle natively.
        return orjson.dumps(obj, default=default)
def _decode_lc_objects(value: Any) -> Any:
"""Decode the value into well known LangChain objects."""
if isinstance(value, dict):
# 递归解码嵌套字典
v = {key: _decode_lc_objects(v) for key, v in value.items()}
# 尝试解析为 WellKnownLCObject
try:
obj = WellKnownLCObject.model_validate(v)
return obj.root
except (ValidationError, ValueError, TypeError):
return v # 不是已知对象,返回原始字典
elif isinstance(value, list):
return [_decode_lc_objects(item) for item in value]
else:
return value
# 示例:解码 AI Message
{"type": "ai", "content": "Hello"} → AIMessage(content="Hello")
LangServe 提供了多层 Config 合并机制,确保灵活性和安全性。
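# Config 优先级(从低到高;merge_configs 为 last writer wins,后合并者覆盖先合并者)
1. 可覆盖的默认 Config (overridable_default_config,LangServe 内部设置)
2. Config Hash (URL 中的 config_hash,最先传入 _unpack_request_config;只保留 config_keys 中的键)
3. 客户端发送的 config (body.config,在 config_hash 之后传入,因此覆盖它;只保留 config_keys 中的键)
4. Server Config (server_config 参数,合并在客户端 config 之上)
5. Per-Request Config Modifier (per_req_config_modifier,可任意改写上述合并结果)
6. 不可覆盖的默认 Config (run_name 及 __useragent / __langserve_version 等 metadata,最后合并)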
# Key function (condensed view).
async def _unpack_request_config(...) -> Dict[str, Any]:
    """Merge all config sources.

    Condensed sketch: ``config_dicts`` (the client configs normalized to
    dicts), ``config_keys``, ``server_config``, ``per_req_config_modifier`` and
    ``request`` come from the elided parameter list and body.
    """
    # Merge client configs; later configs win.
    config = merge_configs(*config_dicts)
    # Keep only the keys the server allows clients to set.
    projected_config = {k: config[k] for k in config_keys if k in config}
    # Merge the server-side config on top of the client config.
    if server_config:
        projected_config = merge_configs(projected_config, server_config)
    # Finally let the per-request hook rewrite the config.
    if per_req_config_modifier:
        projected_config = await per_req_config_modifier(projected_config, request)
    return projected_config
async def _unpack_request_config(
    *client_sent_configs: Union[BaseModel, Mapping, str],
    config_keys: Sequence[str],
    model: Type[BaseModel],
    request: Request,
    per_req_config_modifier: Optional[PerRequestConfigModifier],
    server_config: Optional[RunnableConfig],
) -> Dict[str, Any]:
    """Combine the configs sent by the client into one filtered config.

    Args:
        *client_sent_configs: configs in increasing priority. A ``str`` is a
            config hash (decoded by ``_config_from_hash``). A ``BaseModel`` is
            dumped directly. A ``Mapping`` is validated through ``model``
            first. Values of any other type are ignored.
        config_keys: top-level keys clients may set; all others are dropped.
        model: pydantic model used to validate hash- and mapping-based configs.
        request: the incoming request, passed to ``per_req_config_modifier``.
        per_req_config_modifier: optional async hook ``(config, request) ->
            config``, applied last.
        server_config: optional server-side config merged over the client one.

    Returns:
        The merged and projected config dict.
    """

    def _normalize(raw: Union[BaseModel, Mapping, str]) -> Optional[Dict[str, Any]]:
        # Config hash from the URL (LZString-compressed).
        if isinstance(raw, str):
            return model(**_config_from_hash(raw)).model_dump()
        if isinstance(raw, BaseModel):
            return raw.model_dump()
        if isinstance(raw, Mapping):
            return model(**raw).model_dump()
        return None  # unsupported types are skipped

    normalized = (_normalize(raw) for raw in client_sent_configs)
    # Later configs win when keys collide.
    merged = merge_configs(*(cfg for cfg in normalized if cfg is not None))

    # Clients may only set whitelisted keys.
    allowed = {key: merged[key] for key in config_keys if key in merged}
    if server_config:
        allowed = merge_configs(allowed, server_config)
    if per_req_config_modifier:
        allowed = await per_req_config_modifier(allowed, request)
    return allowed
def _update_config_with_defaults(
    run_name: str,
    incoming_config: RunnableConfig,
    request: Request,
    *,
    endpoint: Optional[str] = None,
) -> RunnableConfig:
    """Set up some baseline configuration for the underlying runnable.

    Layers, in increasing priority (``merge_configs`` is last-writer-wins):

    1. overridable defaults (an empty layer in this version);
    2. ``incoming_config``, the config assembled from the request;
    3. non-overridable defaults: ``run_name`` and LangServe metadata (user
       agent, LangServe version, and the endpoint name when one is given).

    A fresh ``run_id`` is generated when the merged config has none.

    Args:
        run_name: run name to force on every run (the route path).
        incoming_config: config built from the request.
        request: incoming request; its ``user-agent`` header goes into metadata.
        endpoint: endpoint name recorded as ``__langserve_endpoint`` if given.

    Returns:
        The merged config, always containing ``run_id``.
    """
    metadata = {
        "__useragent": request.headers.get("user-agent"),
        "__langserve_version": __version__,
    }
    # Only record the endpoint when one was given, instead of writing a null
    # ``__langserve_endpoint`` into every run's metadata.
    if endpoint:
        metadata["__langserve_endpoint"] = endpoint

    # Defaults clients may override. The previous version referenced this name
    # without defining it (NameError); it is now an explicit, empty layer.
    overridable_default_config: RunnableConfig = {}
    # Defaults clients may NOT override: merged last.
    non_overridable_default_config: RunnableConfig = {
        "metadata": metadata,
        "run_name": run_name,  # the route path
    }

    config = merge_configs(
        overridable_default_config,
        incoming_config,
        non_overridable_default_config,
    )
    # Generate a run_id when none was provided (missing or None).
    if config.get("run_id") is None:
        config["run_id"] = str(uuid.uuid4())
    return config
class TokenFeedbackConfig(TypedDict):
    """Token feedback configuration."""
    # Per-key feedback settings, e.g. [{"key": "correctness"}]. The invoke
    # excerpt only reads the first entry's "key".
    key_configs: List[PerKeyFeedbackConfig]
# At construction time: create a LangSmith client only when tracing is enabled
# and token feedback is requested; otherwise it stays None.
self._langsmith_client = (
    ls_client.Client()
    if tracing_is_enabled() and token_feedback_enabled
    else None
)
# In invoke: request the presigned feedback token concurrently with the
# invocation. run_in_executor(None, ...) runs the client call in the default
# executor (presumably because it is a blocking call) and asyncio.gather
# awaits both together.
# NOTE(review): only the first key config is used. When tracing is disabled the
# client is None — confirm _token_feedback_enabled is False in that case.
if self._token_feedback_enabled:
    feedback_key = self._token_feedback_config["key_configs"][0]["key"]
    feedback_coro = run_in_executor(
        None,
        self._langsmith_client.create_presigned_feedback_token,
        run_id,
        feedback_key,
    )
    output, feedback_token = await asyncio.gather(invoke_coro, feedback_coro)
# Return the token to the client in the response metadata.
metadata=InvokeResponseMetadata(
    run_id=run_id,
    feedback_tokens=[FeedbackToken(
        key=feedback_key,
        token_url=feedback_token.url,
        expires_at=feedback_token.expires_at.isoformat(),
    )],
)
# Register the playground endpoint: files under playground/ are served by
# api_handler.playground, hidden from the OpenAPI schema.
if endpoint_configuration.is_playground_enabled:
    playground = app.get(
        namespace + "/playground/{file_path:path}",
        dependencies=dependencies,
        include_in_schema=False,
    )(api_handler.playground)
    # Config-hash variant: the same handler mounted under /c/{config_hash}/.
    if endpoint_configuration.is_config_hash_enabled:
        app.get(
            namespace + "/c/{config_hash}/playground/{file_path:path}",
            ...
        )(playground)
# Playground flavour.
playground_type: Literal["default", "chat"] = "default"
# default: supports a wider range of input/output types
# chat: UX optimized for chat-style interaction
工厂模式
策略模式
模板方法模式
观察者模式
┌─────────────────────────────────────────────────────┐
│ FastAPI App │
│ ┌─────────────────────────────────────────────┐ │
│ │ add_routes(app, runnable) │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ _EndpointConfiguration │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ APIHandler │ │ │
│ │ │ ├─ invoke(request) │ │ │
│ │ │ ├─ batch(request) │ │ │
│ │ │ ├─ stream(request) │ │ │
│ │ │ └─ stream_log(request) │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │
│ │ Validation │ │ Serialization│ │ Callbacks │ │
│ └──────────────┘ └──────────────┘ └────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ LangChain Runnable │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
HTTP Request → FastAPI Route
↓
┌─────────────────────────┐
│ APIHandler._get_config_ │
│ and_input() │
│ • 解析 JSON body │
│ • 验证 input/config │
│ • 合并 config │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ Runnable.ainvoke/ │
│ astream/abatch │
│ • 执行 LangChain chain │
│ • 触发 callbacks │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ Serializer.dumpd() │
│ • 序列化输出 │
│ • 处理 LangChain 对象 │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ JSONResponse / SSE │
│ • 返回 HTTP 响应 │
└─────────────────────────┘
HTTP Request → APIHandler.stream()
↓
┌─────────────────────────┐
│ _get_config_and_input() │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ 创建 async generator │
│ _stream(): │
│ async for chunk in │
│ runnable.astream(): │
│ yield { │
│ "event":"data", │
│ "data": chunk │
│ } │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ EventSourceResponse │
│ • text/event-stream │
│ • 实时推送事件 │
└─────────────────────────┘
↓
Client 接收 SSE 事件
Python Object → Serializer.dumps()
↓
┌─────────────────────────┐
│ orjson.dumps(obj, │
│ default=default) │
│ • 高性能 JSON 序列化 │
│ • 处理 Pydantic 模型 │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ JSON Bytes │
│ {"type":"ai", │
│ "content":"Hello"} │
└───────────┬─────────────┘
↓
┌─────────────────────────┐
│ HTTP Response Body │
└─────────────────────────┘
# 反序列化
JSON → orjson.loads() → _decode_lc_objects()
↓
WellKnownLCObject.model_validate()
↓
AIMessage(...)
1. orjson 高性能序列化
2. 模型缓存
3. 并行执行
4. 流式响应
# 1. Request validation errors: re-raised as FastAPI's RequestValidationError.
except ValidationError as e:
    raise RequestValidationError(e.errors(), body=body)
# 2. Malformed JSON body.
except json.JSONDecodeError:
    raise RequestValidationError(errors=["Invalid JSON body"])
# 3. Config validation: reject `configurable` unless the server whitelisted it
#    in config_keys.
if "configurable" in config and "configurable" not in config_keys:
    raise HTTPException(
        422,
        "The config field `configurable` has been disallowed by the server."
    )
# 4. Streaming error handling: send an ``error`` event with a generic
#    message (the real exception text is not exposed to the client), then
#    re-raise.
async def _stream():
    try:
        async for chunk in runnable.astream(...):
            yield {"event": "data", "data": ...}
        yield {"event": "end"}
    except BaseException:
        yield {
            "event": "error",
            "data": json.dumps({
                "status_code": 500,
                "message": "Internal Server Error"
            }),
        }
        raise
1. Config 投影
2. 错误信息脱敏
3. 内部元数据隔离
4. Public Trace Link 警告
类型定义
Config 管理
端点选择
序列化
| 反模式 | 问题 | 正确做法 |
|---|---|---|
| 在 runnable 中保存状态 | 并发问题 | 使用 config 传递状态 |
| 使用 Pydantic v1 模型 | 序列化失败 | 使用 pydantic.BaseModel |
| 生产环境开启 playground | 安全风险 | disabled_endpoints=["playground"] |
| 忽略 config_keys | 配置泄露 | 明确指定允许的 keys |
| 过度使用 callback_events | 性能下降 | 仅在调试时启用 |
# 1. 启用 callback_events
add_routes(
app,
runnable,
include_callback_events=True, # 查看服务器端事件
)
# 2. 使用 stream_log 查看中间步骤
POST /chain/stream_log
{
"input": "test"
# 不传任何 include_* 过滤字段即可包含所有中间步骤;
# include_names 按名称精确匹配,"*" 不是通配符,传入 ["*"] 反而会过滤掉所有子步骤
}
# 3. 检查 Schema 端点
GET /chain/input_schema
GET /chain/output_schema
GET /chain/config_schema
# 4. LangSmith 追踪
# 设置环境变量
LANGSMITH_API_KEY=xxx
LANGSMITH_TRACING=true
# 5. 日志级别
import logging
logging.basicConfig(level=logging.DEBUG)
# 自动追踪
# 设置环境变量后自动启用
LANGSMITH_API_KEY=xxx
LANGSMITH_TRACING=true
LANGSMITH_PROJECT=my-project
# Feedback 端点
add_routes(
app, runnable,
enable_feedback_endpoint=True, # 启用 feedback 端点
)
POST /chain/feedback
{
"run_id": "xxx",
"key": "correctness",
"score": 0.9
}
# Token-based Feedback (更安全)
add_routes(
app, runnable,
token_feedback_config={"key_configs": [{"key": "correctness"}]},
)
# 客户端使用返回的 feedback_tokens 提交反馈
# 1. 使用 Gunicorn + Uvicorn
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app
# 2. 禁用不必要的端点
add_routes(
app, runnable,
disabled_endpoints=["playground"],
enable_feedback_endpoint=False,
enable_public_trace_link_endpoint=False,
)
# 3. 添加认证
from typing import Optional

from fastapi import Depends, HTTPException, Header


async def verify_token(authorization: Optional[str] = Header(default=None)):
    """Require a Bearer credential in the ``Authorization`` header.

    A missing header is treated like an invalid one. It yields 401 rather than
    FastAPI's 422 for a missing required header.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` challenge when the
            header is missing, uses another scheme, or carries an empty token.
    """
    scheme, _, token = (authorization or "").partition(" ")
    # Authentication schemes are case-insensitive (RFC 7235), so accept "bearer" too.
    if scheme.lower() != "bearer" or not token.strip():
        raise HTTPException(
            status_code=401,
            headers={"WWW-Authenticate": "Bearer"},
        )
    # TODO: actually validate ``token`` (signature, expiry, audience, ...)
    # before trusting the caller; this check only enforces the header format.


add_routes(app, runnable, dependencies=[Depends(verify_token)])
# 4. Rate limiting.
# ``limiter.limit(...)`` is a route decorator (not a FastAPI dependency), and the
# routes created by add_routes are not ours to decorate. Registering a second
# "/chain/invoke" route would also collide with the generated one. Instead,
# apply a default limit to every route through slowapi's middleware.
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
短期
中期
长期
社区趋势
核心要点:
推荐资源:
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes

# 1. Build the chain: prompt -> chat model -> plain string.
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser

# 2. Create the FastAPI app.
app = FastAPI(title="My LangChain Server")

# 3. Mount the chain's endpoints under /joke.
add_routes(
    app,
    chain,
    path="/joke",
    enable_feedback_endpoint=True,
    disabled_endpoints=["playground"],  # disabled for production
)

# 4. Run:
#   uvicorn app:app --reload
#
# Available endpoints:
#   POST /joke/invoke    {"input": {"topic": "cats"}}
#   POST /joke/stream    {"input": {"topic": "cats"}}
#   POST /joke/feedback  (LangSmith feedback; enabled above)
#   GET  /joke/input_schema
# GET /joke/playground/ is NOT available here because "playground" is in
# disabled_endpoints; drop it from that list during local development to use
# the interactive UI.