基于Lindorm多模能力快速搭建智能搜索服务

本文介绍如何基于Lindorm全文、向量融合检索以及在线推理等多模能力,快速搭建智能搜索服务。

背景介绍

近两年,AI大模型的迅速崛起为搜索产品注入了新的活力。这一发展不仅改变了用户获取信息的方式,也为企业和开发者带来了全新的机遇。如今,检索的形式已不再局限于关键词匹配,而是朝向更高效、更智能的对话式问答方向发展。过去,用户检索一个专业问题通常需要从多个平台搜集资料,并花大量时间对这些资料进行整合才能解决问题。而现在,智能检索则可以深入理解用户的搜索意图,快速整合提炼信息,显著提高了信息获取效率。

随着用户信息消费习惯的变化,搜索产品的智能化已成为提升用户体验、增加用户粘性的关键因素。微软率先在Bing中整合GPT模型,以提供更符合用户需求的回答;Google也推出了Gemini,直接针对用户提问来生成答复。

越来越多企业希望将传统搜索服务升级成智能搜索,但在构建智能搜索的过程中面临着以下三大挑战:

  • 快速搭建和迭代的需求:当前AI行业还处于快速发展阶段,企业需要将主要精力集中在核心业务创新上。从零开始搭建基础设施不仅效率低下,而且涉及多种架构各异的组件,运维方式和技术栈差异大,导致学习成本高,同时部署或更新模型也需要大量的时间和技术投入。

  • 数据规模膨胀和成本增加:随着数据规模的不断增长,内存成本和计算资源的需求也会显著增加。传统的检索方案可能无法有效处理大规模数据,导致检索速度下降,并且增加了硬件投入和运维成本。

  • 检索的准确性和灵活性:通用搜索引擎方案的检索对业务全程黑盒,使得业务难以根据实际效果对架构进行调整。因此,企业需要一种更加灵活的智能搜索解决方案,例如支持自定义部署微调后的嵌入(Embedding)和重排序(Rerank)模型等。

本文将通过Python代码演示如何基于Lindorm全文、向量融合检索以及在线推理能力,帮您快速构建智能搜索业务。

image

步骤一:开通Lindorm多模能力

  1. 登录Lindorm管理控制台

  2. 单击页面左上角的创建

  3. Lindorm售卖页面,设置以下配置项:

    配置项

    说明

    商品类型

    选择Lindorm新版售卖

    形态选择

    选择生产型

    实例配置

    勾选宽表引擎LTS搜索引擎向量引擎AI引擎

    说明
    • 提交工单开通AI引擎:

      • 在创建实例前,提交工单开通白名单权限,之后在创建过程中即可勾选AI引擎

      • 若已创建实例,请您提交工单单独开通AI引擎功能。

    • Lindorm向量引擎的功能实现依赖搜索引擎,因此需同时开通。

    • 创建实例的方法及配置项说明,请参见创建实例

    • 您可以根据实际业务需求,变更实例的规格与节点,具体操作请参见变更实例规格

  4. 单击立即购买,并根据售卖页的指引,完成支付。

步骤二:配置白名单

将客户端IP添加至Lindorm白名单。如何添加,请参见设置白名单

步骤三:下载代码

请下载完整代码示例lindorm_smart_search,以便在后续步骤中配置和构建智能搜索业务。

步骤四:环境配置

运行环境

已安装Python环境,要求安装Python 3.10及以上版本。

安装依赖

  1. 下载依赖requirements.txt

  2. 安装依赖。

 pip3 install -r requirements.txt 

配置Lindorm连接地址

在已下载代码中的env脚本里配置Lindorm各引擎的连接地址。连接地址获取方法请参见查看连接地址

大模型推理依赖百炼,需要配置百炼的API KeyAPI Key获取方法请参见获取API Key

# AI host(配置AI引擎连接地址)
AI_HOST="ld-bp17j28j2y7pm****-proxy-ai-pub.lindorm.aliyuncs.com"
AI_PORT="9002"

# Row host(配置向量引擎连接地址)
ROW_HOST="ld-bp17j28j2y7pm****-proxy-lindorm-pub.lindorm.aliyuncs.com"
ROW_PORT="33060"

# Search host(配置搜索引擎连接地址)
SEARCH_HOST="ld-bp17j28j2y7pm****-proxy-search-pub.lindorm.aliyuncs.com"
SEARCH_PORT="30070"

# Lindorm user password(配置Lindorm用户密码)
LD_USER="root"
LD_PASSWORD="test****"

# 训练数据集的位置
LOAD_FILE_PATH="data/cmrc2018_train.json"

# 返回结果的最大数量
SEARCH_TOP_K="5"

# 配置百炼的API Key
DASHSCOPE_API_KEY="sk-****"

