源码级别解析 · LangChain 生态基础组件分析
2026-06-03 | 每日技术深度解读
安装依赖:`pip install langchain-core`
# 基础组件导入
from langchain_core.runnables import Runnable
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import tool
from langchain_core.memory import BaseMemory
LangChain Core 包含所有基础抽象类和接口
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Union
from langchain_core.runnables import RunnableConfig
class Runnable(ABC):
    """Base class for executable LangChain components.

    Concrete subclasses supply both a synchronous ``invoke`` and an
    asynchronous ``ainvoke``; any two runnables can be chained with ``|``.
    """

    @abstractmethod
    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Run the component synchronously on *input*."""

    @abstractmethod
    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Run the component asynchronously on *input*."""

    def __or__(self, other: 'Runnable') -> 'RunnableSequence':
        """Implement ``self | other``: a sequence feeding self's output into *other*."""
        pipeline = RunnableSequence(self, other)
        return pipeline
提示模板、模型、输出解析器等可组合执行的 LangChain 组件都继承自 Runnable 基类(记忆、文档加载器等辅助类并不是 Runnable)
class RunnableSequence(Runnable):
    """Run two runnables back to back, feeding ``first``'s output into ``last``.

    ``a | b | c`` nests naturally: ``a | b`` produces a RunnableSequence, whose
    inherited ``__or__`` then wraps it as the ``first`` step of an outer one.
    """

    def __init__(self, first: Runnable, last: Runnable):
        self.first = first
        self.last = last

    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Synchronously run ``first`` then ``last``, sharing the same config."""
        intermediate = self.first.invoke(input, config)
        return self.last.invoke(intermediate, config)

    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Asynchronously run ``first`` then ``last``, sharing the same config.

        ``Runnable.ainvoke`` is abstract; without this override the class could
        not be instantiated, so every ``a | b`` raised ``TypeError``.
        """
        intermediate = await self.first.ainvoke(input, config)
        return await self.last.ainvoke(intermediate, config)
# Usage example: compose prompt -> model -> parser with the | operator.
# NOTE(review): ChatOpenAI is not imported anywhere in this file; presumably it
# comes from the separate langchain-openai package
# (`from langchain_openai import ChatOpenAI`) -- add that import before running.
prompt = ChatPromptTemplate.from_template("Hello {name}")
model = ChatOpenAI()
parser = StrOutputParser()
# prompt | model yields a sequence; piping that into parser nests it once more.
chain = prompt | model | parser
result = chain.invoke({"name": "World"})
使用 | 操作符轻松组合多个组件
from typing import List, Union, Any
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.language_models import BaseChatModel
class BaseChatModel(BaseLanguageModel):
    """Base class for chat models: a list of messages in, one AIMessage out.

    Subclasses implement ``_generate``; ``invoke`` and ``ainvoke`` are built on
    top of it, so they no longer silently return ``None``.
    NOTE(review): ``BaseLanguageModel`` is not imported in this snippet -- confirm
    its source. Message/callback annotations are quoted because those names are
    not imported here either.
    """

    def invoke(
        self,
        messages: List[Union['HumanMessage', 'AIMessage', 'SystemMessage']],
        config: Optional[RunnableConfig] = None,
    ) -> 'AIMessage':
        """Call the model synchronously and return the first generated message.

        Raises:
            NotImplementedError: if the subclass does not implement ``_generate``.
            ValueError: if ``_generate`` returns no generations.
        """
        result = self._generate(list(messages))
        if not result.generations:
            raise ValueError("Chat model returned no generations")
        return result.generations[0].message

    async def ainvoke(
        self,
        messages: List[Union['HumanMessage', 'AIMessage', 'SystemMessage']],
        config: Optional[RunnableConfig] = None,
    ) -> 'AIMessage':
        """Call the model without blocking the event loop.

        The default runs the synchronous ``invoke`` in the loop's default
        executor; subclasses with a native async client should override this.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.invoke, messages, config)

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional['CallbackManagerForLLMRun'] = None,
    ) -> ChatResult:
        """Core generation hook: produce a ChatResult for *messages*.

        Must be overridden; raising here makes a missing implementation explicit
        instead of leaking ``None`` to callers.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement _generate()")
标准化的聊天模型接口
from typing import List, Dict, Any
from langchain_core.prompts import ChatMessagePromptTemplate, PromptTemplate
class ChatPromptTemplate(BasePromptTemplate):
    """Chat prompt made of role-tagged ``str.format`` templates.

    ``messages`` is a list of dicts such as
    ``{"role": "system", "template": "You are a {role} assistant."}``.
    Supported roles: ``system``, ``user``/``human`` and ``assistant``/``ai``.
    NOTE(review): ``BasePromptTemplate`` and ``ChatPromptValue`` are not imported
    in this snippet -- confirm where they come from.
    """

    def __init__(self, messages: List[Dict[str, Any]]):
        self.messages = messages
        self.input_variables = self._extract_variables()

    @classmethod
    def from_messages(cls, messages) -> 'ChatPromptTemplate':
        """Build a template from ``(role, template)`` pairs and/or message dicts."""
        normalized: List[Dict[str, Any]] = []
        for msg in messages:
            if isinstance(msg, (tuple, list)):
                role, template = msg
                normalized.append({'role': role, 'template': template})
            else:
                normalized.append(dict(msg))
        return cls(normalized)

    @staticmethod
    def _get_variables_from_template(template: str) -> List[str]:
        """Return the top-level ``{name}`` fields referenced by *template*.

        ``{user.name}`` and ``{items[0]}`` contribute ``user`` and ``items``,
        because ``str.format(**kwargs)`` only looks up the root name in kwargs.
        Positional fields (``{}``, ``{0}``) are ignored.
        """
        import string

        names = []
        for _, field_name, _, _ in string.Formatter().parse(template):
            if not field_name:
                continue
            root = field_name.split('.', 1)[0].split('[', 1)[0]
            if root and not root.isdigit():
                names.append(root)
        return names

    def _extract_variables(self) -> List[str]:
        """Collect the variable names used across all message templates, sorted."""
        variables = set()
        for msg in self.messages:
            if 'template' in msg:
                variables.update(self._get_variables_from_template(msg['template']))
        # Sorted so input_variables is deterministic (set order is not).
        return sorted(variables)

    def format_prompt(self, **kwargs) -> 'ChatPromptValue':
        """Fill every template with *kwargs* and wrap the results as messages.

        Raises:
            KeyError: if a template references a variable missing from *kwargs*.
            ValueError: if a message has an unsupported role.
        """
        formatted_messages = []
        for msg in self.messages:
            role = msg.get('role')
            if role == 'system':
                message_cls = SystemMessage
            elif role in ('user', 'human'):
                message_cls = HumanMessage
            elif role in ('assistant', 'ai'):
                message_cls = AIMessage
            else:
                # Fail loudly: silently dropping unknown roles loses prompt content.
                raise ValueError(f"Unsupported message role: {role!r}")
            content = msg.get('template', '').format(**kwargs)
            formatted_messages.append(message_cls(content=content))
        return ChatPromptValue(messages=formatted_messages)
