2025-06-07 18:18:03 +08:00
|
|
|
|
package index
|
|
|
|
|
|
|
|
|
|
import (
|
2025-06-07 23:02:49 +08:00
|
|
|
|
"bytes"
|
2025-06-07 18:18:03 +08:00
|
|
|
|
"fmt"
|
2025-06-07 22:28:29 +08:00
|
|
|
|
"os"
|
|
|
|
|
"strings"
|
2025-06-07 23:02:49 +08:00
|
|
|
|
"sync"
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
"git.pyer.club/kingecg/godocdb/storage"
|
2025-06-07 23:02:49 +08:00
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
2025-06-07 18:18:03 +08:00
|
|
|
|
)
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// IndexType 索引类型
|
|
|
|
|
type IndexType int
|
2025-06-07 18:18:03 +08:00
|
|
|
|
|
|
|
|
|
const (
|
2025-06-07 22:28:29 +08:00
|
|
|
|
NonUnique IndexType = iota
|
|
|
|
|
Unique
|
2025-06-07 18:18:03 +08:00
|
|
|
|
)
|
|
|
|
|
|
2025-06-07 19:13:07 +08:00
|
|
|
|
// IndexSortOrder 表示索引的排序方式
|
|
|
|
|
type IndexSortOrder string
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
Ascending IndexSortOrder = "asc"
|
|
|
|
|
Descending IndexSortOrder = "desc"
|
|
|
|
|
)
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
// IndexMetadata 索引元数据
|
|
|
|
|
type IndexMetadata struct {
|
2025-06-07 19:13:07 +08:00
|
|
|
|
Name string
|
|
|
|
|
Type IndexType
|
|
|
|
|
KeyFields []string
|
|
|
|
|
SortOrders []IndexSortOrder // 每个字段对应的排序方式(长度应与KeyFields一致)
|
2025-06-07 22:28:29 +08:00
|
|
|
|
Version int // 索引版本号,用于缓存一致性校验
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// IndexStore 管理索引存储
|
2025-06-07 18:18:03 +08:00
|
|
|
|
type IndexStore struct {
|
|
|
|
|
storage *storage.LevelDBStorage
|
2025-06-07 22:28:29 +08:00
|
|
|
|
indexes map[string]map[string]string // 缓存索引数据 {indexName: {key: value}}
|
|
|
|
|
mu sync.RWMutex // 保护并发访问
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewIndexStore 创建新的索引存储实例
|
|
|
|
|
func NewIndexStore(path string) (*IndexStore, error) {
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// 确保目录存在
|
2025-06-07 23:02:49 +08:00
|
|
|
|
if strings.HasPrefix(path, "/") {
|
|
|
|
|
path = strings.TrimSuffix(path, "/")
|
|
|
|
|
}
|
|
|
|
|
if !strings.HasSuffix(path, "_index") {
|
|
|
|
|
// add _index suffix to avoid name confilict with document store, do remove this
|
|
|
|
|
path += "_index"
|
|
|
|
|
}
|
2025-06-07 22:28:29 +08:00
|
|
|
|
if err := os.MkdirAll(path, 0755); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to create storage directory: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
storage, err := storage.NewLevelDBStorage(path)
|
|
|
|
|
if err != nil {
|
2025-06-07 22:28:29 +08:00
|
|
|
|
return nil, fmt.Errorf("failed to create LevelDB storage: %v", err)
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
2025-06-07 22:28:29 +08:00
|
|
|
|
|
|
|
|
|
return &IndexStore{
|
|
|
|
|
storage: storage,
|
|
|
|
|
indexes: make(map[string]map[string]string),
|
|
|
|
|
}, nil
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CreateIndex 创建索引
|
2025-06-07 19:13:07 +08:00
|
|
|
|
func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFields []string, sortOrders []IndexSortOrder) error {
|
2025-06-07 22:28:29 +08:00
|
|
|
|
is.mu.Lock()
|
|
|
|
|
defer is.mu.Unlock()
|
|
|
|
|
|
2025-06-07 19:13:07 +08:00
|
|
|
|
// 验证keyFields和sortOrders长度一致
|
|
|
|
|
if len(keyFields) != len(sortOrders) {
|
|
|
|
|
return fmt.Errorf("keyFields and sortOrders must have the same length")
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
// 存储索引元数据
|
|
|
|
|
metadata := IndexMetadata{
|
2025-06-07 19:13:07 +08:00
|
|
|
|
Name: indexName,
|
|
|
|
|
Type: indexType,
|
|
|
|
|
KeyFields: keyFields,
|
2025-06-07 22:28:29 +08:00
|
|
|
|
SortOrders: sortOrders,
|
|
|
|
|
Version: 1, // 初始版本号
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
data, err := bson.Marshal(metadata)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to marshal index metadata: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
|
|
|
|
if err := is.storage.Put(key, data); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// 初始化索引结构
|
|
|
|
|
index := make(map[string]string)
|
|
|
|
|
|
|
|
|
|
// 存储初始索引结构
|
2025-06-07 18:18:03 +08:00
|
|
|
|
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
2025-06-07 22:28:29 +08:00
|
|
|
|
indexData, err := bson.Marshal(index)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to marshal index data: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
return is.storage.Put([]byte(indexKey), indexData)
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// UpdateIndex 在指定collection下更新索引
|
|
|
|
|
func (is *IndexStore) UpdateIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
|
|
|
|
|
is.mu.Lock()
|
|
|
|
|
defer is.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// 增加空值检查
|
|
|
|
|
if fieldValue == nil {
|
|
|
|
|
return fmt.Errorf("cannot update index with nil field value")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 从存储加载当前索引
|
|
|
|
|
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
|
|
|
|
|
storedData, err := is.storage.Get(indexKey)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// 如果不存在则创建新索引
|
|
|
|
|
storedData, _ = bson.Marshal(make(map[string]string))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 反序列化索引数据
|
|
|
|
|
var index map[string]string
|
|
|
|
|
if err := bson.Unmarshal(storedData, &index); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to unmarshal index data: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 获取索引元数据
|
|
|
|
|
metadata, err := is.GetIndexMetadata(indexName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get index metadata: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 更新索引
|
|
|
|
|
indexKeyField := fmt.Sprintf("%s:%v", collection, fieldValue)
|
|
|
|
|
if _, ok := index[indexKeyField]; ok {
|
|
|
|
|
// 处理重复值的逻辑(需要根据索引类型决定是否覆盖)
|
|
|
|
|
if metadata.Type == Unique {
|
|
|
|
|
return fmt.Errorf("duplicate key error for unique index")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
index[indexKeyField] = documentID
|
|
|
|
|
|
|
|
|
|
// 序列化并存储更新后的索引
|
|
|
|
|
updatedData, err := bson.Marshal(index)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to marshal updated index: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return is.storage.Put(indexKey, updatedData)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getOrCreateIndex 获取现有索引或创建新索引
|
|
|
|
|
func (is *IndexStore) getOrCreateIndex(indexName string) (map[string]string, error) {
|
|
|
|
|
// 检查内存缓存
|
|
|
|
|
if index, ok := is.indexes[indexName]; ok {
|
|
|
|
|
return index, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 从存储中加载
|
|
|
|
|
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
|
|
|
|
storedData, err := is.storage.Get([]byte(indexKey))
|
|
|
|
|
if err != nil {
|
|
|
|
|
// 如果不存在则创建空索引
|
|
|
|
|
return make(map[string]string), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 反序列化索引数据
|
|
|
|
|
var loadedIndex map[string]string
|
|
|
|
|
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to unmarshal index data: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return loadedIndex, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetIndexedDocuments 查询特定collection下的文档
|
|
|
|
|
func (is *IndexStore) GetIndexedDocuments(collection string, indexName string, fieldValue interface{}) ([]string, error) {
|
|
|
|
|
is.mu.RLock()
|
|
|
|
|
defer is.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
// 构造前缀
|
|
|
|
|
prefix := fmt.Sprintf("%s:%v", collection, fieldValue)
|
|
|
|
|
|
|
|
|
|
// 直接从存储读取
|
|
|
|
|
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
|
|
|
|
storedData, err := is.storage.Get([]byte(indexKey))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("index not found: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var loadedIndex map[string]string
|
|
|
|
|
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var results []string
|
|
|
|
|
|
|
|
|
|
// 遍历查找匹配项
|
|
|
|
|
for key, value := range loadedIndex {
|
|
|
|
|
if strings.HasPrefix(key, prefix) {
|
|
|
|
|
results = append(results, value)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return results, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetAllIndexMetadata 获取所有索引的元数据
|
|
|
|
|
func (is *IndexStore) GetAllIndexMetadata(collection string) ([]IndexMetadata, error) {
|
|
|
|
|
// 构建前缀键(注意这里移除了多余的冒号)
|
|
|
|
|
prefix := []byte(fmt.Sprintf("indexes:metadata:%s:", collection))
|
|
|
|
|
|
|
|
|
|
// 使用storage的Scan方法替代直接访问db字段
|
|
|
|
|
keys, _, err := is.storage.Scan(prefix)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to scan metadata: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var metadataList []IndexMetadata
|
|
|
|
|
for _, key := range keys {
|
|
|
|
|
// 验证键是否匹配前缀
|
|
|
|
|
if !bytes.HasPrefix(key, prefix) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 读取元数据值
|
|
|
|
|
value, err := is.storage.Get(key)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 解析元数据
|
|
|
|
|
var md IndexMetadata
|
|
|
|
|
if err := bson.Unmarshal(value, &md); err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metadataList = append(metadataList, md)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return metadataList, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DeleteIndex 删除索引条目
|
|
|
|
|
func (is *IndexStore) DeleteIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
|
|
|
|
|
is.mu.Lock()
|
|
|
|
|
defer is.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// 修复索引键的构造方式,使用统一格式
|
|
|
|
|
indexKey := []byte(fmt.Sprintf("indexes:data:%s:%s:%v", collection, indexName, fieldValue))
|
|
|
|
|
|
|
|
|
|
// 从LevelDB删除
|
|
|
|
|
return is.storage.Delete(indexKey)
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
// DropIndex 删除索引
|
|
|
|
|
func (is *IndexStore) DropIndex(indexName string) error {
|
2025-06-07 22:28:29 +08:00
|
|
|
|
is.mu.Lock()
|
|
|
|
|
defer is.mu.Unlock()
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
// 删除元数据
|
|
|
|
|
metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
|
|
|
|
if err := is.storage.Delete(metadataKey); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
// 删除内存中的缓存
|
|
|
|
|
delete(is.indexes, indexName)
|
|
|
|
|
|
2025-06-07 18:18:03 +08:00
|
|
|
|
// 删除索引数据
|
|
|
|
|
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
|
|
|
|
|
return is.storage.Delete(indexKey)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetIndexMetadata 获取索引元数据
|
2025-06-07 22:28:29 +08:00
|
|
|
|
func (is *IndexStore) GetIndexMetadata(indexName string) (IndexMetadata, error) {
|
2025-06-07 18:18:03 +08:00
|
|
|
|
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
2025-06-07 22:28:29 +08:00
|
|
|
|
storedData, err := is.storage.Get(key)
|
2025-06-07 18:18:03 +08:00
|
|
|
|
if err != nil {
|
2025-06-07 22:28:29 +08:00
|
|
|
|
return IndexMetadata{}, fmt.Errorf("index not found: %v", err)
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var metadata IndexMetadata
|
2025-06-07 22:28:29 +08:00
|
|
|
|
if err := bson.Unmarshal(storedData, &metadata); err != nil {
|
|
|
|
|
return IndexMetadata{}, fmt.Errorf("failed to unmarshal index metadata: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return metadata, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FlushIndexToDisk 将内存中的索引更新写入磁盘
|
|
|
|
|
func (is *IndexStore) FlushIndexToDisk(indexName string) error {
|
|
|
|
|
// 因为现在直接操作存储,不需要单独的刷新方法
|
|
|
|
|
// 现在Flush只是简单验证索引是否存在
|
|
|
|
|
is.mu.RLock()
|
|
|
|
|
defer is.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := is.indexes[indexName]; !ok {
|
|
|
|
|
return fmt.Errorf("index not found in memory: %s", indexName)
|
2025-06-07 18:18:03 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ClearIndexCache 清除内存中的索引缓存
|
|
|
|
|
func (is *IndexStore) ClearIndexCache(indexName string) {
|
|
|
|
|
is.mu.Lock()
|
|
|
|
|
defer is.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
delete(is.indexes, indexName)
|
|
|
|
|
}
|