package index

import (
	"bytes"
	"fmt"
	"os"
	"sort"
	"strings"
	"sync"

	"git.pyer.club/kingecg/godocdb/storage"
	"go.mongodb.org/mongo-driver/bson"
)

// IndexType tells UpdateIndex how to treat a value that is already indexed.
type IndexType int

const (
	// NonUnique lets UpdateIndex overwrite the entry for a value that is
	// already present.
	NonUnique IndexType = iota
	// Unique makes UpdateIndex reject a value that is already present.
	Unique
)

// IndexSortOrder is the sort direction recorded for one key field of an index.
type IndexSortOrder string

const (
	// Ascending is stored as "asc".
	Ascending IndexSortOrder = "asc"
	// Descending is stored as "desc".
	Descending IndexSortOrder = "desc"
)

// IndexMetadata is the per-index record persisted under
// "indexes:metadata:<name>".
type IndexMetadata struct {
	Name       string
	Type       IndexType
	KeyFields  []string
	SortOrders []IndexSortOrder // One entry per key field; CreateIndex enforces equal length.
	Version    int              // Index version number, intended for cache-consistency checks.
}

// IndexStore persists index metadata and index entries in a LevelDB-backed
// storage.
type IndexStore struct {
	storage *storage.LevelDBStorage
	// indexes is an in-memory cache laid out as {indexName: {key: value}}.
	// NOTE(review): no method in this file ever writes to it; it is only read
	// and cleared here. Confirm whether other files in the package fill it.
	indexes map[string]map[string]string
	mu      sync.RWMutex // Guards concurrent access to the store.
}

// NewIndexStore creates the directory at path if necessary, opens the LevelDB
// storage in it and returns an IndexStore with an empty cache.
func NewIndexStore(path string) (*IndexStore, error) {
	// Make sure the directory exists before the storage is opened.
	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, fmt.Errorf("failed to create storage directory: %w", err)
	}

	// Named db rather than storage so the imported package is not shadowed.
	db, err := storage.NewLevelDBStorage(path)
	if err != nil {
		return nil, fmt.Errorf("failed to create LevelDB storage: %w", err)
	}

	return &IndexStore{
		storage: db,
		indexes: make(map[string]map[string]string),
	}, nil
}

// CreateIndex stores the metadata for indexName (at version 1) and writes an
// empty index under its data key. Any data already stored under that data key
// is replaced by the empty index.
//
// keyFields and sortOrders must have the same length; otherwise an error is
// returned and nothing is written.
func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFields []string, sortOrders []IndexSortOrder) error {
	is.mu.Lock()
	defer is.mu.Unlock()

	if len(keyFields) != len(sortOrders) {
		return fmt.Errorf("keyFields and sortOrders must have the same length")
	}

	metadata := IndexMetadata{
		Name:       indexName,
		Type:       indexType,
		KeyFields:  keyFields,
		SortOrders: sortOrders,
		Version:    1, // Initial version.
	}

	data, err := bson.Marshal(metadata)
	if err != nil {
		return fmt.Errorf("failed to marshal index metadata: %w", err)
	}

	key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
	if err := is.storage.Put(key, data); err != nil {
		return err
	}

	// Persist an empty index so later reads find a decodable document.
	indexKey := fmt.Sprintf("indexes:data:%s", indexName)
	indexData, err := bson.Marshal(make(map[string]string))
	if err != nil {
		return fmt.Errorf("failed to marshal index data: %w", err)
	}

	return is.storage.Put([]byte(indexKey), indexData)
}

