Python开发手册

向量检索是面向非结构化向量数据的检索功能,可以帮助您快速查找相似数据。如果您更习惯使用Python语言进行应用开发,可以参考本文提供的操作指导,结合业务需求实现纯向量数据检索或混合检索。

前提条件

  • 实例的服务类型Lindorm_V2,且已开通向量引擎和搜索引擎。如何查看服务类型,请参见查看产品系列

  • 已安装Python环境,且Python3.9及以上版本。

  • Python中已安装2.5.0版本Opensearch-py包。

创建向量索引

使用向量检索功能,要求索引的mappings中必须包含一个或多个向量类型字段,且所有向量字段必须显式定义。

以下示例创建一个索引,其中vector1为向量类型字段、field1为普通类型字段。

index_body = {
  "settings": {
    "index": {
      "number_of_shards": 4,
      "knn": True
    }
  },
  "mappings": {
    "_source": {"excludes": ["vector1"]},
    "properties": {
      "vector1": {
        "type": "knn_vector",
        "dimension": 3,
        "data_type": "float",
        "method": {
          "engine": "lvector",
          "name": "hnsw",
          "space_type": "l2",
          "parameters": {
            "m": 24,
            "ef_construction": 128
          }
        }
      },
      "field1": {
        "type": "long"
      }
    }
  }
}
response = client.indices.create(index='vector_test', body=index_body)

向量列参数的详细介绍,请参见向量列参数说明

数据写入

向量索引的数据写入方式与普通索引的数据写入方式一致。向量字段的数据以数组的形式写入。

单条写入

doc = {
  "field1": 1,
  "vector1": [1.2, 1.3, 1.4]
}
response = client.index(index='vector_test', body=doc, id=1)

批量写入

operations = """
{ "index" : { "_index" : "vector_test", "_id" : "2" } }
{ "field1" : 1, "vector1": [2.2, 2.3, 2.4]}
{ "index" : { "_index" : "vector_test", "_id" : "3" } }
{ "field1" : 2, "vector1": [1.2, 1.3, 4.4]}
{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
{ "update" : {"_id" : "1", "_index" : "vector_test"} }
{ "doc" : {"field1" : 3, "vector1": [2.2, 3.3, 4.4]} }
"""
response = client.bulk(operations)

数据查询

查询向量数据时需要在查询请求中加入knn结构,并通过ext结构提供相关查询参数。knn、ext结构细节及其参数说明,请参见参数说明

纯向量数据查询

只查询向量字段的数据,可以直接使用knn结构的基本形式。

例如,查询vector1字段中与向量[2.3, 3.3, 4.4]相关的前10条数据,并要求最小得分为0.8。

query = {
  "size": 10,
  "query": {
    "knn": {
      "vector1": {
        "vector": [2.3, 3.3, 4.4],
        "k": 10
      }
    }
  },
  "ext": {"lvector": {"min_score": "0.8"}}
}
response = client.search(index='vector_test', body=query)

该查询将返回一条id1的文档。

融合查询

向量数据的查询可与普通数据的查询条件结合,并返回综合的查询结果。

Pre-Filter近似查询

knn查询结构内添加filter结构,并指定filter_type参数为pre_filter,可实现先过滤普通数据,再查询向量数据。

query = {
  "size": 10,
  "query": {
    "knn": {
      "vector1": {
        "vector": [2.3, 3.3, 4.4],
        "filter": {
          "range": {
            "field1": {
              "gte": 0
            }
          }
        },
        "k": 10
      }
    }
  },
  "ext": {"lvector": {"filter_type": "pre_filter"}}
}
response = client.search(index='vector_test', body=query)

Post-Filter近似查询

knn查询结构内添加filter结构,并指定filter_type参数为post_filter,可实现先查询向量数据,再过滤普通数据。

query = {
  "size": 10,
  "query": {
    "knn": {
      "vector1": {
        "vector": [2.3, 3.3, 4.4],
        "filter": {
          "range": {
            "field1": {
              "gte": 0
            }
          }
        },
        "k": 10
      }
    }
  },
  "ext": {"lvector": {"filter_type": "post_filter"}}
}
response = client.search(index='vector_test', body=query)

您也可以通过Post Filter结构添加过滤条件,实现Post-Filter近似查询。

query = {
  "size": 10,
  "query": {
    "knn": {
      "vector1": {
        "vector": [2.3, 3.3, 4.4],
        "k": 10
      }
    }
  },
  "post_filter": {
    "range": {
      "field1": {
        "gte": 0
      }
    }
  }
}
response = client.search(index='vector_test', body=query)

删除向量索引

向量索引的删除方式与普通索引的删除方式一致。

response = client.indices.delete(index='vector_test')

完整示例

import json
import random

from opensearchpy import OpenSearch

