goaidb/network/server.go

101 lines
2.3 KiB
Go
Raw Permalink Normal View History

2025-06-05 17:59:27 +08:00
// Package network 实现网络通信层
package network
import (
"fmt"
"io"
"net"
2025-06-06 20:47:59 +08:00
2025-06-06 22:12:24 +08:00
"git.pyer.club/kingecg/goaidb/log"
2025-06-06 20:47:59 +08:00
"git.pyer.club/kingecg/goaidb/protocol"
"git.pyer.club/kingecg/goaidb/storage"
2025-06-05 17:59:27 +08:00
)
// Server 网络服务器结构体
type Server struct {
storage storage.StorageEngine
}
// NewServer 创建新的网络服务器
func NewServer(storage storage.StorageEngine) *Server {
return &Server{storage: storage}
}
// Serve 启动服务
func (s *Server) Serve(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("Failed to accept connection: %v\n", err)
continue
}
go s.handleConnection(conn)
}
}
// 处理客户端连接
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
buffer := make([]byte, 4*1024) // 4KB缓冲区
for {
n, err := conn.Read(buffer)
if err != nil {
if err != io.EOF {
2025-06-06 22:12:24 +08:00
log.Error("连接读取失败", "error", err)
2025-06-05 17:59:27 +08:00
}
return
}
2025-06-07 11:06:55 +08:00
// log.WriteOpMsgToFile("opbin.log", uint8(2), buffer[:n])
2025-06-05 17:59:27 +08:00
// 解析MongoDB协议消息
message, err := protocol.ParseMessage(buffer[:n])
if err != nil {
2025-06-06 22:12:24 +08:00
log.Error("消息解析失败", "error", err)
2025-06-05 17:59:27 +08:00
continue
}
2025-06-06 22:12:24 +08:00
var response []byte
switch message.OpCode {
case protocol.OP_UPDATE:
updateMsg := message.Body.(*protocol.UpdateMessage)
// 序列化查询和更新文档为BSON格式
2025-06-06 22:54:22 +08:00
queryBson, err := protocol.BsonMarshal(updateMsg.Body.Query)
2025-06-06 22:12:24 +08:00
if err != nil {
log.Error("查询文档序列化失败", "error", err)
continue
}
2025-06-06 22:54:22 +08:00
updateBson, err := protocol.BsonMarshal(updateMsg.Body.UpdateSpec)
2025-06-06 22:12:24 +08:00
if err != nil {
log.Error("更新文档序列化失败", "error", err)
continue
}
2025-06-06 22:54:22 +08:00
err = s.storage.Update(updateMsg.Body.DatabaseName, updateMsg.Body.CollName, queryBson, updateBson)
2025-06-06 22:12:24 +08:00
if err != nil {
log.Error("存储层更新失败", "error", err)
continue
}
response = constructUpdateResponse(message)
default:
log.Warn("不支持的操作码", "opcode", message.OpCode)
2025-06-05 17:59:27 +08:00
}
// 发送响应
_, err = conn.Write(response)
if err != nil {
2025-06-06 22:12:24 +08:00
log.Error("响应发送失败", "error", err)
2025-06-05 17:59:27 +08:00
return
}
}
2025-06-06 20:47:59 +08:00
}
2025-06-06 22:12:24 +08:00
// 构造简单的OP_REPLY响应
func constructUpdateResponse(request *protocol.Message) []byte {
// 实际实现应构造完整的OP_REPLY消息
// 这里只是一个示例,返回空文档
2025-06-07 00:13:03 +08:00
return []byte{0x01, 0x00, 0x00, 0x00}
2025-06-06 22:12:24 +08:00
}