源码级别解析 · pipeline.py · component.py · retrievers.py · generators.py
2026-03-24 | 每日技术深度解读
Haystack 是 deepset 开发的生产级 RAG(检索增强生成)框架
核心特性:
定位: LangChain/LlamaIndex 的替代方案,更注重生产环境稳定性
核心模块:Pipeline、Component、Retriever、Generator
| 概念 | 职责 | 类比 |
|---|---|---|
| Pipeline | 组件编排引擎,管理执行流程 | 工作流引擎(Airflow) |
| Component | 可复用的处理单元 | Unix 管道命令 |
| Socket | 组件的输入/输出端口 | 函数参数和返回值 |
| Document | 数据载体(文本 + 元数据 + 向量) | ORM 实体对象 |
class Pipeline:
    """Component orchestration engine (API overview; method bodies are elided)."""

    def __init__(self):
        # Directed graph of components. nx.DiGraph does not forbid cycles by
        # itself; the connection/ordering logic is what rejects them.
        self.graph = nx.DiGraph()
        # Reverse lookup: id(component instance) -> component name.
        self._component_names: dict[int, str] = {}

    # Component management
    def add_component(self, name: str, instance: Component) -> None: ...

    def get_component(self, name: str) -> Component: ...

    def remove_component(self, name: str) -> None: ...

    # Connection management
    def connect(self, from_component: str, to_component: str) -> None: ...

    # Execution. "..." is not valid inside a parameter list, so the concrete
    # parameters of the run()/run_async() implementations are spelled out.
    def run(self, data: dict, debug: bool = False) -> dict: ...

    async def run_async(self, data: dict) -> dict: ...

    # Serialization
    def to_dict(self) -> dict: ...

    @classmethod
    def from_dict(cls, data: dict) -> "Pipeline": ...

    def dumps(self) -> str: ...

    @classmethod
    def loads(cls, data: str) -> "Pipeline": ...
def add_component(self, name: str, instance: Component) -> None:
    """Register ``instance`` in this pipeline under ``name``.

    Args:
        name: Component name, unique within this pipeline.
        instance: The component object to add.

    Raises:
        ValueError: If ``name`` is already used, ``instance`` does not satisfy
            the Component protocol, or ``instance`` is still owned by a live
            pipeline.
    """
    # 1. Names must be unique within the graph.
    if name in self.graph.nodes:
        raise ValueError(f"Component '{name}' already exists")
    # 2. Structural check against the runtime-checkable Component protocol.
    if not isinstance(instance, Component):
        raise ValueError("Must be a Component instance")
    # 3. Ownership is stored as a weak reference. A dead reference means the
    #    previous owner was garbage-collected, so it no longer counts.
    #    getattr() tolerates objects that satisfy the protocol but were never
    #    marked with the attribute.
    owner_ref = getattr(instance, "__haystack_added_to_pipeline__", None)
    owner = owner_ref() if owner_ref is not None else None
    if owner is not None:
        raise ValueError("Component already added to another Pipeline")
    # 4. Add to the graph and remember the reverse mapping.
    self.graph.add_node(name, instance=instance)
    self._component_names[id(instance)] = name
    # 5. Mark ownership weakly so the component does not keep this pipeline alive.
    instance.__haystack_added_to_pipeline__ = weakref.ref(self)
关键: 一个 Component 实例只能属于一个 Pipeline
def _topological_sort(self) -> list[str]:
    """Return component names so that every component follows its predecessors.

    Uses Kahn's algorithm. Components with no dependency between them keep
    the order in which the graph reports them.

    Raises:
        ValueError: If the graph contains a cycle.
    """
    graph = self.graph
    # Incoming edges that still have to be "satisfied" per node.
    remaining = {node: len(list(graph.predecessors(node))) for node in graph.nodes()}
    ready = deque(node for node, count in remaining.items() if count == 0)
    ordered = []
    while ready:
        current = ready.popleft()
        ordered.append(current)
        for child in graph.successors(current):
            remaining[child] -= 1
            if not remaining[child]:
                ready.append(child)
    # Any node that never became ready sits on (or behind) a cycle.
    if len(ordered) != len(graph.nodes()):
        raise ValueError("Pipeline contains a cycle!")
    return ordered
def _prepare_component_inputs(self, component_name: str) -> dict:
    """Build the keyword arguments for ``component_name``'s ``run()`` call.

    Precedence, from lowest to highest:
    1. values routed from predecessor outputs along graph edges;
    2. user-supplied inputs for this component (they overwrite 1.).
    Socket defaults are then filled in only for names still missing.
    """
    component = self.graph.nodes[component_name]["instance"]
    inputs = {}
    for upstream in self.graph.predecessors(component_name):
        link = self.graph.edges[upstream, component_name]
        source_socket = link["from_socket"]
        target_socket = link["to_socket"]
        inputs[target_socket] = self._component_outputs[upstream][source_socket]

    if component_name in self._user_inputs:
        inputs.update(self._user_inputs[component_name])

    # _empty is the "no default" sentinel; such sockets stay unset.
    for socket_name, socket in component.__haystack_input__.items():
        if socket_name in inputs or socket.default_value is _empty:
            continue
        inputs[socket_name] = socket.default_value
    return inputs
def run(self, data: dict, debug: bool = False) -> dict:
    """Execute every component once, in topological order.

    Args:
        data: Per-component user inputs, ``{component_name: {socket: value}}``.
        debug: When True, record each component's inputs, outputs and
            elapsed seconds in ``self._debug_info``.

    Returns:
        Whatever ``self._collect_final_outputs()`` builds from this run.
    """
    # Make the user inputs visible to _prepare_component_inputs, and start
    # from an empty output store so results of a previous run cannot leak in.
    self._user_inputs = data
    self._component_outputs = {}
    if debug:
        self._debug_info = {
            "inputs": {},
            "outputs": {},
            "timings": {},
        }
    for component_name in self._topological_sort():
        component = self.graph.nodes[component_name]["instance"]
        inputs = self._prepare_component_inputs(component_name)
        if debug:
            # Snapshot before run(): a component may mutate its inputs.
            self._debug_info["inputs"][component_name] = deepcopy(inputs)
            # perf_counter is monotonic; time.time() can jump with clock changes.
            start_time = time.perf_counter()
        outputs = component.run(**inputs)
        if debug:
            self._debug_info["timings"][component_name] = time.perf_counter() - start_time
            # Snapshot as well: downstream components may mutate these objects.
            self._debug_info["outputs"][component_name] = deepcopy(outputs)
        self._component_outputs[component_name] = outputs
    return self._collect_final_outputs()
@component
class MyRetriever:
    """Example custom retriever component returning up to ``top_k`` documents."""

    def __init__(self, top_k: int = 10):
        # Number of documents returned per query.
        self.top_k = top_k

    @component.output_types(documents=list[Document])
    def run(self, query: str) -> dict:
        """Retrieve documents for ``query``."""
        return {"documents": self._search(query, top_k=self.top_k)}

# What the @component decorator is described as doing in this article:
# 1. set up __haystack_input__ and __haystack_output__
#    (NOTE(review): in the ComponentMeta listing the socket objects are
#    attached when the component is instantiated; confirm where this happens)
# 2. register the class in component.registry
# 3. validate the run() method signature
# 4. add serialization support
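核心: 输入 Socket 由 run() 的参数签名和类型注解解析得到,输出 Socket 由 @component.output_types(或 set_output_types)声明;实际的 Socket 对象在组件实例化时由 ComponentMeta 挂到实例上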
class ComponentMeta(type):
    """Metaclass that attaches input/output sockets to each new component instance."""

    def __call__(cls, *args, **kwargs):
        # Normal construction first (__new__ followed by __init__).
        instance = super().__call__(*args, **kwargs)

        # If the component exposes run_async, it has to be a coroutine function.
        if hasattr(instance, "run_async"):
            if not inspect.iscoroutinefunction(instance.run_async):
                raise ComponentError("run_async must be a coroutine")

        # Input sockets are parsed with access to the class, output sockets
        # from the instance alone.
        ComponentMeta._parse_and_set_input_sockets(cls, instance)
        ComponentMeta._parse_and_set_output_sockets(instance)

        # Fresh components are not owned by any pipeline yet.
        instance.__haystack_added_to_pipeline__ = None
        return instance
@dataclass
class InputSocket:
    """Input port of a component, typically derived from a run() parameter."""

    name: str  # socket name
    type: Any  # the parameter's type annotation
    default_value: Any = _empty  # _empty is the sentinel for "no default"
    is_variadic: bool = False  # whether the socket is variadic

    @property
    def is_optional(self) -> bool:
        """True when the socket has a default value."""
        return not (self.default_value is _empty)

# Example: sockets parsed from a run() signature
def run(self, query: str, top_k: int = 10, **kwargs):
    """Illustrative signature only; the body does nothing."""

# Parsed result (as presented in this article):
# InputSocket(name="query", type=str, default_value=_empty)
# InputSocket(name="top_k", type=int, default_value=10)
# InputSocket(name="kwargs", type=dict, is_variadic=True)
# NOTE(review): verify the last line against the actual parser; upstream
# Haystack may skip **kwargs and set is_variadic only for Variadic[...] types.
@dataclass
class OutputSocket:
    """Output port of a component: a named, typed value that run() returns."""

    name: str  # key under which run() returns the value
    type: Any  # declared type of that value
# Two ways to declare a component's output sockets.

# Option 1: decorate run() with component.output_types
@component.output_types(documents=list[Document], query=str)
def run(self, query: str) -> dict:
    result = {"documents": [...], "query": query}
    return result

# Option 2: declare them from __init__ via component.set_output_types
def __init__(self):
    component.set_output_types(self, documents=list[Document], query=str)

# Either way, the resulting sockets are:
# OutputSocket(name="documents", type=list[Document])
# OutputSocket(name="query", type=str)
@runtime_checkable
class Component(Protocol):
    """Structural protocol describing what every component looks like."""

    def run(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        """Run the component synchronously and return its outputs by name."""
        ...

# Notes:
# 1. Because of @runtime_checkable, isinstance(obj, Component) also works at
#    runtime, but it only checks that obj has a `run` attribute; parameter
#    signature and return type are NOT verified.
# 2. run() may declare any parameters.
# 3. run() is expected to return a mapping (Mapping[str, Any]).
# Examples of run() signatures that all satisfy the protocol:
# def run(self, query: str) -> dict: ...
# def run(self, **kwargs) -> dict: ...
# def run(self, documents: list) -> dict: ...
协议检查: 由于使用了 @runtime_checkable,isinstance(obj, Component) 可以在运行时调用并返回 True/False,但它只检查对象是否有 run 属性,不会校验参数签名和返回类型
class SentenceTransformersDocumentEmbedder:
    """Document embedder that loads its model lazily (simplified version)."""

    def __init__(
        self,
        model: str = "sentence-transformers/all-mpnet-base-v2",
        device=None,
    ):
        self.model = model
        # Optional device descriptor; when set it must provide to_torch_str().
        # It was previously read in warm_up() without ever being assigned.
        self.device = device
        # The backend is created on first use (see warm_up) so construction is cheap.
        self.embedding_backend = None

    def warm_up(self) -> None:
        """Load the model once; later calls are no-ops."""
        if self.embedding_backend is None:
            self.embedding_backend = _SentenceTransformersEmbeddingBackendFactory.get_embedding_backend(
                model=self.model,
                device=self.device.to_torch_str() if self.device is not None else None,
            )

    def run(self, documents: list[Document]) -> dict:
        """Embed ``documents`` and return copies that carry their embeddings."""
        from dataclasses import replace

        if self.embedding_backend is None:
            self.warm_up()  # auto warm-up on first call
        # Documents without content are embedded as empty strings.
        embeddings = self.embedding_backend.embed([doc.content or "" for doc in documents])
        return {
            "documents": [replace(doc, embedding=emb) for doc, emb in zip(documents, embeddings)]
        }
def output_types(**types: Any) -> Callable:
    """Decorator factory that declares the output sockets of a run method.

    Each keyword becomes an OutputSocket (name -> type). The sockets are cached
    on the function object itself as ``_output_types_cache``; the decorated
    method is returned unchanged, not wrapped.

    Raises:
        ComponentError: If applied to a function not named run or run_async.
    """
    def decorate(run_method: Callable) -> Callable:
        if run_method.__name__ not in ("run", "run_async"):
            raise ComponentError("只能用于 run/run_async 方法")
        run_method._output_types_cache = {
            socket_name: OutputSocket(name=socket_name, type=socket_type)
            for socket_name, socket_type in types.items()
        }
        return run_method

    return decorate
# Usage example: a chat component's run() declaring a `replies` output socket
@component.output_types(replies=list[ChatMessage])
def run(self, messages: list[ChatMessage]) -> dict:
    replies = [...]
    return {"replies": replies}
class PromptBuilder:
    """Prompt builder (simplified): each template variable becomes an optional input."""

    def __init__(self, template: str):
        # Keep the raw text and compile it once. run() calls .render(), which
        # a plain str does not have, so self.template must hold the compiled
        # template (the full PromptBuilder does the same).
        self._template_string = template
        self.template = SandboxedEnvironment().from_string(template)
        # Collect the variable names the template refers to.
        variables = self._extract_template_variables(template)
        # Register every variable as an input socket defaulting to "".
        for var in variables:
            component.set_input_type(self, var, Any, default="")

    def run(self, **kwargs) -> dict:
        """Render the template with the template variables given as keyword arguments."""
        prompt = self.template.render(**kwargs)
        return {"prompt": prompt}

# Example:
# template = "Translate {{ text }} to {{ language }}"
# builder = PromptBuilder(template)
#
# Inputs created automatically:
# InputSocket(name="text", type=Any, default="")
# InputSocket(name="language", type=Any, default="")
@component
class InMemoryBM25Retriever:
    """Retriever that ranks the documents of an InMemoryDocumentStore with BM25."""

    def __init__(
        self,
        document_store: InMemoryDocumentStore,
        filters: dict | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        filter_policy: FilterPolicy = FilterPolicy.REPLACE
    ):
        self.document_store = document_store
        self.filters = filters  # init-time filters
        self.top_k = top_k  # default number of results per query
        self.scale_score = scale_score  # forwarded to bm25_retrieval unchanged
        self.filter_policy = filter_policy  # how runtime filters combine with self.filters

    @component.output_types(documents=list[Document])
    def run(self, query: str, filters: dict | None = None, top_k: int | None = None):
        """Retrieve the best-matching documents for ``query``.

        Args:
            query: Text query.
            filters: Runtime filters, combined with the init-time filters by
                ``self._merge_filters``.
            top_k: Number of results for this call; ``None`` means use
                ``self.top_k``.

        Returns:
            ``{"documents": [...]}`` as returned by the document store.
        """
        final_filters = self._merge_filters(filters)
        # `is None` rather than `or`: an explicit top_k=0 must not be replaced
        # by the default.
        effective_top_k = self.top_k if top_k is None else top_k
        docs = self.document_store.bm25_retrieval(
            query=query,
            filters=final_filters,
            top_k=effective_top_k,
            scale_score=self.scale_score
        )
        return {"documents": docs}
def bm25_retrieval(self, query: str, top_k: int, *, k1: float = 1.5, b: float = 0.75) -> "list[Document]":
    """Rank ``self.documents`` against ``query`` with Okapi BM25.

    score(D) = sum over query terms t of
        IDF(t) * f(t, D) * (k1 + 1) / (f(t, D) + k1 * (1 - b + b * |D| / avgdl))
    IDF(t) = ln((N - df(t) + 0.5) / (df(t) + 0.5) + 1)

    where f(t, D) is the raw count of t in D, |D| the token length of D,
    avgdl the mean token length, N the number of documents and df(t) the
    number of documents whose tokens contain t.

    Args:
        query: Query text, tokenized with ``self._tokenize``.
        top_k: Maximum number of documents to return.
        k1: Term-frequency saturation parameter.
        b: Document-length normalization strength (0 disables it).

    Returns:
        Up to ``top_k`` documents, highest score first; ties keep store order.
    """
    from collections import Counter
    from math import log

    documents = list(self.documents)
    if not documents:
        return []
    query_terms = self._tokenize(query)

    # Tokenize every document once and reuse it for TF, length and DF.
    term_counts = [Counter(self._tokenize(doc.content or "")) for doc in documents]
    doc_lengths = [sum(counts.values()) for counts in term_counts]
    n_docs = len(documents)
    avgdl = sum(doc_lengths) / n_docs
    # Document frequency is counted on tokens, not on raw substrings
    # (so "apple" does not match a document that only contains "pineapple").
    doc_freq = Counter(term for counts in term_counts for term in counts)

    scored = []
    for doc, counts, doc_len in zip(documents, term_counts, doc_lengths):
        score = 0.0
        for term in query_terms:
            tf = counts[term]
            if not tf:
                continue
            df = doc_freq[term]
            idf = log((n_docs - df + 0.5) / (df + 0.5) + 1)
            # tf > 0 implies doc_len > 0 and therefore avgdl > 0.
            length_norm = 1 - b + b * doc_len / avgdl
            score += idf * tf * (k1 + 1) / (tf + k1 * length_norm)
        scored.append((doc, score))

    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in scored[:top_k]]
@component
class InMemoryBM25Retriever:
    """BM25 retriever exposing both a synchronous and an asynchronous entry point.

    The constructor is omitted here; it is the same as in the full
    InMemoryBM25Retriever listing (document_store, filters, top_k,
    scale_score, filter_policy).
    """

    @component.output_types(documents=list[Document])
    def run(
        self, query: str, filters: dict | None = None, top_k: int | None = None
    ) -> dict[str, list[Document]]:
        """Synchronous retrieval."""
        docs = self.document_store.bm25_retrieval(
            query=query,
            filters=self._merge_filters(filters),
            top_k=self.top_k if top_k is None else top_k,
            scale_score=self.scale_score,
        )
        return {"documents": docs}

    @component.output_types(documents=list[Document])
    async def run_async(
        self, query: str, filters: dict | None = None, top_k: int | None = None
    ) -> dict[str, list[Document]]:
        """Asynchronous retrieval; same parameters and output as run()."""
        docs = await self.document_store.bm25_retrieval_async(
            query=query,
            filters=self._merge_filters(filters),
            top_k=self.top_k if top_k is None else top_k,
            scale_score=self.scale_score,
        )
        return {"documents": docs}

# Notes:
# 1. run and run_async must have the same signature ("..." is not valid in a
#    parameter list, so the parameters are spelled out).
# 2. Their declared output types must match.
# 3. The synchronous Pipeline.run() calls run(); the asynchronous
#    Pipeline.run_async() calls run_async() when a component defines it and
#    otherwise runs run() in a worker thread.
class FilterPolicy(Enum):
    """How runtime filters are combined with the filters given at init time."""

    REPLACE = "replace"  # runtime filters take the place of the init-time ones
    MERGE = "merge"  # runtime filters are merged into the init-time ones
def _merge_filters(self, runtime_filters: dict | None) -> dict:
"""合并过滤器"""
if self.filter_policy == FilterPolicy.MERGE and runtime_filters:
# 合并策略:运行时覆盖初始化
return {**(self.filters or {}), **runtime_filters}
else:
# 替换策略:使用运行时过滤器
return runtime_filters or self.filters
# 使用示例:
# 初始化时:filters={"category": "tech"}
# 运行时:filters={"date": "2024"}
#
# MERGE 结果:{"category": "tech", "date": "2024"}
# REPLACE 结果:{"date": "2024"}
@component
class SentenceTransformersDocumentEmbedder:
    """Embeds documents with a Sentence Transformers model."""

    def __init__(
        self,
        model: str = "sentence-transformers/all-mpnet-base-v2",
        device: ComponentDevice | None = None,
        batch_size: int = 32,
        normalize_embeddings: bool = False,
        precision: Literal["float32", "int8", "uint8", "binary"] = "float32",
        backend: Literal["torch", "onnx", "openvino"] = "torch"
    ):
        self.model = model
        # All constructor arguments are stored: warm_up() and run() read them,
        # and previously only `model` was kept, which raised AttributeError.
        self.device = device
        self.batch_size = batch_size
        self.normalize_embeddings = normalize_embeddings
        self.precision = precision
        self.backend = backend
        self.embedding_backend = None  # created lazily by warm_up()

    def warm_up(self) -> None:
        """Load the model once; later calls are no-ops."""
        if self.embedding_backend is not None:
            return
        self.embedding_backend = _SentenceTransformersEmbeddingBackendFactory.get_embedding_backend(
            model=self.model,
            device=self.device.to_torch_str() if self.device is not None else None,
            backend=self.backend
        )

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict:
        """Return copies of ``documents`` with their ``embedding`` field filled in."""
        from dataclasses import replace

        if not documents:
            return {"documents": []}
        if self.embedding_backend is None:
            self.warm_up()  # run() no longer crashes when warm_up() was not called
        # NOTE(review): assumes the backend forwards these keyword arguments to
        # SentenceTransformer.encode(); confirm against the backend in use.
        embeddings = self.embedding_backend.embed(
            [doc.content or "" for doc in documents],
            batch_size=self.batch_size,
            normalize_embeddings=self.normalize_embeddings,
            precision=self.precision,
        )
        new_docs = [replace(doc, embedding=emb) for doc, emb in zip(documents, embeddings)]
        return {"documents": new_docs}
@component
class OpenAIChatGenerator:
    """Chat generator backed by the OpenAI Chat Completions API."""

    SUPPORTED_MODELS = ["gpt-4o", "gpt-4", "gpt-3.5-turbo", ...]  # "..." = list abbreviated

    def __init__(
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: StreamingCallbackT | None = None,
        generation_kwargs: dict | None = None,
        tools: list[Tool] | None = None,
        tools_strict: bool = False
    ):
        # Every setting is stored; run() previously read self.model without it
        # ever being assigned.
        self.api_key = api_key
        self.model = model
        self.streaming_callback = streaming_callback
        self.generation_kwargs = dict(generation_kwargs or {})
        # NOTE(review): tools/tools_strict are stored for the tool-calling
        # variant of run(); this run() does not send them.
        self.tools = tools
        self.tools_strict = tools_strict
        resolved_key = api_key.resolve_value()
        self.client = OpenAI(api_key=resolved_key)
        self.async_client = AsyncOpenAI(api_key=resolved_key)

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        streaming_callback: StreamingCallbackT | None = None,
        generation_kwargs: dict | None = None
    ) -> dict:
        """Send ``messages`` to the API and return the assistant replies.

        Args:
            messages: Conversation history.
            streaming_callback: Per-call override of the init-time callback.
                When one is set, the response is streamed and each converted
                chunk is passed to it.
            generation_kwargs: Extra API parameters, merged over the init-time
                ones (``None`` is allowed; ``**None`` used to raise TypeError).

        Returns:
            ``{"replies": [...]}``.
        """
        callback = streaming_callback or self.streaming_callback
        request_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        if callback is not None:
            request_kwargs["stream"] = True
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[msg.to_openai_dict() for msg in messages],
            **request_kwargs
        )
        if callback is not None:
            chunks = []
            for chunk in response:
                streaming_chunk = self._convert_chunk_to_streaming_chunk(chunk)
                chunks.append(streaming_chunk)
                callback(streaming_chunk)  # forward each chunk as it arrives
            replies = [self._convert_chunks_to_chat_message(chunks)]
        else:
            replies = [self._convert_to_chat_message(response, choice) for choice in response.choices]
        return {"replies": replies}
def run(self, messages: list[ChatMessage], streaming_callback=None):
    """Call the Chat Completions API, streaming to ``streaming_callback`` when given.

    Returns:
        ``{"replies": [...]}``. When streaming, this holds one message assembled
        from the chunks; otherwise one message per returned choice.
    """
    streaming = streaming_callback is not None
    response = self.client.chat.completions.create(
        model=self.model,
        messages=[m.to_openai_dict() for m in messages],
        stream=streaming
    )
    if not streaming:
        return {"replies": [self._convert_to_chat_message(response, c) for c in response.choices]}

    collected = []
    for raw_chunk in response:
        converted = self._convert_chunk_to_streaming_chunk(raw_chunk)
        collected.append(converted)
        streaming_callback(converted)  # hand each chunk over immediately
    return {"replies": [self._convert_chunks_to_chat_message(collected)]}
@component
class OpenAIChatGenerator:
    """Chat generator variant showing function/tool calling."""

    def __init__(self, tools: list[Tool] | None = None):
        # Default tools offered to the model; run() may override them per call.
        # NOTE(review): self.client and self.model are expected to be set up
        # as in the full OpenAIChatGenerator constructor; omitted here.
        self.tools = tools

    def _prepare_tools(self, tools: list[Tool] | None) -> list[dict]:
        """Convert Tool objects into the API's function-tool definitions."""
        if not tools:
            return []
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters
                }
            }
            for tool in tools
        ]

    def run(self, messages: list[ChatMessage], tools: list[Tool] | None = None):
        """Send ``messages`` with the available tools and return the reply.

        Returns:
            ``{"replies": [message]}``. The message carries the tool calls when
            the model requested any, and the model's text otherwise.
        """
        openai_tools = self._prepare_tools(tools or self.tools)
        request = {
            "model": self.model,
            "messages": [msg.to_openai_dict() for msg in messages],
        }
        # Only send `tools` when there is at least one; the API rejects an
        # empty tools array.
        if openai_tools:
            request["tools"] = openai_tools
        response = self.client.chat.completions.create(**request)
        tool_calls = self._extract_tool_calls(response)
        if not tool_calls:
            # The model answered in plain text instead of calling a tool.
            return {"replies": [ChatMessage.from_assistant(response.choices[0].message.content)]}
        return {"replies": [ChatMessage.from_assistant(tool_calls=tool_calls)]}
from pydantic import BaseModel
class Person(BaseModel):
    """Example Pydantic model describing a structured reply (name, age, email)."""
    name: str
    age: int
    email: str
@component
class OpenAIChatGenerator:
    """Chat generator variant showing structured (JSON-schema) output."""

    def run(self, messages: list[ChatMessage], response_format: type[BaseModel] | None = None):
        """Ask the model for a reply, optionally constrained to ``response_format``'s schema.

        Returns:
            ``{"replies": [message]}``. When ``response_format`` is given and the
            model returned content, the reply carries an instance of that model;
            otherwise it carries the raw text content.
        """
        # Built fresh per call; the original referenced an undefined name here.
        generation_kwargs = {}
        if response_format is not None:
            generation_kwargs["response_format"] = {
                "type": "json_schema",
                "json_schema": {
                    "name": response_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(response_format)
                }
            }
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[msg.to_openai_dict() for msg in messages],
            **generation_kwargs
        )
        message = response.choices[0].message
        if response_format is None or message.content is None:
            # No schema requested, or no content (e.g. the model refused).
            return {"replies": [ChatMessage.from_assistant(content=message.content)]}
        # create() returns the JSON document as text in `content`; its message
        # has no `.parsed` attribute, so validate it with the Pydantic model.
        person = response_format.model_validate_json(message.content)
        return {"replies": [ChatMessage.from_assistant(content=person)]}
@component
class PromptBuilder:
    """Renders a Jinja2 prompt template; its variables become input sockets."""

    def __init__(
        self,
        template: str,
        required_variables: list[str] | Literal["*"] | None = None,
        variables: list[str] | None = None
    ):
        self._template_string = template
        self._env = SandboxedEnvironment(extensions=[Jinja2TimeExtension])
        self.template = self._env.from_string(template)

        # Without an explicit variable list, discover what the template reads.
        if not variables:
            _, discovered = _extract_template_variables_and_assignments(template)
            variables = list(discovered)

        # "*" marks every variable as required; otherwise only the listed ones.
        required = required_variables or []
        for var in variables:
            if required_variables == "*" or var in required:
                component.set_input_type(self, var, Any)  # required input
            else:
                component.set_input_type(self, var, Any, "")  # optional, defaults to ""

    @component.output_types(prompt=str)
    def run(
        self,
        template: str | None = None,
        template_variables: dict | None = None,
        **kwargs
    ) -> dict:
        """Render the prompt.

        Args:
            template: Optional template string replacing the init-time one for
                this call only.
            template_variables: Variables that take precedence over ``kwargs``
                with the same name.
            **kwargs: Template variables received through the input sockets.
        """
        merged = dict(kwargs)
        merged.update(template_variables or {})
        self._validate_variables(set(merged.keys()))
        compiled = self.template if template is None else self._env.from_string(template)
        return {"prompt": compiled.render(merged)}

# Example:
# template = "Translate {{ text }} to {{ language }}"
# builder.run(text="Hello", language="Chinese")
# Output: {"prompt": "Translate Hello to Chinese"}
# Replacing the template at runtime
documents = [Document(content="Python is great")]
new_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
Document {{ loop.index }}:
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""
# NOTE(review): assumes `pipeline` contains a "prompt_builder" that accepts
# `documents` and `query`, and that its output is returned in the result.
prompt_builder_inputs = {
    "documents": documents,
    "query": "What is Python?",
    "template": new_template,  # overrides the template for this call only
}
result = pipeline.run({"prompt_builder": prompt_builder_inputs})
print(result["prompt_builder"]["prompt"])
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
# Build the indexing pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore

# The store the writer fills. It was referenced but never created; the query
# pipeline has to use this same object to see the indexed documents.
document_store = InMemoryDocumentStore()

indexing_pipeline = Pipeline()
# Add components
indexing_pipeline.add_component("converter", TextFileToDocument())
indexing_pipeline.add_component("cleaner", DocumentCleaner())
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=10))
indexing_pipeline.add_component("embedder", SentenceTransformersDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
# Connect them: converter -> cleaner -> splitter -> embedder -> writer
indexing_pipeline.connect("converter", "cleaner")
indexing_pipeline.connect("cleaner", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
# Run indexing
indexing_pipeline.run({"converter": {"sources": ["doc.txt"]}})
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
# Build the query (RAG) pipeline.
# `document_store` must be the same store the indexing pipeline wrote to.

# Prompt template. It was referenced but never defined; it uses the
# `documents` and `query` inputs connected/passed below.
rag_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""

query_pipeline = Pipeline()
# Add components
query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder())
query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store))
query_pipeline.add_component("prompt_builder", PromptBuilder(template=rag_template))
query_pipeline.add_component("llm", OpenAIGenerator())
# Connect them with explicit "component.socket" endpoints
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipeline.connect("retriever.documents", "prompt_builder.documents")
query_pipeline.connect("prompt_builder.prompt", "llm.prompt")
# Run a query; the question goes to both the embedder and the prompt
result = query_pipeline.run({
    "text_embedder": {"text": "What is Python?"},
    "prompt_builder": {"query": "What is Python?"}
})
def connect(self, from_component: str, to_component: str) -> None:
    """Connect an output socket of one component to an input socket of another.

    Each argument is either ``"component"`` or ``"component.socket"``. When a
    socket is not given, output and input sockets are matched by name, and
    exactly one pair must match.

    Raises:
        ValueError: If a component or named socket does not exist, no socket
            pair (or more than one) matches, the two components are already
            connected through different sockets, or the edge would create a
            cycle. The graph is left unchanged in every error case.
    """
    def split_address(address: str) -> tuple[str, str | None]:
        # A bare component name (even one containing dots) takes priority.
        # Otherwise split on the LAST dot; socket names come from identifiers.
        if address in self.graph.nodes or "." not in address:
            return address, None
        component_name, socket_name = address.rsplit(".", 1)
        return component_name, socket_name

    from_name, from_socket = split_address(from_component)
    to_name, to_socket = split_address(to_component)

    # 1. Both components must exist.
    for name in (from_name, to_name):
        if name not in self.graph.nodes:
            raise ValueError(f"Component '{name}' not found")

    # 2. Validate explicitly named sockets.
    from_sockets = self.graph.nodes[from_name]["instance"].__haystack_output__
    to_sockets = self.graph.nodes[to_name]["instance"].__haystack_input__
    if from_socket is not None and from_socket not in from_sockets:
        raise ValueError(f"Component '{from_name}' has no output socket '{from_socket}'")
    if to_socket is not None and to_socket not in to_sockets:
        raise ValueError(f"Component '{to_name}' has no input socket '{to_socket}'")

    # 3. Resolve the socket pair: explicit, or same-name auto-matching.
    if from_socket is not None and to_socket is not None:
        pairs = [(from_socket, to_socket)]
    else:
        pairs = [
            (socket_name, socket_name)
            for socket_name in from_sockets
            if socket_name in to_sockets
            and (from_socket is None or socket_name == from_socket)
            and (to_socket is None or socket_name == to_socket)
        ]
    if not pairs:
        raise ValueError(f"No matching sockets between '{from_name}' and '{to_name}'")
    if len(pairs) > 1:
        names = ", ".join(sorted(out for out, _ in pairs))
        raise ValueError(
            f"Ambiguous connection between '{from_name}' and '{to_name}' "
            f"(matching sockets: {names}); use 'component.socket' explicitly"
        )
    out_socket, in_socket = pairs[0]

    # 4. nx.DiGraph keeps one edge per component pair; a second add_edge would
    #    silently overwrite the existing socket mapping.
    if self.graph.has_edge(from_name, to_name):
        existing = self.graph.edges[from_name, to_name]
        if (existing["from_socket"], existing["to_socket"]) == (out_socket, in_socket):
            return  # already connected exactly like this
        raise ValueError(f"'{from_name}' and '{to_name}' are already connected")

    # 5. Add the edge, and roll it back if it closes a cycle.
    self.graph.add_edge(from_name, to_name, from_socket=out_socket, to_socket=in_socket)
    if not nx.is_directed_acyclic_graph(self.graph):
        self.graph.remove_edge(from_name, to_name)
        raise ValueError("Connection creates a cycle!")
# Serialize the pipeline
pipeline_dict = query_pipeline.to_dict()
# Save to a file
import yaml
# safe_dump only writes plain YAML types, so the file can always be read back
# with safe_load. yaml.dump may emit Python-specific tags that safe_load
# rejects. Explicit UTF-8 keeps non-ASCII prompt templates intact.
with open("pipeline.yml", "w", encoding="utf-8") as f:
    yaml.safe_dump(pipeline_dict, f, allow_unicode=True, sort_keys=False)
# Load it back
with open("pipeline.yml", "r", encoding="utf-8") as f:
    pipeline_dict = yaml.safe_load(f)
pipeline = Pipeline.from_dict(pipeline_dict)
# Shape of pipeline_dict. It mirrors Pipeline.to_dict():
# - "type" is the defining module path plus the class name;
# - every component named in "connections" has an entry in "components"
#   (otherwise from_dict() could neither build nor connect it).
{
    "metadata": {"version": "2.0"},
    "components": {
        "retriever": {
            "type": "haystack.components.retrievers.in_memory.bm25_retriever.InMemoryBM25Retriever",
            "init_parameters": {
                "document_store": {
                    "type": "haystack.document_stores.in_memory.document_store.InMemoryDocumentStore",
                    "init_parameters": {},
                },
                "top_k": 10,
            },
        },
        "prompt_builder": {
            "type": "haystack.components.builders.prompt_builder.PromptBuilder",
            "init_parameters": {
                "template": "Given these documents: {{ documents }}\nQuestion: {{ query }}\nAnswer:",
            },
        },
    },
    "connections": [
        {"sender": "retriever.documents", "receiver": "prompt_builder.documents"}
    ],
}
def to_dict(self) -> dict:
    """Serialize the pipeline into plain dicts, lists and strings.

    Each component is stored with its ``module.ClassName`` path and the
    ``init_parameters`` reported by its own ``to_dict()``. Each graph edge
    becomes a ``{"sender": "comp.socket", "receiver": "comp.socket"}`` entry.
    """
    components = {}
    for name, instance in self.graph.nodes(data="instance"):
        klass = instance.__class__
        components[name] = {
            "type": f"{klass.__module__}.{klass.__name__}",
            "init_parameters": instance.to_dict()["init_parameters"],
        }

    connections = [
        {
            "sender": f"{src}.{edge['from_socket']}",
            "receiver": f"{dst}.{edge['to_socket']}",
        }
        for src, dst, edge in self.graph.edges(data=True)
    ]

    return {
        "metadata": {"version": "2.0"},
        "components": components,
        "connections": connections,
    }
@classmethod
def from_dict(cls, data: dict) -> "Pipeline":
    """Rebuild a pipeline from the structure produced by ``to_dict()``.

    Args:
        data: Mapping with ``"components"`` (name -> {"type", "init_parameters"})
            and, optionally, ``"connections"`` (a list of {"sender", "receiver"}).

    Returns:
        A new pipeline with the same components and socket-level connections.

    Raises:
        ValueError: If a connection names an unknown component or socket, or
            the connections form a cycle.
    """
    pipeline = cls()

    # 1. Create components.
    for name, config in data["components"].items():
        component_class = deserialize_class(config["type"])
        if hasattr(component_class, "from_dict"):
            # Mirror to_dict(), which serializes via instance.to_dict(). The
            # component's own from_dict() can rebuild nested objects such as a
            # document store stored as {"type": ..., "init_parameters": ...},
            # which plain **init_parameters would pass through as a raw dict.
            instance = component_class.from_dict(config)
        else:
            instance = component_class(**config.get("init_parameters", {}))
        pipeline.add_component(name, instance)

    def split_address(address: str) -> tuple[str, str | None]:
        # "component.socket" -> (component, socket). Split on the LAST dot
        # (socket names come from identifiers); a bare component name, even a
        # dotted one, has no socket.
        if address in pipeline.graph.nodes or "." not in address:
            return address, None
        component_name, socket_name = address.rsplit(".", 1)
        return component_name, socket_name

    # 2. Restore connections with the exact sockets that were serialized.
    #    Reconnecting by component name alone lost them whenever the sender
    #    and receiver socket names differ.
    for conn in data.get("connections", []):
        sender, sender_socket = split_address(conn["sender"])
        receiver, receiver_socket = split_address(conn["receiver"])
        for name in (sender, receiver):
            if name not in pipeline.graph.nodes:
                raise ValueError(f"Component '{name}' not found")
        if sender_socket is None or receiver_socket is None:
            # No explicit sockets: fall back to connect()'s matching.
            pipeline.connect(sender, receiver)
            continue
        if sender_socket not in pipeline.graph.nodes[sender]["instance"].__haystack_output__:
            raise ValueError(f"Component '{sender}' has no output socket '{sender_socket}'")
        if receiver_socket not in pipeline.graph.nodes[receiver]["instance"].__haystack_input__:
            raise ValueError(f"Component '{receiver}' has no input socket '{receiver_socket}'")
        pipeline.graph.add_edge(
            sender, receiver, from_socket=sender_socket, to_socket=receiver_socket
        )

    if not nx.is_directed_acyclic_graph(pipeline.graph):
        raise ValueError("Pipeline contains a cycle!")
    return pipeline
metadata:
  version: "2.0"
  author: "Haystack Team"
# Same layout as Pipeline.to_dict(): components keyed by name, each with the
# defining-module class path as "type" plus its "init_parameters";
# connections use "component.socket" endpoints.
components:
  retriever:
    type: haystack.components.retrievers.in_memory.bm25_retriever.InMemoryBM25Retriever
    init_parameters:
      document_store:
        type: haystack.document_stores.in_memory.document_store.InMemoryDocumentStore
        init_parameters: {}
      top_k: 10
  prompt_builder:
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: |
        Given these documents: {{ documents }}
        Question: {{ query }}
        Answer:
  generator:
    type: haystack.components.generators.openai.OpenAIGenerator
    init_parameters:
      model: gpt-4o-mini
      # The key is referenced by environment-variable name, not stored inline.
      api_key:
        type: env_var
        env_vars: ["OPENAI_API_KEY"]
        strict: true
connections:
  - sender: retriever.documents
    receiver: prompt_builder.documents
  - sender: prompt_builder.prompt
    receiver: generator.prompt
class PipelineError(Exception):
    """Root of the pipeline exception hierarchy."""


class PipelineRuntimeError(PipelineError):
    """A component failed while the pipeline was running."""


class PipelineConnectError(PipelineError):
    """Two components could not be connected."""
def run(self, data: dict) -> dict:
    """Run all components in topological order, wrapping failures in pipeline errors.

    Args:
        data: Per-component user inputs, passed to ``self._prepare_inputs``.

    Returns:
        Whatever ``self._collect_outputs()`` builds from this run.

    Raises:
        PipelineRuntimeError: A component failed; ``__cause__`` holds the
            original exception.
        PipelineError: Any other failure (pipeline errors raised elsewhere
            propagate unchanged).
    """
    try:
        self._validate_inputs(data)
        # Fresh output store per run, so a previous run's results cannot leak
        # into this one.
        self._component_outputs = {}
        for component_name in self._topological_sort():
            try:
                component = self.graph.nodes[component_name]["instance"]
                inputs = self._prepare_inputs(component_name, data)
                outputs = component.run(**inputs)
            except Exception as e:
                raise PipelineRuntimeError(
                    f"Error in component '{component_name}': {e}"
                ) from e
            self._component_outputs[component_name] = outputs
        return self._collect_outputs()
    except PipelineError:
        raise
    except Exception as e:
        raise PipelineError(f"Unexpected error: {e}") from e
# Parallel execution example:
#          +-> ComponentA -+
# Input ---+               +---> Merge
#          +-> ComponentB -+
# ComponentA and ComponentB do not depend on each other, so they can run concurrently.
async def run_async(self, data: dict) -> dict:
    """Run the pipeline layer by layer, executing each layer's components concurrently.

    Components that define ``run_async`` are awaited directly. Synchronous
    ``run`` methods are executed in a worker thread via ``asyncio.to_thread``.

    Args:
        data: Per-component user inputs, passed to ``self._prepare_inputs``.

    Returns:
        Whatever ``self._collect_outputs()`` builds from this run.
    """
    # Fresh output store per run, so a previous run's results cannot leak in.
    self._component_outputs = {}
    for layer in self._topological_layers():
        names = list(layer)
        # Prepare every input BEFORE creating any coroutine. If preparation
        # fails part-way, no coroutine is left created but never awaited.
        prepared = [
            (self.graph.nodes[name]["instance"], self._prepare_inputs(name, data))
            for name in names
        ]
        tasks = [
            component.run_async(**inputs)
            if hasattr(component, "run_async")
            else asyncio.to_thread(component.run, **inputs)
            for component, inputs in prepared
        ]
        # gather() returns results in argument order, matching `names`.
        outputs = await asyncio.gather(*tasks)
        self._component_outputs.update(zip(names, outputs))
    return self._collect_outputs()
# Batched embedding
@component
class BatchEmbedder:
    """Embeds documents in fixed-size batches with an injected embedding backend."""

    def __init__(self, batch_size: int = 32):
        if batch_size <= 0:
            # range() with step 0 raises, and a negative step would silently
            # skip every document.
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
        self.batch_size = batch_size
        # Must be set to an object with embed(list[str]) before run() is called.
        self.embedding_backend = None

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict:
        """Return copies of ``documents`` with their ``embedding`` field filled in.

        Raises:
            RuntimeError: If no embedding backend is set, or the backend
                returned a different number of embeddings than documents.
        """
        from dataclasses import replace

        if self.embedding_backend is None:
            raise RuntimeError(
                "BatchEmbedder has no embedding backend; set embedding_backend before run()"
            )
        all_embeddings = []
        for start in range(0, len(documents), self.batch_size):
            batch = documents[start:start + self.batch_size]
            # Documents without content are embedded as empty strings.
            all_embeddings.extend(self.embedding_backend.embed([doc.content or "" for doc in batch]))
        if len(all_embeddings) != len(documents):
            # zip() would otherwise silently drop documents.
            raise RuntimeError(
                f"Expected {len(documents)} embeddings, got {len(all_embeddings)}"
            )
        new_docs = [
            replace(doc, embedding=emb)
            for doc, emb in zip(documents, all_embeddings)
        ]
        return {"documents": new_docs}

# Effect of batching: N documents need ceil(N / batch_size) backend calls
# instead of N (e.g. 1000 documents with batch_size=32 -> 32 calls).
| 框架 | 架构 | 优点 | 缺点 |
|---|---|---|---|
| Haystack | Pipeline + Component | 生产级、类型安全、易调试 | 学习曲线陡峭 |
| LangChain | Chain + Agent | 生态丰富、社区活跃 | 调试困难、不稳定 |
| LlamaIndex | Index + Query | RAG 专注、易上手 | 灵活性较低 |
| DSPy | Module + Optimizer | 自动优化 Prompt | 概念复杂 |
# 1. Use debug mode
result = pipeline.run(data, debug=True)
print(pipeline._debug_info)
# 2. Visualize the pipeline
# NOTE(review): check draw()'s signature for your Haystack version; it may
# expect a pathlib.Path and may need network access to render.
pipeline.draw("pipeline.png")
# 3. Print component connections. The loop variable is `edge_data`, not
#    `data`, so the run inputs above are not overwritten.
for from_comp, to_comp, edge_data in pipeline.graph.edges(data=True):
    print(f"{from_comp}.{edge_data['from_socket']} -> {to_comp}.{edge_data['to_socket']}")
# 4. Test a single component in isolation
retriever = pipeline.get_component("retriever")
result = retriever.run(query="test")
# 5. Inspect each component's sockets
for name, instance in pipeline.graph.nodes(data="instance"):
    print(f"{name}: {instance.__haystack_input__}")
    print(f"{name}: {instance.__haystack_output__}")
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 组件连接失败 | Socket 名称不匹配 | 检查输入/输出 Socket 名称 |
| 循环依赖错误 | Pipeline 有环 | 重新设计 Pipeline 拓扑 |
| 类型不匹配 | Socket 类型不一致 | 使用类型转换组件 |
| 内存溢出 | 文档数量过多 | 使用批处理和流式处理 |
| 性能慢 | 模型未预热 | 调用 warm_up 或使用 Pipeline.warm_up() |
RAG 应用、文档问答、知识库、智能客服
感谢阅读!
访问 https://atcfu.com/ai-articles/haystack-rag/ 回顾本文