安装Jupyter Notebook

  1. 安装Jupyter。

    pip3 install jupyter
  2. 生成配置文件~/.jupyter/jupyter_notebook_config.py

    jupyter notebook --generate-config
  3. 获取要设置访问Jupyter的密码 。

    from passlib.hash import argon2
    print(argon2.hash('Vector123'))

    输出示例如下:

    $argon2id$v=19$m=65536,t=3,p=4$4TyndM75H8N4b+291xqjdA$n0QSxlv/uCLjGR0TX/jbD/XFlEu9BzQGI1b2Mcu6gxg
  4. 使用 vim ~/.jupyter/jupyter_notebook_config.py 命令打开并编辑配置文件。

    #文件最后几行加上如下配置
    c.NotebookApp.ip = '*'
    # 笔记本的默认打开目录, 自行设置
    # 笔记本启动后是否打开浏览器, 设为 False即可
    c.NotebookApp.open_browser = False
    
    # 默认访问端口, 可自行修改
    c.NotebookApp.port = 9000
    
    # 下方代码中argon2后面的内容替换成上一步骤已获取到的Jupyter的密码
    c.NotebookApp.password = 'argon2:$argon2id$v=19$m=65536,t=3,p=4$4TyndM75H8N4b+291xqjdA$n0QSxlv/uCLjGR0TX/jbD/XFlEu9BzQGI1b2Mcu6gxg'
    
    # 这个主目录非常重要,后续您的访问文件需要放在该目录
    c.NotebookApp.notebook_dir = u'/data/lindorm/LindormDemo'  #设置你打开jupyter notebook的时候想显示的位置,可以设置成经常使用的路径
  5. 启动Jupyter服务。前端启动,可以查看是否有启动错误。若需停止服务,可使用Ctrl+C来终止进程。

    jupyter notebook
    说明

    实际业务使用过程中建议使用后端启动Jupyter服务。后端启动请执行:nohup jupyter notebook --allow-root >/tmp/jupyter.log 2>&1 &

步骤五:运行ipynb脚本

Jupyter运行已下载的ipynb脚本。

部署流程图

image

部署环节说明

环节

说明

涉及引擎

创建父表(知识库)

创建一个父表,用于存储知识库的原始文本。

"""
创建父表语句,可按需修改
"""
def create_parent_table(self):
    sql = """
        CREATE TABLE IF NOT EXISTS {} (
            document_id  VARCHAR, 
            title VARCHAR, 
            context VARCHAR,
            status   INT, 
            metadata JSON, 
            PRIMARY KEY (document_id)
        )
    """.format(self.parent.row_parent_table)
    print("Create parent table sql: ", sql)
    self.common_create_table(sql)

宽表引擎

创建子表(切分知识库)

创建一个子表,用于存储原文本切分后的段落,这些段落可以根据长度或者结合长度与标点符号进行切分。

"""
创建子表语句,可按需修改
"""
def create_child_table(self):
    sql = """
        CREATE TABLE IF NOT EXISTS {} (
            document_id VARCHAR,
            chunking_position INT,
            title  VARCHAR,
            {}   VARCHAR,
            {} VARCHAR,
            metadata JSON,
            chunking_number INT,
            PRIMARY KEY (document_id, chunking_position)
        )
    """.format(self.parent.row_child_table, 
               self.parent.text_field, 
               self.parent.vector_field)
    print("Create child table sql: ", sql)
    self.common_create_table(sql)

宽表引擎

模型部署

部署Embedding模型:

部署自带开源BGE-M3模型,用于后续的文本向量化处理,将文本向量化后,可以通过语义进行近似检索。

"""
创建Embedding模型,目前推荐使用BGE-M3模型即可
"""
def create_embedding_model(self):
    if self.check_model_exists(self.parent.embedding_model_name):
        print("Model {} exists, skip create".format(self.parent.embedding_model_name))
        return

    self.common_create_model(self.parent.embedding_model_name, 
                             "huggingface://BAAI/bge-m3",
                             "FEATURE_EXTRACTION",
                             "BGE_M3")
                             
"""
对输入的文本进行Embedding成向量
"""
def text_embedding(self, input_text:str):
    url = "http://{}:{}/v1/ai/models/{}/infer".format(Config.AI_HOST, 
                                                      Config.AI_PORT, 
                                                      self.parent.embedding_model_name)
    input_text_utf8 = input_text.encode('utf-8').decode('utf-8')
    data = {
        "input": [input_text_utf8]
    }
    response = requests.post(url, data=json.dumps(data), headers=self.headers)
    json_response = response.json()
    if response.status_code != 200 or json_response["success"] is False:
        raise Exception("http request failed, status code: {}".format(json_response["msg"]))
    return json_response["data"][0]

AI引擎

部署Rerank模型:

部署自带开源BGE Rerank模型,用于对RAG检索结果重排。