灵活的消息模板系统
# Define a chat prompt from (role, template) pairs.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful {role} assistant."),
    ("user", "Explain {topic} in simple terms with examples.")
])
# Substitute the template variables.
variables = {"role": "technical", "topic": "LangChain Core"}
formatted_prompt = prompt.format_prompt(**variables)
# Print the formatted messages.
print(formatted_prompt.messages)
# Expected output (abbreviated; the exact repr depends on the message classes in use):
# [
# SystemMessage(content="You are a helpful technical assistant."),
# HumanMessage(content="Explain LangChain Core in simple terms with examples.")
# ]
动态变量替换和消息构建
from typing import Any, Dict, List
from langchain_core.outputs import Generation
from langchain_core.output_parsers import BaseOutputParser
class PydanticOutputParser(BaseOutputParser):
    """Parse LLM text output into an instance of a Pydantic model."""

    def __init__(self, pydantic_object: 'Type[BaseModel]'):
        # The Pydantic model *class* used to validate parsed output.
        self.pydantic_object = pydantic_object

    def parse(self, text: str) -> Any:
        """Parse *text* as a JSON object and validate it with ``pydantic_object``.

        If the whole text is not JSON, the outermost ``{...}`` span is tried,
        since models often wrap JSON in prose or Markdown fences.

        Raises:
            ValueError: if no JSON object can be extracted from *text*.
            Validation errors raised by the Pydantic model propagate unchanged.
        """
        import json

        try:
            json_data = json.loads(text)
        except json.JSONDecodeError as err:
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise ValueError(f"Output is not valid JSON: {text!r}") from err
            try:
                json_data = json.loads(text[start:end + 1])
            except json.JSONDecodeError as inner:
                raise ValueError(f"Output is not valid JSON: {text!r}") from inner
        if not isinstance(json_data, dict):
            raise ValueError(f"Expected a JSON object, got {type(json_data).__name__}")
        return self.pydantic_object(**json_data)

    def get_format_instructions(self) -> str:
        """Describe the expected output as the model's JSON schema."""
        import json

        # model_json_schema() returns a dict and takes no indent argument;
        # pretty-print it here. ensure_ascii=False keeps non-ASCII descriptions readable.
        schema_str = json.dumps(
            self.pydantic_object.model_json_schema(), indent=2, ensure_ascii=False
        )
        return f"Please respond with valid JSON that matches this JSON schema:\n{schema_str}"
将LLM输出转换为结构化数据
from typing import Dict, Any, Optional
from langchain_core.messages import BaseMessage
class BaseMemory(ABC):
    """Abstract interface shared by conversation-memory backends."""

    memory_key: str = "chat_history"  # key under which the history is exposed
    return_messages: bool = True  # whether to return messages (original: "是否返回消息")
    input_key: str = "input"  # key read from the inputs dict
    output_key: str = "output"  # key read from the outputs dict

    @abstractmethod
    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return the stored memory variables for the given *inputs*."""

    @abstractmethod
    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
        """Record one conversational turn (*inputs* and *outputs*)."""

    @abstractmethod
    def clear(self) -> None:
        """Forget everything stored."""
记忆系统的统一接口
from typing import Dict, Any, List
from langchain_core.messages import BaseMessage
from langchain_core.memory import BaseMemory
class InMemoryMemory(BaseMemory):
    """Conversation memory that keeps the message history in a Python list.

    NOTE(review): written against the plain ``BaseMemory`` ABC sketched in this
    article (class attributes memory_key / return_messages / input_key /
    output_key). If BaseMemory resolves to ``langchain_core.memory.BaseMemory``
    (imported just above), those attributes may not exist -- confirm which
    base is intended.
    """

    def __init__(self, memory_key: str = "chat_history"):
        # The ABC base has no __init__ that accepts keyword arguments, so
        # super().__init__(memory_key=...) raised TypeError; set it directly.
        super().__init__()
        self.memory_key = memory_key
        self.chat_memory: List[BaseMessage] = []

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return the history under ``memory_key``, as messages or as one string.

        A copy of the message list is returned so callers cannot mutate the
        stored history by accident.
        """
        if self.return_messages:
            return {self.memory_key: list(self.chat_memory)}
        return {self.memory_key: self.chat_memory_to_string()}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
        """Append the user input and the model output (when present) as messages."""
        if self.input_key in inputs:
            self.chat_memory.append(HumanMessage(content=str(inputs[self.input_key])))
        if self.output_key in outputs:
            self.chat_memory.append(AIMessage(content=str(outputs[self.output_key])))

    def clear(self) -> None:
        """Drop all stored messages.

        Required by the BaseMemory interface; without it the class stayed
        abstract and could not be instantiated.
        """
        self.chat_memory.clear()

    def chat_memory_to_string(self) -> str:
        """Render the history as ``"<type>: <content>"`` lines."""
        return "\n".join(f"{msg.type}: {msg.content}" for msg in self.chat_memory)
