实战连载基于向量数据库的长期记忆的AI2_Milvus__GoLang_SDK_数据

实战连载基于向量数据库的长期记忆的AI2_Milvus__GoLang_SDK_数据

magic
2024-09-12 / 0 评论 / 12 阅读 / 正在检测是否收录...

上一节,我们已经把Milvus 向量数据库搭建完成,由于milvus\_cli 工具2年没更新,不是太完善,我们需要使用SDK 来操作Milvus数据库, 因为我们服务都是基于Golang 开发,这里我用Golang来测试Milvus 数据库的插入与查询以保证,代码对数据库的基本操作没问题

我们根据官方文档,自己写一个结构体 MilvusClient , 下面是完整代码

package main

import (
    "context"
    "fmt"
    "github.com/milvus-io/milvus-sdk-go/v2/client"
    "github.com/milvus-io/milvus-sdk-go/v2/entity"
    "log"
    "math/rand"
    "time"
)

// 定义 Milvus 客户端结构体
type MilvusClient struct {
    milvusClient client.Client
}

func main() {
    fmt.Println("进入main函数")

    MilvusClient, err := NewMilvusClient()
    if err != nil {
        fmt.Println("GetMilvusDb", err.Error())
        return
    }

    //MilvusClient.CreateMilvusDb()
    //MilvusClient.InsertData()
    MilvusClient.SearchResultData()
    //MilvusClient.CreateIndex()
    MilvusClient.DescribeIndex()
}

// 向量场创建一个索引
func (M *MilvusClient) CreateIndex() {

    var milvusClient = M.milvusClient
    // 创建 IndexIvfFlat 索引对象
    idx, err := entity.NewIndexIvfFlat(
        entity.L2,
        1024,
    )
    if err != nil {
        log.Fatal("fail to create ivf flat index parameter:", err.Error())
    }
    // 在 Milvus 中创建索引
    err = milvusClient.CreateIndex(
        context.Background(),
        "book",
        "book_intro",
        idx,
        false,
    )
    if err != nil {
        log.Fatal("fail to create index:", err.Error())
    }

}

// 此方法获取索引的详细信息
func (M *MilvusClient) DescribeIndex() {
    var milvusClient = M.milvusClient

    // 获取索引的详细信息
    indexInfo, err := milvusClient.DescribeIndex(
        context.Background(),
        "book",       // 指定集合名称
        "book_intro", // 指定向量字段名称
    )
    if err != nil {
        log.Fatal("fail to describe index:", err.Error())
    }
    log.Println(indexInfo)
}

// 集合加载到内存中
func (M *MilvusClient) ReleaseCollectionData() {
    var milvusClient = M.milvusClient
    // 释放集合的内存
    err := milvusClient.ReleaseCollection(
        context.Background(),
        "book",
    )
    if err != nil {
        fmt.Println("failed to release collection:", err.Error())
    }
}

// 集合加载到内存中
func (M *MilvusClient) LoadCollectionData() {

    var milvusClient = M.milvusClient

    err := milvusClient.LoadCollection(
        context.Background(),
        "book",
        false,
    )
    if err != nil {
        fmt.Println("failed to load collection:", err.Error())
    }
    fmt.Println("loading ok")
}

// 搜索数据
func (M *MilvusClient) SearchResultData() {
    var err error
    var milvusClient = M.milvusClient

    M.LoadCollectionData()

    defer M.ReleaseCollectionData()

    sp, _ := entity.NewIndexIvfFlatSearchParam( // NewIndex*SearchParam func
        2, // searchParam
    )

    searchResult, err := milvusClient.Search(
        context.Background(), // ctx
        "book",               // CollectionName
        []string{},           // partitionNames
        "",                   // expr
        []string{"book_id"},  // outputFields
        []entity.Vector{entity.FloatVector([]float32{0.1, 0.2})}, // vectors
        "book_intro", // vectorField
        entity.L2,    // metricType
        2,            // topK
        sp,           // sp
    )
    if err != nil {
        fmt.Println("failed search data:", err.Error())
    }
    fmt.Printf("%#v\n", searchResult)
    for _, sr := range searchResult {
        fmt.Println("id", sr.IDs)
        fmt.Println("Scores", sr.Scores)
    }
}

