From b28a08ce629e89135ac72b1c3b1417ac26243bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E5=B9=BF?= Date: Thu, 5 Jun 2025 17:59:27 +0800 Subject: [PATCH] init gen code --- design/architecture.md | 55 ++++++++++++++ design/features.md | 42 +++++++++++ design/roadmap.md | 67 +++++++++++++++++ go.mod | 7 ++ main.go | 31 ++++++++ network/server.go | 70 ++++++++++++++++++ protocol/parser.go | 100 ++++++++++++++++++++++++++ query/handler.go | 39 ++++++++++ storage/engine.go | 160 +++++++++++++++++++++++++++++++++++++++++ 9 files changed, 571 insertions(+) create mode 100644 design/architecture.md create mode 100644 design/features.md create mode 100644 design/roadmap.md create mode 100644 go.mod create mode 100644 main.go create mode 100644 network/server.go create mode 100644 protocol/parser.go create mode 100644 query/handler.go create mode 100644 storage/engine.go diff --git a/design/architecture.md b/design/architecture.md new file mode 100644 index 0000000..c11427b --- /dev/null +++ b/design/architecture.md @@ -0,0 +1,55 @@ +# GoAIDB 整体架构设计 + +## 架构分层 + +GoAIDB采用四层架构设计,确保系统的可扩展性和模块化: + +1. **网络层** (`network/server.go`) + - 处理客户端连接和网络通信 + - 使用goroutine处理并发连接 + - 采用缓冲区减少内存分配 + +2. **协议解析层** (`protocol/parser.go`) + - 解析MongoDB协议消息 + - 支持OP_QUERY和OP_INSERT操作码 + - 可扩展支持更多协议操作码 + +3. **查询处理层** (`query/handler.go`) + - 处理查询和插入操作 + - 实现基本的BSON响应构造 + - 提供查询路由框架 + +4. **存储引擎层** (`storage/engine.go`) + - 提供可插拔的存储接口 + - 定义标准的存储引擎接口 + - 当前实现基于内存的存储引擎 + +## 存储引擎接口 + +存储引擎接口定义了以下核心功能: + +- **数据库操作**: `CreateDatabase`, `DropDatabase`, `ListDatabases` +- **集合操作**: `CreateCollection`, `DropCollection`, `ListCollections` +- **文档操作**: `Insert`, `Query`, `Update`, `Delete` + +## 数据流图 + +```plaintext +Client Request + ↓ +Network Layer (TCP Server) + ↓ +Protocol Parser (Decode MongoDB BSON) + ↓ +Query Handler (Route to appropriate handler) + ↓ +Storage Engine (Perform actual data operations) + ↓ +Response to Client +``` + +## 可扩展性设计 + +- **协议层扩展**: 可以通过添加新的操作码解析器来支持更多的MongoDB协议操作 +- **存储引擎扩展**: 通过实现存储引擎接口可以轻松替换为其他存储后端(如LevelDB、BoltDB等) +- **性能优化空间**: 分层设计保证了可以在各层独立进行性能优化 \ No newline at end of file diff --git a/design/features.md b/design/features.md new file mode 100644 index 0000000..7b45e0e --- /dev/null +++ b/design/features.md @@ -0,0 +1,42 @@ +# GoAIDB 已完成特性 + +## 基础功能 + +- **网络通信** + - TCP服务器监听MongoDB默认端口27017 + - 支持多客户端并发连接 + - 使用4KB缓冲区减少内存分配 + +- **协议支持** + - 实现MongoDB协议基础解析框架 + - 支持OP_QUERY(2004)和OP_INSERT(2002)操作码 + - 消息头解析和基本消息体处理 + +- **查询处理** + - 实现基本的查询路由框架 + - 支持插入和查询操作的占位处理 + - 简化的BSON响应构造(返回{ "ok": 1 }) + +- **存储实现** + - 内存存储引擎实现 + - 支持数据库和集合的自动创建 + - 实现基本的文档插入和查询功能 + - 数据保存在内存中的map和slice结构中 + +## 性能特性 + +- 使用Golang的goroutine处理并发连接 +- 分层设计保证性能优化空间 +- 模块化架构便于后续性能提升 + +## 兼容性 + +- 兼容MongoDB客户端基本连接 +- 支持基础的CRUD操作框架 +- 可扩展支持完整MongoDB协议 + +## 错误处理 + +- 完善的错误返回机制 +- 各层错误信息传递 +- 包含详细的错误描述 \ No newline at end of file diff --git a/design/roadmap.md b/design/roadmap.md new file mode 100644 index 0000000..197dd7c --- /dev/null +++ b/design/roadmap.md @@ -0,0 +1,67 @@ +# GoAIDB 后续开发路线图 + +## 第一阶段:协议完善(预计2周) + +- **MongoDB协议完整实现** + - 添加OP_UPDATE、OP_DELETE等更多操作码支持 + - 完善BSON解析和构造 + - 实现命令操作(OP_COMMAND)支持 + - 支持游标操作(OP_GET_MORE) + +- **查询功能增强** + - 实现完整的查询条件解析 + - 添加索引支持 + - 查询优化器开发 + +- **事务支持** + - 实现ACID事务 + - 添加WAL(Write-Ahead Logging)机制 + +## 第二阶段:存储引擎优化(预计3周) + +- **持久化存储** + - 开发基于LevelDB的存储引擎 + - 实现数据持久化到磁盘 + - 添加检查点机制 + +- **性能提升** + - 实现连接池和批量操作 + - 添加缓存机制 + - 优化内存使用 + +- **扩展性增强** + - 实现插件式存储引擎架构 + - 添加对多种存储后端的支持(BoltDB, Badger等) + +## 第三阶段:集群与分布式(预计4周) + +- **分片支持** + - 实现数据分片 + - 添加分片路由 + - 跨分片事务处理 + +- **复制与高可用** + - 主从复制实现 + - 故障转移机制 + - 数据同步机制 + +- **分布式查询** + - 分布式查询执行 + - 结果合并优化 + +## 第四阶段:企业特性(预计持续开发) + +- **安全增强** + - TLS加密支持 + - 认证和授权机制 + - 审计日志 + +- **监控与管理** + - 实现Prometheus指标暴露 + - 添加REST API管理接口 + - 可视化监控仪表板 + +- **高级特性** + - 全文搜索支持 + - 时间序列数据优化 + - 图形数据支持 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..544ec75 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module github.com/kingecg/goaidb + +go 1.23 + +// 可选:如果使用MongoDB官方驱动进行测试 +// replace github.com/mongodb/mongo-go-driver => github.com/mongodb/mongo-go-driver/v2 v2.0.0 +// require github.com/mongodb/mongo-go-driver/v2 v2.0.0 diff --git a/main.go b/main.go new file mode 100644 index 0000000..27061f1 --- /dev/null +++ b/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "net" + "github.com/kingecg/goaidb/network" + "github.com/kingecg/goaidb/storage" +) + +// 主程序入口 +func main() { + // 初始化存储引擎(默认使用内存引擎) + storageEngine, err := storage.NewMemoryEngine() + if err != nil { + fmt.Printf("Failed to initialize storage engine: %v\n", err) + return + } + + // 创建网络服务器 + server := network.NewServer(storageEngine) + + // 启动服务 + listener, err := net.Listen("tcp", ":27017") + if err != nil { + fmt.Printf("Failed to start server: %v\n", err) + return + } + + fmt.Println("GoAIDB started on port 27017") + server.Serve(listener) +} \ No newline at end of file diff --git a/network/server.go b/network/server.go new file mode 100644 index 0000000..099c7f3 --- /dev/null +++ b/network/server.go @@ -0,0 +1,70 @@ +// Package network 实现网络通信层 +package network + +import ( + "fmt" + "io" + "net" + "github.com/kingecg/goaidb/protocol" + "github.com/kingecg/goaidb/query" + "github.com/kingecg/goaidb/storage" +) + +// Server 网络服务器结构体 +type Server struct { + storage storage.StorageEngine +} + +// NewServer 创建新的网络服务器 +func NewServer(storage storage.StorageEngine) *Server { + return &Server{storage: storage} +} + +// Serve 启动服务 +func (s *Server) Serve(listener net.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + fmt.Printf("Failed to accept connection: %v\n", err) + continue + } + go s.handleConnection(conn) + } +} + +// 处理客户端连接 +func (s *Server) handleConnection(conn net.Conn) { + defer conn.Close() + buffer := make([]byte, 4*1024) // 4KB缓冲区 + + for { + n, err := conn.Read(buffer) + if err != nil { + if err != io.EOF { + fmt.Printf("Error reading from connection: %v\n", err) + } + return + } + + // 解析MongoDB协议消息 + message, err := protocol.ParseMessage(buffer[:n]) + if err != nil { + fmt.Printf("Failed to parse message: %v\n", err) + continue + } + + // 处理查询请求 + response, err := query.HandleQuery(message, s.storage) + if err != nil { + fmt.Printf("Query handling error: %v\n", err) + continue + } + + // 发送响应 + _, err = conn.Write(response) + if err != nil { + fmt.Printf("Failed to send response: %v\n", err) + return + } + } +} \ No newline at end of file diff --git a/protocol/parser.go b/protocol/parser.go new file mode 100644 index 0000000..7165020 --- /dev/null +++ b/protocol/parser.go @@ -0,0 +1,100 @@ +// Package protocol 实现MongoDB协议解析 +package protocol + +import ( + "encoding/binary" + "fmt" +) + +// Message MongoDB协议消息结构 +type Message struct { + Header Header + OpCode OpCode + OriginalBody []byte // 原始消息体(解析前) + Body interface{} // 解析后的消息体 +} + +// Header 消息头 +type Header struct { + MessageLength int32 + RequestID int32 + ResponseTo int32 + OpCode OpCode +} + +// OpCode 操作码 +type OpCode int32 + +const ( + OP_REPLY OpCode = 1 + OP_MSG OpCode = 2 + OP_UPDATE OpCode = 2001 + OP_INSERT OpCode = 2002 + RESERVED OpCode = 2003 + OP_QUERY OpCode = 2004 + OP_GET_MORE OpCode = 2005 + OP_DELETE OpCode = 2006 + OP_KILL_CURSORS OpCode = 2007 + OP_COMMAND OpCode = 2010 + OP_COMMAND_REPLY OpCode = 2011 + OP_COMPRESSED OpCode = 2012 + OP_ENCRYPTED OpCode = 2013 +) + +// ParseMessage 解析MongoDB协议消息 +func ParseMessage(data []byte) (*Message, error) { + if len(data) < 16 { + return nil, fmt.Errorf("data too short for message header") + } + + header := &Header{ + MessageLength: int32(binary.LittleEndian.Uint32(data[0:4])), + RequestID: int32(binary.LittleEndian.Uint32(data[4:8])), + ResponseTo: int32(binary.LittleEndian.Uint32(data[8:12])), + OpCode: OpCode(binary.LittleEndian.Uint32(data[12:16])), + } + + body := data[16:] + + // 解析特定操作码的消息体 + var parsedBody interface{} + switch header.OpCode { + case OP_QUERY: + query, err := parseQuery(body) + if err != nil { + return nil, err + } + parsedBody = query + case OP_INSERT: + insert, err := parseInsert(body) + if err != nil { + return nil, err + } + parsedBody = insert + // 这里可以添加更多操作码的解析逻辑 + default: + // 未知操作码,保留原始数据 + parsedBody = body + } + + return &Message{ + Header: *header, + OpCode: header.OpCode, + OriginalBody: body, + Body: parsedBody, + }, nil +} + +// 解析查询请求 +func parseQuery(data []byte) (interface{}, error) { + // 实现具体的查询消息解析逻辑 + // 这里返回原始数据作为占位符 + return data, nil +} + +// 解析插入请求 +func parseInsert(data []byte) (interface{}, error) { + // 实现具体的插入消息解析逻辑 + // 这里返回原始数据作为占位符 + return data, nil +} \ No newline at end of file diff --git a/query/handler.go b/query/handler.go new file mode 100644 index 0000000..fe40b4e --- /dev/null +++ b/query/handler.go @@ -0,0 +1,39 @@ +// Package query 实现查询处理层 +package query + +import ( + "fmt" + "github.com/kingecg/goaidb/protocol" + "github.com/kingecg/goaidb/storage" +) + +// HandleQuery 处理查询请求 +func HandleQuery(message *protocol.Message, engine storage.StorageEngine) ([]byte, error) { + switch message.OpCode { + case protocol.OP_QUERY: + return handleOPQuery(message, engine) + case protocol.OP_INSERT: + return handleOPInsert(message, engine) + // 这里可以添加更多操作码的处理逻辑 + default: + return nil, fmt.Errorf("unsupported operation code: %d", message.OpCode) + } +} + +// 处理OP_QUERY消息 +func handleOPQuery(message *protocol.Message, engine storage.StorageEngine) ([]byte, error) { + // TODO: 实现具体的查询处理逻辑 + // 从message.Body中获取查询条件 + // 调用存储引擎进行数据查询 + // 构造响应消息 + return []byte{0x01, 0x00, 0x00, 0x00}, nil // 返回简单测试响应 +} + +// 处理OP_INSERT消息 +func handleOPInsert(message *protocol.Message, engine storage.StorageEngine) ([]byte, error) { + // TODO: 实现具体的插入处理逻辑 + // 从message.Body中获取要插入的数据 + // 调用存储引擎进行数据插入 + // 构造响应消息 + return []byte{0x01, 0x00, 0x00, 0x00}, nil // 返回简单测试响应 +} \ No newline at end of file diff --git a/storage/engine.go b/storage/engine.go new file mode 100644 index 0000000..c3a62d9 --- /dev/null +++ b/storage/engine.go @@ -0,0 +1,160 @@ +// Package storage 实现存储引擎接口 +package storage + +import ( + "fmt" +) + +// StorageEngine 存储引擎接口 +type StorageEngine interface { + // 数据库操作 + CreateDatabase(name string) error + DropDatabase(name string) error + ListDatabases() ([]string, error) + + // 集合操作 + CreateCollection(dbName, collName string) error + DropCollection(dbName, collName string) error + ListCollections(dbName string) ([]string, error) + + // 文档操作 + Insert(dbName, collName string, document []byte) error + Query(dbName, collName string, query []byte) ([][]byte, error) + Update(dbName, collName string, query, update []byte) error + Delete(dbName, collName string, query []byte) error +} + +// NewMemoryEngine 创建内存存储引擎实例 +func NewMemoryEngine() (StorageEngine, error) { + return &memoryEngine{ + databases: make(map[string]*memoryDatabase), + }, nil +} + +// 内存存储引擎实现 +type memoryEngine struct { + databases map[string]*memoryDatabase +} + +func (e *memoryEngine) CreateDatabase(name string) error { + if _, exists := e.databases[name]; exists { + return fmt.Errorf("database %s already exists", name) + } + e.databases[name] = &memoryDatabase{ + collections: make(map[string]*memoryCollection), + } + return nil +} + +func (e *memoryEngine) DropDatabase(name string) error { + if _, exists := e.databases[name]; !exists { + return fmt.Errorf("database %s does not exist", name) + } + delete(e.databases, name) + return nil +} + +func (e *memoryEngine) ListDatabases() ([]string, error) { + names := make([]string, 0, len(e.databases)) + for name := range e.databases { + names = append(names, name) + } + return names, nil +} + +func (e *memoryEngine) CreateCollection(dbName, collName string) error { + db, exists := e.databases[dbName] + if !exists { + return fmt.Errorf("database %s does not exist", dbName) + } + + if _, collExists := db.collections[collName]; collExists { + return fmt.Errorf("collection %s already exists in database %s", collName, dbName) + } + + db.collections[collName] = &memoryCollection{ + data: make([][]byte, 0), + } + return nil +} + +func (e *memoryEngine) DropCollection(dbName, collName string) error { + db, exists := e.databases[dbName] + if !exists { + return fmt.Errorf("database %s does not exist", dbName) + } + + if _, collExists := db.collections[collName]; !collExists { + return fmt.Errorf("collection %s does not exist in database %s", collName, dbName) + } + + delete(db.collections, collName) + return nil +} + +func (e *memoryEngine) ListCollections(dbName string) ([]string, error) { + db, exists := e.databases[dbName] + if !exists { + return nil, fmt.Errorf("database %s does not exist", dbName) + } + + names := make([]string, 0, len(db.collections)) + for name := range db.collections { + names = append(names, name) + } + return names, nil +} + +func (e *memoryEngine) Insert(dbName, collName string, document []byte) error { + db, exists := e.databases[dbName] + if !exists { + return fmt.Errorf("database %s does not exist", dbName) + } + + coll, exists := db.collections[collName] + if !exists { + // 自动创建集合 + coll = &memoryCollection{ + data: make([][]byte, 0), + } + db.collections[collName] = coll + } + + coll.data = append(coll.data, document) + return nil +} + +func (e *memoryEngine) Query(dbName, collName string, query []byte) ([][]byte, error) { + db, exists := e.databases[dbName] + if !exists { + return nil, fmt.Errorf("database %s does not exist", dbName) + } + + coll, exists := db.collections[collName] + if !exists { + return nil, fmt.Errorf("collection %s does not exist in database %s", collName, dbName) + } + + // TODO: 实现实际的查询逻辑,目前返回所有文档 + return coll.data, nil +} + +func (e *memoryEngine) Update(dbName, collName string, query, update []byte) error { + // TODO: 实现更新逻辑 + return nil +} + +func (e *memoryEngine) Delete(dbName, collName string, query []byte) error { + // TODO: 实现删除逻辑 + return nil +} + +// 内存数据库结构 +type memoryDatabase struct { + collections map[string]*memoryCollection +} + +// 内存集合结构 +type memoryCollection struct { + data [][]byte +} \ No newline at end of file