简单的内存记忆实现
from typing import Dict, Any, Optional, Union, List
from langchain_core.tools import BaseTool
class BaseTool(BaseTool):
    """Tool base class: subclasses implement ``_run`` (and optionally ``_arun``)."""

    name: str  # tool name
    description: str  # tool description
    return_direct: bool = False  # whether to return directly (original: "是否直接返回")
    args_schema: Optional[Type[BaseModel]] = None  # argument schema

    def _run(self, *args, **kwargs) -> str:
        """Synchronous implementation hook; must be overridden."""
        raise NotImplementedError

    async def _arun(self, *args, **kwargs) -> str:
        """Asynchronous implementation hook; must be overridden."""
        raise NotImplementedError

    def run(self, *args, config: Optional[RunnableConfig] = None, **kwargs) -> Any:
        """Execute the tool synchronously; ``config`` is accepted but not used here."""
        sync_impl = self._run
        return sync_impl(*args, **kwargs)

    async def arun(self, *args, config: Optional[RunnableConfig] = None, **kwargs) -> Any:
        """Execute the tool asynchronously; ``config`` is accepted but not used here."""
        async_impl = self._arun
        return await async_impl(*args, **kwargs)
工具系统的统一接口
def tool(
    *args,
    name: Optional[str] = None,
    description: Optional[str] = None,
    args_schema: Optional['Type[BaseModel]'] = None,
    return_direct: bool = False,
):
    """Turn a plain (sync or async) function into a ``BaseTool`` instance.

    Usable bare (``@tool``), with keyword options (``@tool(name=...)``), or with
    the tool name as the only positional argument (``@tool("name")``).
    The tool description defaults to the function's docstring.
    """
    # @tool("name"): treat a lone string positional as the tool name.
    if len(args) == 1 and isinstance(args[0], str) and name is None:
        name = args[0]
        args = ()

    def decorator(func: 'Callable') -> 'BaseTool':
        tool_name = name or func.__name__
        tool_description = description or func.__doc__ or ""
        # Rebind under distinct names: in a class body, `args_schema = args_schema`
        # resolves the right-hand side in the class namespace and then module
        # globals -- never in this enclosing function -- so it raised NameError.
        tool_args_schema = args_schema
        tool_return_direct = return_direct

        class ToolWrapper(BaseTool):
            # Annotated so Pydantic-based tool bases accept the field overrides.
            name: str = tool_name
            description: str = tool_description
            args_schema: Optional[Any] = tool_args_schema
            return_direct: bool = tool_return_direct

            def _run(self, *args, **kwargs):
                return func(*args, **kwargs)

            async def _arun(self, *args, **kwargs):
                if asyncio.iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                return func(*args, **kwargs)

        return ToolWrapper()

    if len(args) == 1 and callable(args[0]):
        return decorator(args[0])
    return decorator
