全文向量混合检索

全文向量混合检索结合了全文检索和纯向量检索,相较于单独使用全文检索或向量检索,可以更好地获取全文与向量的融合信息。本文介绍基于Go语言,使用Lindorm向量引擎执行全文向量混合检索的方法。

前提条件

  • 已安装Go环境,建议安装Go 1.17及以上版本。

  • 已开通向量引擎。如何开通,请参见开通向量引擎

  • 已开通搜索引擎。如何开通,请参见开通指南

  • 搜索引擎为3.9.10及以上版本。如何查看或升级当前版本,请参见搜索引擎版本说明和升级小版本。

    重要

    如果您的搜索引擎为3.9.10以下版本,但控制台显示已是最新版本,请联系Lindorm技术支持(钉钉号:s0s3eg3)。

  • 已将客户端IP地址添加至Lindorm白名单,具体操作请参见设置白名单

准备工作

安装ElasticSearch Go客户端

您需要安装ElasticSearch Go客户端,支持以下两种方式:

  • 方式一:

    1. 修改go.mod文件配置,添加相关依赖,具体如下:

      module EsGoClient 
      
      go 1.23.0 //请替换为您的Go版本
      
      require (
      	github.com/elastic/go-elasticsearch/v7 v7.10.0
      	github.com/google/uuid v1.6.0 // 非必须,示例代码中生成uuid需要该项,请根据实际情况配置
      )
    2. 执行以下命令,更新go.mod文件。

      go mod tidy
  • 方式二:执行以下命令直接下载。

    go get github.com/elastic/go-elasticsearch/v7@v7.10.0

连接搜索引擎

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esapi"
	"log"
	"net/http"
	"strings"
	"time"
)

// client is the shared Elasticsearch-compatible client that every example in
// this file uses to talk to the Lindorm search engine.
var client *elasticsearch.Client

// init builds the package-level client. Replace the address, <Username> and
// <Password> with the search engine's endpoint and default credentials.
//
// Every example dereferences client, so continuing with a nil client would
// only postpone the failure to a nil-pointer panic; exit immediately instead.
func init() {
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://ld-t4n5668xk31ui****-proxy-search-pub.lindorm.aliyuncs.com:30070",
		},
		Username: "<Username>",
		Password: "<Password>",
		Transport: &http.Transport{
			// Allow up to 20 idle keep-alive connections per host for reuse.
			MaxIdleConnsPerHost: 20,
		},
	}
	var err error
	client, err = elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Init client error %s", err)
	}
}

// handlerCommonError prints the JSON body of a failed response as a generic
// map. If the body is not valid JSON, it prints the decoding error instead.
// It does not close res.Body; the caller remains responsible for that.
func handlerCommonError(res *esapi.Response) {
	var payload map[string]interface{}
	decodeErr := json.NewDecoder(res.Body).Decode(&payload)
	if decodeErr == nil {
		fmt.Printf("ERROR %v\n", payload)
		return
	}
	fmt.Printf("Error parsing the error response: %s\n", decodeErr)
}

// Source is the decoded _source of a search hit.
// The examples in this file only write field1 and field2 as scalar fields
// (vector1 is excluded from _source by the index mapping). Brand, Merit and
// Tag are not written by any example here and stay at their zero values
// unless the index holds such fields.
// NOTE(review): presumably shared with other samples; confirm before removing.
type Source struct {
	Field1 int      `json:"field1"`
	Field2 string   `json:"field2"`
	Brand  string   `json:"brand"`
	Merit  string   `json:"merit"`
	Tag    []string `json:"tag"`
}

