Skip to content

Commit

Permalink
读写协程分离
Browse files Browse the repository at this point in the history
  • Loading branch information
liuligames committed Aug 8, 2021
1 parent 2b454c6 commit ee7e84c
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 43 deletions.
4 changes: 2 additions & 2 deletions demo/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func main() {

fmt.Println("client start...")
fmt.Println("client0 start...")
time.Sleep(1 + time.Second)

conn, err := net.Dial("tcp", "127.0.0.1:9527")
Expand All @@ -21,7 +21,7 @@ func main() {

for {
dp := wkNet.NewDataPack()
binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.5 client Test Message ")))
binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.7 client0 Test Message ")))
if err != nil {
fmt.Println("Pack error", err)
}
Expand Down
62 changes: 62 additions & 0 deletions demo/Client1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
"io"
"net"
"time"
wkNet "wukong/net"
)

func main() {

fmt.Println("client1 start...")
time.Sleep(1 + time.Second)

conn, err := net.Dial("tcp", "127.0.0.1:9527")
if err != nil {
fmt.Println("client start err exit! ")
return
}

for {
dp := wkNet.NewDataPack()
binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.7 client1 Test Message ")))
if err != nil {
fmt.Println("Pack error", err)
}

if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("conn Write error", err)
return
}

binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("read head error", err)
return
}

msgHead, err := dp.Unpack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead error", err)
break
}

if msgHead.GetMsgLen() > 0 {
msg := msgHead.(*wkNet.Message)
msg.SetData(make([]byte, msg.GetMsgLen()))

if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error", err)
return
}

fmt.Println("-----> Recv Server msgId= ",
msg.Id, " len= ", msg.MsgLen, " data = ", string(msg.Data))
}

time.Sleep(1 * time.Second)
}

}
24 changes: 22 additions & 2 deletions demo/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (pr *PingRouter) Handle(request iface.IRequest) {
fmt.Println("recv from client:msgId = ", request.GetMsgId(),
"data = ", string(request.GetData()))

err := request.GetConnection().SendMsg(1, []byte("ping....ping....ping...."))
err := request.GetConnection().SendMsg(200, []byte("ping....ping....ping...."))
if err != nil {
fmt.Println(err)
}
Expand All @@ -40,11 +40,31 @@ func (pr *PingRouter) Handle(request iface.IRequest) {
// }
//}



type HelloRouter struct {
net.BaseRouter
}


func (hr *HelloRouter) Handle(request iface.IRequest) {
fmt.Println("Call HelloRouter Handle ....")

fmt.Println("recv from client:msgId = ", request.GetMsgId(),
"data = ", string(request.GetData()))

err := request.GetConnection().SendMsg(201, []byte("Hello Games"))
if err != nil {
fmt.Println(err)
}
}

func main() {

s := net.NewServer()

s.AddRouter(&PingRouter{})
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloRouter{})

s.Serve()

Expand Down
7 changes: 7 additions & 0 deletions iface/IMsgHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package iface

type IMsgHandler interface {
DoMsgHandler(request IRequest)

AddRouter(msgId uint32, router IRouter)
}
3 changes: 1 addition & 2 deletions iface/IServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ type IServer interface {
Stop()
Serve()

AddRouter(router IRouter)

AddRouter(msgId uint32, router IRouter)
}
60 changes: 38 additions & 22 deletions net/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,28 @@ import (
)

type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
ExitChan chan bool
Router iface.IRouter
Conn *net.TCPConn
ConnID uint32
isClosed bool
ExitChan chan bool
msgChan chan []byte
MsgHandler iface.IMsgHandler
}

func NewConnection(conn *net.TCPConn, connID uint32, router iface.IRouter) *Connection {
func NewConnection(conn *net.TCPConn, connID uint32, msgHandler iface.IMsgHandler) *Connection {
return &Connection{
Conn: conn,
ConnID: connID,
Router: router,
isClosed: false,
ExitChan: make(chan bool, 1),
Conn: conn,
ConnID: connID,
MsgHandler: msgHandler,
isClosed: false,
msgChan: make(chan []byte),
ExitChan: make(chan bool, 1),
}
}

func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running.....")
defer fmt.Println("connId = ", c.ConnID, "reader is exit remote addr is ", c.GetRemoteAddr().String())
defer fmt.Println("connId = ", c.ConnID, "[reader is exit] remote addr is ", c.GetRemoteAddr().String())
defer c.Stop()

for {
Expand Down Expand Up @@ -69,19 +71,33 @@ func (c *Connection) StartReader() {
msg: msg,
}

go func(request iface.IRequest) {
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
go c.MsgHandler.DoMsgHandler(&req)
}
}

func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.GetRemoteAddr().String(), " [conn Writer exit]")

for {
select {
case data := <-c.msgChan:
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error ", err)
return
}
case <-c.ExitChan:
return
}
}
}

