ailearn

向量数据库详解 - 从原理到实战

全面掌握向量数据库技术,构建高效的语义检索系统

访问-- -- --
  • content :{:toc}

前置知识:需要先掌握 大模型API应用

本文重点:向量数据库原理与主流方案对比


一、向量数据库概述

1.1 为什么需要向量数据库

传统数据库局限:
├── 关键词匹配:语义鸿沟问题
├── 结构化查询:无法处理非结构化数据
├── 精确匹配:不支持相似性检索
└── 扩展性差:大规模数据性能下降
向量数据库优势:
├── 语义检索:理解查询含义
├── 多模态:文本、图像、音频统一表示
├── 高效相似性搜索:ANN算法加速
└── 可扩展:支持十亿级向量
应用场景:
- RAG知识库检索
- 推荐系统
- 以图搜图
- 语义搜索
- 去重与聚类

1.2 核心概念

"""
向量数据库核心概念:
1. 向量 (Vector/Embedding)
   - 高维数值表示
   - 通常是128-1536维
   - 相似向量距离近
2. 距离度量
   - 欧氏距离 (L2): √Σ(xi-yi)²
   - 余弦相似度: cos(θ) = A·B / (|A||B|)
   - 内积 (IP): A·B
3. ANN (近似最近邻)
   - HNSW: 分层可导航小世界图
   - IVF: 倒排索引
   - LSH: 局部敏感哈希
4. 索引类型
   - FLAT: 暴力搜索,精度高
   - IVF_FLAT: 聚类+搜索
   - IVF_PQ: 乘积量化压缩
   - HNSW: 图索引
"""

二、主流向量数据库

2.1 方案对比

向量数据库对比:
┌──────────────┬─────────┬─────────┬─────────┬─────────┐
│     特性      │ Pinecone │  Milvus  │ Chroma  │  FAISS  │
├──────────────┼─────────┼─────────┼─────────┼─────────┤
│ 类型         │ 托管服务 │ 开源/云  │ 轻量级  │   库    │
│ 部署复杂度   │   低    │   中     │   低    │   低    │
│ 性能         │   高    │   高     │   中    │   高    │
│ 扩展性       │   高    │   高     │   低    │   中    │
│ 成本         │   高    │   中     │   免费  │  免费   │
│ 适用场景     │ 生产环境 │ 企业级   │ 开发/小 │ 本地/研究│
└──────────────┴─────────┴─────────┴─────────┴─────────┘
选择建议:
- 快速原型:Chroma
- 生产环境:Pinecone / Milvus
- 本地/研究:FAISS
- 企业私有化:Milvus / Qdrant

2.2 FAISS

"""
FAISS (Facebook AI Similarity Search)
Meta开源的高效向量检索库
"""
# 安装
# pip install faiss-cpu  # CPU版本
# pip install faiss-gpu  # GPU版本
import numpy as np
import faiss
# 准备数据
d = 128  # 向量维度
nb = 10000  # 数据库大小
nq = 100  # 查询数量
# 生成随机向量
np.random.seed(1234)
xb = np.random.random((nb, d)).astype('float32')
xq = np.random.random((nq, d)).astype('float32')
# 1. 暴力搜索 (IndexFlatL2)
index_flat = faiss.IndexFlatL2(d)
index_flat.add(xb)
k = 5  # 返回最近k个
D, I = index_flat.search(xq, k)
print(f"暴力搜索结果: Top-{k} 索引")
print(I[:5])
# 2. IVF索引 (更快的近似搜索)
nlist = 100  # 聚类中心数
quantizer = faiss.IndexFlatL2(d)
index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist)
# 训练索引
index_ivf.train(xb)
index_ivf.add(xb)
# 搜索
index_ivf.nprobe = 10  # 搜索的聚类数
D, I = index_ivf.search(xq, k)
print(f"IVF搜索结果: Top-{k} 索引")
# 3. HNSW索引 (高性能图索引)
M = 32  # 每个节点的连接数
index_hnsw = faiss.IndexHNSWFlat(d, M)
index_hnsw.add(xb)
D, I = index_hnsw.search(xq, k)
print(f"HNSW搜索结果: Top-{k} 索引")
# 4. 带乘积量化的索引 (压缩存储)
m = 8  # 子向量数
bits = 8  # 每个子向量的位数
quantizer_pq = faiss.IndexFlatL2(d)
index_ivfpq = faiss.IndexIVFPQ(quantizer_pq, d, nlist, m, bits)
index_ivfpq.train(xb)
index_ivfpq.add(xb)
D, I = index_ivfpq.search(xq, k)
print(f"IVF+PQ搜索结果: Top-{k} 索引")
# 保存和加载索引
faiss.write_index(index_ivf, "index_ivf.faiss")
loaded_index = faiss.read_index("index_ivf.faiss")

