🕸️ LangGraph 智能体框架深度解析

构建有状态、持久化的AI代理系统

源码级别解析 · 源码架构与实战应用 · 2026
2026-04-26 | 每日技术深度解读

什么是LangGraph?

低级编排框架
  • 由LangChain开发的AI代理编排框架
  • 基于图结构的智能体工作流管理
  • 支持长时间运行、有状态的代理系统
  • 受Google Pregel和Apache Beam启发

Trusted by Klarna, Replit, Elastic等公司

核心特性概览

五大核心能力
  • 持久化执行 - 故障恢复和长时间运行
  • 人在回路 - 人工干预和状态修改
  • 综合记忆 - 工作记忆和长期持久记忆
  • LangSmith调试 - 可视化执行路径
  • 生产级部署 - 可扩展的stateful工作流

LangGraph vs 传统Agent

架构对比
  • 传统Agent:简单线性流程,无状态
  • LangGraph:图结构,有状态持久化
  • 传统Agent:缺乏错误恢复机制
  • LangGraph:自动恢复和检查点机制
  • 传统Agent:记忆能力有限
  • LangGraph:多层级记忆系统

LangGraph整体架构

┌─────────────────────────────────────────────┐ │ LangGraph 架构 │ ├─────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Agents │ │ LangChain │ │ │ │ Core │◄──►│ Integr │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Checkpoint │ │ LangSmith │ │ │ │ System │ │ Platform │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Memory │ │ Deep │ │ │ │ Management │ │ Agents │ │ │ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────┘

模块化设计,支持独立使用和集成

安装与快速开始

环境配置
  • pip install -U langgraph
  • 支持Python 3.8+
  • 与LangChain无缝集成
  • 独立的框架使用

第一个LangGraph应用

from langgraph.graph import Graph, END
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# 创建LLM
llm = ChatOpenAI(model="gpt-4")

# 定义图结构
def agent_node(state, llm):
    response = llm.invoke([HumanMessage(content=state["messages"][-1].content)])
    return {"messages": [response]}

def should_continue(state):
    if "final" in state["messages"][-1].content.lower():
        return END
    return "agent"

# 构建图
graph = Graph()
graph.add_node("agent", agent_node)
graph.add_edge("agent", should_continue)
graph.set_entry_point("agent")

# 编译图
app = graph.compile()

# 运行
result = app.invoke({"messages": [HumanMessage(content="Hello!")]})
print(result["messages"][-1].content)

基本的LangGraph应用结构

核心概念:状态管理

有状态智能体
  • 状态是LangGraph的核心概念
  • 状态包含消息、工具调用、中间结果
  • 状态在整个工作流中传递和更新
  • 支持状态的持久化和恢复

状态定义与更新

from typing import Dict, List, TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: List[str]
    tool_calls: List[Dict]
    step_count: int
    is_complete: bool

def update_state(state: AgentState, new_message: str) -> AgentState:
    return {
        **state,
        "messages": [...state["messages"], new_message],
        "step_count": state["step_count"] + 1
    }

# 初始化状态
initial_state = AgentState(
    messages=["Hello, world!"],
    tool_calls=[],
    step_count=0,
    is_complete=False
)

使用TypedDict定义状态结构

节点(Nodes)

工作流构建块
  • 节点是图中的基本执行单元
  • 每个节点接受状态,返回更新后的状态
  • 可以是LLM调用、工具执行、条件判断
  • 节点之间通过边连接

自定义节点实现

from langgraph.graph import StateGraph, END
from langchain_core.tools import tool

# 定义工具
@tool
def calculator(expression: str) -> float:
    """Simple calculator function"""
    return eval(expression)

# LLM节点
def llm_node(state):
    prompt = f"Current state: {state}\n\nPlease respond:"
    response = llm.invoke(prompt)
    return {"response": response.content}

# 工具节点
def tool_node(state):
    tool_result = calculator.invoke(state["expression"])
    return {"calculation_result": tool_result}

