LightRAG 源码级别解析 · HKUDS开源项目 · 2024.10发布
2026-05-09 | 每日技术深度解读
| 特性 | 传统RAG | LightRAG |
|---|---|---|
| 知识图谱 | 不支持 | 自动构建 |
| 实体关系 | 不支持 | 深度提取 |
| 查询模式 | 单一向量 | Naive/KG/Mix |
| 并发处理 | 有限 | 高并发异步 |
| 增量更新 | 困难 | 轻量级 |
| 缓存机制 | 基础 | 多层智能缓存 |
from lightrag import LightRAG
# 配置参数
config = {
"working_dir": "./rag_storage",
"kv_storage": "JsonKVStorage",
"vector_storage": "NanoVectorDBStorage",
"graph_storage": "NetworkXStorage",
"llm_model_func": openai_completion,
"embedding_func": openai_embedding,
"top_k": 60,
"chunk_token_size": 1200,
"max_total_tokens": 8192
}
# 初始化LightRAG实例
rag = LightRAG(**config)
await rag.initialize_storages()
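LightRAG采用配置驱动设计,支持灵活的参数设置和存储后端选择。注意:示例中的顶层 `await` 只能在异步上下文(如 Jupyter 或 `python -m asyncio`)中运行;普通脚本需将其放入 `async def main()` 并通过 `asyncio.run(main())` 调用。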
# 同步插入文档
track_id = rag.insert(
input="Your document text here...",
split_by_character="\n"
)
# 异步批量插入
async def async_insert():
track_id = await rag.ainsert(
input=["doc1.txt", "doc2.md"],
split_by_character=None # 自动分割
)
return track_id
# 检查处理状态
status = await rag.get_processing_status(track_id)
print(f"Status: {status}")
支持同步和异步两种文档插入模式,track_id用于追踪处理进度
def custom_chunking(tokenizer, content, **kwargs):
    """Split ``content`` into chunks along blank-line paragraph boundaries.

    Paragraphs (separated by a blank line) are accumulated greedily. When
    adding the next paragraph would push the running token count above the
    limit, the accumulated paragraphs become one chunk. A single paragraph
    larger than the limit becomes its own oversized chunk; paragraphs are
    never split internally.

    Args:
        tokenizer: object whose ``encode(str)`` returns a sequence; only the
            length of the result is used.
        content: the document text.
        **kwargs: ``chunk_token_size`` (default 1200) is the per-chunk token
            budget; any other keyword arguments are accepted and ignored.

    Returns:
        List of dicts with ``tokens``, ``content`` and ``chunk_order_index``
        (0-based, contiguous). Chunks that are empty after stripping are
        omitted.
    """
    limit = kwargs.get("chunk_token_size", 1200)
    chunks = []

    def emit(paragraphs, token_count):
        # Re-insert the blank-line separators; skip chunks that are blank.
        text = "\n\n".join(paragraphs).strip()
        if text:
            chunks.append({
                "tokens": token_count,
                "content": text,
                "chunk_order_index": len(chunks),
            })

    current = []
    current_tokens = 0
    for para in content.split("\n\n"):
        para_tokens = len(tokenizer.encode(para))
        if current and current_tokens + para_tokens > limit:
            emit(current, current_tokens)
            # Keep paragraphs in a list so the separator is restored when later
            # paragraphs join this chunk (a bare string would glue them).
            current, current_tokens = [para], para_tokens
        else:
            current.append(para)
            current_tokens += para_tokens
    if current:
        emit(current, current_tokens)
    return chunks
LightRAG支持自定义分块函数,满足特殊业务需求
async def extract_entities_relations(
    text_chunk, global_config
):
    """Extract entities and relations from one text chunk with the LLM.

    Builds the extraction prompt from ``global_config["addon_params"]``
    (``entity_types`` and ``language``), sends it to
    ``global_config["llm_model_func"]`` and parses the reply. If fewer than
    three entities or fewer than two relations come back, a ``gleaning``
    pass is run to try to complete the result.

    Returns:
        ``(entities, relations)`` as produced by the parser / gleaning step.
    """
    addon = global_config["addon_params"]
    extraction_prompt = build_extraction_prompt(
        text=text_chunk,
        entity_types=addon["entity_types"],
        language=addon["language"],
    )

    model = global_config["llm_model_func"]
    reply = await model(extraction_prompt)

    delimiter = PROMPTS["DEFAULT_TUPLE_DELIMITER"]
    entities, relations = parse_extraction_result(
        reply,
        tuple_delimiter=delimiter,
    )

    # A sparse first pass triggers the follow-up gleaning round.
    if len(entities) < 3 or len(relations) < 2:
        entities, relations = await gleaning(
            text_chunk, entities, relations, global_config
        )
    return entities, relations
LightRAG采用多阶段提取策略:当首轮提取的实体少于3个或关系少于2个时,会触发gleaning(补充提取)机制,以提高提取结果的完整性
# System prompt for entity/relationship extraction. It assigns the model a
# knowledge-graph-specialist role and asks for entity records (name, type,
# description) followed by relationship records (source, target, keywords,
# description), each as a delimited line. It also asks for <|COMPLETE|> to be
# emitted when extraction is finished.
PROMPTS["entity_extraction_system_prompt"] = """
---Role---
You are a Knowledge Graph Specialist
responsible for extracting entities and
relationships from the input text.
---Instructions---
1. Entity Extraction & Output:
- entity_name, entity_type,
entity_description
Format: entity<|delimiter|>name<|...>
2. Relationship Extraction:
- source_entity, target_entity,
keywords, description
Format: relation<|delimiter|>src<|...>
3. Output entities first,
then relationships
4. Output <|COMPLETE|> when done
"""
提示词采用角色+指令+格式规范的三段式结构
async def build_knowledge_graph(
    entities, relations, knowledge_graph
):
    """Write extracted entities and relations into a graph store.

    All nodes are sent in a single ``batch_insert_nodes`` call first, then
    all edges in a single ``batch_insert_edges`` call.

    Args:
        entities: mapping name -> dict with ``entity_type``, ``description``
            and ``source_id``; other keys are ignored.
        relations: mapping ``(src, tgt)`` -> dict with ``description``,
            ``keywords`` and ``weight``; other keys are ignored.
        knowledge_graph: store exposing async ``batch_insert_nodes`` and
            ``batch_insert_edges``.
    """
    node_fields = ("entity_type", "description", "source_id")
    node_payload = {
        name: {"entity_name": name, **{key: attrs[key] for key in node_fields}}
        for name, attrs in entities.items()
    }
    await knowledge_graph.batch_insert_nodes(node_payload)

    edge_payload = {
        (src, tgt): {
            "src_id": src,
            "tgt_id": tgt,
            "description": attrs["description"],
            "keywords": attrs["keywords"],
            "weight": attrs["weight"],
        }
        for (src, tgt), attrs in relations.items()
    }
    await knowledge_graph.batch_insert_edges(edge_payload)
采用批量插入策略,大幅提升图谱构建效率
| 存储类型 | 默认实现 | 生产替代 | 核心用途 |
|---|---|---|---|
| KV存储 | JsonKVStorage | Redis/MongoDB | 文档元数据与缓存 |
| 向量存储 | NanoVectorDB | Milvus/Qdrant | 语义相似度检索 |
| 图存储 | NetworkXStorage | Neo4j | 知识图谱与关系推理 |
class NanoVectorDBStorage(BaseVectorStorage):
    """In-memory vector store with brute-force cosine-similarity search.

    Vectors and their metadata live in two dicts keyed by vector id, and
    ``similarity_search`` scores every stored vector against the query.

    NOTE(review): ``_embed`` and ``_cosine_similarity`` are not defined here;
    presumably they come from ``BaseVectorStorage``. Confirm.
    """

    def __init__(self, namespace, global_config, **kwargs):
        self.namespace = namespace
        # Hits scoring below this similarity are discarded (default 0.2).
        self.cosine_threshold = kwargs.get("cosine_threshold", 0.2)
        self.vectors = {}  # id -> vector
        self.metadata = {}  # id -> {"content": ..., "meta": ...}

    async def upsert(self, data):
        """Embed and store every record of ``data``; existing ids are overwritten.

        ``data`` maps id -> dict with ``"content"`` and optional
        ``"meta_fields"``.
        """
        for vid, record in data.items():
            content = record["content"]
            self.vectors[vid] = await self._embed(content)
            self.metadata[vid] = {
                "content": content,
                "meta": record.get("meta_fields", {}),
            }

    async def similarity_search(
        self, query_vector, top_k=10
    ):
        """Return up to ``top_k`` ``(id, similarity)`` pairs, best first.

        Only vectors whose similarity is at least ``cosine_threshold`` are
        considered. ``top_k=None`` returns every match; ``top_k <= 0``
        returns an empty list.
        """
        import heapq

        candidates = []
        for vid, vec in self.vectors.items():
            sim = self._cosine_similarity(query_vector, vec)
            if sim >= self.cosine_threshold:
                candidates.append((vid, sim))

        def by_score(item):
            return item[1]

        if top_k is None:
            return sorted(candidates, key=by_score, reverse=True)
        if top_k <= 0:
            # Slicing with a negative bound would silently drop the last hits.
            return []
        # nlargest == sorted(..., reverse=True)[:top_k], without a full sort.
        return heapq.nlargest(top_k, candidates, key=by_score)
NanoVectorDB采用内存优先设计;此处的简化实现会对所有向量做线性扫描,适合中小规模数据,大规模场景应换用Milvus/Qdrant等专用向量库
class NetworkXStorage(BaseGraphStorage):
    """Graph storage backed by an in-process ``networkx.DiGraph``."""

    def __init__(self, namespace, global_config):
        self.namespace = namespace
        self.graph = nx.DiGraph()
        self.node_metadata = {}
        self.edge_metadata = {}  # not written by the methods of this class
        # Serialises graph access between coroutines.
        self._lock = asyncio.Lock()

    async def insert_node(self, node_id, node_data):
        """Add (or update) ``node_id`` with ``node_data`` as node attributes."""
        async with self._lock:
            self.graph.add_node(
                node_id, **node_data
            )
            self.node_metadata[node_id] = node_data

    async def get_knowledge_graph(
        self, node_label, max_depth=3
    ):
        """Return the neighbourhood of ``node_label`` up to ``max_depth`` hops.

        An unknown ``node_label`` yields an empty node set, which is passed to
        ``_convert_to_kg`` as an empty subgraph.
        """
        async with self._lock:
            nodes = self._find_related_nodes(
                node_label, max_depth
            )
            subgraph = self.graph.subgraph(nodes)
            return self._convert_to_kg(subgraph)

    def _find_related_nodes(
        self, start, max_depth
    ):
        """Breadth-first collect nodes within ``max_depth`` hops of ``start``.

        Hops follow ``graph.neighbors``, which for a DiGraph means outgoing
        edges only. Returns an empty set when ``start`` is not in the graph
        (instead of letting networkx raise) or when ``max_depth`` is negative.
        """
        if max_depth < 0 or start not in self.graph:
            return set()
        nodes = {start}
        frontier = {start}
        for _ in range(max_depth):
            reached = set()
            for n in frontier:
                reached.update(self.graph.neighbors(n))
            frontier = reached - nodes
            if not frontier:
                break  # nothing new is reachable; deeper levels add nothing
            nodes |= frontier
        return nodes
NetworkX存储提供完整图操作,支持复杂知识推理
| 模式 | 检索方式 | 速度 | 准确率 | 适用场景 |
|---|---|---|---|---|
| naive | 纯向量相似度 | 极快 | 中等 | 简单问答 |
| kg | 知识图谱推理 | 较慢 | 极高 | 复杂推理 |
| mix | 向量+图混合 | 中等 | 高 | 通用场景 |
async def query(
    self, query, mode="mix", **kwargs
):
    """Answer ``query`` with the retrieval strategy selected by ``mode``.

    Modes:
        "naive": vector search limited to ``top_k`` (default 10).
        "kg": knowledge-graph retrieval up to ``max_depth`` hops (default 3).
        "mix": both, weighted by ``kg_weight`` (0.6) / ``vector_weight``
            (0.4) and truncated to ``top_k``.

    Raises:
        ValueError: if ``mode`` is not one of the modes above. The check runs
            before keyword extraction, so no work is wasted.
    """
    if mode not in ("naive", "kg", "mix"):
        raise ValueError(
            f"Unknown query mode {mode!r}; expected 'naive', 'kg' or 'mix'"
        )

    keywords = await self._extract_keywords(query)
    top_k = kwargs.get("top_k", 10)

    if mode == "naive":
        results = await self._naive_query(keywords, top_k=top_k)
    elif mode == "kg":
        results = await self._kg_query(
            keywords,
            max_depth=kwargs.get("max_depth", 3)
        )
    else:
        # Forward top_k so "mix" honours the same result limit as "naive".
        results = await self._mix_query(
            keywords,
            kg_weight=kwargs.get("kg_weight", 0.6),
            vector_weight=kwargs.get("vector_weight", 0.4),
            top_k=top_k,
        )

    return await self._generate_response(query, results)
查询引擎根据模式选择不同检索策略
async def _mix_query(
    self, keywords,
    kg_weight=0.6, vector_weight=0.4, top_k=10, max_depth=2
):
    """Run vector and knowledge-graph retrieval concurrently and fuse them.

    Each result's score is multiplied by its source weight. When the same id
    appears more than once, the entry with the highest weighted score is
    kept; ties keep the one seen first (vector results are scanned first).

    Args:
        keywords: query keywords passed to both retrievers.
        kg_weight, vector_weight: multipliers for KG / vector scores.
        top_k: number of fused results returned; the vector retriever is
            asked for twice as many candidates.
        max_depth: traversal depth for the KG retriever.

    Returns:
        Up to ``top_k`` dicts ``{"id", "type", "score"}``, best first, where
        ``type`` is "vector" or "kg".
    """
    vector_results, kg_results = await asyncio.gather(
        self._naive_query(keywords, top_k=top_k * 2),
        self._kg_query(keywords, max_depth=max_depth),
    )

    best = {}  # id -> fused entry; dict keeps first-seen order for stable ties
    sources = (
        ("vector", vector_weight, vector_results),
        ("kg", kg_weight, kg_results),
    )
    for source, weight, results in sources:
        for item_id, score in results:
            weighted = score * weight
            current = best.get(item_id)
            # Keep the strongest evidence instead of whichever came first.
            if current is None or weighted > current["score"]:
                best[item_id] = {
                    "id": item_id, "type": source, "score": weighted
                }

    fused = sorted(best.values(), key=lambda x: x["score"], reverse=True)
    return fused[:top_k]
混合查询结合向量检索和知识图谱优势,提供最佳效果
# Keyword-extraction prompt: asks the LLM for high-level (themes, intent) and
# low-level (entities, proper nouns, jargon) keywords as JSON. `{query}` is
# the only placeholder. The literal JSON braces are doubled so that
# `.format(query=...)` renders them as single braces instead of raising
# KeyError on the JSON example.
PROMPTS["keywords_extraction"] = """
---Role---
You are an expert keyword extractor
for RAG systems.
---Goal---
Extract two types of keywords:
1. high_level_keywords: overarching
concepts, themes, core intent
2. low_level_keywords: specific
entities, proper nouns, jargon
---Output Format---
Valid JSON only:
{{
"high_level_keywords": [...],
"low_level_keywords": [...]
}}
---Real Data---
User Query: {query}
---Output---
"""
关键词提取使用LLM理解查询意图,区分抽象概念和具体实体
# Answer-generation prompt. It instructs the model to answer only from the
# supplied context (KG data and document chunks), to cite sources via
# reference ids (at most 5), to use Markdown, and to reply in the language of
# the user's query. `{context_data}` is the single placeholder for that
# context.
PROMPTS["rag_response"] = """
---Role---
Expert AI assistant synthesizing
information from a knowledge base.
---Instructions---
1. Determine user query intent
2. Scrutinize KG Data and Document
Chunks in Context
3. Weave facts into coherent response
4. Track reference_id for citations
5. Generate References section
---Content Rules---
- ONLY use Context information
- DO NOT invent or assume
- Response in user query language
- Markdown formatting
- Max 5 most relevant citations
---Context---
{context_data}
"""
RAG响应提示词强调事实依据和引用追踪
class CacheManager:
    """LRU caches for LLM responses, embeddings and query results.

    Cached entries are dicts carrying a ``"ts"`` timestamp that is compared
    with ``time.time()``; entries older than ``ttl`` seconds count as misses.
    Hit/miss counters are kept in ``stats``.
    """

    def __init__(self, max_size=1000, ttl=3600):
        self.max_size = max_size
        self.ttl = ttl
        # NOTE(review): LRUCache is not defined in this snippet; presumably a
        # bounded mapping with .get() (e.g. cachetools.LRUCache). Confirm.
        self.llm_cache = LRUCache(max_size)
        # Embeddings get twice the capacity of the other caches.
        self.embedding_cache = LRUCache(
            max_size * 2
        )
        self.query_cache = LRUCache(max_size)
        self.stats = {
            "llm_hits": 0, "llm_misses": 0,
            "embed_hits": 0, "embed_misses": 0
        }

    def _lookup(self, cache, key, field, stat_prefix):
        """Return ``cache[key][field]`` if present and fresh, else None.

        Exactly one of ``<stat_prefix>_hits`` / ``<stat_prefix>_misses`` is
        incremented per call. An expired entry counts only as a miss; it used
        to be counted as both a hit and a miss.
        """
        entry = cache.get(key)
        if entry and time.time() - entry["ts"] < self.ttl:
            self.stats[f"{stat_prefix}_hits"] += 1
            return entry[field]
        self.stats[f"{stat_prefix}_misses"] += 1
        return None

    async def get_llm_response(self, prompt_hash):
        """Return the cached LLM response for ``prompt_hash``, or None."""
        return self._lookup(self.llm_cache, prompt_hash, "response", "llm")

    async def get_embedding(self, text):
        """Return the cached embedding for ``text`` (keyed by its hash), or None."""
        text_hash = compute_mdhash_id(text)
        return self._lookup(
            self.embedding_cache, text_hash, "embedding", "embed"
        )
多层缓存大幅减少LLM调用和嵌入计算开销
class ConcurrencyManager:
    """Cap how many coroutines run at once and track the ones in flight."""

    def __init__(self, max_concurrent=10):
        # At most `max_concurrent` calls may be past the semaphore together.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Tasks currently running under the limit; discarded once finished.
        self.active_tasks = set()

    async def execute_with_limit(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` as a task while holding a semaphore slot.

        Returns the task's result (or propagates its exception).
        """
        async with self.semaphore:
            running = asyncio.create_task(func(*args, **kwargs))
            self.active_tasks.add(running)
            try:
                outcome = await running
            finally:
                self.active_tasks.discard(running)
            return outcome

    async def batch_process(self, items, processor, batch_size=10):
        """Apply ``processor`` to every item, ``batch_size`` items at a time.

        Each batch is fully gathered before the next starts. Results are
        returned in input order.
        """
        collected = []
        for start in range(0, len(items), batch_size):
            window = items[start:start + batch_size]
            collected += await asyncio.gather(
                *(self.execute_with_limit(processor, entry) for entry in window)
            )
        return collected
Semaphore限流+批量处理确保高并发下的稳定性
async def gleaning_extraction(
    text, existing_entities, existing_relations,
    global_config, max_gleaning=2
):
    """Run follow-up ("gleaning") extraction rounds to pick up missed items.

    Args:
        text: the source text chunk being extracted from.
        existing_entities, existing_relations: first-pass results; they are
            copied, never mutated.
        global_config: reads ``addon_params["language"]`` and
            ``llm_model_func``.
        max_gleaning: maximum number of additional LLM rounds.

    Returns:
        ``(entities, relations)`` merged from the first pass and every round.
        Later rounds overwrite earlier entries with the same key. Rounds stop
        early once one adds no new keys.
    """
    all_entities = dict(existing_entities)
    all_relations = dict(existing_relations)
    if max_gleaning <= 0:
        return all_entities, all_relations

    # The continuation instruction is identical every round, so build it once.
    instruction = PROMPTS["entity_continue_extraction"].format(
        language=global_config["addon_params"]["language"]
    )
    # Include the source text: the instruction alone gives the model nothing
    # to extract from.
    prompt = f"{instruction}\n\n{text}"
    llm = global_config["llm_model_func"]

    for _ in range(max_gleaning):
        response = await llm(prompt)
        new_entities, new_relations = parse_extraction_result(response)
        sizes_before = (len(all_entities), len(all_relations))
        # Merge; dict keys de-duplicate.
        all_entities.update(new_entities)
        all_relations.update(new_relations)
        # A round that adds no new keys is treated as convergence; further
        # rounds would resend the same prompt at the cost of another LLM call.
        if (len(all_entities), len(all_relations)) == sizes_before:
            break
    return all_entities, all_relations
Gleaning机制通过多轮补充提取提升知识提取的完整性(召回率)
# Prompt for merging several descriptions of one entity/relation into a
# single third-person plain-text summary that keeps all key information and
# handles conflicting descriptions.
# Placeholders: {summary_length} (token limit), {language}, {description_type},
# {name} and {description_list} (a JSON list of descriptions, per the
# instructions).
PROMPTS["summarize_entity_descriptions"] = """
---Role---
Knowledge Graph Specialist, proficient
in data curation and synthesis.
---Task---
Synthesize descriptions into a single
comprehensive summary.
---Instructions---
1. Input: JSON description list
2. Output: Plain text summary
3. Integrate ALL key information
4. Third-person perspective
5. Handle conflicts: separate distinct
entities, reconcile historical
discrepancies
6. Length <= {summary_length} tokens
7. Language: {language}
---Input---
{description_type} Name: {name}
Description List:
{description_list}
---Output---
"""
摘要提示词强调信息完整性和冲突处理
# Python 3.10+ is needed for PEP 604 `X | None` annotations that pydantic
# evaluates at runtime; Python 3.9 also reached end-of-life in October 2025.
FROM python:3.11-slim
WORKDIR /app
# Build tools for native wheels, plus curl. --no-install-recommends keeps the
# image small, and the apt lists are removed in the same layer.
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential curl \
    && rm -rf /var/lib/apt/lists/*
# Install dependencies before copying the source so this layer stays cached
# across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir \
    -r requirements.txt
COPY . .
RUN mkdir -p /app/rag_storage
ENV PYTHONPATH=/app
ENV WORKSPACE=/app/rag_storage
EXPOSE 8000
CMD ["python", "-m", "uvicorn", \
     "lightrag_server:app", \
     "--host", "0.0.0.0", \
     "--port", "8000"]
Docker容器化部署简化生产环境配置
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional


class QueryRequest(BaseModel):
    """Request body for ``POST /query``."""

    query: str
    mode: str = "mix"
    top_k: int = 10


class DocumentRequest(BaseModel):
    """Request body for ``POST /documents``."""

    content: str
    # Optional[str] rather than ``str | None``: pydantic evaluates the
    # annotation at runtime, and PEP 604 unions need Python 3.10+.
    file_path: Optional[str] = None


app = FastAPI(title="LightRAG API")
# Shared instance used by the handlers; None until startup has finished.
rag = None


def _require_rag():
    """Return the shared instance, or fail the request with HTTP 503 if not ready."""
    if rag is None:
        from fastapi import HTTPException

        raise HTTPException(
            status_code=503, detail="LightRAG is not initialized"
        )
    return rag


@app.on_event("startup")
async def startup():
    """Create the instance and initialise its storages, then publish it."""
    global rag
    instance = await create_rag_instance()
    await instance.initialize_storages()
    # Publish only a fully initialised instance so handlers never see a
    # half-ready one.
    rag = instance


@app.post("/query")
async def query(req: QueryRequest):
    """Answer ``req.query`` using the requested retrieval mode."""
    result = await _require_rag().aquery(
        req.query, mode=req.mode,
        top_k=req.top_k
    )
    return {"result": result}


@app.post("/documents")
async def insert(
    req: DocumentRequest,
    bg: BackgroundTasks
):
    """Schedule ``req`` for background ingestion and return immediately."""
    bg.add_task(process_doc, _require_rag(), req)
    return {"status": "processing"}
FastAPI提供高性能异步API服务
| 维度 | LightRAG | LangChain | LlamaIndex |
|---|---|---|---|
| 定位 | 轻量级RAG | 通用框架 | 数据框架 |
| 知识图谱 | 内置自动构建 | 需集成 | 需集成 |
| 性能 | 极优 | 中等 | 中等 |
| 易用性 | 开箱即用 | 学习曲线陡 | 学习曲线中等 |
| 存储后端 | 多后端支持 | 丰富 | 丰富 |
| 缓存机制 | 内置多层 | 需自建 | 需自建 |
感谢阅读!
访问 https://atcfu.com/ai-articles/lightrag-retrieval-augmented-generation/ 回顾本文