def _safe_eval_arithmetic(expression: str):
    """Evaluate a purely arithmetic expression without executing arbitrary code.

    Supports int/float literals, unary + and -, and the binary operators
    + - * / // % **.

    Raises:
        SyntaxError: if *expression* is not valid Python expression syntax.
        ValueError: for anything else (names, calls, attributes, huge exponents).
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def _eval(node):
        # type() check (not isinstance) so bools are rejected as literals.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left, right = _eval(node.left), _eval(node.right)
            # Cap exponents so inputs such as 9**9**9 cannot hang the process.
            if isinstance(node.op, ast.Pow) and abs(right) > 1000:
                raise ValueError("Exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    return _eval(ast.parse(expression, mode="eval").body)


# Create a tool with the decorator.
# SECURITY: the expression is supplied by the LLM (untrusted input), so it is
# evaluated with a restricted arithmetic walker instead of eval(), which would
# execute arbitrary Python such as "__import__('os').system(...)".
@tool
def calculate(expression: str) -> float:
    """计算数学表达式"""
    # Docstring kept verbatim: tool() uses func.__doc__ as the tool description.
    return _safe_eval_arithmetic(expression)


# Alternatively, define the tool as a class.
class CalculatorTool(BaseTool):
    # Annotated so Pydantic-based tool bases accept the field overrides.
    name: str = "calculator"
    description: str = "计算数学表达式"

    def _run(self, expression: str) -> float:
        # Same restricted evaluator as the decorator version; never eval() model output.
        return _safe_eval_arithmetic(expression)
两种工具定义方式
from typing import Dict, Any, List, Optional
from langchain_core.agents import AgentExecutor, Agent
class Agent(Agent):
    """LLM-driven agent that decides its next action from the task and history.

    NOTE(review): ``Agent``/``AgentAction`` may not be exported by
    ``langchain_core.agents`` -- confirm the import; ``AgentAction`` is quoted
    because it is not imported in this snippet.
    """

    def __init__(self, llm, tools, system_message):
        self.llm = llm
        self.tools = tools
        self.system_message = system_message
        # Name -> tool lookup for dispatching parsed actions.
        self.tools_by_name = {tool.name: tool for tool in tools}

    def plan(self, intermediate_steps: List[tuple], **kwargs) -> 'AgentAction':
        """Ask the LLM for the next action given prior (action, observation) pairs.

        ``kwargs['input']`` is the task text.
        """
        prompt = self._build_prompt(intermediate_steps, **kwargs)
        response = self.llm.invoke(prompt)
        return self._parse_response(response)

    def _build_prompt(self, intermediate_steps, **kwargs):
        """Build the message list: system prompt, task, then the step history."""
        messages = [SystemMessage(content=self.system_message)]
        # The task comes first so the history below reads in chronological order.
        messages.append(HumanMessage(content=f"Task: {kwargs.get('input', '')}"))
        for action, observation in intermediate_steps:
            # The thought was produced by the model, so it is an AI message...
            messages.append(AIMessage(content=f"Thought: {action.log}"))
            # ...while the observation is tool output fed back to the model.
            messages.append(HumanMessage(content=f"Observation: {observation}"))
        return messages

    def _parse_response(self, response) -> 'AgentAction':
        """Turn the raw LLM response into an action; subclasses must implement."""
        raise NotImplementedError(f"{type(self).__name__} must implement _parse_response()")
智能体决策的核心逻辑
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class Document:
    """A piece of text plus its metadata."""

    page_content: str  # document text
    # Optional (None -> fresh {} in __post_init__), so Document("text") works.
    metadata: Optional[Dict[str, Any]] = None
    lookup_str: str = ""  # lookup string
    lookup_index: int = 0  # lookup index

    def __post_init__(self):
        # Only replace a missing mapping; keep a caller-supplied dict (even {}) as-is.
        if self.metadata is None:
            self.metadata = {}

    def with_metadata(self, **kwargs: Any) -> 'Document':
        """Return a copy whose metadata is this document's merged with *kwargs*.

        All other fields, including lookup_str and lookup_index, are carried
        over; this document is not modified.
        """
        from dataclasses import replace

        return replace(self, metadata={**self.metadata, **kwargs})

    def to_json(self) -> Dict[str, Any]:
        """Return a JSON-compatible dict with the content and metadata."""
        return {
            'page_content': self.page_content,
            'metadata': self.metadata
        }
文档对象的核心设计
from typing import List, Dict, Any, Iterator
from langchain_core.documents import Document
from abc import ABC, abstractmethod
class BaseLoader(ABC):
    """Abstract document loader: implement ``lazy_load``; ``load`` is derived."""

    @abstractmethod
    def lazy_load(self) -> Iterator[Document]:
        """Yield documents one at a time."""

    def load(self) -> List[Document]:
        """Materialise everything ``lazy_load`` yields into a list."""
        documents: List[Document] = []
        documents.extend(self.lazy_load())
        return documents
class TextLoader(BaseLoader):
    """Load one text file as a single Document whose ``source`` is the path."""

    def __init__(self, file_path: str, encoding: str = "utf-8"):
        self.file_path = file_path
        self.encoding = encoding

    def lazy_load(self) -> Iterator[Document]:
        """Yield exactly one Document holding the file's full text."""
        with open(self.file_path, 'r', encoding=self.encoding) as handle:
            text = handle.read()
        source_info = {'source': self.file_path}
        yield Document(page_content=text, metadata=source_info)
基础文档加载器实现
from typing import List, Dict, Any, Optional
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
class BaseVectorStore:
    """Minimal vector-store interface; every operation must be overridden."""

    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> List[str]:
        """Store *texts* (with optional per-text metadata) and return a list of ids."""
        raise NotImplementedError

    def similarity_search(self, query: str, k: int = 4, **kwargs) -> List[Document]:
        """Return up to *k* documents most similar to *query*."""
        raise NotImplementedError

    def delete(self, ids: List[str]) -> None:
        """Remove the documents with the given ids."""
        raise NotImplementedError
class InMemoryVectorStore(BaseVectorStore):
    """Vector store keeping documents and embeddings in parallel Python lists."""

    def __init__(self, embedding: Embeddings):
        self.embedding = embedding
        self.documents: List[Document] = []
        self.embeddings: List[List[float]] = []
        # ids[i] identifies documents[i] / embeddings[i]; the three lists stay parallel.
        self.ids: List[str] = []
        # Monotonic counter so ids stay unique across add_texts calls and deletes.
        self._next_id = 0

    def add_texts(self, texts, metadatas=None, **kwargs):
        """Embed and store *texts*; return the new ids (``doc_<n>``, unique per store).

        Raises:
            ValueError: if *metadatas* is given with a different length than *texts*.
        """
        texts = list(texts)
        if metadatas is None:
            # One fresh dict per document: `[{}] * n` would share a single dict.
            metadatas = [{} for _ in texts]
        elif len(metadatas) != len(texts):
            raise ValueError(
                f"Got {len(metadatas)} metadatas for {len(texts)} texts"
            )
        # embed_documents is the document-side embedding call; embed_query is for queries.
        vectors = self.embedding.embed_documents(texts) if texts else []
        new_ids = []
        for text, metadata, vector in zip(texts, metadatas, vectors):
            doc_id = f"doc_{self._next_id}"
            self._next_id += 1
            self.documents.append(Document(page_content=text, metadata=metadata))
            self.embeddings.append(vector)
            self.ids.append(doc_id)
            new_ids.append(doc_id)
        return new_ids

    def similarity_search(self, query: str, k: int = 4, **kwargs) -> List[Document]:
        """Return up to *k* stored documents ranked by cosine similarity to *query*.

        A zero-norm query or document vector scores 0.0.
        """
        import heapq
        import math

        if k <= 0 or not self.documents:
            return []
        query_vec = self.embedding.embed_query(query)
        query_norm = math.sqrt(sum(x * x for x in query_vec))

        def _cosine(vec):
            denom = query_norm * math.sqrt(sum(x * x for x in vec))
            if denom == 0:
                return 0.0
            return sum(a * b for a, b in zip(query_vec, vec)) / denom

        best = heapq.nlargest(
            k, range(len(self.documents)), key=lambda i: _cosine(self.embeddings[i])
        )
        return [self.documents[i] for i in best]

    def delete(self, ids: List[str]) -> None:
        """Remove the documents with the given ids; unknown ids are ignored."""
        doomed = set(ids)
        keep = [i for i, doc_id in enumerate(self.ids) if doc_id not in doomed]
        self.documents = [self.documents[i] for i in keep]
        self.embeddings = [self.embeddings[i] for i in keep]
        self.ids = [self.ids[i] for i in keep]
