godocdb/document/document.go

222 lines
5.7 KiB
Go

package document
import (
"fmt"
"strings"
"sync"
"git.pyer.club/kingecg/godocdb/index"
"git.pyer.club/kingecg/godocdb/storage"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// DocumentStore manages document storage namespaced by collection.
type DocumentStore struct {
storage *storage.LevelDBStorage // underlying LevelDB-backed storage that holds the serialized documents
indexStore *index.IndexStore // index store dependency used to maintain index entries for documents
mu sync.RWMutex // guards concurrent access; DeleteDocument holds it as a write lock for its whole run
}
// NewDocumentStore obtains the storage registered under storeName from the
// storage manager, opens the index store of the same name, and returns a
// DocumentStore backed by both.
//
// It also creates a non-unique, ascending index named "default_index" on the
// "_id" field. If either the index store or the default index cannot be
// created, the storage handle obtained here is closed before the error is
// returned, so a failed construction does not leak it.
//
// Errors from the storage manager and from index.NewIndexStore are wrapped
// with %w; an error from CreateIndex is returned unchanged.
func NewDocumentStore(storeName string) (*DocumentStore, error) {
	sm := storage.GetInstance()

	// Named store (not storage) so the storage package is not shadowed.
	store, err := sm.GetStorage(storeName)
	if err != nil {
		return nil, fmt.Errorf("failed to create LevelDB storage: %w", err)
	}

	is, err := index.NewIndexStore(storeName)
	if err != nil {
		store.Close()
		return nil, fmt.Errorf("failed to create index store: %w", err)
	}

	if err := is.CreateIndex("default_index", index.NonUnique, []string{"_id"}, []index.IndexSortOrder{index.Ascending}); err != nil {
		// Mirror the cleanup of the previous failure path: without this the
		// storage handle stays open although no DocumentStore is returned.
		store.Close()
		return nil, err
	}

	return &DocumentStore{
		storage:    store,
		indexStore: is,
	}, nil
}
// StoreDocument serializes doc as BSON, stores it under
// "documents:<collection>:<id>", and records it in the related indexes.
//
// Parameters:
//   - collection: namespace the document belongs to.
//   - id: document identifier. When empty, the primitive.ObjectID found in
//     doc["_id"] is used; if that field is absent or holds any other type, a
//     new ObjectID is generated and written into doc["_id"], which mutates
//     the caller's map.
//   - doc: must be a map[string]interface{}; any other type is rejected.
//
// The write lock is held for the whole operation so that it cannot interleave
// with DeleteDocument, which takes the same lock. The default index is ensured
// and the related indexes are resolved before the document is written, so a
// failure in either step leaves nothing stored.
//
// Returns an error when doc has the wrong type, marshalling fails, the default
// index cannot be created, the related indexes cannot be resolved, the storage
// write fails, or an index update reports a duplicate key.
//
// NOTE(review): a duplicate-key error is detected after the document has been
// written, and the write is not rolled back. Index update errors of any other
// kind are ignored, as before.
func (ds *DocumentStore) StoreDocument(collection string, id string, doc interface{}) error {
	docMap, ok := doc.(map[string]interface{})
	if !ok {
		return fmt.Errorf("document must be a map[string]interface{}")
	}

	ds.mu.Lock()
	defer ds.mu.Unlock()

	if id == "" {
		if docID, isObjectID := docMap["_id"].(primitive.ObjectID); isObjectID {
			// Reuse the ObjectID the document already carries.
			id = docID.Hex()
		} else {
			newID := primitive.NewObjectID()
			docMap["_id"] = newID
			id = newID.Hex()
		}
	}

	data, err := bson.Marshal(docMap)
	if err != nil {
		return fmt.Errorf("failed to marshal document: %w", err)
	}

	// Ensure the default index exists before resolving indexes:
	// getRelatedIndexes falls back to it and fails when it is missing, so
	// creating it afterwards could never help the current write.
	// NOTE(review): NewDocumentStore passes an explicit ascending sort order
	// for this index while this call passes nil; confirm they are equivalent.
	if _, err := ds.indexStore.GetIndexMetadata("default_index"); err != nil {
		if errCreate := ds.indexStore.CreateIndex("default_index", index.NonUnique, []string{"_id"}, nil); errCreate != nil {
			return errCreate
		}
	}

	relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
	if err != nil {
		return err
	}

	key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
	if err := ds.storage.Put(key, data); err != nil {
		return err
	}

	for _, indexName := range relatedIndexes {
		metadata, err := ds.indexStore.GetIndexMetadata(indexName)
		if err != nil {
			// Metadata for this index cannot be read; skip it.
			continue
		}
		for _, field := range metadata.KeyFields {
			fieldValue, exists := extractFieldValue(docMap, field)
			if !exists {
				continue
			}
			if err := ds.indexStore.UpdateIndex(collection, indexName, fieldValue, id); err != nil {
				// Only a unique-index conflict aborts the store.
				if strings.Contains(err.Error(), "duplicate key error") {
					return fmt.Errorf("unique index constraint violated for %s.%s", collection, field)
				}
			}
		}
	}
	return nil
}
// GetDocument reads the document stored under "documents:<collection>:<id>"
// and decodes its BSON bytes into doc.
//
// doc is passed to bson.Unmarshal as the decode target.
//
// Returns an error prefixed with "document not found" when the storage read
// fails; the underlying storage error is wrapped with %w so callers can
// inspect it with errors.Is / errors.As. A decode error is returned unchanged.
func (ds *DocumentStore) GetDocument(collection string, id string, doc interface{}) error {
	// The collection is part of the key, so ids only need to be unique per collection.
	key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))

	rawData, err := ds.storage.Get(key)
	if err != nil {
		return fmt.Errorf("document not found: %w", err)
	}

	return bson.Unmarshal(rawData, doc)
}
// getRelatedIndexes returns the names of the indexes of coll that apply to doc.
//
// An index applies when doc has a top-level key equal to at least one of the
// index's key fields. When no index applies, "default_index" is returned
// instead, provided its metadata can be fetched; otherwise an error is
// returned. A failure to list the index metadata is returned unchanged.
func getRelatedIndexes(doc map[string]interface{}, coll string, is *index.IndexStore) ([]string, error) {
	allMeta, err := is.GetAllIndexMetadata(coll)
	if err != nil {
		return nil, err
	}

	var names []string
	for _, meta := range allMeta {
		// One matching key field is enough to make the index relevant.
		matched := false
		for _, keyField := range meta.KeyFields {
			if _, present := doc[keyField]; present {
				matched = true
				break
			}
		}
		if matched {
			names = append(names, meta.Name)
		}
	}
	if len(names) > 0 {
		return names, nil
	}

	// Nothing matched: fall back to the default index if it is available.
	if _, lookupErr := is.GetIndexMetadata("default_index"); lookupErr != nil {
		return nil, fmt.Errorf("no suitable index found for collection %s", coll)
	}
	return append(names, "default_index"), nil
}
// extractFieldValue looks up field as a top-level key of doc.
//
// It returns the stored value and true when the key is present (the value may
// itself be nil), and nil and false when the key is absent or doc is nil.
func extractFieldValue(doc map[string]interface{}, field string) (interface{}, bool) {
	if found, ok := doc[field]; ok {
		return found, true
	}
	return nil, false
}
// DeleteDocument removes the document stored under
// "documents:<collection>:<id>" together with its entries in the related
// indexes.
//
// The stored document is read and decoded first, because its field values are
// needed to locate the index entries. Index entries are removed before the
// document itself. The write lock is held for the whole operation.
//
// Returns an error when the document cannot be read (prefixed with
// "document not found"), cannot be decoded, the related indexes cannot be
// resolved, or the final storage delete fails. Read and decode errors wrap
// the underlying error with %w so callers can inspect it with
// errors.Is / errors.As.
func (ds *DocumentStore) DeleteDocument(collection string, id string) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))

	rawData, err := ds.storage.Get(key)
	if err != nil {
		return fmt.Errorf("document not found: %w", err)
	}

	var docMap map[string]interface{}
	if err := bson.Unmarshal(rawData, &docMap); err != nil {
		return fmt.Errorf("failed to unmarshal document: %w", err)
	}

	relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
	if err != nil {
		return err
	}

	for _, indexName := range relatedIndexes {
		metadata, err := ds.indexStore.GetIndexMetadata(indexName)
		if err != nil {
			// Metadata for this index cannot be read; skip it.
			continue
		}
		for _, field := range metadata.KeyFields {
			fieldValue, exists := extractFieldValue(docMap, field)
			if !exists {
				continue
			}
			// Best effort: a failed index cleanup is deliberately ignored so
			// that it does not prevent the document itself from being deleted.
			_ = ds.indexStore.DeleteIndex(collection, indexName, fieldValue, id)
		}
	}

	// The document goes last, after its index entries have been handled.
	return ds.storage.Delete(key)
}