全文向量混合检索结合了全文检索和纯向量检索,相较于单独使用全文检索或向量检索,可以更好地获取全文与向量的融合信息。本文介绍基于Go语言,使用Lindorm向量引擎执行全文向量混合检索的方法。
Prerequisites
Preparations
Install the Elasticsearch Go client
Install the Elasticsearch Go client in either of the following ways:
Method 1:
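Modify the go.mod file to add the required dependencies, as shown below:
module EsGoClient

go 1.23.0 // Replace with your Go version

require (
github.com/elastic/go-elasticsearch/v7 v7.10.0
github.com/google/uuid v1.6.0 // Optional: needed only for generating UUIDs in sample code; configure as needed
)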
Run the following command to update the go.mod file:
go mod tidy
Method 2: Run the following command to download the client directly:
go get github.com/elastic/go-elasticsearch/v7@v7.10.0
Connect to the search engine
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
var client *elasticsearch.Client
func init() {
cfg := elasticsearch.Config{
Addresses: []string{
"http://ld-t4n5668xk31ui****-proxy-search-pub.lindorm.aliyuncs.com:30070",
},
Username: "<Username>",
Password: "<Password>",
Transport: &http.Transport{
MaxIdleConnsPerHost: 20,
},
}
var err error
client, err = elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Init client error: %s", err)
}
}
func handlerCommonError(res *esapi.Response) {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
fmt.Printf("Error parsing the error response: %s\n", err)
return
}
fmt.Printf("ERROR %v\n", errorResponse)
}
type Source struct {
Field1 int `json:"field1"`
Field2 string `json:"field2"`
Brand string `json:"brand"`
Merit string `json:"merit"`
Tag []string `json:"tag"`
}
type KnnResponse struct {
Hits struct {
Hits []struct {
ID string `json:"_id"`
Score float64 `json:"_score"`
Source Source `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
Addresses, Username, and Password are the endpoint, default username, and default password of the search engine. For how to obtain them, see View endpoints.
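Full-text + vector dual-path retrieval (RRF fusion search)
In some query scenarios, you need to combine the rankings from the full-text index and the vector index: the results returned by each are further weighted according to a scoring rule to produce the final ranking.
Create an index
The following example uses the hnsw algorithm.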
func createTextVectorHybridSearchIndex() {
indexName := "vector_text_hybridSearch"
vectorField := "vector1"
textField := "text_field"
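/**
In the mapping below, hnsw uses "meta": {"offline.construction": "false"}, which means the index is built online.
For ivfpq, set "meta": {"offline.construction": "true"} and trigger index construction after a certain amount of data has been written.
*/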
indexBody := map[string]interface{}{
"settings": map[string]interface{}{
"index": map[string]interface{}{
"number_of_shards": 2,
"knn": true,
},
},
"mappings": map[string]interface{}{
"_source": map[string]interface{}{
"excludes": []string{vectorField},
},
"properties": map[string]interface{}{
vectorField: map[string]interface{}{
"type": "knn_vector",
"dimension": 3,
"data_type": "float",
"meta": map[string]interface{}{
"offline.construction": "false",
},
"method": map[string]interface{}{
"engine": "lvector",
"name": "hnsw",
"space_type": "l2",
"parameters": map[string]interface{}{
"m": 24,
"ef_construction": 500,
},
},
},
textField: map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
"field1": map[string]interface{}{
"type": "long",
},
"field2": map[string]interface{}{
"type": "keyword",
},
},
},
}
body, err := json.Marshal(indexBody)
if err != nil {
log.Fatalf("Error marshaling index body: %s", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
req := esapi.IndicesCreateRequest{
Index: indexName,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, client)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
log.Printf("Error creating index: %s", res.String())
} else {
fmt.Println("Index created successfully")
}
}
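The comment in the function above ties offline.construction to the choice of algorithm. As a minimal sketch, the hypothetical helper knnVectorField below builds the knn_vector field mapping for either case. It assumes that an ivfpq field can reuse engine, data_type, and space_type from the hnsw example, and it deliberately leaves out ivfpq-specific method parameters, which this topic does not list; check the Lindorm documentation before using it for an ivfpq index.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// knnVectorField builds a knn_vector mapping (hypothetical helper).
// hnsw is built online ("offline.construction": "false"); ivfpq is built
// offline ("offline.construction": "true"). Only the hnsw parameters from
// the example are filled in; ivfpq-specific parameters are NOT covered.
func knnVectorField(dimension int, algorithm string) map[string]interface{} {
	method := map[string]interface{}{
		"engine":     "lvector",
		"name":       algorithm,
		"space_type": "l2", // assumption: same space_type as the hnsw example
	}
	offline := "false"
	switch algorithm {
	case "hnsw":
		method["parameters"] = map[string]interface{}{"m": 24, "ef_construction": 500}
	case "ivfpq":
		// Build the index only after enough data has been written.
		offline = "true"
	}
	return map[string]interface{}{
		"type":      "knn_vector",
		"dimension": dimension,
		"data_type": "float",
		"meta":      map[string]interface{}{"offline.construction": offline},
		"method":    method,
	}
}

func main() {
	for _, algo := range []string{"hnsw", "ivfpq"} {
		b, err := json.Marshal(knnVectorField(3, algo))
		if err != nil {
			fmt.Println("marshal error:", err)
			return
		}
		fmt.Println(string(b))
	}
}
```

The returned map can replace the literal value of vectorField in the properties section of createTextVectorHybridSearchIndex.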
Write data
func bulkWriteTextVectorHybridIndex() {
indexName := "vector_text_hybridSearch"
var buf bytes.Buffer
vectorField := "vector1"
textField := "text_field"
type Data struct {
Field1 int
Field2 string
Vector1 []float64
TextField string
}
data := []Data{
{Field1: 1, Field2: "flag1", Vector1: []float64{2.5, 2.3, 2.4}, TextField: "hello test5"},
{Field1: 2, Field2: "flag1", Vector1: []float64{2.6, 2.3, 2.4}, TextField: "hello test6 test5"},
{Field1: 3, Field2: "flag1", Vector1: []float64{2.7, 2.3, 2.4}, TextField: "hello test7"},
{Field1: 4, Field2: "flag2", Vector1: []float64{2.8, 2.3, 2.4}, TextField: "hello test8 test7"},
{Field1: 5, Field2: "flag2", Vector1: []float64{2.9, 2.3, 2.4}, TextField: "hello test9"},
}
for i, item := range data {
// You can specify document IDs based on your business needs.
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i+1))
doc := map[string]interface{}{
"field1": item.Field1,
"field2": item.Field2,
vectorField: item.Vector1,
textField: item.TextField,
}
dataJson, err := json.Marshal(doc)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
continue
}
buf.Grow(len(meta) + len(dataJson) + 2)
buf.Write(meta)
buf.WriteByte('\n')
buf.Write(dataJson)
buf.WriteByte('\n')
}
bulkReq := esapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// Set a timeout
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
if err != nil {
fmt.Printf("Bulk request error: %s\n", err)
return
}
defer res.Body.Close()
if res.IsError() {
handlerCommonError(res)
}
}
Query data (fusion query)
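RRF scores are calculated as follows:
At query time, the system uses the rrf_rank_constant parameter passed in the request to process the topK results returned separately by full-text search and vector search. Each returned document _id receives the score 1/(rrf_rank_constant + rank(i)), where rank(i) is the document's position in that result list, starting from 1.
If a document _id appears in the topK results of both full-text search and vector search, its final score is the sum of the two scores. A document that appears in only one result list keeps only the score from that search.
For example, with rrf_rank_constant = 1, the scores are: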
doc | queryA | queryB | score |
_id: 1 | 1.0/(1+1) | 0 | 0.5 |
_id: 2 | 1.0/(1+2) | 0 | 0.33 |
_id: 4 | 0 | 1.0/(1+2) | 0.33 |
_id: 5 | 0 | 1.0/(1+1) | 0.5 |
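The following Go sketch reproduces the calculation above. rrfFuse is a hypothetical helper: it applies 1/(rrf_rank_constant + rank(i)) to each ranked list and sums the scores per _id. It does not model rrf_knn_weight_factor or any other server-side behavior of Lindorm.

```go
package main

import (
	"fmt"
	"sort"
)

// ScoredDoc holds a document ID and its fused RRF score.
type ScoredDoc struct {
	ID    string
	Score float64
}

// rrfFuse merges ranked ID lists with Reciprocal Rank Fusion. Each list is
// ordered best-first and rank(i) starts at 1. A document found in several
// lists gets the sum of its per-list scores; a list that does not contain
// the document contributes 0.
func rrfFuse(rankConstant float64, rankedLists ...[]string) []ScoredDoc {
	scores := make(map[string]float64)
	for _, list := range rankedLists {
		for i, id := range list {
			scores[id] += 1.0 / (rankConstant + float64(i+1))
		}
	}
	fused := make([]ScoredDoc, 0, len(scores))
	for id, s := range scores {
		fused = append(fused, ScoredDoc{ID: id, Score: s})
	}
	// Highest score first; ties are broken by ID so the order is deterministic.
	sort.Slice(fused, func(a, b int) bool {
		if fused[a].Score != fused[b].Score {
			return fused[a].Score > fused[b].Score
		}
		return fused[a].ID < fused[b].ID
	})
	return fused
}

func main() {
	queryA := []string{"1", "2"} // ranked results of one search, best first
	queryB := []string{"5", "4"} // ranked results of the other search, best first
	for _, d := range rrfFuse(1, queryA, queryB) {
		fmt.Printf("_id: %s score=%.2f\n", d.ID, d.Score)
	}
}
```

With a rank constant of 1 and the ranked lists from the table, this yields 0.5 for _id 1 and 5 and about 0.33 for _id 2 and 4. A document present in both lists at rank 1 would score 0.5 + 0.5 = 1.0.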
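You can run a fusion query through either the _search API or the _msearch_rrf API. The two APIs compare as follows:
API | Open-source compatibility | Readability | Adjustable full-text/vector ratio |
_search | Compatible | Harder to read | Supported |
_msearch_rrf | Proprietary (Lindorm-specific) | Easy to read | Not supported |
The following sections show how to use the _search API and the _msearch_rrf API in two scenarios: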
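Scenario without scalar field filtering
Use the open-source _search API
Pros: Compatible with the open-source _search API, and supports adjusting the ratio between full-text search and pure vector search through the rrf_knn_weight_factor parameter.
Cons: The query is more complex to write.
If filter_type is not set in the ext.lvector extension field, the RRF search consists only of full-text search and pure vector search, and the vector search applies no scalar field filtering. With hybrid_search_type set to filter_rrf, the full-text query is written in the filter of the knn clause.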
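Because the only difference between the unfiltered and filtered cases is whether filter_type is present, a small builder can keep the rest of ext.lvector identical. The hypothetical lvectorExt below is a sketch that reuses the hnsw values from the examples in this topic (k_expand_scope comes from the filtered example); it is not part of any client library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lvectorExt builds the "ext" section of a filter_rrf hybrid query on an
// hnsw index. An empty filterType omits the key entirely, which keeps the
// query a plain full-text + vector RRF search. All values are strings,
// matching the examples in this topic.
func lvectorExt(filterType string) map[string]interface{} {
	lvector := map[string]interface{}{
		"hybrid_search_type":    "filter_rrf",
		"rrf_rank_constant":     "1",
		"rrf_knn_weight_factor": "0.5",
		"ef_search":             "100",
	}
	if filterType != "" {
		lvector["filter_type"] = filterType // for example "efficient_filter"
		lvector["k_expand_scope"] = "1000"  // value taken from the filtered example
	}
	return map[string]interface{}{"lvector": lvector}
}

func main() {
	noFilter, _ := json.Marshal(lvectorExt(""))
	withFilter, _ := json.Marshal(lvectorExt("efficient_filter"))
	fmt.Println(string(noFilter))
	fmt.Println(string(withFilter))
}
```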
func hybridSearchNoFilterType() {
indexName := "vector_text_hybridSearch"
vectorField := "vector1"
textField := "text_field"
vector := []float64{2.8, 2.3, 2.4} // Example vector
filterText := "test5 test6 test7 test8 test9"
query := map[string]interface{}{
"size": 10,
"_source": true,
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
"vector": vector,
"filter": map[string]interface{}{
"match": map[string]interface{}{
textField: filterText,
},
},
"k": 10,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "1",
"rrf_knn_weight_factor": "0.5",
"ef_search": "100",
},
},
}
// Convert the map to JSON
body, err := json.Marshal(query)
if err != nil {
fmt.Printf("Error marshaling query to JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := esapi.SearchRequest{
Index: []string{indexName},
Body: bytes.NewReader(body),
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Printf("Knn err: %s\n", err)
return
}
defer searchResponse.Body.Close()
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Printf("Knn result Decode err: %s\n", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
If you use the ivfpq algorithm in a scenario without attribute filtering, you can set the ext.lvector extension parameters as follows:
"ext": {"lvector": {
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "60",
"rrf_knn_weight_factor": "0.5",
"nprobe": "80",
"reorder_factor": "2",
"client_refactor":"true"
}}
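You can increase nprobe as needed, for example to 80, 100, 120, 140, or 160. nprobe costs far less performance than reorder_factor, but avoid setting nprobe too high.
If k in the query is large, for example 100 or greater, we recommend setting reorder_factor to 1 or 2.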
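To use the ivfpq parameters from the JSON fragment above in Go, you could build ext.lvector the same way as in the hnsw examples. The hypothetical ivfpqLvectorExt below is a sketch: it takes nprobe and reorder_factor as integers and converts them to strings, since the examples send every lvector value as a string. The fixed values (rrf_rank_constant 60, client_refactor true) are copied from the fragment above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// ivfpqLvectorExt builds ext.lvector for an ivfpq index (hypothetical helper).
// Pass an empty filterType when no attribute filtering is needed.
func ivfpqLvectorExt(nprobe, reorderFactor int, filterType string) map[string]interface{} {
	lvector := map[string]interface{}{
		"hybrid_search_type":    "filter_rrf",
		"rrf_rank_constant":     "60",
		"rrf_knn_weight_factor": "0.5",
		"nprobe":                strconv.Itoa(nprobe),
		"reorder_factor":        strconv.Itoa(reorderFactor),
		"client_refactor":       "true",
	}
	if filterType != "" {
		lvector["filter_type"] = filterType
	}
	return map[string]interface{}{"lvector": lvector}
}

func main() {
	// A large k (100 or more) pairs with a small reorder_factor (1 or 2).
	b, err := json.Marshal(map[string]interface{}{"ext": ivfpqLvectorExt(100, 1, "")})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(b))
}
```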
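Use the proprietary _msearch_rrf API
Pros: The query is clearer to write.
Cons: Not compatible with the open-source _search API, and does not support adjusting the ratio between full-text search and pure vector search.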
func getDenseVectorQuery(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
var buf bytes.Buffer
index := fmt.Sprintf(`{"index": "%s"}`, indexName)
query := map[string]interface{}{
"size": topK,
"_source": []string{
"field1",
"field2",
},
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
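// denseVector holds a JSON array literal such as "[2.8, 2.3, 2.4]"; RawMessage embeds it as an array instead of a quoted string
"vector": json.RawMessage(denseVector),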
"k": topK,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"ef_search": "100",
},
},
}
knnBytes, err := json.Marshal(query)
if err != nil {
return buf, err
}
buf.WriteString(index + "\n")
buf.Write(knnBytes)
buf.WriteString("\n")
return buf, nil
}
func getTextQuery(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
var buf bytes.Buffer
index := fmt.Sprintf(`{"index": "%s"}`, indexName)
query := map[string]interface{}{
"size": topK,
"_source": []string{
"field1",
"field2",
},
"query": map[string]interface{}{
"match": map[string]interface{}{
textField: queryText,
},
},
}
knnBytes, err := json.Marshal(query)
if err != nil {
return buf, err
}
buf.WriteString(index + "\n")
buf.Write(knnBytes)
buf.WriteString("\n")
return buf, nil
}
func hybridSearchNoFilterTypeMSearchRRF() {
indexName := "vector_text_hybridSearch"
vectorField := "vector1"
textField := "text_field"
buf := new(bytes.Buffer)
query, _ := getDenseVectorQuery(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
query2, _ := getTextQuery(indexName, textField, "test5 test6 test7 test8 test9", 10)
buf.Write(query.Bytes())
buf.Write(query2.Bytes())
url := "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
mSearchRRFRequest, err := http.NewRequest("POST", url, buf)
if err != nil {
fmt.Println("Build err ", err)
return
}
mSearchRRFRequest.Header.Set("Accept", "application/json")
mSearchRRFRequest.Header.Set("Content-Type", "application/json")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
if err != nil {
fmt.Println("Err ", err)
return
}
defer res.Body.Close()
response := esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
if response.IsError() {
handlerCommonError(&response)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
fmt.Printf("Knn result Decode err: %s\n", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
The URL parameters must include re_score=true.
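If you build the _msearch_rrf path programmatically, encoding the query string with the standard net/url package makes it harder to forget re_score=true. msearchRRFPath is a hypothetical helper; for a rank constant of 1 it produces the same path as the hardcoded URL in the example.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// msearchRRFPath builds the request path for _msearch_rrf. re_score=true is
// always set because the endpoint requires it; rrf_rank_constant is the
// only other URL parameter used in the examples.
func msearchRRFPath(rankConstant int) string {
	q := url.Values{}
	q.Set("re_score", "true")
	q.Set("rrf_rank_constant", strconv.Itoa(rankConstant))
	// Encode sorts keys, so the output is stable.
	return "/_msearch_rrf?" + q.Encode()
}

func main() {
	fmt.Println(msearchRRFPath(1))
}
```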
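Scenario with scalar field filtering
Use the open-source _search API
If filter_type is set in the ext.lvector extension field, the vector search in the RRF query also applies scalar field filtering.
To include filter conditions in an RRF fusion search, put the full-text query and the filter conditions into two separate bool expressions joined by bool.must. The first bool expression in must is used for full-text search and produces the full-text relevance score. The second expression, a bool filter, is used as the filter condition of the knn search.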
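The nesting described above is easy to get wrong when written inline. As a sketch, the hypothetical helper hybridFilter below assembles only the filter value of the knn clause from a full-text query and a list of scalar conditions; everything else in the request (vector, k, ext.lvector with filter_type) stays as in the example that follows.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hybridFilter assembles the knn "filter" for a filtered RRF query:
// bool.must[0] wraps the full-text query (scored), and bool.must[1] wraps
// the scalar conditions in a bool.filter (used only to filter the knn search).
func hybridFilter(textQuery map[string]interface{}, scalarFilters []interface{}) map[string]interface{} {
	return map[string]interface{}{
		"bool": map[string]interface{}{
			"must": []interface{}{
				map[string]interface{}{"bool": map[string]interface{}{"must": []interface{}{textQuery}}},
				map[string]interface{}{"bool": map[string]interface{}{"filter": scalarFilters}},
			},
		},
	}
}

func main() {
	text := map[string]interface{}{
		"match": map[string]interface{}{"text_field": map[string]interface{}{"query": "test5 test6"}},
	}
	filters := []interface{}{
		map[string]interface{}{"range": map[string]interface{}{"field1": map[string]interface{}{"gt": 2}}},
		map[string]interface{}{"term": map[string]interface{}{"field2": "flag2"}},
	}
	b, err := json.MarshalIndent(hybridFilter(text, filters), "", "  ")
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(b))
}
```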
func hybridSearchWithFilterType() {
indexName := "vector_text_hybridSearch"
vectorField := "vector1"
textField := "text_field"
size := 10
// "_source": true returns all fields; a list returns only the listed fields; false returns only _id and _score
knnQuery := map[string]interface{}{
"size": 10,
"_source": []string{
"field1",
"field2",
},
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
"vector": []float64{2.8, 2.3, 2.4},
"filter": map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"match": map[string]interface{}{
textField: map[string]interface{}{
"query": "test5 test6 test7 test8 test9",
},
},
},
},
},
},
map[string]interface{}{
"bool": map[string]interface{}{
"filter": []interface{}{
map[string]interface{}{
"range": map[string]interface{}{
"field1": map[string]interface{}{
"gt": 2,
},
},
},
map[string]interface{}{
"term": map[string]interface{}{
"field2": "flag2",
},
},
},
},
},
},
},
},
"k": size,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"hybrid_search_type": "filter_rrf",
"filter_type": "efficient_filter",
"rrf_rank_constant": "1",
"rrf_knn_weight_factor": "0.5",
"ef_search": "100",
"k_expand_scope": "1000",
},
},
}
content, err := json.Marshal(knnQuery)
if err != nil {
fmt.Printf("Error marshalling JSON: %s\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
search := esapi.SearchRequest{
Index: []string{indexName},
Body: bytes.NewReader(content),
}
searchResponse, err := search.Do(ctx, client)
if err != nil {
fmt.Printf("Knn err: %s\n", err)
return
}
defer searchResponse.Body.Close()
if searchResponse.IsError() {
handlerCommonError(searchResponse)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
fmt.Printf("Knn result Decode err: %s\n", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
}
}
If you use the ivfpq algorithm in a scenario with attribute filtering, you can set the ext.lvector extension parameters as follows:
"ext": {"lvector": {
"filter_type": "efficient_filter",
"hybrid_search_type": "filter_rrf",
"rrf_rank_constant": "60",
"rrf_knn_weight_factor": "0.5",
"nprobe": "80",
"reorder_factor": "2",
"client_refactor":"true"
}}
Use the proprietary _msearch_rrf API
func getDenseVectorQueryWithFilter(indexName string, vectorField string, denseVector string, topK int) (bytes.Buffer, error) {
var buf bytes.Buffer
index := fmt.Sprintf(`{"index": "%s"}`, indexName)
query := map[string]interface{}{
"size": topK,
"_source": []interface{}{"field1", "field2"},
"query": map[string]interface{}{
"knn": map[string]interface{}{
vectorField: map[string]interface{}{
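// denseVector holds a JSON array literal such as "[2.8, 2.3, 2.4]"; RawMessage embeds it as an array instead of a quoted string
"vector": json.RawMessage(denseVector),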
"filter": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []interface{}{
map[string]interface{}{
"range": map[string]interface{}{
"field1": map[string]interface{}{
"gt": 2,
},
},
},
map[string]interface{}{
"term": map[string]interface{}{
"field2": "flag2",
},
},
},
},
},
"k": topK,
},
},
},
"ext": map[string]interface{}{
"lvector": map[string]interface{}{
"filter_type": "efficient_filter",
"ef_search": "100",
"k_expand_scope": "2000",
},
},
}
knnBytes, err := json.Marshal(query)
if err != nil {
return buf, err
}
buf.WriteString(index + "\n")
buf.Write(knnBytes)
buf.WriteString("\n")
return buf, nil
}
func getTextQueryWithFilter(indexName string, textField string, queryText string, topK int) (bytes.Buffer, error) {
var buf bytes.Buffer
index := fmt.Sprintf(`{"index": "%s"}`, indexName)
query := map[string]interface{}{
"size": topK,
"_source": []string{"field1", "field2"},
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"match": map[string]interface{}{
textField: queryText,
},
},
},
"filter": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []interface{}{
map[string]interface{}{
"range": map[string]interface{}{
"field1": map[string]interface{}{
"gt": 2,
},
},
},
map[string]interface{}{
"term": map[string]interface{}{
"field2": "flag2",
},
},
},
},
},
},
},
}
knnBytes, err := json.Marshal(query)
if err != nil {
return buf, err
}
buf.WriteString(index + "\n")
buf.Write(knnBytes)
buf.WriteString("\n")
return buf, nil
}
func hybridSearchNoFilterTypeMSearchRRFWithFilter() {
indexName := "vector_text_hybridSearch"
vectorField := "vector1"
textField := "text_field"
buf := new(bytes.Buffer)
query, _ := getDenseVectorQueryWithFilter(indexName, vectorField, "[2.8, 2.3, 2.4]", 10)
query2, _ := getTextQueryWithFilter(indexName, textField, "test5 test6 test7 test8 test9", 10)
buf.Write(query.Bytes())
buf.Write(query2.Bytes())
url := "/_msearch_rrf?re_score=true&rrf_rank_constant=1"
mSearchRRFRequest, err := http.NewRequest("POST", url, buf)
if err != nil {
fmt.Println("Build err ", err)
return
}
mSearchRRFRequest.Header.Set("Accept", "application/json")
mSearchRRFRequest.Header.Set("Content-Type", "application/json")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
res, err := client.Transport.Perform(mSearchRRFRequest.WithContext(ctx))
if err != nil {
fmt.Println("Err ", err)
return
}
defer res.Body.Close()
response := esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}
if response.IsError() {
handlerCommonError(&response)
return
}
var responseBody KnnResponse
if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
fmt.Printf("Knn result Decode err: %s\n", err)
return
}
for _, hit := range responseBody.Hits.Hits {
fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
}
}
The URL parameters must include re_score=true.
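Parameters
Parameter | Required | Default | Description |
filter_type | No | None | The query mode. Valid values: pre_filter, post_filter, and efficient_filter. For details, see Parameter description. Important: This parameter is required in scenarios with scalar field filtering and is not required in scenarios without it. |
hybrid_search_type | Yes | None | Set to filter_rrf. Note: This parameter is ignored when you use the proprietary _msearch_rrf API. |
rrf_rank_constant | No | 60 | The constant in the RRF scoring formula: score = 1/(rrf_rank_constant + rank(i)). |
rrf_window_size | No | topK | The number of intermediate results that full-text search returns. Defaults to the topK of the knn search. Note: This parameter is ignored when you use the proprietary _msearch_rrf API. |
rrf_knn_weight_factor | No | 0.5 | Adjusts the ratio between full-text search and pure vector search. Important: Only search engine 3.9.3 and later support this parameter. Note: This parameter is ignored when you use the proprietary _msearch_rrf API. |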