Skip to content

Latest commit

 

History

History
484 lines (355 loc) · 26.6 KB

5.召回:检索、粗排、多路召回.md

File metadata and controls

484 lines (355 loc) · 26.6 KB

5.召回:检索、粗排、多路召回

RAG在整个大模型技术栈里的重要性毋庸置疑,而在RAG中,除了大模型之外,另一个不可或缺的部分,就是搜索系统,大模型的正确、稳定、可控生成,离不开精准可靠的搜索系统,大量的实验中都有发现,在搜索系统足够准确的前提下,大模型的犯错情况会骤然下降,因此,更全面、系统地了解搜索系统将很重要。

本期的内容是检索和多路召回。这3个内容都是围绕着搜索引擎的工作,所以把他们放在一起来说。

1.索引

搜索的核心工作是从海量的内容中找到最合适、相关的内容,即有一个“大海捞针”的工作,在“大海”的情况下,逐个匹配的地毯式搜索方案显然并不是,而索引的核心价值,就是加速这个搜索,用尽可能低的复杂度,尤其是时间复杂度,来完成这项工作,甚至可以一定程度牺牲空间复杂度,提到复杂度,后续大家也可以看到有提到大量数据结构方面的知识,要理解这块需要很结实的数据结构基础。

所谓的索引,是指一种存储结构,该存储结构能让检索变得更加快速方便。最早的搜索一般就要从“倒排索引”开始讲起,这是搜索最最基本的技术。此处我也从倒排开始说起。

1.1 倒排索引的相关概念

这块内容在(写了个向量检索的baseline)有详细解释,但是因为比较靠后,这里搬过来。

首先先给大家解释倒排,抛开向量检索,先说字面检索,首先了解为什么搜“倒排”,能够出很多有关倒排索引的文章,是因为底层有一套kv结构,和这个就叫做倒排,key是切好的词汇,value是包含这个词汇的所有文档的title,即:

{
 "倒排":["搜索引擎概述之倒排索引 - 知乎","倒排索引简介","什么是倒排","倒排索引 | Elasticsearch: 权威指南 | Elastic", ...],
 "搜索":["搜狗搜索","搜索(汉语词语) - 百度百科", ....],
 "索引":["搜索引擎概述之倒排索引 - 知乎","倒排索引简介","倒排索引 | Elasticsearch: 权威指南 | Elastic", "索引 - 百度百科"...]
 ...
}

只需要找到检索词,把所有value都给你弄出来,这就叫做查询到了,然而随着库的变大,肯定不能把输入的每个字和库里面的做逐一匹配:

query = "倒排"
result = []
for index_key in database:
    if index_key == query:
        result.extend(database[index_key])

时间复杂度肯定就有问题(O(n)),不要小看这个线性复杂度,当库里面有千万甚至更多的内容时,线性复杂度也远远不够,就要用特定的数据结构来降低检索的时间复杂度,甚至不惜牺牲空间复杂度,对字面的,会考虑trie树等结构,可以把对数据条目数的复杂度降低到常数级,这些结构,我把他叫做索引

至于正排,则是存的对应内容的详情的,例如这个:

[{
    "title":"搜索引擎概述之倒排索引 - 知乎",
    "docs":"xxxxxxxxxx",
    "insert_time":"2023081315550000"
},{
    "title":"倒排索引 - 百度文库",
    "docs":"xxxxxxxxxx",
    "insert_time":"2023081316550000"
}]

搜的时候,可能是针对title搜的,然而,没必要也不可以把别的和查询无关的信息也存到索引中,因此,构造了一个额外的数据结构,这样:

{"id1":{
    "title":"搜索引擎概述之倒排索引 - 知乎",
    "docs":"xxxxxxxxxx",
    "insert_time":"2023081315550000"
},"id2":{
    "title":"倒排索引 - 百度文库",
    "docs":"xxxxxxxxxx",
    "insert_time":"2023081316550000"
}}

当通过倒排查到了id1后,来这个新的数据结构里面,通过id1这个钥匙就能找到这个文档的详情,并且可以展示给用户了,这个结构,就是正排。

1.2 常见索引简介