// KnnResponse captures the part of a search response the examples print:
// the _id, _score and _source of each hit. encoding/json ignores every other
// field of the response when decoding into this type.
type KnnResponse struct {
	// Outer "hits" object of the search response.
	Hits struct {
		// Inner "hits" array, one entry per returned document.
		Hits []struct {
			ID     string  `json:"_id"`
			Score  float64 `json:"_score"`
			Source Source  `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

其中Addresses、Username、Password分别为搜索引擎的连接地址、默认用户名和默认密码,如何获取,请参见查看连接地址。

全文+向量双路召回(RRF融合检索)

在一些查询场景中,您需要综合考虑全文索引和向量索引的排序,根据一定的打分规则对各自返回的结果进一步进行加权计算,并得到最终的排名。

创建索引

以下示例使用hnsw算法。

重要

如果使用ivfpq算法,需要先将knn.offline.construction设置为true,导入离线数据后发起索引构建,构建成功后方可进行查询,详细说明请参见创建向量索引和索引构建。

// createTextVectorHybridSearchIndex creates the "vector_text_hybridSearch"
// index used by the hybrid-search examples. The index has 2 shards with knn
// enabled and these mappings:
//   - vector1: a 3-dimensional float knn_vector (lvector engine, hnsw,
//     l2 space), excluded from _source;
//   - text_field: a text field analyzed with ik_max_word;
//   - field1 (long) and field2 (keyword): scalar fields used for filtering.
//
// Marshaling or transport failures terminate the program via log.Fatalf; an
// error response from the server is only logged.
func createTextVectorHybridSearchIndex() {
	const (
		indexName   = "vector_text_hybridSearch"
		vectorField = "vector1"
		textField   = "text_field"
	)

	// hnsw with "offline.construction": "false" is an online index. For
	// ivfpq, use "offline.construction": "true", write a sufficient amount of
	// data, and then trigger an index build.
	vectorMapping := map[string]interface{}{
		"type":      "knn_vector",
		"dimension": 3,
		"data_type": "float",
		"meta": map[string]interface{}{
			"offline.construction": "false",
		},
		"method": map[string]interface{}{
			"engine":     "lvector",
			"name":       "hnsw",
			"space_type": "l2",
			"parameters": map[string]interface{}{
				"m":               24,
				"ef_construction": 500,
			},
		},
	}

	properties := map[string]interface{}{
		vectorField: vectorMapping,
		textField: map[string]interface{}{
			"type":     "text",
			"analyzer": "ik_max_word",
		},
		"field1": map[string]interface{}{"type": "long"},
		"field2": map[string]interface{}{"type": "keyword"},
	}

	indexBody := map[string]interface{}{
		"settings": map[string]interface{}{
			"index": map[string]interface{}{
				"number_of_shards": 2,
				"knn":              true,
			},
		},
		"mappings": map[string]interface{}{
			// Exclude the vector field from _source.
			"_source":    map[string]interface{}{"excludes": []string{vectorField}},
			"properties": properties,
		},
	}

	payload, marshalErr := json.Marshal(indexBody)
	if marshalErr != nil {
		log.Fatalf("Error marshaling index body: %s", marshalErr)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	createReq := esapi.IndicesCreateRequest{
		Index: indexName,
		Body:  bytes.NewReader(payload),
	}
	res, doErr := createReq.Do(ctx, client)
	if doErr != nil {
		log.Fatalf("Error getting response: %s", doErr)
	}
	defer res.Body.Close()

	if !res.IsError() {
		fmt.Println("Index created successfully")
		return
	}
	log.Printf("Error creating index: %s", res.String())
}

数据写入

// bulkWriteTextVectorHybridIndex writes five sample documents to the
// "vector_text_hybridSearch" index in a single _bulk request. Each document
// has the scalar fields field1/field2, a 3-dimensional vector in vector1 and
// a text_field. The documents get _id "1".."5" in order.
//
// Failures (marshaling, transport, an error status, or per-document bulk
// errors) are reported on stdout. The function does not retry.
func bulkWriteTextVectorHybridIndex() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	type Data struct {
		Field1    int
		Field2    string
		Vector1   []float64
		TextField string
	}
	data := []Data{
		{Field1: 1, Field2: "flag1", Vector1: []float64{2.5, 2.3, 2.4}, TextField: "hello test5"},
		{Field1: 2, Field2: "flag1", Vector1: []float64{2.6, 2.3, 2.4}, TextField: "hello test6 test5"},
		{Field1: 3, Field2: "flag1", Vector1: []float64{2.7, 2.3, 2.4}, TextField: "hello test7"},
		{Field1: 4, Field2: "flag2", Vector1: []float64{2.8, 2.3, 2.4}, TextField: "hello test8 test7"},
		{Field1: 5, Field2: "flag2", Vector1: []float64{2.9, 2.3, 2.4}, TextField: "hello test9"},
	}

	var buf bytes.Buffer
	for i, item := range data {
		// The _id can be chosen freely by the application.
		meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i+1))
		// Named doc (not data) so it does not shadow the sample slice.
		doc := map[string]interface{}{
			"field1":    item.Field1,
			"field2":    item.Field2,
			vectorField: item.Vector1,
			textField:   item.TextField,
		}
		docJSON, err := json.Marshal(doc)
		if err != nil {
			fmt.Println("Error marshaling JSON:", err)
			continue
		}
		// Each bulk entry is an action line followed by a source line, both
		// newline-terminated.
		buf.Grow(len(meta) + len(docJSON) + 2)
		buf.Write(meta)
		buf.WriteByte('\n')
		buf.Write(docJSON)
		buf.WriteByte('\n')
	}

	bulkReq := esapi.BulkRequest{
		Body:  &buf,
		Index: indexName,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	res, err := bulkReq.Do(ctx, client)
	if err != nil {
		// On error the response is nil, so it must not be dereferenced.
		fmt.Printf("happened err, %s\n", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		handlerCommonError(res)
		return
	}

	// _bulk can return a success status even when individual documents are
	// rejected; the top-level "errors" flag reports that case.
	var summary struct {
		Errors bool `json:"errors"`
	}
	if err := json.NewDecoder(res.Body).Decode(&summary); err != nil {
		fmt.Printf("Error parsing the bulk response: %s\n", err)
		return
	}
	if summary.Errors {
		fmt.Println("Bulk request completed with per-document errors; inspect the items in the bulk response")
	}
}

数据查询(融合查询)

RRF计算方式如下:

进行查询时系统会根据传入的rrf_rank_constant参数,对全文检索和向量检索分别获得的topK结果进行处理。对于每个返回的文档_id,使用公式1/(rrf_rank_constant + rank(i))计算得分,其中rank(i)表示该文档在结果中的排名。

如果某个文档_id同时出现在全文检索和向量检索的topK结果中,其最终得分为两种检索方法计算得分之和。而仅出现在其中一种检索结果中的文档,则只保留该检索方法的得分。

以rrf_rank_constant = 1为例,计算结果如下:

# doc   | queryA     | queryB         | score
_id: 1 =  1.0/(1+1)  + 0              = 0.5
_id: 2 =  1.0/(1+2)  + 0              = 0.33
_id: 4 =    0        + 1.0/(1+2)      = 0.33
_id: 5 =    0        + 1.0/(1+1)      = 0.5

支持通过_search接口或_msearch_rrf接口进行融合查询,两种接口的对比如下:

接口

开源性

易读性

是否支持全文、向量检索比例调整

_search

兼容

不易读

支持

_msearch_rrf

自研接口

易读

不支持

以下是两种场景下使用_search接口或_msearch_rrf接口的具体写法:

无标量字段过滤的场景

使用开源_search接口

优点:兼容开源_search接口,支持通过rrf_knn_weight_factor参数调整全文检索与纯向量检索的比例。

缺点:写法较为复杂。

在ext.lvector扩展字段中不设置filter_type,表示该RRF检索只包含全文检索和纯向量检索,且向量检索无需进行标量字段过滤。

// hybridSearchNoFilterType runs an RRF hybrid search through the standard
// _search API without scalar filtering.
//
// The knn clause carries the query vector, and its "filter" match clause
// carries the full-text query. ext.lvector sets hybrid_search_type to
// "filter_rrf" without a filter_type, so the search consists of a full-text
// leg and a pure vector leg fused with RRF (rank constant 1, knn weight
// factor 0.5). The _id and _score of every hit are printed.
func hybridSearchNoFilterType() {
	const (
		indexName   = "vector_text_hybridSearch"
		vectorField = "vector1"
		textField   = "text_field"
		filterText  = "test5 test6 test7 test8 test9"
	)
	queryVector := []float64{2.8, 2.3, 2.4} // Example vector

	knnClause := map[string]interface{}{
		"vector": queryVector,
		// With hybrid_search_type=filter_rrf, this match is the full-text leg.
		"filter": map[string]interface{}{
			"match": map[string]interface{}{textField: filterText},
		},
		"k": 10,
	}
	lvectorExt := map[string]interface{}{
		"hybrid_search_type":    "filter_rrf",
		"rrf_rank_constant":     "1",
		"rrf_knn_weight_factor": "0.5",
		"ef_search":             "100",
	}
	request := map[string]interface{}{
		"size":    10,
		"_source": true,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{vectorField: knnClause},
		},
		"ext": map[string]interface{}{"lvector": lvectorExt},
	}

	payload, err := json.Marshal(request)
	if err != nil {
		fmt.Printf("Error marshaling query to JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	searchReq := esapi.SearchRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(payload),
	}
	res, err := searchReq.Do(ctx, client)
	if err != nil {
		fmt.Printf("Knn err: %s\n", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		handlerCommonError(res)
		return
	}

	var parsed KnnResponse
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for i := range parsed.Hits.Hits {
		hit := &parsed.Hits.Hits[i]
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

如果使用ivfpq算法,在不使用属性过滤的场景下,ext.lvector扩展参数可以设置为:

 "ext": {"lvector": {
    "hybrid_search_type": "filter_rrf", 
    "rrf_rank_constant": "60",
    "rrf_knn_weight_factor": "0.5",
    "nprobe": "80", 
    "reorder_factor": "2",
    "client_refactor":"true"
  }}
说明
  • 您可以适当增加nprobe的值,例如设置为80、100、120、140或160。nprobe参数对性能的损耗远比reorder_factor参数小,但也不宜将nprobe的值设置得过大。

  • 如果查询语句中参数k的值设置得较大,例如大于等于100,建议将reorder_factor设置为1或者2

使用自研_msearch_rrf接口

优点:写法较清晰。

缺点:不兼容开源_search接口,不支持调整全文检索与纯向量检索的比例。

// getDenseVectorQuery builds the knn half of an _msearch_rrf request body.
// The result is two newline-terminated lines: the header {"index": "<indexName>"},
// then a knn search on vectorField. That search returns the top topK hits with
// field1 and field2 in _source and uses ef_search=100.
//
// denseVector must be a non-empty JSON array of numbers, e.g. "[2.8, 2.3, 2.4]".
// It is decoded and sent as a numeric array rather than as a JSON string.
// On error (malformed or empty vector, or a marshaling failure), an empty
// buffer is returned together with the error.
func getDenseVectorQuery(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
	var buf bytes.Buffer

	// Embedding denseVector directly would serialize it as "[...]" (a string),
	// which is not a valid knn vector; decode it into numbers first.
	var vector []float64
	if err := json.Unmarshal([]byte(denseVector), &vector); err != nil {
		return buf, fmt.Errorf("parsing dense vector %q: %w", denseVector, err)
	}
	if len(vector) == 0 {
		return buf, fmt.Errorf("dense vector %q is empty", denseVector)
	}

	index := fmt.Sprintf(`{"index": "%s"}`, indexName)
	query := map[string]interface{}{
		"size": topK,
		"_source": []string{
			"field1",
			"field2",
		},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": vector,
					"k":      topK,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"ef_search": "100",
			},
		},
	}
	knnBytes, err := json.Marshal(query)
	if err != nil {
		return buf, err
	}
	buf.WriteString(index + "\n")
	buf.Write(knnBytes)
	buf.WriteString("\n")
	return buf, nil
}

// getTextQuery builds the full-text half of an _msearch_rrf request body.
// The result is two newline-terminated lines: the header {"index": "<indexName>"},
// then a match query of queryText against textField. That query returns the
// top topK hits with field1 and field2 in _source. On a marshaling error an
// empty buffer is returned.
func getTextQuery(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
	matchQuery := map[string]interface{}{
		"match": map[string]interface{}{textField: queryText},
	}
	searchBody := map[string]interface{}{
		"size":    topK,
		"_source": []string{"field1", "field2"},
		"query":   matchQuery,
	}

	// json.Encoder emits the same bytes as json.Marshal followed by '\n',
	// which is exactly the line terminator the request body needs.
	var encoded bytes.Buffer
	if err := json.NewEncoder(&encoded).Encode(searchBody); err != nil {
		return bytes.Buffer{}, err
	}

	var out bytes.Buffer
	fmt.Fprintf(&out, `{"index": "%s"}`+"\n", indexName)
	out.Write(encoded.Bytes())
	return out, nil
}

// hybridSearchNoFilterTypeMSearchRRF runs an RRF hybrid search without scalar
// filtering through the Lindorm _msearch_rrf endpoint.
//
// The body holds two header/search pairs: a knn search (getDenseVectorQuery)
// and a full-text match search (getTextQuery). The server fuses their results
// with RRF using rrf_rank_constant=1; re_score=true is required by the
// endpoint. The _id and _score of each fused hit are printed.
//
// Errors while building the body or sending the request, and error statuses
// from the server, are reported and abort the search.
func hybridSearchNoFilterTypeMSearchRRF() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	knnPart, err := getDenseVectorQuery(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
	if err != nil {
		fmt.Println("Build knn query err ", err)
		return
	}
	textPart, err := getTextQuery(indexName, textField, "test5 test6 test7 test8 test9", 10)
	if err != nil {
		fmt.Println("Build text query err ", err)
		return
	}
	buf := new(bytes.Buffer)
	buf.Write(knnPart.Bytes())
	buf.Write(textPart.Bytes())

	// re_score=true is mandatory for _msearch_rrf.
	const endpoint = "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
	mSearchRRFRequest, err := http.NewRequest(http.MethodPost, endpoint, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	mSearchRRFRequest.Header.Set("Accept", "application/json")
	mSearchRRFRequest.Header.Set("Content-Type", "application/json")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw *http.Response so the shared esapi helpers can be reused.
	response := esapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	if response.IsError() {
		// Without this check an error body would decode into an empty
		// KnnResponse and the failure would go unnoticed.
		handlerCommonError(&response)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}
说明

请求URL中必须添加查询参数re_score=true。

包含标量字段过滤场景

使用开源_search接口

在ext.lvector扩展字段中设置filter_type参数,表示该RRF检索中的向量检索还需进行标量字段的过滤。

说明

RRF融合检索时,如果希望携带filter过滤条件,需要将全文检索的query条件和用于过滤的filter条件分别设置到两个bool表达式中,通过bool.must进行连接。must中的第一个bool表达式将用于全文检索,计算全文匹配度得分。must中第二个bool filter表达式将用于knn检索的过滤条件。

// hybridSearchWithFilterType runs an RRF hybrid search with scalar filtering
// through the standard _search API.
//
// The knn clause's "filter" is a bool.must of two bool expressions, in this
// order:
//  1. a match on text_field, which is the full-text leg scored for RRF;
//  2. a bool.filter requiring field1 > 2 and field2 == "flag2", which filters
//     the knn search.
//
// ext.lvector sets hybrid_search_type=filter_rrf with
// filter_type=efficient_filter, so the vector leg applies the scalar filter.
// The _id, _score and field1 of each hit are printed.
func hybridSearchWithFilterType() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"
	size := 10

	// First must clause: the full-text query scored for RRF.
	textClause := map[string]interface{}{
		"bool": map[string]interface{}{
			"must": []interface{}{
				map[string]interface{}{
					"match": map[string]interface{}{
						textField: map[string]interface{}{
							"query": "test5 test6 test7 test8 test9",
						},
					},
				},
			},
		},
	}

	// Second must clause: scalar conditions used to filter the knn search.
	scalarFilter := map[string]interface{}{
		"bool": map[string]interface{}{
			"filter": []interface{}{
				map[string]interface{}{
					"range": map[string]interface{}{
						"field1": map[string]interface{}{"gt": 2},
					},
				},
				map[string]interface{}{
					"term": map[string]interface{}{"field2": "flag2"},
				},
			},
		},
	}

	// "_source": true returns every field, a list returns only the listed
	// fields, and false returns only _id and _score.
	knnQuery := map[string]interface{}{
		"size":    size,
		"_source": []string{"field1", "field2"},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": []float64{2.8, 2.3, 2.4},
					// Order matters: the text clause first, the scalar filter second.
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"must": []interface{}{textClause, scalarFilter},
						},
					},
					"k": size,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				"hybrid_search_type":    "filter_rrf",
				"filter_type":           "efficient_filter",
				"rrf_rank_constant":     "1",
				"rrf_knn_weight_factor": "0.5",
				"ef_search":             "100",
				"k_expand_scope":        "1000",
			},
		},
	}

	content, err := json.Marshal(knnQuery)
	if err != nil {
		fmt.Printf("Error marshalling JSON: %s\n", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := esapi.SearchRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(content),
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
	}
}

如果使用ivfpq算法,在使用属性过滤的场景下,ext.lvector扩展参数可以设置为:

 "ext": {"lvector": {
    "filter_type": "efficient_filter",
    "hybrid_search_type": "filter_rrf", 
    "rrf_rank_constant": "60",
    "rrf_knn_weight_factor": "0.5",
    "nprobe": "80", 
    "reorder_factor": "2",
    "client_refactor":"true"
  }}

使用自研_msearch_rrf接口

// getDenseVectorQueryWithFilter builds the filtered knn half of an
// _msearch_rrf request body. The result is two newline-terminated lines: the
// header {"index": "<indexName>"}, then a knn search on vectorField.
//
// The knn search returns the top topK hits with field1 and field2 in _source.
// It keeps only documents with field1 > 2 and field2 == "flag2", using
// filter_type=efficient_filter, ef_search=100 and k_expand_scope=2000.
//
// denseVector must be a non-empty JSON array of numbers, e.g. "[2.8, 2.3, 2.4]".
// It is decoded and sent as a numeric array rather than as a JSON string.
// On error (malformed or empty vector, or a marshaling failure), an empty
// buffer is returned together with the error.
func getDenseVectorQueryWithFilter(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
	var buf bytes.Buffer

	// Embedding denseVector directly would serialize it as "[...]" (a string),
	// which is not a valid knn vector; decode it into numbers first.
	var vector []float64
	if err := json.Unmarshal([]byte(denseVector), &vector); err != nil {
		return buf, fmt.Errorf("parsing dense vector %q: %w", denseVector, err)
	}
	if len(vector) == 0 {
		return buf, fmt.Errorf("dense vector %q is empty", denseVector)
	}

	index := fmt.Sprintf(`{"index": "%s"}`, indexName)
	query := map[string]interface{}{
		"size":    topK,
		"_source": []interface{}{"field1", "field2"},
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				vectorField: map[string]interface{}{
					"vector": vector,
					"filter": map[string]interface{}{
						"bool": map[string]interface{}{
							"filter": []interface{}{
								map[string]interface{}{
									"range": map[string]interface{}{
										"field1": map[string]interface{}{
											"gt": 2,
										},
									},
								},
								map[string]interface{}{
									"term": map[string]interface{}{
										"field2": "flag2",
									},
								},
							},
						},
					},
					"k": topK,
				},
			},
		},
		"ext": map[string]interface{}{
			"lvector": map[string]interface{}{
				// filter_type is required whenever the knn leg carries a scalar filter.
				"filter_type":    "efficient_filter",
				"ef_search":      "100",
				"k_expand_scope": "2000",
			},
		},
	}
	knnBytes, err := json.Marshal(query)
	if err != nil {
		return buf, err
	}
	buf.WriteString(index + "\n")
	buf.Write(knnBytes)
	buf.WriteString("\n")
	return buf, nil
}

// getTextQueryWithFilter builds the filtered full-text half of an
// _msearch_rrf request body. The result is two newline-terminated lines: the
// header {"index": "<indexName>"}, then a bool query.
//
// The bool query scores queryText against textField (must) and keeps only
// documents with field1 > 2 and field2 == "flag2" (filter). topK sets the
// result size, and field1/field2 are returned in _source. On a marshaling
// error an empty buffer is returned.
func getTextQueryWithFilter(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
	// The same field1/field2 conditions appear in getDenseVectorQueryWithFilter.
	scalarConditions := []interface{}{
		map[string]interface{}{
			"range": map[string]interface{}{
				"field1": map[string]interface{}{"gt": 2},
			},
		},
		map[string]interface{}{
			"term": map[string]interface{}{"field2": "flag2"},
		},
	}
	boolQuery := map[string]interface{}{
		"must": []interface{}{
			map[string]interface{}{
				"match": map[string]interface{}{textField: queryText},
			},
		},
		"filter": map[string]interface{}{
			"bool": map[string]interface{}{"filter": scalarConditions},
		},
	}
	searchBody := map[string]interface{}{
		"size":    topK,
		"_source": []string{"field1", "field2"},
		"query":   map[string]interface{}{"bool": boolQuery},
	}

	// json.Encoder emits the same bytes as json.Marshal followed by '\n'.
	var encoded bytes.Buffer
	if err := json.NewEncoder(&encoded).Encode(searchBody); err != nil {
		return bytes.Buffer{}, err
	}

	var out bytes.Buffer
	fmt.Fprintf(&out, `{"index": "%s"}`+"\n", indexName)
	out.Write(encoded.Bytes())
	return out, nil
}

// hybridSearchNoFilterTypeMSearchRRFWithFilter runs an RRF hybrid search with
// scalar filtering (field1 > 2 and field2 == "flag2") through the Lindorm
// _msearch_rrf endpoint.
//
// The body holds two header/search pairs: a filtered knn search
// (getDenseVectorQueryWithFilter) and a filtered full-text search
// (getTextQueryWithFilter). The server fuses their results with RRF using
// rrf_rank_constant=1; re_score=true is required by the endpoint. The _id and
// _score of each fused hit are printed.
//
// Errors while building the body or sending the request, and error statuses
// from the server, are reported and abort the search.
func hybridSearchNoFilterTypeMSearchRRFWithFilter() {
	indexName := "vector_text_hybridSearch"
	vectorField := "vector1"
	textField := "text_field"

	knnPart, err := getDenseVectorQueryWithFilter(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
	if err != nil {
		fmt.Println("Build knn query err ", err)
		return
	}
	textPart, err := getTextQueryWithFilter(indexName, textField, "test5 test6 test7 test8 test9", 10)
	if err != nil {
		fmt.Println("Build text query err ", err)
		return
	}
	buf := new(bytes.Buffer)
	buf.Write(knnPart.Bytes())
	buf.Write(textPart.Bytes())

	// re_score=true is mandatory for _msearch_rrf.
	const endpoint = "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
	mSearchRRFRequest, err := http.NewRequest(http.MethodPost, endpoint, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	mSearchRRFRequest.Header.Set("Accept", "application/json")
	mSearchRRFRequest.Header.Set("Content-Type", "application/json")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()

	// Wrap the raw *http.Response so the shared esapi helpers can be reused.
	response := esapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	if response.IsError() {
		// Without this check an error body would decode into an empty
		// KnnResponse and the failure would go unnoticed.
		handlerCommonError(&response)
		return
	}

	var responseBody KnnResponse
	if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err: %s\n", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}
说明

请求URL中必须添加查询参数re_score=true。

参数说明

参数

是否必填

默认值

说明

filter_type

查询使用的模式。支持的取值:pre_filter、post_filter和efficient_filter。

参数详细说明,请参见参数说明

重要

包含标量字段过滤的场景下该参数必填,无标量字段过滤的场景该参数不必填写。

hybrid_search_type

设置为filter_rrf表示进行RRF融合检索。

说明

使用自研_msearch_rrf接口时忽略该参数。

rrf_rank_constant

60

表示RRF计算公式中的常数项。计算公式为:1/(rrf_rank_constant + rank(i))。

rrf_window_size

topK

表示全文检索需要返回的中间结果数,默认和knn检索的topK保持一致。

说明

使用自研_msearch_rrf接口时忽略该参数。

rrf_knn_weight_factor

重要

3.9.3及以上版本的搜索引擎支持该参数。

0.5

取值范围为(0, 1),其中0.01代表纯全文检索,0.99代表纯向量检索。

说明

使用自研_msearch_rrf接口时忽略该参数。