源码级别解析 · 源码解析 · LangChain 团队开发
2026-06-02 | 每日技术深度解读
由 LangChain 团队开发的弹性智能体构建框架
LangGraph 提供了智能体构建的根本性改进
这些特性使 LangGraph 适合生产环境部署
LangGraph 采用分层架构,分离构建和执行阶段
StateGraph 是 LangGraph 的主要入口点
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Annotated, List, TypedDict
# 定义状态模式
class AgentState(TypedDict):
messages: Annotated[List[dict], operator.add]
user_input: str
response: str
step_count: int
# 创建图构建器
graph_builder = StateGraph(AgentState)
# 添加节点
graph_builder.add_node("process_input", process_input)
graph_builder.add_node("generate_response", generate_response)
graph_builder.add_node("validate_output", validate_output)
# 定义边
graph_builder.add_edge(START, "process_input")
graph_builder.add_edge("process_input", "generate_response")
graph_builder.add_edge("generate_response", "validate_output")
graph_builder.add_edge("validate_output", END)
StateGraph 使用类型化的状态定义和声明式的图构建
状态是 LangGraph 的核心概念,管理所有计算节点的数据流
from typing import Annotated, List
from langgraph.graph import operator
# 消息追加的 Reducer
@operator.add
def append_messages(left: List[dict], right: List[dict]) -> List[dict]:
"""追加消息而不是覆盖"""
return left + right
# 计数器 Reducer
@operator.add
def increment_count(left: int, right: int) -> int:
"""递增计数器"""
return left + right
# 应用状态定义
class ChatState(TypedDict):
messages: Annotated[List[dict], append_messages]
step_count: Annotated[int, increment_count]
Reducer 函数决定了多个写操作如何合并
节点是图中的计算单元,执行具体的业务逻辑
from typing import Dict, Any
def process_input(state: AgentState) -> Dict[str, Any]:
"""处理用户输入的节点"""
user_input = state["messages"][-1]["content"]
# 文本预处理
processed_text = preprocess_text(user_input)
return {
"user_input": processed_text,
"step_count": state.get("step_count", 0) + 1
}
async def generate_response(state: AgentState) -> Dict[str, Any]:
"""生成响应的异步节点"""
# 构建提示
prompt = build_prompt(state)
# 调用 LLM
response = await llm.generate(prompt)
return {
"response": response.content,
"step_count": state.get("step_count", 0) + 1
}
def validate_output(state: AgentState) -> Dict[str, Any]:
"""验证输出的节点"""
response = state["response"]
if is_valid_response(response):
return {"response": response}
else:
return {"response": "处理失败,请重试"}
节点函数可以是同步或异步的,返回部分状态更新
边定义了节点之间的控制流和数据流
from langgraph.graph import Edge
# 普通边 - 直接跳转
graph_builder.add_edge("node1", "node2")
# 条件边 - 基于条件选择下一个节点
def should_continue(state: AgentState) -> str:
"""根据状态决定下一个节点"""
if state.get("is_complete", False):
return "end"
else:
return "continue"
graph_builder.add_conditional_edges(
"validate",
should_continue,
{
"end": END,
"continue": "retry"
}
)
# 循环边 - 实现迭代
graph_builder.add_edge("retry", "process_input")
条件边实现了复杂的控制逻辑,支持分支和循环
编译过程将声明式的图定义转换为可执行的运行时图
# 编译图
compiled_graph = graph_builder.compile(
# 检查点配置
checkpointer=MemorySaver(),
# 输出模式
interrupt_before=["validation"],
# 输出通道
input_keys=["messages"],
output_keys=["response", "step_count"]
)
# 编译后的图支持多种调用方式
# 同步调用
result = compiled_graph.invoke({"messages": []})
# 异步调用
async_result = await compiled_graph.ainvoke({"messages": []})
# 流式调用
for chunk in compiled_graph.stream({"messages": []}):
print(chunk)
编译过程配置了持久化、中断和流式输出等特性
Pregel 是 LangGraph 的执行引擎,实现了 Google Pregel 论文的思想
Pregel 通过超步循环实现确定性并发执行
每个超步遵循 Plan-Execute-Update 循环模式
class PregelEngine:
def __init__(self, compiled_graph: CompiledGraph):
self.graph = compiled_graph
self.checkpointer = compiled_graph.checkpointer
self.step = 0
async def execute_step(self, state: dict) -> dict:
"""执行单个超步"""
# Plan 阶段:计算执行计划
execution_plan = self.plan_execution(state)
# Execute 阶段:并发执行节点
node_results = await self.execute_nodes(execution_plan, state)
# Update 阶段:合并状态更新
new_state = self.merge_updates(state, node_results)
# Checkpoint 阶段:保存状态
await self.checkpointer.save_checkpoint(self.step, new_state)
self.step += 1
return new_state
Pregel 引擎通过超步循环实现确定性的并发执行
通道管理系统内节点间的数据流和状态传递
from langgraph.channels import BaseChannel
# LastValue 通道 - 只保留最新值
class LastValueChannel(BaseChannel):
def __init__(self, value):
self.value = value
def update(self, new_value):
self.value = new_value
def get(self):
return self.value
# BinaryOperatorAggregate 通道 - 累积值
class BinaryOperatorChannel(BaseChannel):
def __init__(self, initial, operator):
self.value = initial
self.operator = operator
def update(self, new_value):
self.value = self.operator(self.value, new_value)
def get(self):
return self.value
# Topic 通道 - 发布订阅模式
class TopicChannel(BaseChannel):
def __init__(self):
self.subscribers = []
def subscribe(self, callback):
self.subscribers.append(callback)
def publish(self, value):
for callback in self.subscribers:
callback(value)
不同类型的通道支持不同的状态更新语义
检查点系统提供持久化的状态管理,支持故障恢复
from langgraph.checkpoint import BaseCheckpointSaver
import json
from datetime import datetime
class JSONCheckpointSaver(BaseCheckpointSaver):
def __init__(self, filepath: str):
self.filepath = filepath
async def save_checkpoint(self, config, checkpoint):
"""保存检查点"""
checkpoint_data = {
"timestamp": datetime.now().isoformat(),
"config": config,
"checkpoint": checkpoint,
"version": "1.0"
}
with open(self.filepath, 'w') as f:
json.dump(checkpoint_data, f, indent=2)
async def load_checkpoint(self, config):
"""加载检查点"""
try:
with open(self.filepath, 'r') as f:
data = json.load(f)
return data["checkpoint"]
except FileNotFoundError:
return None
async def list_checkpoints(self, config):
"""列出检查点"""
# 实现检查点列表逻辑
pass
检查点系统支持多种存储后端和恢复策略
Store 提供长期的键值存储,支持跨会话的数据持久化
from langgraph.store import BaseStore
import sqlite3
from typing import Any, Optional
class SQLiteStore(BaseStore):
def __init__(self, db_path: str):
self.db_path = db_path
self._init_db()
def _init_db(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS store (
key TEXT PRIMARY KEY,
value TEXT,
timestamp REAL
)
''')
conn.commit()
conn.close()
async def get(self, key: str) -> Any:
"""获取存储值"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT value FROM store WHERE key = ?", (key,))
result = cursor.fetchone()
conn.close()
return json.loads(result[0]) if result else None
async def set(self, key: str, value: Any) -> None:
"""设置存储值"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO store VALUES (?, ?, ?)",
(key, json.dumps(value), time.time())
)
conn.commit()
conn.close()
Store 接口支持多种存储后端和缓存策略
缓存系统优化重复计算,提高执行效率
from langgraph.cache import BaseCache
from functools import wraps
import hashlib
cclass MemoryCache(BaseCache):
def __init__(self, max_size: int = 1000):
self.cache = {}
self.max_size = max_size
def _make_key(self, func, args, kwargs):
"""生成缓存键"""
key_data = {
"func": func.__name__,
"args": args,
"kwargs": kwargs
}
return hashlib.md5(str(key_data).encode()).hexdigest()
def get(self, key: str) -> Any:
"""获取缓存值"""
return self.cache.get(key)
def set(self, key: str, value: Any) -> None:
"""设置缓存值"""
if len(self.cache) >= self.max_size:
# 简单的 LRU 淘汰策略
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = value
def clear(self) -> None:
"""清空缓存"""
self.cache.clear()
# 装饰器使用缓存
@cached
def expensive_computation(data):
"""昂贵的计算函数"""
time.sleep(1) # 模拟计算
return result
缓存系统支持多种缓存策略和内存管理
流式输出提供实时反馈,支持长时间运行的任务
async def streaming_example():
# 流式调用
async for chunk in compiled_graph.astream(
{"messages": [{"role": "user", "content": "Hello"}]}
):
print(f"Chunk: {chunk}")
# 处理流式数据
for node, node_output in chunk.items():
if node == "llm_node":
# 增量处理 LLM 输出
process_incremental_output(node_output)
# 带中断的流式处理
async def streaming_with_interrupt():
interrupt_graph = compiled_graph.interrupt_before(["validation"])
async for chunk in interrupt_graph.astream(
{"messages": [{"role": "user", "content": "Hello"}]}
):
print(f"Chunk: {chunk}")
# 可以在这里人工干预
if "validation" in chunk:
# 人工检查和修改
await human_intervention(chunk["validation"])
流式输出支持多种流式模式和人工干预
中断机制支持人工检查和修改,实现人机协作
from langgraph.graph import Interrupt
# 配置中断点
interrupt_graph = compiled_graph.interrupt_before(["human_review"])
# 执行到中断点
config = {"thread_id": "conversation_123"}
result = interrupt_graph.invoke(
{"messages": [{"role": "user", "content": "请审核这个结果"}]},
config=config
)
# 人工审核阶段
print(f"需要人工审核的节点: {result}")
# 人工审核后的恢复
if is_approved_by_human(result):
# 恢复执行
final_result = interrupt_graph.invoke(
result,
config=config
)
else:
# 修改后恢复
modified_result = human_modify_result(result)
final_result = interrupt_graph.invoke(
modified_result,
config=config
)
中断机制在关键节点暂停执行,允许人工干预
LangGraph 支持复杂的多智能体协作模式
多智能体架构通过共享状态和协调机制实现复杂协作
# 定义智能体状态
class MultiAgentState(TypedDict):
task: str
subtasks: List[str]
results: Dict[str, Any]
current_agent: str
master_messages: List[dict]
subagent_messages: List[dict]
# 主智能体节点
def master_agent(state: MultiAgentState):
"""主智能体:任务分解和协调"""
task = state["task"]
# 任务分解
subtasks = decompose_task(task)
return {
"subtasks": subtasks,
"current_agent": "subagent_1",
"results": {}
}
# 子智能体节点
def subagent_1(state: MultiAgentState):
"""子智能体1:执行专门任务"""
task = state["subtasks"][0]
# 执行任务
result = execute_specialized_task(task)
return {
"results": {"subtask_1": result},
"current_agent": "master_agent"
}
# 结果聚合节点
def aggregate_results(state: MultiAgentState):
"""聚合所有子智能体的结果"""
results = state["results"]
# 综合结果
final_result = synthesize_results(results)
return {
"final_result": final_result,
"task_completed": True
}
多智能体通过状态共享和任务协调实现复杂协作
LangGraph 提供完整的错误处理和恢复机制
from langgraph.graph import GraphError
# 错误处理节点
def error_handler(state: AgentState, error: Exception):
"""错误处理节点"""
error_type = type(error).__name__
if error_type == "LLMError":
# LLM 错误处理
return handle_llm_error(state, error)
elif error_type == "ValidationError":
# 验证错误处理
return handle_validation_error(state, error)
else:
# 通用错误处理
return handle_general_error(state, error)
# 自动重试机制
def retry_node(node_func, max_retries=3):
"""重试装饰器"""
@wraps(node_func)
async def wrapper(state, *args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await node_func(state, *args, **kwargs)
except Exception as error:
last_error = error
if attempt < max_retries - 1:
# 指数退避等待
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
continue
# 重试失败后调用错误处理
return await error_handler(state, last_error)
return wrapper
错误处理支持多种错误类型和恢复策略
状态一致性保证是 LangGraph 的核心特性之一
class StateConsistencyManager:
def __init__(self):
self.locks = {}
self.version_counter = 0
async def update_state(self, state_id: str, update_func):
"""原子性状态更新"""
# 获取锁
lock = self.locks.get(state_id)
if lock is None:
lock = asyncio.Lock()
self.locks[state_id] = lock
async with lock:
# 创建状态快照
snapshot = copy.deepcopy(state)
try:
# 应用更新
result = await update_func(snapshot)
# 更新版本号
self.version_counter += 1
result["_version"] = self.version_counter
return result
except Exception as error:
# 回滚到快照
return snapshot
def get_state_version(self, state_id: str, version: int = None):
"""获取特定版本的状态"""
# 实现版本状态检索
pass
一致性管理器提供原子性操作和版本控制
多种优化技术确保 LangGraph 的高性能执行
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelExecutor:
def __init__(self, max_workers: int = 4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def execute_parallel(self, nodes, state):
"""并行执行多个节点"""
tasks = []
for node_name, node_func in nodes.items():
task = asyncio.create_task(
self._run_node(node_func, state)
)
tasks.append((node_name, task))
# 等待所有任务完成
results = {}
for node_name, task in tasks:
try:
result = await task
results[node_name] = result
except Exception as error:
results[node_name] = error
return results
async def _run_node(self, node_func, state):
"""在单独的线程中运行节点"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
lambda: node_func(state)
)
class IncrementalUpdater:
def __init__(self):
self.cache = {}
def apply_incremental_update(self, state: dict, update: dict):
"""应用增量更新"""
state_key = id(state)
if state_key not in self.cache:
self.cache[state_key] = copy.deepcopy(state)
base_state = self.cache[state_key]
# 应用增量更新
self._merge_updates(base_state, update)
return base_state
并行执行和增量更新显著提升性能
内存管理确保长时间运行的任务不会内存泄漏
import gc
import weakref
from typing import Dict, Any
class MemoryManager:
def __init__(self, max_memory_mb: int = 1024):
self.max_memory = max_memory_mb * 1024 * 1024
self.current_memory = 0
self.object_pools = {}
self.weak_refs = {}
def allocate_object(self, obj_type: str, *args, **kwargs):
"""分配对象到对象池"""
if obj_type not in self.object_pools:
self.object_pools[obj_type] = weakref.WeakValueDictionary()
# 检查内存限制
estimated_size = self._estimate_size(*args, **kwargs)
if self.current_memory + estimated_size > self.max_memory:
self._cleanup_memory()
# 创建对象
obj = obj_type(*args, **kwargs)
obj_id = id(obj)
# 添加到对象池
self.object_pools[obj_type][obj_id] = obj
self.weak_refs[obj_id] = weakref.ref(obj)
self.current_memory += estimated_size
return obj
def _cleanup_memory(self):
"""清理内存"""
# 触发垃圾回收
gc.collect()
# 清理弱引用
dead_refs = []
for obj_id, ref in self.weak_refs.items():
if ref() is None:
dead_refs.append(obj_id)
for obj_id in dead_refs:
del self.weak_refs[obj_id]
def get_memory_usage(self) -> Dict[str, Any]:
"""获取内存使用情况"""
return {
"current_memory": self.current_memory,
"max_memory": self.max_memory,
"usage_percent": (self.current_memory / self.max_memory) * 100,
"object_count": len(self.weak_refs)
}
内存管理提供资源控制和自动清理机制
配置管理系统支持灵活的系统配置
import os
import yaml
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class LangGraphConfig(BaseModel):
"""LangGraph 配置模型"""
# 执行配置
max_workers: int = Field(default=4, description="最大工作线程数")
timeout: int = Field(default=300, description="超时时间(秒)")
retry_attempts: int = Field(default=3, description="重试次数")
# 存储配置
checkpoint_path: str = Field(default="./checkpoints", description="检查点路径")
store_path: str = Field(default="./store", description="存储路径")
cache_size: int = Field(default=1000, description="缓存大小")
# 网络配置
api_timeout: int = Field(default=30, description="API 超时时间")
max_retries: int = Field(default=3, description="API 最大重试次数")
# 日志配置
log_level: str = Field(default="INFO", description="日志级别")
log_file: Optional[str] = Field(default=None, description="日志文件路径")
class ConfigManager:
def __init__(self, config_path: str = "config.yaml"):
self.config_path = config_path
self.config = self._load_config()
def _load_config(self) -> LangGraphConfig:
"""加载配置"""
# 优先从环境变量加载
env_config = self._load_from_env()
# 从配置文件加载
file_config = self._load_from_file()
# 合并配置(环境变量优先)
merged_config = {**file_config, **env_config}
return LangGraphConfig(**merged_config)
def _load_from_env(self) -> Dict[str, Any]:
"""从环境变量加载配置"""
env_mapping = {
"LANGGRAPH_MAX_WORKERS": "max_workers",
"LANGGRAPH_TIMEOUT": "timeout",
"LANGGRAPH_CHECKPOINT_PATH": "checkpoint_path",
"LANGGRAPH_STORE_PATH": "store_path"
}
config = {}
for env_var, config_key in env_mapping.items():
if env_var in os.environ:
config[config_key] = int(os.environ[env_var])
return config
def _load_from_file(self) -> Dict[str, Any]:
"""从配置文件加载"""
try:
with open(self.config_path, 'r') as f:
return yaml.safe_load(f) or {}
except FileNotFoundError:
return {}
配置管理支持多种配置源和动态更新
完整的监控和调试支持系统运行状态
import time
import logging
from typing import Dict, Any, List
from dataclasses import dataclass
from collections import deque
@dataclass
class ExecutionMetrics:
"""执行指标"""
execution_time: float
memory_usage: float
cpu_usage: float
node_count: int
error_count: int
timestamp: float
class MonitoringSystem:
def __init__(self):
self.logger = logging.getLogger("langgraph")
self.metrics_history = deque(maxlen=1000)
self.current_execution = None
def start_execution(self, graph_name: str, config: Dict[str, Any]):
"""开始执行监控"""
self.current_execution = {
"graph_name": graph_name,
"config": config,
"start_time": time.time(),
"metrics": [],
"errors": []
}
self.logger.info(f"开始执行图: {graph_name}")
def record_metrics(self, node_name: str, metrics: ExecutionMetrics):
"""记录指标"""
if self.current_execution:
self.current_execution["metrics"].append({
"node": node_name,
"metrics": metrics,
"timestamp": time.time()
})
# 更新历史记录
self.metrics_history.append({
"graph_name": self.current_execution["graph_name"],
"node": node_name,
"metrics": metrics
})
def record_error(self, node_name: str, error: Exception):
"""记录错误"""
if self.current_execution:
self.current_execution["errors"].append({
"node": node_name,
"error": str(error),
"error_type": type(error).__name__,
"timestamp": time.time()
})
self.logger.error(f"节点 {node_name} 错误: {error}")
def get_execution_summary(self) -> Dict[str, Any]:
"""获取执行摘要"""
if not self.current_execution:
return {}
execution_time = time.time() - self.current_execution["start_time"]
return {
"graph_name": self.current_execution["graph_name"],
"execution_time": execution_time,
"total_nodes": len(self.current_execution["metrics"]),
"error_count": len(self.current_execution["errors"]),
"metrics": self.current_execution["metrics"]
}
监控系统提供全面的执行追踪和性能指标
完善的测试策略确保系统稳定性
import pytest
import asyncio
from unittest.mock import Mock, patch
# 单元测试
class TestStateGraph:
def test_node_execution(self):
"""测试节点执行"""
def test_node(state):
return {"test_result": "success"}
state = {"messages": []}
result = test_node(state)
assert result["test_result"] == "success"
def test_state_update(self):
"""测试状态更新"""
state = {"counter": 0}
def increment_node(state):
return {"counter": state["counter"] + 1}
result = increment_node(state)
assert result["counter"] == 1
# 集成测试
class TestLangGraphIntegration:
@pytest.mark.asyncio
async def test_graph_execution(self):
"""测试完整图执行"""
# 构建测试图
graph_builder = StateGraph(TestState)
graph_builder.add_node("node1", node1)
graph_builder.add_node("node2", node2)
graph_builder.add_edge(START, "node1")
graph_builder.add_edge("node1", "node2")
graph_builder.add_edge("node2", END)
compiled_graph = graph_builder.compile()
# 执行图
result = await compiled_graph.ainvoke({"data": "test"})
assert "processed" in result
assert "transformed" in result
# 性能测试
class TestPerformance:
def test_concurrent_execution(self):
"""测试并发执行"""
import time
start_time = time.time()
# 模拟并发执行
results = []
for i in range(10):
result = test_node({"id": i})
results.append(result)
execution_time = time.time() - start_time
assert execution_time < 1.0 # 应该在1秒内完成
测试策略覆盖各种测试场景和性能指标
多种部署策略适应不同生产环境需求
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV LANGGRAPH_MAX_WORKERS=4
ENV LANGGRAPH_TIMEOUT=300
ENV LANGGRAPH_CHECKPOINT_PATH=/app/checkpoints
# 创建必要目录
RUN mkdir -p /app/checkpoints /app/store
# 暴露端口(如果需要 API)
EXPOSE 8000
# 启动命令
CMD ["python", "-m", "langgraph.server"]
Docker 容器化部署简化了环境配置
扩展性设计支持系统的水平扩展
from typing import Dict, Any
import redis
import asyncio
class DistributedGraphManager:
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis = redis.Redis(host=redis_host, port=redis_port)
self.worker_nodes = {}
async def distribute_graph(self, graph_config: Dict[str, Any]):
"""分布式分发图"""
# 选择工作节点
selected_nodes = self._select_worker_nodes(graph_config)
# 分发图配置
tasks = []
for node in selected_nodes:
task = asyncio.create_task(
self._deploy_to_node(node, graph_config)
)
tasks.append(task)
# 等待所有节点部署完成
await asyncio.gather(*tasks)
return selected_nodes
def _select_worker_nodes(self, graph_config: Dict[str, Any]) -> List[str]:
"""选择工作节点"""
# 基于图复杂度和节点负载选择
required_workers = self._calculate_required_workers(graph_config)
# 获取可用节点
available_nodes = self._get_available_nodes()
# 选择负载最低的节点
selected_nodes = sorted(
available_nodes,
key=lambda x: self._get_node_load(x)
)[:required_workers]
return selected_nodes
def _get_available_nodes(self) -> List[str]:
"""获取可用节点列表"""
# 从 Redis 获取注册的节点
nodes = self.redis.smembers("langgraph:workers")
return [node.decode() for node in nodes]
def _get_node_load(self, node_id: str) -> float:
"""获取节点负载"""
# 从 Redis 获取节点负载信息
load_data = self.redis.hget(f"langgraph:worker:{node_id}", "load")
return float(load_data) if load_data else 0.0
分布式扩展支持大规模部署和负载均衡
安全设计确保系统运行的安全性
from cryptography.fernet import Fernet
from typing import Dict, Any
import hashlib
class SecurityManager:
def __init__(self):
self.cipher = Fernet(Fernet.generate_key())
self.audit_logger = logging.getLogger("audit")
def encrypt_state(self, state: Dict[str, Any]) -> bytes:
"""加密状态数据"""
json_data = json.dumps(state).encode()
return self.cipher.encrypt(json_data)
def decrypt_state(self, encrypted_data: bytes) -> Dict[str, Any]:
"""解密状态数据"""
json_data = self.cipher.decrypt(encrypted_data)
return json.loads(json_data.decode())
def hash_sensitive_data(self, data: str) -> str:
"""哈希敏感数据"""
return hashlib.sha256(data.encode()).hexdigest()
def log_access(self, user_id: str, action: str, resource: str):
"""记录访问日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"action": action,
"resource": resource,
"ip_address": self._get_client_ip()
}
self.audit_logger.info(json.dumps(log_entry))
def _get_client_ip(self) -> str:
"""获取客户端 IP"""
# 实现获取客户端 IP 的逻辑
pass
安全管理系统提供完整的安全保障
LangGraph 在多个领域都有广泛应用
# 客服机器人状态
class CustomerServiceState(TypedDict):
conversation_id: str
messages: List[dict]
customer_info: Dict[str, Any]
issue_summary: str
resolution_steps: List[str]
current_step: int
resolved: bool
class CustomerServiceAgent:
def __init__(self):
self.graph = self._build_graph()
def _build_graph(self):
"""构建客服机器人图"""
graph_builder = StateGraph(CustomerServiceState)
# 添加节点
graph_builder.add_node("greet", self._greet_customer)
graph_builder.add_node("gather_info", self._gather_customer_info)
graph_builder.add_node("analyze_issue", self._analyze_issue)
graph_builder.add_node("provide_solution", self._provide_solution)
graph_builder.add_node("resolve_issue", self._resolve_issue)
# 添加边
graph_builder.add_edge(START, "greet")
graph_builder.add_edge("greet", "gather_info")
graph_builder.add_edge("gather_info", "analyze_issue")
graph_builder.add_edge("analyze_issue", "provide_solution")
graph_builder.add_edge("provide_solution", "resolve_issue")
graph_builder.add_edge("resolve_issue", END)
return graph_builder.compile(checkpointer=MemorySaver())
def _greet_customer(self, state: CustomerServiceState):
"""欢迎客户"""
return {
"messages": [{"role": "ai", "content": "您好!我是智能客服,很高兴为您服务。请告诉我您遇到了什么问题?"}]
}
def _gather_customer_info(self, state: CustomerServiceState):
"""收集客户信息"""
# 分析客户信息
customer_info = extract_customer_info(state["messages"])
return {
"customer_info": customer_info
}
def _analyze_issue(self, state: CustomerServiceState):
"""分析问题"""
issue = state["messages"][-1]["content"]
issue_summary = analyze_customer_issue(issue)
return {
"issue_summary": issue_summary,
"resolution_steps": generate_resolution_steps(issue_summary)
}
def _provide_solution(self, state: CustomerServiceState):
"""提供解决方案"""
solution = generate_solution(state["issue_summary"])
return {
"messages": [{"role": "ai", "content": solution}]
}
def _resolve_issue(self, state: CustomerServiceState):
"""解决问题"""
return {
"resolved": True,
"messages": [{"role": "ai", "content": "问题已解决,感谢您的耐心等待!"}]
}
客服机器人展示了 LangGraph 的实际应用能力
最佳实践指导开发者高效使用 LangGraph
常见问题帮助开发者快速定位和解决问题
LangGraph 持续演进,不断改进用户体验
感谢阅读!
访问 https://atcfu.com/ai-articles/langgraph-core/ 回顾本文