源码级别解析 · 源码架构与实战应用 · 2026
2026-04-26 | 每日技术深度解读
受到 Klarna、Replit、Elastic 等公司的信赖
模块化设计,支持独立使用和集成
from langgraph.graph import Graph, END
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
# Create the chat model used by the example below.
llm = ChatOpenAI(model="gpt-4")
# Define the functions that make up the graph.
def agent_node(state, llm=None):
    """Ask the chat model to answer the most recent message.

    Args:
        state: Mapping with a ``"messages"`` list; only the ``content`` of
            the last entry is sent to the model.
        llm: Chat model to call. Optional so the function can be registered
            directly as a graph node (a node is called with the state, not
            with a model); when omitted, the module-level ``llm`` is used.

    Returns:
        ``{"messages": [response]}`` where ``response`` is the model's reply.
    """
    if llm is None:
        # The parameter shadows the module-level model, so fetch it explicitly.
        llm = globals()["llm"]
    latest = state["messages"][-1]
    response = llm.invoke([HumanMessage(content=latest.content)])
    return {"messages": [response]}
def should_continue(state):
    """Route to END once the latest message mentions "final", else to "agent".

    The check is a case-insensitive substring match on the ``content`` of
    the last entry in ``state["messages"]``.
    """
    latest_text = state["messages"][-1].content.lower()
    return END if "final" in latest_text else "agent"
# Build the graph.
graph = Graph()
graph.add_node("agent", agent_node)
# should_continue is a routing function, so it has to be attached as a
# conditional edge; add_edge() connects two node names and cannot take a
# callable as its target.
graph.add_conditional_edges(
    "agent",
    should_continue,
    {"agent": "agent", END: END},
)
graph.set_entry_point("agent")
# Compile the graph into a runnable application.
app = graph.compile()
# Run it with a single user message and print the last reply.
# NOTE(review): the loop only ends when a reply contains "final"; otherwise
# the run presumably stops at LangGraph's recursion limit — confirm.
result = app.invoke({"messages": [HumanMessage(content="Hello!")]})
print(result["messages"][-1].content)
基本的LangGraph应用结构
from typing import Dict, List, TypedDict
from langgraph.graph import StateGraph
class AgentState(TypedDict):
    """Typed shape of the agent state used in this example."""

    messages: List[str]  # update_state appends new messages at the end
    tool_calls: List[Dict]
    step_count: int  # incremented by one on every update_state call
    is_complete: bool


def update_state(state: AgentState, new_message: str) -> AgentState:
    """Return a new state with ``new_message`` appended and the step counted.

    The input ``state`` and its ``messages`` list are left unmodified.

    Args:
        state: Current state.
        new_message: Message to append to ``state["messages"]``.

    Returns:
        A new dict with every key of ``state``, the extended message list
        and ``step_count`` increased by one.
    """
    return {
        **state,
        # Python unpacks an iterable with ``*``; ``...`` is JavaScript's
        # spread operator and a syntax error here.
        "messages": [*state["messages"], new_message],
        "step_count": state["step_count"] + 1,
    }


