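🦜 LangChain Core: Core Architecture

A Source-Level Deep Dive · The Foundation of AI Application Development

Source-level analysis · LangChain ecosystem foundation analysis
2026-06-03 | Daily Technical Deep Dive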

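Why LangChain Core Is Needed

Core challenges of modern AI applications
  • Limited LLM capabilities: a model cannot access external data sources on its own
  • State management: conversations need memory and preserved context
  • Tool integration: applications must call external APIs and services
  • Multi-step reasoning: tasks require multi-stage decision chains
  • Production deployment: systems need monitoring and debugging capabilities

The Core Value of LangChain Core

The power of a modular abstraction layer
  • Unified interfaces: any provider can implement the required interfaces
  • Replaceability: model and tool providers can be swapped easily
  • Extensibility: leaves room for future technical evolution
  • Developer experience: fast prototyping and a path to production
  • Ecosystem: rich integrations and community support

LangChain Core Architecture Overview

Layered design
  • Base Abstractions
  • Runnables (executable components)
  • Models (model interfaces)
  • Prompts (prompt templates)
  • Memory (memory management)
  • Tools (tool integration)
  • Agents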

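Getting Started with the Core Package

pip install langchain-core

# Import the basic building blocks
from langchain_core.runnables import Runnable
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import tool
# Legacy module: deprecated in recent langchain-core releases and possibly
# absent from the version you install.
from langchain_core.memory import BaseMemory

LangChain Core contains the base abstract classes and interfaces that the rest of the ecosystem builds on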

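The Runnable Interface: the Core of the Core

Base class for LangChain's executable components
  • Defines the standard execution interface
  • Supports chaining and composition
  • Supports asynchronous execution
  • Handles configuration and metadata
  • Supports callbacks and tracing

Runnable Base Class Definition

# Simplified sketch of the interface, not the library source.
from abc import ABC, abstractmethod
from typing import Any, Optional
from langchain_core.runnables import RunnableConfig

class Runnable(ABC):
    """Base class for LangChain executable components."""

    @abstractmethod
    def invoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None
    ) -> Any:
        """Run synchronously."""

    # In langchain-core itself only invoke() is abstract; the default
    # ainvoke() runs invoke() in a thread executor.
    @abstractmethod
    async def ainvoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None
    ) -> Any:
        """Run asynchronously."""

    def __or__(self, other: 'Runnable') -> 'RunnableSequence':
        """Chaining operator |."""
        return RunnableSequence(self, other)

Most composable LangChain components (prompts, models, output parsers, retrievers, tools) implement the Runnable interface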

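RunnableSequence: Chained Execution

class RunnableSequence(Runnable):
    """A sequence of executable components (two-step sketch)."""

    # The library class keeps first, middle, and last steps so that a chain
    # can hold any number of components; two steps keep this sketch short.
    def __init__(self, first: Runnable, last: Runnable):
        self.first = first
        self.last = last

    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        # Run the first component
        intermediate = self.first.invoke(input, config)
        # Feed its output into the second component
        return self.last.invoke(intermediate, config)

    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        # Async counterpart, required because the sketch marks ainvoke abstract
        intermediate = await self.first.ainvoke(input, config)
        return await self.last.ainvoke(intermediate, config)

# Usage example (with the real library classes, not the sketch above)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
# ChatOpenAI ships in the separate langchain-openai package and needs an API key.
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Hello {name}")
model = ChatOpenAI()
parser = StrOutputParser()

chain = prompt | model | parser
result = chain.invoke({"name": "World"})

The | operator composes multiple components into a single chain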
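The `|` composition described above can be illustrated without LangChain at all. The following is a minimal standard-library sketch, assuming a hypothetical `Step` class as a stand-in for a Runnable; the three lambdas play the roles of prompt, model, and parser and are not LangChain APIs.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for a Runnable: wraps a one-argument function."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new Step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Toy "prompt", "model", and "parser" stages (all hypothetical).
fill_template = Step(lambda variables: "Hello {name}".format(**variables))
fake_model = Step(lambda prompt: {"content": prompt.upper()})
extract_text = Step(lambda message: message["content"])

pipeline = fill_template | fake_model | extract_text
greeting = pipeline.invoke({"name": "World"})
print(greeting)
```

Because `__or__` returns another `Step`, chains of any length compose left to right, which is the same idea behind `prompt | model | parser`.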

Model Interface Design

A unified model abstraction
  • BaseLanguageModel: base language model
  • BaseChatModel: chat model
  • BaseLLM: classic text-completion LLM
  • Supports many providers: OpenAI, Anthropic, Hugging Face, and others
  • Standardized input and output formats

The BaseChatModel Interface

from typing import List, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatResult
from langchain_core.runnables import RunnableConfig

# Simplified sketch of the library class of the same name.
class BaseChatModel(BaseLanguageModel):
    """Chat model base class."""

    # The library version also accepts a plain string or a prompt value.
    def invoke(
        self,
        messages: List[BaseMessage],
        config: Optional[RunnableConfig] = None
    ) -> AIMessage:
        """Synchronous call."""

    async def ainvoke(
        self,
        messages: List[BaseMessage],
        config: Optional[RunnableConfig] = None
    ) -> AIMessage:
        """Asynchronous call."""

    # Provider integrations implement this method.
    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> ChatResult:
        """Core generation logic."""

A standardized chat model interface

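The Prompt Template System

Dynamic prompt construction
  • BasePromptTemplate: base template interface
  • ChatPromptTemplate: chat message template
  • StringPromptTemplate: string template
  • FewShotPromptTemplate: few-shot prompting
  • Structured and unstructured prompts

ChatPromptTemplate Implementation

from string import Formatter
from typing import Any, Dict, List
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.prompt_values import ChatPromptValue

# Simplified sketch. The library class derives from BasePromptTemplate
# (a pydantic model); a plain class keeps the sketch runnable.
class ChatPromptTemplate:
    """Chat prompt template."""

    _ROLE_TO_MESSAGE = {
        'system': SystemMessage,
        'user': HumanMessage,
        'assistant': AIMessage,
    }

    def __init__(self, messages: List[Dict[str, Any]]):
        self.messages = messages
        self.input_variables = self._extract_variables()

    @staticmethod
    def _get_variables_from_template(template: str) -> List[str]:
        """Return the {placeholder} names used in one template string."""
        return [name for _, name, _, _ in Formatter().parse(template) if name]

    def _extract_variables(self) -> List[str]:
        """Collect the template variables across all messages."""
        variables = set()
        for msg in self.messages:
            if 'template' in msg:
                variables.update(self._get_variables_from_template(msg['template']))
        return sorted(variables)

    def format_prompt(self, **kwargs) -> ChatPromptValue:
        """Fill in the variables and build the message list."""
        formatted_messages = []
        for msg in self.messages:
            message_cls = self._ROLE_TO_MESSAGE.get(msg.get('role'))
            if message_cls is None:
                continue  # messages with an unknown role are skipped
            content = msg.get('template', '').format(**kwargs)
            formatted_messages.append(message_cls(content=content))
        return ChatPromptValue(messages=formatted_messages)

A flexible message template system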

Template Variable Resolution

from langchain_core.prompts import ChatPromptTemplate

# Define the chat prompt
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful {role} assistant."),
    ("user", "Explain {topic} in simple terms with examples.")
])

# Substitute the template variables
variables = {"role": "technical", "topic": "LangChain Core"}
formatted_prompt = prompt.format_prompt(**variables)

# Print the formatted messages
print(formatted_prompt.messages)
# [
#   SystemMessage(content="You are a helpful technical assistant."),
#   HumanMessage(content="Explain LangChain Core in simple terms with examples.")
# ]

Dynamic variable substitution and message construction
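To make the variable-extraction step concrete, here is a minimal standard-library sketch of how placeholder names can be discovered and checked before formatting. The helper names `extract_variables` and `format_messages` are hypothetical and are not part of LangChain; the sketch assumes plain `str.format` style templates.

```python
from string import Formatter
from typing import List, Tuple

Message = Tuple[str, str]  # (role, template)


def extract_variables(messages: List[Message]) -> List[str]:
    """Return the sorted placeholder names used across all templates."""
    names = set()
    for _role, template in messages:
        # Formatter.parse yields (literal, field_name, spec, conversion).
        for _literal, field_name, _spec, _conversion in Formatter().parse(template):
            if field_name:  # None or "" for literal-only segments
                names.add(field_name)
    return sorted(names)


def format_messages(messages: List[Message], **values: str) -> List[Message]:
    """Fill every template, failing early if a variable is missing."""
    missing = [name for name in extract_variables(messages) if name not in values]
    if missing:
        raise KeyError(f"missing template variables: {missing}")
    return [(role, template.format(**values)) for role, template in messages]


chat_template = [
    ("system", "You are a helpful {role} assistant."),
    ("user", "Explain {topic} in simple terms with examples."),
]
print(extract_variables(chat_template))
print(format_messages(chat_template, role="technical", topic="LangChain Core"))
```

Checking for missing variables up front gives one clear error listing every missing name, instead of a `KeyError` for only the first one hit during formatting.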

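The Output Parser System

Structured data extraction
  • BaseOutputParser: base parser
  • StrOutputParser: string output
  • PydanticOutputParser: Pydantic models
  • JsonOutputParser: JSON data
  • Custom parser extensions

Output Parser Implementation

import json
from typing import Any, Type
from pydantic import BaseModel, ValidationError
from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers import BaseOutputParser

# Simplified sketch of the library class of the same name.
class PydanticOutputParser(BaseOutputParser):
    """Pydantic output parser."""

    # BaseOutputParser is itself a pydantic model, so state is declared
    # as a field rather than assigned in __init__.
    pydantic_object: Type[BaseModel]

    def parse(self, text: str) -> Any:
        """Parse text into a Pydantic object."""
        try:
            # Expect the response to be a JSON document
            json_data = json.loads(text)
            return self.pydantic_object.model_validate(json_data)
        except (json.JSONDecodeError, ValidationError) as exc:
            # Invalid JSON or a schema mismatch: report it with the raw output
            raise OutputParserException(
                f"Failed to parse output: {exc}", llm_output=text
            ) from exc

    def get_format_instructions(self) -> str:
        """Return the format instructions to embed in the prompt."""
        # model_json_schema() returns a dict; json.dumps handles the indentation
        schema_str = json.dumps(self.pydantic_object.model_json_schema(), indent=2)
        return f"Please respond with valid JSON that matches this JSON schema:\n{schema_str}"

Turns LLM output into structured data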
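Model output often wraps the JSON payload in extra prose, so a parser usually has to locate the JSON before validating it. The sketch below shows one way to do that with the standard library only; the `Answer` dataclass and `parse_answer` function are hypothetical names, and the regular expression assumes the reply contains a single JSON object.

```python
import json
import re
from dataclasses import dataclass


@dataclass
class Answer:
    """Hypothetical target structure for the parsed output."""
    title: str
    score: int


def parse_answer(text: str) -> Answer:
    """Extract the JSON object embedded in model output and build an Answer."""
    # Greedy match from the first "{" to the last "}" in the text.
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError(f"no JSON object found in: {text!r}")
    data = json.loads(match.group(0))
    return Answer(title=str(data["title"]), score=int(data["score"]))


reply = 'Here you go: {"title": "Core", "score": 9} Hope that helps.'
print(parse_answer(reply))
```

A production parser would also want schema validation and a retry path; this sketch only covers the extraction step.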

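The Memory Management System

Preserving conversation state
  • BaseMemory: base memory interface
  • InMemoryMemory: in-memory storage (an illustrative class defined in this deck, not a library class)
  • ConversationBufferMemory: buffer memory (ships in the langchain package, not langchain-core)
  • ConversationSummaryMemory: summary memory (ships in the langchain package, not langchain-core)
  • Key-value memory system

Memory Interface Design

from abc import ABC, abstractmethod
from typing import Any, Dict

# Simplified sketch of the memory interface.
class BaseMemory(ABC):
    """Memory system base class."""

    memory_key: str = "chat_history"  # key under which history is returned
    return_messages: bool = True  # return message objects instead of a string
    input_key: str = "input"  # key of the user input
    output_key: str = "output"  # key of the model output

    @abstractmethod
    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Load the memory variables."""

    @abstractmethod
    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
        """Save one conversation turn."""

    @abstractmethod
    def clear(self) -> None:
        """Clear the memory."""

A unified interface for memory systems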

InMemoryMemory Implementation

from typing import Any, Dict, List
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage

# Subclasses the BaseMemory sketch shown on the previous slide.
class InMemoryMemory(BaseMemory):
    """In-memory implementation."""

    def __init__(self, memory_key: str = "chat_history"):
        # The sketch base class defines no __init__, so set the attribute directly
        self.memory_key = memory_key
        self.chat_memory: List[BaseMessage] = []

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Load the chat history."""
        if self.return_messages:
            return {self.memory_key: self.chat_memory}
        return {self.memory_key: self.chat_memory_to_string()}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
        """Save one conversation turn."""
        # Store the input message
        if self.input_key in inputs:
            self.chat_memory.append(HumanMessage(content=str(inputs[self.input_key])))

        # Store the output message
        if self.output_key in outputs:
            self.chat_memory.append(AIMessage(content=str(outputs[self.output_key])))

    def clear(self) -> None:
        """Clear the memory (required by the abstract base class)."""
        self.chat_memory.clear()

    def chat_memory_to_string(self) -> str:
        """Render the history as a string."""
        return "\n".join(
            f"{msg.type}: {msg.content}"
            for msg in self.chat_memory
        )

A simple in-memory implementation
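An unbounded buffer eventually exceeds the model's context window, so buffer memories are commonly capped. The following is a minimal standard-library sketch of a sliding-window variant; `WindowMemory` and its `max_turns` parameter are hypothetical names chosen for illustration, not LangChain classes.

```python
from collections import deque
from typing import Deque, Tuple


class WindowMemory:
    """Hypothetical buffer that keeps only the most recent turns."""

    def __init__(self, max_turns: int = 2) -> None:
        # deque(maxlen=...) silently drops the oldest turn when full.
        self._turns: Deque[Tuple[str, str]] = deque(maxlen=max_turns)

    def save_context(self, user_text: str, ai_text: str) -> None:
        self._turns.append((user_text, ai_text))

    def as_text(self) -> str:
        """Render the retained turns in 'role: content' form."""
        return "\n".join(f"human: {user}\nai: {ai}" for user, ai in self._turns)


window = WindowMemory(max_turns=2)
window.save_context("a", "1")
window.save_context("b", "2")
window.save_context("c", "3")  # evicts the first turn
print(window.as_text())
```

Counting turns is the simplest cap; counting tokens instead is a common refinement when turns vary widely in length.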

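Tool System Architecture

Extending what the AI can do
  • BaseTool: tool base class
  • StructuredTool: structured tool
  • Tool: basic tool
  • ToolExecutor: tool executor (not part of langchain-core)
  • Dynamic tool registration

BaseTool Design

from abc import ABC
from typing import Any, Optional, Type
from pydantic import BaseModel
from langchain_core.runnables import RunnableConfig

# Simplified sketch. The library class of the same name is a pydantic-based
# Runnable; deriving the sketch from itself would not even import.
class BaseTool(ABC):
    """Tool base class."""

    name: str  # tool name
    description: str  # tool description shown to the model
    return_direct: bool = False  # return the result directly to the user
    args_schema: Optional[Type[BaseModel]] = None  # argument schema

    def _run(
        self,
        *args,
        **kwargs
    ) -> Any:
        """Run the tool synchronously."""
        raise NotImplementedError

    async def _arun(
        self,
        *args,
        **kwargs
    ) -> Any:
        """Run the tool asynchronously."""
        raise NotImplementedError

    def run(
        self,
        *args,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> Any:
        """Run the tool (the sketch accepts but ignores config)."""
        return self._run(*args, **kwargs)

    async def arun(
        self,
        *args,
        config: Optional[RunnableConfig] = None,
        **kwargs
    ) -> Any:
        """Run the tool asynchronously."""
        return await self._arun(*args, **kwargs)

A unified interface for the tool system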

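Tool Decorator Implementation

import asyncio
from typing import Callable, Optional, Type
from pydantic import BaseModel
from langchain_core.tools import BaseTool

# Simplified sketch of a decorator like langchain_core.tools.tool.
def tool(
    *args,
    name: Optional[str] = None,
    description: Optional[str] = None,
    args_schema: Optional[Type[BaseModel]] = None,
    return_direct: bool = False,
):
    """Tool decorator."""
    # Distinct local names: a class body cannot read an enclosing variable
    # that it also assigns (args_schema = args_schema raises NameError).
    schema, direct = args_schema, return_direct

    def decorator(func: Callable) -> BaseTool:
        tool_name = name or func.__name__
        tool_description = description or func.__doc__ or ""

        class ToolWrapper(BaseTool):
            # BaseTool is a pydantic model, so overrides need annotations
            name: str = tool_name
            description: str = tool_description
            args_schema: Optional[Type[BaseModel]] = schema
            return_direct: bool = direct

            def _run(self, *args, **kwargs):
                return func(*args, **kwargs)

            async def _arun(self, *args, **kwargs):
                if asyncio.iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                return func(*args, **kwargs)

        return ToolWrapper()

    # Support both @tool and @tool(...)
    if len(args) == 1 and callable(args[0]):
        return decorator(args[0])
    return decorator

# Create a tool with the decorator
@tool
def calculate(expression: str) -> float:
    """Evaluate a math expression."""
    # eval() executes arbitrary code; never use it on untrusted model output
    return eval(expression)

# Or define the tool as a class
class CalculatorTool(BaseTool):
    name: str = "calculator"
    description: str = "Evaluate a math expression"

    def _run(self, expression: str) -> float:
        return eval(expression)  # same caveat as above

Two ways to define a tool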
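Because a calculator tool receives text produced by a model, evaluating it with `eval` exposes the host to arbitrary code execution. One possible alternative, sketched here with the standard-library `ast` module, is to parse the expression and evaluate only whitelisted arithmetic nodes. The function name `safe_calculate` and the choice of supported operators are assumptions made for this illustration.

```python
import ast
import operator

# Whitelisted operators; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str) -> float:
    """Evaluate +, -, *, / arithmetic without executing arbitrary code."""

    def evaluate(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](evaluate(node.operand))
        # Names, calls, attribute access, and so on all end up here.
        raise ValueError("unsupported expression")

    return float(evaluate(ast.parse(expression, mode="eval")))


print(safe_calculate("2 * (3 + 4)"))
```

Exponentiation is left out on purpose, since an input such as `9 ** 9 ** 9` could exhaust CPU and memory even though it is pure arithmetic.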

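Agent Architecture

The decision and execution engine
  • Agent: agent base class
  • AgentExecutor: runs the agent loop and executes the chosen tools
  • Multi-agent collaboration
  • Decision logic design

Agent Decision Logic

from typing import Any, List, Tuple
from langchain_core.agents import AgentAction
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage

# langchain-core provides AgentAction and AgentFinish; AgentExecutor and the
# Agent base classes ship outside langchain-core. This class is a sketch.
class Agent:
    """Agent base class."""

    def __init__(self, llm, tools, system_message):
        self.llm = llm
        self.tools = tools
        self.system_message = system_message
        self.tools_by_name = {tool.name: tool for tool in tools}

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, Any]],
        **kwargs
    ) -> AgentAction:
        """Plan the next action."""
        # Build the prompt
        prompt = self._build_prompt(intermediate_steps, **kwargs)

        # Ask the LLM to decide
        response = self.llm.invoke(prompt)

        # Parse the response into an action
        return self._parse_response(response)

    def _build_prompt(self, intermediate_steps, **kwargs) -> List[BaseMessage]:
        """Build the decision prompt."""
        messages = [SystemMessage(content=self.system_message)]

        # Replay the intermediate steps: the thought came from the model,
        # the observation came back from the tool
        for action, observation in intermediate_steps:
            messages.append(AIMessage(content=f"Thought: {action.log}"))
            messages.append(HumanMessage(content=f"Observation: {observation}"))

        # Append the current task
        messages.append(HumanMessage(content=f"Task: {kwargs.get('input', '')}"))

        return messages

    def _parse_response(self, response) -> AgentAction:
        """Turn the model response into an AgentAction (format-specific)."""
        raise NotImplementedError

The core logic of agent decision-making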

The Document Processing System

Data import and preprocessing
  • BaseLoader: document loader
  • Document: document object
  • TextSplitter: text splitting
  • DocumentTransformer: document transformation
  • Multi-format support: PDF, HTML, Markdown, and others

Document Object Structure

from dataclasses import dataclass, field
from typing import Any, Dict

# Simplified sketch; the library Document is a pydantic model.
@dataclass
class Document:
    """Document object."""
    page_content: str  # document text
    metadata: Dict[str, Any] = field(default_factory=dict)  # metadata
    lookup_str: str = ""  # lookup string
    lookup_index: int = 0  # lookup index

    def with_metadata(self, **kwargs: Any) -> 'Document':
        """Return a copy with extra metadata."""
        new_metadata = self.metadata.copy()
        new_metadata.update(kwargs)
        return Document(
            page_content=self.page_content,
            metadata=new_metadata
        )

    def to_json(self) -> Dict[str, Any]:
        """Convert to a JSON-compatible dict."""
        return {
            'page_content': self.page_content,
            'metadata': self.metadata
        }

The core design of the document object

Document Loader Implementation

from abc import ABC, abstractmethod
from typing import Iterator, List
from langchain_core.documents import Document

class BaseLoader(ABC):
    """Document loader base class."""

    @abstractmethod
    def lazy_load(self) -> Iterator[Document]:
        """Load documents lazily."""

    def load(self) -> List[Document]:
        """Load all documents."""
        return list(self.lazy_load())

class TextLoader(BaseLoader):
    """Text file loader."""

    def __init__(self, file_path: str, encoding: str = "utf-8"):
        self.file_path = file_path
        self.encoding = encoding

    def lazy_load(self) -> Iterator[Document]:
        """Load a text file as a single document."""
        with open(self.file_path, 'r', encoding=self.encoding) as f:
            content = f.read()
        yield Document(
            page_content=content,
            metadata={'source': self.file_path}
        )

A basic document loader implementation

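The Vector Store System

The core of semantic retrieval
  • BaseVectorStore: vector store base class (named VectorStore in langchain_core.vectorstores)
  • BaseRetriever: retriever base class
  • Embeddings: embedding interface
  • Similarity search algorithms
  • Indexing and update mechanisms

Vector Store Interface

import uuid
from typing import Any, Dict, List, Optional
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings

# Simplified sketch of the vector store interface.
class BaseVectorStore:
    """Vector store base class."""

    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> List[str]:
        """Add texts to the vector store and return their IDs."""
        raise NotImplementedError

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        **kwargs
    ) -> List[Document]:
        """Return the k most similar documents."""
        raise NotImplementedError

    def delete(self, ids: List[str]) -> None:
        """Delete documents by ID."""
        raise NotImplementedError

class InMemoryVectorStore(BaseVectorStore):
    """In-memory vector store (similarity_search is left unimplemented)."""

    def __init__(self, embedding: Embeddings):
        self.embedding = embedding
        self.ids: List[str] = []
        self.documents: List[Document] = []
        self.embeddings: List[List[float]] = []

    def add_texts(self, texts, metadatas=None, **kwargs):
        """Add texts and compute their embeddings."""
        texts = list(texts)
        if metadatas is None:
            # One fresh dict per text; [{}] * n would share a single dict
            metadatas = [{} for _ in texts]

        vectors = self.embedding.embed_documents(texts)
        new_ids = []
        for text, metadata, vector in zip(texts, metadatas, vectors):
            self.documents.append(Document(page_content=text, metadata=metadata))
            self.embeddings.append(vector)
            new_ids.append(str(uuid.uuid4()))  # unique across calls

        self.ids.extend(new_ids)
        return new_ids

The core vector store interface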
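The similarity search step can be sketched as a brute-force cosine ranking over the stored vectors. This is a minimal standard-library illustration, not the library's implementation; `cosine_similarity` and `top_k` are hypothetical helper names, and the two-dimensional vectors stand in for real embeddings.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(
    query_vector: Sequence[float],
    stored: List[Tuple[str, Sequence[float]]],
    k: int = 4,
) -> List[Tuple[float, str]]:
    """Score every stored (text, vector) pair and return the k best."""
    scored = [(cosine_similarity(query_vector, vector), text) for text, vector in stored]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Toy data: hand-written vectors instead of model embeddings.
store = [("cats", [1.0, 0.0]), ("dogs", [0.0, 1.0]), ("pets", [1.0, 1.0])]
for score, text in top_k([1.0, 0.1], store, k=2):
    print(f"{text}: {score:.3f}")
```

A linear scan like this costs one comparison per stored vector, which is fine for small collections; larger ones typically rely on an approximate nearest-neighbor index.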

The Callback System

Events and tracing
  • BaseCallbackHandler: callback handler
  • LangSmithTracer: tracer (an illustrative class in this deck; the library's LangSmith tracer is LangChainTracer)
  • ConsoleCallbackHandler: console output
  • FileCallbackHandler: file output
  • Custom callback extensions

Callback Handler Implementation

import time
from typing import Any, Dict
from langchain_core.callbacks import BaseCallbackHandler

class LangSmithTracer(BaseCallbackHandler):
    """Illustrative LangSmith-style tracer."""

    def __init__(self, project_name: str = "default"):
        self.project_name = project_name
        self.runs: Dict[Any, Dict[str, Any]] = {}

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs
    ) -> None:
        """Called when a chain starts."""
        run_id = kwargs.get('run_id')  # passed by the callback manager
        self.runs[run_id] = {
            'type': 'chain',
            'inputs': inputs,
            'steps': [],
            'start_time': time.time()
        }

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        **kwargs
    ) -> None:
        """Called when a chain finishes."""
        run_id = kwargs.get('run_id')
        if run_id in self.runs:
            self.runs[run_id]['end_time'] = time.time()
            self.runs[run_id]['response'] = outputs

            # Ship the finished run to LangSmith
            self._send_to_langsmith(self.runs[run_id])

    def _send_to_langsmith(self, run_data: Dict[str, Any]) -> None:
        """Send the run data to LangSmith."""
        # Transport logic goes here
        pass

A runtime event tracing system

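The Configuration Management System

Runtime configuration
  • RunnableConfig: run configuration
  • ConfigurableField: configurable field
  • Environment variable support
  • Runtime parameters
  • Thread and concurrency control

RunnableConfig Structure

import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Illustrative sketch. In langchain-core, RunnableConfig is a TypedDict
# (a plain dict at runtime), its default recursion limit is 25, and it has
# no seed field.
@dataclass
class RunnableConfig:
    """Run configuration."""
    run_id: Optional[str] = None  # run ID
    tags: Optional[List[str]] = None  # tags
    metadata: Optional[Dict[str, Any]] = None  # metadata
    callbacks: Optional[Any] = None  # callback manager or handler list
    recursion_limit: int = 25  # recursion limit
    max_concurrency: int = 100  # maximum number of parallel calls
    seed: Optional[int] = None  # random seed (sketch-only field)

    configurable: Optional[Dict[str, Any]] = None  # configurable parameters

    @classmethod
    def from_env(cls) -> 'RunnableConfig':
        """Create a configuration from environment variables."""
        raw_tags = os.getenv('LANGCHAIN_TAGS', '')
        return cls(
            # ''.split(',') yields [''], so drop empty entries
            tags=[tag for tag in raw_tags.split(',') if tag],
            metadata={'env': os.getenv('ENVIRONMENT', 'development')},
            max_concurrency=int(os.getenv('LANGCHAIN_MAX_CONCURRENCY', '100'))
        )

    def merge(self, other: 'RunnableConfig') -> 'RunnableConfig':
        """Merge two configurations; values from other take precedence."""
        return RunnableConfig(
            run_id=other.run_id or self.run_id,
            tags=other.tags or self.tags,
            # Either side may have metadata=None
            metadata={**(self.metadata or {}), **(other.metadata or {})},
            callbacks=other.callbacks or self.callbacks,
            recursion_limit=other.recursion_limit,
            max_concurrency=other.max_concurrency,
            seed=other.seed if other.seed is not None else self.seed,
            configurable={**(self.configurable or {}), **(other.configurable or {})},
        )

Runtime configuration management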
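Since run configuration is ultimately key-value data, merging can be sketched over plain dicts. The following standard-library example shows one possible set of merge rules (tags are unioned, nested dicts are merged, scalars are overridden); the function name `merge_run_configs` and these exact rules are assumptions for illustration and may differ from what the library does.

```python
from typing import Any, Dict, Optional

Config = Dict[str, Any]


def merge_run_configs(base: Optional[Config], override: Optional[Config]) -> Config:
    """Merge two config dicts; values in override win, None means 'unset'."""
    merged: Config = dict(base or {})
    for key, value in (override or {}).items():
        if value is None:
            continue  # keep whatever the base config had
        if key == "tags":
            # Union of both tag lists, sorted for a stable result.
            merged["tags"] = sorted(set(merged.get("tags") or []) | set(value))
        elif key in ("metadata", "configurable"):
            # Shallow merge of nested dicts.
            merged[key] = {**(merged.get(key) or {}), **value}
        else:
            merged[key] = value
    return merged


defaults = {"tags": ["a"], "metadata": {"env": "dev"}, "max_concurrency": 4}
per_call = {"tags": ["b"], "metadata": {"user": "u1"}, "max_concurrency": None}
print(merge_run_configs(defaults, per_call))
```

Treating `None` as "not set" lets a per-call config leave defaults untouched without having to omit the key.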

The Error Handling System

Exception management
  • LangChainException: base exception
  • TracerException: tracing exception
  • OutputParserException: parsing exception
  • ToolException: tool exception
  • ContextOverflowError: context overflow

Exception Class Hierarchy

# Simplified sketch; constructor signatures in the library may differ.
class LangChainException(Exception):
    """Base class for LangChain exceptions."""
    pass

class TracerException(LangChainException):
    """Tracing system exception."""
    pass

class OutputParserException(LangChainException):
    """Output parsing exception."""

    def __init__(self, text: str, error: Exception):
        self.text = text
        self.error = error
        super().__init__(f"Failed to parse output: {error}")

class ToolException(LangChainException):
    """Tool execution exception."""

    def __init__(self, tool_name: str, error: Exception):
        self.tool_name = tool_name
        self.error = error
        super().__init__(f"Tool '{tool_name}' failed: {error}")

class ContextOverflowError(LangChainException):
    """Context overflow exception."""

    def __init__(self, max_length: int, actual_length: int):
        self.max_length = max_length
        self.actual_length = actual_length
        super().__init__(f"Context length {actual_length} exceeds max {max_length}")

A structured exception hierarchy

Serialization and Persistence

Object storage
  • Serializable: serialization interface
  • BaseStore: store base class
  • InMemoryStore: in-memory store
  • RedisStore: Redis store
  • Object lifecycle management

The Serialization System

from abc import ABC, abstractmethod
from typing import Any, Dict

# Simplified sketch of the serialization interface.
class Serializable(ABC):
    """Serialization interface."""

    @abstractmethod
    def to_json(self) -> Dict[str, Any]:
        """Convert to a JSON-compatible dict."""

    @classmethod
    @abstractmethod
    def from_json(cls, data: Dict[str, Any]) -> 'Serializable':
        """Create an object from a JSON-compatible dict."""

class SerializedConstructor(Serializable):
    """Serialized constructor call."""

    def __init__(self, type: str, data: Dict[str, Any]):
        self.type = type
        self.data = data

    def to_json(self) -> Dict[str, Any]:
        return {
            'type': self.type,
            'data': self.data
        }

    @classmethod
    def from_json(cls, data: Dict[str, Any]) -> 'SerializedConstructor':
        return cls(data['type'], data['data'])

Object serialization and persistence

The Message System

Conversation management
  • BaseMessage: message base class
  • HumanMessage: user message
  • AIMessage: AI message
  • SystemMessage: system message
  • Multimodal message support

Message Class Design

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Simplified dataclass sketch; the library messages are pydantic models.
@dataclass
class BaseMessage:
    """Message base class."""
    content: str  # message content
    additional_kwargs: Optional[Dict[str, Any]] = None  # extra provider arguments
    response_metadata: Optional[Dict[str, Any]] = None  # response metadata
    type: str = ""  # message type

    def __post_init__(self):
        # None defaults avoid sharing one mutable dict between instances
        if self.additional_kwargs is None:
            self.additional_kwargs = {}
        if self.response_metadata is None:
            self.response_metadata = {}

@dataclass
class HumanMessage(BaseMessage):
    """User message."""
    type: str = "human"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'HumanMessage':
        return cls(
            content=data['content'],
            additional_kwargs=data.get('additional_kwargs', {}),
            response_metadata=data.get('response_metadata', {})
        )

@dataclass
class AIMessage(BaseMessage):
    """AI message."""
    type: str = "ai"
    tool_calls: Optional[List[Dict[str, Any]]] = None

    def __post_init__(self):
        super().__post_init__()
        if self.tool_calls is None:
            self.tool_calls = []

The core classes of the message system

The Tool Calling System

AI agent execution
  • ToolCall: tool call object
  • ToolMessage: tool message
  • FunctionMessage: function message
  • Tool call flow
  • Result handling

Tool Call Implementation

from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class ToolCall:
    """Tool call."""
    id: str  # call ID
    function: Dict[str, Any]  # function name and arguments
    type: str = "function"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ToolCall':
        return cls(
            id=data['id'],
            function=data['function'],
            type=data.get('type', 'function')
        )

# BaseMessage is the dataclass sketch from the message class slide.
@dataclass
class ToolMessage(BaseMessage):
    """Tool message; content (inherited) holds the tool result."""
    # A default is required: dataclass fields without defaults cannot follow
    # the defaulted fields inherited from BaseMessage.
    tool_call_id: str = ""  # ID of the tool call being answered
    type: str = "tool"

    @classmethod
    def from_tool_call(cls, tool_call: ToolCall, content: str) -> 'ToolMessage':
        return cls(
            tool_call_id=tool_call.id,
            content=content,
            additional_kwargs={'tool_call_id': tool_call.id}
        )

Tool calls and result handling

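The Asynchronous Execution System

High-performance concurrency
  • AsyncRunnable: asynchronous executable (an illustrative class; every library Runnable already exposes ainvoke, abatch, and astream)
  • AsyncBaseTracer: asynchronous tracing
  • Concurrency control
  • Batch optimization
  • Streaming support

Asynchronous Component Implementation

import asyncio
from typing import Any, AsyncIterator, List, Optional
from langchain_core.runnables import Runnable, RunnableConfig

class AsyncRunnable(Runnable):
    """Asynchronous executable component."""

    async def ainvoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None
    ) -> Any:
        """Run asynchronously; subclasses implement this."""
        raise NotImplementedError

    async def abatch(
        self,
        inputs: List[Any],
        config: Optional[RunnableConfig] = None
    ) -> List[Any]:
        """Run a batch concurrently, preserving input order."""
        tasks = [self.ainvoke(inp, config) for inp in inputs]
        return await asyncio.gather(*tasks)

    async def astream(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None
    ) -> AsyncIterator[Any]:
        """Stream asynchronously."""
        # Default: yield the full result as one chunk; override to stream
        result = await self.ainvoke(input, config)
        yield result

class AsyncBatchProcessor:
    """Asynchronous batch processor."""

    def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
        self.batch_size = batch_size
        # Stored but not enforced in this sketch: each batch runs fully in parallel
        self.max_concurrent = max_concurrent

    async def process_batch(
        self,
        inputs: List[Any],
        processor: AsyncRunnable
    ) -> List[Any]:
        """Process the inputs one batch at a time."""
        results = []

        for i in range(0, len(inputs), self.batch_size):
            batch = inputs[i:i + self.batch_size]
            batch_results = await processor.abatch(batch)
            results.extend(batch_results)

        return results

Asynchronous execution and batch optimization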

The Data Validation System

Type safety
  • BaseModel: data model
  • Pydantic integration
  • JSON Schema validation
  • Type hint support
  • Runtime type checking

Pydantic Model Integration

from typing import Any, Dict, List
from pydantic import BaseModel, Field
from langchain_core.tools import tool
from langchain_core.output_parsers import PydanticOutputParser

class SearchQuery(BaseModel):
    """Search query model."""
    query: str = Field(description="Search keywords")
    max_results: int = Field(default=10, description="Maximum number of results")
    filters: Dict[str, Any] = Field(default_factory=dict, description="Filter conditions")

@tool
def search_documents(query: SearchQuery) -> List[str]:
    """Search documents."""
    # Search logic goes here
    return [f"Result {i}: {query.query}" for i in range(query.max_results)]

# Use the Pydantic parser
parser = PydanticOutputParser(pydantic_object=SearchQuery)

# Parse LLM output; JSON requires double-quoted keys and strings
text_response = '{"query": "LangChain", "max_results": 5}'
query = parser.parse(text_response)
print(query.query)  # "LangChain"

Type-safe data validation

The Monitoring and Logging System

Runtime observability
  • Run metrics collection
  • Performance monitoring
  • Error tracking
  • Usage statistics
  • Custom metrics

Metrics Collection

import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class RunMetrics:
    """Metrics for a single run."""
    start_time: float
    end_time: Optional[float] = None
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    error_count: int = 0

    @property
    def duration(self) -> Optional[float]:
        """Run duration in seconds, or None while the run is in progress."""
        if self.end_time is not None:
            return self.end_time - self.start_time
        return None

class MetricsCollector:
    """Metrics collector."""

    def __init__(self):
        self.runs: Dict[str, RunMetrics] = {}
        self.operation_stats = defaultdict(int)

    def start_run(self, run_id: str) -> RunMetrics:
        """Start tracking a run."""
        metrics = RunMetrics(start_time=time.time())
        self.runs[run_id] = metrics
        return metrics

    def end_run(self, run_id: str) -> RunMetrics:
        """Mark a run as finished."""
        if run_id in self.runs:
            metrics = self.runs[run_id]
            metrics.end_time = time.time()
            return metrics
        raise ValueError(f"Run {run_id} not found")

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics."""
        return {
            'total_runs': len(self.runs),
            'failed_runs': sum(1 for m in self.runs.values() if m.error_count > 0)
        }

Runtime monitoring and metrics collection

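LangSmith Integration

A dedicated debugging platform
  • Observability platform
  • Chain execution tracing
  • Performance analysis
  • Error debugging
  • Testing and evaluation

LangSmith Integration Code

import time
from typing import Any, Dict, Optional
# The LangSmith tracer class in langchain-core is LangChainTracer.
from langchain_core.tracers import LangChainTracer

class LangSmithIntegration:
    """LangSmith integration."""

    def __init__(self, project_name: str = "langchain-core"):
        self.project_name = project_name
        self.tracer = LangChainTracer(project_name=project_name)

    def trace_chain(self, chain, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Trace a chain execution."""
        # Tracers are callback handlers, so they are passed through the run
        # config rather than used as a context manager
        return chain.invoke(inputs, config={'callbacks': [self.tracer]})

    def log_evaluation(self, name: str, score: float, metadata: Optional[Dict[str, Any]] = None):
        """Record an evaluation result."""
        evaluation = {
            'name': name,
            'score': score,
            'timestamp': time.time(),
            'metadata': metadata or {}
        }
        # Send to LangSmith
        self._send_evaluation(evaluation)

    def _send_evaluation(self, evaluation: Dict[str, Any]):
        """Send the evaluation data."""
        # Transport logic goes here
        pass

LangSmith as a dedicated debugging platform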

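Production Deployment

Preparing for production
  • Container support
  • Microservice architecture
  • Load balancing
  • Monitoring and alerting
  • Designing for scalability

Production Configuration

import os
import time
from typing import Any, Dict, Optional

# Configuration from environment variables
PRODUCTION_CONFIG = {
    'MODEL_ENDPOINT': os.getenv('MODEL_ENDPOINT', 'https://api.openai.com/v1'),
    'MAX_CONCURRENT_REQUESTS': int(os.getenv('MAX_CONCURRENT_REQUESTS', '100')),
    'REQUEST_TIMEOUT': int(os.getenv('REQUEST_TIMEOUT', '30')),
    'ENABLE_CACHING': os.getenv('ENABLE_CACHING', 'true').lower() == 'true',
    'CACHE_TTL': int(os.getenv('CACHE_TTL', '3600'))
}

class ProductionDeployer:
    """Production deployer."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or PRODUCTION_CONFIG
        self.health_check_interval = 30  # seconds between health checks
        self._started_at = time.time()

    def health_check(self) -> Dict[str, Any]:
        """Health check."""
        return {
            'status': 'healthy',
            'timestamp': time.time(),
            'uptime': self._get_uptime(),
            'memory_usage': self._get_memory_usage()
        }

    def scale_resources(self, target_load: float):
        """Scale resources up or down based on load."""
        if target_load > 0.8:
            self._add_resources()
        elif target_load < 0.3:
            self._remove_resources()

    def _get_uptime(self) -> float:
        """Seconds since this deployer was created."""
        return time.time() - self._started_at

    def _get_memory_usage(self) -> int:
        """Peak resident set size (Unix only; the unit is platform-dependent)."""
        import resource
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    def _add_resources(self) -> None:
        raise NotImplementedError  # platform-specific scaling hook

    def _remove_resources(self) -> None:
        raise NotImplementedError  # platform-specific scaling hook

Production configuration and management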

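Performance Optimization Strategies

Efficient execution
  • Caching
  • Batch optimization
  • Memory management
  • Concurrency control
  • Asynchronous I/O optimization

Cache System Implementation

import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable, Dict, Optional

_MISSING = object()  # sentinel so that a cached None still counts as a hit

class CacheManager:
    """Cache manager."""

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.max_size = max_size
        self.ttl = ttl  # time to live, in seconds
        self.access_times: Dict[str, float] = {}

    def get(self, key: str, default: Any = None) -> Any:
        """Return the cached value, or default if missing or expired."""
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp'] < self.ttl:
                self.access_times[key] = time.time()
                return entry['value']
            else:
                del self.cache[key]
                del self.access_times[key]
        return default

    def set(self, key: str, value: Any):
        """Store a value, evicting the least recently used entry if full."""
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_lru()

        self.cache[key] = {
            'value': value,
            'timestamp': time.time()
        }
        self.access_times[key] = time.time()

    def _evict_lru(self):
        """Remove the least recently used entry."""
        if self.access_times:
            oldest_key = min(self.access_times.items(), key=lambda x: x[1])[0]
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

    @staticmethod
    def _generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
        """Hash the function name and arguments into a stable key."""
        payload = json.dumps([func_name, args, kwargs], sort_keys=True, default=repr)
        return hashlib.sha256(payload.encode('utf-8')).hexdigest()

    def cache_decorator(self, ttl: Optional[int] = None):
        """Cache decorator (ttl is accepted, but the manager-wide ttl applies)."""
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Build the cache key
                cache_key = self._generate_cache_key(func.__name__, args, kwargs)

                # Try the cache first
                cached_result = self.get(cache_key, _MISSING)
                if cached_result is not _MISSING:
                    return cached_result

                # Run the function and cache the result
                result = func(*args, **kwargs)
                self.set(cache_key, result)
                return result
            return wrapper
        return decorator

A cache with TTL expiry and LRU eviction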
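A cache decorator depends on a key function that maps equal calls to equal keys. The sketch below shows one way to derive such a key with `json` and `hashlib`; the name `make_cache_key` is hypothetical, and falling back to `repr` for values that are not JSON-serializable is an assumption that only holds when those values have a stable `repr`.

```python
import hashlib
import json
from typing import Any, Dict, Tuple


def make_cache_key(func_name: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """Derive a deterministic key from a function name and its arguments."""
    payload = json.dumps(
        {"func": func_name, "args": args, "kwargs": kwargs},
        sort_keys=True,   # keyword order must not change the key
        default=repr,     # assumption: non-JSON values have a stable repr
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = make_cache_key("search", ("langchain",), {"k": 4, "lang": "en"})
key_b = make_cache_key("search", ("langchain",), {"lang": "en", "k": 4})
key_c = make_cache_key("search", ("langgraph",), {"k": 4, "lang": "en"})
print(key_a == key_b, key_a == key_c)
```

Hashing keeps keys at a fixed length regardless of argument size, at the cost of not being able to read the original arguments back from the key.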

安全最佳实践

安全防护
  • 输入验证
  • 输出过滤
  • 权限控制
  • 数据加密
  • 审计日志

安全验证实现

import re
from typing import Dict, Any, List
from pydantic import BaseModel, validator

class SafePromptTemplate(BaseModel):
    """安全的提示模板"""
    template: str
    variables: List[str]
    
    @validator('template')
    def validate_template(cls, v):
        """验证模板内容"""
        # 检查注入攻击
        if any(keyword in v.lower() for keyword in [
            'import ', 'exec(', 'eval(', '__import__',
            'system(', 'os.system', 'subprocess'
        ]):
            raise ValueError('模板包含潜在的安全风险')
        return v
    
    @validator('variables')
    def validate_variables(cls, v):
        """验证变量名"""
        for var in v:
            if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*
  
, var): raise ValueError(f'无效的变量名: {var}') return v class SecurityFilter: """安全过滤器""" def __init__(self): self.blocked_patterns = [ r'<script.*?>.*?</script>', # XSS攻击 r'javascript:', # JS协议 r'data:', # 数据协议 ] def sanitize_input(self, text: str) -> str: """清理输入文本""" for pattern in self.blocked_patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE) return text def validate_output(self, text: str) -> bool: """验证输出文本""" for pattern in self.blocked_patterns: if re.search(pattern, text, flags=re.IGNORECASE): return False return True

Security safeguards
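A single pass of regex substitution, as in `sanitize_input`, can be bypassed by nesting a blocked pattern inside itself: removing the inner match reassembles the outer one. The sketch below illustrates that effect and one possible mitigation, repeating the pass until the text stops changing. `BLOCKED_PATTERNS`, `sanitize_once`, and `sanitize_until_stable` are hypothetical names for this illustration, and a regex blocklist remains a heuristic rather than a full HTML sanitizer.

```python
import re
from typing import List

# Two of the patterns from the filter above, reused for the demonstration
BLOCKED_PATTERNS: List[str] = [
    r"<script.*?>.*?</script>",
    r"javascript:",
]


def sanitize_once(text: str) -> str:
    """Apply every blocked pattern exactly once."""
    for pattern in BLOCKED_PATTERNS:
        # DOTALL lets a script block that spans several lines match too
        text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.DOTALL)
    return text


def sanitize_until_stable(text: str, max_passes: int = 10) -> str:
    """Repeat the pass until nothing changes, or reject the input."""
    for _ in range(max_passes):
        cleaned = sanitize_once(text)
        if cleaned == text:
            return cleaned
        text = cleaned
    raise ValueError("input did not stabilize; rejecting it")


# Removing the inner tag pair reassembles an outer <script> element
nested = "<scr<script></script>ipt>alert(1)</script>"
survivor = sanitize_once(nested)          # a script tag is still present
stable = sanitize_until_stable(nested)    # nothing blocked is left
```

Rejecting input that never stabilizes, instead of returning a partially cleaned string, keeps the failure mode explicit.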

Testing Strategy

Quality assurance

Test framework implementation

import unittest
from typing import List, Dict, Any
from unittest.mock import Mock, patch

class TestLangChainCore(unittest.TestCase):
    """Tests for LangChain Core building blocks."""
    
    def setUp(self):
        """Create the mocks shared by the tests."""
        self.mock_llm = Mock()
        self.mock_llm.invoke.return_value = Mock(content="Test response")
        
        self.mock_tool = Mock()
        self.mock_tool.name = "test_tool"
        self.mock_tool._run.return_value = "Tool result"
    
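    def test_chain_execution(self):
        """A two-step chain passes the input to the model and returns its text."""
        from langchain_core.runnables import RunnableLambda
        
        # A bare Mock is not a Runnable, so wrap the mocked call in plain
        # functions that RunnableLambda can compose with the | operator.
        def call_llm(text: str):
            return self.mock_llm.invoke(text)
        
        chain = RunnableLambda(call_llm) | RunnableLambda(lambda msg: msg.content)
        result = chain.invoke("test input")
        
        self.mock_llm.invoke.assert_called_once_with("test input")
        self.assertEqual(result, "Test response")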
    
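    def test_tool_execution(self):
        """A function decorated with @tool can be invoked like any Runnable."""
        from langchain_core.tools import tool
        
        @tool
        def test_func(text: str) -> str:
            """Prefix the text with 'Processed: '."""
            return f"Processed: {text}"
        
        # @tool returns a tool object (it needs the docstring above as its
        # description), so call it through invoke() with its arguments.
        result = test_func.invoke({"text": "test"})
        self.assertEqual(result, "Processed: test")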
    
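    def test_memory_system(self):
        """Chat history keeps the messages that were added to it."""
        # langchain_core has no InMemoryMemory class; this test uses
        # InMemoryChatMessageHistory from langchain_core.chat_history instead.
        from langchain_core.chat_history import InMemoryChatMessageHistory
        
        history = InMemoryChatMessageHistory()
        
        # Record one exchange
        history.add_user_message("Hello")
        history.add_ai_message("Hi there!")
        
        # Read it back
        contents = [message.content for message in history.messages]
        self.assertEqual(contents, ["Hello", "Hi there!"])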
    
    def test_output_parsing(self):
        """StrOutputParser returns plain strings unchanged."""
        from langchain_core.output_parsers import StrOutputParser
        
        parser = StrOutputParser()
        result = parser.parse("test string")
        self.assertEqual(result, "test string")

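Test coverage for chains, tools, memory, and output parsing

Extension and Plugin System

Feature extension

Extension system design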

from typing import Dict, Any, List, Type
from abc import ABC, abstractmethod

class ExtensionPoint(ABC):
    """Base class for extension points."""
    
    @abstractmethod
    def register_extension(self, name: str, extension: Any) -> None:
        """Register an extension under the given name."""
        pass
    
    @abstractmethod
    def get_extension(self, name: str) -> Any:
        """Return an extension registered under the given name."""
        pass

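class PluginManager(ExtensionPoint):
    """Plugin manager."""
    
    def __init__(self):
        self.plugins: Dict[str, Any] = {}
        self.extensions: Dict[str, List[Any]] = {}
    
    def register_plugin(self, name: str, plugin: Any) -> None:
        """Register a plugin and any extensions it exposes."""
        self.plugins[name] = plugin
        
        # Auto-register extensions from plugins that define get_extensions()
        if hasattr(plugin, 'get_extensions'):
            extensions = plugin.get_extensions()
            for ext_name, ext in extensions.items():
                self.register_extension(ext_name, ext)
    
    def get_plugin(self, name: str) -> Any:
        """Return the plugin registered under name, or None."""
        return self.plugins.get(name)
    
    def register_extension(self, name: str, extension: Any) -> None:
        """Append an extension to the named extension point."""
        if name not in self.extensions:
            self.extensions[name] = []
        self.extensions[name].append(extension)
    
    def get_extension(self, name: str) -> Any:
        """Return the most recently registered extension, or None.

        Implements the abstract method declared on ExtensionPoint; without it
        PluginManager cannot be instantiated. Returning the latest entry is
        one possible policy.
        """
        registered = self.extensions.get(name)
        return registered[-1] if registered else None
    
    def get_extensions(self, name: str) -> List[Any]:
        """Return every extension registered under name."""
        return self.extensions.get(name, [])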

Flexible extension and plugin system
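To make the registration flow concrete, here is a small sketch of two plugins contributing to the same extension point. It is self-contained, so it uses `MiniPluginManager`, a condensed stand-in for the `PluginManager` listing; the plugin classes, the `"preprocess"` extension point, and `run_extension_point` are hypothetical names chosen for the example.

```python
from typing import Any, Callable, Dict, List


class MiniPluginManager:
    """Condensed stand-in for the PluginManager listing (hypothetical)."""

    def __init__(self) -> None:
        self.plugins: Dict[str, Any] = {}
        self.extensions: Dict[str, List[Any]] = {}

    def register_plugin(self, name: str, plugin: Any) -> None:
        self.plugins[name] = plugin
        # Plugins advertise their extensions through get_extensions()
        if hasattr(plugin, "get_extensions"):
            for ext_name, ext in plugin.get_extensions().items():
                self.extensions.setdefault(ext_name, []).append(ext)

    def get_extensions(self, name: str) -> List[Any]:
        return self.extensions.get(name, [])


class TextCleanupPlugin:
    """Hypothetical plugin that trims surrounding whitespace."""

    def get_extensions(self) -> Dict[str, Callable[[str], str]]:
        return {"preprocess": str.strip}


class LowercasePlugin:
    """Hypothetical plugin that lowercases text."""

    def get_extensions(self) -> Dict[str, Callable[[str], str]]:
        return {"preprocess": str.lower}


def run_extension_point(manager: MiniPluginManager, name: str, value: str) -> str:
    """Apply every extension registered under `name`, in registration order."""
    for extension in manager.get_extensions(name):
        value = extension(value)
    return value


manager = MiniPluginManager()
manager.register_plugin("cleanup", TextCleanupPlugin())
manager.register_plugin("lowercase", LowercasePlugin())
cleaned = run_extension_point(manager, "preprocess", "  Hello LangChain  ")
```

Because extensions are kept in a list per name, several plugins can hook the same point, and an unknown extension point simply leaves the value unchanged.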

Performance Benchmarking

Performance evaluation

Benchmark suite

import time
import statistics
from typing import Any, Callable, Dict, List
from concurrent.futures import ThreadPoolExecutor

class PerformanceBenchmark:
    """Latency and throughput benchmarks."""
    
    def __init__(self):
        # Summary statistics per benchmark name
        self.results: Dict[str, Dict[str, float]] = {}
    
    def measure_latency(self, func: Callable, *args, **kwargs) -> float:
        """Return the wall-clock time of one call, in seconds."""
        # perf_counter is monotonic and has higher resolution than time.time
        start_time = time.perf_counter()
        func(*args, **kwargs)
        return time.perf_counter() - start_time
    
    def measure_throughput(self, func: Callable, inputs: List[Any],
                           max_workers: int = 10) -> Dict[str, float]:
        """Run func over inputs on a thread pool and report throughput."""
        if not inputs:
            raise ValueError('inputs must not be empty')
        
        start_time = time.perf_counter()
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # list() drains the iterator so every call finishes before timing stops
            list(executor.map(func, inputs))
        
        total_time = time.perf_counter() - start_time
        
        return {
            'total_requests': len(inputs),
            'total_time': total_time,
            'requests_per_second': len(inputs) / total_time,
            # Wall-clock time divided by request count; with concurrent
            # workers this is lower than the latency of a single request
            'average_latency': total_time / len(inputs)
        }
    
    def run_benchmark(self, name: str, func: Callable,
                      test_inputs: List[Any], iterations: int = 10):
        """Time func on every input, iterations times, and store the summary."""
        latencies = []
        
        for _ in range(iterations):
            for input_data in test_inputs:
                latencies.append(self.measure_latency(func, input_data))
        
        if not latencies:
            raise ValueError('no measurements: check test_inputs and iterations')
        
        self.results[name] = {
            'mean_latency': statistics.mean(latencies),
            'median_latency': statistics.median(latencies),
            'min_latency': min(latencies),
            'max_latency': max(latencies),
            'std_dev': statistics.stdev(latencies) if len(latencies) > 1 else 0
        }

Performance benchmarking
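Mean and median hide tail behavior, which often matters most for LLM-backed services. As a sketch of how the summary above could be extended, the helpers below compute p50, p95, and p99 with the standard library's `statistics.quantiles`. `latency_percentiles` and `collect_latencies` are hypothetical helper names, and the warm-up count is an assumption, not something the benchmark class above does.

```python
import statistics
import time
from typing import Any, Callable, Dict, List


def latency_percentiles(samples: List[float]) -> Dict[str, float]:
    """Return the 50th, 95th and 99th percentile of the samples."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; index i-1 holds the i-th percentile
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def collect_latencies(func: Callable[[Any], Any], payload: Any,
                      iterations: int = 50, warmup: int = 5) -> List[float]:
    """Time repeated calls, discarding a few warm-up runs first."""
    for _ in range(warmup):
        func(payload)
    samples = []
    for _ in range(iterations):
        start = time.perf_counter()
        func(payload)
        samples.append(time.perf_counter() - start)
    return samples


# Example: percentiles for a cheap stand-in workload
samples = collect_latencies(lambda text: text.upper(), "hello", iterations=20)
report = latency_percentiles(samples)
```

The `inclusive` method treats the samples as the whole population, which suits a fixed benchmark run; the default `exclusive` method extrapolates beyond the observed extremes.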

Real-World Use Case

Production practice

Enterprise knowledge base system

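from typing import Dict, List, Optional

from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.vectorstores import InMemoryVectorStore
# ChatOpenAI and OpenAIEmbeddings live in the separate langchain-openai
# package (pip install langchain-openai) and need OPENAI_API_KEY to be set.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

def format_docs(docs) -> str:
    """Join retrieved documents into a single context string."""
    return "\n\n".join(doc.page_content for doc in docs)

# Enterprise knowledge base built on an in-memory vector store
class EnterpriseKnowledgeBase:
    """Enterprise knowledge base system."""
    
    def __init__(self):
        self.vector_store = InMemoryVectorStore(OpenAIEmbeddings())
        self.qa_chain = self._build_qa_chain()
    
    def _build_qa_chain(self):
        """Build the question-answering chain."""
        prompt = ChatPromptTemplate.from_template("""
        Answer the question using the context below. If the context does not contain the answer, say that you do not know.
        
        Context:\n{context}\n
        Question: {question}
        """)
        
        retriever = self.vector_store.as_retriever()
        
        chain = (
            RunnableParallel({
                # Retrieve documents, then flatten them to plain text
                'context': retriever | format_docs,
                'question': RunnablePassthrough()
            })
            | prompt
            | ChatOpenAI()
            | StrOutputParser()
        )
        
        return chain
    
    def add_documents(self, documents: List[str],
                      metadatas: Optional[List[Dict]] = None):
        """Add documents to the knowledge base."""
        if metadatas is None:
            # One dict per document; [{}] * n would share a single dict
            metadatas = [{} for _ in documents]
        self.vector_store.add_texts(documents, metadatas=metadatas)
    
    def ask(self, question: str) -> str:
        """Answer a question from the stored documents."""
        return self.qa_chain.invoke(question)

# Usage example
kb = EnterpriseKnowledgeBase()
kb.add_documents([
    "The company was founded in 2020 and is headquartered in Beijing.",
    "Its main products are an AI assistant platform and an enterprise knowledge management system.",
    "The company has more than 500 employees."
])

print(kb.ask("When was the company founded?"))
print(kb.ask("How many employees does the company have?"))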

Enterprise knowledge base implementation
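The knowledge base above depends on an embedding model and a hosted chat model. To show the retrieve-then-prompt flow without any API key, the following sketch swaps in a toy bag-of-words retriever built on the standard library. This is not how `InMemoryVectorStore` ranks documents; `tokenize`, `cosine`, `retrieve`, and `build_prompt` are hypothetical helpers, and the sample documents are English renderings of the ones in the example.

```python
import math
import re
from collections import Counter
from typing import List

DOCS: List[str] = [
    "The company was founded in 2020 and is headquartered in Beijing.",
    "Its main products are an AI assistant platform and an enterprise "
    "knowledge management system.",
    "The company has more than 500 employees.",
]


def tokenize(text: str) -> List[str]:
    """Lowercase the text and split it into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two token-count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0


def retrieve(question: str, documents: List[str], k: int = 2) -> List[str]:
    """Return the k documents most similar to the question."""
    query = Counter(tokenize(question))
    ranked = sorted(documents,
                    key=lambda doc: cosine(query, Counter(tokenize(doc))),
                    reverse=True)
    return ranked[:k]


def build_prompt(question: str, documents: List[str], k: int = 2) -> str:
    """Mirror the chain: fetch context, then fill the prompt template."""
    context = "\n\n".join(retrieve(question, documents, k))
    return (
        "Answer the question using the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


prompt_text = build_prompt("When was the company founded?", DOCS, k=1)
```

In the real chain the string produced by `build_prompt` corresponds to what the prompt template sends to the chat model; only the ranking step differs.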

Debugging and Troubleshooting

Problem solving

Debugging tools

import logging
import time
from typing import Any, Callable, Dict, List, Optional
# Pass ConsoleCallbackHandler() in config={'callbacks': [...]} to print run traces
from langchain_core.tracers import ConsoleCallbackHandler

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('langchain-core')

class DebugTools:
    """Collection of debugging helpers."""
    
    def __init__(self):
        self.error_log: List[Dict[str, Any]] = []
        # Call durations in seconds, keyed by function name
        self.performance_data: Dict[str, List[float]] = {}
    
    def log_error(self, error: Exception, context: Optional[Dict[str, Any]] = None):
        """Record an error together with its context."""
        error_info = {
            'timestamp': time.time(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'context': context or {}
        }
        self.error_log.append(error_info)
        logger.error(f"Error occurred: {error_info}")
    
    def trace_execution(self, func: Callable, *args, **kwargs):
        """Run func, log its duration, and record any error."""
        try:
            logger.debug(f"Executing {func.__name__} with args: {args}, kwargs: {kwargs}")
            start_time = time.perf_counter()
            
            result = func(*args, **kwargs)
            
            duration = time.perf_counter() - start_time
            # Store the duration so analyze_performance has data to report
            self.performance_data.setdefault(func.__name__, []).append(duration)
            
            logger.debug(f"{func.__name__} completed in {duration:.2f}s")
            return result
            
        except Exception as e:
            self.log_error(e, {'function': func.__name__, 'args': args, 'kwargs': kwargs})
            raise
    
    def analyze_performance(self, function_name: str) -> Dict[str, Any]:
        """Summarize the recorded durations of one function."""
        if function_name in self.performance_data:
            data = self.performance_data[function_name]
            return {
                'total_calls': len(data),
                'average_time': sum(data) / len(data),
                'max_time': max(data),
                'min_time': min(data)
            }
        return {'total_calls': 0}

Debugging and troubleshooting tools
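The same tracing idea can be packaged as a decorator, so that call sites do not need to route through `trace_execution`. The sketch below is one possible shape, using only the standard library; `traced`, `timings`, and `summarize` are hypothetical names and are not part of LangChain Core.

```python
import functools
import logging
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List

logger = logging.getLogger("debug-sketch")

# Durations in seconds, keyed by function name
timings: Dict[str, List[float]] = defaultdict(list)


def traced(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record the duration of every call and log failures."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception attaches the traceback to the log record
            logger.exception("%s failed", func.__name__)
            raise
        finally:
            # Runs on success and on failure, so failed calls are timed too
            timings[func.__name__].append(time.perf_counter() - start)
    return wrapper


def summarize(name: str) -> Dict[str, float]:
    """Aggregate the recorded durations for one function."""
    data = timings.get(name, [])
    if not data:
        return {"total_calls": 0}
    return {
        "total_calls": len(data),
        "average_time": sum(data) / len(data),
        "max_time": max(data),
        "min_time": min(data),
    }


@traced
def normalize(text: str) -> str:
    return text.strip().lower()


normalized = normalize("  Hello  ")
```

Recording the duration in `finally` is a design choice: slow failures (for example timeouts) show up in the statistics instead of disappearing from them.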

Community and Ecosystem

Open-source ecosystem

Future Trends

Technology evolution

Summary

Core value

References

Thanks for reading!
Visit https://atcfu.com/ai-articles/langchain-core/ to revisit this article.