向量存储的核心接口
from typing import Any, Dict, Optional
from langchain_core.callbacks import BaseCallbackHandler
class LangSmithTracer(BaseCallbackHandler):
    """Callback handler that records chain runs and forwards them on completion."""

    def __init__(self, project_name: str = "default"):
        self.project_name = project_name
        # In-flight runs keyed by run_id; entries are removed once a run ends
        # or errors, so long-lived processes do not accumulate them forever.
        self.runs: Dict[str, Any] = {}

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs
    ) -> None:
        """Open a run record when a chain starts (keyed by ``kwargs['run_id']``)."""
        run_id = kwargs.get('run_id')
        self.runs[run_id] = {
            'type': 'chain',
            'inputs': inputs,
            'steps': [],
            'start_time': time.time()
        }

    def on_chain_end(
        self,
        response: Dict[str, Any],
        **kwargs
    ) -> None:
        """Close the run with its output and send it; unknown run ids are ignored."""
        run = self.runs.pop(kwargs.get('run_id'), None)
        if run is None:
            return
        run['end_time'] = time.time()
        run['response'] = response
        self._send_to_langsmith(run)

    def on_chain_error(self, error: BaseException, **kwargs) -> None:
        """Close the run with the error and send it; unknown run ids are ignored."""
        run = self.runs.pop(kwargs.get('run_id'), None)
        if run is None:
            return
        run['end_time'] = time.time()
        run['error'] = repr(error)
        self._send_to_langsmith(run)

    def _send_to_langsmith(self, run_data: Dict[str, Any]) -> None:
        """Send one finished run to LangSmith (transport not implemented here)."""
        pass
运行时事件追踪系统
from typing import Dict, Any, Optional, Union
from dataclasses import dataclass
@dataclass
class RunnableConfig:
    """Per-run settings passed to runnables."""

    run_id: Optional[str] = None  # run id
    tags: Optional[List[str]] = None  # tags
    metadata: Optional[Dict[str, Any]] = None  # metadata
    callbacks: Optional[Any] = None  # callback manager / handlers
    recursion_limit: int = 100  # recursion limit
    max_concurrency: int = 100  # max concurrency
    seed: Optional[int] = None  # random seed
    configurable: Optional[Dict[str, Any]] = None  # configurable parameters

    @classmethod
    def from_env(cls) -> 'RunnableConfig':
        """Build a config from environment variables.

        ``LANGCHAIN_TAGS`` is comma-separated; blank entries are dropped, and an
        unset or empty variable gives ``tags=None`` rather than ``['']``.
        """
        import os

        raw_tags = os.getenv('LANGCHAIN_TAGS', '')
        tags = [tag.strip() for tag in raw_tags.split(',') if tag.strip()]
        return cls(
            tags=tags or None,
            metadata={'env': os.getenv('ENVIRONMENT', 'development')},
            max_concurrency=int(os.getenv('LANGCHAIN_MAX_CONCURRENCY', '100'))
        )

    def merge(self, other: 'RunnableConfig') -> 'RunnableConfig':
        """Return a new config combining ``self`` with ``other`` (``other`` wins).

        - run_id / tags: other's value if truthy, otherwise self's.
        - callbacks / seed / recursion_limit / max_concurrency: other's value
          unless it equals the field default, in which case self's is kept.
        - metadata / configurable: merged key by key, other's keys winning;
          ``None`` counts as empty. metadata is always a dict; configurable
          stays ``None`` only when both sides are ``None``.
        """
        defaults = RunnableConfig()

        def _override(field_name):
            theirs = getattr(other, field_name)
            if theirs == getattr(defaults, field_name):
                return getattr(self, field_name)
            return theirs

        if self.configurable is None and other.configurable is None:
            configurable = None
        else:
            configurable = {**(self.configurable or {}), **(other.configurable or {})}

        return RunnableConfig(
            run_id=other.run_id or self.run_id,
            tags=other.tags or self.tags,
            # `or {}` guards the None default, which previously raised TypeError.
            metadata={**(self.metadata or {}), **(other.metadata or {})},
            callbacks=_override('callbacks'),
            recursion_limit=_override('recursion_limit'),
            max_concurrency=_override('max_concurrency'),
            seed=_override('seed'),
            configurable=configurable,
        )
运行时配置管理
class LangChainException(Exception):
    """Root of the LangChain exception hierarchy."""


class TracerException(LangChainException):
    """Raised by the tracing subsystem."""


class OutputParserException(LangChainException):
    """Raised when model output cannot be parsed.

    Attributes:
        text: the raw output that failed to parse.
        error: the underlying exception.
    """

    def __init__(self, text: str, error: Exception):
        super().__init__(f"Failed to parse output: {error}")
        self.text, self.error = text, error


class ToolException(LangChainException):
    """Raised when a tool fails.

    Attributes:
        tool_name: name of the failing tool.
        error: the underlying exception.
    """

    def __init__(self, tool_name: str, error: Exception):
        super().__init__(f"Tool '{tool_name}' failed: {error}")
        self.tool_name, self.error = tool_name, error


