返回学习路线

每周 6-10 小时

本周目标

掌握 RAG(Retrieval-Augmented Generation)的完整技术栈。RAG 让 LLM 能够基于外部知识回答问题, 有效缓解幻觉问题。从文档预处理到检索策略再到答案生成,理解每个环节的技术选择和权衡。

学习内容

11.2 文档处理与分块

文档处理是RAG系统的第一道关卡,其质量直接影响后续检索和生成的效果。本节将详细介绍文档加载、分块策略、元数据管理等关键技术。

11.2.1 文档加载与解析:PDF、Word、HTML

RAG系统需要处理多种格式的文档,不同格式的解析方法各有特点。

PDF文档解析是最常见的需求。PDF格式设计用于保持排版一致性,而非结构化文本提取,因此解析存在一定挑战。常用的Python库包括:

  • PyPDF2:纯Python实现,轻量级,适合简单文本提取
  • pdfplumber:基于pdfminer,能更好地保留表格结构
  • pymupdf(fitz):速度快,支持图像提取,适合复杂PDF
  • Unstructured:企业级方案,支持多种文档格式

对于扫描版PDF(图像格式),需要先进行OCR识别。Tesseract是开源OCR引擎的代表,而商业方案如Azure Document Intelligence、Google Document AI在复杂版式识别上表现更好。

Word文档(.docx)解析相对简单,python-docx库可以直接提取段落、表格、样式信息。需要注意的是,Word文档中的复杂格式(如嵌套表格、文本框)可能需要特殊处理。

HTML/网页解析需要考虑网页的动态性和多样性。BeautifulSoup配合requests可以处理静态页面,而Selenium或Playwright则用于渲染JavaScript动态内容。对于大规模网页抓取,还需要考虑反爬虫策略和抓取频率控制。

以下是一个使用LangChain加载多种文档格式的示例代码:

from langchain.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    UnstructuredHTMLLoader,
    TextLoader
)
from langchain.schema import Document
from typing import List
import os

def load_documents(directory: str) -> List[Document]:
    """加载目录下的所有支持格式的文档"""
    documents = []
    
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        
        if filename.endswith('.pdf'):
            loader = PyPDFLoader(filepath)
        elif filename.endswith('.docx'):
            loader = Docx2txtLoader(filepath)
        elif filename.endswith('.html') or filename.endswith('.htm'):
            loader = UnstructuredHTMLLoader(filepath)
        elif filename.endswith('.txt') or filename.endswith('.md'):
            loader = TextLoader(filepath, encoding='utf-8')
        else:
            print(f"跳过不支持的文件格式: {filename}")
            continue
            
        docs = loader.load()
        # 添加文件名作为元数据
        for doc in docs:
            doc.metadata['source'] = filename
        documents.extend(docs)
    
    return documents

# 使用示例
docs = load_documents("/data/documents")
print(f"共加载 {len(docs)} 个文档片段")

11.2.2 分块策略与方法:固定长度、语义切分、递归切分

文本分块(Text Chunking)是RAG系统的关键环节。分块策略的选择会直接影响检索精度和生成质量。

**固定长度分块(Fixed-size Chunking)**是最简单的方法,按照固定的token数或字符数切分文本,可设置重叠区域保持上下文连贯。

from langchain.text_splitter import CharacterTextSplitter

# 固定长度分块示例
text_splitter = CharacterTextSplitter(
    separator="\n",  # 分隔符
    chunk_size=500,  # 每块大小
    chunk_overlap=50,  # 重叠大小
    length_function=len,
)

chunks = text_splitter.split_documents(docs)

固定长度分块的优点是实现简单、速度快,缺点是可能切断语义完整的句子或段落,导致检索时丢失上下文。

**递归字符切分(Recursive Character Text Splitter)**是LangChain推荐的分块策略。它尝试使用一系列分隔符(段落→句子→单词)递归地切分文本,优先保持语义单元的完整性。

from langchain.text_splitter import RecursiveCharacterTextSplitter

# 递归字符切分
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", " ", ""],
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
)

chunks = text_splitter.split_documents(docs)

递归切分能更好地保留段落和句子边界,是大多数场景下的推荐选择。

**语义切分(Semantic Chunking)**基于文本的语义相似度进行切分。它使用嵌入模型计算相邻句子的相似度,在相似度低于阈值的位置进行切分。这种方法能产生语义更连贯的文本块。

from langchain.text_splitter import SemanticChunker
from langchain.embeddings import OpenAIEmbeddings

# 语义切分
text_splitter = SemanticChunker(
    OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=85
)

chunks = text_splitter.split_documents(docs)

语义切分的优点是生成的文本块语义一致性高,缺点是计算开销较大,需要为每个句子生成嵌入向量。

基于结构的分块针对特定文档类型设计。例如,Markdown文档可以按标题层级切分,HTML可以按标签切分,代码文件可以按函数或类切分。

from langchain.text_splitter import MarkdownHeaderTextSplitter

# Markdown按标题切分
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on
)

分块策略对比图

上图展示了不同分块策略的性能对比。从实验结果可以看出,语义切分在召回率和精确率上表现最佳,但速度较慢;递归切分在各项指标上取得了较好的平衡,是生产环境的推荐选择。

分块大小选择是另一个关键参数。块太小会丢失上下文信息,块太大则会稀释关键信息。根据实践经验,对于中文文本,256-512个token是较好的选择范围。具体选择需要结合嵌入模型的上下文窗口和实际应用场景进行测试。

分块策略优点缺点适用场景
固定长度实现简单、速度快可能切断语义单元快速原型、简单文档
递归切分保留结构边界、效果均衡对复杂格式支持有限通用场景、推荐首选
语义切分语义一致性高计算开销大高质量要求场景
结构切分保留文档结构需要格式预处理Markdown、HTML、代码

11.2.3 元数据与上下文保留:标题、页码、来源

