2025-06-07 17:56:45 +08:00
|
|
|
package document
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2025-06-07 22:28:29 +08:00
|
|
|
"os"
|
|
|
|
"strings"
|
2025-06-07 23:02:49 +08:00
|
|
|
"sync"
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
"git.pyer.club/kingecg/godocdb/index"
|
2025-06-07 17:56:45 +08:00
|
|
|
"git.pyer.club/kingecg/godocdb/storage"
|
2025-06-07 23:02:49 +08:00
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
2025-06-07 17:56:45 +08:00
|
|
|
)
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// DocumentStore 管理带命名空间的文档存储
|
2025-06-07 17:56:45 +08:00
|
|
|
type DocumentStore struct {
|
2025-06-07 22:28:29 +08:00
|
|
|
storage *storage.LevelDBStorage
|
|
|
|
indexStore *index.IndexStore // 添加索引存储依赖
|
2025-06-07 23:02:49 +08:00
|
|
|
mu sync.RWMutex // 保护并发访问
|
2025-06-07 17:56:45 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewDocumentStore 创建新的文档存储实例
|
|
|
|
func NewDocumentStore(path string) (*DocumentStore, error) {
|
2025-06-07 22:28:29 +08:00
|
|
|
// 确保目录存在
|
|
|
|
if err := os.MkdirAll(path, 0755); err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to create storage directory: %v", err)
|
|
|
|
}
|
|
|
|
|
2025-06-07 17:56:45 +08:00
|
|
|
storage, err := storage.NewLevelDBStorage(path)
|
|
|
|
if err != nil {
|
2025-06-07 22:28:29 +08:00
|
|
|
return nil, fmt.Errorf("failed to create LevelDB storage: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// 创建关联的索引存储
|
|
|
|
is, err := index.NewIndexStore(path)
|
|
|
|
if err != nil {
|
|
|
|
storage.Close()
|
|
|
|
return nil, fmt.Errorf("failed to create index store: %v", err)
|
2025-06-07 17:56:45 +08:00
|
|
|
}
|
2025-06-07 23:02:49 +08:00
|
|
|
if errCreate := is.CreateIndex("default_index", index.NonUnique, []string{"_id"}, []index.IndexSortOrder{index.Ascending}); errCreate != nil {
|
|
|
|
return nil, errCreate
|
|
|
|
}
|
2025-06-07 22:28:29 +08:00
|
|
|
|
|
|
|
return &DocumentStore{
|
|
|
|
storage: storage,
|
|
|
|
indexStore: is,
|
|
|
|
}, nil
|
2025-06-07 17:56:45 +08:00
|
|
|
}
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// StoreDocument 带collection标识的文档存储
|
|
|
|
func (ds *DocumentStore) StoreDocument(collection string, id string, doc interface{}) error {
|
|
|
|
// 类型断言
|
|
|
|
docMap, ok := doc.(map[string]interface{})
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("document must be a map[string]interface{}")
|
|
|
|
}
|
|
|
|
|
|
|
|
// 自动生成文档ID
|
|
|
|
if id == "" {
|
|
|
|
// 使用文档自带的_id字段
|
|
|
|
if docID, exists := docMap["_id"].(primitive.ObjectID); exists {
|
|
|
|
id = docID.Hex()
|
|
|
|
} else {
|
|
|
|
// 自动生成新ID
|
|
|
|
newID := primitive.NewObjectID()
|
|
|
|
docMap["_id"] = newID
|
|
|
|
id = newID.Hex()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// 序列化文档
|
|
|
|
data, err := bson.Marshal(docMap)
|
2025-06-07 17:56:45 +08:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to marshal document: %v", err)
|
|
|
|
}
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// 存储文档到LevelDB
|
|
|
|
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
|
|
|
if err := ds.storage.Put(key, data); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// 获取相关索引
|
|
|
|
relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// 更新所有相关索引
|
|
|
|
for _, indexName := range relatedIndexes {
|
|
|
|
metadata, err := ds.indexStore.GetIndexMetadata(indexName)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, field := range metadata.KeyFields {
|
|
|
|
fieldValue, exists := extractFieldValue(docMap, field)
|
|
|
|
if !exists {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// 更新索引并处理错误
|
|
|
|
if err := ds.indexStore.UpdateIndex(collection, indexName, fieldValue, id); err != nil {
|
|
|
|
// 如果是唯一索引冲突,返回错误
|
|
|
|
if strings.Contains(err.Error(), "duplicate key error") {
|
|
|
|
return fmt.Errorf("unique index constraint violated for %s.%s", collection, field)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// 创建默认索引(如果不存在)
|
|
|
|
if _, err := ds.indexStore.GetIndexMetadata("default_index"); err != nil {
|
|
|
|
// 如果索引不存在,创建默认索引
|
|
|
|
if errCreate := ds.indexStore.CreateIndex("default_index", index.NonUnique, []string{"_id"}, nil); errCreate != nil {
|
|
|
|
return errCreate
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2025-06-07 17:56:45 +08:00
|
|
|
}
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// GetDocument 带collection标识的文档获取
|
|
|
|
func (ds *DocumentStore) GetDocument(collection string, id string, doc interface{}) error {
|
|
|
|
// 在文档键中加入collection标识
|
|
|
|
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
|
|
|
|
|
|
|
// 从LevelDB获取数据
|
|
|
|
rawData, err := ds.storage.Get(key)
|
2025-06-07 17:56:45 +08:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("document not found: %v", err)
|
|
|
|
}
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// 反序列化BSON数据
|
|
|
|
if err := bson.Unmarshal(rawData, doc); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2025-06-07 17:56:45 +08:00
|
|
|
}
|
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
// GetRelatedIndexes 获取文档关联的索引
|
|
|
|
func getRelatedIndexes(doc map[string]interface{}, coll string, is *index.IndexStore) ([]string, error) {
|
|
|
|
// 获取所有索引元数据
|
|
|
|
metadataList, err := is.GetAllIndexMetadata(coll)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var related []string
|
|
|
|
for _, md := range metadataList {
|
|
|
|
// 检查文档是否包含索引字段
|
|
|
|
for _, field := range md.KeyFields {
|
|
|
|
if _, exists := doc[field]; exists {
|
|
|
|
related = append(related, md.Name)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// 如果没有匹配的索引,尝试获取默认索引
|
|
|
|
if len(related) == 0 {
|
|
|
|
_, err := is.GetIndexMetadata("default_index")
|
|
|
|
if err == nil {
|
|
|
|
related = append(related, "default_index")
|
|
|
|
} else {
|
|
|
|
return nil, fmt.Errorf("no suitable index found for collection %s", coll)
|
|
|
|
}
|
|
|
|
}
|
2025-06-07 23:02:49 +08:00
|
|
|
|
2025-06-07 22:28:29 +08:00
|
|
|
return related, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// extractFieldValue looks up field among the top-level keys of doc.
//
// It returns the stored value and true when the key is present (the value
// itself may be nil), or nil and false when it is absent.
func extractFieldValue(doc map[string]interface{}, field string) (interface{}, bool) {
	if found, ok := doc[field]; ok {
		return found, true
	}
	return nil, false
}
|
|
|
|
|
|
|
|
// DeleteDocument 删除文档并更新索引
|
|
|
|
func (ds *DocumentStore) DeleteDocument(collection string, id string) error {
|
|
|
|
ds.mu.Lock()
|
|
|
|
defer ds.mu.Unlock()
|
|
|
|
|
|
|
|
// 构建文档键
|
|
|
|
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
|
|
|
|
|
|
|
// 获取文档内容以获取相关索引
|
|
|
|
rawData, err := ds.storage.Get(key)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("document not found: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// 解析文档
|
|
|
|
var docMap map[string]interface{}
|
|
|
|
if err := bson.Unmarshal(rawData, &docMap); err != nil {
|
|
|
|
return fmt.Errorf("failed to unmarshal document: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// 获取相关索引
|
|
|
|
relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// 删除相关索引条目
|
|
|
|
for _, indexName := range relatedIndexes {
|
|
|
|
metadata, err := ds.indexStore.GetIndexMetadata(indexName)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, field := range metadata.KeyFields {
|
|
|
|
fieldValue, exists := extractFieldValue(docMap, field)
|
|
|
|
if !exists {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// 从索引中删除
|
|
|
|
_ = ds.indexStore.DeleteIndex(collection, indexName, fieldValue, id)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// 最后删除文档
|
2025-06-07 17:56:45 +08:00
|
|
|
return ds.storage.Delete(key)
|
2025-06-07 22:28:29 +08:00
|
|
|
}
|