2025-06-05 17:59:27 +08:00
|
|
|
|
// Package storage 实现存储引擎接口
|
|
|
|
|
package storage
|
|
|
|
|
|
|
|
|
|
import (
	"encoding/binary"
	"fmt"
	"reflect"

	"git.pyer.club/kingecg/goaidb/log"
	"git.pyer.club/kingecg/goaidb/protocol"
)
|
|
|
|
|
|
2025-06-06 22:12:24 +08:00
|
|
|
|
// matchesQuery reports whether doc satisfies query under exact-match
// semantics: every field named in query must be present in doc and hold a
// deeply equal value. A nil or empty query matches every document.
//
// Values are compared with reflect.DeepEqual instead of ==, because comparing
// two interface values that both hold a map or a slice (nested documents,
// arrays) with == panics at run time. Query operators are not interpreted;
// every key in query is treated as a plain field name.
func matchesQuery(doc, query map[string]interface{}) bool {
	// Ranging over a nil map runs zero iterations, so no nil guard is needed.
	for key, want := range query {
		got, ok := doc[key]
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}
|
|
|
|
|
|
2025-06-05 17:59:27 +08:00
|
|
|
|
// StorageEngine is the storage engine interface: the database-, collection-
// and document-level operations a backend has to provide. Documents, queries
// and updates cross this boundary as raw byte slices (the in-memory
// implementation in this package parses them as BSON).
type StorageEngine interface {
	// Database operations.

	// CreateDatabase creates the database called name.
	CreateDatabase(name string) error
	// DropDatabase removes the database called name.
	DropDatabase(name string) error
	// ListDatabases returns the names of the known databases.
	ListDatabases() ([]string, error)

	// Collection operations.

	// CreateCollection creates collection collName inside database dbName.
	CreateCollection(dbName string, collName string) error
	// DropCollection removes collection collName from database dbName.
	DropCollection(dbName string, collName string) error
	// ListCollections returns the collection names of database dbName.
	ListCollections(dbName string) ([]string, error)

	// Document operations.

	// Insert adds one encoded document to the collection.
	Insert(dbName string, collName string, document []byte) error
	// Query returns encoded documents from the collection for the given
	// encoded query.
	Query(dbName string, collName string, query []byte) ([][]byte, error)
	// Update applies the encoded update to documents selected by the encoded
	// query.
	Update(dbName string, collName string, query []byte, update []byte) error
	// Delete removes documents selected by the encoded query.
	Delete(dbName string, collName string, query []byte) error
}
|
|
|
|
|
|
|
|
|
|
// NewMemoryEngine 创建内存存储引擎实例
|
|
|
|
|
func NewMemoryEngine() (StorageEngine, error) {
|
|
|
|
|
return &memoryEngine{
|
|
|
|
|
databases: make(map[string]*memoryDatabase),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 内存存储引擎实现
|
|
|
|
|
type memoryEngine struct {
|
|
|
|
|
databases map[string]*memoryDatabase
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) CreateDatabase(name string) error {
|
|
|
|
|
if _, exists := e.databases[name]; exists {
|
|
|
|
|
return fmt.Errorf("database %s already exists", name)
|
|
|
|
|
}
|
|
|
|
|
e.databases[name] = &memoryDatabase{
|
|
|
|
|
collections: make(map[string]*memoryCollection),
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) DropDatabase(name string) error {
|
|
|
|
|
if _, exists := e.databases[name]; !exists {
|
|
|
|
|
return fmt.Errorf("database %s does not exist", name)
|
|
|
|
|
}
|
|
|
|
|
delete(e.databases, name)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) ListDatabases() ([]string, error) {
|
|
|
|
|
names := make([]string, 0, len(e.databases))
|
|
|
|
|
for name := range e.databases {
|
|
|
|
|
names = append(names, name)
|
|
|
|
|
}
|
|
|
|
|
return names, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) CreateCollection(dbName, collName string) error {
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, collExists := db.collections[collName]; collExists {
|
|
|
|
|
return fmt.Errorf("collection %s already exists in database %s", collName, dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
db.collections[collName] = &memoryCollection{
|
|
|
|
|
data: make([][]byte, 0),
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) DropCollection(dbName, collName string) error {
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, collExists := db.collections[collName]; !collExists {
|
|
|
|
|
return fmt.Errorf("collection %s does not exist in database %s", collName, dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
delete(db.collections, collName)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) ListCollections(dbName string) ([]string, error) {
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return nil, fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
names := make([]string, 0, len(db.collections))
|
|
|
|
|
for name := range db.collections {
|
|
|
|
|
names = append(names, name)
|
|
|
|
|
}
|
|
|
|
|
return names, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) Insert(dbName, collName string, document []byte) error {
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
coll, exists := db.collections[collName]
|
|
|
|
|
if !exists {
|
|
|
|
|
// 自动创建集合
|
|
|
|
|
coll = &memoryCollection{
|
|
|
|
|
data: make([][]byte, 0),
|
|
|
|
|
}
|
|
|
|
|
db.collections[collName] = coll
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
coll.data = append(coll.data, document)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) Query(dbName, collName string, query []byte) ([][]byte, error) {
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return nil, fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
coll, exists := db.collections[collName]
|
|
|
|
|
if !exists {
|
|
|
|
|
return nil, fmt.Errorf("collection %s does not exist in database %s", collName, dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: 实现实际的查询逻辑,目前返回所有文档
|
|
|
|
|
return coll.data, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *memoryEngine) Update(dbName, collName string, query, update []byte) error {
|
2025-06-06 22:12:24 +08:00
|
|
|
|
// 记录调试日志
|
|
|
|
|
log.Debug("开始执行更新操作", "db", dbName, "collection", collName)
|
|
|
|
|
|
|
|
|
|
// 获取集合
|
|
|
|
|
db, exists := e.databases[dbName]
|
|
|
|
|
if !exists {
|
|
|
|
|
log.Warn("数据库不存在", "db", dbName)
|
|
|
|
|
return fmt.Errorf("database %s does not exist", dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
coll, exists := db.collections[collName]
|
|
|
|
|
if !exists {
|
|
|
|
|
log.Warn("集合不存在", "db", dbName, "collection", collName)
|
|
|
|
|
return fmt.Errorf("collection %s does not exist in database %s", collName, dbName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 解析查询和更新文档
|
|
|
|
|
queryDoc, err := protocol.ParseBSON(query)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("查询文档解析失败", "error", err, "db", dbName, "collection", collName)
|
|
|
|
|
return fmt.Errorf("failed to parse query document: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updateDoc, err := protocol.ParseBSON(update)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("更新文档解析失败", "error", err, "db", dbName, "collection", collName)
|
|
|
|
|
return fmt.Errorf("failed to parse update document: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 执行更新操作
|
|
|
|
|
matchedCount := 0
|
|
|
|
|
modifiedCount := 0
|
|
|
|
|
|
|
|
|
|
for i := range coll.data {
|
|
|
|
|
// 解析当前文档
|
|
|
|
|
doc, err := protocol.ParseBSON(coll.data[i])
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn("文档解析失败", "error", err, "index", i, "db", dbName, "collection", collName)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 检查是否匹配查询条件
|
|
|
|
|
match := matchesQuery(doc, queryDoc)
|
|
|
|
|
if match {
|
|
|
|
|
matchedCount++
|
|
|
|
|
|
|
|
|
|
// 应用更新操作 - 简单实现$set操作
|
|
|
|
|
if setOp, ok := updateDoc["$set"].(map[string]interface{}); ok {
|
|
|
|
|
// 实际应解析文档并应用更新,这里只是简单示例
|
|
|
|
|
for key, value := range setOp {
|
|
|
|
|
// 在实际实现中,需要正确修改文档内容
|
|
|
|
|
applySetOperation(doc, key, value)
|
|
|
|
|
log.Debug("应用$set操作", "key", key, "value", value, "db", dbName, "collection", collName)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 将更新后的文档重新序列化
|
|
|
|
|
updatedData, err := bsonMarshal(doc)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn("文档序列化失败", "error", err, "index", i, "db", dbName, "collection", collName)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 替换数据中的文档
|
|
|
|
|
coll.data[i] = updatedData
|
|
|
|
|
modifiedCount++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Info("更新操作完成", "matched", matchedCount, "modified", modifiedCount, "db", dbName, "collection", collName)
|
2025-06-05 17:59:27 +08:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-06 22:12:24 +08:00
|
|
|
|
// applySetOperation performs a single "$set" assignment on doc: it stores
// value under key, adding the field or overwriting an existing one. Only
// top-level fields are handled; key is used verbatim as the map key, so a
// dotted path is not expanded into nested documents. doc must be non-nil.
func applySetOperation(doc map[string]interface{}, key string, value interface{}) {
	doc[key] = value
}
|
|
|
|
|
|
|
|
|
|
// bsonMarshal 将map转换为BSON格式的字节流
|
|
|
|
|
func bsonMarshal(doc map[string]interface{}) ([]byte, error) {
|
|
|
|
|
// 使用协议包中的BSON序列化功能
|
|
|
|
|
return protocol.BsonMarshal(doc)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// parseBSON 解析BSON文档(应移至单独的bson包或使用现有库)
|
|
|
|
|
func parseBSON(data []byte) (map[string]interface{}, []byte, error) {
|
|
|
|
|
// 实际实现应该解析BSON格式的数据
|
|
|
|
|
// 这里返回一个模拟实现
|
|
|
|
|
if len(data) < 4 {
|
|
|
|
|
log.Warn("数据过短,无法读取BSON文档长度")
|
|
|
|
|
return nil, data, fmt.Errorf("data too short for BSON document length")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 读取BSON文档长度
|
|
|
|
|
length := int(binary.LittleEndian.Uint32(data[0:4]))
|
|
|
|
|
if len(data) < length {
|
|
|
|
|
log.Warn("数据过短,无法读取完整BSON文档", "required", length, "available", len(data))
|
|
|
|
|
return nil, data, fmt.Errorf("data too short for BSON document")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: 实际解析BSON文档内容
|
|
|
|
|
|
|
|
|
|
// 返回空文档作为占位符,剩余数据和nil错误
|
|
|
|
|
result := make(map[string]interface{})
|
|
|
|
|
log.Debug("成功解析BSON文档", "length", length, "remaining", len(data)-length)
|
|
|
|
|
return result, data[length:], nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2025-06-05 17:59:27 +08:00
|
|
|
|
func (e *memoryEngine) Delete(dbName, collName string, query []byte) error {
|
|
|
|
|
// TODO: 实现删除逻辑
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 内存数据库结构
|
|
|
|
|
type memoryDatabase struct {
|
|
|
|
|
collections map[string]*memoryCollection
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// memoryCollection is the in-memory representation of one collection.
type memoryCollection struct {
	// data holds the stored documents, one encoded document per element, in
	// insertion order.
	data [][]byte
}
|