基于 deepset-ai/haystack 源码深度解析
2026-03-24 | 技术深度解读
第一部分:基础架构
第二部分:数据存储
第三部分:核心组件
第四部分:实战应用
Haystack 是 deepset 开发的 NLP 框架,专注于构建生产级 RAG 系统。
核心特性
设计理念
┌─────────────────────────────────────────────┐
│ Haystack RAG 系统 │
├─────────────────────────────────────────────┤
│ ┌───────────┐ ┌───────────┐ ┌────────┐ │
│ │ Embedder │ → │ Retriever │ → │ Prompt │ │
│ │ (查询) │ │ (检索) │ │Builder │ │
│ └───────────┘ └───────────┘ └────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────┐│
│ │ Document Store ││
│ │ • InMemory / Elasticsearch / Qdrant ││
│ │ • BM25 / Vector Search ││
│ └─────────────────────────────────────────┘│
│ ↓ │
│ ┌───────────┐ ┌───────────┐ │
│ │ Generator │ → │ Answer │ │
│ │ (LLM) │ │ (输出) │ │
│ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────┘
# pipeline.py - core pipeline class (simplified excerpt)
class Pipeline:
    """Hold a graph of named components and drive their execution."""

    def __init__(self):
        # Directed graph: nodes are component names, edges are connections.
        # NOTE(review): nothing in this excerpt enforces acyclicity.
        self.graph = DiGraph()
        self._component_names = {}

    def add_component(self, name: str, instance: Component):
        """Validate ``instance`` and register it as the graph node ``name``."""
        validate_component(instance)
        self.graph.add_node(name, instance=instance)

    def connect(self, source: str, dest: str):
        """Validate, then add an edge from ``source`` to ``dest``."""
        validate_connection(self.graph, source, dest)
        self.graph.add_edge(source, dest)

    def run(self, data: dict) -> dict:
        """Prepare ``data`` and execute the pipeline, returning its outputs."""
        prepared_inputs = self._prepare_run(data)
        return self._run(prepared_inputs)
# Component connection example.
# OpenAIGenerator consumes a rendered `prompt`, not raw documents, so a
# PromptBuilder sits between the retriever and the generator.
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

doc_store = InMemoryDocumentStore()

# Create the pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("retriever", InMemoryBM25Retriever(doc_store))
pipeline.add_component(
    "prompt_builder",
    PromptBuilder(
        template=(
            "{% for doc in documents %}{{ doc.content }}\n{% endfor %}"
            "Question: {{ query }}\nAnswer:"
        )
    ),
)
pipeline.add_component("generator", OpenAIGenerator())

# Connect sockets, addressed as "<component>.<socket>":
# retriever.documents -> prompt_builder.documents, prompt_builder.prompt -> generator.prompt
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")
# connect() validates each connection: the sender's output type must be
# compatible with the receiver's input type.
def _run(self, run_inputs: dict) -> dict:
    """Execute components in topological order and return the final outputs.

    Args:
        run_inputs: Caller-supplied inputs as prepared by ``_prepare_run``.
            Assumed to be keyed by component name, like the dict passed to
            ``pipeline.run({"embedder": {...}, ...})`` — TODO confirm that
            ``_prepare_run`` keeps this layout.

    Returns:
        Whatever ``collect_final_outputs()`` gathers.
    """
    # 1. A topological sort yields an order in which every component runs
    #    after all of its upstream senders.
    execution_order = topological_sort(self.graph)

    # 2. Run each component in that order.
    for component_name in execution_order:
        component = self.graph.nodes[component_name]["instance"]
        # Start from the caller-supplied inputs for this component, then
        # overlay values produced by upstream components. Without the first
        # part, entry-point components (e.g. the query embedder) would never
        # receive the user's input.
        inputs = dict(run_inputs.get(component_name, {}))
        inputs.update(collect_inputs(component_name, self.graph))
        outputs = component.run(**inputs)
        # Keep the outputs so downstream components can collect them.
        store_outputs(component_name, outputs)

    # 3. Gather the pipeline's final results.
    return collect_final_outputs()
# component.py - core decorator (simplified excerpt)
def _component(self, cls: type[T]) -> type[T]:
    """Validate a component class, rebuild it with ComponentMeta, and register it.

    Args:
        cls: The user-defined class being decorated.

    Returns:
        A new class with the same name and bases as ``cls``, created with
        ``ComponentMeta`` as its metaclass.

    Raises:
        ComponentError: If ``cls`` has no ``run`` attribute.
    """
    # 1. A component must expose a run() method.
    if not hasattr(cls, 'run'):
        raise ComponentError(
            f"{cls.__name__} must have a 'run()' method"
        )
    # 2. Recreate the class with ComponentMeta as its metaclass, so that
    #    creating an instance goes through ComponentMeta.__call__.
    #    NOTE(review): copy_class_namespace is not defined in this excerpt;
    #    presumably it copies cls's namespace into the new class — confirm.
    new_cls = new_class(
        cls.__name__,
        cls.__bases__,
        {"metaclass": ComponentMeta},
        copy_class_namespace
    )
    # 3. Register under the fully qualified "<module>.<ClassName>" key.
    self.registry[f"{cls.__module__}.{cls.__name__}"] = new_cls
    return new_cls
# Module-level instance; this is the object applied as the `@component` decorator.
component = _Component()
组件必须遵循的契约:
@component
class MyRetriever:
    """Example custom retriever that runs BM25 retrieval against a document store."""

    def __init__(self, document_store: DocumentStore):
        # Init parameters should be basic types or serializable objects, so
        # the component can be serialized together with its pipeline.
        self.document_store = document_store

    def warm_up(self):
        """Optional hook: load heavy resources (models, etc.) here."""
        pass

    @component.output_types(documents=list[Document])
    def run(self, query: str, top_k: int = 10):
        """Required: the component's main logic.

        Args:
            query: Query text.
            top_k: Maximum number of documents to return.

        Returns:
            ``{"documents": [...]}``.
        """
        # Pass arguments by keyword. InMemoryDocumentStore.bm25_retrieval's
        # signature is (query, filters=None, top_k=10, scale_score=False), so
        # a positional top_k would land in `filters`.
        docs = self.document_store.bm25_retrieval(query=query, top_k=top_k)
        return {"documents": docs}
# Input/output socket declaration
class ComponentMeta(type):
    """Metaclass that attaches input/output socket metadata to every new instance."""

    def __call__(cls, *args, **kwargs):
        # Build the object the normal way first (runs __init__) ...
        obj = super().__call__(*args, **kwargs)
        # ... then derive its input sockets from the class and the instance ...
        ComponentMeta._parse_and_set_input_sockets(cls, obj)
        # ... and finally its output sockets.
        ComponentMeta._parse_and_set_output_sockets(obj)
        return obj
# Output-type decorator: declares one output socket per keyword
@component.output_types(documents=list[Document], query=str)
def run(self, query: str):
    """Return the retrieved documents together with the original query."""
    result = {"documents": [...], "query": query}
    return result
Document Store 是存储和检索文档的核心组件
存储后端
核心功能
# document_store.py
class InMemoryDocumentStore:
    """In-memory document store with BM25 and embedding retrieval.

    Documents live in the module-level ``_STORAGES`` dict, keyed by index
    name, so stores created with the same ``index`` share their documents.
    """

    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L",
        embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
        index: str | None = None,
        bm25_parameters: dict | None = None,
    ):
        """
        Args:
            bm25_tokenization_regex: Regex whose matches are the BM25 tokens.
            bm25_algorithm: BM25 variant used for keyword retrieval.
            embedding_similarity_function: Similarity used for embedding retrieval.
            index: Storage bucket name; a random UUID when omitted.
            bm25_parameters: Optional overrides (e.g. "k1", "b", "delta",
                "epsilon") read by the BM25 scoring methods.
        """
        self.bm25_tokenization_regex = bm25_tokenization_regex
        self.tokenizer = re.compile(bm25_tokenization_regex).findall
        # Store the algorithm choice and its parameters *before* dispatching:
        # _dispatch_bm25() takes no arguments, so it presumably reads
        # self.bm25_algorithm, and the _score_* methods read self.bm25_parameters.
        self.bm25_algorithm = bm25_algorithm
        self.bm25_parameters = bm25_parameters or {}
        self.bm25_algorithm_inst = self._dispatch_bm25()
        self.embedding_similarity_function = embedding_similarity_function
        # Global storage, isolated per index. Only create the bucket if it is
        # missing; unconditionally resetting it would wipe the documents of
        # another store already using the same index.
        self.index = index or str(uuid.uuid4())
        _STORAGES.setdefault(self.index, {})
BM25 是经典的文本相关性排序算法
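# BM25 评分公式
# score(D, Q) = Σ_{qi ∈ Q} IDF(qi) * (f(qi, D) * (k1 + 1)) / (f(qi, D) + k1 * (1 - b + b * |D| / avgdl))
# 其中经典(Okapi)IDF(qi) = ln((N - n(qi) + 0.5) / (n(qi) + 0.5)),N 为文档总数,n(qi) 为包含 qi 的文档数;
# 三种变体的主要差别在于 IDF 和词频项的计算方式(见下方实现)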
# 参数说明:
# - f(qi, D): 词 qi 在文档 D 中的频率
# - |D|: 文档长度
# - avgdl: 平均文档长度
# - k1: 词频饱和参数(默认 1.5)
# - b: 文档长度归一化参数(默认 0.75)
# Haystack 支持三种变体:
# 1. BM25Okapi - 经典版本
# 2. BM25L - 改进的长文档处理
# 3. BM25Plus - 增加平滑因子
def _score_bm25l(self, query: str, documents: list[Document]):
    """BM25L scoring (excerpt): shifts the length-normalized term frequency by ``delta``."""
    params = self.bm25_parameters
    k = params.get("k1", 1.5)
    b = params.get("b", 0.75)
    delta = params.get("delta", 0.5)

    def _compute_idf(tokens: list[str]) -> dict[str, float]:
        # BM25L IDF: log((N + 1) / (n + 0.5)); tokens unseen in the corpus get 0.
        corpus_size = len(self._bm25_attr)
        result = {}
        for tok in tokens:
            df = self._freq_vocab_for_idf.get(tok, 0)
            result[tok] = math.log((corpus_size + 1.0) / (df + 0.5)) * int(df != 0)
        return result

    def _compute_tf(token, freq, doc_len):
        # Normalize the raw count by document length, then shift by delta
        # before saturating with k.
        c_td = freq.get(token, 0.0) / (1 - b + b * doc_len / self._avg_doc_len)
        shifted = c_td + delta
        return (1.0 + k) * shifted / (k + c_td + delta)

    # ... per-document scoring omitted in this excerpt.
def _score_bm25okapi(self, query: str, documents: list[Document]):
    """BM25 Okapi scoring (excerpt).

    Okapi IDF is negative for terms that occur in more than half of the
    corpus; those values are replaced by ``epsilon`` times the average IDF.
    """
    k = self.bm25_parameters.get("k1", 1.5)
    b = self.bm25_parameters.get("b", 0.75)
    epsilon = self.bm25_parameters.get("epsilon", 0.25)

    def _compute_idf(tokens: list[str]) -> dict[str, float]:
        """Return the Okapi IDF for each token in ``tokens`` (0.0 if unseen)."""
        n_corpus = len(self._bm25_attr)
        idf: dict[str, float] = {}
        # Accumulators for the negative-IDF correction below.
        sum_idf = 0.0
        neg_idf_tokens: list[str] = []
        for tok, n in self._freq_vocab_for_idf.items():
            idf[tok] = math.log((n_corpus - n + 0.5) / (n + 0.5))
            sum_idf += idf[tok]
            if idf[tok] < 0:
                neg_idf_tokens.append(tok)
        # Handle negative IDF: floor it at epsilon * average IDF. Skip the step
        # for an empty vocabulary, which would otherwise divide by zero.
        if idf:
            eps = epsilon * sum_idf / len(idf)
            for tok in neg_idf_tokens:
                idf[tok] = eps
        return {tok: idf.get(tok, 0.0) for tok in tokens}

    # ... per-document scoring (IDF combined with the TF term) omitted in this excerpt.
def _score_bm25plus(self, query: str, documents: list[Document]):
    """BM25+ scoring (excerpt): adds a constant ``delta`` lower bound to the TF term."""
    params = self.bm25_parameters
    k = params.get("k1", 1.5)
    b = params.get("b", 0.75)
    # Note: the default delta here is 1.0, unlike BM25L's 0.5.
    delta = params.get("delta", 1.0)

    def _compute_idf(tokens: list[str]) -> dict[str, float]:
        # BM25+ IDF uses log(1 + ...) smoothing; tokens unseen in the corpus get 0.
        corpus_size = len(self._bm25_attr)
        result = {}
        for tok in tokens:
            df = self._freq_vocab_for_idf.get(tok, 0)
            result[tok] = math.log(1 + (corpus_size - df + 0.5) / (df + 0.5)) * int(df != 0)
        return result

    def _compute_tf(token, freq, doc_len):
        # Okapi-style saturated TF, plus the constant delta.
        tf = freq.get(token, 0.0)
        damping = k * (1 - b + b * doc_len / self._avg_doc_len)
        return tf * (1.0 + k) / (tf + damping) + delta
def embedding_retrieval(
self,
query_embedding: list[float],
filters: dict | None = None,
top_k: int = 10,
scale_score: bool = False,
) -> list[Document]:
"""使用向量相似度检索文档"""
# 1. 获取所有文档
all_documents = self.filter_documents(filters)
documents_with_embeddings = [d for d in all_documents if d.embedding]
# 2. 计算相似度分数
scores = self._compute_query_embedding_similarity_scores(
embedding=query_embedding,
documents=documents_with_embeddings,
scale_score=scale_score
)
# 3. 排序并返回 top_k
top_docs = sorted(zip(documents_with_embeddings, scores),
key=lambda x: x[1], reverse=True)[:top_k]
return top_docs
Embedder 将文本转换为向量表示
文本 Embedder
文档 Embedder
@component
class SentenceTransformersTextEmbedder:
    """Embed a single string with a Sentence Transformers model."""

    def __init__(
        self,
        model: str = "sentence-transformers/all-mpnet-base-v2",
        device: ComponentDevice | None = None,
        prefix: str = "",
        suffix: str = "",
        normalize_embeddings: bool = False,
        precision: Literal["float32", "int8", "uint8"] = "float32",
        backend: Literal["torch", "onnx", "openvino"] = "torch",
    ):
        """
        Args:
            model: Model name or local path.
            device: Device to load the model on. ``None`` leaves the choice
                to the backend factory.
            prefix: String prepended to every input text (some models expect
                an instruction prefix).
            suffix: String appended to every input text.
            normalize_embeddings: Whether the backend should normalize embeddings.
            precision: Output precision requested from the backend.
            backend: Inference backend.
        """
        # Keep every init argument; warm_up() and run() read them.
        self.model = model
        self.device = device
        self.prefix = prefix
        self.suffix = suffix
        self.normalize_embeddings = normalize_embeddings
        self.precision = precision
        self.backend = backend
        # Loaded lazily in warm_up(), so constructing the component stays cheap.
        self.embedding_backend = None

    def warm_up(self):
        """Load the embedding backend once; later calls are no-ops."""
        if self.embedding_backend is None:
            # NOTE(review): device=None assumes the factory then picks a default
            # device, and `backend` is not forwarded in this excerpt — confirm.
            device = self.device.to_torch_str() if self.device is not None else None
            self.embedding_backend = _SentenceTransformersEmbeddingBackendFactory.get_embedding_backend(
                model=self.model, device=device
            )

    @component.output_types(embedding=list[float])
    def run(self, text: str) -> dict:
        """Embed ``text`` (wrapped in prefix/suffix) and return ``{"embedding": [...]}``.

        Raises:
            TypeError: If ``text`` is not a string.
        """
        if not isinstance(text, str):
            raise TypeError(
                "SentenceTransformersTextEmbedder expects a string as input. "
                "To embed a list of Documents, use SentenceTransformersDocumentEmbedder."
            )
        if self.embedding_backend is None:
            self.warm_up()
        # The backend embeds a batch; wrap the single text and take the first row.
        embedding = self.embedding_backend.embed(
            [self.prefix + text + self.suffix],
            normalize_embeddings=self.normalize_embeddings,
            precision=self.precision,
        )[0]
        return {"embedding": embedding}
Retriever 负责从 Document Store 检索相关文档
| Retriever | 检索方式 | 适用场景 |
|---|---|---|
| InMemoryBM25Retriever | 关键词匹配 | 精确匹配 |
| InMemoryEmbeddingRetriever | 向量相似度 | 语义搜索 |
| ElasticsearchBM25Retriever | 分布式 BM25 | 大规模生产 |
| QdrantEmbeddingRetriever | 向量数据库 | 高维向量 |
@component
class InMemoryBM25Retriever:
    """Retrieve documents from an InMemoryDocumentStore using BM25."""

    def __init__(
        self,
        document_store: InMemoryDocumentStore,
        filters: dict | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        filter_policy: FilterPolicy = FilterPolicy.REPLACE,
    ):
        """
        Args:
            document_store: Store to search.
            filters: Default filters, used when run() receives none.
            top_k: Default maximum number of documents.
            scale_score: Default for score scaling.
            filter_policy: How runtime filters interact with init filters.

        Raises:
            TypeError: If ``document_store`` is not an InMemoryDocumentStore.
        """
        if not isinstance(document_store, InMemoryDocumentStore):
            raise TypeError("document_store must be InMemoryDocumentStore")
        self.document_store = document_store
        # Keep all init arguments: run() falls back to them.
        self.filters = filters
        self.top_k = top_k
        self.scale_score = scale_score
        self.filter_policy = filter_policy

    @component.output_types(documents=list[Document])
    def run(
        self,
        query: str,
        filters: dict | None = None,
        top_k: int | None = None,
        scale_score: bool | None = None,
    ) -> dict:
        """Run BM25 retrieval for ``query``.

        Args:
            query: Query text.
            filters: Runtime filters; when falsy, the init-time filters are used.
            top_k: Maximum number of documents; defaults to the init value.
            scale_score: Whether to scale scores; defaults to the init value.

        Returns:
            ``{"documents": [...]}`` as returned by the store.
        """
        # NOTE(review): filter_policy is stored, but this excerpt always lets
        # runtime filters replace the init filters (REPLACE semantics).
        docs = self.document_store.bm25_retrieval(
            query=query,
            filters=filters or self.filters,
            top_k=top_k if top_k is not None else self.top_k,
            scale_score=scale_score if scale_score is not None else self.scale_score,
        )
        return {"documents": docs}
@component
class PromptBuilder:
    """Render a prompt from a Jinja2 template in a sandboxed environment."""

    def __init__(
        self,
        template: str,
        required_variables: list[str] | Literal["*"] | None = None,
        variables: list[str] | None = None,
    ):
        """
        Args:
            template: Jinja2 template string.
            required_variables: Variables that must be supplied to run().
                ``"*"`` makes every template variable required. Defaults to none.
            variables: Input variables to expose. Defaults to the variables
                found in ``template``.
        """
        self._template_string = template
        self._env = SandboxedEnvironment(extensions=[Jinja2TimeExtension])
        self.template = self._env.from_string(template)
        self.required_variables = required_variables or []
        # Extract and store the template variables; the loop below reads self.variables.
        self.variables = variables or list(extract_template_variables(template))
        # One input socket per variable. Required variables get no default, so
        # the pipeline must provide them; the others default to "".
        for var in self.variables:
            if self.required_variables == "*" or var in self.required_variables:
                component.set_input_type(self, var, Any)
            else:
                component.set_input_type(self, var, Any, "")

    @component.output_types(prompt=str)
    def run(self, template: str | None = None, **kwargs) -> dict:
        """Render the prompt.

        Args:
            template: Optional template overriding the one given at init.
            **kwargs: Values for the template variables.

        Returns:
            ``{"prompt": rendered_string}``.

        Raises:
            ValueError: If a required variable is missing from ``kwargs``.
        """
        required = self.variables if self.required_variables == "*" else self.required_variables
        missing = [var for var in required if var not in kwargs]
        if missing:
            raise ValueError(
                f"Missing required input variables in PromptBuilder: {', '.join(missing)}"
            )
        compiled = self._env.from_string(template) if template else self.template
        return {"prompt": compiled.render(kwargs)}
# RAG prompt template example
from haystack import Document
from haystack.components.builders import PromptBuilder

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
{% if answer_language %}
Please answer in {{ answer_language }}.
{% endif %}
Answer:
"""

# Render at runtime. `answer_language` is optional: when it is not given,
# the {% if %} block renders nothing.
builder = PromptBuilder(template=prompt_template)
retrieved_docs = [Document(content="Haystack is an NLP framework by deepset.")]
result = builder.run(
    documents=retrieved_docs,
    query="What is Haystack?",
    answer_language="Chinese"
)
print(result["prompt"])
# openai.py - OpenAI chat generator (simplified excerpt)
@component
class OpenAIChatGenerator:
    """Generate chat replies with the OpenAI Chat Completions API."""

    def __init__(
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        generation_kwargs: dict | None = None,
        system_prompt: str | None = None,
    ):
        """
        Args:
            api_key: Secret, resolved in warm_up() when the client is created.
            model: OpenAI model name.
            generation_kwargs: Extra keyword arguments forwarded to
                ``chat.completions.create``.
            system_prompt: Optional system message prepended to each request,
                unless the conversation already starts with a system message.
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.system_prompt = system_prompt
        self.client = None  # created lazily in warm_up()

    def warm_up(self):
        """Create the OpenAI client on first use."""
        if self.client is None:
            self.client = OpenAI(api_key=self.api_key.resolve_value())

    @component.output_types(replies=list[ChatMessage])
    def run(self, messages: list[ChatMessage]) -> dict:
        """Send ``messages`` and return one assistant ChatMessage per choice."""
        if self.client is None:
            self.warm_up()
        # Apply system_prompt without duplicating an existing system message.
        # Build a new list so the caller's list is not mutated.
        if self.system_prompt and not (messages and messages[0].role == "system"):
            messages = [ChatMessage.from_system(self.system_prompt), *messages]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[m.to_openai_dict() for m in messages],
            **self.generation_kwargs
        )
        return {"replies": [ChatMessage.from_assistant(r.message.content) for r in response.choices]}
# ChatMessage: one message in a conversation
@dataclass
class ChatMessage:
    """A single chat message: its text, the speaker's role, and optional metadata."""

    content: str
    role: Literal["system", "user", "assistant", "function", "tool"]
    name: str | None = None
    meta: dict = field(default_factory=dict)

    @classmethod
    def from_user(cls, content: str) -> "ChatMessage":
        """Build a message authored by the user."""
        return cls(role="user", content=content)

    @classmethod
    def from_assistant(cls, content: str) -> "ChatMessage":
        """Build a message authored by the assistant."""
        return cls(role="assistant", content=content)

    @classmethod
    def from_system(cls, content: str) -> "ChatMessage":
        """Build a system (instruction) message."""
        return cls(role="system", content=content)

    def to_openai_dict(self) -> dict:
        """Return the ``role``/``content`` mapping sent to the OpenAI API."""
        return dict(role=self.role, content=self.content)
RAG Pipeline 架构图
┌──────────────────────────────────────────────┐
│ │
│ Query ──► TextEmbedder ──► EmbeddingRetriever
│ │ │
│ ▼ │
│ Retrieved Docs │
│ │ │
│ ▼ │
│ PromptBuilder ◄──────────┘ │
│ │ │
│ ▼ │
│ Rendered Prompt │
│ │ │
│ ▼ │
│ OpenAI Generator │
│ │ │
│ ▼ │
│ Answer │
│ │
└──────────────────────────────────────────────┘
from haystack import Pipeline, Document
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.in_memory import InMemoryDocumentStore

# 1. Prepare documents. The embedding retriever skips documents that have no
#    embedding, so embed them before writing them to the store. Use the same
#    model for documents and queries (both embedders use their default here).
docs = [Document(content="Haystack is an NLP framework by deepset.")]
doc_embedder = SentenceTransformersDocumentEmbedder()
doc_embedder.warm_up()
docs_with_embeddings = doc_embedder.run(documents=docs)["documents"]

doc_store = InMemoryDocumentStore()
doc_store.write_documents(docs_with_embeddings)

# Template for PromptBuilder: `documents` and `query` become its inputs.
rag_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""

# 2. Create the pipeline
pipeline = Pipeline()
pipeline.add_component("embedder", SentenceTransformersTextEmbedder())
pipeline.add_component("retriever", InMemoryEmbeddingRetriever(doc_store))
pipeline.add_component("prompt_builder", PromptBuilder(template=rag_template))
pipeline.add_component("generator", OpenAIGenerator())

# 3. Connect the components
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# 4. Run the pipeline: inputs are keyed by component name
question = "What is Haystack?"
result = pipeline.run({
    "embedder": {"text": question},
    "prompt_builder": {"query": question}
})
print(result["generator"]["replies"][0])
# Output: "Haystack is an NLP framework developed by deepset
# for building production-ready NLP applications..."
Haystack 采用组件化设计,每个组件职责单一
# Benefits of the component design
# 1. Single responsibility
#    InMemoryBM25Retriever  -> only does BM25 retrieval
#    OpenAIGenerator        -> only calls OpenAI
#    (Listed as comments: an empty `class X:` would be a syntax error and
#    would shadow the real components used below.)

# 2. Swappability: implementations exposing the same sockets are interchangeable
retriever = InMemoryBM25Retriever(doc_store)
# or (es_store: an ElasticsearchDocumentStore)
retriever = ElasticsearchBM25Retriever(document_store=es_store)

# 3. Testability: each component can be tested on its own
def test_retriever():
    result = retriever.run(query="test")
    assert "documents" in result
# Pipeline characteristics
# 1. Graph of components
#    - Components and their connections form a directed graph.
#    - The simplified _run() above orders components topologically, which
#      assumes no cycles; Haystack 2.x pipelines can also contain loops
#      (e.g. for retry / self-correction).
# 2. Execution order
#    - A component runs after the components it depends on.
#    - The synchronous Pipeline runs one component at a time; running
#      independent branches concurrently is what AsyncPipeline is for.
# 3. Data flow
#    - Components exchange data only through their connections.
#    - Connection types are validated when connect() is called.
from pathlib import Path

# Visualize the pipeline (rendered through the Mermaid web service, so this needs network access)
pipeline.draw(Path("my_pipeline.png"))

# Serialization: dump()/load() work on file objects (dumps()/loads() work on strings)
with open("my_pipeline.yaml", "w", encoding="utf-8") as f:
    pipeline.dump(f)
with open("my_pipeline.yaml", encoding="utf-8") as f:
    pipeline = Pipeline.load(f)
# Haystack creates components with a factory/registry pattern
# 1. Component registry
class _Component:
    """Decorator object that records every decorated class in ``registry``."""

    def __init__(self):
        # Maps "<module>.<ClassName>" to the class object.
        self.registry: dict[str, type] = {}

    def _component(self, cls):
        """Register ``cls`` under its fully qualified name and return it unchanged."""
        qualified_name = "{}.{}".format(cls.__module__, cls.__name__)
        self.registry[qualified_name] = cls
        return cls
# 2. Build from a dictionary
@classmethod
def from_dict(cls, data: dict) -> "Component":
    """Rebuild a component of type ``cls`` from its serialized ``data``."""
    return default_from_dict(cls, data)

# 3. Serialization support
def to_dict(self) -> dict:
    """Serialize this component together with its init parameters."""
    params = self.init_parameters
    return default_to_dict(self, **params)
# Pipeline serialization example: dump() writes YAML to a file object
with open("my_pipeline.yaml", "w", encoding="utf-8") as f:
    pipeline.dump(f)
# my_pipeline.yaml contents (abridged)
# `type` is "<defining module>.<ClassName>" of each component.
components:
  retriever:
    type: haystack.components.retrievers.in_memory.bm25_retriever.InMemoryBM25Retriever
    init_parameters:
      document_store:
        type: haystack.document_stores.in_memory.document_store.InMemoryDocumentStore
        init_parameters: {}
      top_k: 10
  prompt_builder:
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: "{% for doc in documents %}{{ doc.content }} {% endfor %}Question: {{ query }} Answer:"
  generator:
    type: haystack.components.generators.openai.OpenAIGenerator
    init_parameters:
      model: gpt-4o-mini
connections:
  - sender: retriever.documents
    receiver: prompt_builder.documents
  - sender: prompt_builder.prompt
    receiver: generator.prompt
# Haystack supports async execution
import asyncio

# 1. Async Document Store method: run the blocking BM25 search in an executor
#    so the event loop is not blocked.
async def bm25_retrieval_async(self, query: str, top_k: int = 10) -> list[Document]:
    """Async wrapper around ``bm25_retrieval`` (same arguments, same result).

    NOTE(review): assumes ``self.executor`` is an executor owned by the store — confirm.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self.executor,
        lambda: self.bm25_retrieval(query=query, top_k=top_k),
    )

# 2. Async Retriever
@component.output_types(documents=list[Document])
async def run_async(self, query: str, top_k: int | None = None) -> dict:
    """Async counterpart of ``run``; ``top_k`` falls back to ``self.top_k``."""
    docs = await self.document_store.bm25_retrieval_async(
        query=query,
        top_k=top_k if top_k is not None else self.top_k,
    )
    return {"documents": docs}

# 3. Async pipeline execution. `await` is only valid inside a coroutine, so
#    drive it with asyncio.run() from synchronous code. Inside an already
#    running event loop (e.g. a notebook), `await` it directly instead.
#    Assumes `pipeline` is an AsyncPipeline and `data` is its input dict.
async def _main():
    return await pipeline.run_async(data)

result = asyncio.run(_main())
# Document filter syntax: a logical operator combines a list of conditions
filters = {
    "operator": "AND",
    "conditions": [
        {"field": "meta.category", "operator": "==", "value": "tech"},
        {"field": "meta.date", "operator": ">=", "value": "2024-01-01"},
    ],
}

# Supported comparison operators:
#   ==, !=, >, >=, <, <=
#   in, not in        (the value must be a list)
# Logical operators (can be nested): AND, OR, NOT
# The standard filter syntax has no fuzzy / "like" operator.

# Use the filters with a retriever
retriever.run(query="Python", filters=filters)
# Document dataclass
@dataclass
class Document:
    """A unit of content: text, optional embedding, metadata and retrieval score."""

    id: str = field(default_factory=lambda: str(uuid4()))
    content: str | None = None
    embedding: list[float] | None = None
    meta: dict = field(default_factory=dict)
    score: float | None = None

    def to_dict(self) -> dict:
        """Return the document's fields as a plain dict (``meta`` is not copied)."""
        keys = ("id", "content", "embedding", "meta", "score")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: dict) -> "Document":
        """Create a Document whose fields come from the keys of ``data``."""
        return cls(**data)
Haystack 使用 Python 类型注解确保类型安全
# 类型注解示例
from typing import Literal
from haystack import component
from haystack.dataclasses import Document
@component
class TypedRetriever:
    """Retriever skeleton showing typed inputs and a declared output socket."""

    @component.output_types(documents=list[Document])
    def run(
        self,
        query: str,
        top_k: int = 10,
        filters: dict | None = None,
    ) -> dict[str, list[Document]]:
        # `output_types` declares the `documents` output socket; the parameter
        # annotations describe the inputs.
        # NOTE(review): Python itself does not enforce annotations — socket
        # types are checked when components are connected.
        placeholder_docs = [...]
        return {"documents": placeholder_docs}

# A Literal type restricts a value to a fixed set of options
bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L"
# Haystack custom exceptions
class HaystackError(Exception):
    """Base class for all Haystack errors."""


class DocumentStoreError(HaystackError):
    """Raised when a document store operation fails."""


class DuplicateDocumentError(DocumentStoreError):
    """Raised when writing a document that already exists."""


class ComponentError(HaystackError):
    """Raised when a component is invalid (e.g. it has no run() method)."""
# Usage example
import logging

logger = logging.getLogger(__name__)

try:
    doc_store.write_documents(docs)
except DuplicateDocumentError as e:
    # Catch the most specific error first: DuplicateDocumentError subclasses
    # DocumentStoreError, so the order of these clauses matters.
    logger.warning("Document already exists: %s", e)
except DocumentStoreError as e:
    logger.error("Document store error: %s", e)
缓存策略
并行处理
内存优化
检索优化
# Custom component example
@component
class MyCustomRanker:
    """Custom re-ranker: reorders documents by a model's relevance score."""

    def __init__(self, model_name: str):
        # Only the name is stored here; the model is loaded in warm_up().
        self.model = None
        self.model_name = model_name

    def warm_up(self):
        """Load the scoring model (no-op if it is already loaded)."""
        if self.model is None:
            self.model = load_model(self.model_name)

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document], query: str) -> dict:
        """Return ``documents`` sorted by descending model score for ``query``."""
        if not documents:
            return {"documents": []}
        # Load on first use, as the other components in this guide do, so run()
        # also works when warm_up() was not called explicitly.
        if self.model is None:
            self.warm_up()
        # Custom re-ranking logic.
        # NOTE(review): assumes model.score returns one score per input text, in input order.
        scores = self.model.score(query, [d.content for d in documents])
        ranked = sorted(zip(documents, scores), key=lambda pair: pair[1], reverse=True)
        return {"documents": [doc for doc, _ in ranked]}
| 特性 | Haystack | LangChain | LlamaIndex |
|---|---|---|---|
| 架构风格 | Pipeline | Chain | Index |
| 类型安全 | 强类型 | 弱类型 | 中等 |
| 检索算法 | BM25 + 向量 | 向量为主 | 向量为主 |
| 生产就绪 | ✅ | ⚠️ | ⚠️ |
| 学习曲线 | 中等 | 低 | 低 |
RAG 应用
信息提取
搜索系统
内容生成
# 1. Document preprocessing: keep the source URL in meta for attribution
docs = [
    Document(content=clean_text(text), meta={"source": url})
    for text, url in raw_data
]

# 2. Chunking strategy
from haystack.components.preprocessors import DocumentSplitter

splitter = DocumentSplitter(split_by="sentence", split_length=10)

# 3. Hybrid retrieval: BM25 + embedding retrieval improve recall together.
#    Their result lists must be merged, e.g. with reciprocal rank fusion.
from haystack.components.joiners import DocumentJoiner

bm25_retriever = InMemoryBM25Retriever(doc_store)
embedding_retriever = InMemoryEmbeddingRetriever(doc_store)
joiner = DocumentJoiner(join_mode="reciprocal_rank_fusion")

# 4. Pipeline serialization: dump() takes a file object
with open("production_pipeline.yaml", "w", encoding="utf-8") as f:
    pipeline.dump(f)

# 5. Monitoring and logging
import logging

logging.basicConfig(level=logging.INFO)
性能问题
质量问题
部署问题
调试问题
# 1. Enable verbose logging
import logging
from pathlib import Path

logging.basicConfig(level=logging.DEBUG)

# 2. Inspect a component's output
result = retriever.run(query="test")
print(f"Retrieved {len(result['documents'])} documents")
for doc in result['documents']:
    # Document.content is optional (may be None), so guard before slicing.
    print(f" - {(doc.content or '')[:50]}... (score: {doc.score})")

# 3. Visualize the pipeline (draw() takes a Path)
pipeline.draw(Path("debug_pipeline.png"))

# 4. Test a component on its own; load its model explicitly first
embedder = SentenceTransformersTextEmbedder()
embedder.warm_up()
embedding = embedder.run(text="test")["embedding"]
print(f"Embedding dimension: {len(embedding)}")
近期计划
长期目标
社区提案
生态整合
官方资源
第三方集成
Stars: 18k+ | Contributors: 200+ | 用户遍布全球
| 模块 | 路径 | 核心内容 |
|---|---|---|
| Pipeline | haystack/core/pipeline/ | 管道核心逻辑 |
| Component | haystack/core/component/ | 组件装饰器和协议 |
| Document Store | haystack/document_stores/ | 存储后端实现 |
| Retrievers | haystack/components/retrievers/ | 检索组件 |
| Generators | haystack/components/generators/ | LLM 生成器 |
┌─────────────────────────────────────────────────────┐
│ Haystack 2.x │
├─────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────┐ │
│ │ Core Layer │ │
│ │ Pipeline | Component | Document | Dataclasses│ │
│ └─────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Document Stores │ │
│ │ InMemory | Elasticsearch | Qdrant | Pinecone│ │
│ └─────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Components │ │
│ │ Embedders | Retrievers | Builders | Generators│ │
│ └─────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Utilities │ │
│ │ Filters | Serialization | Telemetry | Logging│ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Haystack RAG 框架核心要点:
架构特点
核心技术
50 页技术深度解析完成!
🌾 Haystack - 构建生产级 RAG 系统的最佳选择
感谢阅读!欢迎 Star 和贡献代码 🌟