class ContextOverflowError(LangChainException):
    """Raised when the context is longer than the allowed maximum.

    Attributes:
        max_length: the allowed maximum.
        actual_length: the length that was requested.
    """

    def __init__(self, max_length: int, actual_length: int):
        super().__init__(f"Context length {actual_length} exceeds max {max_length}")
        self.max_length, self.actual_length = max_length, actual_length
全面的异常处理系统
from typing import Any, Dict, Type
from abc import ABC, abstractmethod
class Serializable(ABC):
    """Interface for objects that round-trip through a JSON-compatible dict."""

    @abstractmethod
    def to_json(self) -> Dict[str, Any]:
        """Return a JSON-compatible dict describing this object."""

    @classmethod
    @abstractmethod
    def from_json(cls, data: Dict[str, Any]) -> 'Serializable':
        """Rebuild an instance from a dict produced by ``to_json``."""
class SerializedConstructor(Serializable):
    """Serializable pair of a type tag and its data payload."""

    def __init__(self, type: str, data: Dict[str, Any]):
        self.type = type
        self.data = data

    def to_json(self) -> Dict[str, Any]:
        """Return ``{'type': ..., 'data': ...}``."""
        return dict(type=self.type, data=self.data)

    @classmethod
    def from_json(cls, data: Dict[str, Any]) -> 'SerializedConstructor':
        """Rebuild from a dict with ``type`` and ``data`` keys."""
        type_tag, payload = data['type'], data['data']
        return cls(type_tag, payload)
对象序列化(与 JSON 兼容字典之间的相互转换)
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class BaseMessage:
    """Common fields shared by every chat message."""

    content: str  # message text
    additional_kwargs: Dict[str, Any] = None  # extra provider-specific fields; None -> {}
    response_metadata: Dict[str, Any] = None  # response metadata; None -> {}
    type: str = ""  # message type tag, set by subclasses

    def __post_init__(self):
        # Swap None placeholders for fresh dicts so instances never share state.
        for attr in ('additional_kwargs', 'response_metadata'):
            if getattr(self, attr) is None:
                setattr(self, attr, {})
@dataclass
class HumanMessage(BaseMessage):
    """Message authored by the user."""

    type: str = "human"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'HumanMessage':
        """Build from a dict with ``content`` and optional extra fields."""
        fields = {
            'content': data['content'],
            'additional_kwargs': data.get('additional_kwargs', {}),
            'response_metadata': data.get('response_metadata', {}),
        }
        return cls(**fields)
@dataclass
class AIMessage(BaseMessage):
    """Message produced by the model, optionally carrying tool-call requests."""

    type: str = "ai"
    # Tool invocations requested by the model; normalised to [] when omitted.
    tool_calls: Optional[List[Dict[str, Any]]] = None

    def __post_init__(self):
        super().__post_init__()
        if self.tool_calls is None:
            self.tool_calls = []

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AIMessage':
        """Build from a dict with ``content`` and optional extras, including ``tool_calls``.

        Mirrors ``HumanMessage.from_dict`` so both message kinds can be
        rebuilt from plain dicts.
        """
        return cls(
            content=data['content'],
            additional_kwargs=data.get('additional_kwargs', {}),
            response_metadata=data.get('response_metadata', {}),
            tool_calls=list(data.get('tool_calls') or []),
        )
消息系统的核心类
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class ToolCall:
    """A single tool invocation requested by the model."""

    id: str  # call id
    function: Dict[str, Any]  # function-call payload
    type: str = "function"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ToolCall':
        """Build from a dict with ``id``, ``function`` and an optional ``type``."""
        call_id = data['id']
        fn_payload = data['function']
        kind = data.get('type', 'function')
        return cls(id=call_id, function=fn_payload, type=kind)
@dataclass
class ToolMessage(BaseMessage):
    """Message carrying a tool's result back to the model."""

    # BaseMessage already declares fields with defaults, so a new field without
    # one raised "non-default argument follows default argument" when the class
    # was created. The default only satisfies field ordering; __post_init__
    # still rejects an empty id.
    tool_call_id: str = ""
    content: str  # tool execution result (keeps its inherited first position)
    type: str = "tool"

    def __post_init__(self):
        super().__post_init__()
        if not self.tool_call_id:
            raise ValueError("ToolMessage requires a non-empty tool_call_id")

    @classmethod
    def from_tool_call(cls, tool_call: ToolCall, content: str) -> 'ToolMessage':
        """Build the reply message for *tool_call* with the given result text."""
        return cls(
            tool_call_id=tool_call.id,
            content=content,
            additional_kwargs={'tool_call_id': tool_call.id}
        )
工具调用和结果处理
import asyncio
from typing import Any, Dict, Optional
from langchain_core.runnables import Runnable
class AsyncRunnable(Runnable):
    """Runnable whose primary entry point is the asynchronous ``ainvoke``."""

    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Asynchronous execution; subclasses must override (raises here)."""
        raise NotImplementedError

    async def abatch(
        self,
        inputs: List[Any],
        config: Optional[RunnableConfig] = None
    ) -> List[Any]:
        """Run ``ainvoke`` on every input concurrently; results follow input order."""
        pending = [self.ainvoke(item, config) for item in inputs]
        return await asyncio.gather(*pending)

    async def astream(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Default streaming: yield the single complete ``ainvoke`` result.

        Override to stream incremental chunks.
        """
        yield await self.ainvoke(input, config)