在文档处理过程中,保留元数据对于提升RAG系统的效果至关重要。元数据不仅可以帮助用户追溯信息来源,还可以在检索时进行过滤和排序。

基础元数据包括:

  • 来源(source):文档文件名或URL
  • 页码(page):PDF或Word文档的页码
  • 标题(title):文档标题或章节标题
  • 创建时间(created_at):文档创建或更新时间

结构化元数据根据文档类型有所不同:

  • 技术文档:产品版本、模块名称、文档类型
  • 法律文档:法律领域、效力级别、发布机构
  • 新闻文档:发布时间、作者、分类标签

以下是在分块时保留元数据的示例:

from langchain.schema import Document
from typing import List

def split_documents_with_metadata(
    documents: List[Document], 
    text_splitter
) -> List[Document]:
    """分块并保留元数据"""
    chunks = []
    
    for doc in documents:
        # 获取原始元数据
        base_metadata = doc.metadata.copy()
        
        # 分块
        doc_chunks = text_splitter.split_documents([doc])
        
        # 为每个块添加序号和上下文
        for i, chunk in enumerate(doc_chunks):
            chunk.metadata.update(base_metadata)
            chunk.metadata['chunk_index'] = i
            chunk.metadata['total_chunks'] = len(doc_chunks)
            
            # 添加上下文信息
            if i > 0:
                chunk.metadata['prev_context'] = doc_chunks[i-1].page_content[-100:]
            if i < len(doc_chunks) - 1:
                chunk.metadata['next_context'] = doc_chunks[i+1].page_content[:100]
        
        chunks.extend(doc_chunks)
    
    return chunks

上下文增强是一种进阶技术,通过将相邻块的内容作为上下文附加到当前块,帮助模型理解更完整的语义。例如,可以将前一个块的末尾100个字符和后一个块的开头100个字符作为上下文。

11.2.4 多模态文档处理:图像、表格、图表

实际业务中的文档往往包含图像、表格、图表等多模态元素,这些元素的处理对RAG系统提出了更高要求。

表格处理是常见挑战。简单的文本提取会将表格内容打乱,丢失行列关系。处理策略包括:

  1. 结构化提取:使用专门的表格提取工具(如Camelot、Tabula)将表格转换为结构化数据
  2. HTML表示:将表格转换为HTML格式保留结构
  3. 描述生成:使用多模态模型生成表格的文字描述
import camelot

def extract_tables_from_pdf(pdf_path: str):
    """从PDF中提取表格"""
    tables = camelot.read_pdf(pdf_path, pages='all')
    
    table_docs = []
    for i, table in enumerate(tables):
        # 转换为DataFrame
        df = table.df
        
        # 生成文本描述
        table_text = f"表格 {i+1}:\n"
        table_text += df.to_string(index=False)
        
        doc = Document(
            page_content=table_text,
            metadata={
                'source': pdf_path,
                'page': table.page,
                'type': 'table',
                'table_index': i
            }
        )
        table_docs.append(doc)
    
    return table_docs

图像处理可以采用以下策略:

  1. OCR识别:对包含文字的图像进行OCR,提取文本内容
  2. 图像描述:使用视觉语言模型(如GPT-4V、CLIP)生成图像描述
  3. 多模态嵌入:使用多模态嵌入模型将图像和文本映射到同一向量空间
from transformers import CLIPProcessor, CLIPModel
import torch
from PIL import Image

# 加载CLIP模型
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

def get_image_description(image_path: str) -> str:
    """使用CLIP获取图像描述(简化示例)"""
    image = Image.open(image_path)
    
    # 候选描述
    candidate_labels = ["图表", "流程图", "截图", "照片", "示意图"]
    
    inputs = processor(
        text=candidate_labels,
        images=image,
        return_tensors="pt",
        padding=True
    )
    
    outputs = model(**inputs)
    logits_per_image = outputs.logits_per_image
    probs = logits_per_image.softmax(dim=1)
    
    # 返回最可能的描述
    best_idx = probs.argmax().item()
    return f"这是一个{candidate_labels[best_idx]}"

11.3 向量检索系统

向量检索是RAG系统的核心技术,负责在海量文档中快速找到与用户查询最相关的内容。本节将介绍嵌入模型选择、向量数据库对比、索引优化等关键技术。

11.3.1 嵌入模型选择:Sentence-Transformers、OpenAI Embedding

嵌入模型(Embedding Model)将文本转换为高维向量,使得语义相似的文本在向量空间中距离更近。选择合适的嵌入模型对RAG系统的性能至关重要。

Sentence-Transformers是开源嵌入模型的事实标准,提供了多种预训练模型:

模型名称维度特点适用场景
all-MiniLM-L6-v2384轻量快速原型开发、资源受限环境
all-mpnet-base-v2768质量均衡通用检索任务
all-distilroberta-v1768蒸馏模型速度与质量的平衡
paraphrase-multilingual-MiniLM-L12-v2384多语言支持非英语文本
from sentence_transformers import SentenceTransformer

# 加载Sentence-Transformer模型
model = SentenceTransformer('all-MiniLM-L6-v2')

# 生成嵌入
texts = ["这是一个测试句子", "这是另一个句子"]
embeddings = model.encode(texts)

print(f"嵌入维度: {embeddings.shape[1]}")
print(f"嵌入示例: {embeddings[0][:5]}")

OpenAI Embedding提供了商业级的嵌入服务,包括text-embedding-ada-002和text-embedding-3系列:

模型名称维度特点价格(每1K tokens)
text-embedding-ada-0021536通用性强$0.0001
text-embedding-3-small1536性价比高$0.00002
text-embedding-3-large3072最高质量$0.00013
from openai import OpenAI

client = OpenAI()

def get_openai_embedding(texts: list) -> list:
    """使用OpenAI API获取嵌入"""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