# Initial state.
initial_state = AgentState(
    messages=["Hello, world!"],
    tool_calls=[],
    step_count=0,
    is_complete=False,
)
使用TypedDict定义状态结构
from langgraph.graph import StateGraph, END
from langchain_core.tools import tool
# Define the tool.
def _evaluate_arithmetic(expression: str) -> float:
    """Evaluate a purely arithmetic expression without running arbitrary code.

    Only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // % **`` are accepted.

    Raises:
        SyntaxError: If ``expression`` is not valid Python syntax.
        ValueError: If it contains anything other than the constructs above.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def _walk(node):
        if isinstance(node, ast.Expression):
            return _walk(node.body)
        if isinstance(node, ast.Constant):
            # bool is a subclass of int; reject it so True/False are not numbers.
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
        elif isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](_walk(node.left), _walk(node.right))
        elif isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_walk(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    return _walk(ast.parse(expression, mode="eval"))


@tool
def calculator(expression: str) -> float:
    """Simple calculator function"""
    # SECURITY: this previously called eval() on the raw string. A tool's
    # argument is typically produced by the model, so it must be treated as
    # untrusted; only plain arithmetic is evaluated now.
    # NOTE(review): very large ``**`` exponents can still be slow to compute.
    # The docstring above is left untouched because the @tool decorator
    # presumably uses it as the tool description — confirm before editing.
    return _evaluate_arithmetic(expression)
# LLM node
def llm_node(state):
    """Send the whole state to the model as a prompt and return its reply text.

    Returns:
        ``{"response": <content of the model's reply>}``.
    """
    reply = llm.invoke(f"Current state: {state}\n\nPlease respond:")
    return {"response": reply.content}
# Tool node
def tool_node(state):
    """Run the calculator tool on ``state["expression"]``.

    Returns:
        ``{"calculation_result": <tool output>}``.
    """
    outcome = calculator.invoke(state["expression"])
    return dict(calculation_result=outcome)
# Condition node
def should_use_tool(state):
    """Return True when ``state["message"]`` contains "calculate" (any case)."""
    lowered = state["message"].lower()
    return "calculate" in lowered
不同类型节点的实现方式
支持复杂的执行流程控制
from langgraph.graph import StateGraph, END
def conditional_edge(state):
    """Decide the next execution path from ``state["response_type"]``.

    Returns:
        ``"tool_node"`` for ``"tool_call"``, ``"llm_node"`` for
        ``"llm_response"``, and ``END`` for any other value.
    """
    response_type = state["response_type"]
    known_routes = (
        ("tool_call", "tool_node"),
        ("llm_response", "llm_node"),
    )
    for kind, target in known_routes:
        if response_type == kind:
            return target
    return END
def loop_condition(state):
    """Loop control: keep going while under three attempts and not successful.

    ``state["success"]`` is only consulted when ``state["attempt_count"]``
    is below 3.
    """
    attempts_remaining = state["attempt_count"] < 3
    if not attempts_remaining:
        return attempts_remaining
    return not state["success"]
# Add the conditional edge: the value returned by conditional_edge is looked
# up in the mapping to choose the next node (every key maps to itself).
graph.add_conditional_edges(
    "decision_node",
    conditional_edge,
    {target: target for target in ("tool_node", "llm_node", END)},
)
# Add the loop edge: after a retry, control goes back to the decision node.
graph.add_edge("retry_node", "decision_node")
条件边和循环边的实现
from langgraph.checkpoint.memory import MemoryCheckpointSaver
from langgraph.checkpoint.sqlite import SqliteCheckpointSaver
def create_checkpoint(
    checkpoint_type="memory",
    sqlite_path="./langgraph_checkpoints.db",
    postgres_dsn="postgresql://user:pass@localhost/db",
):
    """Create a checkpoint saver for the requested backend.

    Args:
        checkpoint_type: One of ``"memory"``, ``"sqlite"`` or ``"postgres"``.
        sqlite_path: Database file used by the SQLite backend.
        postgres_dsn: Connection string used by the Postgres backend. The
            default is a placeholder; pass real credentials from
            configuration rather than editing this function.

    Returns:
        The saver instance for the selected backend.

    Raises:
        ValueError: If ``checkpoint_type`` is not a supported backend.
    """
    # NOTE(review): recent LangGraph releases export these savers as
    # MemorySaver / SqliteSaver / PostgresSaver, with different constructor
    # arguments — confirm the names used here against the installed version.
    if checkpoint_type == "memory":
        return MemoryCheckpointSaver()
    if checkpoint_type == "sqlite":
        return SqliteCheckpointSaver(sqlite_path)
    if checkpoint_type == "postgres":
        # Imported lazily so the Postgres backend is only needed when chosen.
        from langgraph.checkpoint.postgres import PostgresCheckpointSaver

        return PostgresCheckpointSaver(connection_string=postgres_dsn)
    raise ValueError(f"Unsupported checkpoint type: {checkpoint_type!r}")
# Configure checkpointing.
checkpoint = create_checkpoint("sqlite")
# compile() receives the saver through its ``checkpointer`` keyword;
# ``checkpoint=`` is not a parameter it accepts.
app = graph.compile(checkpointer=checkpoint)
支持多种持久化后端
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
class ApprovalRequired(Exception):
    """Exception type for the human-approval flow."""
def human_intervention_node(state):
    """Node that asks a human on the console to approve the current state.

    Prints the state, reads one line from standard input and treats the
    answer "y" (any case) as approval.

    Returns:
        A copy of ``state`` with ``"approved"`` set; when the answer is not
        "y" it also carries ``"rejection_reason": "User denied"``.
    """
    # A UI or notification system could be integrated here.
    print(f"需要人工审批: {state}")
    # Simulated approval step; a real application might use a web page,
    # a Slack notification, or similar.
    answer = input("请批准此操作 (y/n): ")
    approved = answer.lower() == 'y'
    decision = {**state, "approved": approved}
    if not approved:
        decision["rejection_reason"] = "User denied"
    return decision
def should_require_approval(state):
    """Return the state's ``"sensitive_operation"`` value, or False if absent."""
    if "sensitive_operation" in state:
        return state["sensitive_operation"]
    return False
人工干预节点的实现模式
分层记忆系统的设计理念
from langgraph.memory import ConversationBufferMemory
from langgraph.graph import StateGraph
from typing import Dict, List
import datetime
class EnhancedMemory:
    """Memory manager with a short-term list and a tagged long-term store."""

    def __init__(self):
        self.working_memory = []  # short-term entries, in insertion order
        self.long_term_memory = {}  # key -> record dict
        self.memory_index = {}  # tag -> set of long-term keys carrying it

    def add_working_memory(self, message: str, metadata: Dict = None):
        """Append a short-term memory entry.

        Args:
            message: Content to remember.
            metadata: Optional extra data; an empty dict is stored if omitted.
        """
        entry = {
            "content": message,
            "timestamp": datetime.datetime.now(),
            "metadata": metadata or {},
        }
        self.working_memory.append(entry)

    def add_long_term_memory(self, key: str, value: object, tags: List[str] = None):
        """Store (or overwrite) a long-term memory record and index its tags.

        Args:
            key: Identifier of the record.
            value: Any value to keep.
            tags: Optional tags under which the key is indexed.
        """
        # One timestamp so created_at and updated_at are equal on creation.
        now = datetime.datetime.now()
        tag_list = list(tags) if tags else []
        self.long_term_memory[key] = {
            "value": value,
            "created_at": now,
            "updated_at": now,
            "tags": tag_list,
        }
        self._update_memory_index(key, tag_list)

    def _update_memory_index(self, key: str, tags: List[str] = None):
        """Make ``memory_index`` list ``key`` under exactly the given tags."""
        # Remove stale entries first so an overwritten key is not left under
        # tags it no longer carries.
        for tag in list(self.memory_index):
            members = self.memory_index[tag]
            members.discard(key)
            if not members:
                del self.memory_index[tag]
        for tag in tags or []:
            self.memory_index.setdefault(tag, set()).add(key)

    def retrieve_memory(self, query: str, memory_type: str = "all"):
        """Retrieve memories by case-insensitive keyword match.

        Args:
            query: Text to look for. An empty string matches every entry.
            memory_type: ``"working"``, ``"long_term"`` or ``"all"``; any
                other value selects nothing.

        Returns:
            A list of matches. Working-memory matches are the stored entry
            dicts; long-term matches are copies of the record with an added
            ``"key"`` field. A long-term record matches on its key, the
            string form of its value, or any of its tags.
        """
        needle = query.lower()
        matches = []
        if memory_type in ("all", "working"):
            matches.extend(
                entry
                for entry in self.working_memory
                if needle in str(entry["content"]).lower()
            )
        if memory_type in ("all", "long_term"):
            for key, record in self.long_term_memory.items():
                searchable = [key, record["value"], *record["tags"]]
                if any(needle in str(text).lower() for text in searchable):
                    matches.append({"key": key, **record})
        return matches
自定义记忆管理器的实现
from langgraph.prebuilt import ToolExecutor
from langchain_core.tools import tool
from pydantic import BaseModel, Field
# Tool input model: a user query plus context information, both required.
# NOTE(review): the docstring and Field descriptions are left exactly as
# written because pydantic presumably exposes them in the generated schema —
# confirm before translating them.
class ToolInput(BaseModel):
    """工具输入模型"""
    query: str = Field(description="用户查询")
    context: str = Field(description="上下文信息")
# Search-engine tool, registered under the name "search_engine".
# Takes a search query and an optional search context and returns the
# search results as a string.
# NOTE(review): the docstring is kept verbatim (not translated) because the
# @tool decorator presumably uses it as the tool description — confirm.
@tool("search_engine")
def search_web(query: str, context: str = "") -> str:
    """搜索引擎工具
    Args:
    query: 搜索查询
    context: 搜索上下文
    Returns:
    搜索结果
    """
    # Placeholder for the real search-engine call; ``context`` is not used yet.
    return f"Search results for: {query}"
# Data-analysis tool, registered under the name "data_analyzer".
# Takes the data to analyze and an analysis type (default "summary") and
# returns the analysis result as a string.
# NOTE(review): the docstring is kept verbatim (not translated) because the
# @tool decorator presumably uses it as the tool description — confirm.
@tool("data_analyzer")
def analyze_data(data: str, analysis_type: str = "summary") -> str:
    """数据分析工具
    Args:
    data: 要分析的数据
    analysis_type: 分析类型
    Returns:
    分析结果
    """
    # Placeholder for the real analysis logic; ``data`` is not used yet.
    return f"Analysis result: {analysis_type}"
# Tool list handed to the executor.
tools = [search_web, analyze_data]
# NOTE(review): confirm that ToolExecutor is still exported by
# langgraph.prebuilt in the installed LangGraph version.
tool_executor = ToolExecutor(tools)
使用LangChain工具规范
from langgraph.graph import StateGraph
from typing import AsyncIterator
import asyncio
class StreamingAgent:
    """Agent wrapper that streams a graph's output."""

    def __init__(self, graph: "StateGraph"):
        """Store the graph to stream from.

        Args:
            graph: Object exposing ``astream(state, stream_mode=...)``.
                NOTE(review): in LangGraph that method presumably lives on
                the compiled graph (the result of ``StateGraph.compile()``),
                not on the builder — confirm what callers pass in.
        """
        self.graph = graph

    async def stream_response(
        self,
        initial_state: Dict,
        stream_mode: str = "values",
    ) -> AsyncIterator[Dict]:
        """Yield every chunk produced by ``graph.astream`` unchanged.

        Args:
            initial_state: Input passed to the graph.
            stream_mode: Forwarded to ``astream``.
        """
        async for chunk in self.graph.astream(
            initial_state,
            stream_mode=stream_mode,
        ):
            yield chunk

    async def stream_with_progress(
        self,
        initial_state: Dict,
    ) -> AsyncIterator[Dict]:
        """Stream per-node progress records.

        Yields:
            ``{"step": n, "current_node": name, "data": output}`` for every
            node reported in a chunk. ``step`` counts chunks, so several
            nodes reported in the same chunk share one step number, and an
            empty chunk yields nothing.
        """
        step_count = 0
        # "updates" mode yields {node_name: node_output}. In the default
        # "values" mode each chunk is the full state, so its first key would
        # be a state field rather than a node name.
        async for chunk in self.stream_response(initial_state, stream_mode="updates"):
            step_count += 1
            for node_name, node_output in chunk.items():
                yield {
                    "step": step_count,
                    "current_node": node_name,
                    "data": node_output,
                }
流式输出的实现模式
import logging
from typing import Optional
from langgraph.graph import StateGraph, END
from langgraph.errors import GraphInterrupt
class ErrorHandler:
    """Error handler that turns node exceptions into state annotations."""

    def __init__(self):
        self.logger = logging.getLogger("langgraph_error_handler")

    def handle_node_error(self, error: Exception, state: Dict) -> Dict:
        """Log a node execution error and return an annotated copy of ``state``.

        Returns:
            ``state`` plus one flag set to True and the error text:
            ``interrupted``/``error_message`` for ``GraphInterrupt``,
            ``data_error``/``error_details`` for ``ValueError`` or
            ``TypeError``, and ``critical_error``/``error_message`` for
            anything else.
        """
        self.logger.error(f"Node execution error: {error}")
        if isinstance(error, GraphInterrupt):
            # Interrupt raised by the graph.
            flag, detail_key = "interrupted", "error_message"
        elif isinstance(error, (ValueError, TypeError)):
            # Data errors.
            flag, detail_key = "data_error", "error_details"
        else:
            # Everything else.
            flag, detail_key = "critical_error", "error_message"
        return {**state, flag: True, detail_key: str(error)}
class RetryManager:
    """Retry manager with exponential backoff for coroutine functions."""

    def __init__(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        base_delay: float = 1.0,
    ):
        """Configure the retry policy.

        Args:
            max_retries: Total number of attempts; must be at least 1.
            backoff_factor: Multiplier applied to the delay after each
                failed attempt.
            base_delay: Delay in seconds before the second attempt.

        Raises:
            ValueError: If ``max_retries`` is below 1. Previously such a
                value made ``execute_with_retry`` return None without ever
                calling the function.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.base_delay = base_delay

    async def execute_with_retry(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)``, retrying on any ``Exception``.

        The wait before retry ``k`` (0-based) is
        ``base_delay * backoff_factor ** k`` seconds.

        Returns:
            The result of the first successful call.

        Raises:
            Exception: Whatever the final attempt raised.
        """
        import asyncio

        final_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception:
                if attempt == final_attempt:
                    raise
                # asyncio.sleep suspends only this coroutine; time.sleep
                # would block the whole event loop while waiting.
                await asyncio.sleep(self.base_delay * self.backoff_factor ** attempt)
错误处理和重试机制的实现
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END
# LangChain components: chat model, prompt template and output parser.
llm = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template(
    "You are a helpful assistant. {input}"
)
output_parser = StrOutputParser()
# Create the LangChain chain by piping prompt -> model -> parser.
langchain_chain = prompt | llm | output_parser
def langchain_node(state):
    """Node that runs the LangChain chain on the latest message.

    Returns:
        ``{"response": <chain output>}`` for the ``content`` of the last
        entry in ``state["messages"]``.
    """
    chain_input = {"input": state["messages"][-1].content}
    return {"response": langchain_chain.invoke(chain_input)}
# Integrate the chain into LangGraph.
from typing import TypedDict


class ChainState(TypedDict, total=False):
    """State schema for the single-node graph below."""

    messages: list  # langchain_node reads the last entry's ``content``
    response: str  # written by langchain_node


# StateGraph cannot be constructed without a state schema.
graph = StateGraph(ChainState)
graph.add_node("langchain", langchain_node)
graph.set_entry_point("langchain")
# Give the only node an explicit edge to END so the run finishes there.
graph.add_edge("langchain", END)
# Compile the application.
app = graph.compile()
LangChain组件的集成方式
支持模块化的子图设计
from langgraph.graph import StateGraph, END
from typing import Dict
class SubgraphA:
    """Subgraph A: LLM processing."""

    def __init__(self, state_schema=dict):
        """Build the one-node graph (not yet compiled).

        Args:
            state_schema: Schema passed to ``StateGraph``, which cannot be
                constructed without one. NOTE(review): the ``dict`` default
                is meant to treat the state as a single untyped mapping;
                confirm against the installed LangGraph version, or pass a
                ``TypedDict`` describing the real state.
        """
        self.graph = StateGraph(state_schema)
        self._setup_graph()

    def _setup_graph(self):
        """Wire the single ``"llm"`` node: entry point -> llm -> END."""
        self.graph.add_node("llm", self.llm_node)
        self.graph.set_entry_point("llm")
        self.graph.add_edge("llm", END)

    def llm_node(self, state: Dict) -> Dict:
        """Placeholder for the LLM processing logic; ignores ``state``."""
        return {"llm_response": "LLM processed"}

    def compile(self):
        """Return the compiled graph."""
        return self.graph.compile()
class SubgraphB:
    """Subgraph B: tool invocation."""

    def __init__(self, state_schema=dict):
        """Build the one-node graph (not yet compiled).

        Args:
            state_schema: Schema passed to ``StateGraph``, which cannot be
                constructed without one. NOTE(review): the ``dict`` default
                is meant to treat the state as a single untyped mapping;
                confirm against the installed LangGraph version, or pass a
                ``TypedDict`` describing the real state.
        """
        self.graph = StateGraph(state_schema)
        self._setup_graph()

    def _setup_graph(self):
        """Wire the single ``"tool"`` node: entry point -> tool -> END."""
        self.graph.add_node("tool", self.tool_node)
        self.graph.set_entry_point("tool")
        self.graph.add_edge("tool", END)

    def tool_node(self, state: Dict) -> Dict:
        """Placeholder for the tool invocation logic; ignores ``state``."""
        return {"tool_result": "Tool executed"}

    def compile(self):
        """Return the compiled graph."""
        return self.graph.compile()
# Use the subgraphs: build and compile one instance of each.
subgraph_a = SubgraphA().compile()
subgraph_b = SubgraphB().compile()
def main_graph_node(state: Dict) -> Dict:
    """Run subgraph A on ``state``, then feed its result through subgraph B.

    Returns:
        Whatever subgraph B's ``invoke`` returns.
    """
    after_llm = subgraph_a.invoke(state)
    return subgraph_b.invoke(after_llm)
子图的创建和集成模式
from langgraph.graph import StateGraph, END
import asyncio
from typing import List, Dict
class ParallelProcessor:
    """Runs a graph over many input states concurrently."""

    def __init__(self, graph: "StateGraph"):
        """Store the graph to run.

        Args:
            graph: Object exposing an awaitable ``ainvoke(state)``.
                NOTE(review): in LangGraph that is presumably the compiled
                graph rather than the builder — confirm what callers pass.
        """
        self.graph = graph

    async def parallel_execute(self, states: List[Dict]) -> List[Dict]:
        """Run the graph on every state concurrently.

        Returns:
            The results in the same order as ``states``.
        """
        # The async entry point is ``ainvoke``; ``aainvoke`` does not exist.
        pending = [self.graph.ainvoke(state) for state in states]
        return await asyncio.gather(*pending)

    def batch_process(self, states: List[Dict], batch_size: int = 5) -> List[Dict]:
        """Process ``states`` in consecutive batches of ``batch_size``.

        Each batch runs concurrently; batches run one after another. This
        method starts its own event loop with ``asyncio.run``, so it must be
        called from synchronous code.

        Raises:
            ValueError: If ``batch_size`` is below 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        results = []
        for start in range(0, len(states), batch_size):
            batch = states[start:start + batch_size]
            results.extend(asyncio.run(self.parallel_execute(batch)))
        return results
class CacheManager:
    """In-memory cache whose entries expire after ``cache_ttl`` seconds."""

    def __init__(self):
        self.cache = {}
        self.cache_ttl = 3600  # entries live for one hour

    def get_cached_result(self, key: str) -> Optional[Dict]:
        """Return the cached result for ``key``, or None if missing or expired.

        An expired entry is removed from the cache as a side effect.
        """
        import time

        try:
            cached_value, stored_at = self.cache[key]
        except KeyError:
            return None
        age = time.time() - stored_at
        if age < self.cache_ttl:
            return cached_value
        del self.cache[key]
        return None

    def cache_result(self, key: str, result: Dict):
        """Store ``result`` under ``key`` together with the current time."""
        import time

        stored_at = time.time()
        self.cache[key] = (result, stored_at)
性能优化的实现
from langgraph.graph import StateGraph
from typing import Dict, Any
import logging
from datetime import datetime
class MonitoringAgent:
    """Collects execution counts and timings and logs each run."""

    def __init__(self):
        self.logger = logging.getLogger("langgraph_monitor")
        self.metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "average_execution_time": 0.0,
        }
        self.execution_times = []  # duration in seconds of every recorded run

    def log_execution_start(self, state: Dict):
        """Log the start of an execution.

        Returns:
            The start time, to be passed to ``log_execution_end``.
        """
        # Take the time once so the logged and returned values are identical.
        started_at = datetime.now()
        self.logger.info(f"Execution started: {started_at}")
        self.logger.info(f"Initial state: {state}")
        return started_at

    def log_execution_end(self, start_time: datetime, result: Dict, error: Exception = None):
        """Record the end of an execution and update the metrics.

        Args:
            start_time: Value returned by ``log_execution_start``.
            result: Execution result (not inspected).
            error: The exception if the run failed, otherwise None.
        """
        execution_time = (datetime.now() - start_time).total_seconds()
        self.execution_times.append(execution_time)
        self.metrics["total_executions"] += 1
        self.metrics["average_execution_time"] = (
            sum(self.execution_times) / len(self.execution_times)
        )
        # Compare with None rather than truthiness: an exception type that
        # defines __len__ or __bool__ can be falsy and would otherwise be
        # counted as a success.
        if error is not None:
            self.metrics["failed_executions"] += 1
            self.logger.error(f"Execution failed: {error}")
        else:
            self.metrics["successful_executions"] += 1
            self.logger.info(f"Execution completed: {execution_time:.2f}s")

    def get_metrics(self) -> Dict[str, Any]:
        """Return a shallow copy of the metrics."""
        return self.metrics.copy()
# Configure LangSmith.
import os

# setdefault keeps values already supplied by the environment; assigning
# unconditionally would replace a real API key with this placeholder.
os.environ.setdefault("LANGCHAIN_API_KEY", "your-api-key")
os.environ.setdefault("LANGCHAIN_PROJECT", "langgraph-monitoring")
监控和日志配置
# Dockerfile for LangGraph deployment
FROM python:3.11-slim
# Set the working directory.
WORKDIR /app
# Install dependencies (requirements.txt is copied on its own, before the
# rest of the code).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code.
COPY . .
# Expose the port.
EXPOSE 8000
# Start command.
CMD ["python", "app.py"]
容器化部署基础配置
import pytest
from langgraph.graph import StateGraph, END
from unittest.mock import Mock, patch
class TestLangGraphAgent:
    """Tests for a LangGraph agent."""

    class _State(TypedDict, total=False):
        """State schema shared by the graphs built in these tests."""

        messages: list  # supplied as input by the tests
        response: str  # written by the test node

    @pytest.fixture
    def simple_graph(self):
        """Create a compiled one-node graph whose node returns a fixed response."""
        # StateGraph cannot be constructed without a state schema.
        graph = StateGraph(self._State)

        def test_node(state):
            return {"response": "test_result"}

        graph.add_node("test", test_node)
        graph.set_entry_point("test")
        graph.add_edge("test", END)
        return graph.compile()

    def test_node_execution(self, simple_graph):
        """Node output should appear in the result of ``invoke``."""
        initial_state = {"messages": ["Hello"]}
        result = simple_graph.invoke(initial_state)
        assert "response" in result
        assert result["response"] == "test_result"

    def test_error_handling(self, simple_graph):
        """An exception raised inside a node should propagate out of ``invoke``."""

        def error_node(state):
            raise ValueError("Test error")

        error_graph = StateGraph(self._State)
        error_graph.add_node("error", error_node)
        error_graph.set_entry_point("error")
        error_graph.add_edge("error", END)
        # Verify that the node's error reaches the caller.
        with pytest.raises(ValueError):
            error_graph.compile().invoke({"messages": ["test"]})
测试框架的使用
class CustomerServiceAgent:
    """Customer-service agent built as a small LangGraph workflow."""

    class _State(TypedDict, total=False):
        """State schema of the workflow."""

        messages: list  # assumed input key, as in the other examples — TODO confirm
        intent: str  # written by intent_node, read by should_escalate
        response: str  # written by the greeting/response/escalation nodes

    def __init__(self):
        self.graph = self._create_graph()

    def _create_graph(self):
        """Create and compile the customer-service graph.

        Flow: greeting -> intent_classification -> (response | escalation) -> END.
        """
        # StateGraph cannot be constructed without a state schema.
        graph = StateGraph(self._State)
        # Add nodes.
        graph.add_node("greeting", self.greeting_node)
        graph.add_node("intent_classification", self.intent_node)
        graph.add_node("response", self.response_node)
        graph.add_node("escalation", self.escalation_node)
        # Add edges.
        graph.set_entry_point("greeting")
        graph.add_edge("greeting", "intent_classification")
        # The keys of this mapping are the values should_escalate may return.
        graph.add_conditional_edges(
            "intent_classification",
            self.should_escalate,
            {
                "response": "response",
                "escalation": "escalation",
            },
        )
        graph.add_edge("response", END)
        graph.add_edge("escalation", END)
        return graph.compile()

    def greeting_node(self, state):
        """Greeting node: return the fixed welcome message."""
        return {"response": "您好!我是智能客服,很高兴为您服务。"}

    def intent_node(self, state):
        """Intent-recognition node (placeholder: always reports a general inquiry)."""
        intent = "general_inquiry"  # example intent
        return {"intent": intent}

    def response_node(self, state):
        """Answer node for requests that do not need escalation.

        Placeholder added because the graph registers this node; replace
        the fixed reply with real answer generation.
        """
        return {"response": "已收到您的问题,正在为您处理。"}

    def escalation_node(self, state):
        """Escalation node for complex issues.

        Placeholder added because the graph registers this node; replace
        the fixed reply with a real hand-off to a human agent.
        """
        return {"response": "已为您转接人工客服,请稍候。"}

    def should_escalate(self, state):
        """Return ``"escalation"`` for a complex issue, else ``"response"``.

        The value must match a key of the routing mapping in
        ``_create_graph``; the former ``"escalate"`` was not one of them.
        """
        return "escalation" if state["intent"] == "complex_issue" else "response"
客服代理的实现架构
感谢阅读!
访问 https://atcfu.com/ai-articles/langgraph-agents/ 回顾本文