源码级别解析 · 深度剖析·状态化工作流·智能体编排
2026-04-19 | 每日技术深度解读
相比传统框架,LangGraph 提供更底层、更灵活的编排能力
pip install -U langgraph
# 或使用 uv
uv add langgraph
支持 pip 和 uv 两种安装方式
from langgraph.graph import StateGraph, MessagesState, START, END
def mock_llm(state: MessagesState):
return {"messages": [{"role": "ai", "content": "hello world"}]}
graph = StateGraph(MessagesState)
graph.add_node(mock_llm)
graph.add_edge(START, "mock_llm")
graph.add_edge("mock_llm", END)
graph = graph.compile()
LangGraph 的基础架构模式
LangGraph 的核心架构组件及其交互关系
from typing_extensions import TypedDict, Annotated
from langchain.messages import AnyMessage
import operator
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]
llm_calls: int
tool_results: list[dict]
LangGraph 的状态定义模式,使用类型注解确保状态一致性
from langchain.messages import SystemMessage
def llm_call(state: dict):
"""LLM 节点:决定是否调用工具"""
return {
"messages": [
model_with_tools.invoke(
[
SystemMessage(
content="You are a helpful assistant..."
)
] + state["messages"]
)
],
"llm_calls": state.get('llm_calls', 0) + 1
}
LLM 节点的标准实现模式
from langchain.messages import ToolMessage
def tool_node(state: dict):
"""工具节点:执行工具调用"""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(
content=observation,
tool_call_id=tool_call["id"]
))
return {"messages": result}
工具节点的实现模式,支持多工具并发调用
from typing import Literal
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
"""决定是否继续循环"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tool_node"
return END
# 使用条件边
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
["tool_node", END]
)
条件边的标准实现,支持动态路由
根据性能和可靠性需求选择合适的持久化模式
# 持久化模式选择
result = graph.stream(
{"input": "计算 3+4"},
durability="sync" # 同步持久化
)
# 使用检查点
from langgraph.checkpoint.memory import MemoryCheckpointer
checkpointer = MemoryCheckpointer()
graph = graph.compile(checkpointer=checkpointer)
持久化模式的使用和配置
# 使用任务包装非确定性操作
from langgraph.prebuilt import InjectedState
def process_file_task(state: dict, file_path: str):
"""包装文件读写操作"""
with open(file_path, 'r') as f:
content = f.read()
return {"processed_content": content}
# 在节点中使用任务
def process_file_node(state: dict):
task = process_file_task.with_retry(state, "data.txt")
return task.invoke()
使用任务包装确保操作的可重放性
# 基础内存配置
from langgraph.checkpoint.memory import MemoryCheckpointer
from langgraph.graph import START, END
checkpointer = MemoryCheckpointer()
graph = graph.compile(checkpointer=checkpointer)
# 带线程 ID 的内存
thread_config = {"thread_id": "conversation-123"}
result = graph.invoke(
{"messages": ["Hello"]},
config=thread_config
)
内存系统的配置和使用模式
# 配置中断
from langgraph.checkpoint.memory import MemoryCheckpointer
checkpointer = MemoryCheckpointer()
# 编译时配置中断
graph = graph.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"],
interrupt_after=["llm_call"]
)
# 运行时中断
config = {"thread_id": "conversation-123"}
result = graph.invoke(
{"messages": ["Review this document"]},
config=config
)
中断机制的使用和配置
# 设置 LangSmith 跟踪
import os
os.environ["LANGCHAIN_TRACING"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
# 配置跟踪选项
from langgraph.prebuilt import inject_trace
# 编译时注入跟踪
graph = graph.compile(inject_trace=True)
LangSmith 的配置和使用
# 函数式 API 使用
from langgraph.graph import MessagesState
from langchain_core.messages import AIMessage, HumanMessage
def agent_with_memory(state: MessagesState):
"""带内存的智能体函数式实现"""
messages = state["messages"]
response = model.invoke(messages)
return {"messages": messages + [response]}
# 使用函数式 API
graph = agent_with_memory.compile()
函数式 API 的简洁实现方式
# 子图实现
subgraph = StateGraph(SubAgentState)
subgraph.add_node("search", search_agent)
subgraph.add_node("analyze", analyze_agent)
subgraph.add_edge(START, "search")
subgraph.add_edge("search", "analyze")
subgraph.add_edge("analyze", END)
compiled_subgraph = subgraph.compile()
# 在主图中使用子图
main_graph.add_node("research", compiled_subgraph)
子图的创建和集成方式
# 自定义工具定义
from langchain.tools import tool
tool = tool
def search_web(query: str) -> str:
"""搜索互联网获取信息"""
# 实现搜索逻辑
return search_results
def calculate(expression: str) -> float:
"""计算数学表达式"""
# 实现计算逻辑
return result
# 工具绑定
model_with_tools = model.bind_tools([search_web, calculate])
自定义工具的创建和绑定方式
# 节点重试配置
def llm_with_retry(state: dict):
"""带重试的 LLM 节点"""
try:
response = model_with_tools.invoke(state["messages"])
return {"messages": [response]}
except Exception as e:
# 记录错误信息
error_msg = f"LLM 调用失败: {str(e)}"
return {"messages": [AIMessage(content=error_msg)]}
# 使用 retry 包装器
from langgraph.prebuilt import retry
llm_node = retry(llm_with_retry, max_attempts=3)
节点的重试机制配置
# 并行执行配置
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import inject_parallel
def parallel_processor(state: dict):
"""并行处理多个任务"""
# 使用异步并行处理
tasks = [
process_data_async(state["data1"]),
process_data_async(state["data2"]),
process_data_async(state["data3"])
]
results = asyncio.gather(*tasks)
return {"processed_data": results}
# 编译时配置并行
graph = graph.compile(inject_parallel=True)
并行处理的配置和使用
基于实际项目经验的开发建议
LangGraph 重新定义了智能体开发和部署的标准
感谢阅读!
访问 https://atcfu.com/ai-articles/langgraph-stateful-agents/ 回顾本文