模型选择建议

  1. 快速原型:使用all-MiniLM-L6-v2或text-embedding-3-small
  2. 生产环境:根据质量要求选择all-mpnet-base-v2或text-embedding-3-large
  3. 多语言场景:使用paraphrase-multilingual系列或E5多语言模型
  4. 领域特定:考虑在领域数据上微调开源模型

MTEB基准(Massive Text Embedding Benchmark)是评估嵌入模型的重要参考。它涵盖了检索、聚类、分类等多种任务,可以帮助开发者选择适合特定场景的模型。

11.3.2 向量数据库对比:FAISS、Chroma、Pinecone、Milvus

向量数据库负责存储和检索高维向量,是RAG系统的核心基础设施。选择合适的向量数据库需要考虑性能、规模、运维成本等多个因素。

向量数据库性能对比图

数据库类型特点适用规模索引类型
FAISS开源库纯内存、速度快<1000万HNSW、IVF、PQ
Chroma开源数据库轻量易用<500万HNSW
Pinecone托管服务免运维、弹性扩展任意规模专有算法
Milvus开源/托管分布式、功能丰富十亿级多种索引
Weaviate开源/托管混合搜索强<5000万HNSW
Qdrant开源/托管过滤功能强<5000万HNSW

FAISS(Facebook AI Similarity Search)是Meta开源的向量检索库,提供了多种高效的ANN算法实现。它采用纯内存计算,在单机环境下性能优异。

import faiss
import numpy as np

# 创建FAISS索引
dimension = 384  # 嵌入维度
index = faiss.IndexFlatIP(dimension)  # 内积相似度(余弦相似度需归一化)

# 添加向量
vectors = np.random.random((10000, dimension)).astype('float32')
vectors = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)  # 归一化
index.add(vectors)

# 搜索
query = np.random.random((1, dimension)).astype('float32')
query = query / np.linalg.norm(query)
distances, indices = index.search(query, k=10)

print(f"最相似的10个向量的索引: {indices[0]}")
print(f"相似度分数: {distances[0]}")

Chroma是专为AI应用设计的轻量级向量数据库,与LangChain集成良好,适合快速原型开发。

import chromadb

# 创建Chroma客户端
client = chromadb.Client()

# 创建集合
collection = client.create_collection(name="documents")

# 添加文档
collection.add(
    documents=["这是文档1", "这是文档2", "这是文档3"],
    metadatas=[{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}],
    ids=["id1", "id2", "id3"]
)

# 查询
results = collection.query(
    query_texts=["查询内容"],
    n_results=2
)

Pinecone是全托管的向量数据库服务,无需关心基础设施运维,适合不想投入运维资源的企业。

from pinecone import Pinecone, ServerlessSpec

# 初始化Pinecone
pc = Pinecone(api_key="your-api-key")

# 创建索引
index_name = "rag-index"
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=384,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

# 连接索引
index = pc.Index(index_name)

# 添加向量
index.upsert(vectors=[
    ("id1", [0.1] * 384, {"source": "doc1"}),
    ("id2", [0.2] * 384, {"source": "doc2"}),
])

# 查询
results = index.query(
    vector=[0.1] * 384,
    top_k=2,
    include_metadata=True
)

Milvus是功能最丰富的开源向量数据库,支持分布式部署、混合检索、多租户等企业级特性。

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

# 连接Milvus
connections.connect("default", host="localhost", port="19530")

# 定义集合schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
]
schema = CollectionSchema(fields, "Document collection")

# 创建集合
collection = Collection("documents", schema)

# 创建索引
index_params = {
    "metric_type": "COSINE",
    "index_type": "HNSW",
    "params": {"M": 16, "efConstruction": 500}
}
collection.create_index("embedding", index_params)

11.3.3 索引构建与优化:HNSW、IVF、乘积量化

近似最近邻(ANN)算法是向量数据库的核心,在检索速度和召回率之间取得平衡。常见的索引算法包括:

**暴力搜索(Flat/Exhaustive Search)**计算查询向量与所有文档向量的相似度,返回最相似的K个结果。优点是召回率100%,缺点是时间复杂度O(N),仅适用于小规模数据。

**HNSW(Hierarchical Navigable Small World)**是目前最流行的ANN算法之一。它构建多层图结构,通过贪心搜索快速定位最近邻。

HNSW的核心参数:

  • M:每个节点的最大连接数,越大图越稠密,召回率越高,内存消耗越大
  • efConstruction:构建时的搜索深度,越大构建质量越高
  • efSearch:查询时的搜索深度,越大召回率越高,查询越慢
import faiss

# 创建HNSW索引
d = 384  # 维度
M = 16  # 连接数
efConstruction = 200

index = faiss.IndexHNSWFlat(d, M)
index.hnsw.efConstruction = efConstruction
index.hnsw.efSearch = 128

# 添加数据
xb = np.random.random((100000, d)).astype('float32')
index.add(xb)

# 搜索
xq = np.random.random((1, d)).astype('float32')
D, I = index.search(xq, k=10)

**IVF(Inverted File Index)**先将向量空间划分为多个聚类中心,查询时只需搜索最近的几个聚类。

# 创建IVF索引
nlist = 100  # 聚类中心数
quantizer = faiss.IndexFlatIP(d)
index = faiss.IndexIVFFlat(quantizer, d, nlist)

# 训练(需要训练数据)
index.train(xb)
index.add(xb)

# 设置搜索时的聚类数
index.nprobe = 10  # 搜索10个最近的聚类

**乘积量化(Product Quantization, PQ)**将高维向量分割为多个子向量,对每个子向量单独量化,大幅减少存储空间。

# 创建PQ索引
m = 16  # 子向量数
nbits = 8  # 每个子向量的比特数

index = faiss.IndexPQ(d, m, nbits)
index.train(xb)
index.add(xb)

IVF+PQ组合结合了两者的优势,先通过IVF缩小搜索范围,再在候选集上使用PQ加速相似度计算,是处理亿级向量的常用方案。

