goaidb/network/server.go

100 lines
2.3 KiB
Go

// Package network 实现网络通信层
package network
import (
"fmt"
"io"
"net"
"git.pyer.club/kingecg/goaidb/log"
"git.pyer.club/kingecg/goaidb/protocol"
"git.pyer.club/kingecg/goaidb/storage"
)
// Server 网络服务器结构体
type Server struct {
storage storage.StorageEngine
}
// NewServer 创建新的网络服务器
func NewServer(storage storage.StorageEngine) *Server {
return &Server{storage: storage}
}
// Serve 启动服务
func (s *Server) Serve(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("Failed to accept connection: %v\n", err)
continue
}
go s.handleConnection(conn)
}
}
// 处理客户端连接
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
buffer := make([]byte, 4*1024) // 4KB缓冲区
for {
n, err := conn.Read(buffer)
if err != nil {
if err != io.EOF {
log.Error("连接读取失败", "error", err)
}
return
}
// 解析MongoDB协议消息
message, err := protocol.ParseMessage(buffer[:n])
if err != nil {
log.Error("消息解析失败", "error", err)
continue
}
var response []byte
switch message.OpCode {
case protocol.OP_UPDATE:
updateMsg := message.Body.(*protocol.UpdateMessage)
// 序列化查询和更新文档为BSON格式
queryBson, err := protocol.BsonMarshal(updateMsg.Body.Query)
if err != nil {
log.Error("查询文档序列化失败", "error", err)
continue
}
updateBson, err := protocol.BsonMarshal(updateMsg.Body.UpdateSpec)
if err != nil {
log.Error("更新文档序列化失败", "error", err)
continue
}
err = s.storage.Update(updateMsg.Body.DatabaseName, updateMsg.Body.CollName, queryBson, updateBson)
if err != nil {
log.Error("存储层更新失败", "error", err)
continue
}
response = constructUpdateResponse(message)
default:
log.Warn("不支持的操作码", "opcode", message.OpCode)
}
// 发送响应
_, err = conn.Write(response)
if err != nil {
log.Error("响应发送失败", "error", err)
return
}
}
}
// 构造简单的OP_REPLY响应
func constructUpdateResponse(request *protocol.Message) []byte {
// 实际实现应构造完整的OP_REPLY消息
// 这里只是一个示例,返回空文档
return []byte{0x01, 0x00, 0x00, 0x00}
}