class AsyncBatchProcessor:
    """Split inputs into fixed-size batches and run them with bounded concurrency."""

    def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
        self.batch_size = batch_size  # items per processor.abatch call
        self.max_concurrent = max_concurrent  # max abatch calls in flight at once

    async def process_batch(
        self,
        inputs: List[Any],
        processor: AsyncRunnable
    ) -> List[Any]:
        """Run ``processor.abatch`` over *inputs* in batches and flatten the results.

        Up to ``max_concurrent`` batches run at the same time (values below 1
        are treated as 1). Results are returned in input order.
        """
        batches = [
            inputs[i:i + self.batch_size]
            for i in range(0, len(inputs), self.batch_size)
        ]
        # max_concurrent was previously stored but never used; enforce it here.
        gate = asyncio.Semaphore(max(1, self.max_concurrent))

        async def _run(batch):
            async with gate:
                return await processor.abatch(batch)

        # gather preserves argument order, so flattening keeps input order.
        per_batch = await asyncio.gather(*(_run(batch) for batch in batches))
        return [item for batch_result in per_batch for item in batch_result]
异步执行和批处理优化
from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field
from langchain_core.tools import tool
from langchain_core.output_parsers import PydanticOutputParser
class SearchQuery(BaseModel):
    """搜索查询模型"""
    # Pydantic model for a search request ("搜索查询模型" = search query model).
    # The Chinese docstring and field descriptions are kept verbatim: they are
    # runtime text that Pydantic may surface in generated JSON schemas.
    query: str = Field(description="搜索关键词")  # search keywords
    max_results: int = Field(default=10, description="最大结果数")  # maximum number of results
    filters: Dict[str, Any] = Field(default_factory=dict, description="过滤条件")  # filter conditions
@tool
def search_documents(query: SearchQuery) -> List[str]:
    """搜索文档"""
    # Docstring kept verbatim: tool() uses it as the tool description.
    # Placeholder search: one synthetic hit per requested result slot.
    count = query.max_results
    return [f"Result {i}: {query.query}" for i in range(count)]
# Use the Pydantic parser.
parser = PydanticOutputParser(pydantic_object=SearchQuery)
# Parse an LLM response. JSON requires double-quoted strings; a single-quoted
# (Python-repr style) sample is not valid JSON and fails to parse.
text_response = '{"query": "LangChain", "max_results": 5}'
query = parser.parse(text_response)
print(query.query)  # "LangChain"
类型安全的数据验证
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class RunMetrics:
    """Timing, token and error counters for a single run."""

    start_time: float
    end_time: Optional[float] = None  # None while the run is still open
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    error_count: int = 0

    @property
    def duration(self) -> Optional[float]:
        """Return ``end_time - start_time``, or None if the run has not ended.

        Compares against None explicitly: an end timestamp of 0.0 is falsy
        and would otherwise be treated as "not finished".
        """
        if self.end_time is None:
            return None
        return self.end_time - self.start_time
class MetricsCollector:
    """Track RunMetrics per run id and summarise them."""

    def __init__(self):
        self.runs: Dict[str, RunMetrics] = {}
        # Per-operation counters (not updated by the methods of this class).
        self.operation_stats = defaultdict(int)

    def start_run(self, run_id: str) -> RunMetrics:
        """Create, register and return metrics stamped with the current time."""
        fresh = RunMetrics(start_time=time.time())
        self.runs[run_id] = fresh
        return fresh

    def end_run(self, run_id: str) -> RunMetrics:
        """Stamp the end time on an existing run and return its metrics.

        Raises:
            ValueError: if *run_id* was never started.
        """
        if run_id not in self.runs:
            raise ValueError(f"Run {run_id} not found")
        finished = self.runs[run_id]
        finished.end_time = time.time()
        return finished

    def get_stats(self) -> Dict[str, Any]:
        """Return the total run count and how many runs recorded errors."""
        failed = sum(m.error_count > 0 for m in self.runs.values())
        return {'total_runs': len(self.runs), 'failed_runs': failed}