# 创建IVFPQ索引
nlist = 100
m = 16
nbits = 8

quantizer = faiss.IndexFlatIP(d)
index = faiss.IndexIVFPQ(quantizer, d, nlist, m, nbits)

index.train(xb)
index.add(xb)

索引算法性能对比图

上图展示了不同索引算法在搜索速度、召回率和内存效率方面的对比。HNSW在速度和召回率上表现优异,是大多数场景的首选;IVF+PQ则在超大规模数据集上具有内存优势。

11.3.4 混合检索策略:稀疏检索+密集检索

纯向量检索(密集检索)虽然能捕捉语义相似性,但在精确匹配关键词方面存在局限。混合检索结合稀疏检索(如BM25)和密集检索的优势,能显著提升检索效果。

BM25是经典的关键词检索算法,基于词频和逆文档频率计算相关性。它对精确匹配效果好,但无法理解语义相似性。

混合检索的实现方式

  1. 并行检索:同时使用向量检索和关键词检索,合并结果
  2. 加权融合:对两种检索的分数进行加权求和
  3. 级联检索:先用一种方法粗筛,再用另一种方法精排
from rank_bm25 import BM25Okapi
import numpy as np
from sklearn.preprocessing import MinMaxScaler

class HybridRetriever:
    """混合检索器:结合BM25和向量检索"""
    
    def __init__(self, documents, embeddings, k1=1.5, b=0.75):
        self.documents = documents
        self.embeddings = embeddings
        
        # 初始化BM25
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs, k1=k1, b=b)
        
        # 向量索引
        self.dimension = embeddings.shape[1]
        self.vector_index = faiss.IndexFlatIP(self.dimension)
        self.vector_index.add(embeddings)
    
    def retrieve(self, query, query_embedding, top_k=10, alpha=0.5):
        """
        混合检索
        
        Args:
            query: 查询文本
            query_embedding: 查询向量
            top_k: 返回结果数
            alpha: 向量检索权重(0-1)
        """
        # BM25检索
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        
        # 向量检索
        query_embedding = query_embedding.reshape(1, -1)
        vector_scores, vector_indices = self.vector_index.search(
            query_embedding, len(self.documents)
        )
        
        # 归一化分数
        scaler = MinMaxScaler()
        bm25_norm = scaler.fit_transform(bm25_scores.reshape(-1, 1)).flatten()
        
        # 构建向量分数数组
        vector_score_dict = {idx: score for idx, score in 
                            zip(vector_indices[0], vector_scores[0])}
        vector_scores_array = np.array([vector_score_dict.get(i, 0) 
                                        for i in range(len(self.documents))])
        vector_norm = scaler.fit_transform(vector_scores_array.reshape(-1, 1)).flatten()
        
        # 融合分数
        hybrid_scores = alpha * vector_norm + (1 - alpha) * bm25_norm
        
        # 获取top_k结果
        top_indices = np.argsort(hybrid_scores)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            results.append({
                'document': self.documents[idx],
                'score': hybrid_scores[idx],
                'vector_score': vector_norm[idx],
                'bm25_score': bm25_norm[idx]
            })
        
        return results

# 使用示例
documents = [
    "Python是一种流行的编程语言",
    "JavaScript用于网页开发",
    "机器学习是人工智能的分支",
    "Python在数据科学中广泛应用"
]

# 生成嵌入(实际使用时应使用真实嵌入模型)
embeddings = np.random.random((len(documents), 384)).astype('float32')
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

retriever = HybridRetriever(documents, embeddings)

query = "Python编程"
query_embedding = np.random.random(384).astype('float32')
query_embedding = query_embedding / np.linalg.norm(query_embedding)

results = retriever.retrieve(query, query_embedding, top_k=3, alpha=0.7)
for r in results:
    print(f"文档: {r['document']}")
    print(f"混合分数: {r['score']:.3f} (向量: {r['vector_score']:.3f}, BM25: {r['bm25_score']:.3f})")
    print()

11.4 检索增强生成

检索到相关文档后,如何有效利用这些信息生成高质量回答是RAG系统的另一个关键环节。本节将介绍检索结果融合、上下文管理、重排序等技术。

11.4.1 检索结果融合:重排序、分数融合

初步检索往往返回多个候选文档,需要进一步处理才能输入大语言模型。常见的融合策略包括:

简单拼接:将检索到的文档按顺序拼接,是最直接的方法。

def simple_context_assembly(documents, max_tokens=3000):
    """简单拼接文档作为上下文"""
    context = ""
    for i, doc in enumerate(documents):
        context += f"\n[文档 {i+1}]\n{doc}\n"
        # 简单token估算(实际应使用tokenizer)
        if len(context) > max_tokens * 4:  # 粗略估算:1 token ≈ 4字符
            break
    return context

分数加权:根据检索分数对文档进行加权,高分文档获得更多关注。

def weighted_context_assembly(documents_with_scores, max_tokens=3000):
    """根据分数加权拼接文档"""
    # 按分数排序
    sorted_docs = sorted(documents_with_scores, 
                        key=lambda x: x['score'], 
                        reverse=True)
    
    context = ""
    for doc_info in sorted_docs:
        doc = doc_info['document']
        score = doc_info['score']
        context += f"\n[相关度: {score:.2f}]\n{doc}\n"
        
        if len(context) > max_tokens * 4:
            break
    
    return context

**MMR(Maximal Marginal Relevance)**在相关性和多样性之间取得平衡,避免返回内容高度相似的文档。