"""
创建Reranker模型,目前推荐使用BGE-Reranker-V2-M3
"""
def create_reranker_model(self):
    if self.check_model_exists(self.parent.reranker_model_name):
        print("Model {} exists, skip create".format(self.parent.reranker_model_name))
        return
    
    self.common_create_model(self.parent.reranker_model_name,
                             "huggingface://BAAI/bge-reranker-v2-m3",
                             "SEMANTIC_SIMILARITY",
                             "BGE_RERANKER_V2_M3")


"""
根据问题以及目前答案的候选集,对答案进行重新排序
* input_text: 输入的问题
* chunks: 答案列表
"""
def reranker(self, input_text: str, chunks: List[str]):
    url = "http://{}:{}/v1/ai/models/{}/infer".format(Config.AI_HOST,
                                                      Config.AI_PORT, 
                                                      self.parent.reranker_model_name)        
    data = {
        "input": {"query": input_text, "chunks": chunks}
    }
    
    response = requests.post(url, data=json.dumps(data), headers=self.headers)
    json_response = response.json()
    if response.status_code != 200 or json_response["success"] is False:
        raise Exception("http request failed, status code: {}".format(json_response["msg"]))
    return json_response["data"]

AI引擎

部署问答模型:

通过API Key的方式访问大模型,将提示词(Prompt)提交给大模型。

"""
使用api_key访问通义前问的方式
"""
# reference: https://help.aliyun.com/zh/dashscope/developer-reference/qwen-api
class AliQwen():
    def __init__(self):
        self.api_key = Config.DASHSCOPE_API_KEY
        self.model_name = "qwen-turbo"
        self.PROMPT_TEMPLATE = """已知信息:
{context} 
根据上述已知信息,专业的来回答用户的问题。如果无法从中得到答案,请说 “根据已知信息无法回答该问题” 或 “没有提供足够的相关信息”,不允许在答案中添加编造成分,答案请使用中文。 问题是:{question}"""
    
    """
    非流式对话大模型
    """
    def chat(self, prompt: str):
        response = Generation.call(model=self.model_name, prompt=prompt, stream=False, api_key=self.api_key)
        if response.status_code == HTTPStatus.OK:
            return response.output.text
        else:
            raise Exception(response.message)
    
    """
    流式对话大模型
    """
    def chat_stream(self, prompt: str):
        responses = Generation.call(model=self.model_name, prompt=prompt, stream=True, api_key=self.api_key)
        for response in responses:
            if response.status_code == HTTPStatus.OK:
                yield response.output.text
            else:
                raise Exception(response.message)
    
    """
    问题与相关提示一起组装
    """
    def gen_prompt(self, query: str, context: str):
        return self.PROMPT_TEMPLATE.replace("{question}", query).replace("{context}", context)

-

创建搜索Pipeline

对子表创建搜索Pipeline,子表内容自动同步到搜索引擎,并在搜索引擎内完成Embedding。

"""
创建Pipeline,搜索内部自动对text字段调用AI引擎进行Embedding,写入vector_field 字段
"""
def create_pipeline(self):
    if self.check_pipeline_exists():
        print("Pipeline {} exists".format(self.parent.pipeline_name))
        # 如果pipeline已经存在,目前策略是跳过,如果是需要调整参数重新创建,则注释掉下方的return
        return
    inner_ai_host = Config.AI_HOST
    if "-pub" in inner_ai_host:
        inner_ai_host = inner_ai_host.replace("-pub", "-vpc")
        
    pipeline = {
        "description": "demo_chunking pipeline",
        "processors": [
            {
                "text-embedding": {
                    "inputFields": [self.parent.text_field],  
                    "outputFields": [self.parent.vector_field],
                    "userName": Config.LD_USER,
                    "password": Config.LD_PASSWORD,
                    "url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
                    "modeName": self.parent.embedding_model_name
                }
            }
        ]
    }   
    try:
        response = self.client.ingest.put_pipeline(id=self.parent.pipeline_name, body=pipeline)
        print("Create pipeline success", response)
    except Exception as e:
        print("Create pipeline errr ", e) 

宽表引擎

搜索引擎

向量引擎

创建搜索索引和向量索引

创建搜索索引,提高检索效率。

示例中使用HNSW,如果数据量大,推荐使用IVF_PQ。小数据集则可以使用FLAT。