// 插入数据
func (M *MilvusClient) InsertData() {
    var err error
    var milvusClient = M.milvusClient
    bookIDs := make([]int64, 0, 2000)
    wordCounts := make([]int64, 0, 2000)
    bookIntros := make([][]float32, 0, 2000)
    for i := 0; i < 2000; i++ {
        bookIDs = append(bookIDs, int64(i))
        wordCounts = append(wordCounts, int64(i+10000))
        v := make([]float32, 0, 2)
        for j := 0; j < 2; j++ {
            v = append(v, rand.Float32())
        }
        bookIntros = append(bookIntros, v)
    }
    idColumn := entity.NewColumnInt64("book_id", bookIDs)
    wordColumn := entity.NewColumnInt64("word_count", wordCounts)
    introColumn := entity.NewColumnFloatVector("book_intro", 2, bookIntros)

    _, err = milvusClient.Insert(
        context.Background(), // ctx
        "book",               // CollectionName
        "",                   // partitionName
        idColumn,             // columnarData
        wordColumn,           // columnarData
        introColumn,          // columnarData
    )
    if err != nil {
        fmt.Println("failed to insert data:", err.Error())
        return
    }
    //fmt.Println("xxx", idColumn)
}
func (M *MilvusClient) CreateMilvusDb() {
    var err error
    var milvusClient = M.milvusClient
    var (
        collectionName = "test"
    )
    schema := &entity.Schema{
        CollectionName: collectionName,
        Description:    "Test book search",
        Fields: []*entity.Field{
            {
                Name:       "book_id",
                DataType:   entity.FieldTypeInt64,
                PrimaryKey: true,
                AutoID:     false,
            },
            {
                Name:       "word_count",
                DataType:   entity.FieldTypeInt64,
                PrimaryKey: false,
                AutoID:     false,
            },
            {
                Name:     "book_intro",
                DataType: entity.FieldTypeFloatVector,
                TypeParams: map[string]string{
                    "dim": "2",
                },
            },
        },
        EnableDynamicField: true,
    }

    err = milvusClient.CreateCollection(
        context.Background(), // ctx
        schema,
        2, // shardNum
    )
    if err != nil {
        fmt.Println("创建数据库失败:", err)
        return
    }

    defer milvusClient.Close()
}

// Milvus 客户端对象,用于连接 Milvus 服务器
func NewMilvusClient() (m *MilvusClient, err error) {
    // Milvus 服务器的地址和端口
    host := "127.0.0.1"
    port := "19530"

    // 设置超时时间为10秒
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()

    // 创建一个 Milvus 客户端
    milvusClient, err := client.NewClient(ctx, client.Config{
        Address: fmt.Sprintf("http://%s:%s", host, port),
    })
    fmt.Println("sss")
    if err != nil {
        fmt.Println("sss", err.Error())
        fmt.Errorf("创建 Milvus 客户端失败: %v", err)
        return
    }

    // 使用 defer 语句来关闭 `milvusClient` 对象
    //defer milvusClient.Close()

    // 列出所有数据库
    dbs, err := milvusClient.ListDatabases(ctx)
    if err != nil {
        fmt.Errorf("获取数据库列表失败: %v", err)
        return
    }

    // 输出数据库列表
    fmt.Println("数据库列表:")
    for _, db := range dbs {
        fmt.Println("-", db)
    }
    var s = &MilvusClient{
        milvusClient: milvusClient,
    }

    return s, nil
}

代码的结构比较简单,主要包含一个 Milvus 客户端结构体,以及该结构体的若干方法,每个方法都对应了 Milvus SDK 中的一个操作。

上面各个方法的功能和实现

  1. NewMilvusClient: 创建 Milvus 客户端对象,用于连接 Milvus 服务器。
  2. CreateIndex: 在 Milvus 中创建索引。
  3. DescribeIndex: 获取索引的详细信息。
  4. ReleaseCollectionData: 释放集合的内存。
  5. LoadCollectionData: 加载集合数据到内存。
  6. SearchResultData: 在 Milvus 中搜索数据。

我们先创建一个表book

MilvusClient, err := NewMilvusClient()
    if err != nil {
        fmt.Println("GetMilvusDb", err.Error())
        return
    }
// 创建表结构
MilvusClient.CreateMilvusDb()

执行代码,得到效果


执行测试数据插入

MilvusClient.InsertData()

这里插入2000条数据


查询数据

我们查询向量字段book\_intro


到这里,数据库的操作基本没有什么问题,这里需要注意的是,Milvus数据库查询前需要先把数据加载到内存,查询完,释放掉内存

加载到内存
M.LoadCollectionData()

查询执行完释放内存
defer M.ReleaseCollectionData()

Golang操作Milvus基本没什么问题,下一步就是设计数据库,把向量数据写入进去。

0

评论 (0)

取消