def mmr_select(documents, embeddings, query_embedding, lambda_param=0.5, top_k=5):
    """
    MMR算法选择多样化文档
    
    Args:
        documents: 文档列表
        embeddings: 文档嵌入
        query_embedding: 查询嵌入
        lambda_param: 相关性vs多样性的权衡参数
        top_k: 选择文档数
    """
    selected = []
    remaining = list(range(len(documents)))
    
    # 计算与查询的相似度
    query_sim = np.dot(embeddings, query_embedding)
    
    while len(selected) < top_k and remaining:
        if not selected:
            # 第一个选择最相关的
            best_idx = remaining[np.argmax(query_sim[remaining])]
        else:
            # MMR评分
            mmr_scores = []
            for idx in remaining:
                # 相关性部分
                relevance = query_sim[idx]
                # 多样性部分(与已选文档的最大相似度)
                diversity = max([np.dot(embeddings[idx], embeddings[s]) 
                                for s in selected])
                # MMR分数
                mmr_score = lambda_param * relevance - (1 - lambda_param) * diversity
                mmr_scores.append(mmr_score)
            
            best_idx = remaining[np.argmax(mmr_scores)]
        
        selected.append(best_idx)
        remaining.remove(best_idx)
    
    return [documents[i] for i in selected]

11.4.2 上下文窗口管理:Token限制、上下文压缩

大语言模型有上下文长度限制,需要有效管理检索到的文档内容。

Token预算分配:根据任务复杂度合理分配token预算。

def allocate_token_budget(model_max_tokens=4096, 
                         system_prompt_tokens=200,
                         user_query_tokens=100,
                         response_reserve=500):
    """计算可用于上下文的token数"""
    available = model_max_tokens - system_prompt_tokens - user_query_tokens - response_reserve
    return max(available, 500)  # 至少保留500 tokens

上下文压缩技术

  1. 摘要压缩:使用轻量级模型对长文档进行摘要
  2. 关键句提取:提取文档中最相关的句子
  3. 信息抽取:提取实体、关系等结构化信息
from transformers import pipeline

# 加载摘要模型(轻量级)
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

def compress_document(document, max_length=100):
    """对文档进行摘要压缩"""
    if len(document) < 200:
        return document
    
    summary = summarizer(document, max_length=max_length, 
                        min_length=30, do_sample=False)
    return summary[0]['summary_text']

Map-Reduce策略:对于超长文档,可以采用Map-Reduce方式,先分别处理每个文档片段,再合并结果。

async def map_reduce_rag(documents, query, llm_client):
    """Map-Reduce方式处理长文档"""
    
    # Map阶段:每个文档单独处理
    partial_answers = []
    for doc in documents:
        prompt = f"""基于以下文档片段回答问题。如果文档不包含相关信息,回答"无相关信息"。

文档:{doc}

问题:{query}

回答:"""
        
        response = await llm_client.complete(prompt)
        if "无相关信息" not in response:
            partial_answers.append(response)
    
    # Reduce阶段:合并部分回答
    if not partial_answers:
        return "未找到相关信息"
    
    combine_prompt = f"""基于以下部分回答,综合给出最终答案。

部分回答:
{chr(10).join(f"{i+1}. {ans}" for i, ans in enumerate(partial_answers))}

问题:{query}

综合回答:"""
    
    final_answer = await llm_client.complete(combine_prompt)
    return final_answer

11.4.3 重排序与过滤:交叉编码器、相关性过滤

初步检索使用双编码器(Bi-Encoder)计算查询和文档的相似度,速度快但精度有限。重排序阶段使用交叉编码器(Cross-Encoder)进行更精确的排序。

交叉编码器将查询和文档拼接后输入模型,通过注意力机制捕捉细粒度的交互信息,排序精度更高但速度较慢。

from sentence_transformers import CrossEncoder

# 加载交叉编码器模型
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_documents(query, documents, top_k=5):
    """使用交叉编码器重排序"""
    # 构建查询-文档对
    pairs = [[query, doc] for doc in documents]
    
    # 计算相关性分数
    scores = cross_encoder.predict(pairs)
    
    # 按分数排序
    scored_docs = list(zip(documents, scores))
    scored_docs.sort(key=lambda x: x[1], reverse=True)
    
    return scored_docs[:top_k]

# 使用示例
docs = ["Python是一种编程语言", "Java是另一种语言", "Python用于数据科学"]
query = "Python编程"
reranked = rerank_documents(query, docs, top_k=2)
for doc, score in reranked:
    print(f"分数: {score:.3f}, 文档: {doc}")

相关性过滤:设置阈值过滤低质量检索结果,避免无关信息干扰生成。

def filter_by_relevance(documents_with_scores, threshold=0.5):
    """根据相关性分数过滤"""
    filtered = [(doc, score) for doc, score in documents_with_scores 
                if score >= threshold]
    
    if not filtered:
        # 如果全部过滤,至少保留最高分的一个
        best = max(documents_with_scores, key=lambda x: x[1])
        filtered = [best]
    
    return filtered

11.4.4 生成后处理:引用生成、幻觉检测

生成回答后,还需要进行后处理以确保质量和可靠性。

引用生成:为回答中的事实添加来源引用,提高可信度。

def add_citations(answer, source_documents):
    """为回答添加引用标注"""
    cited_answer = answer + "\n\n参考来源:\n"
    for i, doc in enumerate(source_documents, 1):
        source = doc.get('metadata', {}).get('source', f'文档{i}')
        cited_answer += f"[{i}] {source}\n"
    
    return cited_answer

幻觉检测:检测模型生成的内容与检索文档不符的部分。

def detect_hallucination(answer, source_documents, similarity_threshold=0.7):
    """
    简单幻觉检测
    检查回答中的句子是否与源文档有足够相似度
    """
    from sentence_transformers import SentenceTransformer, util
    
    model = SentenceTransformer('all-MiniLM-L6-v2')
    
    # 分割回答为句子
    sentences = answer.split('。')
    
    # 编码源文档
    source_text = ' '.join([doc['document'] for doc in source_documents])
    source_embedding = model.encode(source_text, convert_to_tensor=True)
    
    hallucinated = []
    for sent in sentences:
        if len(sent.strip()) < 10:
            continue
        
        sent_embedding = model.encode(sent, convert_to_tensor=True)
        similarity = util.pytorch_cos_sim(sent_embedding, source_embedding).item()
        
        if similarity < similarity_threshold:
            hallucinated.append({
                'sentence': sent,
                'similarity': similarity
            })
    
    return hallucinated