func (c *Connection) Start() {
fmt.Println("conn start ... connId = ", c.ConnID)

go c.StartReader()
//todo 启动从当前连接写数据的业务

go c.StartWriter()

}

Expand All @@ -95,7 +111,10 @@ func (c *Connection) Stop() {

_ = c.Conn.Close()

c.ExitChan <- true

close(c.ExitChan)
close(c.msgChan)
}

func (c *Connection) GetTCPConnection() *net.TCPConn {
Expand Down Expand Up @@ -124,10 +143,7 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
return errors.New("Pack error msg ")
}

if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("Write error msg id = ", msgId, "error :", err)
return errors.New("conn Write error ")
}
c.msgChan <- binaryMsg

return nil
}
1 change: 0 additions & 1 deletion net/Message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (m *Message) SetMsgId(id uint32) {
}
func (m *Message) SetMsgLen(msgLen uint32) {
m.MsgLen = msgLen

}
func (m *Message) SetData(data []byte) {
m.Data = data
Expand Down
37 changes: 37 additions & 0 deletions net/MsgHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package net

import (
"fmt"
"strconv"
"wukong/iface"
)

type MsgHandler struct {
Apis map[uint32]iface.IRouter
}

func NewMsgHandler() *MsgHandler {
return &MsgHandler{
Apis: make(map[uint32]iface.IRouter),
}
}

func (mh *MsgHandler) DoMsgHandler(request iface.IRequest) {
handler, ok := mh.Apis[request.GetMsgId()]
if !ok {
fmt.Println(" api msgId = ", request.GetMsgId(), "is not found Need Register")
}

handler.PreHandle(request)
handler.Handle(request)
handler.PostHandle(request)
}

func (mh *MsgHandler) AddRouter(msgId uint32, router iface.IRouter) {
if _, ok := mh.Apis[msgId]; ok {
panic("repeat api , msgId = " + strconv.Itoa(int(msgId)))
}

mh.Apis[msgId] = router
fmt.Println("Add api msgId = ", msgId)
}
26 changes: 13 additions & 13 deletions net/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
)

type Server struct {
Name string
IPVersion string
IP string
Port int
Router iface.IRouter
Name string
IPVersion string
IP string
Port int
MsgHandler iface.IMsgHandler
}

func (s *Server) Start() {
Expand Down Expand Up @@ -46,7 +46,7 @@ func (s *Server) Start() {
continue
}

dealConn := NewConnection(conn, connId, s.Router)
dealConn := NewConnection(conn, connId, s.MsgHandler)
connId++

go dealConn.Start()
Expand All @@ -66,17 +66,17 @@ func (s *Server) Serve() {
select {}
}

func (s *Server) AddRouter(router iface.IRouter) {
s.Router = router
func (s *Server) AddRouter(msgId uint32, router iface.IRouter) {
s.MsgHandler.AddRouter(msgId, router)
fmt.Println("Add Router !!!!")
}

func NewServer() iface.IServer {
return &Server{
Name: utils.GlobalObject.Name,
IPVersion: "tcp4",
IP: utils.GlobalObject.Host,
Port: utils.GlobalObject.TcpPort,
Router: nil,
Name: utils.GlobalObject.Name,
IPVersion: "tcp4",
IP: utils.GlobalObject.Host,
Port: utils.GlobalObject.TcpPort,
MsgHandler: NewMsgHandler(),
}
}
2 changes: 1 addition & 1 deletion utils/globalobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (g *GlobalObj) Reload() {
func init() {
GlobalObject = &GlobalObj{
Name: "LiuLIServerApp",
Version: "V0.5",
Version: "V0.7",
TcpPort: 9527,
Host: "0.0.0.0",
MaxConn: 1000,
Expand Down

0 comments on commit ee7e84c

Please sign in to comment.