2.3 Chroma

"""
Chroma: 轻量级开源向量数据库
适合快速开发和原型验证
"""
# 安装
# pip install chromadb
import chromadb
from chromadb.config import Settings
# 创建客户端
client = chromadb.Client()  # 内存模式
# 或持久化模式
# client = chromadb.PersistentClient(path="./chroma_db")
# 创建集合
collection = client.create_collection(
    name="documents",
    metadata={"description": "My document collection"}
)
# 添加文档
documents = [
    "Python是一种流行的编程语言",
    "机器学习是人工智能的子领域",
    "向量数据库用于语义检索",
    "深度学习使用神经网络",
    "自然语言处理处理文本数据"
]
collection.add(
    documents=documents,
    metadatas=[{"source": f"doc_{i}"} for i in range(len(documents))],
    ids=[f"id_{i}" for i in range(len(documents))]
)
# 查询
results = collection.query(
    query_texts=["什么是AI技术"],
    n_results=3
)
print("查询结果:")
for doc, metadata in zip(results['documents'][0], results['metadatas'][0]):
    print(f"- {doc} (来源: {metadata['source']})")
# 使用自定义Embedding
from chromadb.utils import embedding_functions
# OpenAI Embedding
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="your-api-key",
    model_name="text-embedding-ada-002"
)
# HuggingFace Embedding
hf_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)
# 创建使用自定义Embedding的集合
collection_custom = client.create_collection(
    name="custom_embeddings",
    embedding_function=hf_ef
)

2.4 Pinecone

"""
Pinecone: 托管式向量数据库
生产环境首选
"""
# 安装
# pip install pinecone
import pinecone
# 初始化
pinecone.init(
    api_key="your-api-key",
    environment="us-west1-gcp"
)
# 创建索引
pinecone.create_index(
    name="my-index",
    dimension=1536,  # 向量维度
    metric="cosine",  # 距离度量: cosine, euclidean, dotproduct
    pod_type="p1.x1"  # 实例类型
)
# 连接索引
index = pinecone.Index("my-index")
# 插入向量
vectors = [
    ("id1", [0.1, 0.2, ...], {"text": "hello", "category": "greeting"}),
    ("id2", [0.3, 0.4, ...], {"text": "world", "category": "noun"}),
]
index.upsert(vectors=vectors)
# 查询
query_vector = [0.15, 0.25, ...]
results = index.query(
    vector=query_vector,
    top_k=10,
    include_metadata=True,
    filter={"category": {"$eq": "greeting"}}  # 元数据过滤
)
print(f"查询结果: {results}")
# 批量操作
# 批量插入
def batch_upsert(index, vectors, batch_size=100):
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i+batch_size]
        index.upsert(vectors=batch)
# 删除
index.delete(ids=["id1", "id2"])
# 删除索引
pinecone.delete_index("my-index")

2.5 Milvus

"""
Milvus: 开源企业级向量数据库
支持分布式部署
"""
# 安装
# pip install pymilvus
from pymilvus import (
    connections, 
    Collection, 
    FieldSchema, 
    CollectionSchema, 
    DataType,
    utility
)
# 连接
connections.connect("default", host="localhost", port="19530")
# 定义Schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512)
]
schema = CollectionSchema(fields, "Document collection")
# 创建集合
collection = Collection("documents", schema)
# 创建索引
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)
# 插入数据
entities = [
    [[0.1, 0.2, ...]],  # embeddings
    ["sample text"]      # text
]
collection.insert(entities)
# 加载集合到内存
collection.load()
# 查询
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[[0.1, 0.2, ...]],  # 查询向量
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='text like "%python%"'  # 过滤条件
)
# 释放资源
collection.release()
connections.disconnect("default")

三、性能优化

3.1 索引选择

"""
索引选择指南:
数据规模 < 10万:
├── IndexFlatL2 / IndexFlatIP (暴力搜索)
└── 精度最高,速度可接受
数据规模 10万 - 100万:
├── IndexIVFFlat
├── nlist = sqrt(N)
└── 精度和速度平衡
数据规模 100万 - 1000万:
├── IndexIVFPQ
├── 或 HNSW
└── 压缩存储,快速检索
数据规模 > 1000万:
├── HNSW + 量化
├── 或分布式Milvus
└── 可扩展性优先
内存受限:
├── IndexIVFPQ
├── 或 DiskANN
└── 牺牲精度换空间
延迟要求高:
├── HNSW
├── 或 IndexIVFFlat + 大nprobe
└── 以空间换时间
"""
def create_optimal_index(vectors, memory_limit_gb=4):
    """根据数据规模选择最优索引"""
    n, d = vectors.shape
    
    # 计算内存需求
    flat_memory = n * d * 4 / 1e9  # GB
    
    if n < 100000:
        # 小规模:暴力搜索
        return faiss.IndexFlatL2(d)
    
    elif n < 1000000:
        # 中等规模:IVF
        nlist = int(np.sqrt(n))
        quantizer = faiss.IndexFlatL2(d)
        return faiss.IndexIVFFlat(quantizer, d, nlist)
    
    elif flat_memory > memory_limit_gb:
        # 内存受限:PQ压缩
        nlist = int(np.sqrt(n))
        m = d // 8
        quantizer = faiss.IndexFlatL2(d)
        return faiss.IndexIVFPQ(quantizer, d, nlist, m, 8)
    
    else:
        # 大规模:HNSW
        M = 32
        return faiss.IndexHNSWFlat(d, M)