# 条件节点
def should_use_tool(state):
    return "calculate" in state["message"].lower()

不同类型节点的实现方式

边(Edges)

流程控制
  • 定义节点之间的执行流程
  • 条件边:基于条件决定下一步
  • 默认边:固定路径
  • 循环边:支持迭代和重试

边的类型与流程控制

┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Entry Node │───►│ Decision │◄───│ Loop Node │ │ │ │ Node │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ │ ┌─────────────────┐ │ └────────────►│ Tool Node │◄──────────────┘ │ │ └─────────────────┘ │ │ ▼ ▼ ┌─────────────────┐ │ END Node │ │ │ └─────────────────┘

支持复杂的执行流程控制

边与条件控制

from langgraph.graph import StateGraph, END

def conditional_edge(state):
    """决定下一步执行路径"""
    if state["response_type"] == "tool_call":
        return "tool_node"
    elif state["response_type"] == "llm_response":
        return "llm_node"
    else:
        return END

def loop_condition(state):
    """循环控制条件"""
    return state["attempt_count"] < 3 and not state["success"]

# 添加条件边
graph.add_conditional_edges(
    "decision_node",
    conditional_edge,
    {
        "tool_node": "tool_node",
        "llm_node": "llm_node",
        END: END
    }
)

# 添加循环边
graph.add_edge("retry_node", "decision_node")

条件边和循环边的实现

持久化执行机制

故障恢复与连续性
  • 自动保存状态到检查点
  • 故障后从检查点恢复
  • 长时间运行的任务支持
  • 分布式执行环境

检查点配置

from langgraph.checkpoint.memory import MemoryCheckpointSaver
from langgraph.checkpoint.sqlite import SqliteCheckpointSaver

def create_checkpoint(checkpoint_type="memory"):
    """创建检查点管理器"""
    if checkpoint_type == "memory":
        return MemoryCheckpointSaver()
    elif checkpoint_type == "sqlite":
        return SqliteCheckpointSaver("./langgraph_checkpoints.db")
    elif checkpoint_type == "postgres":
        from langgraph.checkpoint.postgres import PostgresCheckpointSaver
        return PostgresCheckpointSaver(
            connection_string="postgresql://user:pass@localhost/db"
        )
    else:
        raise ValueError("Unsupported checkpoint type")

# 配置检查点
checkpoint = create_checkpoint("sqlite")
app = graph.compile(checkpoint=checkpoint)

支持多种持久化后端

人在回路(Human-in-the-Loop)

人工干预机制
  • 执行过程中暂停和恢复
  • 人工审核和修改状态
  • 实时干预决策点
  • 质量控制与监督

人工干预实现

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor

class ApprovalRequired(Exception):
    """人工审批异常"""
    pass

def human_intervention_node(state):
    """需要人工干预的节点"""
    # 这里可以集成UI或通知系统
    print(f"需要人工审批: {state}")
    
    # 模拟人工审批过程
    # 实际应用中可能通过Web界面、Slack通知等方式
    user_approval = input("请批准此操作 (y/n): ")
    
    if user_approval.lower() == 'y':
        return {**state, "approved": True}
    else:
        return {**state, "approved": False, "rejection_reason": "User denied"}

def should_require_approval(state):
    """判断是否需要人工审批"""
    return state.get("sensitive_operation", False)

人工干预节点的实现模式

记忆系统架构

多层级记忆管理
  • 短期工作记忆:当前对话上下文
  • 长期持久记忆:跨会话保持
  • 记忆检索与更新策略
  • 记忆版本控制

记忆系统架构

┌─────────────────────────────────────────────────┐ │ 记忆系统架构 │ ├─────────────────────────────────────────────────┤ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ Working │ │ Long-term │ │ │ │ Memory │ │ Memory │ │ │ │ │ │ │ │ │ │ • 当前对话 │ │ • 用户偏好 │ │ │ │ • 临时状态 │ │ • 历史记录 │ │ │ │ • 中间结果 │ │ • 上下文信息 │ │ │ └─────────────────┘ └─────────────────┘ │ │ │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ Memory Manager │ │ │ │ • 记忆检索 │ │ │ │ • 记忆更新 │ │ │ │ • 版本控制 │ │ │ │ • 冲突解决 │ │ │ └─────────────────────────────────────────┘ │ └─────────────────────────────────────────────────┘