// UpdateIndex records documentID under the entry "<collection>:<fieldValue>"
// of the index indexName and writes the whole index back to storage.
//
// It returns an error when fieldValue is nil, when the index metadata cannot
// be loaded, or when the index is Unique and the entry already exists. For a
// NonUnique index an existing entry is overwritten.
func (is *IndexStore) UpdateIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
	is.mu.Lock()
	defer is.mu.Unlock()

	if fieldValue == nil {
		return fmt.Errorf("cannot update index with nil field value")
	}

	// Load the current index. A read error is treated as "no index data yet"
	// and the update starts from an empty map.
	indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
	index := make(map[string]string)
	if storedData, err := is.storage.Get(indexKey); err == nil {
		if err := bson.Unmarshal(storedData, &index); err != nil {
			return fmt.Errorf("failed to unmarshal index data: %w", err)
		}
	}

	// The metadata decides how an already-present value is handled.
	metadata, err := is.GetIndexMetadata(indexName)
	if err != nil {
		return fmt.Errorf("failed to get index metadata: %w", err)
	}

	indexKeyField := fmt.Sprintf("%s:%v", collection, fieldValue)
	if _, exists := index[indexKeyField]; exists && metadata.Type == Unique {
		return fmt.Errorf("duplicate key error for unique index")
	}
	index[indexKeyField] = documentID

	updatedData, err := bson.Marshal(index)
	if err != nil {
		return fmt.Errorf("failed to marshal updated index: %w", err)
	}

	return is.storage.Put(indexKey, updatedData)
}

// getOrCreateIndex returns the cached index for indexName if there is one,
// otherwise the index decoded from storage, otherwise a new empty map when
// the storage read fails.
//
// It takes no lock and does not add the result to the cache.
func (is *IndexStore) getOrCreateIndex(indexName string) (map[string]string, error) {
	if index, ok := is.indexes[indexName]; ok {
		return index, nil
	}

	indexKey := fmt.Sprintf("indexes:data:%s", indexName)
	storedData, err := is.storage.Get([]byte(indexKey))
	if err != nil {
		// A read error is treated as "index does not exist yet".
		return make(map[string]string), nil
	}

	var loadedIndex map[string]string
	if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
		return nil, fmt.Errorf("failed to unmarshal index data: %w", err)
	}

	return loadedIndex, nil
}

// GetIndexedDocuments returns the document IDs of every entry of indexName
// whose key starts with "<collection>:<fieldValue>", sorted in ascending
// order so that the result does not depend on map iteration order.
//
// NOTE(review): the match is a prefix match, so a lookup for value 1 also
// returns entries stored for 10, 11, ... — confirm whether callers rely on
// this before switching to an exact match.
func (is *IndexStore) GetIndexedDocuments(collection string, indexName string, fieldValue interface{}) ([]string, error) {
	is.mu.RLock()
	defer is.mu.RUnlock()

	prefix := fmt.Sprintf("%s:%v", collection, fieldValue)

	// Read straight from storage; the in-memory cache is not consulted.
	indexKey := fmt.Sprintf("indexes:data:%s", indexName)
	storedData, err := is.storage.Get([]byte(indexKey))
	if err != nil {
		return nil, fmt.Errorf("index not found: %w", err)
	}

	var loadedIndex map[string]string
	if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
		return nil, err
	}

	var results []string
	for key, value := range loadedIndex {
		if strings.HasPrefix(key, prefix) {
			results = append(results, value)
		}
	}
	sort.Strings(results)

	return results, nil
}

// GetAllIndexMetadata scans storage for keys beginning with
// "indexes:metadata:<collection>:" and returns every metadata record that can
// be read and decoded. Records that fail to load or decode are skipped.
//
// NOTE(review): CreateIndex stores metadata under "indexes:metadata:<indexName>",
// so only indexes whose name itself begins with "<collection>:" are found —
// confirm the naming convention used by callers.
func (is *IndexStore) GetAllIndexMetadata(collection string) ([]IndexMetadata, error) {
	prefix := []byte(fmt.Sprintf("indexes:metadata:%s:", collection))

	// Go through the storage's Scan method rather than touching its db field.
	keys, _, err := is.storage.Scan(prefix)
	if err != nil {
		return nil, fmt.Errorf("failed to scan metadata: %w", err)
	}

	var metadataList []IndexMetadata
	for _, key := range keys {
		// Re-check the prefix on every key Scan returned.
		if !bytes.HasPrefix(key, prefix) {
			continue
		}

		value, err := is.storage.Get(key)
		if err != nil {
			continue
		}

		var md IndexMetadata
		if err := bson.Unmarshal(value, &md); err != nil {
			continue
		}

		metadataList = append(metadataList, md)
	}

	return metadataList, nil
}

