Skip to content

Commit

Permalink
增加封包拆包
Browse files Browse the repository at this point in the history
  • Loading branch information
liuligames committed Aug 7, 2021
1 parent a0c18b8 commit 2b454c6
Show file tree
Hide file tree
Showing 17 changed files with 389 additions and 257 deletions.
40 changes: 32 additions & 8 deletions demo/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"fmt"
"io"
"net"
"time"
wkNet "wukong/net"
)

func main() {
Expand All @@ -13,24 +15,46 @@ func main() {

conn, err := net.Dial("tcp", "127.0.0.1:9527")
if err != nil {
fmt.Println("client start err", err)
fmt.Println("client start err exit! ")
return
}

for {
_, err := conn.Write([]byte("Hello LiuLiGames V0.2 "))
dp := wkNet.NewDataPack()
binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.5 client Test Message ")))
if err != nil {
fmt.Println("write conn err", err)
fmt.Println("Pack error", err)
}

if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("conn Write error", err)
return
}
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("read buf err", err)

binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("read head error", err)
return
}

fmt.Printf("server call back: %s ,cnt = %d \n", buf, cnt)
msgHead, err := dp.Unpack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead error", err)
break
}

if msgHead.GetMsgLen() > 0 {
msg := msgHead.(*wkNet.Message)
msg.SetData(make([]byte, msg.GetMsgLen()))

if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error", err)
return
}

fmt.Println("-----> Recv Server msgId= ",
msg.Id, " len= ", msg.MsgLen, " data = ", string(msg.Data))
}

time.Sleep(1 * time.Second)
}
Expand Down
41 changes: 22 additions & 19 deletions demo/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,39 @@ type PingRouter struct {
net.BaseRouter
}

func (pr *PingRouter) PreHandle(request iface.IRequest) {
fmt.Println("Call Router PreHandle ....")

_, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping.... \n"))
if err != nil {
fmt.Println("call back before ping error ")
}
}
//func (pr *PingRouter) PreHandle(request iface.IRequest) {
// fmt.Println("Call Router PreHandle ....")
//
// _, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping.... \n"))
// if err != nil {
// fmt.Println("call back before ping error ")
// }
//}

func (pr *PingRouter) Handle(request iface.IRequest) {
fmt.Println("Call Router Handle ....")

_, err := request.GetConnection().GetTCPConnection().Write([]byte("ping...ping...ping... \n"))
if err != nil {
fmt.Println("call back ping...ping...ping... error ")
}
}
fmt.Println("recv from client:msgId = ", request.GetMsgId(),
"data = ", string(request.GetData()))

func (pr *PingRouter) PostHandle(request iface.IRequest) {
fmt.Println("Call Router PostHandle ....")

_, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping.... \n"))
err := request.GetConnection().SendMsg(1, []byte("ping....ping....ping...."))
if err != nil {
fmt.Println("call back after ping error ")
fmt.Println(err)
}
}

//func (pr *PingRouter) PostHandle(request iface.IRequest) {
// fmt.Println("Call Router PostHandle ....")
//
// _, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping.... \n"))
// if err != nil {
// fmt.Println("call back after ping error ")
// }
//}

func main() {

s := net.NewServer("[liu li games V0.3]")
s := net.NewServer()

s.AddRouter(&PingRouter{})

Expand Down
4 changes: 4 additions & 0 deletions demo/conf/conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name: "liuLI V0.4 demoServerApp"
Host: "127.0.0.1"
TcpPort: 9527
MaxConn: 3
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ module wukong
go 1.16

require (
github.com/gin-gonic/gin v1.7.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/libp2p/go-reuseport v0.0.1 // indirect
github.com/panjf2000/gnet v1.5.3 // indirect
github.com/smallnest/goframe v0.0.0-20191101094441-1fbd8e51db18 // indirect
github.com/gin-gonic/gin v1.7.2
github.com/panjf2000/gnet v1.5.3
go.uber.org/atomic v1.9.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
180 changes: 12 additions & 168 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion iface/IConnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type IConnection interface {

GetRemoteAddr() net.Addr

Send(data []byte) error
SendMsg(msgId uint32, data []byte) error
}

type HandleFunc func(*net.TCPConn, []byte, int) error
9 changes: 9 additions & 0 deletions iface/IDataPack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package iface

type IDataPack interface {
GetHeadLen() uint32

Pack(msg IMessage) ([]byte, error)

Unpack([]byte) (IMessage, error)
}
11 changes: 11 additions & 0 deletions iface/IMessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package iface

type IMessage interface {
GetMsgId() uint32
GetMsgLen() uint32
GetData() []byte

SetMsgId(uint32)
SetMsgLen(uint32)
SetData([]byte)
}
2 changes: 2 additions & 0 deletions iface/IRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ type IRequest interface {
GetConnection() IConnection

GetData() []byte

GetMsgId() uint32
}
56 changes: 50 additions & 6 deletions net/Connection.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package net

import (
"errors"
"fmt"
"io"
"net"
"wukong/iface"
)
Expand Down Expand Up @@ -30,16 +32,41 @@ func (c *Connection) StartReader() {
defer c.Stop()

for {
buf := make([]byte, 512)
_, err := c.Conn.Read(buf)
// buf := make([]byte, utils.GlobalObject.MaxPackageSize)
// _, err := c.Conn.Read(buf)
// if err != nil {
// fmt.Println("recv buf err", err)
// continue
// }

dp := NewDataPack()

headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
break
}

msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("recv buf err", err)
continue
fmt.Println("unpack error", err)
break
}

var data []byte
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error", err)
break
}
}

msg.SetData(data)

req := Request{
conn: c,
data: buf,
msg: msg,
}

go func(request iface.IRequest) {
Expand Down Expand Up @@ -84,6 +111,23 @@ func (c *Connection) GetRemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}

func (c *Connection) Send(data []byte) error {
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("Connection Closed when send msg ")
}

dp := NewDataPack()

binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}

if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("Write error msg id = ", msgId, "error :", err)
return errors.New("conn Write error ")
}

return nil
}
58 changes: 58 additions & 0 deletions net/DataPack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package net

import (
"bytes"
"encoding/binary"
"errors"
"wukong/iface"
"wukong/utils"
)

type DataPack struct{}

func NewDataPack() *DataPack {
return &DataPack{}
}

func (dp *DataPack) GetHeadLen() uint32 {
//DataLen uint32(4字节) + Id uint32(4字节)
return 8
}

func (dp *DataPack) Pack(msg iface.IMessage) ([]byte, error) {
dataBuff := bytes.NewBuffer([]byte{})

if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgLen()); err != nil {
return nil, err
}

if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}

if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}

return dataBuff.Bytes(), nil
}

func (dp *DataPack) Unpack(binaryData []byte) (iface.IMessage, error) {
dataBuff := bytes.NewReader(binaryData)

msg := &Message{}

if err := binary.Read(dataBuff, binary.LittleEndian, &msg.MsgLen); err != nil {
return nil, err
}

if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}

if utils.GlobalObject.MaxPackageSize > 0 && msg.MsgLen > utils.GlobalObject.MaxPackageSize {
return nil, errors.New("too Large msg data recv")
}

return msg, nil
}
Loading

0 comments on commit 2b454c6

Please sign in to comment.