11.5 RAG系统构建

本节将整合前面介绍的技术,使用LangChain和LlamaIndex构建完整的RAG系统,并介绍评估与优化方法。

11.5.1 LangChain核心组件:Document Loader、Retriever、Chain

LangChain是目前最流行的RAG开发框架,提供了丰富的组件和抽象。

核心组件介绍

  1. Document Loader:加载各种格式的文档
  2. Text Splitter:文本分块
  3. Embedding Model:文本嵌入
  4. Vector Store:向量存储
  5. Retriever:检索器
  6. Chain:处理链
from langchain.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

# 1. 加载文档
loader = DirectoryLoader(
    '/data/documents',
    glob='**/*.pdf',
    loader_cls=PyPDFLoader
)
documents = loader.load()
print(f"加载了 {len(documents)} 个文档片段")

# 2. 文本分块
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
chunks = text_splitter.split_documents(documents)
print(f"分块后得到 {len(chunks)} 个文本块")

# 3. 创建嵌入模型
embeddings = HuggingFaceEmbeddings(
    model_name='sentence-transformers/all-MiniLM-L6-v2'
)

# 4. 构建向量数据库
vectorstore = FAISS.from_documents(chunks, embeddings)
print("向量数据库构建完成")

# 5. 创建检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# 6. 构建RAG链
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(temperature=0),
    chain_type="stuff",  # 简单拼接方式
    retriever=retriever,
    return_source_documents=True,
    verbose=True
)

# 7. 执行查询
query = "什么是RAG技术?"
result = qa_chain({"query": query})

print(f"\n问题: {query}")
print(f"\n回答: {result['result']}")
print(f"\n参考来源:")
for i, doc in enumerate(result['source_documents'], 1):
    print(f"[{i}] {doc.metadata.get('source', '未知')} - {doc.page_content[:100]}...")

高级Chain配置

from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

# 自定义提示模板
custom_prompt = PromptTemplate(
    template="""你是一个专业的AI助手。请基于以下参考信息回答问题。
如果参考信息不足以回答问题,请明确说明。

参考信息:
{context}

问题:{question}

请按照以下格式回答:
1. 直接回答
2. 详细解释(如有必要)
3. 相关引用

回答:""",
    input_variables=["context", "question"]
)

# 使用自定义提示构建Chain
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(temperature=0),
    chain_type="stuff",
    retriever=retriever,
    chain_type_kwargs={"prompt": custom_prompt},
    return_source_documents=True
)

11.5.2 LlamaIndex架构设计:索引、查询引擎

LlamaIndex是另一个强大的RAG框架,提供了更灵活的索引和查询抽象。

核心概念

  1. Document:文档对象
  2. Node:文档的细粒度单元
  3. Index:索引结构
  4. Query Engine:查询引擎
  5. Response Synthesizer:回答合成器
from llama_index import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    ServiceContext,
    get_response_synthesizer
)
from llama_index.llms import OpenAI
from llama_index.embeddings import HuggingFaceEmbedding

# 1. 加载文档
documents = SimpleDirectoryReader('/data/documents').load_data()

# 2. 配置服务上下文
service_context = ServiceContext.from_defaults(
    llm=OpenAI(model="gpt-3.5-turbo"),
    embed_model=HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
)

# 3. 构建索引
index = VectorStoreIndex.from_documents(
    documents,
    service_context=service_context
)

# 4. 创建查询引擎
query_engine = index.as_query_engine(
    response_mode="compact",  # 响应模式
    similarity_top_k=5
)

# 5. 查询
response = query_engine.query("什么是RAG技术?")
print(response)

# 查看参考节点
for node in response.source_nodes:
    print(f"来源: {node.node.metadata}")
    print(f"内容: {node.node.text[:100]}...")
    print(f"分数: {node.score}")

高级索引类型

from llama_index import ListIndex, TreeIndex, KeywordTableIndex

# 列表索引 - 简单顺序存储
list_index = ListIndex.from_documents(documents)

# 树形索引 - 层次化组织
tree_index = TreeIndex.from_documents(documents)

# 关键词表索引 - 基于关键词检索
keyword_index = KeywordTableIndex.from_documents(documents)

# 组合索引
from llama_index import ComposableGraph

graph = ComposableGraph.from_indices(
    VectorStoreIndex,
    [index1, index2, index3],
    index_summaries=["摘要1", "摘要2", "摘要3"]
)

11.5.3 端到端系统实现:完整Pipeline

下面是一个生产级的RAG系统完整实现:

"""
生产级RAG系统实现
"""
import os
from typing import List, Dict, Optional
from dataclasses import dataclass
import numpy as np

from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from sentence_transformers import CrossEncoder

@dataclass
class RAGConfig:
    """RAG系统配置"""
    # 文档处理
    chunk_size: int = 512
    chunk_overlap: int = 50
    
    # 嵌入模型
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
    
    # 检索
    top_k: int = 10
    rerank_top_k: int = 5
    
    # 生成
    llm_model: str = "gpt-3.5-turbo"
    temperature: float = 0.0
    max_tokens: int = 1000