运行时监控和指标收集
from typing import Dict, Any, List, Optional
from langchain_core.tracers import LangSmithTracer
class LangSmithIntegration:
    """Run chains with tracing attached and record evaluation results."""

    def __init__(self, project_name: str = "langchain-core"):
        self.project_name = project_name
        self.tracer = LangSmithTracer(project_name)

    def trace_chain(self, chain, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke *chain* on *inputs* with the tracer attached as a callback.

        The tracer is a callback handler, not a context manager, so the
        previous ``with self.tracer:`` failed at runtime. Runnables receive
        handlers through the ``callbacks`` entry of their config.
        NOTE(review): a dict config matches langchain_core runnables; confirm
        for chains built on the dataclass-style RunnableConfig.
        """
        return chain.invoke(inputs, config={"callbacks": [self.tracer]})

    def log_evaluation(self, name: str, score: float, metadata: Optional[Dict[str, Any]] = None):
        """Record an evaluation score (timestamped) and forward it."""
        evaluation = {
            'name': name,
            'score': score,
            'timestamp': time.time(),
            'metadata': metadata or {}
        }
        self._send_evaluation(evaluation)

    def _send_evaluation(self, evaluation: Dict[str, Any]):
        """Send evaluation data to LangSmith (transport not implemented here)."""
        pass
LangSmith专业调试平台
import os
from typing import Dict, Any
# Deployment settings read from environment variables, with defaults.
PRODUCTION_CONFIG = dict(
    MODEL_ENDPOINT=os.getenv('MODEL_ENDPOINT', 'https://api.openai.com/v1'),
    MAX_CONCURRENT_REQUESTS=int(os.getenv('MAX_CONCURRENT_REQUESTS', '100')),
    REQUEST_TIMEOUT=int(os.getenv('REQUEST_TIMEOUT', '30')),
    # Only the literal "true" (case-insensitive) enables caching.
    ENABLE_CACHING=os.getenv('ENABLE_CACHING', 'true').lower() == 'true',
    CACHE_TTL=int(os.getenv('CACHE_TTL', '3600')),
)
class ProductionDeployer:
    """Hold deployment config and expose health-check and scaling hooks."""

    def __init__(self, config: Dict[str, Any] = None):
        # Copy the module defaults so per-instance changes never mutate the
        # shared PRODUCTION_CONFIG dict; an explicitly passed config (even {}) is kept.
        self.config = dict(PRODUCTION_CONFIG) if config is None else config
        self.health_check_interval = 30  # health-check interval
        # Load thresholds used by scale_resources (previously hard-coded there).
        self.scale_up_threshold = 0.8
        self.scale_down_threshold = 0.3
        # Monotonic start mark for _get_uptime (immune to wall-clock changes).
        self._started_at = time.monotonic()

    def health_check(self) -> Dict[str, Any]:
        """Return a status snapshot: status, timestamp, uptime and memory usage."""
        return {
            'status': 'healthy',
            'timestamp': time.time(),
            'uptime': self._get_uptime(),
            'memory_usage': self._get_memory_usage()
        }

    def scale_resources(self, target_load: float):
        """Call the scale-up/down hooks when *target_load* crosses the thresholds."""
        if target_load > self.scale_up_threshold:
            self._add_resources()
        elif target_load < self.scale_down_threshold:
            self._remove_resources()

    def _get_uptime(self) -> float:
        """Seconds elapsed since this deployer was created."""
        return time.monotonic() - self._started_at

    def _get_memory_usage(self):
        """Peak resident set size of this process, or None if unavailable.

        Read from ``resource.getrusage(RUSAGE_SELF).ru_maxrss`` (kilobytes on
        Linux, bytes on macOS); the ``resource`` module is missing on Windows.
        """
        try:
            import resource
        except ImportError:
            return None
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    def _add_resources(self):
        """Scale-up hook; platform-specific, so subclasses must implement it."""
        raise NotImplementedError("Override _add_resources() to scale up")

    def _remove_resources(self):
        """Scale-down hook; platform-specific, so subclasses must implement it."""
        raise NotImplementedError("Override _remove_resources() to scale down")
生产环境配置和管理
from typing import Dict, Any, Optional, Callable
from functools import wraps
import hashlib
import json
class CacheManager:
    """In-process key/value cache with per-entry expiry and LRU eviction."""

    # Sentinel telling "not cached" apart from a legitimately cached None.
    _MISSING = object()

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        # key -> {'value': ..., 'timestamp': insert time, 'ttl': lifetime in seconds}
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.max_size = max_size
        self.ttl = ttl  # default lifetime (seconds) for entries set without one
        # key -> last access time; drives LRU eviction.
        self.access_times: Dict[str, float] = {}

    def get(self, key: str, default: Any = None) -> Optional[Any]:
        """Return the value cached under *key*, or *default* if absent or expired.

        Expired entries are removed when they are looked up.
        """
        entry = self.cache.get(key)
        if entry is None:
            return default
        now = time.time()
        if now - entry['timestamp'] < entry.get('ttl', self.ttl):
            self.access_times[key] = now
            return entry['value']
        del self.cache[key]
        self.access_times.pop(key, None)
        return default

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store *value* under *key*, evicting the least recently used entry if full.

        Args:
            ttl: lifetime in seconds for this entry; defaults to ``self.ttl``.
        """
        # Overwriting an existing key does not grow the cache, so only new keys evict.
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_lru()
        now = time.time()
        self.cache[key] = {
            'value': value,
            'timestamp': now,
            'ttl': self.ttl if ttl is None else ttl,
        }
        self.access_times[key] = now

    def _evict_lru(self):
        """Remove the entry with the oldest access time, if any."""
        if self.access_times:
            oldest_key = min(self.access_times, key=self.access_times.__getitem__)
            self.cache.pop(oldest_key, None)
            del self.access_times[oldest_key]

    def cache_decorator(self, ttl: int = None):
        """Decorator that caches a function's return values in this manager.

        Args:
            ttl: lifetime in seconds for entries from this function; ``None``
                uses the manager's default.

        ``None`` results are cached too. Calls whose arguments cannot be
        JSON-encoded run without caching.
        """
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    # Qualified name so same-named functions in different scopes don't collide.
                    cache_key = self._generate_cache_key(
                        f"{func.__module__}.{func.__qualname__}", args, kwargs
                    )
                except (TypeError, ValueError):
                    return func(*args, **kwargs)
                cached_result = self.get(cache_key, self._MISSING)
                if cached_result is not self._MISSING:
                    return cached_result
                result = func(*args, **kwargs)
                self.set(cache_key, result, ttl=ttl)
                return result
            return wrapper
        return decorator

    @staticmethod
    def _generate_cache_key(func_name: str, args: tuple, kwargs: Dict[str, Any]) -> str:
        """Hash the function name and JSON-encoded arguments into a stable key.

        JSON encoding makes tuples and lists (and int vs str dict keys)
        indistinguishable; such argument pairs share a cache entry.

        Raises:
            TypeError, ValueError: if the arguments are not JSON-serialisable.
        """
        payload = json.dumps({'func': func_name, 'args': args, 'kwargs': kwargs}, sort_keys=True)
        return hashlib.sha256(payload.encode('utf-8')).hexdigest()
带 TTL 过期与 LRU 淘汰的缓存系统
import re
from typing import Dict, Any, List
from pydantic import BaseModel, validator
class SafePromptTemplate(BaseModel):
"""安全的提示模板"""
template: str
variables: List[str]
@validator('template')
def validate_template(cls, v):
"""验证模板内容"""
# 检查注入攻击
if any(keyword in v.lower() for keyword in [
'import ', 'exec(', 'eval(', '__import__',
'system(', 'os.system', 'subprocess'
]):
raise ValueError('模板包含潜在的安全风险')
return v
@validator('variables')
def validate_variables(cls, v):
"""验证变量名"""
for var in v:
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*