分层记忆系统的设计理念

记忆管理实现

from langgraph.memory import ConversationBufferMemory
from langgraph.graph import StateGraph
from typing import Dict, List
import datetime

class EnhancedMemory:
    """增强的记忆管理器"""
    
    def __init__(self):
        self.working_memory = []
        self.long_term_memory = {}
        self.memory_index = {}
    
    def add_working_memory(self, message: str, metadata: Dict = None):
        """添加短期记忆"""
        entry = {
            "content": message,
            "timestamp": datetime.datetime.now(),
            "metadata": metadata or {}
        }
        self.working_memory.append(entry)
    
    def add_long_term_memory(self, key: str, value: any, tags: List[str] = None):
        """添加长期记忆"""
        self.long_term_memory[key] = {
            "value": value,
            "created_at": datetime.datetime.now(),
            "updated_at": datetime.datetime.now(),
            "tags": tags or []
        }
        self._update_memory_index(key, tags)
    
    def retrieve_memory(self, query: str, memory_type: str = "all"):
        """记忆检索"""
        # 实现基于查询的记忆检索逻辑
        # 可以使用向量搜索、关键词匹配等
        pass

自定义记忆管理器的实现

工具集成与扩展

强大的工具生态系统
  • 内置常用工具集
  • 自定义工具开发
  • 工具链编排
  • 工具性能监控

自定义工具开发

from langgraph.prebuilt import ToolExecutor
from langchain_core.tools import tool
from pydantic import BaseModel, Field

class ToolInput(BaseModel):
    """工具输入模型"""
    query: str = Field(description="用户查询")
    context: str = Field(description="上下文信息")

@tool("search_engine")
def search_web(query: str, context: str = "") -> str:
    """搜索引擎工具
    
    Args:
        query: 搜索查询
        context: 搜索上下文
        
    Returns:
        搜索结果
    """
    # 实现搜索引擎调用
    return f"Search results for: {query}"

@tool("data_analyzer")
def analyze_data(data: str, analysis_type: str = "summary") -> str:
    """数据分析工具
    
    Args:
        data: 要分析的数据
        analysis_type: 分析类型
        
    Returns:
        分析结果
    """
    # 实现数据分析逻辑
    return f"Analysis result: {analysis_type}"

# 工具列表
tools = [search_web, analyze_data]
tool_executor = ToolExecutor(tools)

使用LangChain工具规范

流式输出与实时交互

用户体验优化
  • 支持流式token输出
  • 实时显示执行进度
  • 增量结果更新
  • 取消和重试机制

流式输出实现

from langgraph.graph import StateGraph
from typing import AsyncIterator
import asyncio

class StreamingAgent:
    """支持流式输出的智能体"""
    
    def __init__(self, graph: StateGraph):
        self.graph = graph
    
    async def stream_response(
        self, 
        initial_state: Dict,
        stream_mode: str = "values"
    ) -> AsyncIterator[Dict]:
        """流式输出响应"""
        async for chunk in self.graph.astream(
            initial_state, 
            stream_mode=stream_mode
        ):
            yield chunk
    
    async def stream_with_progress(
        self, 
        initial_state: Dict
    ) -> AsyncIterator[Dict]:
        """带进度条的流式输出"""
        step_count = 0
        
        async for chunk in self.stream_response(initial_state):
            step_count += 1
            progress = {
                "step": step_count,
                "current_node": list(chunk.keys())[0],
                "data": chunk[list(chunk.keys())[0]]
            }
            yield progress

流式输出的实现模式

错误处理与恢复

鲁棒性设计
  • 多层级错误捕获
  • 自动重试机制
  • 优雅降级策略
  • 错误日志记录