class RAGSystem:
    """完整的RAG系统"""
    
    def __init__(self, config: RAGConfig):
        self.config = config
        self.embeddings = None
        self.vectorstore = None
        self.retriever = None
        self.qa_chain = None
        self.cross_encoder = None
        
    def initialize(self, documents_path: str):
        """初始化RAG系统"""
        print("正在初始化RAG系统...")
        
        # 1. 加载嵌入模型
        print("加载嵌入模型...")
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.embedding_model
        )
        
        # 2. 加载文档
        print("加载文档...")
        documents = self._load_documents(documents_path)
        
        # 3. 分块
        print("文本分块...")
        chunks = self._split_documents(documents)
        
        # 4. 构建向量数据库
        print("构建向量数据库...")
        self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
        
        # 5. 创建检索器
        base_retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": self.config.top_k}
        )
        
        # 6. 添加上下文压缩
        llm = ChatOpenAI(
            model=self.config.llm_model,
            temperature=self.config.temperature
        )
        compressor = LLMChainExtractor.from_llm(llm)
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )
        
        # 7. 加载重排序模型
        print("加载重排序模型...")
        self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        
        # 8. 构建QA链
        print("构建QA链...")
        self.qa_chain = self._build_qa_chain()
        
        print("RAG系统初始化完成!")
    
    def _load_documents(self, path: str):
        """加载文档"""
        loader = DirectoryLoader(
            path,
            glob='**/*.{pdf,txt,md,docx}',
            show_progress=True
        )
        return loader.load()
    
    def _split_documents(self, documents):
        """分块处理"""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", " ", ""],
            length_function=len
        )
        return text_splitter.split_documents(documents)
    
    def _build_qa_chain(self):
        """构建QA链"""
        prompt_template = """基于以下参考信息回答问题。如果参考信息不足,请明确说明。

参考信息:
{context}

问题:{question}

请给出准确、简洁的回答,并在回答后列出参考来源。

回答:"""
        
        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )
        
        llm = ChatOpenAI(
            model=self.config.llm_model,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        
        return RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": prompt}
        )
    
    def rerank(self, query: str, documents: List) -> List:
        """重排序"""
        if not documents or len(documents) <= self.config.rerank_top_k:
            return documents
        
        # 构建查询-文档对
        pairs = [[query, doc.page_content] for doc in documents]
        
        # 计算分数
        scores = self.cross_encoder.predict(pairs)
        
        # 排序
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return [doc for doc, _ in scored_docs[:self.config.rerank_top_k]]
    
    def query(self, question: str) -> Dict:
        """执行查询"""
        # 执行检索
        docs = self.retriever.get_relevant_documents(question)
        
        # 重排序
        reranked_docs = self.rerank(question, docs)
        
        # 临时替换retriever返回重排序结果
        original_get_relevant = self.retriever.get_relevant_documents
        self.retriever.get_relevant_documents = lambda _: reranked_docs
        
        # 执行生成
        result = self.qa_chain({"query": question})
        
        # 恢复原始retriever
        self.retriever.get_relevant_documents = original_get_relevant
        
        return {
            'question': question,
            'answer': result['result'],
            'sources': [
                {
                    'content': doc.page_content[:200],
                    'metadata': doc.metadata
                }
                for doc in result['source_documents']
            ]
        }
    
    def save(self, path: str):
        """保存向量数据库"""
        self.vectorstore.save_local(path)
        print(f"向量数据库已保存到: {path}")
    
    def load(self, path: str):
        """加载向量数据库"""
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.embedding_model
        )
        self.vectorstore = FAISS.load_local(path, self.embeddings)
        self.retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": self.config.top_k}
        )
        self.qa_chain = self._build_qa_chain()
        print(f"向量数据库已从 {path} 加载")


# 使用示例
if __name__ == "__main__":
    # 配置
    config = RAGConfig(
        chunk_size=512,
        top_k=10,
        rerank_top_k=5
    )
    
    # 初始化系统
    rag = RAGSystem(config)
    rag.initialize("/data/knowledge_base")
    
    # 保存索引
    rag.save("/data/vector_index")
    
    # 查询
    while True:
        question = input("\n请输入问题(输入'quit'退出):")
        if question.lower() == 'quit':
            break
        
        result = rag.query(question)
        print(f"\n回答:{result['answer']}")
        print("\n参考来源:")
        for i, source in enumerate(result['sources'], 1):
            print(f"[{i}] {source['metadata'].get('source', '未知')}")

11.5.4 评估与优化方法:RAGAS、自定义指标

RAG系统的评估是持续优化的基础。RAGAS(Retrieval-Augmented Generation Assessment)是一个专门用于评估RAG系统的框架。

RAGAS评估指标

  1. Faithfulness(忠实度):生成的回答是否忠实于检索到的文档
  2. Answer Relevance(回答相关性):回答是否与问题相关
  3. Context Precision(上下文精确度):检索到的文档中相关文档的比例
  4. Context Recall(上下文召回率):相关文档被检索到的比例
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall
)
from datasets import Dataset

# 准备评估数据
eval_data = {
    'question': ['什么是RAG?', 'RAG有哪些优势?'],
    'answer': ['RAG是检索增强生成技术...', 'RAG的优势包括...'],
    'contexts': [['RAG技术介绍...'], ['RAG优势分析...']],
    'ground_truth': ['检索增强生成技术', '知识更新、领域适应、可解释性']
}

dataset = Dataset.from_dict(eval_data)

# 执行评估
result = evaluate(
    dataset=dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall
    ]
)

print(result)

自定义评估指标

from typing import List
from sentence_transformers import SentenceTransformer, util