# 请填写Lindorm搜索引擎的Elasticsearch兼容地址、用户名和密码
class LVectorDemo:
    def __init__(self):
        host = 'ld-bp106782jm960****-proxy-search-pub.lindorm.aliyuncs.com'
        port = 30070
        auth = ('username', 'password')
        self.client = OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_auth=auth,
            timeout=30
        )
        self.random = random.Random(0)

# 创建向量索引
    def create_index(self, name: str) -> None:
        index_body = {
            "settings": {
                "index": {
                    "number_of_shards": 4,
                    "knn": True
                }
            },
            "mappings": {
                "_source": {"excludes": ["vector1"]},
                "properties": {
                    "vector1": {
                        "type": "knn_vector",
                        "dimension": 5,
                        "data_type": "float",
                        "method": {
                            "engine": "lvector",
                            "name": "ivfpq",
                            "space_type": "l2",
                            "parameters": {
                                "nlist": 10,
                                "centroids_use_hnsw": True,
                                "centroids_hnsw_m": 32,
                                "centroids_hnsw_ef_construct": 200,
                                "centroids_hnsw_ef_search": 200
                            }
                        }
                    },
                    "field1": {
                        "type": "long"
                    }
                }
            }
        }
        response = self.client.indices.create(index=name, body=index_body)

 # 数据写入
    def write_docs(self, index_name: str) -> None:
        operations = []
        for i in range(0, 1000):
            id = self.random.randint(-2 ** 63, 2 ** 63 - 1)
            operations.append(json.dumps({"index": {"_index": index_name, "_id": id}}))
            operations.append("\n")
            vector1 = []
            for j in range(0, 5):
                vector1.append(self.random.random())
            operations.append(json.dumps({"field1": self.random.random(), "vector1": vector1}))
            operations.append("\n")
        response = self.client.bulk("".join(operations))

 # 纯向量数据查询
    def query_vector(self, index_name: str, vector: list[float]) -> None:
        query = {
            "query": {
                "knn": {
                    "vector1": {
                        "vector": vector,
                        "k": 10
                    }
                }
            },
            "ext": {
                "lvector": {
                    "min_score": "0.8",
                    "nprobe": "20",
                    "reorder_factor": "20"
                }
            }
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

 # Pre-Filter近似查询
    def query_vector_with_pre_filter(self, index_name: str, vector: list[float]) -> None:
        query = {
            "query": {
                "knn": {
                    "vector1": {
                        "vector": vector,
                        "filter": {
                            "range": {
                                "field1": {
                                    "gte": 0
                                }
                            }
                        },
                        "k": 10
                    }
                }
            },
            "ext": {
                "lvector": {
                    "filter_type": "pre_filter",
                    "min_score": "0.1",
                    "nprobe": "20",
                    "reorder_factor": "20"
                }
            }
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

 # Post-Filter近似查询
    def query_vector_with_post_filter_type1(self, index_name: str, vector: list[float]) -> None:
        query = {
            "query": {
                "knn": {
                    "vector1": {
                        "vector": vector,
                        "filter": {
                            "range": {
                                "field1": {
                                    "gte": 0
                                }
                            }
                        },
                        "k": 10
                    }
                }
            },
            "ext": {
                "lvector": {
                    "filter_type": "post_filter",
                    "min_score": "0.1",
                    "nprobe": "20",
                    "reorder_factor": "20"
                }
            }
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

 # 在Post Filter结构中添加过滤条件
    def query_vector_with_post_filter_type2(self, index_name: str, vector: list[float]) -> None:
        query = {
            "query": {
                "knn": {
                    "vector1": {
                        "vector": vector,
                        "k": 10
                    }
                }
            },
            "post_filter": {
                "range": {
                    "field1": {
                        "gte": 0
                    }
                }
            },
            "ext": {
                "lvector": {
                    "filter_type": "post_filter",
                    "min_score": "0.1",
                    "nprobe": "20",
                    "reorder_factor": "20"
                }
            }
        }
        response = self.client.search(index=index_name, body=query)
        print(response)

 # 删除向量索引
    def delete_index(self, index_name: str) -> None:
        response = self.client.indices.delete(index=index_name)


if __name__ == "__main__":
    index_name = "vector_test"
    vector = [1.0, 1.0, 1.0, 1.0, 1.0]
    lvector_demo = LVectorDemo()
    lvector_demo.create_index(index_name)
    lvector_demo.write_docs(index_name)
    lvector_demo.query_vector(index_name, vector)
    lvector_demo.query_vector_with_pre_filter(index_name, vector)
    lvector_demo.query_vector_with_post_filter_type1(index_name, vector)
    lvector_demo.query_vector_with_post_filter_type2(index_name, vector)
    lvector_demo.delete_index(index_name)

Lindorm搜索引擎的Elasticsearch兼容地址、用户名和密码的获取方式,请参见查看连接信息