错误处理实现

import logging
from typing import Optional
from langgraph.graph import StateGraph, END
from langgraph.errors import GraphInterrupt

class ErrorHandler:
    """错误处理器"""
    
    def __init__(self):
        self.logger = logging.getLogger("langgraph_error_handler")
    
    def handle_node_error(self, error: Exception, state: Dict) -> Dict:
        """处理节点执行错误"""
        self.logger.error(f"Node execution error: {error}")
        
        if isinstance(error, GraphInterrupt):
            # 处理中断错误
            return {**state, "interrupted": True, "error_message": str(error)}
        elif isinstance(error, (ValueError, TypeError)):
            # 处理数据错误
            return {**state, "data_error": True, "error_details": str(error)}
        else:
            # 处理其他错误
            return {**state, "critical_error": True, "error_message": str(error)}

class RetryManager:
    """重试管理器"""
    
    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
    
    async def execute_with_retry(self, func, *args, **kwargs):
        """带重试的执行"""
        import time
        
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                
                wait_time = self.backoff_factor ** attempt
                time.sleep(wait_time)
                continue

错误处理和重试机制的实现

LangGraph与LangChain集成

生态系统协同
  • 无缝集成LangChain组件
  • 共享工具和记忆系统
  • 统一的API接口
  • 可插拔架构

集成示例

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END

# LangChain组件
llm = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template(
    "You are a helpful assistant. {input}"
)
output_parser = StrOutputParser()

# 创建LangChain链
langchain_chain = prompt | llm | output_parser

def langchain_node(state):
    """使用LangChain组件的节点"""
    result = langchain_chain.invoke({"input": state["messages"][-1].content})
    return {"response": result}

# 集成到LangGraph
graph = StateGraph()
graph.add_node("langchain", langchain_node)
graph.set_entry_point("langchain")

# 编译应用
app = graph.compile()

LangChain组件的集成方式

子图(Subgraphs)

模块化设计
  • 将复杂图分解为子图
  • 子图封装和复用
  • 层次化工作流
  • 并行执行优化

子图架构示例

┌─────────────────────────────────────────────────┐ │ 主图 (Main Graph) │ ├─────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Input │───►│ Process │ │ │ │ Node │ │ Node │ │ │ └─────────────┘ └──────┬──────┘ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Subgraph A│◄───►│ Subgraph B│ │ │ │ (LLM) │ │ (Tools) │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Output │◄───►│ Output │ │ │ │ Node │ │ Node │ │ │ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────┘

支持模块化的子图设计

子图创建与使用

from langgraph.graph import StateGraph, END
from typing import Dict

class SubgraphA:
    """子图A:LLM处理"""
    
    def __init__(self):
        self.graph = StateGraph()
        self._setup_graph()
    
    def _setup_graph(self):
        self.graph.add_node("llm", self.llm_node)
        self.graph.set_entry_point("llm")
        self.graph.add_edge("llm", END)
    
    def llm_node(self, state: Dict) -> Dict:
        # LLM处理逻辑
        return {"llm_response": "LLM processed"}
    
    def compile(self):
        return self.graph.compile()

class SubgraphB:
    """子图B:工具调用"""
    
    def __init__(self):
        self.graph = StateGraph()
        self._setup_graph()
    
    def _setup_graph(self):
        self.graph.add_node("tool", self.tool_node)
        self.graph.set_entry_point("tool")
        self.graph.add_edge("tool", END)
    
    def tool_node(self, state: Dict) -> Dict:
        # 工具调用逻辑
        return {"tool_result": "Tool executed"}
    
    def compile(self):
        return self.graph.compile()

# 使用子图
subgraph_a = SubgraphA().compile()
subgraph_b = SubgraphB().compile()

def main_graph_node(state: Dict) -> Dict:
    # 调用子图
    result_a = subgraph_a.invoke(state)
    result_b = subgraph_b.invoke(result_a)
    return result_b

子图的创建和集成模式

