From c364f3f251d00ff159c9064334276b31669f8b9b Mon Sep 17 00:00:00 2001 From: LiuShaowen Date: Sun, 8 Aug 2021 22:35:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B1=A0=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/Client.go | 2 +- demo/Client1.go | 2 +- demo/Server.go | 20 -------------------- demo/conf/conf.yaml | 5 +++-- iface/IMsgHandler.go | 4 ++++ net/Connection.go | 7 ++++++- net/MsgHandler.go | 36 ++++++++++++++++++++++++++++++++++-- net/Server.go | 4 ++++ utils/globalobj.go | 28 ++++++++++++++++------------ 9 files changed, 69 insertions(+), 39 deletions(-) diff --git a/demo/Client.go b/demo/Client.go index 283a457..4b7e30b 100644 --- a/demo/Client.go +++ b/demo/Client.go @@ -21,7 +21,7 @@ func main() { for { dp := wkNet.NewDataPack() - binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.7 client0 Test Message "))) + binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.8 client0 Test Message "))) if err != nil { fmt.Println("Pack error", err) } diff --git a/demo/Client1.go b/demo/Client1.go index 24fdc89..50c7880 100644 --- a/demo/Client1.go +++ b/demo/Client1.go @@ -21,7 +21,7 @@ func main() { for { dp := wkNet.NewDataPack() - binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.7 client1 Test Message "))) + binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.8 client1 Test Message "))) if err != nil { fmt.Println("Pack error", err) } diff --git a/demo/Server.go b/demo/Server.go index a1d8ab9..0f0f5c0 100644 --- a/demo/Server.go +++ b/demo/Server.go @@ -10,15 +10,6 @@ type PingRouter struct { net.BaseRouter } -//func (pr *PingRouter) PreHandle(request iface.IRequest) { -// fmt.Println("Call Router PreHandle ....") -// -// _, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping.... \n")) -// if err != nil { -// fmt.Println("call back before ping error ") -// } -//} - func (pr *PingRouter) Handle(request iface.IRequest) { fmt.Println("Call Router Handle ....") @@ -31,17 +22,6 @@ func (pr *PingRouter) Handle(request iface.IRequest) { } } -//func (pr *PingRouter) PostHandle(request iface.IRequest) { -// fmt.Println("Call Router PostHandle ....") -// -// _, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping.... \n")) -// if err != nil { -// fmt.Println("call back after ping error ") -// } -//} - - - type HelloRouter struct { net.BaseRouter } diff --git a/demo/conf/conf.yaml b/demo/conf/conf.yaml index 3d88a6d..cb8bc94 100644 --- a/demo/conf/conf.yaml +++ b/demo/conf/conf.yaml @@ -1,4 +1,5 @@ -name: "liuLI V0.4 demoServerApp" +name: "liuLI V0.8 demoServerApp" Host: "127.0.0.1" TcpPort: 9527 -MaxConn: 3 \ No newline at end of file +MaxConn: 3 +WorkerPoolSize: 10 \ No newline at end of file diff --git a/iface/IMsgHandler.go b/iface/IMsgHandler.go index ebf997d..e952e69 100644 --- a/iface/IMsgHandler.go +++ b/iface/IMsgHandler.go @@ -4,4 +4,8 @@ type IMsgHandler interface { DoMsgHandler(request IRequest) AddRouter(msgId uint32, router IRouter) + + StartWorkerPool() + + SendMsgToTaskQueue(request IRequest) } diff --git a/net/Connection.go b/net/Connection.go index 2e174f5..d44640b 100644 --- a/net/Connection.go +++ b/net/Connection.go @@ -6,6 +6,7 @@ import ( "io" "net" "wukong/iface" + "wukong/utils" ) type Connection struct { @@ -71,7 +72,11 @@ func (c *Connection) StartReader() { msg: msg, } - go c.MsgHandler.DoMsgHandler(&req) + if utils.GlobalObject.WorkerPoolSize > 0 { + c.MsgHandler.SendMsgToTaskQueue(&req) + } else { + go c.MsgHandler.DoMsgHandler(&req) + } } } diff --git a/net/MsgHandler.go b/net/MsgHandler.go index 744d03f..3886eba 100644 --- a/net/MsgHandler.go +++ b/net/MsgHandler.go @@ -4,15 +4,20 @@ import ( "fmt" "strconv" "wukong/iface" + "wukong/utils" ) type MsgHandler struct { - Apis map[uint32]iface.IRouter + Apis map[uint32]iface.IRouter + TaskQueue []chan iface.IRequest + WorkerPoolSize uint32 } func NewMsgHandler() *MsgHandler { return &MsgHandler{ - Apis: make(map[uint32]iface.IRouter), + Apis: make(map[uint32]iface.IRouter), + WorkerPoolSize: utils.GlobalObject.WorkerPoolSize, + TaskQueue: make([]chan iface.IRequest, utils.GlobalObject.WorkerPoolSize), } } @@ -35,3 +40,30 @@ func (mh *MsgHandler) AddRouter(msgId uint32, router iface.IRouter) { mh.Apis[msgId] = router fmt.Println("Add api msgId = ", msgId) } + +func (mh *MsgHandler) StartWorkerPool() { + for i := 0; i < int(mh.WorkerPoolSize); i++ { + mh.TaskQueue[i] = make(chan iface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) + go mh.StartOneWorker(i, mh.TaskQueue[i]) + } + +} + +func (mh *MsgHandler) StartOneWorker(workerId int, taskQueue chan iface.IRequest) { + fmt.Println("Worker Id = ", workerId, "is started ...") + + for { + select { + case request := <-taskQueue: + mh.DoMsgHandler(request) + } + } +} + +func (mh *MsgHandler) SendMsgToTaskQueue(request iface.IRequest) { + workerId := request.GetConnection().GetConnID() % mh.WorkerPoolSize + fmt.Println("add connId = ", request.GetConnection().GetConnID(), + "request msgId = ", request.GetMsgId(), "to workerId = ", workerId) + mh.TaskQueue[workerId] <- request + +} diff --git a/net/Server.go b/net/Server.go index 4ff5774..abb8ecb 100644 --- a/net/Server.go +++ b/net/Server.go @@ -23,6 +23,10 @@ func (s *Server) Start() { fmt.Printf("[Start] Server Listenner at IP :%s , Port %d \n", s.IP, s.Port) go func() { + + s.MsgHandler.StartWorkerPool() + + addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr error :", err) diff --git a/utils/globalobj.go b/utils/globalobj.go index 21348ec..9047ebb 100644 --- a/utils/globalobj.go +++ b/utils/globalobj.go @@ -8,13 +8,15 @@ import ( type GlobalObj struct { TcpServer iface.IServer `yaml:"TcpServer"` - Host string `yaml:"Host"` - TcpPort int `yaml:"TcpPort"` - Name string `yaml:"Name"` + Host string `yaml:"Host"` + TcpPort int `yaml:"TcpPort"` + Name string `yaml:"Name"` - Version string `yaml:"Version"` - MaxConn int `yaml:"MaxConn"` - MaxPackageSize uint32 `yaml:"MaxPackageSize"` + Version string `yaml:"Version"` + MaxConn int `yaml:"MaxConn"` + MaxPackageSize uint32 `yaml:"MaxPackageSize"` + WorkerPoolSize uint32 `yaml:"WorkerPoolSize"` + MaxWorkerTaskLen uint32 `yaml:"MaxWorkerTaskLen"` } var GlobalObject *GlobalObj @@ -32,12 +34,14 @@ func (g *GlobalObj) Reload() { func init() { GlobalObject = &GlobalObj{ - Name: "LiuLIServerApp", - Version: "V0.7", - TcpPort: 9527, - Host: "0.0.0.0", - MaxConn: 1000, - MaxPackageSize: 4096, + Name: "LiuLIServerApp", + Version: "V0.8", + TcpPort: 9527, + Host: "0.0.0.0", + MaxConn: 1000, + MaxPackageSize: 4096, + WorkerPoolSize: 10, + MaxWorkerTaskLen: 1024, } GlobalObject.Reload()