// DeleteIndex removes the entry "<collection>:<fieldValue>" from the index
// indexName and writes the index back to storage.
//
// Entries live inside the BSON map stored under "indexes:data:<indexName>"
// (that is where UpdateIndex writes them), so the entry is removed from that
// map rather than by deleting a separate storage key.
//
// The entry is removed only when it currently points at documentID; an empty
// documentID removes it unconditionally. A missing index or missing entry is
// not an error.
func (is *IndexStore) DeleteIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
	is.mu.Lock()
	defer is.mu.Unlock()

	indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
	storedData, err := is.storage.Get(indexKey)
	if err != nil {
		// Same convention as UpdateIndex: a read error means there is no
		// index data, hence nothing to remove.
		return nil
	}

	var index map[string]string
	if err := bson.Unmarshal(storedData, &index); err != nil {
		return fmt.Errorf("failed to unmarshal index data: %w", err)
	}

	entryKey := fmt.Sprintf("%s:%v", collection, fieldValue)
	owner, ok := index[entryKey]
	if !ok {
		return nil
	}
	// Do not drop an entry that has since been taken over by another document.
	if documentID != "" && owner != documentID {
		return nil
	}
	delete(index, entryKey)

	updatedData, err := bson.Marshal(index)
	if err != nil {
		return fmt.Errorf("failed to marshal updated index: %w", err)
	}

	return is.storage.Put(indexKey, updatedData)
}

// DropIndex deletes the stored metadata of indexName, evicts it from the
// in-memory cache and deletes its stored index data, in that order. If the
// metadata delete fails, the cache and the index data are left untouched.
func (is *IndexStore) DropIndex(indexName string) error {
	is.mu.Lock()
	defer is.mu.Unlock()

	metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
	if err := is.storage.Delete(metadataKey); err != nil {
		return err
	}

	delete(is.indexes, indexName)

	indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
	return is.storage.Delete(indexKey)
}

// GetIndexMetadata loads and decodes the metadata stored for indexName.
//
// It deliberately takes no lock: UpdateIndex calls it while holding the write
// lock, and sync.RWMutex is not reentrant.
func (is *IndexStore) GetIndexMetadata(indexName string) (IndexMetadata, error) {
	key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
	storedData, err := is.storage.Get(key)
	if err != nil {
		return IndexMetadata{}, fmt.Errorf("index not found: %w", err)
	}

	var metadata IndexMetadata
	if err := bson.Unmarshal(storedData, &metadata); err != nil {
		return IndexMetadata{}, fmt.Errorf("failed to unmarshal index metadata: %w", err)
	}

	return metadata, nil
}

// FlushIndexToDisk verifies that indexName exists. Every update is written
// straight to storage, so there is nothing left to flush.
//
// The index counts as existing when it is in the in-memory cache or when its
// metadata can be read from storage. Checking the cache alone would fail for
// every index created through CreateIndex, because that method never fills
// the cache.
func (is *IndexStore) FlushIndexToDisk(indexName string) error {
	is.mu.RLock()
	defer is.mu.RUnlock()

	if _, ok := is.indexes[indexName]; ok {
		return nil
	}

	metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
	if _, err := is.storage.Get(metadataKey); err != nil {
		return fmt.Errorf("index not found: %s: %w", indexName, err)
	}

	return nil
}

// ClearIndexCache evicts indexName from the in-memory cache. Stored data is
// not touched.
func (is *IndexStore) ClearIndexCache(indexName string) {
	is.mu.Lock()
	defer is.mu.Unlock()

	delete(is.indexes, indexName)
}