性能优化策略

效率提升方法
  • 并行节点执行
  • 缓存机制
  • 资源池管理
  • 批处理优化

并行执行优化

from langgraph.graph import StateGraph, END
import asyncio
from typing import List, Dict

class ParallelProcessor:
    """并行处理器"""
    
    def __init__(self, graph: StateGraph):
        self.graph = graph
    
    async def parallel_execute(self, states: List[Dict]) -> List[Dict]:
        """并行执行多个状态"""
        tasks = []
        
        for state in states:
            task = self.graph.aainvoke(state)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        return results
    
    def batch_process(self, states: List[Dict], batch_size: int = 5) -> List[Dict]:
        """批处理执行"""
        results = []
        
        for i in range(0, len(states), batch_size):
            batch = states[i:i + batch_size]
            batch_results = asyncio.run(self.parallel_execute(batch))
            results.extend(batch_results)
        
        return results

class CacheManager:
    """缓存管理器"""
    
    def __init__(self):
        self.cache = {}
        self.cache_ttl = 3600  # 1小时缓存
    
    def get_cached_result(self, key: str) -> Optional[Dict]:
        """获取缓存结果"""
        if key in self.cache:
            result, timestamp = self.cache[key]
            import time
            if time.time() - timestamp < self.cache_ttl:
                return result
            else:
                del self.cache[key]
        return None
    
    def cache_result(self, key: str, result: Dict):
        """缓存结果"""
        import time
        self.cache[key] = (result, time.time())

性能优化的实现

监控与日志

可观测性
  • LangSmith集成
  • 执行路径追踪
  • 性能指标收集
  • 错误监控

监控配置

from langgraph.graph import StateGraph
from typing import Dict, Any
import logging
from datetime import datetime

class MonitoringAgent:
    """监控代理"""
    
    def __init__(self):
        self.logger = logging.getLogger("langgraph_monitor")
        self.metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "average_execution_time": 0.0
        }
        self.execution_times = []
    
    def log_execution_start(self, state: Dict):
        """记录执行开始"""
        self.logger.info(f"Execution started: {datetime.now()}")
        self.logger.info(f"Initial state: {state}")
        return datetime.now()
    
    def log_execution_end(self, start_time: datetime, result: Dict, error: Exception = None):
        """记录执行结束"""
        execution_time = (datetime.now() - start_time).total_seconds()
        self.execution_times.append(execution_time)
        
        self.metrics["total_executions"] += 1
        self.metrics["average_execution_time"] = sum(self.execution_times) / len(self.execution_times)
        
        if error:
            self.metrics["failed_executions"] += 1
            self.logger.error(f"Execution failed: {error}")
        else:
            self.metrics["successful_executions"] += 1
            self.logger.info(f"Execution completed: {execution_time:.2f}s")
    
    def get_metrics(self) -> Dict[str, Any]:
        """获取监控指标"""
        return self.metrics.copy()

# 配置LangSmith
import os
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "langgraph-monitoring"

监控和日志配置

生产部署策略

企业级部署
  • 容器化部署
  • 水平扩展
  • 负载均衡
  • 高可用配置

Docker部署配置

# Dockerfile for LangGraph deployment
FROM python:3.11-slim

# 设置工作目录
WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "app.py"]

容器化部署基础配置

测试策略

质量保证
  • 单元测试
  • 集成测试
  • 端到端测试
  • 性能测试

测试示例

import pytest
from langgraph.graph import StateGraph, END
from unittest.mock import Mock, patch

