// Package storage 实现存储引擎接口 package storage import ( "fmt" "git.pyer.club/kingecg/goaidb/log" "git.pyer.club/kingecg/goaidb/protocol" "encoding/binary" ) // matchesQuery 是一个简单的查询匹配函数(实际应使用更复杂的逻辑) func matchesQuery(doc, query map[string]interface{}) bool { // 这里应该实现实际的查询匹配逻辑 // 当前只是一个简单的存根实现 if query == nil { return true // 如果没有查询条件,则匹配所有文档 } // 遍历查询条件的所有字段 for key, value := range query { // 检查文档是否包含该字段且值匹配 if docValue, exists := doc[key]; !exists || docValue != value { return false } } return true } // StorageEngine 存储引擎接口 type StorageEngine interface { // 数据库操作 CreateDatabase(name string) error DropDatabase(name string) error ListDatabases() ([]string, error) // 集合操作 CreateCollection(dbName, collName string) error DropCollection(dbName, collName string) error ListCollections(dbName string) ([]string, error) // 文档操作 Insert(dbName, collName string, document []byte) error Query(dbName, collName string, query []byte) ([][]byte, error) Update(dbName, collName string, query, update []byte) error Delete(dbName, collName string, query []byte) error } // NewMemoryEngine 创建内存存储引擎实例 func NewMemoryEngine() (StorageEngine, error) { return &memoryEngine{ databases: make(map[string]*memoryDatabase), }, nil } // 内存存储引擎实现 type memoryEngine struct { databases map[string]*memoryDatabase } func (e *memoryEngine) CreateDatabase(name string) error { if _, exists := e.databases[name]; exists { return fmt.Errorf("database %s already exists", name) } e.databases[name] = &memoryDatabase{ collections: make(map[string]*memoryCollection), } return nil } func (e *memoryEngine) DropDatabase(name string) error { if _, exists := e.databases[name]; !exists { return fmt.Errorf("database %s does not exist", name) } delete(e.databases, name) return nil } func (e *memoryEngine) ListDatabases() ([]string, error) { names := make([]string, 0, len(e.databases)) for name := range e.databases { names = append(names, name) } return names, nil } func (e *memoryEngine) CreateCollection(dbName, collName string) error { db, exists := e.databases[dbName] if !exists { return fmt.Errorf("database %s does not exist", dbName) } if _, collExists := db.collections[collName]; collExists { return fmt.Errorf("collection %s already exists in database %s", collName, dbName) } db.collections[collName] = &memoryCollection{ data: make([][]byte, 0), } return nil } func (e *memoryEngine) DropCollection(dbName, collName string) error { db, exists := e.databases[dbName] if !exists { return fmt.Errorf("database %s does not exist", dbName) } if _, collExists := db.collections[collName]; !collExists { return fmt.Errorf("collection %s does not exist in database %s", collName, dbName) } delete(db.collections, collName) return nil } func (e *memoryEngine) ListCollections(dbName string) ([]string, error) { db, exists := e.databases[dbName] if !exists { return nil, fmt.Errorf("database %s does not exist", dbName) } names := make([]string, 0, len(db.collections)) for name := range db.collections { names = append(names, name) } return names, nil } func (e *memoryEngine) Insert(dbName, collName string, document []byte) error { db, exists := e.databases[dbName] if !exists { return fmt.Errorf("database %s does not exist", dbName) } coll, exists := db.collections[collName] if !exists { // 自动创建集合 coll = &memoryCollection{ data: make([][]byte, 0), } db.collections[collName] = coll } coll.data = append(coll.data, document) return nil } func (e *memoryEngine) Query(dbName, collName string, query []byte) ([][]byte, error) { db, exists := e.databases[dbName] if !exists { return nil, fmt.Errorf("database %s does not exist", dbName) } coll, exists := db.collections[collName] if !exists { return nil, fmt.Errorf("collection %s does not exist in database %s", collName, dbName) } // TODO: 实现实际的查询逻辑,目前返回所有文档 return coll.data, nil } func (e *memoryEngine) Update(dbName, collName string, query, update []byte) error { // 记录调试日志 log.Debug("开始执行更新操作", "db", dbName, "collection", collName) // 获取集合 db, exists := e.databases[dbName] if !exists { log.Warn("数据库不存在", "db", dbName) return fmt.Errorf("database %s does not exist", dbName) } coll, exists := db.collections[collName] if !exists { log.Warn("集合不存在", "db", dbName, "collection", collName) return fmt.Errorf("collection %s does not exist in database %s", collName, dbName) } // 解析查询和更新文档 queryDoc, err := protocol.ParseBSON(query) if err != nil { log.Error("查询文档解析失败", "error", err, "db", dbName, "collection", collName) return fmt.Errorf("failed to parse query document: %v", err) } updateDoc, err := protocol.ParseBSON(update) if err != nil { log.Error("更新文档解析失败", "error", err, "db", dbName, "collection", collName) return fmt.Errorf("failed to parse update document: %v", err) } // 执行更新操作 matchedCount := 0 modifiedCount := 0 for i := range coll.data { // 解析当前文档 doc, err := protocol.ParseBSON(coll.data[i]) if err != nil { log.Warn("文档解析失败", "error", err, "index", i, "db", dbName, "collection", collName) continue } // 检查是否匹配查询条件 match := matchesQuery(doc, queryDoc) if match { matchedCount++ // 应用更新操作 - 简单实现$set操作 if setOp, ok := updateDoc["$set"].(map[string]interface{}); ok { // 实际应解析文档并应用更新,这里只是简单示例 for key, value := range setOp { // 在实际实现中,需要正确修改文档内容 applySetOperation(doc, key, value) log.Debug("应用$set操作", "key", key, "value", value, "db", dbName, "collection", collName) } } // 将更新后的文档重新序列化 updatedData, err := bsonMarshal(doc) if err != nil { log.Warn("文档序列化失败", "error", err, "index", i, "db", dbName, "collection", collName) continue } // 替换数据中的文档 coll.data[i] = updatedData modifiedCount++ } } log.Info("更新操作完成", "matched", matchedCount, "modified", modifiedCount, "db", dbName, "collection", collName) return nil } // applySetOperation 应用$set操作到文档 func applySetOperation(doc map[string]interface{}, key string, value interface{}) { // 简单实现单层字段设置 doc[key] = value } // bsonMarshal 将map转换为BSON格式的字节流 func bsonMarshal(doc map[string]interface{}) ([]byte, error) { // 使用协议包中的BSON序列化功能 return protocol.BsonMarshal(doc) } // parseBSON 解析BSON文档(应移至单独的bson包或使用现有库) func parseBSON(data []byte) (map[string]interface{}, []byte, error) { // 实际实现应该解析BSON格式的数据 // 这里返回一个模拟实现 if len(data) < 4 { log.Warn("数据过短,无法读取BSON文档长度") return nil, data, fmt.Errorf("data too short for BSON document length") } // 读取BSON文档长度 length := int(binary.LittleEndian.Uint32(data[0:4])) if len(data) < length { log.Warn("数据过短,无法读取完整BSON文档", "required", length, "available", len(data)) return nil, data, fmt.Errorf("data too short for BSON document") } // TODO: 实际解析BSON文档内容 // 返回空文档作为占位符,剩余数据和nil错误 result := make(map[string]interface{}) log.Debug("成功解析BSON文档", "length", length, "remaining", len(data)-length) return result, data[length:], nil } func (e *memoryEngine) Delete(dbName, collName string, query []byte) error { // TODO: 实现删除逻辑 return nil } // 内存数据库结构 type memoryDatabase struct { collections map[string]*memoryCollection } // 内存集合结构 type memoryCollection struct { data [][]byte }