class RAGEvaluator:
    """RAG系统评估器"""
    
    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
    
    def evaluate_retrieval(self, 
                          query: str, 
                          retrieved_docs: List[str], 
                          relevant_docs: List[str]) -> Dict:
        """评估检索质量"""
        # 计算Recall@K
        retrieved_set = set(retrieved_docs)
        relevant_set = set(relevant_docs)
        recall = len(retrieved_set & relevant_set) / len(relevant_set)
        
        # 计算Precision@K
        precision = len(retrieved_set & relevant_set) / len(retrieved_set) if retrieved_set else 0
        
        # 计算MRR
        mrr = 0
        for i, doc in enumerate(retrieved_docs):
            if doc in relevant_docs:
                mrr = 1 / (i + 1)
                break
        
        return {
            'recall@k': recall,
            'precision@k': precision,
            'mrr': mrr,
            'f1': 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        }
    
    def evaluate_answer(self, 
                       question: str, 
                       answer: str, 
                       ground_truth: str) -> Dict:
        """评估回答质量"""
        # 语义相似度
        answer_emb = self.model.encode(answer, convert_to_tensor=True)
        truth_emb = self.model.encode(ground_truth, convert_to_tensor=True)
        similarity = util.pytorch_cos_sim(answer_emb, truth_emb).item()
        
        return {
            'semantic_similarity': similarity
        }
    
    def evaluate_end_to_end(self, 
                           queries: List[str],
                           answers: List[str],
                           ground_truths: List[str],
                           retrieved_docs_list: List[List[str]],
                           relevant_docs_list: List[List[str]]) -> Dict:
        """端到端评估"""
        retrieval_scores = []
        answer_scores = []
        
        for query, answer, truth, retrieved, relevant in zip(
            queries, answers, ground_truths, retrieved_docs_list, relevant_docs_list
        ):
            retrieval_scores.append(
                self.evaluate_retrieval(query, retrieved, relevant)
            )
            answer_scores.append(
                self.evaluate_answer(query, answer, truth)
            )
        
        # 汇总指标
        avg_recall = sum(s['recall@k'] for s in retrieval_scores) / len(retrieval_scores)
        avg_precision = sum(s['precision@k'] for s in retrieval_scores) / len(retrieval_scores)
        avg_mrr = sum(s['mrr'] for s in retrieval_scores) / len(retrieval_scores)
        avg_similarity = sum(s['semantic_similarity'] for s in answer_scores) / len(answer_scores)
        
        return {
            'retrieval': {
                'avg_recall@k': avg_recall,
                'avg_precision@k': avg_precision,
                'avg_mrr': avg_mrr
            },
            'generation': {
                'avg_semantic_similarity': avg_similarity
            }
        }

# 使用示例
evaluator = RAGEvaluator()

result = evaluator.evaluate_end_to_end(
    queries=['什么是RAG?'],
    answers=['RAG是检索增强生成技术'],
    ground_truths=['检索增强生成是一种结合检索和生成的技术'],
    retrieved_docs_list=[['RAG技术介绍', '其他文档']],
    relevant_docs_list=[['RAG技术介绍']]
)

print(result)

系统优化策略

  1. 文档处理优化:调整分块大小、尝试不同分块策略、优化元数据提取
  2. 检索优化:尝试不同嵌入模型、调整索引参数、使用混合检索
  3. 生成优化:优化提示模板、调整温度参数、使用更强大的模型
  4. Pipeline优化:添加重排序、使用上下文压缩、实现缓存机制
# 批量评估和优化示例
def optimize_chunk_size(documents, queries, ground_truths, chunk_sizes=[256, 512, 1024]):
    """优化分块大小"""
    results = []
    
    for chunk_size in chunk_sizes:
        print(f"\n测试分块大小: {chunk_size}")
        
        # 构建RAG系统
        config = RAGConfig(chunk_size=chunk_size)
        rag = RAGSystem(config)
        
        # 使用内存中的文档
        rag.embeddings = HuggingFaceEmbeddings(model_name=config.embedding_model)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=50
        )
        chunks = text_splitter.split_documents(documents)
        rag.vectorstore = FAISS.from_documents(chunks, rag.embeddings)
        rag.retriever = rag.vectorstore.as_retriever(search_kwargs={"k": 5})
        rag.qa_chain = rag._build_qa_chain()
        
        # 评估
        answers = []
        retrieved_list = []
        
        for query in queries:
            result = rag.query(query)
            answers.append(result['answer'])
            retrieved_list.append([s['content'] for s in result['sources']])
        
        # 计算指标
        evaluator = RAGEvaluator()
        metrics = evaluator.evaluate_end_to_end(
            queries, answers, ground_truths, retrieved_list, 
            [[gt] for gt in ground_truths]
        )
        
        results.append({
            'chunk_size': chunk_size,
            'metrics': metrics
        })
        
        print(f"召回率: {metrics['retrieval']['avg_recall@k']:.3f}")
        print(f"语义相似度: {metrics['generation']['avg_semantic_similarity']:.3f}")
    
    # 选择最佳配置
    best = max(results, key=lambda x: x['metrics']['retrieval']['avg_recall@k'])
    print(f"\n最佳分块大小: {best['chunk_size']}")
    
    return results

RAG技术正在快速发展,新的优化方法层出不穷。建议读者持续关注最新的研究进展,并根据实际业务需求选择合适的技术方案。通过系统化的评估和持续的迭代优化,可以构建出高质量的RAG应用,为企业创造实际价值。

时间分配建议

  • 3h 文档处理:加载多种格式、分块策略对比
  • 3h 向量检索:Embedding 模型选择、向量数据库操作、索引优化
  • 2h 检索增强:混合检索、重排序、上下文压缩
  • 2h 系统构建:LangChain/LlamaIndex 端到端 RAG Pipeline
  • 1h 评估优化:RAGAS 评估、分块大小调优

里程碑(第 11-12 周联合)

构建个人知识库问答原型:选择一批个人文档(笔记、文章、技术文档), 搭建完整的 RAG 管道(文档加载 → 分块 → Embedding → 向量检索 → 重排序 → LLM 生成), 实现"基于个人文档的问答"功能,并使用 RAGAS 框架评估系统质量。

学完你应该能...

  • 根据文档类型选择合适的切分策略(固定长度/递归/语义/结构切分)
  • 选择合适的 Embedding 模型和向量数据库
  • 实现混合检索(BM25 + 向量)+ Cross-Encoder 重排序
  • 使用 LangChain 或 LlamaIndex 构建完整的 RAG Pipeline
  • 使用 RAGAS 框架评估和优化 RAG 系统质量
  • 处理多模态文档(表格提取、图像描述)
已经是第一周返回路线总览 →