godocdb/index/index.go

316 lines
8.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package index
import (
"bytes"
"fmt"
"strings"
"sync"
"git.pyer.club/kingecg/godocdb/storage"
"go.mongodb.org/mongo-driver/bson"
)
// IndexType 索引类型
type IndexType int
const (
NonUnique IndexType = iota
Unique
)
// IndexSortOrder 表示索引的排序方式
type IndexSortOrder string
const (
Ascending IndexSortOrder = "asc"
Descending IndexSortOrder = "desc"
)
// IndexMetadata 索引元数据
type IndexMetadata struct {
Name string
Type IndexType
KeyFields []string
SortOrders []IndexSortOrder // 每个字段对应的排序方式长度应与KeyFields一致
Version int // 索引版本号,用于缓存一致性校验
}
// IndexStore 管理索引存储
type IndexStore struct {
storage *storage.LevelDBStorage
indexes map[string]map[string]string // 缓存索引数据 {indexName: {key: value}}
mu sync.RWMutex // 保护并发访问
}
// NewIndexStore 创建新的索引存储实例
func NewIndexStore(storeName string) (*IndexStore, error) {
// 确保目录存在
if !strings.HasSuffix(storeName, "_index") {
// add _index suffix to avoid name confilict with document store, do remove this
storeName += "_index"
}
storage, err := storage.GetInstance().GetStorage(storeName)
if err != nil {
return nil, fmt.Errorf("failed to create LevelDB storage: %v", err)
}
return &IndexStore{
storage: storage,
indexes: make(map[string]map[string]string),
}, nil
}
// CreateIndex 创建索引
func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFields []string, sortOrders []IndexSortOrder) error {
is.mu.Lock()
defer is.mu.Unlock()
// 验证keyFields和sortOrders长度一致
if len(keyFields) != len(sortOrders) {
return fmt.Errorf("keyFields and sortOrders must have the same length")
}
// 存储索引元数据
metadata := IndexMetadata{
Name: indexName,
Type: indexType,
KeyFields: keyFields,
SortOrders: sortOrders,
Version: 1, // 初始版本号
}
data, err := bson.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal index metadata: %v", err)
}
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
if err := is.storage.Put(key, data); err != nil {
return err
}
// 初始化索引结构
index := make(map[string]string)
// 存储初始索引结构
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
indexData, err := bson.Marshal(index)
if err != nil {
return fmt.Errorf("failed to marshal index data: %v", err)
}
return is.storage.Put([]byte(indexKey), indexData)
}
// UpdateIndex 在指定collection下更新索引
func (is *IndexStore) UpdateIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
is.mu.Lock()
defer is.mu.Unlock()
// 增加空值检查
if fieldValue == nil {
return fmt.Errorf("cannot update index with nil field value")
}
// 从存储加载当前索引
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
storedData, err := is.storage.Get(indexKey)
if err != nil {
// 如果不存在则创建新索引
storedData, _ = bson.Marshal(make(map[string]string))
}
// 反序列化索引数据
var index map[string]string
if err := bson.Unmarshal(storedData, &index); err != nil {
return fmt.Errorf("failed to unmarshal index data: %v", err)
}
// 获取索引元数据
metadata, err := is.GetIndexMetadata(indexName)
if err != nil {
return fmt.Errorf("failed to get index metadata: %v", err)
}
// 更新索引
indexKeyField := fmt.Sprintf("%s:%v", collection, fieldValue)
if _, ok := index[indexKeyField]; ok {
// 处理重复值的逻辑(需要根据索引类型决定是否覆盖)
if metadata.Type == Unique {
return fmt.Errorf("duplicate key error for unique index")
}
}
index[indexKeyField] = documentID
// 序列化并存储更新后的索引
updatedData, err := bson.Marshal(index)
if err != nil {
return fmt.Errorf("failed to marshal updated index: %v", err)
}
return is.storage.Put(indexKey, updatedData)
}
// getOrCreateIndex 获取现有索引或创建新索引
func (is *IndexStore) getOrCreateIndex(indexName string) (map[string]string, error) {
// 检查内存缓存
if index, ok := is.indexes[indexName]; ok {
return index, nil
}
// 从存储中加载
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
storedData, err := is.storage.Get([]byte(indexKey))
if err != nil {
// 如果不存在则创建空索引
return make(map[string]string), nil
}
// 反序列化索引数据
var loadedIndex map[string]string
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
return nil, fmt.Errorf("failed to unmarshal index data: %v", err)
}
return loadedIndex, nil
}
// GetIndexedDocuments 查询特定collection下的文档
func (is *IndexStore) GetIndexedDocuments(collection string, indexName string, fieldValue interface{}) ([]string, error) {
is.mu.RLock()
defer is.mu.RUnlock()
// 构造前缀
prefix := fmt.Sprintf("%s:%v", collection, fieldValue)
// 直接从存储读取
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
storedData, err := is.storage.Get([]byte(indexKey))
if err != nil {
return nil, fmt.Errorf("index not found: %v", err)
}
var loadedIndex map[string]string
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
return nil, err
}
var results []string
// 遍历查找匹配项
for key, value := range loadedIndex {
if strings.HasPrefix(key, prefix) {
results = append(results, value)
}
}
return results, nil
}
// GetAllIndexMetadata 获取所有索引的元数据
func (is *IndexStore) GetAllIndexMetadata(collection string) ([]IndexMetadata, error) {
// 构建前缀键(注意这里移除了多余的冒号)
prefix := []byte(fmt.Sprintf("indexes:metadata:%s:", collection))
// 使用storage的Scan方法替代直接访问db字段
keys, _, err := is.storage.Scan(prefix)
if err != nil {
return nil, fmt.Errorf("failed to scan metadata: %v", err)
}
var metadataList []IndexMetadata
for _, key := range keys {
// 验证键是否匹配前缀
if !bytes.HasPrefix(key, prefix) {
continue
}
// 读取元数据值
value, err := is.storage.Get(key)
if err != nil {
continue
}
// 解析元数据
var md IndexMetadata
if err := bson.Unmarshal(value, &md); err != nil {
continue
}
metadataList = append(metadataList, md)
}
return metadataList, nil
}
// DeleteIndex 删除索引条目
func (is *IndexStore) DeleteIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
is.mu.Lock()
defer is.mu.Unlock()
// 修复索引键的构造方式,使用统一格式
indexKey := []byte(fmt.Sprintf("indexes:data:%s:%s:%v", collection, indexName, fieldValue))
// 从LevelDB删除
return is.storage.Delete(indexKey)
}
// DropIndex 删除索引
func (is *IndexStore) DropIndex(indexName string) error {
is.mu.Lock()
defer is.mu.Unlock()
// 删除元数据
metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
if err := is.storage.Delete(metadataKey); err != nil {
return err
}
// 删除内存中的缓存
delete(is.indexes, indexName)
// 删除索引数据
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
return is.storage.Delete(indexKey)
}
// GetIndexMetadata 获取索引元数据
func (is *IndexStore) GetIndexMetadata(indexName string) (IndexMetadata, error) {
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
storedData, err := is.storage.Get(key)
if err != nil {
return IndexMetadata{}, fmt.Errorf("index not found: %v", err)
}
var metadata IndexMetadata
if err := bson.Unmarshal(storedData, &metadata); err != nil {
return IndexMetadata{}, fmt.Errorf("failed to unmarshal index metadata: %v", err)
}
return metadata, nil
}
// FlushIndexToDisk 将内存中的索引更新写入磁盘
func (is *IndexStore) FlushIndexToDisk(indexName string) error {
// 因为现在直接操作存储,不需要单独的刷新方法
// 现在Flush只是简单验证索引是否存在
is.mu.RLock()
defer is.mu.RUnlock()
if _, ok := is.indexes[indexName]; !ok {
return fmt.Errorf("index not found in memory: %s", indexName)
}
return nil
}
// ClearIndexCache 清除内存中的索引缓存
func (is *IndexStore) ClearIndexCache(indexName string) {
is.mu.Lock()
defer is.mu.Unlock()
delete(is.indexes, indexName)
}