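🔍 Haystack RAG Framework: A Source Code Walkthrough

The four core pieces, Pipeline, Component, Retriever and Generator: a complete teardown of one of the leading Python RAG frameworks

Source-level analysis · pipeline.py · component.py · retrievers.py · generators.py
2026-03-24 | Daily Technical Deep Dive

What Is Haystack?

Haystack is a production-grade RAG (Retrieval-Augmented Generation) framework developed by deepset.

Key features:

  • Component-based architecture (Pipeline + Component)
  • Declarative pipeline definitions (Python code or YAML)
  • Built-in RAG components (Retriever, Generator, Embedder)
  • Asynchronous execution (async/await)
  • Type-safe connections between components
  • Serialization and persistence

Positioning: an alternative to LangChain and LlamaIndex, with a stronger focus on stability in production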

Core Architecture Overview

                    Haystack Architecture

   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
   │   Pipeline    │────▶│   Component   │────▶│     Socket    │
   │(orchestration)│     │ (abstraction) │     │(input/output) │
   └───────────────┘     └───────────────┘     └───────────────┘
           │                     │
           │             ┌───────┴─────┬─────────────┐
           ▼             ▼             ▼             ▼
     ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
     │   Graph   │ │ Retriever │ │ Generator │ │ Embedder  │
     │ (DiGraph) │ │           │ │           │ │           │
     └───────────┘ └───────────┘ └───────────┘ └───────────┘

Core modules: Pipeline, Component, Retriever, Generator

Four Core Concepts

Concept   | Responsibility                                  | Analogy
----------|-------------------------------------------------|--------------------------------------
Pipeline  | Orchestrates components, manages execution flow | Workflow engine (Airflow)
Component | Reusable processing unit                        | Unix pipeline command
Socket    | A component's input/output port                 | Function parameters and return values
Document  | Data carrier (text + metadata + embedding)      | ORM entity object

Pipeline Interface Definition

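import networkx as nx


class Pipeline:
    """Component orchestration engine (simplified interface)"""

    def __init__(self):
        # Directed graph of components. Simplified: Haystack itself uses a
        # networkx.MultiDiGraph, so two components can be linked by several socket pairs
        self.graph = nx.DiGraph()
        self._component_names: dict[int, str] = {}

    # Component management
    def add_component(self, name: str, instance: Component) -> None: ...
    def get_component(self, name: str) -> Component: ...
    def remove_component(self, name: str) -> None: ...

    # Connection management ("component" or "component.socket")
    def connect(self, from_component: str, to_component: str) -> None: ...

    # Execution
    def run(self, data: dict, include_outputs_from: set[str] | None = None) -> dict: ...
    # (in Haystack 2.x, async execution is provided by the separate AsyncPipeline class)
    async def run_async(self, data: dict, include_outputs_from: set[str] | None = None) -> dict: ...

    # Serialization
    def to_dict(self) -> dict: ...
    @classmethod
    def from_dict(cls, data: dict) -> "Pipeline": ...
    def dumps(self) -> str: ...
    @classmethod
    def loads(cls, data: str) -> "Pipeline": ...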

Pipeline.add_component: Component Management

import weakref


def add_component(self, name: str, instance: Component) -> None:
    # 1. Reject duplicate component names
    if name in self.graph.nodes:
        raise ValueError(f"Component '{name}' already exists")

    # 2. Make sure the object really is a component
    if not isinstance(instance, Component):
        raise ValueError("Must be a Component instance")

    # 3. Refuse instances that already belong to another Pipeline
    if instance.__haystack_added_to_pipeline__ is not None:
        raise ValueError("Component already added to another Pipeline")

    # 4. Add the instance to the graph
    self.graph.add_node(name, instance=instance)
    self._component_names[id(instance)] = name

    # 5. Record ownership; a weak reference does not keep the Pipeline alive
    instance.__haystack_added_to_pipeline__ = weakref.ref(self)

Key point: a Component instance can belong to only one Pipeline.
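Here is a small stdlib-only sketch of the ownership rule. `MiniComponent` and `MiniPipeline` are hypothetical stand-ins, not Haystack classes. They reuse the `__haystack_added_to_pipeline__` attribute name from the pseudocode above and store a weak reference, just as that pseudocode does.

```python
import weakref


class MiniComponent:
    """Hypothetical stand-in for a Haystack component instance."""

    def __init__(self) -> None:
        # None means "not owned by any pipeline yet"
        self.__haystack_added_to_pipeline__ = None


class MiniPipeline:
    def __init__(self) -> None:
        self.components: dict[str, MiniComponent] = {}

    def add_component(self, name: str, instance: MiniComponent) -> None:
        if name in self.components:
            raise ValueError(f"Component '{name}' already exists")
        if instance.__haystack_added_to_pipeline__ is not None:
            raise ValueError("Component already added to another Pipeline")
        self.components[name] = instance
        # Weak reference: the component does not keep its pipeline alive
        instance.__haystack_added_to_pipeline__ = weakref.ref(self)


retriever = MiniComponent()
p1, p2 = MiniPipeline(), MiniPipeline()
p1.add_component("retriever", retriever)
try:
    p2.add_component("retriever", retriever)
except ValueError as err:
    print(err)  # Component already added to another Pipeline
```

To reuse the same configuration in a second pipeline, create a second instance rather than sharing one.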

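Pipeline Topological Sort

from collections import deque


def _topological_sort(self) -> list[str]:
    """Topological sort using Kahn's algorithm.

    Simplified: this assumes the graph is a DAG. Haystack 2.x itself also
    allows loops, bounded by the max_runs_per_component setting.
    """
    # 1. Count each node's in-degree (number of incoming edges)
    in_degree = {node: 0 for node in self.graph.nodes()}
    for node in self.graph.nodes():
        for _predecessor in self.graph.predecessors(node):
            in_degree[node] += 1

    # 2. Seed the queue with every node of in-degree 0
    queue = deque([node for node, degree in in_degree.items() if degree == 0])
    result = []

    # 3. BFS: emit a node, then release successors whose in-degree drops to 0
    while queue:
        node = queue.popleft()
        result.append(node)

        for successor in self.graph.successors(node):
            in_degree[successor] -= 1
            if in_degree[successor] == 0:
                queue.append(successor)

    # 4. Detect cycles: nodes on a cycle never reach in-degree 0
    if len(result) != len(self.graph.nodes()):
        raise ValueError("Pipeline contains a cycle!")

    return result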
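Here is the same algorithm outside the class, so it can run on its own. This sketch uses a plain adjacency dict (`{node: [successors]}`) in place of a networkx graph. The function name and the sample graph are illustrative.

```python
from collections import deque


def topological_sort(edges: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over an adjacency list {node: [successors]}."""
    # Collect every node, including nodes that only appear as successors
    nodes = set(edges) | {s for succs in edges.values() for s in succs}
    in_degree = {node: 0 for node in nodes}
    for succs in edges.values():
        for s in succs:
            in_degree[s] += 1

    # Sort the initial queue so the output is deterministic
    queue = deque(sorted(n for n, d in in_degree.items() if d == 0))
    order: list[str] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for s in edges.get(node, []):
            in_degree[s] -= 1
            if in_degree[s] == 0:
                queue.append(s)

    # Nodes on a cycle never reach in-degree 0
    if len(order) != len(nodes):
        raise ValueError("Pipeline contains a cycle!")
    return order


rag = {
    "text_embedder": ["retriever"],
    "retriever": ["prompt_builder"],
    "prompt_builder": ["llm"],
}
print(topological_sort(rag))
# ['text_embedder', 'retriever', 'prompt_builder', 'llm']
```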

Preparing Component Inputs

def _prepare_component_inputs(self, component_name: str) -> dict:
    """Collect the input data for one component"""
    component = self.graph.nodes[component_name]["instance"]
    inputs = {}

    # 1. Take values from the outputs of predecessor components
    for predecessor in self.graph.predecessors(component_name):
        edge_data = self.graph.edges[predecessor, component_name]
        from_socket = edge_data["from_socket"]
        to_socket = edge_data["to_socket"]

        # Read the predecessor's output value for this socket
        predecessor_output = self._component_outputs[predecessor]
        inputs[to_socket] = predecessor_output[from_socket]

    # 2. Merge in user-supplied inputs (they win on conflict)
    if component_name in self._user_inputs:
        inputs.update(self._user_inputs[component_name])

    # 3. Apply defaults (_empty is Haystack's "no default value" sentinel)
    for socket_name, socket in component.__haystack_input__.items():
        if socket_name not in inputs and socket.default_value is not _empty:
            inputs[socket_name] = socket.default_value

    return inputs

Pipeline.run Execution Flow

                    Pipeline.run() execution flow

   1. Topological sort (determine the execution order)
        │
        ▼
   2. Validate inputs (check that every required parameter is provided)
        │
        ▼
   3. Warm up components (call warm_up)
        │
        ▼
   4. Execute the components in topological order
        ├─ Prepare the input data
        ├─ Call component.run()
        ├─ Save the outputs to _component_outputs
        ├─ Pass them on to successor components
        │
        ▼
   5. Return the final outputs (outputs that no downstream component consumed, typically those of the leaf nodes)
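The toy executor below follows steps 1, 4 and 5 of this flow. Input validation and warm-up are left out to keep it short. It rests on simplifying assumptions: components are plain functions that return dicts, and connections use the `"component.socket"` string form. The ordering comes from the standard library's `graphlib.TopologicalSorter` rather than networkx.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable

# Toy components: plain functions taking keyword inputs and returning named outputs
ComponentFn = Callable[..., dict[str, Any]]


def run_pipeline(
    components: dict[str, ComponentFn],
    connections: list[tuple[str, str]],  # ("sender.socket", "receiver.socket")
    data: dict[str, dict[str, Any]],     # user inputs per component
) -> dict[str, dict[str, Any]]:
    # Step 1: derive the execution order from the connections
    deps: dict[str, set[str]] = {name: set() for name in components}
    for sender, receiver in connections:
        deps[receiver.split(".")[0]].add(sender.split(".")[0])
    order = list(TopologicalSorter(deps).static_order())

    outputs: dict[str, dict[str, Any]] = {}
    consumed: set[str] = set()  # "sender.socket" outputs that feed another component
    for name in order:
        # Step 4a: values from upstream outputs, then user data on top
        inputs: dict[str, Any] = {}
        for sender, receiver in connections:
            r_name, r_socket = receiver.split(".")
            if r_name == name:
                s_name, s_socket = sender.split(".")
                inputs[r_socket] = outputs[s_name][s_socket]
                consumed.add(sender)
        inputs.update(data.get(name, {}))
        # Steps 4b/4c: run the component and keep its outputs for successors
        outputs[name] = components[name](**inputs)

    # Step 5: return only the outputs that no downstream component consumed
    final: dict[str, dict[str, Any]] = {}
    for name, result in outputs.items():
        leftover = {k: v for k, v in result.items() if f"{name}.{k}" not in consumed}
        if leftover:
            final[name] = leftover
    return final


def retriever(query: str) -> dict[str, Any]:
    return {"documents": [f"doc about {query}"]}


def prompt_builder(documents: list[str], query: str) -> dict[str, Any]:
    return {"prompt": f"Context: {documents[0]} | Q: {query}"}


result = run_pipeline(
    {"retriever": retriever, "prompt_builder": prompt_builder},
    [("retriever.documents", "prompt_builder.documents")],
    {"retriever": {"query": "Python"}, "prompt_builder": {"query": "Python"}},
)
print(result)
# {'prompt_builder': {'prompt': 'Context: doc about Python | Q: Python'}}
```

The retriever's `documents` output is missing from the result because the prompt builder consumed it.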

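Pipeline Debug Mode

import time
from copy import deepcopy


def run(self, data: dict, debug: bool = False) -> dict:
    """Execution with optional debug recording (a sketch: the real Pipeline.run()
    has no debug flag and offers include_outputs_from for intermediate outputs)"""
    self._user_inputs = data        # read by _prepare_component_inputs
    self._component_outputs = {}    # fresh state for every run
    if debug:
        # 1. Record each component's inputs, outputs and duration
        self._debug_info = {
            "inputs": {},
            "outputs": {},
            "timings": {}
        }

    for component_name in self._topological_sort():
        component = self.graph.nodes[component_name]["instance"]
        inputs = self._prepare_component_inputs(component_name)

        if debug:
            start_time = time.perf_counter()
            # Deep copy so later mutations by the component do not alter the record
            self._debug_info["inputs"][component_name] = deepcopy(inputs)

        # Run the component
        outputs = component.run(**inputs)

        if debug:
            self._debug_info["outputs"][component_name] = outputs
            self._debug_info["timings"][component_name] = time.perf_counter() - start_time

        self._component_outputs[component_name] = outputs

    return self._collect_final_outputs()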

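The Component Decorator

from haystack import Document, component


@component
class MyRetriever:
    """Custom retriever component"""

    def __init__(self, top_k: int = 10):
        self.top_k = top_k

    @component.output_types(documents=list[Document])
    def run(self, query: str) -> dict:
        """Run retrieval"""
        docs = self._search(query, top_k=self.top_k)  # _search: hypothetical helper, not shown
        return {"documents": docs}

# What does the decorator do? (simplified)
# 1. Checks that the class defines a run() method
# 2. Re-creates the class with ComponentMeta as its metaclass, so that
#    __haystack_input__ / __haystack_output__ are set on every new instance
# 3. Registers the class in component.registry under its import path,
#    which is how deserialization finds the class again

Key point: the input sockets are created automatically from run()'s signature and type annotations; the output sockets come from @component.output_types or component.set_output_types.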

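The ComponentMeta Metaclass

import inspect

from haystack.core.errors import ComponentError


class ComponentMeta(type):
    """Component metaclass: sets up the sockets at instantiation time"""

    def __call__(cls, *args, **kwargs):
        # 1. Run __new__ and __init__ as usual (so sockets declared in
        #    __init__ via set_input_type/set_output_types already exist)
        instance = super().__call__(*args, **kwargs)

        # 2. If run_async exists, it must be a coroutine function
        has_async_run = hasattr(instance, "run_async")
        if has_async_run and not inspect.iscoroutinefunction(instance.run_async):
            raise ComponentError("run_async must be a coroutine")

        # 3. Parse the input sockets (from run()'s signature)
        ComponentMeta._parse_and_set_input_sockets(cls, instance)

        # 4. Parse the output sockets (from @component.output_types or set_output_types)
        ComponentMeta._parse_and_set_output_sockets(instance)

        # 5. Mark the instance as not yet added to any Pipeline
        instance.__haystack_added_to_pipeline__ = None

        return instance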

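InputSocket: Input Ports

from dataclasses import dataclass
from typing import Any


@dataclass
class InputSocket:
    """A component's input port"""
    name: str                    # socket name
    type: Any                    # type annotation
    default_value: Any = _empty  # default value (_empty is Haystack's "no default" sentinel)
    is_variadic: bool = False    # True for Variadic[T] inputs that collect values from several senders

    @property
    def is_optional(self) -> bool:
        """Optional if the socket has a default value"""
        return self.default_value is not _empty

# Example: parsing a run() signature
def run(self, query: str, top_k: int = 10, **kwargs):
    pass

# Result:
# InputSocket(name="query", type=str, default_value=_empty)
# InputSocket(name="top_k", type=int, default_value=10)
# **kwargs does not become a socket; inputs that gather values from several
# senders are declared with a Variadic[T] type annotation instead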

OutputSocket: Output Ports

@dataclass
class OutputSocket:
    """A component's output port"""
    name: str    # socket name
    type: Any    # type annotation

# Two ways to declare outputs:

# Option 1: the decorator
@component.output_types(documents=list[Document], query=str)
def run(self, query: str) -> dict:
    return {"documents": [...], "query": query}

# Option 2: set them in __init__ (useful when the outputs depend on init parameters)
def __init__(self):
    component.set_output_types(
        self,
        documents=list[Document],
        query=str
    )

# Result:
# OutputSocket(name="documents", type=list[Document])
# OutputSocket(name="query", type=str)

Component Protocol

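from typing import Any, Mapping, Protocol, runtime_checkable


@runtime_checkable
class Component(Protocol):
    """Component protocol, used for type checks"""

    def run(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        """Synchronous execution method"""
        ...

    # Notes:
    # 1. A Protocol is mainly a static-typing contract; @runtime_checkable also
    #    enables isinstance(), which only checks that a run attribute exists
    # 2. run() can take any parameters
    # 3. run() must return a dict (Mapping[str, Any]) keyed by output socket name

    # Examples: all of these are valid components
    # def run(self, query: str) -> dict: ...
    # def run(self, **kwargs) -> dict: ...
    # def run(self, documents: list) -> dict: ...

Protocol check: thanks to @runtime_checkable, isinstance(obj, Component) returns True or False at runtime, but it only verifies that obj has a run attribute. It does not check signatures or return types.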
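The demonstration below shows what a `runtime_checkable` Protocol actually checks. The three classes are invented for the example. Note that `WrongReturn` passes the `isinstance` check even though it returns a string.

```python
from typing import Any, Mapping, Protocol, runtime_checkable


@runtime_checkable
class Component(Protocol):
    def run(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]: ...


class GoodRetriever:
    def run(self, query: str) -> dict:
        return {"documents": []}


class WrongReturn:
    def run(self) -> str:  # violates the intended contract
        return "not a dict"


class NoRun:
    pass


print(isinstance(GoodRetriever(), Component))  # True
print(isinstance(WrongReturn(), Component))    # True: signatures are not checked
print(isinstance(NoRun(), Component))          # False: no run attribute
```

Violations such as `WrongReturn` surface only later, when the pipeline tries to read named outputs from the returned value, or earlier through a static type checker.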

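The warm_up Mechanism

from haystack import Document
from haystack.utils import ComponentDevice


class SentenceTransformersDocumentEmbedder:
    """Document embedder (excerpt)"""

    def __init__(
        self,
        model: str = "sentence-transformers/all-mpnet-base-v2",
        device: ComponentDevice | None = None,
    ):
        self.model = model
        # Pick a device automatically when none is given
        self.device = ComponentDevice.resolve_device(device)
        self.embedding_backend = None  # lazy: nothing is loaded in __init__

    def warm_up(self) -> None:
        """Warm-up: load the model (only once)"""
        if self.embedding_backend is None:
            self.embedding_backend = _SentenceTransformersEmbeddingBackendFactory.get_embedding_backend(
                model=self.model,
                device=self.device.to_torch_str(),
                # further options (auth token, backend, ...) omitted
            )

    def run(self, documents: list[Document]) -> dict:
        if self.embedding_backend is None:
            self.warm_up()  # fall back to warming up on first use

        embeddings = self.embedding_backend.embed([doc.content or "" for doc in documents])
        return {"documents": [...]}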
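The sketch below isolates the warm-up pattern. Construction stays cheap, loading happens once, and repeated `warm_up()` calls are harmless. `FakeBackend` and `LazyEmbedder` are hypothetical stand-ins for the real embedding backend and embedder. The load counter exists only to make the behavior observable.

```python
class FakeBackend:
    """Hypothetical stand-in for an expensive model object."""

    loads = 0  # how many times a "model" has been loaded

    def __init__(self, model: str) -> None:
        FakeBackend.loads += 1
        self.model = model

    def embed(self, texts: list[str]) -> list[list[float]]:
        return [[float(len(t))] for t in texts]


class LazyEmbedder:
    def __init__(self, model: str = "toy-model") -> None:
        self.model = model
        self.embedding_backend = None  # nothing heavy happens in __init__

    def warm_up(self) -> None:
        # Idempotent: only the first call loads the backend
        if self.embedding_backend is None:
            self.embedding_backend = FakeBackend(self.model)

    def run(self, texts: list[str]) -> dict:
        self.warm_up()  # fallback in case nobody warmed the component up
        return {"embeddings": self.embedding_backend.embed(texts)}


embedder = LazyEmbedder()
print(FakeBackend.loads)  # 0: constructing the component loads nothing
embedder.warm_up()
embedder.warm_up()
embedder.run(["hello"])
print(FakeBackend.loads)  # 1
```

Keeping `__init__` cheap also keeps serialization and pipeline construction fast, since `to_dict` and `from_dict` never touch the model.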

The output_types Decorator

from typing import Any, Callable


def output_types(**types: Any) -> Callable:
    """Decorator factory: declares the output types"""

    def output_types_decorator(run_method: Callable) -> Callable:
        # 1. Only run/run_async may be decorated
        if run_method.__name__ not in ("run", "run_async"):
            raise ComponentError("output_types can only be used on run/run_async methods")

        # 2. Create the OutputSockets
        sockets = {
            name: OutputSocket(name=name, type=type_)
            for name, type_ in types.items()
        }

        # 3. Cache them on the method; ComponentMeta reads them at instantiation
        setattr(run_method, "_output_types_cache", sockets)

        return run_method

    return output_types_decorator

# Usage
@component.output_types(replies=list[ChatMessage])
def run(self, messages: list[ChatMessage]) -> dict:
    return {"replies": [...]}
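Below is a runnable miniature of the decorator above. It raises `TypeError` where Haystack raises `ComponentError`, and the `Generator` class is an invented example. The point is that the decorator only attaches metadata to the function and leaves `run` itself unchanged.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class OutputSocket:
    name: str
    type: Any


def output_types(**types: Any) -> Callable:
    def decorator(run_method: Callable) -> Callable:
        if run_method.__name__ not in ("run", "run_async"):
            raise TypeError("output_types can only decorate run/run_async")
        # Attach metadata only; the function itself is returned unchanged
        run_method._output_types_cache = {
            name: OutputSocket(name, type_) for name, type_ in types.items()
        }
        return run_method

    return decorator


class Generator:
    @output_types(replies=list[str])
    def run(self, prompt: str) -> dict:
        return {"replies": [prompt.upper()]}


print(list(Generator.run._output_types_cache))  # ['replies']

try:
    @output_types(answer=str)
    def helper(self) -> dict:
        return {}
except TypeError as err:
    print(err)  # output_types can only decorate run/run_async
```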

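Dynamic Inputs with set_input_type

from typing import Any

from haystack import component
from jinja2 import Environment, meta


@component
class PromptBuilder:
    """Prompt builder (simplified)"""

    def __init__(self, template: str):
        env = Environment()
        # Compile the template string; a plain str has no render() method
        self.template = env.from_string(template)

        # Extract the variables used in the template
        variables = meta.find_undeclared_variables(env.parse(template))

        # Declare one optional input socket per variable at runtime
        for var in variables:
            component.set_input_type(self, var, Any, default="")

    @component.output_types(prompt=str)
    def run(self, **kwargs) -> dict:
        """The template variables arrive as keyword inputs"""
        prompt = self.template.render(**kwargs)
        return {"prompt": prompt}

# Example:
# template = "Translate {{ text }} to {{ language }}"
# builder = PromptBuilder(template)
#
# Input sockets created automatically:
# InputSocket(name="text", type=Any, default_value="")
# InputSocket(name="language", type=Any, default_value="")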
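This is a simplified illustration of how template variables might be extracted without Jinja2. The regex below is an assumption that only understands bare `{{ name }}` expressions. The last example shows why PromptBuilder relies on Jinja2's parser: a regex misses variables that appear in loops or in attribute access.

```python
import re

# Simplified: matches only bare {{ name }} expressions
_VAR = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def extract_variables(template: str) -> list[str]:
    """Return variable names in order of first appearance, without duplicates."""
    names: list[str] = []
    for name in _VAR.findall(template):
        if name not in names:
            names.append(name)
    return names


print(extract_variables("Translate {{ text }} to {{ language }}"))
# ['text', 'language']

# Misses `documents` entirely: loops and attribute access need a real parser
print(extract_variables("{% for doc in documents %}{{ doc.content }}{% endfor %}"))
# []
```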

InMemoryBM25Retriever

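from haystack import Document, component
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import FilterPolicy


@component
class InMemoryBM25Retriever:
    """In-memory retriever based on BM25"""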
    
    def __init__(
        self,
        document_store: InMemoryDocumentStore,
        filters: dict | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        filter_policy: FilterPolicy = FilterPolicy.REPLACE
    ):
        self.document_store = document_store
        self.filters = filters
        self.top_k = top_k
        self.scale_score = scale_score
        self.filter_policy = filter_policy
    
    @component.output_types(documents=list[Document])
    def run(self, query: str, filters: dict | None = None, top_k: int | None = None):
        # Combine init-time and runtime filters according to filter_policy
        final_filters = self._merge_filters(filters)

        # Run BM25 retrieval in the document store
        docs = self.document_store.bm25_retrieval(
            query=query,
            filters=final_filters,
            top_k=top_k or self.top_k,
            scale_score=self.scale_score
        )
        return {"documents": docs}

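The BM25 Retrieval Algorithm

from collections import Counter
from math import log


def bm25_retrieval(self, query: str, top_k: int, k1: float = 1.5, b: float = 0.75) -> list[Document]:
    """Okapi BM25 retrieval (simplified illustration)"""
    # 1. Tokenize the query and every document once
    query_terms = self._tokenize(query)
    doc_terms = [self._tokenize(doc.content) for doc in self.documents]
    N = len(doc_terms)                                    # number of documents
    if N == 0:
        return []
    avgdl = sum(len(terms) for terms in doc_terms) / N    # average document length

    # 2. Document frequency: how many documents contain each query term
    df = {term: sum(1 for terms in doc_terms if term in terms) for term in set(query_terms)}

    # 3. Compute each document's BM25 score
    scores = []
    for doc, terms in zip(self.documents, doc_terms):
        counts = Counter(terms)
        doc_len = len(terms)
        score = 0.0
        for term in query_terms:
            tf = counts[term]  # raw term frequency; length normalization happens below
            if tf == 0:
                continue
            # IDF (Inverse Document Frequency); the +1 keeps it non-negative
            idf = log((N - df[term] + 0.5) / (df[term] + 0.5) + 1)
            # BM25 formula
            score += idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * doc_len / avgdl))
        scores.append((doc, score))

    # 4. Sort by score and return the top_k documents
    scores.sort(key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in scores[:top_k]]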
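For reference, here is the standard Okapi BM25 formula the loop above is based on. The symbols are:

- f(t, D): the raw count of term t in document D
- |D|: the document length in tokens
- avgdl: the average document length
- N: the number of documents
- n(t): the number of documents containing t

k1 (commonly 1.2 to 2.0) controls how quickly repeated terms saturate, and b (commonly 0.75) controls length normalization. The +1 inside the logarithm keeps IDF non-negative for very common terms.

```latex
\mathrm{score}(D, Q) = \sum_{t \in Q} \mathrm{IDF}(t)\,
  \frac{f(t, D)\,(k_1 + 1)}{f(t, D) + k_1\left(1 - b + b\,\frac{|D|}{\mathrm{avgdl}}\right)}

\mathrm{IDF}(t) = \ln\!\left(\frac{N - n(t) + 0.5}{n(t) + 0.5} + 1\right)
```

Because length normalization already sits in the denominator, f(t, D) is the raw count. Dividing it by |D| beforehand would normalize twice.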

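Dual Mode: run and run_async

@component
class InMemoryBM25Retriever:
    """Retriever supporting both sync and async execution"""

    @component.output_types(documents=list[Document])
    def run(self, query: str, filters: dict | None = None, top_k: int | None = None) -> dict[str, list[Document]]:
        """Synchronous execution"""
        docs = self.document_store.bm25_retrieval(
            query=query, filters=self._merge_filters(filters), top_k=top_k or self.top_k
        )
        return {"documents": docs}

    @component.output_types(documents=list[Document])
    async def run_async(self, query: str, filters: dict | None = None, top_k: int | None = None) -> dict[str, list[Document]]:
        """Asynchronous execution"""
        docs = await self.document_store.bm25_retrieval_async(
            query=query, filters=self._merge_filters(filters), top_k=top_k or self.top_k
        )
        return {"documents": docs}

# Notes:
# 1. run and run_async must have the same signature
# 2. Their output types must match
# 3. Pipeline.run() always calls run(); AsyncPipeline calls run_async() when a
#    component defines it and otherwise runs run() in a worker thread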

FilterPolicy: Filter Policies

from enum import Enum


class FilterPolicy(Enum):
    REPLACE = "replace"  # runtime filters replace the init-time filters
    MERGE = "merge"      # combine init-time and runtime filters

def _merge_filters(self, runtime_filters: dict | None) -> dict | None:
    """Combine filters (simplified dict semantics)"""
    if self.filter_policy == FilterPolicy.MERGE and runtime_filters:
        # MERGE: union of both; runtime keys win on conflict
        return {**(self.filters or {}), **runtime_filters}
    else:
        # REPLACE: use the runtime filters if given, otherwise the init-time filters
        return runtime_filters or self.filters

# Example:
# init time: filters={"category": "tech"}
# runtime:   filters={"date": "2024"}
#
# MERGE result:   {"category": "tech", "date": "2024"}
# REPLACE result: {"date": "2024"}

SentenceTransformersDocumentEmbedder

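from dataclasses import replace
from typing import Literal

from haystack import Document, component
from haystack.utils import ComponentDevice


@component
class SentenceTransformersDocumentEmbedder:
    """Document embedder (excerpt)"""

    def __init__(
        self,
        model: str = "sentence-transformers/all-mpnet-base-v2",
        device: ComponentDevice | None = None,
        batch_size: int = 32,
        normalize_embeddings: bool = False,
        precision: Literal["float32", "int8", "uint8", "binary"] = "float32",
        backend: Literal["torch", "onnx", "openvino"] = "torch"
    ):
        self.model = model
        self.device = ComponentDevice.resolve_device(device)
        self.batch_size = batch_size
        self.normalize_embeddings = normalize_embeddings
        self.precision = precision
        self.backend = backend
        self.embedding_backend = None

    def warm_up(self) -> None:
        """Load the model (only once)"""
        if self.embedding_backend is None:
            self.embedding_backend = _SentenceTransformersEmbeddingBackendFactory.get_embedding_backend(
                model=self.model,
                device=self.device.to_torch_str(),
                backend=self.backend
            )

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict:
        if self.embedding_backend is None:
            self.warm_up()  # fall back to warming up on first use
        embeddings = self.embedding_backend.embed([doc.content or "" for doc in documents])
        # replace() returns copies, so the caller's Document objects are not mutated
        new_docs = [replace(doc, embedding=emb) for doc, emb in zip(documents, embeddings)]
        return {"documents": new_docs}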

Embedding Backend Architecture

                     Embedding Backend architecture

        ┌──────────────────────────────────────────────┐
        │ _SentenceTransformersEmbeddingBackendFactory │
        └──────────────────────────────────────────────┘
                                │
              ┌─────────────────┼─────────────────┐
              ▼                 ▼                 ▼
       ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
       │    Torch    │   │    ONNX     │   │  OpenVINO   │
       │   Backend   │   │   Backend   │   │   Backend   │
       └─────────────┘   └─────────────┘   └─────────────┘
              │                 │                 │
              └─────────────────┼─────────────────┘
                                ▼
                       SentenceTransformer
                         (Hugging Face)

OpenAIChatGenerator

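from openai import AsyncOpenAI, OpenAI

from haystack import component
from haystack.dataclasses import ChatMessage, StreamingCallbackT
from haystack.tools import Tool
from haystack.utils import Secret


@component
class OpenAIChatGenerator:
    """OpenAI chat generator (excerpt)"""

    SUPPORTED_MODELS = ["gpt-4o", "gpt-4", "gpt-3.5-turbo", ...]

    def __init__(
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: StreamingCallbackT | None = None,
        generation_kwargs: dict | None = None,
        tools: list[Tool] | None = None,
        tools_strict: bool = False
    ):
        self.model = model
        self.streaming_callback = streaming_callback
        self.generation_kwargs = generation_kwargs or {}
        self.tools = tools
        self.tools_strict = tools_strict
        # The Secret is resolved here, when the component is created
        self.client = OpenAI(api_key=api_key.resolve_value())
        self.async_client = AsyncOpenAI(api_key=api_key.resolve_value())

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        streaming_callback: StreamingCallbackT | None = None,
        generation_kwargs: dict | None = None
    ) -> dict:
        # Runtime generation_kwargs override the init-time ones; never unpack None
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        # Call the OpenAI API
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[msg.to_openai_dict_format() for msg in messages],
            **generation_kwargs
        )
        replies = [self._convert_to_chat_message(response, choice) for choice in response.choices]
        return {"replies": replies}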

Streaming Responses

def run(self, messages: list[ChatMessage], streaming_callback=None):
    # Streaming is enabled when a callback is given at runtime or at init time
    streaming_callback = streaming_callback or self.streaming_callback
    is_streaming = streaming_callback is not None

    response = self.client.chat.completions.create(
        model=self.model,
        messages=[msg.to_openai_dict_format() for msg in messages],
        stream=is_streaming
    )

    if is_streaming:
        # Streaming: the response is an iterator of chunks
        chunks = []
        for chunk in response:
            chunk_delta = self._convert_chunk_to_streaming_chunk(chunk)
            chunks.append(chunk_delta)
            streaming_callback(chunk_delta)  # real-time callback
        # Assemble the chunks into one final reply
        replies = [self._convert_chunks_to_chat_message(chunks)]
    else:
        # Non-streaming: one complete response
        replies = [self._convert_to_chat_message(response, choice) for choice in response.choices]

    return {"replies": replies}
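The callback-plus-accumulate pattern from the streaming branch is reduced to plain Python below. The `StreamingChunk` here is a minimal illustrative class that carries only the text delta; Haystack has a richer dataclass of the same name. The list of strings stands in for the OpenAI chunk iterator.

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class StreamingChunk:
    content: str  # the text delta carried by one chunk


def consume_stream(deltas: Iterable[str], callback: Callable[[StreamingChunk], None]) -> str:
    """Forward each chunk to the callback as it arrives, then assemble the full reply."""
    chunks: list[StreamingChunk] = []
    for delta in deltas:
        chunk = StreamingChunk(delta)
        chunks.append(chunk)
        callback(chunk)  # e.g. print to a terminal or push over a websocket
    return "".join(c.content for c in chunks)


seen: list[str] = []
reply = consume_stream(iter(["Hay", "stack", " rocks"]), lambda c: seen.append(c.content))
print(reply)  # Haystack rocks
```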

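The Tool Calling Mechanism

@component
class OpenAIChatGenerator:
    def __init__(self, tools: list[Tool] | None = None, tools_strict: bool = False):
        self.tools = tools
        self.tools_strict = tools_strict

    def _prepare_tools(self, tools: list[Tool] | None) -> list[dict]:
        """Convert Tool objects into OpenAI tool definitions"""
        if not tools:
            return []

        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                    # strict=True asks OpenAI to follow the JSON schema exactly
                    **({"strict": True} if self.tools_strict else {}),
                }
            }
            for tool in tools
        ]

    def run(self, messages: list[ChatMessage], tools: list[Tool] | None = None):
        # Runtime tools take precedence over init-time tools
        openai_tools = self._prepare_tools(tools or self.tools)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[...],
            # Omit the key when there are no tools (an empty list is not accepted)
            **({"tools": openai_tools} if openai_tools else {})
        )

        # Extract the tool calls the model requested
        tool_calls = self._extract_tool_calls(response)
        return {"replies": [ChatMessage.from_assistant(tool_calls=tool_calls)]}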

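Structured Output

from pydantic import BaseModel


class Person(BaseModel):
    name: str
    age: int
    email: str


@component
class OpenAIChatGenerator:
    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        response_format: type[BaseModel] | None = None,
        generation_kwargs: dict | None = None,
    ):
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        openai_messages = [msg.to_openai_dict_format() for msg in messages]

        if response_format is not None:
            # parse() turns the Pydantic class into a strict JSON schema and validates
            # the reply against it; message.parsed is only filled by parse(), not create()
            # (older openai-python versions expose it as client.beta.chat.completions.parse)
            response = self.client.chat.completions.parse(
                model=self.model,
                messages=openai_messages,
                response_format=response_format,
                **generation_kwargs
            )
        else:
            response = self.client.chat.completions.create(
                model=self.model, messages=openai_messages, **generation_kwargs
            )

        # message.content holds the JSON text; with parse(), message.parsed also holds
        # the validated Person instance (None if the model refused)
        message = response.choices[0].message
        return {"replies": [ChatMessage.from_assistant(text=message.content)]}

# Downstream: Person.model_validate_json(result["replies"][0].text)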

PromptBuilder: Template Construction

@component
class PromptBuilder:
    """Prompt builder"""

    def __init__(
        self,
        template: str,
        required_variables: list[str] | Literal["*"] | None = None,
        variables: list[str] | None = None
    ):
        self._template_string = template
        # Sandboxed Jinja2 environment: restricts access to unsafe attributes and methods
        self._env = SandboxedEnvironment(extensions=[Jinja2TimeExtension])
        self.template = self._env.from_string(template)

        # Extract the template variables unless they were listed explicitly
        if not variables:
            _, template_variables = _extract_template_variables_and_assignments(template)
            variables = list(template_variables)

        # Declare the input sockets dynamically
        for var in variables:
            if required_variables == "*" or var in (required_variables or []):
                component.set_input_type(self, var, Any)  # required
            else:
                component.set_input_type(self, var, Any, "")  # optional, defaults to ""

Jinja2 Template Rendering

@component.output_types(prompt=str)
def run(
    self,
    template: str | None = None,
    template_variables: dict | None = None,
    **kwargs
) -> dict:
    """Render the template"""
    # Merge the variables; template_variables wins over same-named kwargs
    variables = {**kwargs, **(template_variables or {})}

    # Check that all required variables are present
    self._validate_variables(set(variables.keys()))

    # Pick the template: the init-time one, or a replacement passed at runtime
    compiled_template = self.template
    if template is not None:
        compiled_template = self._env.from_string(template)

    # Render
    prompt = compiled_template.render(variables)
    return {"prompt": prompt}

# Example:
# template = "Translate {{ text }} to {{ language }}"
# builder.run(text="Hello", language="Chinese")
# Output: {"prompt": "Translate Hello to Chinese"}

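Replacing the Template at Runtime

# Replace the template at runtime. Assumes `pipeline` already contains a PromptBuilder named
# "prompt_builder" whose init-time template declares the `documents` and `query` inputs
from haystack import Document

documents = [Document(content="Python is great")]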

new_template = """
Given these documents, answer the question.

Documents:
{% for doc in documents %}
    Document {{ loop.index }}:
    {{ doc.content }}
{% endfor %}

Question: {{ query }}
Answer:
"""

result = pipeline.run({
    "prompt_builder": {
        "documents": documents,
        "query": "What is Python?",
        "template": new_template  # replaces the template for this run only
    }
})

print(result["prompt_builder"]["prompt"])

RAG Pipeline in Practice

                      RAG Pipeline architecture

   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
   │   Retriever   │────▶│ PromptBuilder │────▶│   Generator   │
   │  (retrieval)  │     │(prompt build) │     │ (generation)  │
   └───────────────┘     └───────────────┘     └───────────────┘
           │                                           │
           ▼                                           ▼
   ┌───────────────┐                           ┌───────────────┐
   │ DocumentStore │                           │    Output     │
   │ (doc storage) │                           │               │
   └───────────────┘                           └───────────────┘

Document Indexing Pipeline

from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Document store shared by the indexing and query pipelines
document_store = InMemoryDocumentStore()

# Create the indexing Pipeline
indexing_pipeline = Pipeline()

# Add the components
indexing_pipeline.add_component("converter", TextFileToDocument())
indexing_pipeline.add_component("cleaner", DocumentCleaner())
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=10))
indexing_pipeline.add_component("embedder", SentenceTransformersDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))

# Connect the components (each link joins a "documents" output to a "documents" input)
indexing_pipeline.connect("converter", "cleaner")
indexing_pipeline.connect("cleaner", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

# Run indexing
indexing_pipeline.run({"converter": {"sources": ["doc.txt"]}})

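Query Pipeline

from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# Prompt template; {{ documents }} and {{ query }} become PromptBuilder inputs
rag_template = """
Given these documents, answer the question.
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""

# Create the query Pipeline
query_pipeline = Pipeline()

# Add the components; the text embedder uses the same default model as the
# document embedder, so query and document vectors are comparable
query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder())
query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store))
query_pipeline.add_component("prompt_builder", PromptBuilder(template=rag_template))
query_pipeline.add_component("llm", OpenAIGenerator())

# Connect the components, naming the sockets explicitly as "component.socket"
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipeline.connect("retriever.documents", "prompt_builder.documents")
query_pipeline.connect("prompt_builder.prompt", "llm.prompt")

# Run a query (OpenAIGenerator reads OPENAI_API_KEY from the environment)
result = query_pipeline.run({
    "text_embedder": {"text": "What is Python?"},
    "prompt_builder": {"query": "What is Python?"}
})
print(result["llm"]["replies"][0])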

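Component Connection Rules

def connect(self, sender: str, receiver: str) -> None:
    """Connect two components ("component" or "component.socket")"""
    # 1. Split off optional socket names and check that both components exist
    from_component, _, from_socket = sender.partition(".")
    to_component, _, to_socket = receiver.partition(".")
    for name in (from_component, to_component):
        if name not in self.graph.nodes:
            raise ValueError(f"Component '{name}' not found")

    # 2. Get the component instances
    from_instance = self.graph.nodes[from_component]["instance"]
    to_instance = self.graph.nodes[to_component]["instance"]

    # 3. Match sockets
    from_sockets = from_instance.__haystack_output__
    to_sockets = to_instance.__haystack_input__

    if from_socket and to_socket:
        # Both named explicitly, e.g. "text_embedder.embedding" -> "retriever.query_embedding"
        pairs = [(from_socket, to_socket)]
    else:
        # Otherwise match by equal names (simplified: Haystack matches primarily by
        # type compatibility and uses names to break ties)
        out_names = [from_socket] if from_socket else list(from_sockets)
        pairs = [(n, n) for n in out_names if n in to_sockets and (not to_socket or n == to_socket)]
    if len(pairs) != 1:
        raise PipelineConnectError(f"Cannot connect '{sender}' to '{receiver}': {len(pairs)} candidate pairs")

    # Simplified DiGraph model: one edge per component pair
    self.graph.add_edge(from_component, to_component, from_socket=pairs[0][0], to_socket=pairs[0][1])

    # 4. Reject cycles (simplified; Haystack 2.x itself permits loops)
    if not nx.is_directed_acyclic_graph(self.graph):
        self.graph.remove_edge(from_component, to_component)
        raise ValueError("Connection creates a cycle!")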
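The name-matching rules can be written as a pure function and tested without a graph. `resolve_connection` and its set-of-names inputs are hypothetical. Like the sketch above, it only compares socket names, whereas Haystack's own `connect()` also checks that the sender and receiver types are compatible.

```python
def resolve_connection(
    sender: str,
    receiver: str,
    outputs: dict[str, set[str]],  # component -> output socket names
    inputs: dict[str, set[str]],   # component -> input socket names
) -> tuple[str, str, str, str]:
    """Resolve "comp" or "comp.socket" endpoints to one (sender, out, receiver, in) edge."""
    s_comp, _, s_sock = sender.partition(".")
    r_comp, _, r_sock = receiver.partition(".")
    out_names = {s_sock} if s_sock else outputs[s_comp]
    in_names = {r_sock} if r_sock else inputs[r_comp]
    # Explicitly named sockets must exist
    if not out_names <= outputs[s_comp] or not in_names <= inputs[r_comp]:
        raise ValueError(f"Unknown socket in '{sender}' -> '{receiver}'")

    if s_sock and r_sock:
        pairs = [(s_sock, r_sock)]  # both named: the names may differ
    else:
        pairs = [(n, n) for n in sorted(out_names & in_names)]  # match by equal names
    if len(pairs) != 1:
        raise ValueError(f"Cannot connect '{sender}' to '{receiver}': {len(pairs)} candidate pairs")
    return s_comp, pairs[0][0], r_comp, pairs[0][1]


outputs = {"text_embedder": {"embedding"}, "retriever": {"documents"}}
inputs = {"retriever": {"query_embedding", "filters", "top_k"}, "prompt_builder": {"documents", "query"}}

print(resolve_connection("retriever", "prompt_builder", outputs, inputs))
# ('retriever', 'documents', 'prompt_builder', 'documents')
print(resolve_connection("text_embedder.embedding", "retriever.query_embedding", outputs, inputs))
# ('text_embedder', 'embedding', 'retriever', 'query_embedding')
```

Requiring exactly one candidate pair turns ambiguous or impossible connections into errors at build time rather than surprises at run time.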

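Serialization and Persistence

import yaml

# Serialize the Pipeline
pipeline_dict = query_pipeline.to_dict()

# Save to a file
with open("pipeline.yml", "w") as f:
    yaml.dump(pipeline_dict, f)

# Load it back
with open("pipeline.yml", "r") as f:
    pipeline_dict = yaml.safe_load(f)

pipeline = Pipeline.from_dict(pipeline_dict)
# (Pipeline.dumps() and Pipeline.loads() do the YAML step for you)

# Shape of pipeline_dict (abridged; most init_parameters omitted):
{
    "metadata": {"version": "2.0"},
    "components": {
        "retriever": {
            "type": "haystack.components.retrievers.in_memory.bm25_retriever.InMemoryBM25Retriever",
            "init_parameters": {
                "document_store": {
                    "type": "haystack.document_stores.in_memory.document_store.InMemoryDocumentStore",
                    "init_parameters": {}
                },
                "top_k": 10
            }
        }
    },
    "connections": [
        {"sender": "retriever.documents", "receiver": "prompt_builder.documents"}
    ]
}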

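to_dict/from_dict Implementation

def to_dict(self) -> dict:
    """Serialize the Pipeline"""
    return {
        "metadata": {"version": "2.0"},
        "components": {
            name: {
                # Full import path, e.g. "haystack.components.builders.prompt_builder.PromptBuilder"
                "type": instance.__class__.__module__ + "." + instance.__class__.__name__,
                # Assumes the component implements to_dict(); the real code falls back
                # to a default serializer for components that do not
                "init_parameters": instance.to_dict()["init_parameters"]
            }
            for name, instance in self.graph.nodes(data="instance")
        },
        "connections": [
            {
                "sender": f"{from_comp}.{data['from_socket']}",
                "receiver": f"{to_comp}.{data['to_socket']}"
            }
            for from_comp, to_comp, data in self.graph.edges(data=True)
        ]
    }

@classmethod
def from_dict(cls, data: dict) -> "Pipeline":
    """Deserialize a Pipeline"""
    pipeline = cls()

    # 1. Recreate the components
    for name, config in data["components"].items():
        # Look the class up by its import path (Haystack keeps these in component.registry)
        component_class = component.registry[config["type"]]
        # Prefer the class's own from_dict: init_parameters can contain nested
        # serialized objects (e.g. a document store) that must be rebuilt first
        if hasattr(component_class, "from_dict"):
            instance = component_class.from_dict(config)
        else:
            instance = component_class(**config["init_parameters"])
        pipeline.add_component(name, instance)

    # 2. Reconnect them, keeping the socket names ("component.socket")
    for conn in data["connections"]:
        pipeline.connect(conn["sender"], conn["receiver"])

    return pipeline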
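This JSON round trip shows the two details that make deserialization work. First, the `type` string is a lookup key into a class registry. Second, connections keep their full `component.socket` strings. `REGISTRY`, `register`, `TopKRetriever` and the two helper functions are illustrative names, not Haystack API.

```python
import json
from typing import Any

REGISTRY: dict[str, type] = {}  # "module.ClassName" -> class


def register(cls: type) -> type:
    """Class decorator: make the class findable by its import path."""
    REGISTRY[f"{cls.__module__}.{cls.__name__}"] = cls
    return cls


@register
class TopKRetriever:
    def __init__(self, top_k: int = 10) -> None:
        self.top_k = top_k

    def to_dict(self) -> dict[str, Any]:
        cls = type(self)
        return {"type": f"{cls.__module__}.{cls.__name__}", "init_parameters": {"top_k": self.top_k}}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "TopKRetriever":
        return cls(**data["init_parameters"])


def pipeline_to_dict(components: dict[str, Any], connections: list[tuple[str, str]]) -> dict[str, Any]:
    return {
        "components": {name: inst.to_dict() for name, inst in components.items()},
        "connections": [{"sender": s, "receiver": r} for s, r in connections],
    }


def pipeline_from_dict(data: dict[str, Any]) -> tuple[dict[str, Any], list[tuple[str, str]]]:
    # The "type" string is only a lookup key into the registry
    components = {name: REGISTRY[cfg["type"]].from_dict(cfg) for name, cfg in data["components"].items()}
    # Keep full "component.socket" strings so no socket information is lost
    connections = [(c["sender"], c["receiver"]) for c in data["connections"]]
    return components, connections


original = {"retriever": TopKRetriever(top_k=5)}
text = json.dumps(pipeline_to_dict(original, [("retriever.documents", "prompt_builder.documents")]))
restored, conns = pipeline_from_dict(json.loads(text))
print(restored["retriever"].top_k, conns)
# 5 [('retriever.documents', 'prompt_builder.documents')]
```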

YAML Pipeline Configuration

# Same layout as Pipeline.to_dict(); load it with Pipeline.loads(yaml_text)
metadata:
  version: "2.0"
  author: "Haystack Team"

components:
  retriever:
    type: haystack.components.retrievers.in_memory.bm25_retriever.InMemoryBM25Retriever
    init_parameters:
      document_store:
        type: haystack.document_stores.in_memory.document_store.InMemoryDocumentStore
        init_parameters: {}
      top_k: 10

  prompt_builder:
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: |
        Given these documents: {{ documents }}
        Question: {{ query }}
        Answer:

  generator:
    type: haystack.components.generators.openai.OpenAIGenerator
    init_parameters:
      model: gpt-4o-mini
      api_key:
        type: env_var
        env_vars: ["OPENAI_API_KEY"]
        strict: true

connections:
  - sender: retriever.documents
    receiver: prompt_builder.documents
  - sender: prompt_builder.prompt
    receiver: generator.prompt

Error Handling

class PipelineError(Exception):
    """Base class for Pipeline errors"""

class PipelineRuntimeError(PipelineError):
    """Raised when a component fails during a run"""

class PipelineConnectError(PipelineError):
    """Raised when components cannot be connected"""

def run(self, data: dict) -> dict:
    try:
        # Validate the inputs
        self._validate_inputs(data)

        # Execute the Pipeline
        for component_name in self._topological_sort():
            try:
                component = self.graph.nodes[component_name]["instance"]
                inputs = self._prepare_inputs(component_name, data)
                outputs = component.run(**inputs)
                self._component_outputs[component_name] = outputs
            except Exception as e:
                # Add the component name; "from e" keeps the original traceback
                raise PipelineRuntimeError(
                    f"Error in component '{component_name}': {e}"
                ) from e

        return self._collect_outputs()

    except PipelineError:
        raise  # already a Pipeline error: re-raise unchanged instead of wrapping twice
    except Exception as e:
        raise PipelineError(f"Unexpected error: {e}") from e
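The example below shows what `raise ... from e` buys you. The wrapped error names the failing component, while the original exception stays reachable as `__cause__` and appears in the printed traceback. The component function is an invented failure case.

```python
class PipelineError(Exception):
    """Base class for pipeline errors."""


class PipelineRuntimeError(PipelineError):
    """A component failed while the pipeline was running."""


def run_component(name: str, fn, **inputs):
    try:
        return fn(**inputs)
    except Exception as e:
        # Add the component name; "from e" keeps the original exception as __cause__
        raise PipelineRuntimeError(f"Error in component '{name}': {e}") from e


def broken_retriever(query: str) -> dict:
    raise KeyError("index not found")


try:
    run_component("retriever", broken_retriever, query="test")
except PipelineError as err:
    print(err)                           # Error in component 'retriever': 'index not found'
    print(type(err.__cause__).__name__)  # KeyError
```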

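Performance Optimization Strategies

Component level

  • Use warm_up to preload models
  • Process documents in batches
  • Cache embedding results
  • Use ONNX/OpenVINO for faster inference

Pipeline level

  • Run independent components in parallel
  • Use asynchronous execution
  • Reduce data copying
  • Optimize the pipeline topology

# Parallel execution example
#         ┌─> ComponentA ─┐
# Input ──┤               ├──> Merge
#         └─> ComponentB ─┘

# ComponentA and ComponentB do not depend on each other, so they can run in parallel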

Async Execution Optimization

import asyncio


async def run_async(self, data: dict) -> dict:
    """Run the Pipeline asynchronously"""
    # 1. Group components into layers; components in one layer do not depend on each other
    layers = self._topological_layers()

    for layer in layers:
        # 2. Start every component of the layer concurrently
        tasks = []
        for component_name in layer:
            component = self.graph.nodes[component_name]["instance"]
            inputs = self._prepare_inputs(component_name, data)

            if hasattr(component, "run_async"):
                tasks.append(component.run_async(**inputs))
            else:
                # Run the synchronous method in a worker thread so it does not block the event loop
                tasks.append(asyncio.to_thread(component.run, **inputs))

        # 3. Wait until all of them have finished
        outputs = await asyncio.gather(*tasks)

        # 4. Store the outputs (gather preserves the order of the tasks)
        for component_name, output in zip(layer, outputs):
            self._component_outputs[component_name] = output

    return self._collect_outputs()
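The snippet above does not show `_topological_layers()`. One possible implementation is a layered variant of Kahn's algorithm; networkx also provides `nx.topological_generations` for the same job. The sketch below pairs it with an `asyncio.gather` loop. The component names, the `deps` format (every node must appear as a key) and the simulated delays are illustrative assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable


def topological_layers(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into layers; each node's dependencies are in earlier layers."""
    remaining = {node: set(d) for node, d in deps.items()}
    layers: list[list[str]] = []
    while remaining:
        ready = sorted(node for node, d in remaining.items() if not d)
        if not ready:
            raise ValueError("Pipeline contains a cycle!")
        layers.append(ready)
        for node in ready:
            del remaining[node]
        for d in remaining.values():
            d.difference_update(ready)
    return layers


async def run_layers(
    components: dict[str, Callable[[dict[str, Any]], Awaitable[Any]]],
    deps: dict[str, set[str]],
) -> dict[str, Any]:
    outputs: dict[str, Any] = {}
    for layer in topological_layers(deps):
        # Nodes in one layer are independent, so they can run concurrently
        results = await asyncio.gather(*(components[n](outputs) for n in layer))
        outputs.update(zip(layer, results))  # gather keeps the input order
    return outputs


async def component_a(outputs: dict[str, Any]) -> str:
    await asyncio.sleep(0.1)  # simulated I/O, e.g. an API call
    return "A"


async def component_b(outputs: dict[str, Any]) -> str:
    await asyncio.sleep(0.1)
    return "B"


async def merge(outputs: dict[str, Any]) -> str:
    return outputs["component_a"] + outputs["component_b"]


deps = {"component_a": set(), "component_b": set(), "merge": {"component_a", "component_b"}}
components = {"component_a": component_a, "component_b": component_b, "merge": merge}
print(topological_layers(deps))  # [['component_a', 'component_b'], ['merge']]
print(asyncio.run(run_layers(components, deps))["merge"])  # AB
```

Layer-by-layer execution waits for the slowest component in each layer before starting the next one. A finer-grained scheduler could start each component as soon as its own inputs are ready.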

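Batch Processing Optimization

from dataclasses import replace

from haystack import Document, component


# Batched embedding
@component
class BatchEmbedder:
    def __init__(self, batch_size: int = 32):
        self.batch_size = batch_size
        self.embedding_backend = None  # loaded by a warm_up() method (omitted here)

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict:
        # Embed the documents batch by batch: one backend call per batch
        all_embeddings = []
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            texts = [doc.content for doc in batch]
            embeddings = self.embedding_backend.embed(texts)
            all_embeddings.extend(embeddings)

        # Attach the embeddings to copies of the documents
        new_docs = [
            replace(doc, embedding=emb)
            for doc, emb in zip(documents, all_embeddings)
        ]
        return {"documents": new_docs}

# Effect: N per-document calls become ceil(N / batch_size) calls to the embedding
# backend; e.g. 1,000 documents with batch_size=32 need 32 calls instead of 1,000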
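Here is the call-count arithmetic behind batching. N items in batches of size B need ceil(N / B) calls, so 1,000 documents at batch_size=32 take 32 calls: 31 full batches plus one batch of 8. The `batched` helper below is a small illustrative generator; Python 3.12 also ships `itertools.batched`, which yields tuples.

```python
import math
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch


n_docs, batch_size = 1000, 32
calls = sum(1 for _ in batched(range(n_docs), batch_size))
print(calls, math.ceil(n_docs / batch_size))  # 32 32
```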

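Comparison with Other Frameworks

Framework  | Architecture         | Strengths                                  | Weaknesses
-----------|----------------------|--------------------------------------------|----------------------------------
Haystack   | Pipeline + Component | Production-grade, type-safe, easy to debug | Steep learning curve
LangChain  | Chain + Agent        | Rich ecosystem, active community           | Harder to debug, less stable APIs
LlamaIndex | Index + Query        | RAG-focused, easy to get started           | Less flexible
DSPy       | Module + Optimizer   | Automatic prompt optimization              | Complex concepts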

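Best Practices

✅ Do

  • Define components with type annotations
  • Implement warm_up to preload models
  • Support asynchronous execution
  • Write detailed docstrings
  • Configure pipelines with YAML
  • Implement to_dict/from_dict

❌ Avoid

  • Loading models inside run() on every call
  • Skipping type annotations
  • Hard-coding configuration
  • Mixing synchronous and asynchronous code (e.g. blocking calls inside run_async)
  • Overly complex pipelines
  • Forgetting serialization support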

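Debugging Tips

from pathlib import Path

# 1. Use debug mode (the debug flag belongs to the run() sketch shown earlier;
#    the real Pipeline.run() offers include_outputs_from to return intermediate outputs)
result = pipeline.run(data, debug=True)
print(pipeline._debug_info)

# 2. Visualize the Pipeline (Haystack renders the graph with Mermaid)
pipeline.draw(path=Path("pipeline.png"))

# 3. Print the component connections
for from_comp, to_comp, edge in pipeline.graph.edges(data=True):
    print(f"{from_comp}.{edge['from_socket']} -> {to_comp}.{edge['to_socket']}")

# 4. Test a single component on its own
retriever = pipeline.get_component("retriever")
result = retriever.run(query="test")

# 5. Inspect each component's sockets
for name, instance in pipeline.graph.nodes(data="instance"):
    print(f"{name} inputs: {instance.__haystack_input__}")
    print(f"{name} outputs: {instance.__haystack_output__}")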

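Common Issues

Problem                 | Cause                         | Solution
------------------------|-------------------------------|--------------------------------------
Connection fails        | Socket names do not match     | Check the input/output socket names
Cyclic dependency error | The Pipeline contains a cycle | Redesign the Pipeline topology
Type mismatch           | Socket types are incompatible | Insert a type-conversion component
Out of memory           | Too many documents at once    | Use batching and streaming
Slow performance        | Models are not warmed up      | Call warm_up() or Pipeline.warm_up()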

The Haystack Ecosystem

                         Haystack ecosystem

   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
   │ Haystack Core │     │ Haystack Hub  │     │ deepset Cloud │
   │(core library) │     │  (components) │     │(cloud service)│
   └───────────────┘     └───────────────┘     └───────────────┘
           │                     │                     │
           └─────────────────────┼─────────────────────┘
                                 │
           ┌─────────────────────┼─────────────────────┐
           ▼                     ▼                     ▼
   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
   │   Document    │     │   Milvus /    │     │   Ollama /    │
   │    Stores     │     │   Pinecone    │     │    OpenAI     │
   │ (doc storage) │     │ (vector DBs)  │     │    (LLMs)     │
   └───────────────┘     └───────────────┘     └───────────────┘

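Haystack Roadmap

v2.x (current)

  • Component-based Pipeline architecture
  • Async execution support
  • YAML configuration
  • Type safety

v3.x (planned)

  • Distributed Pipeline execution
  • Better multimodal support
  • Automatic tuning of component parameters
  • Integrations with more vector databases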

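Summary

Haystack's Core Value

  • A production-grade RAG framework, stable and reliable
  • Component-based architecture that is easy to extend
  • Type safety that reduces runtime errors
  • A mature ecosystem

Key Source Files

  • pipeline.py - the Pipeline orchestration engine
  • component.py - the Component decorator and metaclass
  • retrievers/*.py - retriever implementations
  • generators/*.py - LLM generators
  • embedders/*.py - document embedders

Use Cases

RAG applications, document Q&A, knowledge bases, intelligent customer service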

References

  • Source repository: github.com/deepset-ai/haystack
  • Official documentation: docs.haystack.deepset.ai
  • Haystack Hub: haystack.deepset.ai/hub
  • deepset Cloud: www.deepset.ai
  • Community forum: discourse.haystack.deepset.ai

Thanks for reading!
Revisit this article at https://atcfu.com/ai-articles/haystack-rag/