3.2 查询优化

# 批量查询
def batch_search(index, queries, batch_size=100, k=10):
    """批量查询优化"""
    results = []
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i+batch_size]
        D, I = index.search(batch, k)
        results.append((D, I))
    return results
# 并行查询
import concurrent.futures
def parallel_search(index, queries, k=10, workers=4):
    """并行查询"""
    def search_single(query):
        D, I = index.search(query.reshape(1, -1), k)
        return D[0], I[0]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(search_single, queries))
    
    return results
# 混合检索(向量+关键词)
def hybrid_search(vector_index, keyword_index, query_vector, query_text, 
                  alpha=0.5, k=10):
    """
    混合检索:结合向量检索和关键词检索
    
    alpha: 向量检索权重 (1-alpha): 关键词检索权重
    """
    # 向量检索
    vector_results = vector_index.search(query_vector, k*2)
    
    # 关键词检索
    keyword_results = keyword_index.search(query_text, k*2)
    
    # 融合排序
    scores = {}
    for i, doc_id in enumerate(vector_results['ids']):
        scores[doc_id] = scores.get(doc_id, 0) + alpha * (k*2 - i)
    
    for i, doc_id in enumerate(keyword_results['ids']):
        scores[doc_id] = scores.get(doc_id, 0) + (1-alpha) * (k*2 - i)
    
    # 排序并返回Top-k
    sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:k]

四、实战案例

4.1 构建语义搜索引擎

from sentence_transformers import SentenceTransformer
import chromadb
class SemanticSearchEngine:
    """语义搜索引擎"""
    
    def __init__(self, model_name="all-MiniLM-L6-v2", persist_dir="./search_db"):
        self.encoder = SentenceTransformer(model_name)
        self.client = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.client.get_or_create_collection("documents")
    
    def index_documents(self, documents, batch_size=100):
        """索引文档"""
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            embeddings = self.encoder.encode(batch)
            
            self.collection.add(
                documents=batch,
                embeddings=embeddings.tolist(),
                ids=[f"doc_{i+j}" for j in range(len(batch))]
            )
    
    def search(self, query, top_k=5):
        """语义搜索"""
        query_embedding = self.encoder.encode([query])
        
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=top_k
        )
        
        return [
            {"text": doc, "distance": dist}
            for doc, dist in zip(results['documents'][0], results['distances'][0])
        ]
    
    def search_with_filters(self, query, filters=None, top_k=5):
        """带过滤条件的搜索"""
        query_embedding = self.encoder.encode([query])
        
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=top_k,
            where=filters
        )
        
        return results
# 使用示例
engine = SemanticSearchEngine()
# engine.index_documents(documents)
# results = engine.search("机器学习算法")

4.2 RAG知识库

class RAGKnowledgeBase:
    """RAG知识库"""
    
    def __init__(self, embedding_model="text-embedding-ada-002"):
        self.client = chromadb.PersistentClient("./rag_db")
        self.collection = self.client.get_or_create_collection("knowledge")
        
        from langchain.embeddings.openai import OpenAIEmbeddings
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
    
    def add_texts(self, texts, metadatas=None):
        """添加文本"""
        embeddings = self.embeddings.embed_documents(texts)
        
        self.collection.add(
            documents=texts,
            embeddings=embeddings,
            metadatas=metadatas or [{} for _ in texts],
            ids=[f"doc_{i}" for i in range(len(texts))]
        )
    
    def add_pdf(self, pdf_path, chunk_size=500, chunk_overlap=50):
        """添加PDF文档"""
        from langchain.document_loaders import PyPDFLoader
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()
        
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        chunks = splitter.split_documents(documents)
        
        texts = [chunk.page_content for chunk in chunks]
        metadatas = [chunk.metadata for chunk in chunks]
        
        self.add_texts(texts, metadatas)
    
    def retrieve(self, query, top_k=5):
        """检索相关文档"""
        query_embedding = self.embeddings.embed_query(query)
        
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )
        
        return results

参考资源


上一篇大模型API应用 返回大模型应用 最后更新: 2026年4月16日

访问 --

讨论与反馈