
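Hands-On Project: Building a Production-Grade RAG Q&A System (FDE-RAG)

Build a production-grade RAG system from scratch, covering the full pipeline: chunking strategies, hybrid retrieval, reranking, answer generation, evaluation, guardrails, and deployment. No LangChain: everything is implemented with plain Python libraries so you understand what RAG actually does.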


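Project Background

You are an infrastructure engineer at a company with 500+ technical documents (API docs, architecture designs, runbooks, troubleshooting guides). Employees find documents through keyword search every day, but the results are often irrelevant. Searching "how to deploy" doesn't surface the "Deployment Guide", and searching "database is slow" doesn't surface the "SQL Optimization Handbook".

Pain points of the manual process

  1. Documents are scattered across Confluence, GitLab Wiki, and Notion, with no single entry point
  2. Keyword search can't understand intent, so new hires need 2-3 weeks before they can reliably find information
  3. Nobody re-indexes documents after they are updated, so search results go stale

Goal: build a RAG (Retrieval-Augmented Generation) Q&A system that:

  • Automatically indexes all technical documents
  • Accepts natural-language queries
  • Cites its sources in every answer, so answers are traceable
  • Has measurable answer quality, so it can be iterated on

Why build it yourself instead of calling LangChain?

Frameworks like LangChain hide a lot of detail. If you don't understand how each stage of RAG works, debugging turns into guesswork. This chapter builds everything from scratch with complete code for every step, so you can see the "skeleton and muscles" of a RAG system.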


Overall Architecture

Offline pipeline (Ingestion Pipeline)

 ┌──────────┐   ┌──────────┐   ┌───────────┐   ┌───────────┐
 │ Raw docs │──→│ Chunking │──→│ Embedding │──→│  Storage  │
 │ PDF / MD │   │          │   │           │   │ Vector DB │
 └──────────┘   └──────────┘   └───────────┘   └───────────┘
                4 strategies   BGE-M3 /        Qdrant /
                               OpenAI API      Chroma

Online query (Query Pipeline)

  User query
          │
          ▼
  ┌────────────────┐
  │ Query rewrite  │ ← HyDE / step-back / query classification
  └───────┬────────┘
          ▼
  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
  │  BM25 search   │  │ Vector search  │  │Metadata filter │
  │   (keywords)   │  │   (semantic)   │  │ (type / time)  │
  └───────┬────────┘  └───────┬────────┘  └───────┬────────┘
          └─────────┬─────────┘                   │
                    ▼                             │
            ┌────────────────┐                    │
            │   RRF fusion   │←───────────────────┘
            │ (score merge)  │
            └───────┬────────┘
                    ▼
            ┌────────────────┐
            │   Reranking    │ ← BGE Reranker / Cohere
            │ Top-50 → Top-5 │
            └───────┬────────┘
                    ▼
            ┌────────────────┐
            │   Generation   │ ← LLM + RAG prompt
            │ (+ citations)  │
            └───────┬────────┘
                    ▼
            ┌────────────────┐
            │  Output guard  │ ← hallucination detection /
            │                │   sensitive-info protection
            └───────┬────────┘
                    ▼
               Final answer

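Technology Stack

| Component | Choice | Reason |
|---|---|---|
| Chunking | Custom (4 strategies) | No LangChain dependency, so you understand the fundamentals |
| Embedding | BGE-M3 (local) / OpenAI API | BGE-M3 is the best choice for Chinese; OpenAI as fallback |
| Vector DB | Qdrant / Chroma | Qdrant for production, Chroma for the fastest development loop |
| BM25 | whoosh | Pure Python, lightweight, usable in production |
| Reranking | BGE Reranker v2 (local) / Cohere API | Two tiers to choose from |
| LLM | OpenAI-compatible API | Not tied to a specific model |
| Evaluation | Ragas + custom test set | Open-source standard |
| Deployment | FastAPI + Docker | Production standard |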

Project Structure

fde-rag/
├── src/
│   ├── chunking/
│   │   ├── strategies.py        # Chunking strategies (Fixed, Recursive, DocumentAware)
│   │   └── semantic.py          # Semantic chunking + parent-child index
│   ├── embedding/
│   │   ├── models.py            # Embedding model wrappers
│   │   └── batch.py             # Batch embedding
│   ├── retrieval/
│   │   ├── hybrid.py            # Hybrid retrieval (BM25 + vector + RRF)
│   │   ├── reranker.py          # Reranking
│   │   └── query_rewrite.py     # Query rewriting
│   ├── generation/
│   │   ├── prompt.py            # RAG prompt templates
│   │   └── answer.py            # Answer generation
│   ├── evaluation/
│   │   ├── ragas_eval.py        # Ragas-style evaluation pipeline
│   │   └── dataset.py           # Test dataset
│   ├── guardrails/
│   │   ├── input_guard.py       # Input guardrail
│   │   └── output_guard.py      # Output guardrail
│   └── api/
│       ├── server.py            # FastAPI service
│       └── middleware.py        # Middleware
├── data/
│   ├── sample_docs/             # Sample documents
│   └── eval_qa.json             # Evaluation Q&A set
├── docker-compose.yml
├── Dockerfile
└── pyproject.toml

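Section 1: Chunking Strategies

Chunking is the lifeline of a RAG system. Get it wrong, and everything downstream (retrieval and generation alike) produces garbage.

Why fixed-size chunking fails in production

These are our measurements on 200 technical documents (BGE-M3 embeddings, evaluation set of 500 Q&A pairs):

chunk_size=100:  accuracy 58%  recall 85%  MRR 0.52  ← too fragmented, meaning is incomplete
chunk_size=200:  accuracy 62%  recall 78%  MRR 0.58
chunk_size=300:  accuracy 68%  recall 74%  MRR 0.65
chunk_size=500:  accuracy 75%  recall 71%  MRR 0.72  ← best balance
chunk_size=800:  accuracy 71%  recall 65%  MRR 0.68
chunk_size=1500: accuracy 58%  recall 55%  MRR 0.54  ← too large, noise drowns out the signal

The core problem with fixed-size chunking is that it knows nothing about document structure. A code block can be cut in half, a table can be split across two chunks, and a heading can be separated from its content.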
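To make the structure problem concrete, the sketch below cuts a tiny Markdown snippet into fixed 20-character windows and flags each window that contains an unmatched code fence. It uses no overlap, to keep the arithmetic easy. `fixed_windows` and `has_unbalanced_fence` are illustrative helpers, not part of FDE-RAG.

```python
FENCE = "`" * 3  # a Markdown code fence, built this way so it can sit inside this example


def fixed_windows(text: str, size: int) -> list[str]:
    """Cut text into consecutive windows of `size` characters, ignoring structure."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def has_unbalanced_fence(chunk: str) -> bool:
    """An odd number of fences means a code block was opened or closed outside this chunk."""
    return chunk.count(FENCE) % 2 == 1


doc = "\n".join(["Run:", FENCE + "bash", "./deploy.sh --env prod", FENCE, "Done."])
windows = fixed_windows(doc, 20)
for w in windows:
    print(repr(w), "broken" if has_unbalanced_fence(w) else "ok")
```

The shell command is split across the first two windows. Each of those windows holds only one half of the fence pair, so neither is a valid code block on its own.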

Strategy 1: FixedChunker (Baseline)

# chunking/strategies.py
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    """A single text chunk."""
    text: str
    metadata: dict  # source document, position, type, etc.
    chunk_id: str

    @property
    def token_count(self) -> int:
        """Rough token estimate (character count * 1.5, a heuristic for Chinese text)."""
        return int(len(self.text) * 1.5)


class FixedChunker:
    """Fixed-size chunking: simple but structure-blind; used as the baseline."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def split(self, text: str, source: str = "") -> List[Chunk]:
        chunks = []
        start = 0
        chunk_idx = 0
        # Slide a character window over the text
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunk_text = text[start:end]
            if chunk_text.strip():  # skip whitespace-only windows instead of stopping early
                chunks.append(Chunk(
                    text=chunk_text.strip(),
                    metadata={
                        "source": source,
                        "start": start,
                        "end": end,
                        "strategy": "fixed",
                    },
                    chunk_id=f"{source}:{chunk_idx}",
                ))
                chunk_idx += 1
            if end == len(text):
                break  # last window reached; avoids a trailing chunk made only of overlap
            start = end - self.overlap
        return chunks

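Strategy 2: RecursiveChunker

Tries separators in priority order: paragraph → newline → sentence end → semicolon → comma → space. It falls back to a hard character split only when none of them yields small enough pieces.

# chunking/strategies.py (continued)
class RecursiveChunker:
    """Recursive separator chunking: better than fixed-size because it cuts along the text's own boundaries."""

    # Highest to lowest priority; full-width punctuation covers Chinese text
    SEPARATORS = ["\n\n", "\n", "。", "；", ";", "，", ",", " "]

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def split(self, text: str, source: str = "") -> List[Chunk]:
        # 1. Recursively break the text into pieces of at most chunk_size characters
        pieces = self._recursive_split(text, 0)
        # 2. Greedily merge consecutive pieces back up to chunk_size
        chunks = []
        current: List[str] = []
        current_len = 0
        chunk_idx = 0
        has_new_content = False  # avoids emitting a chunk that holds only carried-over overlap

        for piece in pieces:
            if current_len + len(piece) > self.chunk_size and has_new_content:
                chunks.append(self._make_chunk(current, source, chunk_idx))
                chunk_idx += 1
                # Overlap: carry the last `overlap` characters into the next chunk
                tail = "".join(current)[-self.overlap:] if self.overlap > 0 else ""
                current = [tail] if tail else []
                current_len = len(tail)
                has_new_content = False

            current.append(piece)
            current_len += len(piece)
            has_new_content = True

        if has_new_content:
            chunks.append(self._make_chunk(current, source, chunk_idx))
        return chunks

    @staticmethod
    def _make_chunk(pieces: List[str], source: str, chunk_idx: int) -> Chunk:
        return Chunk(
            text="".join(pieces).strip(),
            metadata={"source": source, "strategy": "recursive", "sentence_count": len(pieces)},
            chunk_id=f"{source}:{chunk_idx}",
        )

    def _recursive_split(self, text: str, sep_idx: int) -> List[str]:
        """Split text into pieces no longer than chunk_size, keeping each separator attached."""
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if sep_idx >= len(self.SEPARATORS):
            # No separator left: hard split by characters
            return [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]

        sep = self.SEPARATORS[sep_idx]
        parts = text.split(sep)
        pieces: List[str] = []
        for i, part in enumerate(parts):
            if i < len(parts) - 1:
                part += sep  # re-attach the separator so no text is lost
            # Only parts that are still too long get split further with the next separator
            pieces.extend(self._recursive_split(part, sep_idx + 1))
        return pieces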

Strategy 3: DocumentAwareChunker

Recognizes Markdown structure: headings (#) delimit sections, and lines inside fenced code blocks (```) are never mistaken for headings. Sections that fit are kept whole; oversized sections fall back to RecursiveChunker.

# chunking/strategies.py (continued)
import re


@dataclass
class Section:
    title: str
    level: int
    content: str


class DocumentAwareChunker:
    """Document-aware chunking: splits along Markdown headings and keeps each section whole when it fits."""

    def __init__(self, max_section_size: int = 800, sub_chunk_size: int = 500, sub_overlap: int = 50):
        self.max_section_size = max_section_size
        self.sub_chunker = RecursiveChunker(sub_chunk_size, sub_overlap)

    def split(self, text: str, source: str = "") -> List[Chunk]:
        chunks = []
        chunk_idx = 0

        for section in self._parse_sections(text):
            if not section.content.strip():
                continue  # e.g. an empty preamble before the first heading
            if len(section.content) > self.max_section_size:
                # Section too large: split it recursively, but keep its heading info
                for sub in self.sub_chunker.split(section.content, source):
                    sub.metadata.update({
                        "title": section.title,
                        "level": section.level,
                        "strategy": "document_aware",
                    })
                    sub.chunk_id = f"{source}:{chunk_idx}"
                    chunks.append(sub)
                    chunk_idx += 1
            else:
                chunks.append(Chunk(
                    text=section.content.strip(),
                    metadata={
                        "source": source,
                        "title": section.title,
                        "level": section.level,
                        "strategy": "document_aware",
                    },
                    chunk_id=f"{source}:{chunk_idx}",
                ))
                chunk_idx += 1

        return chunks

    def _parse_sections(self, text: str) -> List[Section]:
        """Parse a Markdown document into heading-delimited sections."""
        sections = []
        current_title = ""
        current_level = 0
        current_content: List[str] = []
        in_code_block = False

        for line in text.split("\n"):
            # A fence line toggles code-block state; "#" lines inside code are not headings
            if line.strip().startswith("```"):
                in_code_block = not in_code_block
                current_content.append(line + "\n")
                continue

            if in_code_block:
                current_content.append(line + "\n")
                continue

            # Detect headings
            heading_match = re.match(r"^(#{1,6})\s+(.+)$", line)
            if heading_match:
                # Close the current section before starting a new one
                if current_content:
                    sections.append(Section(current_title, current_level, "".join(current_content)))
                current_title = heading_match.group(2)
                current_level = len(heading_match.group(1))
                current_content = [line + "\n"]
            else:
                current_content.append(line + "\n")

        if current_content:
            sections.append(Section(current_title, current_level, "".join(current_content)))

        return sections

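Strategy 4: SemanticChunker

Uses an embedding model to detect semantic breakpoints. Wherever the cosine similarity between the embeddings of two adjacent sentences falls below a threshold, that point is treated as a semantic boundary.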
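Here is a worked example of the breakpoint rule. It uses hand-made 2-D vectors in place of real sentence embeddings (BGE-M3 vectors have 1024 dimensions). `find_breakpoints` and `group_by_breakpoints` are illustrative helpers that mirror the breakpoint-detection and grouping steps of `SemanticChunker.split` below; they are not part of its API.

```python
import numpy as np


def find_breakpoints(embeddings: np.ndarray, threshold: float) -> list[int]:
    """Indices where a new group starts (cosine similarity to the previous sentence < threshold)."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / np.where(norms == 0, 1.0, norms)  # zero vectors stay zero
    sims = np.sum(unit[:-1] * unit[1:], axis=1)  # cosine similarity of each adjacent pair
    return [i + 1 for i, s in enumerate(sims) if s < threshold]


def group_by_breakpoints(items: list[str], breakpoints: list[int]) -> list[list[str]]:
    """Cut the sentence list at every breakpoint index."""
    bounds = [0, *breakpoints, len(items)]
    return [items[a:b] for a, b in zip(bounds, bounds[1:]) if b > a]


sentences = ["K8s deploy step 1.", "K8s deploy step 2.", "SQL index tip 1.", "SQL index tip 2."]
vectors = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]])
bps = find_breakpoints(vectors, threshold=0.65)
print(bps)
print(group_by_breakpoints(sentences, bps))
```

The adjacent similarities are about 0.99, 0.11 and 0.99. Only the drop between the second and third sentences falls below 0.65, so the text splits into two topical groups.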

# chunking/semantic.py
import re
from typing import List

import numpy as np

from .strategies import Chunk


class SemanticChunker:
    """Semantic chunking: detects boundaries from embedding similarity between adjacent sentences."""

    def __init__(
        self,
        embedder,  # an EmbeddingModel instance
        threshold: float = 0.65,  # cosine-similarity threshold
        max_chunk_size: int = 800,  # in characters
    ):
        self.embedder = embedder
        self.threshold = threshold
        self.max_chunk_size = max_chunk_size

    def split(self, text: str, source: str = "") -> List[Chunk]:
        # 1. Split into sentences
        sentences = self._split_sentences(text)
        if not sentences:
            return []

        # 2. Embed all sentences in one batch call
        embeddings = self.embedder.encode_batch(sentences)

        # 3. Detect semantic breakpoints: a new group starts at i + 1 when sentences i and i + 1 are dissimilar
        breakpoints = {
            i + 1
            for i in range(len(sentences) - 1)
            if self._cosine_similarity(embeddings[i], embeddings[i + 1]) < self.threshold
        }

        # 4. Group at breakpoints, and also start a new group before max_chunk_size is exceeded
        #    (a single sentence longer than max_chunk_size still becomes its own chunk)
        groups: List[List[str]] = []
        current: List[str] = []
        current_len = 0
        for i, sentence in enumerate(sentences):
            if current and (i in breakpoints or current_len + len(sentence) > self.max_chunk_size):
                groups.append(current)
                current, current_len = [], 0
            current.append(sentence)
            current_len += len(sentence)
        if current:
            groups.append(current)

        return [
            Chunk(
                text="".join(group),  # joined without spaces, which suits Chinese text
                metadata={
                    "source": source,
                    "strategy": "semantic",
                    "sentence_count": len(group),
                },
                chunk_id=f"{source}:{chunk_idx}",
            )
            for chunk_idx, group in enumerate(groups)
        ]

    def _split_sentences(self, text: str) -> List[str]:
        """Split into sentences (Chinese and English)."""
        # Split after Chinese/English periods, exclamation marks and question marks
        parts = re.split(r"(?<=[。！？.!?])\s*", text)
        return [p.strip() for p in parts if p.strip()]

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(np.dot(a, b) / (norm_a * norm_b))

Parent-Child Indexing

Retrieval matches small child chunks (high precision), and generation receives the larger parent chunk around them (complete context).
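At query time the two halves fit together as follows. Search returns child IDs, each child is mapped to its parent, and a parent shared by several hits is kept only once, at the score of its best child. This is a sketch using plain dictionaries. `expand_to_parents` and the ID formats are illustrative; they are not part of the `ParentChildIndex` class below.

```python
def expand_to_parents(
    child_hits: list[tuple[str, float]],  # [(child_id, score)], sorted best first
    child_to_parent: dict[str, str],      # child_id -> parent_id
    parent_texts: dict[str, str],         # parent_id -> parent text
) -> list[tuple[str, float, str]]:
    """Swap retrieved children for their parents, keeping each parent once at its best child's score."""
    seen: set[str] = set()
    expanded = []
    for child_id, score in child_hits:
        parent_id = child_to_parent.get(child_id)
        if parent_id is None or parent_id in seen:
            continue  # unknown child, or its parent is already in the list
        seen.add(parent_id)
        expanded.append((parent_id, score, parent_texts[parent_id]))
    return expanded


hits = [("doc:0", 0.91), ("doc:1", 0.88), ("doc:3", 0.70)]
mapping = {"doc:0": "p0", "doc:1": "p0", "doc:2": "p0", "doc:3": "p1"}
texts = {"p0": "text of chunks 0-2", "p1": "text of chunks 3-5"}
print(expand_to_parents(hits, mapping, texts))
```

Deduplicating is what keeps the LLM context from receiving the same parent text twice when two of its children both rank highly.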

# chunking/semantic.py (continued)
class ParentChildIndex:
    """Parent-child index: retrieve with child chunks, generate with parent context."""

    def __init__(self):
        # child_id -> parent chunk
        self._children_to_parent: dict[str, Chunk] = {}
        # parent_id -> parent chunk
        self._parents: dict[str, Chunk] = {}

    def build(self, chunks: List[Chunk], parent_size: int = 3) -> List[Chunk]:
        """
        Build the index: every parent_size consecutive child chunks are merged into one parent.
        Call it once per document so a parent never spans two documents. Returns the child
        chunks unchanged, since they are what gets embedded and indexed.
        """
        for i in range(0, len(chunks), parent_size):
            parent_group = chunks[i:i + parent_size]
            parent = Chunk(
                text=" ".join(c.text for c in parent_group),
                metadata={
                    **parent_group[0].metadata,
                    "parent": True,
                    "child_count": len(parent_group),
                },
                # Prefixed so the parent ID never collides with its first child's ID
                chunk_id=f"parent:{parent_group[0].chunk_id}",
            )
            self._parents[parent.chunk_id] = parent

            for child in parent_group:
                self._children_to_parent[child.chunk_id] = parent

        return chunks

    def get_parent(self, child_id: str) -> Chunk | None:
        return self._children_to_parent.get(child_id)

    def get_parent_text(self, child_id: str) -> str | None:
        parent = self.get_parent(child_id)
        return parent.text if parent else None

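Choosing a Chunking Strategy

| Scenario | Recommended strategy | Reason |
|---|---|---|
| Quick prototype | FixedChunker | Simplest; results in 5 minutes |
| Markdown documents | DocumentAwareChunker | Recognizes heading structure and keeps sections semantically whole |
| Long text / papers | SemanticChunker | Splits on semantic boundaries |
| Code documentation | RecursiveChunker | Prefers paragraph and line boundaries, so code is less likely to be cut mid-line |
| Best for production | DocumentAware + Semantic combined | DocumentAware first splits the document into large sections, then semantic detection refines them |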

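Section 2: Embedding Models

Embedding Model Comparison

| Model | Dimensions | Chinese | Multilingual | Speed | Recommended for |
|---|---|---|---|---|---|
| BGE-M3 | 1024 | | 100+ languages | | First choice for Chinese production workloads |
| text-embedding-3-large | 3072 | | | Fast (API) | Teams that already have an OpenAI subscription |
| E5-large-v2 | 1024 | | Mainly English | | English-centric projects |
| GTE-Qwen2-7B | 3584 | | Chinese + English | Slow (large model) | High-quality scenarios |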

Embedding Model Wrappers

# embedding/models.py
from abc import ABC, abstractmethod
from typing import List

import numpy as np


class EmbeddingModel(ABC):
    """Base class for embedding models."""

    @abstractmethod
    def encode(self, text: str) -> np.ndarray:
        """Encode a single text into a vector."""
        ...

    def encode_batch(self, texts: List[str]) -> np.ndarray:
        """Encode a batch; the default falls back to one call per text."""
        return np.array([self.encode(t) for t in texts])

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Vector dimension."""
        ...


class BGEM3Embedding(EmbeddingModel):
    """BGE-M3 local embedding model (requires the FlagEmbedding package)."""

    def __init__(self, model_name: str = "BAAI/bge-m3"):
        from FlagEmbedding import BGEM3FlagModel
        self.model = BGEM3FlagModel(model_name, use_fp16=True)
        self._dimension = 1024

    def encode(self, text: str) -> np.ndarray:
        result = self.model.encode([text])
        return result["dense_vecs"][0]

    def encode_batch(self, texts: List[str]) -> np.ndarray:
        # BGE-M3 supports batch encoding natively; only the dense vectors are used here
        result = self.model.encode(texts)
        return result["dense_vecs"]

    @property
    def dimension(self) -> int:
        return self._dimension


class OpenAIEmbedding(EmbeddingModel):
    """OpenAI text-embedding-3-large embeddings."""

    def __init__(self, api_key: str, model: str = "text-embedding-3-large"):
        from openai import OpenAI
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self._dimension = 3072  # correct for text-embedding-3-large; other models differ

    def encode(self, text: str) -> np.ndarray:
        response = self.client.embeddings.create(model=self.model, input=text)
        return np.array(response.data[0].embedding)

    def encode_batch(self, texts: List[str]) -> np.ndarray:
        # The embeddings endpoint accepts a list of inputs, so one request covers the whole batch
        response = self.client.embeddings.create(model=self.model, input=texts)
        return np.array([item.embedding for item in sorted(response.data, key=lambda d: d.index)])

    @property
    def dimension(self) -> int:
        return self._dimension

Batch Embedding (with Resumable Checkpoints)

# embedding/batch.py
import json
import os
from typing import Callable, List

import numpy as np

from .models import EmbeddingModel
from ..chunking.strategies import Chunk


class BatchEmbedder:
    """Batch embedding with resumable checkpoints, deduplication by chunk_id, and progress callbacks."""

    def __init__(
        self,
        model: EmbeddingModel,
        checkpoint_path: str = "checkpoint.json",
        batch_size: int = 32,
    ):
        self.model = model
        self.checkpoint_path = checkpoint_path
        self.batch_size = batch_size
        self._processed_ids: set[str] = set()
        self._load_checkpoint()

    def embed(
        self,
        chunks: List[Chunk],
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> List[tuple[Chunk, list[float]]]:
        """
        Embed every chunk not processed yet and return (chunk, vector) pairs.
        The checkpoint stores chunk IDs only, so the caller must persist each result
        (e.g. write it to the vector store); vectors from an interrupted run are not recovered.
        """
        # Skip chunks finished in an earlier run, and duplicate IDs within this run
        seen = set(self._processed_ids)
        pending = []
        for chunk in chunks:
            if chunk.chunk_id not in seen:
                seen.add(chunk.chunk_id)
                pending.append(chunk)

        results = []
        done = len(chunks) - len(pending)
        for start in range(0, len(pending), self.batch_size):
            batch = pending[start:start + self.batch_size]
            # One model call per batch instead of one per chunk
            vectors = self.model.encode_batch([c.text for c in batch])
            for chunk, vector in zip(batch, vectors):
                results.append((chunk, np.asarray(vector).tolist()))
                self._processed_ids.add(chunk.chunk_id)
            done += len(batch)
            self._save_checkpoint()  # checkpoint after every batch
            if progress_callback:
                progress_callback(done, len(chunks))

        return results

    def _load_checkpoint(self):
        if os.path.exists(self.checkpoint_path):
            with open(self.checkpoint_path, encoding="utf-8") as f:
                self._processed_ids = set(json.load(f))

    def _save_checkpoint(self):
        with open(self.checkpoint_path, "w", encoding="utf-8") as f:
            json.dump(sorted(self._processed_ids), f, ensure_ascii=False)

    def cleanup(self):
        """Delete the checkpoint file."""
        if os.path.exists(self.checkpoint_path):
            os.remove(self.checkpoint_path)
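Section 3: Hybrid Retrieval

Why isn't vector search alone enough? Vector search is good at semantic matching but weak at exact matching. If you search for "HTTP 404", vector search may return a general explanation of "page not found" instead of the document about the specific 404 error code. Exact matching of this kind is exactly what BM25 is good at.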
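BM25 catches "404" where an embedding may not because it scores exact token matches. It weights rare tokens up through IDF and normalizes for document length. The pure-Python Okapi BM25 below is for illustration only, with the non-negative IDF form ln(1 + (N - n + 0.5) / (n + 0.5)). The documents are tokenized by hand, `bm25_scores` is a hypothetical helper, and whoosh's built-in scorer (a BM25F variant) differs in details.

```python
import math
from collections import Counter


def bm25_scores(
    query_tokens: list[str],
    docs_tokens: list[list[str]],
    k1: float = 1.5,
    b: float = 0.75,
) -> list[float]:
    """Okapi BM25 score of every document for the query (documents given as token lists)."""
    if not docs_tokens:
        return []
    n_docs = len(docs_tokens)
    avgdl = sum(len(d) for d in docs_tokens) / n_docs
    # Document frequency: how many documents contain each token
    df = Counter(token for doc in docs_tokens for token in set(doc))
    scores = []
    for doc in docs_tokens:
        tf = Counter(doc)
        score = 0.0
        for token in query_tokens:
            if tf[token] == 0:
                continue  # BM25 only rewards exact token matches
            idf = math.log(1 + (n_docs - df[token] + 0.5) / (df[token] + 0.5))
            norm = tf[token] + k1 * (1 - b + b * len(doc) / avgdl)
            score += idf * tf[token] * (k1 + 1) / norm
        scores.append(score)
    return scores


docs = [
    ["http", "404", "not", "found", "troubleshooting"],
    ["page", "missing", "friendly", "error", "message"],
    ["http", "500", "server", "error"],
]
print(bm25_scores(["http", "404"], docs))
```

Document 1, the "friendly error message" page, scores zero: it shares no exact token with the query, however close its meaning is. Document 0 wins because it matches the rare token "404".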

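BM25 Retrieval

# retrieval/hybrid.py
import os
from typing import List, Tuple

from whoosh.analysis import SimpleAnalyzer
from whoosh.fields import ID, TEXT, Schema
from whoosh.index import create_in, exists_in, open_dir
from whoosh.qparser import OrGroup, QueryParser


class BM25Retriever:
    """BM25 full-text retrieval (whoosh scores with BM25F by default)."""

    def __init__(self, index_dir: str = "bm25_index", analyzer=None):
        self.index_dir = index_dir
        # SimpleAnalyzer tokenizes on word-character runs, so an unbroken run of Chinese
        # characters becomes a single token. For Chinese text, pass a word-segmenting
        # analyzer instead (e.g. jieba's ChineseAnalyzer, an extra dependency).
        self.schema = Schema(
            chunk_id=ID(stored=True, unique=True),
            content=TEXT(stored=True, analyzer=analyzer or SimpleAnalyzer()),
            source=ID(stored=True),
        )
        self._ensure_index()

    def _ensure_index(self):
        os.makedirs(self.index_dir, exist_ok=True)
        # Open an existing index; create one if the directory holds none yet
        if exists_in(self.index_dir):
            self.index = open_dir(self.index_dir)
        else:
            self.index = create_in(self.index_dir, self.schema)

    def add_documents(self, chunks: List[object], vectors: List = None):
        """Add chunks to the BM25 index (`vectors` is ignored; it keeps the signature aligned with VectorRetriever)."""
        writer = self.index.writer()
        for chunk in chunks:
            # update_document replaces any existing entry with the same unique chunk_id
            writer.update_document(
                chunk_id=chunk.chunk_id,
                content=chunk.text,
                source=chunk.metadata.get("source", ""),
            )
        writer.commit()

    def search(self, query: str, top_k: int = 20) -> List[Tuple[str, float]]:
        """Search and return [(chunk_id, score), ...]."""
        # OrGroup: a chunk matching any query term is a candidate (the parser's default is AND)
        parser = QueryParser("content", self.index.schema, group=OrGroup)
        query_obj = parser.parse(query)
        # Read the results while the searcher is open, then close it
        with self.index.searcher() as searcher:
            results = searcher.search(query_obj, limit=top_k)
            return [(r["chunk_id"], r.score) for r in results]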

Vector Retrieval (Chroma as an Example)

# retrieval/hybrid.py (continued)
import chromadb


class VectorRetriever:
    """Vector retrieval (ChromaDB)."""

    def __init__(self, collection_name: str = "fde_rag", persist_path: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_path)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},  # cosine distance = 1 - cosine similarity
        )

    def add_documents(self, chunks: List[object], vectors: List[List[float]]):
        """Add chunks to the vector store (Chroma metadata values must be str, int, float or bool)."""
        # upsert rather than add, so re-indexing an updated document overwrites stale entries
        self.collection.upsert(
            embeddings=vectors,
            documents=[c.text for c in chunks],
            metadatas=[c.metadata for c in chunks],
            ids=[c.chunk_id for c in chunks],
        )

    def search(
        self,
        query_vector: List[float],
        top_k: int = 20,
        where_filter: dict | None = None,
    ) -> List[Tuple[str, float]]:
        """Search and return [(chunk_id, similarity), ...]."""
        kwargs = {
            "query_embeddings": [query_vector],
            "n_results": top_k,
            "include": ["distances", "metadatas"],
        }
        if where_filter:
            kwargs["where"] = where_filter

        results = self.collection.query(**kwargs)
        # Chroma returns distances (smaller = more similar); convert them to similarity scores
        ids = results["ids"][0]
        distances = results["distances"][0]
        scores = [1.0 - d for d in distances]

        return list(zip(ids, scores))

RRF (Reciprocal Rank Fusion)

# retrieval/hybrid.py (continued)
from collections import defaultdict


def reciprocal_rank_fusion(
    bm25_results: List[Tuple[str, float]],
    vector_results: List[Tuple[str, float]],
    k: int = 60,
    top_k: int = 50,
) -> List[Tuple[str, float]]:
    """
    RRF: merge BM25 and vector results using ranks only (raw scores are ignored).

    RRF(d) = sum over rankings r of 1 / (k + rank_r(d)), with ranks starting at 1.
    k = 60 is the constant recommended in the original RRF paper; it balances the
    weight given to high-ranked and lower-ranked results.
    """
    scores = defaultdict(float)

    # Contribution from the BM25 ranking
    for rank, (chunk_id, _) in enumerate(bm25_results, start=1):
        scores[chunk_id] += 1.0 / (k + rank)

    # Contribution from the vector ranking
    for rank, (chunk_id, _) in enumerate(vector_results, start=1):
        scores[chunk_id] += 1.0 / (k + rank)

    # Sort by fused RRF score, highest first
    sorted_results = sorted(scores.items(), key=lambda x: -x[1])
    return sorted_results[:top_k]


class HybridRetriever:
    """Hybrid retrieval: BM25 + vector search + RRF fusion."""

    def __init__(
        self,
        bm25: BM25Retriever,
        vector: VectorRetriever,
        embedder,
    ):
        self.bm25 = bm25
        self.vector = vector
        self.embedder = embedder

    def search(
        self,
        query: str,
        top_k: int = 50,
        where_filter: dict | None = None,
    ) -> List[Tuple[str, float]]:
        """Hybrid search. Note that where_filter is applied only to the vector side."""
        # 1. BM25 retrieval (keyword matching); over-fetch so fusion has candidates to choose from
        bm25_results = self.bm25.search(query, top_k * 2)

        # 2. Vector retrieval (semantic matching)
        query_vector = self.embedder.encode(query).tolist()
        vector_results = self.vector.search(query_vector, top_k * 2, where_filter)

        # 3. RRF fusion
        fused = reciprocal_rank_fusion(bm25_results, vector_results, top_k=top_k)

        return fused
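`reciprocal_rank_fusion` above takes exactly two lists. The same formula extends to any number of rankings: for example, the results for the original query plus those for the step-back variants from `QueryRewriter`. It also accepts per-ranking weights if one retriever should count more. The sketch below is illustrative; `rrf_fuse` and its `weights` parameter are not from the original code. It takes ranked ID lists, so convert `[(chunk_id, score), ...]` results with `[cid for cid, _ in results]` first.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def rrf_fuse(
    rankings: List[List[str]],
    k: int = 60,
    weights: Optional[List[float]] = None,
    top_k: Optional[int] = None,
) -> List[Tuple[str, float]]:
    """Weighted RRF over any number of ranked ID lists (each sorted best first)."""
    if weights is None:
        weights = [1.0] * len(rankings)
    if len(weights) != len(rankings):
        raise ValueError("need one weight per ranking")
    scores: Dict[str, float] = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (k + rank)
    # Ties are broken by ID so the output order is deterministic
    fused = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
    return fused[:top_k] if top_k is not None else fused


bm25_ids = ["a", "b", "c"]
vector_ids = ["d", "b", "a"]
for doc_id, score in rrf_fuse([bm25_ids, vector_ids]):
    print(doc_id, round(score, 5))
```

With equal weights, "a" (ranks 1 and 3) narrowly beats "b" (rank 2 in both lists), and both outrank "d", which is first in only one list. Agreement between retrievers counts for more than a single top rank.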

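Query Rewriting

# retrieval/query_rewrite.py
from typing import List


class QueryRewriter:
    """Query rewriting to improve retrieval quality."""

    def __init__(self, llm_client=None):
        self.llm = llm_client  # an LLMClient (generation/answer.py), or None to disable rewriting

    def hyde(self, query: str) -> str:
        """
        HyDE (Hypothetical Document Embeddings): have the LLM write a hypothetical
        answer document, then retrieve with that document's embedding.
        """
        if not self.llm:
            return query

        prompt = (
            "Write a short technical answer (under 100 words) to the question below, "
            f"using precise technical terminology:\n\nQuestion: {query}"
        )
        # LLMClient.generate takes a system prompt and a user prompt
        return self.llm.generate(
            system_prompt="You are a concise technical writer.",
            user_prompt=prompt,
            max_tokens=150,
        )

    def step_back(self, query: str) -> List[str]:
        """
        Step-back prompting: generate broader, higher-level questions
        to retrieve more general background context.
        """
        if not self.llm:
            return [query]

        prompt = (
            "Generate 2 broader, higher-level questions for the specific question below, "
            f"to retrieve more general technical background:\n\nQuestion: {query}\n\n"
            "Format: one question per line"
        )
        response = self.llm.generate(
            system_prompt="You are a search assistant.",
            user_prompt=prompt,
            max_tokens=200,
        )
        return [line.strip() for line in response.split("\n") if line.strip()]

    def classify(self, query: str) -> str:
        """Query classification, used to route to different retrieval strategies."""
        q = query.lower()

        # Code / error-code queries: lean on BM25
        # (报错 = error, 异常 = exception)
        code_keywords = ["error", "exception", "404", "500", "traceback", "bug", "报错", "异常"]
        if any(kw in q for kw in code_keywords):
            return "code"

        # How-to queries: use hybrid retrieval
        # (怎么/如何 = how, 步骤 = steps, 教程 = tutorial, 部署 = deploy, 安装 = install)
        action_keywords = ["怎么", "如何", "how to", "步骤", "教程", "部署", "安装"]
        if any(kw in q for kw in action_keywords):
            return "howto"

        # Concept questions: lean on vector retrieval
        # (什么是 = what is, 原理 = principle, 机制 = mechanism, 区别 = difference, 比较 = compare)
        # Plain substring matching: short keywords such as "vs" can also match inside other words
        concept_keywords = ["什么是", "what is", "原理", "机制", "区别", "vs", "比较"]
        if any(kw in q for kw in concept_keywords):
            return "concept"

        return "general"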

Section 4: Reranking

After hybrid retrieval returns the Top-50, a cross-encoder scores each (query, passage) pair jointly and re-ranks the candidates, so that only the 5 most relevant chunks are sent to the LLM.

BGE Reranker Implementation

# retrieval/reranker.py
from typing import List, Tuple


class BGEReranker:
    """BGE Reranker v2: a local cross-encoder."""

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        from FlagEmbedding import FlagReranker
        self.model = FlagReranker(model_name, use_fp16=True)

    def rerank(
        self,
        query: str,
        passages: List[Tuple[str, str]],  # [(chunk_id, text), ...]
        top_k: int = 5,
    ) -> List[Tuple[str, float]]:
        """Rerank and return [(chunk_id, score), ...], best first."""
        if not passages:
            return []

        pairs = [(query, text) for _, text in passages]
        # normalize=True maps the raw logits to [0, 1] with a sigmoid
        scores = self.model.compute_score(pairs, normalize=True)

        # compute_score returns a bare number for a single pair and a list otherwise
        if not isinstance(scores, list):
            scores = [scores]

        scored = [(chunk_id, float(s)) for (chunk_id, _), s in zip(passages, scores)]
        scored.sort(key=lambda x: -x[1])
        return scored[:top_k]


class CohereReranker:
    """Cohere Rerank API: hosted reranking."""

    def __init__(self, api_key: str, model: str = "rerank-multilingual-v3.0"):
        import cohere
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(
        self,
        query: str,
        passages: List[Tuple[str, str]],
        top_k: int = 5,
    ) -> List[Tuple[str, float]]:
        if not passages:
            return []  # avoid an API call with an empty document list

        texts = [text for _, text in passages]
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=texts,
            top_n=top_k,
        )
        # Each result carries the index of the input document and its relevance score
        return [(passages[r.index][0], r.relevance_score) for r in response.results]

Two-Stage Retrieval Pipeline

# retrieval/reranker.py (continued)
class RerankPipeline:
    """Two-stage retrieval: hybrid retrieval → reranking."""

    def __init__(self, hybrid_retriever, reranker):
        self.retriever = hybrid_retriever
        self.reranker = reranker
        # The reranker needs chunk text, so keep a chunk_id -> text cache
        self._text_cache: dict[str, str] = {}

    def add_to_cache(self, chunks: List[object]):
        for chunk in chunks:
            self._text_cache[chunk.chunk_id] = chunk.text

    def search(self, query: str, top_k: int = 5, candidate_k: int = 50) -> List[Tuple[str, float, str]]:
        """
        Full retrieval flow:
        1. Hybrid retrieval → Top-50 candidates
        2. Reranking → Top-5
        3. Return (chunk_id, score, text)
        """
        # Stage 1: hybrid retrieval
        raw_results = self.retriever.search(query, top_k=candidate_k)

        # Stage 2: look up each candidate's text (IDs missing from the cache are skipped)
        passages = [
            (chunk_id, self._text_cache[chunk_id])
            for chunk_id, _ in raw_results
            if self._text_cache.get(chunk_id)
        ]

        # Stage 3: rerank
        reranked = self.reranker.rerank(query, passages, top_k=top_k)

        # Stage 4: assemble results with the reranker's scores
        return [
            (chunk_id, score, self._text_cache.get(chunk_id, ""))
            for chunk_id, score in reranked
        ]

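Reranking Results

Without reranking (Top-5 taken straight from hybrid retrieval):
  Accuracy: 72%   Mean relevance: 0.68

With BGE Reranker (Top-50 → rerank → Top-5):
  Accuracy: 85%   Mean relevance: 0.82
  Added latency: ~200 ms (local GPU)

With Cohere Rerank (Top-50 → API rerank → Top-5):
  Accuracy: 87%   Mean relevance: 0.85
  Added latency: ~800 ms (network round trip)

Conclusion: in these measurements, reranking is the highest-ROI optimization in the RAG pipeline. It adds 13 percentage points of accuracy with the local BGE reranker (72% → 85%), at an acceptable latency cost.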

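Section 5: Answer Generation

RAG Prompt Template

# generation/prompt.py
from typing import List


class RAGPromptBuilder:
    """Builds RAG prompts."""

    SYSTEM_PROMPT = """\
You are a professional technical Q&A assistant. Answer the question using the technical document excerpts provided below.

Requirements:
1. Answer only from the provided documents; do not use outside knowledge
2. If the documents do not contain enough information, say explicitly: "The available documents do not contain enough information to answer this question."
3. End every answer with its sources, formatted as [Source: document name]
4. Put code examples in markdown code blocks
5. Structure the answer clearly, using bullet points
6. Answer in the same language as the question
"""

    def build(
        self,
        query: str,
        contexts: List[tuple],  # [(chunk_id, score, text), ...]
        max_context_length: int = 4000,  # in characters
    ) -> tuple[str, str]:
        """
        Build the RAG prompt.
        Returns (system_prompt, user_prompt).
        """
        # Add context excerpts until the character budget is used up
        context_parts = []
        total_length = 0

        for i, (chunk_id, score, text) in enumerate(contexts):
            # chunk_id is "<source>:<index>"; rsplit keeps sources that themselves contain ":"
            source = chunk_id.rsplit(":", 1)[0] if ":" in chunk_id else "unknown"
            context_part = f"--- Excerpt {i + 1} [Source: {source}] (relevance: {score:.2f}) ---\n{text}"
            if total_length + len(context_part) > max_context_length:
                break
            context_parts.append(context_part)
            total_length += len(context_part)

        user_prompt = (
            f"Question: {query}\n\n"
            "Reference documents:\n"
            + "\n\n".join(context_parts)
            + "\n\nAnswer the question using the excerpts above."
        )

        return self.SYSTEM_PROMPT, user_prompt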
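Because the system prompt asks the model to end every answer with a source tag, those tags can be checked mechanically. The check extracts each cited name and flags any that was not among the sources actually supplied as context. The following is a sketch. `check_citations` is a hypothetical helper, and its regex assumes tags shaped like `[Source: name]`; it also accepts the Chinese label 来源, a full-width colon, and comma-separated names.

```python
import re

# Matches "[Source: a]" or "[来源: a, b]", with a half- or full-width colon
CITATION_RE = re.compile(r"\[(?:Source|来源)\s*[:：]\s*([^\]]+)\]", re.IGNORECASE)


def check_citations(answer: str, allowed_sources: list[str]) -> dict:
    """Return cited sources, those not in allowed_sources, and whether anything was cited."""
    cited: set[str] = set()
    for group in CITATION_RE.findall(answer):
        # One tag may list several sources separated by commas
        cited.update(name.strip() for name in re.split(r"[,，]", group) if name.strip())
    return {
        "cited": sorted(cited),
        "unknown": sorted(cited - set(allowed_sources)),
        "has_citation": bool(cited),
    }


answer = "Apply the manifests with kubectl. [Source: deployment-guide, k8s-notes]"
print(check_citations(answer, ["deployment-guide"]))
```

A non-empty `unknown` list, or `has_citation` being False, is a cheap signal for downgrading the answer's confidence before it reaches the user.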

Answer Generation with an "I Don't Know" Fallback

# generation/answer.py
from typing import List

from .prompt import RAGPromptBuilder


class AnswerGenerator:
    """Answer generator with context-quality checks and an "I don't know" fallback."""

    def __init__(self, llm_client, min_context_score: float = 0.5):
        self.llm = llm_client
        self.prompt_builder = RAGPromptBuilder()
        # Compared against reranker scores, which are in [0, 1] for both rerankers above
        self.min_context_score = min_context_score

    def generate(
        self,
        query: str,
        contexts: List[tuple],  # [(chunk_id, score, text), ...]
        max_tokens: int = 1024,
    ) -> dict:
        """
        Generate an answer.
        Returns {
            "answer": str,
            "sources": list[str],
            "confidence": str,  # "high" / "medium" / "low"
            "has_sufficient_context": bool,
        }
        """
        # 1. No context at all: refuse instead of letting the LLM guess
        if not contexts:
            return {
                "answer": "Sorry, I couldn't find anything relevant in the knowledge base, so I can't answer this question.",
                "sources": [],
                "confidence": "low",
                "has_sufficient_context": False,
            }

        # 2. Context quality check: even the best match is too weak
        best_score = max(score for _, score, _ in contexts)
        if best_score < self.min_context_score:
            return {
                "answer": (
                    f'I searched the knowledge base but found nothing accurate enough to answer "{query}". '
                    "Try rephrasing the question, or contact technical support."
                ),
                "sources": [],
                "confidence": "low",
                "has_sufficient_context": False,
            }

        # 3. Build the prompt and generate
        system_prompt, user_prompt = self.prompt_builder.build(query, contexts)
        answer = self.llm.generate(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
        )

        # 4. Collect sources, deduplicated in ranking order
        #    (may include excerpts the prompt builder dropped for length)
        sources = list(dict.fromkeys(
            chunk_id.rsplit(":", 1)[0]
            for chunk_id, _, _ in contexts
        ))

        # 5. Confidence estimate
        confidence = self._assess_confidence(answer, contexts)

        return {
            "answer": answer,
            "sources": sources,
            "confidence": confidence,
            "has_sufficient_context": True,
        }

    def _assess_confidence(self, answer: str, contexts: List[tuple]) -> str:
        """Heuristic confidence from the mean context score and the answer length."""
        avg_score = sum(score for _, score, _ in contexts) / len(contexts)
        answer_length = len(answer)

        if avg_score > 0.75 and answer_length > 50:
            return "high"
        elif avg_score > 0.6:
            return "medium"
        else:
            return "low"

LLM Client Abstraction

# generation/answer.py (continued)
from openai import OpenAI


class LLMClient:
    """LLM client for any OpenAI-compatible chat completions API."""

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.openai.com/v1",
        model: str = "gpt-4o-mini",
    ):
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model

    def generate(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = 1024,
    ) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=max_tokens,
            temperature=0.1,  # low temperature keeps RAG answers consistent
        )
        # content can be None (e.g. when the response is filtered), so guard before strip()
        return (response.choices[0].message.content or "").strip()

Section 6: Evaluation

A RAG system without evaluation is flying blind. You need to know how good the answers are, not just whether the system can produce one.

Ragas-Style Evaluation Pipeline

# evaluation/ragas_eval.py
import re
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class EvalResult:
    question: str
    answer: str
    contexts: List[str]
    ground_truth: str
    # Ragas-style metrics
    faithfulness: float  # Is the answer faithful to the context?
    answer_relevancy: float  # Does the answer address the question?
    context_precision: float  # Are the retrieved contexts relevant?
    context_recall: float  # Does retrieval cover the information the answer needs?


class RagasEvaluator:
    """
    The 4 core Ragas metrics, approximated with an LLM judge.
    This does not call the ragas library; each metric is a single judge prompt.
    """

    def __init__(self, llm_client, embedder):
        self.llm = llm_client
        self.embedder = embedder  # unused by these prompt-based approximations

    def evaluate_single(
        self,
        question: str,
        answer: str,
        contexts: List[str],
        ground_truth: str,
    ) -> EvalResult:
        """Evaluate one Q&A pair."""
        return EvalResult(
            question=question,
            answer=answer,
            contexts=contexts,
            ground_truth=ground_truth,
            # 1. Faithfulness: every claim in the answer is supported by the context
            faithfulness=self._check_faithfulness(answer, contexts),
            # 2. Answer relevancy: the answer directly addresses the question
            answer_relevancy=self._check_relevancy(question, answer),
            # 3. Context precision: the retrieved contexts are relevant to the question
            context_precision=self._check_context_precision(question, contexts),
            # 4. Context recall: the contexts contain everything the reference answer needs
            context_recall=self._check_context_recall(ground_truth, contexts),
        )

    def _judge_score(self, prompt: str) -> float:
        """Ask the judge LLM for a number in [0, 1]; unparseable replies fall back to 0.5."""
        result = self.llm.generate(
            system_prompt="You are a strict evaluator. Reply with a number only.",
            user_prompt=prompt,
            max_tokens=10,
        )
        match = re.search(r"\d*\.?\d+", result)
        if not match:
            return 0.5  # neutral fallback; worth counting separately in real runs
        return min(max(float(match.group()), 0.0), 1.0)  # clamp out-of-range replies

    def _check_faithfulness(self, answer: str, contexts: List[str]) -> float:
        """Is the answer grounded in the context?"""
        context_text = "\n".join(contexts)
        return self._judge_score(
            "Judge whether the answer below is fully grounded in the reference documents. "
            "Return 1.0 if every claim in the answer is supported by the documents, "
            "a score between 0.0 and 1.0 if only some claims are supported, "
            "and 0.0 if the answer contradicts the documents.\n\n"
            f"Reference documents: {context_text[:2000]}\n\n"
            f"Answer: {answer}\n\n"
            "Return only a number between 0.0 and 1.0."
        )

    def _check_relevancy(self, question: str, answer: str) -> float:
        """Does the answer address the question?"""
        return self._judge_score(
            "Judge whether the answer below directly answers the question. "
            "Return a score between 0.0 and 1.0.\n\n"
            f"Question: {question}\n\n"
            f"Answer: {answer}\n\n"
            "Return only a number."
        )

    def _check_context_precision(self, question: str, contexts: List[str]) -> float:
        """Fraction of retrieved contexts the judge considers relevant."""
        if not contexts:
            return 0.0
        relevant_count = 0
        for ctx in contexts:
            result = self.llm.generate(
                system_prompt="Reply with yes or no only.",
                user_prompt=(
                    "Is the document excerpt below relevant to the question? Answer yes or no.\n\n"
                    f"Question: {question}\n\n"
                    f"Excerpt: {ctx[:500]}\n\n"
                    "Reply with yes or no only."
                ),
                max_tokens=5,
            )
            # startswith tolerates replies such as "Yes."
            if result.strip().lower().startswith("yes"):
                relevant_count += 1
        return relevant_count / len(contexts)

    def _check_context_recall(self, ground_truth: str, contexts: List[str]) -> float:
        """Does the context cover the key information in the reference answer?"""
        context_text = "\n".join(contexts)
        return self._judge_score(
            "Judge whether the reference documents contain all the key information in the reference answer. "
            "Return 1.0 if fully covered, 0.5 if partially covered, and 0.0 if not covered.\n\n"
            f"Reference answer: {ground_truth}\n\n"
            f"Reference documents: {context_text[:2000]}\n\n"
            "Return only a number."
        )

    def evaluate_batch(self, results: List[EvalResult]) -> Dict[str, float]:
        """Average each metric over all results."""
        if not results:
            return {}

        n = len(results)
        return {
            "faithfulness": sum(r.faithfulness for r in results) / n,
            "answer_relevancy": sum(r.answer_relevancy for r in results) / n,
            "context_precision": sum(r.context_precision for r in results) / n,
            "context_recall": sum(r.context_recall for r in results) / n,
        }

Test Dataset

# evaluation/dataset.py
import json
from pathlib import Path
from typing import Dict, List, Optional


class EvalDataset:
    """Evaluation test set."""

    def __init__(self, data_path: str = "data/eval_qa.json"):
        self.data_path = data_path
        self._qa_pairs: List[Dict] = []
        self.load()

    def load(self):
        if Path(self.data_path).exists():
            with open(self.data_path, encoding="utf-8") as f:
                self._qa_pairs = json.load(f)

    def add(self, question: str, answer: str, doc_ids: Optional[List[str]] = None):
        self._qa_pairs.append({
            "question": question,
            "ground_truth": answer,
            "doc_ids": doc_ids or [],  # IDs of the documents that contain the answer
        })

    def save(self):
        # Explicit utf-8: with ensure_ascii=False, non-ASCII text (e.g. Chinese) is written as-is
        with open(self.data_path, "w", encoding="utf-8") as f:
            json.dump(self._qa_pairs, f, indent=2, ensure_ascii=False)

    def get_all(self) -> List[Dict]:
        return self._qa_pairs

    def __len__(self):
        return len(self._qa_pairs)


# Example evaluation data (eval_qa.json format)
EXAMPLE_EVAL_DATA = [
    {
        "question": "How do I deploy the FDE service to Kubernetes?",
        "ground_truth": "Run kubectl apply -f on the YAML files in the deploy/ directory; the ConfigMap and Secret must be configured first.",
        "doc_ids": ["deployment-guide"],
    },
    {
        "question": "How do I troubleshoot an HTTP 503 error?",
        "ground_truth": "Check whether the backend service is alive, whether the load balancer is configured correctly, and whether the health-check endpoint returns 200.",
        "doc_ids": ["troubleshooting-http"],
    },
]
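The `doc_ids` field makes it possible to score retrieval on its own, before any LLM is involved, with metrics like those in the chunk-size table in Section 1. The source doesn't say exactly how that table's accuracy, recall and MRR were computed. The sketch below uses one common set of definitions (hit rate@k, recall@k, MRR@k), and all function names in it are hypothetical. Chunk IDs follow the `source:index` format used throughout, so they are first collapsed to document IDs.

```python
def chunk_ids_to_doc_ids(chunk_ids: list[str]) -> list[str]:
    """Map 'source:index' chunk IDs to document IDs, deduplicated in first-seen order."""
    return list(dict.fromkeys(cid.rsplit(":", 1)[0] for cid in chunk_ids))


def retrieval_metrics(ranked: list[list[str]], gold: list[list[str]], k: int = 5) -> dict[str, float]:
    """ranked[i]: doc IDs retrieved for query i, best first; gold[i]: relevant doc IDs for query i."""
    if not ranked or len(ranked) != len(gold):
        raise ValueError("ranked and gold must be non-empty and the same length")
    hits = recall = rr = 0.0
    for docs, relevant in zip(ranked, gold):
        relevant_set = set(relevant)
        top = docs[:k]
        found = relevant_set.intersection(top)
        hits += 1.0 if found else 0.0
        recall += len(found) / len(relevant_set) if relevant_set else 0.0
        # Reciprocal rank of the first relevant doc within the top k (0 if there is none)
        rr += next((1.0 / pos for pos, d in enumerate(top, start=1) if d in relevant_set), 0.0)
    n = len(ranked)
    return {f"hit_rate@{k}": hits / n, f"recall@{k}": recall / n, f"mrr@{k}": rr / n}


ranked = [
    chunk_ids_to_doc_ids(["deployment-guide:0", "deployment-guide:2", "faq:1"]),
    ["faq", "intro", "troubleshooting-http"],
]
gold = [["deployment-guide"], ["troubleshooting-http"]]
print(retrieval_metrics(ranked, gold, k=5))
print(retrieval_metrics(ranked, gold, k=2))
```

At k=5 both questions find their document (at ranks 1 and 3), giving an MRR of about 0.67. At k=2 the second question misses, which halves the hit rate.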

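Sample Evaluation Report

═══════════════════════════════════════════════════════
 FDE-RAG Evaluation Report (500 Q&A pairs)
═══════════════════════════════════════════════════════

 Faithfulness:       0.87  ← Is the answer faithful to the documents?
 Answer Relevancy:   0.82  ← Does the answer address the question?
 Context Precision:  0.79  ← Is the retrieved context relevant?
 Context Recall:     0.74  ← Does retrieval cover the needed information?

 Overall score:      0.81

 By strategy:
   Fixed Chunk (500):       Faithfulness 0.81   Recall 0.68
   Recursive Chunk:         Faithfulness 0.84   Recall 0.71
   Document-Aware:          Faithfulness 0.86   Recall 0.73
   Document-Aware + Rerank: Faithfulness 0.87   Recall 0.74  ← best

 Common failure modes:
   1. Relevant documents retrieved, but key details missing (32%)
   2. Reranking pushed the more relevant documents further down (18%)
   3. The documents themselves are outdated and disagree with the actual code (15%)
   4. LLM hallucination: information added that is not in the documents (12%)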

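Section 7: Guardrails

A RAG system needs guardrails in both directions: input guardrails block attacks, and output guardrails catch hallucinations.

Input Guardrail

# guardrails/input_guard.py
import re
from dataclasses import dataclass


@dataclass
class InputCheck:
    passed: bool
    reason: str = ""


# Prompt-injection patterns (English only). Keyword lists are easy to evade and can misfire:
# "system:" also matches a harmless query such as "file system: ext4".
INJECTION_PATTERNS = [
    r"(?i)ignore\s+previous?\s+instructions?",
    r"(?i)you\s+are\s+now\s+",
    r"(?i)system\s*:\s*",
    r"(?i)role\s*:\s*system",
    r"(?i)disregard\s+all",
    r"(?i)forget\s+everything",
    r"(?i)new\s+instructions?\s*:",
]


class InputGuardrail:
    """
    Input guardrail: rejects empty or over-long queries and suspected prompt injection.
    Sensitive-content filtering and rate limiting are not handled by this class.
    """

    def __init__(self, max_query_length: int = 2000):
        self.max_query_length = max_query_length
        self._compiled_patterns = [
            re.compile(p) for p in INJECTION_PATTERNS
        ]

    def check(self, query: str) -> InputCheck:
        """Check whether the input is acceptable."""
        # 1. Empty query (checked first so blank input gets the right reason)
        if not query.strip():
            return InputCheck(False, "Query is empty")

        # 2. Length limit
        if len(query) > self.max_query_length:
            return InputCheck(False, f"Query exceeds the {self.max_query_length}-character limit")

        # 3. Prompt-injection detection
        for pattern in self._compiled_patterns:
            if pattern.search(query):
                return InputCheck(False, "Suspected prompt injection detected")

        return InputCheck(True)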
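Rate limiting is named as one of the input guardrail's jobs, but `InputGuardrail` contains no code for it. A minimal in-process sliding-window limiter keyed by API key could look like the sketch below. `SlidingWindowRateLimiter` is a hypothetical class, and because it keeps its state in memory it only limits requests per worker process; a multi-worker deployment would need a shared store instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most max_requests per key within any window_seconds interval (in-process only)."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # rejected requests are not recorded
        hits.append(now)
        return True


now = [0.0]
limiter = SlidingWindowRateLimiter(max_requests=2, window_seconds=10.0, clock=lambda: now[0])
print([limiter.allow("key-a") for _ in range(3)])  # [True, True, False]
```

Rejected requests are not recorded, so a client that keeps retrying does not push its own window further out. It regains access as soon as its oldest accepted request leaves the window.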

输出护栏

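# guardrails/output_guard.py
import re


def _keywords(text: str) -> list[str]:
    """Tokens for lexical matching: Latin/digit words longer than 2 characters plus
    CJK character bigrams. Chinese text has no spaces, so a plain word regex would
    treat a whole clause as a single token that almost never matches verbatim."""
    words = [w for w in re.findall(r"[A-Za-z0-9_]+", text) if len(w) > 2]
    cjk_runs = re.findall(r"[\u4e00-\u9fff]+", text)
    bigrams = [run[i:i + 2] for run in cjk_runs for i in range(len(run) - 1)]
    return words + bigrams


class OutputGuardrail:
    """Output guardrail: hallucination screening and sensitive-data protection"""

    def __init__(self, keyword_threshold: float = 0.4, sentence_threshold: float = 0.6):
        # A sentence counts as supported when more than keyword_threshold of its
        # keywords appear in the context; the answer passes when more than
        # sentence_threshold of its checked sentences are supported.
        self.keyword_threshold = keyword_threshold
        self.sentence_threshold = sentence_threshold
        # Sensitive-information patterns, keyed by type name
        self._sensitive_patterns = {
            "password": re.compile(r"(?i)password\s*[:=]\s*\S+"),
            "token": re.compile(r"(?i)token\s*[:=]\s*\S{20,}"),
            "secret": re.compile(r"(?i)secret\s*[:=]\s*\S+"),
            "api_key": re.compile(r"(?i)api[_-]?key\s*[:=]\s*\S+"),
            "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # US Social Security number
        }

    def check_hallucination(self, answer: str, context_texts: list[str]) -> bool:
        """
        Simple lexical hallucination screen: is each key sentence of the answer
        reflected in the retrieved context?
        Returns True = no hallucination detected, False = hallucination risk.
        """
        context_combined = " ".join(context_texts).lower()

        # Split on Chinese and English sentence terminators; keep sentences longer than 10 chars
        sentences = [
            s.strip() for s in re.split(r"[。！？!?\n]|\.\s", answer) if len(s.strip()) > 10
        ]

        checked = covered = 0
        for sentence in sentences:
            keywords = _keywords(sentence.lower())
            if not keywords:
                continue  # nothing verifiable in this sentence
            checked += 1
            matched = sum(1 for kw in keywords if kw in context_combined)
            if matched / len(keywords) > self.keyword_threshold:
                covered += 1

        if checked == 0:
            return True
        return covered / checked > self.sentence_threshold

    def check_sensitive(self, text: str) -> tuple[bool, list[str]]:
        """Detect sensitive data; returns (is_safe, list of detected type names)"""
        detected = [name for name, p in self._sensitive_patterns.items() if p.search(text)]
        return len(detected) == 0, detected

    def sanitize(self, answer: str) -> str:
        """Redact possible credentials and SSNs from the output"""
        # Replace possible passwords/tokens/secrets/keys, keeping the field name
        sanitized = re.sub(
            r"(?i)(password|token|secret|key)\s*[:=]\s*\S+",
            r"\1: [REDACTED]",
            answer,
        )
        return self._sensitive_patterns["ssn"].sub("[REDACTED]", sanitized)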
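The API response carries a `confidence` string ("high" in the sample response later in this chapter), but `check_hallucination` only returns a bool. One hypothetical way to produce a graded label is to map the answer's keyword coverage onto high/medium/low. The function name and the 0.8/0.5 thresholds in this sketch are assumptions, not the author's method, and should be calibrated against the evaluation set.

```python
import re


def _keywords(text: str) -> list[str]:
    """Latin/digit tokens longer than 2 chars plus CJK character bigrams."""
    words = [w for w in re.findall(r"[A-Za-z0-9_]+", text) if len(w) > 2]
    runs = re.findall(r"[\u4e00-\u9fff]+", text)
    return words + [run[i:i + 2] for run in runs for i in range(len(run) - 1)]


def grade_confidence(answer: str, contexts: list[str], high: float = 0.8, medium: float = 0.5) -> str:
    """Map the share of answer keywords found in the retrieved context to a label."""
    keywords = _keywords(answer.lower())
    if not keywords:
        return "low"  # nothing to verify, so do not claim confidence
    context = " ".join(contexts).lower()
    coverage = sum(1 for kw in keywords if kw in context) / len(keywords)
    if coverage >= high:
        return "high"
    if coverage >= medium:
        return "medium"
    return "low"
```

A graded label lets the client UI show a caveat for "low" answers instead of hiding them outright.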

Section 8: Production Deployment

FastAPI service

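# api/server.py
import os
import secrets
import time
import uuid
from typing import Optional

from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, Field

from ..guardrails.input_guard import InputGuardrail

app = FastAPI(title="FDE-RAG", version="1.0.0")

# Read the API key from the environment rather than hard-coding it in source.
# FDE_RAG_API_KEY is an assumed variable name; the fallback matches the curl demo
# and must not be used in production.
API_KEY = os.environ.get("FDE_RAG_API_KEY", "your-api-key")

# Build the guardrail once at import time instead of on every request
input_guard = InputGuardrail()

# Assumption: rag_pipeline is the pipeline assembled from the earlier sections
# (chunking, hybrid retrieval, reranking, generation) and created at startup.
# It must provide `async query(query, top_k) -> dict` with "answer", "sources",
# and "confidence" keys, plus `async ingest(path)`. Construction is not shown.
rag_pipeline = None  # replace with the real pipeline instance


def verify_api_key(x_api_key: Optional[str]) -> None:
    """Reject the request with 401 unless the X-API-Key header matches."""
    # compare_digest runs in constant time, so the key cannot be probed via timing
    if x_api_key is None or not secrets.compare_digest(
        x_api_key.encode(), API_KEY.encode()
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")


class QueryRequest(BaseModel):
    query: str
    top_k: int = Field(5, ge=1, le=50)  # bounded retrieval depth; the limit is illustrative
    include_sources: bool = True


class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: str
    trace_id: str
    latency_ms: float


@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest, x_api_key: Optional[str] = Header(None)):
    """RAG query endpoint"""
    trace_id = str(uuid.uuid4())
    start_time = time.perf_counter()  # monotonic clock for measuring latency

    verify_api_key(x_api_key)

    # Input guardrail
    check = input_guard.check(request.query)
    if not check.passed:
        raise HTTPException(status_code=400, detail=check.reason)

    # Run the RAG pipeline (all components implemented in the earlier sections)
    result = await rag_pipeline.query(request.query, top_k=request.top_k)

    latency_ms = (time.perf_counter() - start_time) * 1000

    return QueryResponse(
        answer=result["answer"],
        sources=result["sources"] if request.include_sources else [],
        confidence=result["confidence"],
        trace_id=trace_id,
        latency_ms=round(latency_ms, 1),
    )


@app.post("/api/ingest")
async def ingest_document(doc_path: str, x_api_key: Optional[str] = Header(None)):
    """Document ingestion endpoint.

    doc_path arrives as a query parameter and names a path on the server;
    restrict it to the data directory before exposing this endpoint.
    """
    verify_api_key(x_api_key)

    # Load document → chunk → embed → store
    await rag_pipeline.ingest(doc_path)
    return {"status": "ok", "message": f"Ingested: {doc_path}"}


@app.get("/api/health")
async def health():
    """Health check"""
    return {"status": "healthy", "version": "1.0.0"}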
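Every response returns a `trace_id`, but the handler above does not attach it to any server-side log. Without that link, the id cannot be used to find out what happened during a request. The sketch below uses only the standard library: a `logging.LoggerAdapter` stamps each message with the request's trace_id, and a handler-level filter supplies a placeholder for records logged outside a request. The logger name `fde_rag` and the log format are assumptions.

```python
import logging


class TraceIdFilter(logging.Filter):
    """Give every record a trace_id attribute so the formatter never fails."""

    def filter(self, record: logging.LogRecord) -> bool:
        if not hasattr(record, "trace_id"):
            record.trace_id = "-"  # record logged outside a request
        return True


def configure_logging(stream=None, fmt: str = "%(asctime)s %(levelname)s [%(trace_id)s] %(message)s") -> logging.Handler:
    """Attach a handler whose format includes trace_id to the fde_rag logger."""
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(logging.Formatter(fmt))
    handler.addFilter(TraceIdFilter())
    logger = logging.getLogger("fde_rag")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return handler


def get_request_logger(trace_id: str) -> logging.LoggerAdapter:
    """Logger that adds the request's trace_id to every record it emits."""
    return logging.LoggerAdapter(logging.getLogger("fde_rag"), {"trace_id": trace_id})
```

Inside the query handler, `log = get_request_logger(trace_id)` right after the id is created, followed by calls such as `log.info("retrieved %d chunks", n)`, ties every log line to the id the client sees.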

Docker deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Copy the source before installing: the project cannot be built until src/ exists
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir .

COPY data/ data/

EXPOSE 8000

CMD ["uvicorn", "src.api.server:app", "--host", "0.0.0.0", "--port", "8000"]
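
# docker-compose.yml
version: "3.8"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      # Assumed variable name: inside compose, Qdrant is reachable by its service
      # name rather than localhost, so the app must read its address from here
      - QDRANT_URL=http://qdrant:6333
    depends_on:
      - qdrant

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

volumes:
  qdrant_data: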
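`depends_on` in the compose file only controls start order. It does not wait until Qdrant or the API is actually ready to serve requests. Before running the ingestion or query steps, or in a CI smoke test, a small poller against the `/api/health` endpoint can bridge that gap. The sketch below uses only the standard library. The URL, timeouts, and polling interval are illustrative. Note that this endpoint reports "healthy" as soon as the API process is up, regardless of Qdrant's state.

```python
import json
import time
import urllib.request

# Health checks target a local/compose service, so bypass any HTTP(S)_PROXY settings
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def wait_for_health(url: str, timeout_s: float = 60.0, interval_s: float = 1.0,
                    request_timeout_s: float = 2.0) -> bool:
    """Poll url until it answers 200 with {"status": "healthy"}; False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with _opener.open(url, timeout=request_timeout_s) as resp:
                data = json.load(resp)
                if resp.status == 200 and isinstance(data, dict) and data.get("status") == "healthy":
                    return True
        except (OSError, ValueError):
            pass  # not ready yet: refused, reset, timed out, HTTP error, or non-JSON body
        time.sleep(interval_s)
    return False
```

Usage: `wait_for_health("http://localhost:8000/api/health")` after `docker compose up -d`, then run the ingestion command.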

Section 9: End-to-End Run

Document ingestion

$ python -m fde_rag.ingest data/sample_docs/
[1/12] Processing deployment-guide.md... → 8 chunks
[2/12] Processing troubleshooting-http.md... → 12 chunks
...
[12/12] Processing api-reference.md... → 45 chunks
Total: 156 chunks, embedding time 23.4s
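The progress log above suggests a simple per-file loop. The sketch below assumes Markdown files and a hypothetical `chunk_fn` callable that stands in for whichever chunking strategy is configured. Embedding and storage are omitted, so it only reproduces the chunk-counting part of the log.

```python
from pathlib import Path
from typing import Callable


def ingest_directory(doc_dir: str, chunk_fn: Callable[[str], list[str]], pattern: str = "*.md") -> int:
    """Chunk every matching file in doc_dir, print per-file progress, return the total chunk count."""
    paths = sorted(Path(doc_dir).glob(pattern))
    total = 0
    for i, path in enumerate(paths, start=1):
        chunks = chunk_fn(path.read_text(encoding="utf-8"))
        total += len(chunks)
        # Embedding and vector-store upserts for `chunks` would happen here (not shown)
        print(f"[{i}/{len(paths)}] Processing {path.name}... -> {len(chunks)} chunks")
    print(f"Total: {total} chunks")
    return total
```

Sorting the paths makes the run order, and therefore the log, deterministic across machines.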

Example query

$ curl -X POST http://localhost:8000/api/query \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"query": "How do I troubleshoot an HTTP 503 error?"}'

{
  "answer": "Steps to troubleshoot an HTTP 503 error:\n\n1. Check whether the backend service is alive...",
  "sources": ["troubleshooting-http"],
  "confidence": "high",
  "trace_id": "a1b2c3d4-...",
  "latency_ms": 1247.3
}
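The same request can be sent from Python with only the standard library, for example from a smoke test or an internal tool. This is a sketch: the function names are hypothetical, and the endpoint, headers, and body mirror the curl example.

```python
import json
import urllib.request


def build_query_request(base_url: str, api_key: str, query: str, top_k: int = 5) -> urllib.request.Request:
    """Build the POST /api/query request from the curl example."""
    body = json.dumps({"query": query, "top_k": top_k}, ensure_ascii=False).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/api/query",
        data=body,
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
        method="POST",
    )


def ask(base_url: str, api_key: str, query: str, top_k: int = 5, timeout_s: float = 30.0) -> dict:
    """Send the query and return the parsed JSON (urllib raises HTTPError on 4xx/5xx)."""
    req = build_query_request(base_url, api_key, query, top_k)
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return json.load(resp)
```

Usage: `ask("http://localhost:8000", "your-api-key", "How do I troubleshoot an HTTP 503 error?")["answer"]`. Keeping request construction separate from sending makes the request easy to inspect in tests without a running server.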

Running the evaluation

$ python -m fde_rag.evaluate
Evaluating... 500/500

═══════════════════════════════════════════════
Faithfulness:       0.87
Answer Relevancy:   0.82
Context Precision:  0.79
Context Recall:     0.74
Overall score:      0.81
═══════════════════════════════════════════════
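The overall score of 0.81 is consistent with an unweighted mean of the four metrics: (0.87 + 0.82 + 0.79 + 0.74) / 4 = 0.805. The sketch below shows that aggregation step. It assumes per-question scores arrive as dicts keyed by Ragas-style metric names. The key names and the equal weighting are assumptions that the source does not confirm. Rounding is left to the report printer.

```python
from statistics import fmean

METRICS = ("faithfulness", "answer_relevancy", "context_precision", "context_recall")


def summarize(per_question: list[dict[str, float]]) -> dict[str, float]:
    """Average each metric over all questions, then take the unweighted mean as "overall"."""
    summary = {m: fmean(row[m] for row in per_question) for m in METRICS}
    summary["overall"] = fmean(summary[m] for m in METRICS)
    return summary
```

If some metrics matter more for a given deployment (for example, faithfulness for compliance documents), replacing the second `fmean` with a weighted average is a one-line change.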

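Knowledge map

Prerequisite chapter | Topics used in this chapter
--- | ---
Chapter 02: LLM Fundamentals | Embedding models, calling generation models
Chapter 03: Context Engineering | RAG prompt design
Chapter 05: Frameworks and Tools | MCP protocol (the RAG system can be exposed as an MCP Tool)
Chapter 06: RAG Systems | This chapter is the complete code implementation of Chapter 06
Chapter 09: Security and Guardrails | Input/output guardrail implementation
Chapter 10: Production Deployment | FastAPI, Docker, CI/CD
Chapter 12: Evaluation and Observability | Ragas evaluation, Trace ID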
