From ee7e84c2b60346457e29866d0a87d35ddba02721 Mon Sep 17 00:00:00 2001 From: LiuShaowen Date: Sun, 8 Aug 2021 17:28:25 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=BB=E5=86=99=E5=8D=8F=E7=A8=8B=E5=88=86?= =?UTF-8?q?=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/Client.go | 4 +-- demo/Client1.go | 62 ++++++++++++++++++++++++++++++++++++++++++++ demo/Server.go | 24 +++++++++++++++-- iface/IMsgHandler.go | 7 +++++ iface/IServer.go | 3 +-- net/Connection.go | 60 ++++++++++++++++++++++++++---------------- net/Message.go | 1 - net/MsgHandler.go | 37 ++++++++++++++++++++++++++ net/Server.go | 26 +++++++++---------- utils/globalobj.go | 2 +- 10 files changed, 183 insertions(+), 43 deletions(-) create mode 100644 demo/Client1.go create mode 100644 iface/IMsgHandler.go create mode 100644 net/MsgHandler.go diff --git a/demo/Client.go b/demo/Client.go index d878d48..283a457 100644 --- a/demo/Client.go +++ b/demo/Client.go @@ -10,7 +10,7 @@ import ( func main() { - fmt.Println("client start...") + fmt.Println("client0 start...") time.Sleep(1 + time.Second) conn, err := net.Dial("tcp", "127.0.0.1:9527") @@ -21,7 +21,7 @@ func main() { for { dp := wkNet.NewDataPack() - binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.5 client Test Message "))) + binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.7 client0 Test Message "))) if err != nil { fmt.Println("Pack error", err) } diff --git a/demo/Client1.go b/demo/Client1.go new file mode 100644 index 0000000..24fdc89 --- /dev/null +++ b/demo/Client1.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "io" + "net" + "time" + wkNet "wukong/net" +) + +func main() { + + fmt.Println("client1 start...") + time.Sleep(1 + time.Second) + + conn, err := net.Dial("tcp", "127.0.0.1:9527") + if err != nil { + fmt.Println("client start err exit! ") + return + } + + for { + dp := wkNet.NewDataPack() + binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.7 client1 Test Message "))) + if err != nil { + fmt.Println("Pack error", err) + } + + if _, err := conn.Write(binaryMsg); err != nil { + fmt.Println("conn Write error", err) + return + } + + binaryHead := make([]byte, dp.GetHeadLen()) + if _, err := io.ReadFull(conn, binaryHead); err != nil { + fmt.Println("read head error", err) + return + } + + msgHead, err := dp.Unpack(binaryHead) + if err != nil { + fmt.Println("client unpack msgHead error", err) + break + } + + if msgHead.GetMsgLen() > 0 { + msg := msgHead.(*wkNet.Message) + msg.SetData(make([]byte, msg.GetMsgLen())) + + if _, err := io.ReadFull(conn, msg.Data); err != nil { + fmt.Println("read msg data error", err) + return + } + + fmt.Println("-----> Recv Server msgId= ", + msg.Id, " len= ", msg.MsgLen, " data = ", string(msg.Data)) + } + + time.Sleep(1 * time.Second) + } + +} diff --git a/demo/Server.go b/demo/Server.go index 7568511..a1d8ab9 100644 --- a/demo/Server.go +++ b/demo/Server.go @@ -25,7 +25,7 @@ func (pr *PingRouter) Handle(request iface.IRequest) { fmt.Println("recv from client:msgId = ", request.GetMsgId(), "data = ", string(request.GetData())) - err := request.GetConnection().SendMsg(1, []byte("ping....ping....ping....")) + err := request.GetConnection().SendMsg(200, []byte("ping....ping....ping....")) if err != nil { fmt.Println(err) } @@ -40,11 +40,31 @@ func (pr *PingRouter) Handle(request iface.IRequest) { // } //} + + +type HelloRouter struct { + net.BaseRouter +} + + +func (hr *HelloRouter) Handle(request iface.IRequest) { + fmt.Println("Call HelloRouter Handle ....") + + fmt.Println("recv from client:msgId = ", request.GetMsgId(), + "data = ", string(request.GetData())) + + err := request.GetConnection().SendMsg(201, []byte("Hello Games")) + if err != nil { + fmt.Println(err) + } +} + func main() { s := net.NewServer() - s.AddRouter(&PingRouter{}) + s.AddRouter(0, &PingRouter{}) + s.AddRouter(1, &HelloRouter{}) s.Serve() diff --git a/iface/IMsgHandler.go b/iface/IMsgHandler.go new file mode 100644 index 0000000..ebf997d --- /dev/null +++ b/iface/IMsgHandler.go @@ -0,0 +1,7 @@ +package iface + +type IMsgHandler interface { + DoMsgHandler(request IRequest) + + AddRouter(msgId uint32, router IRouter) +} diff --git a/iface/IServer.go b/iface/IServer.go index 24f9f46..01768a2 100644 --- a/iface/IServer.go +++ b/iface/IServer.go @@ -5,6 +5,5 @@ type IServer interface { Stop() Serve() - AddRouter(router IRouter) - + AddRouter(msgId uint32, router IRouter) } diff --git a/net/Connection.go b/net/Connection.go index a4b9a15..2e174f5 100644 --- a/net/Connection.go +++ b/net/Connection.go @@ -9,26 +9,28 @@ import ( ) type Connection struct { - Conn *net.TCPConn - ConnID uint32 - isClosed bool - ExitChan chan bool - Router iface.IRouter + Conn *net.TCPConn + ConnID uint32 + isClosed bool + ExitChan chan bool + msgChan chan []byte + MsgHandler iface.IMsgHandler } -func NewConnection(conn *net.TCPConn, connID uint32, router iface.IRouter) *Connection { +func NewConnection(conn *net.TCPConn, connID uint32, msgHandler iface.IMsgHandler) *Connection { return &Connection{ - Conn: conn, - ConnID: connID, - Router: router, - isClosed: false, - ExitChan: make(chan bool, 1), + Conn: conn, + ConnID: connID, + MsgHandler: msgHandler, + isClosed: false, + msgChan: make(chan []byte), + ExitChan: make(chan bool, 1), } } func (c *Connection) StartReader() { fmt.Println("reader goroutine is running.....") - defer fmt.Println("connId = ", c.ConnID, "reader is exit remote addr is ", c.GetRemoteAddr().String()) + defer fmt.Println("connId = ", c.ConnID, "[reader is exit] remote addr is ", c.GetRemoteAddr().String()) defer c.Stop() for { @@ -69,11 +71,24 @@ func (c *Connection) StartReader() { msg: msg, } - go func(request iface.IRequest) { - c.Router.PreHandle(request) - c.Router.Handle(request) - c.Router.PostHandle(request) - }(&req) + go c.MsgHandler.DoMsgHandler(&req) + } +} + +func (c *Connection) StartWriter() { + fmt.Println("[Writer Goroutine is running]") + defer fmt.Println(c.GetRemoteAddr().String(), " [conn Writer exit]") + + for { + select { + case data := <-c.msgChan: + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send data error ", err) + return + } + case <-c.ExitChan: + return + } } } @@ -81,7 +96,8 @@ func (c *Connection) Start() { fmt.Println("conn start ... connId = ", c.ConnID) go c.StartReader() - //todo 启动从当前连接写数据的业务 + + go c.StartWriter() } @@ -95,7 +111,10 @@ func (c *Connection) Stop() { _ = c.Conn.Close() + c.ExitChan <- true + close(c.ExitChan) + close(c.msgChan) } func (c *Connection) GetTCPConnection() *net.TCPConn { @@ -124,10 +143,7 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { return errors.New("Pack error msg ") } - if _, err := c.Conn.Write(binaryMsg); err != nil { - fmt.Println("Write error msg id = ", msgId, "error :", err) - return errors.New("conn Write error ") - } + c.msgChan <- binaryMsg return nil } diff --git a/net/Message.go b/net/Message.go index 0a62798..62c120c 100644 --- a/net/Message.go +++ b/net/Message.go @@ -29,7 +29,6 @@ func (m *Message) SetMsgId(id uint32) { } func (m *Message) SetMsgLen(msgLen uint32) { m.MsgLen = msgLen - } func (m *Message) SetData(data []byte) { m.Data = data diff --git a/net/MsgHandler.go b/net/MsgHandler.go new file mode 100644 index 0000000..744d03f --- /dev/null +++ b/net/MsgHandler.go @@ -0,0 +1,37 @@ +package net + +import ( + "fmt" + "strconv" + "wukong/iface" +) + +type MsgHandler struct { + Apis map[uint32]iface.IRouter +} + +func NewMsgHandler() *MsgHandler { + return &MsgHandler{ + Apis: make(map[uint32]iface.IRouter), + } +} + +func (mh *MsgHandler) DoMsgHandler(request iface.IRequest) { + handler, ok := mh.Apis[request.GetMsgId()] + if !ok { + fmt.Println(" api msgId = ", request.GetMsgId(), "is not found Need Register") + } + + handler.PreHandle(request) + handler.Handle(request) + handler.PostHandle(request) +} + +func (mh *MsgHandler) AddRouter(msgId uint32, router iface.IRouter) { + if _, ok := mh.Apis[msgId]; ok { + panic("repeat api , msgId = " + strconv.Itoa(int(msgId))) + } + + mh.Apis[msgId] = router + fmt.Println("Add api msgId = ", msgId) +} diff --git a/net/Server.go b/net/Server.go index a699755..4ff5774 100644 --- a/net/Server.go +++ b/net/Server.go @@ -8,11 +8,11 @@ import ( ) type Server struct { - Name string - IPVersion string - IP string - Port int - Router iface.IRouter + Name string + IPVersion string + IP string + Port int + MsgHandler iface.IMsgHandler } func (s *Server) Start() { @@ -46,7 +46,7 @@ func (s *Server) Start() { continue } - dealConn := NewConnection(conn, connId, s.Router) + dealConn := NewConnection(conn, connId, s.MsgHandler) connId++ go dealConn.Start() @@ -66,17 +66,17 @@ func (s *Server) Serve() { select {} } -func (s *Server) AddRouter(router iface.IRouter) { - s.Router = router +func (s *Server) AddRouter(msgId uint32, router iface.IRouter) { + s.MsgHandler.AddRouter(msgId, router) fmt.Println("Add Router !!!!") } func NewServer() iface.IServer { return &Server{ - Name: utils.GlobalObject.Name, - IPVersion: "tcp4", - IP: utils.GlobalObject.Host, - Port: utils.GlobalObject.TcpPort, - Router: nil, + Name: utils.GlobalObject.Name, + IPVersion: "tcp4", + IP: utils.GlobalObject.Host, + Port: utils.GlobalObject.TcpPort, + MsgHandler: NewMsgHandler(), } } diff --git a/utils/globalobj.go b/utils/globalobj.go index 02d26aa..21348ec 100644 --- a/utils/globalobj.go +++ b/utils/globalobj.go @@ -33,7 +33,7 @@ func (g *GlobalObj) Reload() { func init() { GlobalObject = &GlobalObj{ Name: "LiuLIServerApp", - Version: "V0.5", + Version: "V0.7", TcpPort: 9527, Host: "0.0.0.0", MaxConn: 1000,