diff --git a/network/server.go b/network/server.go index ef142c0..92d80f9 100644 --- a/network/server.go +++ b/network/server.go @@ -47,6 +47,7 @@ func (s *Server) handleConnection(conn net.Conn) { return } + log.WriteOpMsgToFile("opbin.log", uint8(2), buffer[:n]) // 解析MongoDB协议消息 message, err := protocol.ParseMessage(buffer[:n]) if err != nil { diff --git a/protocol/bson.go b/protocol/bson.go index c262b11..bbf0a7e 100644 --- a/protocol/bson.go +++ b/protocol/bson.go @@ -174,6 +174,20 @@ func parseBSONValue(elementType byte, data []byte, pos int) (interface{}, int, e } return subDoc, docLength, nil + case 0x04: // Array + if len(data) < 4 { + return nil, 0, fmt.Errorf("data too short for Array length") + } + // 读取数组长度 + arrayLength := int(binary.LittleEndian.Uint32(data[0:4])) + if len(data) < arrayLength { + return nil, 0, fmt.Errorf("data too short for Array") + } + subDoc, _, err := parseBSON(data[0:arrayLength]) + if err != nil { + return nil, 0, fmt.Errorf("failed to parse array: %v", err) + } + return subDoc, arrayLength, nil default: return nil, 0, fmt.Errorf("unsupported BSON element type: 0x%02X", elementType) @@ -183,10 +197,10 @@ func parseBSONValue(elementType byte, data []byte, pos int) (interface{}, int, e // BsonMarshal 将map转换为BSON格式的字节流 func BsonMarshal(doc map[string]interface{}) ([]byte, error) { buf := &bytes.Buffer{} - + // 写入占位符长度(4字节) buf.Write(make([]byte, 4)) - + // 遍历文档元素 for key, value := range doc { // 写入元素类型和键名 @@ -194,24 +208,24 @@ func BsonMarshal(doc map[string]interface{}) ([]byte, error) { if err != nil { return nil, err } - + buf.WriteByte(elementType) buf.WriteString(key) buf.WriteByte(0x00) // 键名终止符 - + // 写入值数据 if err := writeBSONValue(buf, elementType, value); err != nil { return nil, err } } - + // 写入文档结束符 buf.WriteByte(0x00) - + // 回填文档长度 length := uint32(buf.Len()) binary.LittleEndian.PutUint32(buf.Bytes(), length) - + return buf.Bytes(), nil } @@ -248,7 +262,7 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro b := make([]byte, 4) binary.LittleEndian.PutUint32(b, uint32(v)) buf.Write(b) - + case 0x12: // Int64 v, ok := value.(int64) if !ok { @@ -257,7 +271,7 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro b := make([]byte, 8) binary.LittleEndian.PutUint64(b, uint64(v)) buf.Write(b) - + case 0x01: // Double v, ok := value.(float64) if !ok { @@ -266,7 +280,7 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro b := make([]byte, 8) binary.LittleEndian.PutUint64(b, math.Float64bits(v)) buf.Write(b) - + case 0x02: // String v, ok := value.(string) if !ok { @@ -281,7 +295,7 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro // 写入字符串内容和终止符 buf.Write(strBytes) buf.WriteByte(0x00) - + case 0x08: // Boolean v, ok := value.(bool) if !ok { @@ -292,10 +306,10 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro } else { buf.WriteByte(0x00) } - + case 0x0A: // Null // 不需要写入任何数据 - + case 0x03: // EmbeddedDocument v, ok := value.(map[string]interface{}) if !ok { @@ -307,7 +321,7 @@ func writeBSONValue(buf *bytes.Buffer, elementType byte, value interface{}) erro } // 直接写入子文档数据(包含完整的长度信息) buf.Write(subDoc) - + default: return fmt.Errorf("unsupported BSON element type: 0x%02X", elementType) } diff --git a/protocol/parser.go b/protocol/parser.go index 8404a15..02172ff 100644 --- a/protocol/parser.go +++ b/protocol/parser.go @@ -5,11 +5,12 @@ import ( "bytes" "encoding/binary" "fmt" + "strings" ) // UpdateFlags are the flags for OP_UPDATE const ( - Upsert = 1 << iota + Upsert = 1 << iota MultiUpdate // 标志位用于多文档更新 ) @@ -97,11 +98,11 @@ func parseUpdate(data []byte) (interface{}, error) { return &UpdateMessage{ Body: struct { - Flags UpdateFlags + Flags UpdateFlags DatabaseName string - CollName string - Query map[string]interface{} - UpdateSpec map[string]interface{} + CollName string + Query map[string]interface{} + UpdateSpec map[string]interface{} }{ Flags: flags, DatabaseName: dbName, @@ -190,29 +191,41 @@ func parseQuery(data []byte) (interface{}, error) { if dbEnd == -1 { return nil, fmt.Errorf("database name not null terminated") } - dbName := string(data[4 : dbEnd+4]) + dbcolName := string(data[4 : dbEnd+4]) + dcnames := strings.Split(dbcolName, ".") + dbName := dcnames[0] + collName := dcnames[1] // 剩余数据包含集合名和查询条件 remaining := data[dbEnd+5:] // 跳过终止符 - + var numberToSkip, numberToReturn int32 + binary.Read(bytes.NewReader(remaining[0:4]), binary.LittleEndian, &numberToSkip) + binary.Read(bytes.NewReader(remaining[4:8]), binary.LittleEndian, &numberToReturn) // 提取集合名 - collEnd := bytes.IndexByte(remaining, 0) - if collEnd == -1 { - return nil, fmt.Errorf("collection name not null terminated") - } - collName := string(remaining[:collEnd]) + // collEnd := bytes.IndexByte(remaining, 0) + // if collEnd == -1 { + // return nil, fmt.Errorf("collection name not null terminated") + // } + // if collEnd == 0 { + // collEnd = bytes.IndexFunc(remaining, func(r rune) bool { + // return r != 0 + // }) + // } + // collName := string(remaining[:collEnd]) // 解析查询条件 - queryDoc, _, err := parseBSON(remaining[collEnd+1:]) + queryDoc, _, err := parseBSON(remaining[8:]) if err != nil { return nil, fmt.Errorf("failed to parse query conditions: %v", err) } return &QueryMessage{ - Flags: flags, - DatabaseName: dbName, - CollName: collName, - Query: queryDoc, + Flags: flags, + DatabaseName: dbName, + CollName: collName, + NumberToSkip: numberToSkip, + NumberToReturn: numberToReturn, + Query: queryDoc, }, nil } @@ -265,10 +278,12 @@ func parseInsert(data []byte) (interface{}, error) { // QueryMessage OP_QUERY消息体结构 type QueryMessage struct { - Flags uint32 // 查询标志 - DatabaseName string // 数据库名称 - CollName string // 集合名称 - Query map[string]interface{} // 查询条件 + Flags uint32 // 查询标志 + DatabaseName string // 数据库名称 + CollName string // 集合名称 + NumberToSkip int32 + NumberToReturn int32 + Query map[string]interface{} // 查询条件 } // InsertMessage OP_INSERT消息体结构 @@ -281,8 +296,8 @@ type InsertMessage struct { // UpdateMessage represents an OP_UPDATE message type UpdateMessage struct { - Header Header - Body struct { + Header Header + Body struct { Flags UpdateFlags DatabaseName string CollName string diff --git a/protocol/parser_test.go b/protocol/parser_test.go index a5ec82f..545e86f 100644 --- a/protocol/parser_test.go +++ b/protocol/parser_test.go @@ -2,8 +2,22 @@ package protocol import ( "testing" + + "git.pyer.club/kingecg/goaidb/log" ) +func TestParseQuery(t *testing.T) { + _, data, err := log.ReadOpMsgFromFile("/home/kingecg/code/goaidb/opbin.log") + if err != nil { + t.Fatalf("ReadOpMsgFromFile failed: %v", err) + } + msg, err := ParseMessage(data) + if err != nil { + t.Fatalf("ParseMessage failed: %v", err) + } + log.Info(msg.Body) +} + func TestParseUpdate(t *testing.T) { // 构造测试数据(最小有效Update消息) data := []byte{