上面提到的就是常说的字面索引,即应对的是字面检索的情况,就是根据用户query内的词汇来进行检索完成,显然字面检索并不能完成我们日常所需,还需要大量的其他索引来支持,这里我举几个其他例子来让大家进一步理解索引多样性的必要性。

  • 向量索引。(向量表征和向量召回)。向量检索应该是大家比较熟悉的方案了,它具有非常强大的语义泛化能力,能让意思比较接近的句子都能被尽快搜索到,这项技术能大大降低我们配置同义词、配置说法的压力,底层常见的方案是hnsw(说起来原理还挺有技术含量的),另外经典的,在《统计学习方法》里,讲KNN的那章,有提到kdtree,当然也有集成的比较好的FAISS方案,有兴趣可以自己了解一下。
  • 数字索引。细想这么一个query怎么查询——“语文考90分左右的同学”,逐个匹配肯定是很方便的,对海量的数据肯定不合适,向量索引很可能召回的是60分、99分之类可能字面有些接近但是数字范围不太接近的结果,于是便需要数字索引,越接近90分相似度要越高的那种,比较常见的就是BTree系列,这在很多常见的搜索工具中肯定是有集成的。
  • 地理哈希索引。细想这么一个query怎么查询——“故宫附近的美食”,这里依靠的就是地理位置了,常见的方案是GeoHash,通过经纬度可以把位置哈希化,哈希的是根据地理的矩形空间划分的,每一位字符表示的就是特定矩形大小下所属的矩形,因此哈希的字符串越长,代表矩形越小,即表示距离附近。

从这里可以看到,面对不同的检索问题,是需要不同的索引方案的,列举这些是希望大家能打开思路,根据合适的情况进行选择。

1.3 搜索数据库支持

尽管索引类型众多,但并不需要为此造轮子,目前还是比较推荐ElasticSearch这个中间件,它具备非常完整的功能,上述提到的索引基本都支持,不支持的也可以通过安装插件的方式来解决。

当然,ElasticSearch比较重,对于数据量较少的,或者功能不需要这么多的,例如只要向量召回,那也没必要用它,Faiss是一个很不错的方案,这个就大家因地制宜吧。

2.向量检索demo

比较推荐大家看在XXX中提到的Faiss方案,在这里再展开讲一下吧。项目地址:https://github.com/ZBayes/basic_rag。

项目里和向量检索有关的模块的文件是这些:

-- src
    |-- models
    |   |-- simcse_model.py
    |   `-- vec_model.py
    |-- searcher
    |   |-- searcher.py
    |   `-- vec_searcher
    |       |-- vec_index.py
    |       `-- vec_searcher.py

models里面是向量召回模型,searcher是检索有关的内容。

2.1 模型

首先是simcse_model.py,引用我带了链接,用的是一位大佬的模型,方便进行向量化。

import torch
import torch.nn as nn
from loguru import logger
from tqdm import tqdm
from transformers import BertConfig, BertModel, BertTokenizer

class SimcseModel(nn.Module):
    # https://blog.csdn.net/qq_44193969/article/details/126981581
    def __init__(self, pretrained_bert_path, pooling="cls") -> None:
        super(SimcseModel, self).__init__()

        self.pretrained_bert_path = pretrained_bert_path
        self.config = BertConfig.from_pretrained(self.pretrained_bert_path)
        
        self.model = BertModel.from_pretrained(self.pretrained_bert_path, config=self.config)
        self.model.eval()
        
        # self.model = None
        self.pooling = pooling
    
    def forward(self, input_ids, attention_mask, token_type_ids):
        out = self.model(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        if self.pooling == "cls":
            return out.last_hidden_state[:, 0]
        if self.pooling == "pooler":
            return out.pooler_output
        if self.pooling == 'last-avg':
            last = out.last_hidden_state.transpose(1, 2)
            return torch.avg_pool1d(last, kernel_size=last.shape[-1]).squeeze(-1)
        if self.pooling == 'first-last-avg':
            first = out.hidden_states[1].transpose(1, 2)
            last = out.hidden_states[-1].transpose(1, 2)
            first_avg = torch.avg_pool1d(first, kernel_size=last.shape[-1]).squeeze(-1)
            last_avg = torch.avg_pool1d(last, kernel_size=last.shape[-1]).squeeze(-1)
            avg = torch.cat((first_avg.unsqueeze(1), last_avg.unsqueeze(1)), dim=1)
            return torch.avg_pool1d(avg.transpose(1, 2), kernel_size=2).squeeze(-1)

然后是model.py,这个旨在包裹模型,并且给出模型预测的一些特定功能,例如推理向量,服务化转化,计算相似度等。

import torch
import torch.nn as nn
import torch.nn.functional as F
from loguru import logger

from transformers import BertTokenizer

from src.models.simcse_model import SimcseModel

class VectorizeModel:
    def __init__(self, ptm_model_path, device = "cpu") -> None:
        self.tokenizer = BertTokenizer.from_pretrained(ptm_model_path)
        self.model = SimcseModel(pretrained_bert_path=ptm_model_path, pooling="cls")
        self.model.eval()
        
        # self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else "cpu")
        self.DEVICE = device
        self.model.to(self.DEVICE)
        
        self.pdist = nn.PairwiseDistance(2)
    
    def predict_vec(self,query):
        q_id = self.tokenizer(query, max_length = 200, truncation=True, padding="max_length", return_tensors='pt')
        with torch.no_grad():
            q_id_input_ids = q_id["input_ids"].squeeze(1).to(self.DEVICE)
            q_id_attention_mask = q_id["attention_mask"].squeeze(1).to(self.DEVICE)
            q_id_token_type_ids = q_id["token_type_ids"].squeeze(1).to(self.DEVICE)
            q_id_pred = self.model(q_id_input_ids, q_id_attention_mask, q_id_token_type_ids)

        return q_id_pred

    def predict_vec_request(self, query):
        q_id_pred = self.predict_vec(query)
        return q_id_pred.cpu().numpy().tolist()
    
    def predict_sim(self, q1, q2):
        q1_v = self.predict_vec(q1)
        q2_v = self.predict_vec(q2)
        sim = F.cosine_similarity(q1_v[0], q2_v[0], dim=-1)
        return sim.numpy().tolist()

if __name__ == "__main__":
    import time,random
    from tqdm import tqdm
    vec_model = VectorizeModel('C:/work/tool/huggingface/models/simcse-chinese-roberta-wwm-ext')
    print(vec_model.predict_vec("什么人不能吃花生"))

2.2 检索模块

最内部的是索引,索引外事检索器,首先是索引,这里我把他组件化了。内部支持构建、数据加入、加载、保存、检索等。

import faiss
from loguru import logger
from src.models.vec_model import VectorizeModel

class VecIndex:
    def __init__(self) -> None:
        self.index = ""
    
    def build(self, index_dim):
        description = "HNSW64"
        measure = faiss.METRIC_L2
        self.index = faiss.index_factory(index_dim, description, measure)
    
    def insert(self, vec):
        self.index.add(vec)
    
    def batch_insert(self, vecs):
        self.index.add(vecs)
    
    def load(self, read_path):
        # read_path: XXX.index
        self.index = faiss.read_index(read_path)

    def save(self, save_path):
        # save_path: XXX.index
        faiss.write_index(self.index, save_path)
    
    def search(self, vec, num):
        # id, distance
        return self.index.search(vec, num)

外部包一层搜索器,内部可以构造多种索引,根据自己需要调用即可,因为目前只有一个索引,所以从调用函数来看基本是又包了VecIndex一层。

import os, json
from loguru import logger
from src.searcher.vec_searcher.vec_index import VecIndex

class VecSearcher:
    def __init__(self):
        self.invert_index = VecIndex() # 检索倒排,使用的是索引是VecIndex
        self.forward_index = [] # 检索正排,实质上只是个list,通过ID获取对应的内容
        self.INDEX_FOLDER_PATH_TEMPLATE = "data/index/{}"

    def build(self, index_dim, index_name):
        self.index_name = index_name
        self.index_folder_path = self.INDEX_FOLDER_PATH_TEMPLATE.format(index_name)
        if not os.path.exists(self.index_folder_path) or not os.path.isdir(self.index_folder_path):
            os.mkdir(self.index_folder_path)

        self.invert_index = VecIndex()
        self.invert_index.build(index_dim)

        self.forward_index = []
    
    def insert(self, vec, doc):
        self.invert_index.insert(vec)
        # self.invert_index.batch_insert(vecs)

        self.forward_index.append(doc)
    
    def save(self):
        with open(self.index_folder_path + "/forward_index.txt", "w", encoding="utf8") as f:
            for data in self.forward_index:
                f.write("{}\n".format(json.dumps(data, ensure_ascii=False)))

        self.invert_index.save(self.index_folder_path + "/invert_index.faiss")
    
    def load(self, index_name):
        self.index_name = index_name
        self.index_folder_path = self.INDEX_FOLDER_PATH_TEMPLATE.format(index_name)

        self.invert_index = VecIndex()
        self.invert_index.load(self.index_folder_path + "/invert_index.faiss")

        self.forward_index = []
        with open(self.index_folder_path + "/forward_index.txt", encoding="utf8") as f:
            for line in f:
                self.forward_index.append(json.loads(line.strip()))
    
    def search(self, vecs, nums = 5):
        search_res = self.invert_index.search(vecs, nums)
        recall_list = []
        for idx in range(nums):
            # recall_list_idx, recall_list_detail, distance
            recall_list.append([search_res[1][0][idx], self.forward_index[search_res[1][0][idx]], search_res[0][0][idx]])
        # recall_list = list(filter(lambda x: x[2] < 100, result))

        return recall_list

VecSearcher外,还可以有多个检索器,综合起来形成一个简易的搜索工具Searcher

import json,requests,copy
import numpy as np
from loguru import logger
from src.searcher.vec_searcher.vec_searcher import VecSearcher
from src.models.vec_model import VectorizeModel

class Searcher:
    def __init__(self, model_path, vec_search_path):
        self.vec_model = VectorizeModel(model_path)
        logger.info("load vec_model done")

        self.vec_searcher = VecSearcher()
        self.vec_searcher.load(vec_search_path)
        logger.info("load vec_searcher done")

    def rank(self, query, recall_result):
        rank_result = []
        for idx in range(len(recall_result)):
            new_sim = self.vec_model.predict_sim(query, recall_result[idx][1][0])
            rank_item = copy.deepcopy(recall_result[idx])
            rank_item.append(new_sim)
            rank_result.append(copy.deepcopy(rank_item))
        rank_result.sort(key=lambda x: x[3], reverse=True)
        return rank_result
    
    def search(self, query, nums=3):
        logger.info("request: {}".format(query))

        q_vec = self.vec_model.predict_vec(query).cpu().numpy()

        recall_result = self.vec_searcher.search(q_vec, nums)

        rank_result = self.rank(query, recall_result)
        # rank_result = list(filter(lambda x:x[4] > 0.8, rank_result))

        logger.info("response: {}".format(rank_result))
        return rank_result

if __name__ == "__main__":
    VEC_MODEL_PATH = "C:/work/tool/huggingface/models/simcse-chinese-roberta-wwm-ext"
    VEC_INDEX_DATA = "vec_index_test2023121201"
    searcher = Searcher(VEC_MODEL_PATH, VEC_INDEX_DATA)
    q = "什么人不能吃花生"
    print(searcher.search(q))

2.3 灌数据

离线,在进行文档处理后(参考这篇文章:聊聊搜索系统3:文档内容处理),需要把处理好的数据灌入Searcher中,参考这个脚本:

import json,torch,copy
from tqdm import tqdm
from loguru import logger
from multiprocessing import Process,Queue
from multiprocessing import set_start_method
from src.models.vec_model import VectorizeModel
from src.searcher.vec_searcher.vec_searcher import VecSearcher 


if __name__ == "__main__":
    # 0. 必要配置
    VEC_MODEL_PATH = "C:/work/tool/huggingface/models/simcse-chinese-roberta-wwm-ext"
    SOURCE_INDEX_DATA_PATH = "./data/baike_qa_train.json"
    VEC_INDEX_DATA = "vec_index_test2023121301_20w"
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else "cpu")
    PROCESS_NUM = 2
    # logger.info("load model done")

    # 1. 加载数据、模型
    vec_model = VectorizeModel(VEC_MODEL_PATH, DEVICE)
    index_dim = len(VectorizeModel(VEC_MODEL_PATH, DEVICE).predict_vec("你好啊")[0])
    source_index_data = []
    with open(SOURCE_INDEX_DATA_PATH, encoding="utf8") as f:
        for line in f:
            ll = json.loads(line.strip())
            if len(ll["title"]) >= 2:
                source_index_data.append([ll["title"], ll])
            if len(ll["desc"]) >= 2:
                source_index_data.append([ll["desc"], ll])
            # if len(source_index_data) > 2000:
            #     break
    logger.info("load data done: {}".format(len(source_index_data)))

    # 节省空间,只取前N条
    source_index_data = source_index_data[:200000]

    # 2. 创建索引并灌入数据
    # 2.1 构造索引
    vec_searcher = VecSearcher()
    vec_searcher.build(index_dim, VEC_INDEX_DATA)

    # 2.2 推理向量
    vectorize_result = []
    for q in tqdm(source_index_data):
        vec = vec_model.predict_vec(q[0]).cpu().numpy()
        tmp_result = copy.deepcopy(q)
        tmp_result.append(vec)
        vectorize_result.append(copy.deepcopy(tmp_result))

    # 2.3 开始存入
    for idx in tqdm(range(len(vectorize_result))):
        vec_searcher.insert(vectorize_result[idx][2], vectorize_result[idx][:2])

    # 3. 保存
    vec_searcher.save()

2.4 检索

所谓的检索,就是把内容从数据库里搜出来,这里介绍两个吧,一个是从elasticsearch(后称为ES)中把数据搜索出来,另一个是从上面我写的组件里搜出来。

ES在python有专门的客户端,配合客户端和专用的检索语句DSL,具体的逻辑参考:python操作Elasticsearch数据库_python连接es数据库 ,这个和mysql的链接使用是类似的,难度不是很高。难度主要在检索语法的设计,因为ES主要是字面的检索,可能会有一些复杂的逻辑,与、或、非还有一些打分逻辑啥的,这块功能做得很灵活,具体的可以参考权威指南:Elasticsearch: 权威指南 | Elastic

至于上述写的组件,则比较简单,就是直接一个语句就好了。Search里面内置了对应的模型,内部已经把向量化和搜索都已经完成了。

if __name__ == "__main__":
    VEC_MODEL_PATH = "C:/work/tool/huggingface/models/simcse-chinese-roberta-wwm-ext"
    VEC_INDEX_DATA = "vec_index_test2023121201"
    searcher = Searcher(VEC_MODEL_PATH, VEC_INDEX_DATA)
    q = "什么人不能吃花生"
    print(searcher.search(q))

内部的逻辑可以重看一下这个函数,内部的逻辑基本就是3个过程——向量化、检索、排序。

def search(self, query, nums=3):
    logger.info("request: {}".format(query))

    q_vec = self.vec_model.predict_vec(query).cpu().numpy()

    recall_result = self.vec_searcher.search(q_vec, nums)

    rank_result = self.rank(query, recall_result)
    # rank_result = list(filter(lambda x:x[4] > 0.8, rank_result))

    logger.info("response: {}".format(rank_result))
    return rank_result

3.粗排

粗排,它的核心工作是对本路内容进行一个粗略的相似度排序,毕竟检索的目的是找到最接近的TOPN,这里不可绕开的要衡量“最接近“

粗排一定程度和检索逻辑绑定,其本质任务就是计算query和doc对应检索字段之间的相似度,利用相似度的数值,可以进行排序筛选和过滤。这里有亮点需要强调:

  • 粗排的核心目标是干掉“肯定不合适”的结果,所以常常要考虑的是“相似”or“不相似”的问题,作为对比,精排由于进入精排层的物料多半是和原query比较接近,此时的粗排的分数一般都会比较接近,此时精排任务已经变成对比哪个物料“更相似”,要求一个更能拉开物料之间分数差异的算法。

下面举几个用于粗排的相似度计算方法。

  • 如果是字面召回,重点关注的是字面的相似度,常见的是BM25,目前已经是非常普及的方案了,当然还有我之前有提到的cqr/ctr(cqr&ctr:文本匹配的破城长矛无监督字面相似度cqr/ctr源码),因为BM25的数值受到句子长度影响很明显,所以并不容易卡阈值,后者cqr/ctr方案则可以很好地处理这个一点,后者一般可以作为配合前者的存在。
  • 一般的向量召回则更多考虑cos、L2之类常用的距离。
  • 如果是数字等方面的召回,直接算误差即可。
  • 如果是地理位置,可以通过经纬度很容易计算到直线距离,有地图功能时甚至可以算出导航距离。

4.向量召回

向量召回在目前之所以得到流行,除了目前已经流传已久的泛化性的原因,还有一个是灵活性。只需要根据一个目标,把内容转化为向量,即可用于进行向量召回,不需要考虑各种索引的处理,从前文大家也知道不同索引要处理的事还挺麻烦的。

首先,需要强调的第一个问题,从表征目标出发,就是向量不止有语义向量,向量表征对标的内容可以是非常丰富的,在推荐系统中,类似协同过滤的设计,是可以转为向量来做的,再者在搜索领域,也有像淘宝(KDD21-淘宝向量检索)讲用户行为偏好转为向量的案例,就是常规意义的搜索,也可以通过query-用户点击的方式,结合内容来源、内容质量等特征,构造对比学习来学习向量的方式,因此大家可以考虑把思路打开,语义向量检索只是向量的一部分。

第二,是有关向量的特征,除了文本可以转化,还有其他特征,这点可以从大家比较熟知的推荐系统中借鉴,如果存在个性化信息,用户的行为、偏好、年龄、地点之类的是可以作为表征的输入的,另外物料侧,除了考虑多种类型的文本,如问题、回答、标题的常规已有特征外,还有内容质量、内容用户画像、话题标签等特征,内容质量可以是用户平均停留时间、点击率之类的,内容用户画像则是对表达喜欢的用户的特征进行表征,另外话题是可以通过内容理解来抽取,这些特征都非常有利于进行召回。

5.多路召回

多路召回是搜索里面很常见的操作。因为用户提问的复杂性、内容的多样性等原因,往往不会一路把所有内容都召回回来,如何分路成为一个值得探讨的问题,下面会分几个维度来讲多路召回可能的操作,并会提及具体的使用场景,供大家参考使用。

  • 意图/路由划分下的多路召回。针对不同的需求,可能需要不同的操作来满足,例如要查音乐和查天气,后续的操作会不同,搜音乐以来音乐库,查天气则是查询天气接口,此时显然就是要走不同的链路来进行内容召回,再者同样是音乐类,用户可能是按歌手、歌名、流派等因素查,复杂以后很可能需要多路召回来实现,另外也需要配合上游的意图识别、实体抽取等因素,来切分链路,然后根据链路来进行召回。
  • 不同内容结构下的多路召回。这个比较简单,举例,音乐和购物,背后的数据结构是不同的,但用户的说法可能是模糊的,例如说一个专辑名,可能是要买专辑,也可能是想听音乐,此时多路召回是一个不错的选择,不着急直接看哪个优先级高,这事可以放精排层来做。
  • 不同检索方式下的多路召回。这个也比较简单,对不同的检索方案,不好放一起的,分两路是不错的选择(当然也有不分的方法),例如向量召回和字面召回两路。
  • 不同表征方式的多路召回。上面有提到向量召回的多样性,基于表征目标和表征特征是可以有多种向量表征方式的,基于不同的表征特征,就可以有多种不一样的召回方式。

提醒,多种召回方式,在一般情况下都是并发进行的,毕竟他们运行需要一定的时间,而且一般互不影响,一般就是服务化后用多进程请求的方式来进行。

6.小结

本文讨论了搜索系统中召回层的操作,重点聚焦在索引、粗排、向量召回、多路召回等工作中,供大家更深入全面理解搜索系统的召回部分。另外,仍旧建议大家多去翻翻《信息检索导论》,虽然现在视角里面的内容算是旧的,但时至今日仍有大量知识会用到,大家可以系统学习。