"""
子表创建搜索索引,本文范例中使用的是HNSW索引,如果数据量较大,建议使用IVF_PQ索引
https://help.aliyun.com/document_detail/2773371.html
"""
# Reference: https://help.aliyun.com/document_detail/260841.html
def create_child_table_index(self):
    sql = """
    CREATE INDEX IF NOT EXISTS %s USING SEARCH ON %s (
            document_id(indexed=false,columnStored=false),
            chunking_position(indexed=false,columnStored=false),
            title(type=text,analyzer=ik),
            metadata(indexed=false,columnStored=false),
            %s(type=text,analyzer=ik),
            %s(mapping='{
                "type": "knn_vector",
                "dimension": 1024,
                "data_type": "float",
                "method": {
                    "engine": "lvector",
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "parameters": {
                        "ef_construction": 500,
                        "m": 24
                        }
                    }
            }')) WITH (INDEX_SETTINGS='{
            "index": {
                "knn" : "true",
                "knn.vector_empty_value_to_keep" : true,
                "default_pipeline": "%s"
            }}',
                SOURCE_SETTINGS=
                '{
                    "includes": ["document_id", "chunking_position","title", "text"]
                }',
                numShards=2
            )
    """.strip() %  (self.parent.chunking_index_name,
                    self.parent.row_child_table,
                    self.parent.text_field,
                    self.parent.vector_field,
                    self.parent.pipeline_name)
    print("Create search index sql: \n ", sql)
    self.common_create_table(sql)

宽表引擎

搜索引擎

向量引擎

检索方式说明

  • 检索选项:全文检索/向量检索/全文和向量混合检索。

  • 检索重排:使用重排/不使用重排。

  • Prompt:使用父文档/不使用父文档(使用Context进行Prompt)。

检索方式

具体描述

流程图

全文和向量混合检索

全文和向量混合检索

def demo_rrf_search():
    lindorm = Lindorm()
    query="国际初中科学奥林匹克主要比赛科目"
    results = lindorm.lindormSearch.rrf_search(query)
    display(JSON(results, expanded=True, root="rrf_search_result"))
    lindorm.close()
demo_rrf_search()
image

获取原始语料

+使用检索重排

先获取原始语料,检索重排

def demo_rerank():
    lindorm = Lindorm()
    query="国际初中科学奥林匹克主要比赛科目"
    topk=int(Config.SEARCH_TOP_K)
    origin_result = lindorm.lindormSearch.rrf_search(query,  topk * 2)
    display(JSON(origin_result[0:topk], expanded=True, root="Before rerank result"))
    texts = [item["_source"]["text"] for item in origin_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    reranked_origin_result = handler_reranker(origin_result, reranker_result, topk)
    display(JSON(reranked_origin_result, expanded=True, root="After rerank result"))
    lindorm.close()
demo_rerank()
image

全文和向量混合检索

+使用检索重排

+Prompt

  1. 先进行全文和向量混合检索,将混合检索的结果进行重排序。

  2. 将问题以及检索结果一起形成Prompt,提交给大模型。

def demo_chat_with_child_chunking():
    lindorm = Lindorm()
    topk=int(Config.SEARCH_TOP_K)
    query="国际初中科学奥林匹克主要比赛科目"
    search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
    texts = [item["_source"]["text"] for item in search_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    prompt_context = "\n".join(item['chunk'] for item in reranker_result[0:topk])
    ali_qwen = AliQwen()
    prompt = ali_qwen.gen_prompt(query, prompt_context)
    output_text = ""
    for part in ali_qwen.chat_stream(prompt):
        output_text = part 
        wrapped_text = wrap_text(output_text, 80)
        clear_output(wait=True)
        display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))
    
    wrapped_text = wrap_text(prompt, 120)
    display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>"))
    lindorm.close()
    
demo_chat_with_child_chunking()
image

全文和向量混合检索

+使用检索重排

+Context Prompt

  1. 先进行全文向量混合检索,将混合检索的结果进行重排序。

  2. 根据父表(create_parent_table表)的PRIMARY KEY字段,从父表中查询初步检索结果的Context。

  3. 将问题与Context一起形成Prompt,并通过API Key访问大模型,获得回答。

def demo_chat_with_parent():
    lindorm = Lindorm()
    topk=int(Config.SEARCH_TOP_K)
    query="国际初中科学奥林匹克主要比赛科目"
    search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
    texts = [item["_source"]["text"] for item in search_result]
    reranker_result = lindorm.lindormAI.reranker(query, texts)
    reranked_origin_result = handler_reranker(search_result, reranker_result, topk)
    unique_document_ids = list(OrderedDict.fromkeys(item['_source']['document_id'] for item in reranked_origin_result))
    contexts = []
    for document_id in unique_document_ids:
        contexts.append(lindorm.lindormRow.get_parent_context(pk=document_id))
    prompt_context = "\n".join(contexts)        
    ali_qwen = AliQwen()
    prompt = ali_qwen.gen_prompt(query, prompt_context)    
    # stream
    for part in ali_qwen.chat_stream(prompt):
        output_text = part 
        wrapped_text = wrap_text(output_text, 80)
        clear_output(wait=True)
        display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))
    
    wrapped_text = wrap_text(prompt, 120)
    display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>")) 
    lindorm.close()

demo_chat_with_parent()
image