class TestLangGraphAgent:
    """LangGraph代理测试类"""
    
    @pytest.fixture
    def simple_graph(self):
        """创建简单测试图"""
        graph = StateGraph()
        
        def test_node(state):
            return {"response": "test_result"}
        
        graph.add_node("test", test_node)
        graph.set_entry_point("test")
        graph.add_edge("test", END)
        
        return graph.compile()
    
    def test_node_execution(self, simple_graph):
        """测试节点执行"""
        initial_state = {"messages": ["Hello"]}
        result = simple_graph.invoke(initial_state)
        
        assert "response" in result
        assert result["response"] == "test_result"
    
    def test_error_handling(self, simple_graph):
        """测试错误处理"""
        def error_node(state):
            raise ValueError("Test error")
        
        error_graph = StateGraph()
        error_graph.add_node("error", error_node)
        error_graph.set_entry_point("error")
        error_graph.add_edge("error", END)
        
        # 验证错误被正确捕获
        with pytest.raises(ValueError):
            error_graph.compile().invoke({"messages": ["test"]})

测试框架的使用

实际应用场景

企业级用例
  • 客户服务自动化
  • 数据分析与报告
  • 代码审查与优化
  • 多模态内容生成

客户服务自动化

智能客服系统
  • 意图识别和分类
  • 多轮对话管理
  • 工单创建和跟踪
  • 知识库查询

客服代理实现

class CustomerServiceAgent:
    """客户服务智能体"""
    
    def __init__(self):
        self.graph = self._create_graph()
    
    def _create_graph(self):
        """创建客服代理图"""
        graph = StateGraph()
        
        # 添加节点
        graph.add_node("greeting", self.greeting_node)
        graph.add_node("intent_classification", self.intent_node)
        graph.add_node("response", self.response_node)
        graph.add_node("escalation", self.escalation_node)
        
        # 添加边
        graph.set_entry_point("greeting")
        graph.add_edge("greeting", "intent_classification")
        graph.add_conditional_edges(
            "intent_classification",
            self.should_escalate,
            {
                "response": "response",
                "escalation": "escalation"
            }
        )
        graph.add_edge("response", END)
        graph.add_edge("escalation", END)
        
        return graph.compile()
    
    def greeting_node(self, state):
        """问候节点"""
        return {"response": "您好!我是智能客服,很高兴为您服务。"}
    
    def intent_node(self, state):
        """意图识别节点"""
        # 实现意图识别逻辑
        intent = "general_inquiry"  # 示例意图
        return {"intent": intent}
    
    def should_escalate(self, state):
        """判断是否需要升级"""
        return "escalate" if state["intent"] == "complex_issue" else "response"

客服代理的实现架构

数据分析自动化

智能数据分析
  • 数据收集和清洗
  • 统计分析执行
  • 可视化图表生成
  • 报告自动编写

代码审查优化

智能代码助手
  • 代码质量检查
  • 性能优化建议
  • 安全漏洞检测
  • 最佳实践推荐

最佳实践指南

开发建议
  • 合理设计节点粒度
  • 有效利用状态管理
  • 完善的错误处理
  • 持续的性能监控

常见问题与解决方案

FAQ
  • Q1: 如何处理长时间运行的任务?
  • A1: 使用检查点机制确保持久化
  • Q2: 如何优化性能?
  • A2: 并行执行和缓存策略
  • Q3: 如何调试复杂问题?
  • A3: 利用LangSmith可视化工具
  • Q4: 如何保证数据安全?
  • A4: 加密状态和访问控制

未来发展方向

技术演进
  • 多模态智能体支持
  • 边缘计算集成
  • 量子计算优化
  • 更强大的工具生态

资源与学习

学习资料
  • 官方文档:langchain.com/langgraph
  • LangChain Academy课程
  • GitHub社区论坛
  • LangSmith平台

总结

核心价值
  • LangGraph为AI代理开发提供了强大的基础设施
  • 支持复杂的多步工作流和有状态管理
  • 企业级的生产部署能力
  • 活跃的开源生态系统

参考资料

  • 官方文档: https://docs.langchain.com/oss/python/langgraph/overview
  • GitHub仓库: https://github.com/langchain-ai/langgraph
  • LangSmith平台: https://www.langchain.com/langsmith
  • 安装指南: https://docs.langchain.com/oss/python/langgraph/quickstart

感谢阅读!
访问 https://atcfu.com/ai-articles/langgraph-agents/ 回顾本文