Skip to content

Commit

Permalink
Merge branch 'develop' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
liuligames committed Aug 8, 2021
2 parents ebfd15e + c364f3f commit c6f0edc
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 39 deletions.
2 changes: 1 addition & 1 deletion demo/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {

for {
dp := wkNet.NewDataPack()
binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.7 client0 Test Message ")))
binaryMsg, err := dp.Pack(wkNet.NewMessage(0, []byte("LiuLiGamesV0.8 client0 Test Message ")))
if err != nil {
fmt.Println("Pack error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion demo/Client1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {

for {
dp := wkNet.NewDataPack()
binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.7 client1 Test Message ")))
binaryMsg, err := dp.Pack(wkNet.NewMessage(1, []byte("LiuLiGamesV0.8 client1 Test Message ")))
if err != nil {
fmt.Println("Pack error", err)
}
Expand Down
20 changes: 0 additions & 20 deletions demo/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ type PingRouter struct {
net.BaseRouter
}

//func (pr *PingRouter) PreHandle(request iface.IRequest) {
// fmt.Println("Call Router PreHandle ....")
//
// _, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping.... \n"))
// if err != nil {
// fmt.Println("call back before ping error ")
// }
//}

func (pr *PingRouter) Handle(request iface.IRequest) {
fmt.Println("Call Router Handle ....")

Expand All @@ -31,17 +22,6 @@ func (pr *PingRouter) Handle(request iface.IRequest) {
}
}

//func (pr *PingRouter) PostHandle(request iface.IRequest) {
// fmt.Println("Call Router PostHandle ....")
//
// _, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping.... \n"))
// if err != nil {
// fmt.Println("call back after ping error ")
// }
//}



type HelloRouter struct {
net.BaseRouter
}
Expand Down
5 changes: 3 additions & 2 deletions demo/conf/conf.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
name: "liuLI V0.4 demoServerApp"
name: "liuLI V0.8 demoServerApp"
Host: "127.0.0.1"
TcpPort: 9527
MaxConn: 3
MaxConn: 3
WorkerPoolSize: 10
4 changes: 4 additions & 0 deletions iface/IMsgHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ type IMsgHandler interface {
DoMsgHandler(request IRequest)

AddRouter(msgId uint32, router IRouter)

StartWorkerPool()

SendMsgToTaskQueue(request IRequest)
}
7 changes: 6 additions & 1 deletion net/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"wukong/iface"
"wukong/utils"
)

type Connection struct {
Expand Down Expand Up @@ -71,7 +72,11 @@ func (c *Connection) StartReader() {
msg: msg,
}

go c.MsgHandler.DoMsgHandler(&req)
if utils.GlobalObject.WorkerPoolSize > 0 {
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
go c.MsgHandler.DoMsgHandler(&req)
}
}
}

Expand Down
36 changes: 34 additions & 2 deletions net/MsgHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ import (
"fmt"
"strconv"
"wukong/iface"
"wukong/utils"
)

type MsgHandler struct {
Apis map[uint32]iface.IRouter
Apis map[uint32]iface.IRouter
TaskQueue []chan iface.IRequest
WorkerPoolSize uint32
}

func NewMsgHandler() *MsgHandler {
return &MsgHandler{
Apis: make(map[uint32]iface.IRouter),
Apis: make(map[uint32]iface.IRouter),
WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
TaskQueue: make([]chan iface.IRequest, utils.GlobalObject.WorkerPoolSize),
}
}

Expand All @@ -35,3 +40,30 @@ func (mh *MsgHandler) AddRouter(msgId uint32, router iface.IRouter) {
mh.Apis[msgId] = router
fmt.Println("Add api msgId = ", msgId)
}

func (mh *MsgHandler) StartWorkerPool() {
for i := 0; i < int(mh.WorkerPoolSize); i++ {
mh.TaskQueue[i] = make(chan iface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
go mh.StartOneWorker(i, mh.TaskQueue[i])
}

}

func (mh *MsgHandler) StartOneWorker(workerId int, taskQueue chan iface.IRequest) {
fmt.Println("Worker Id = ", workerId, "is started ...")

for {
select {
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}

func (mh *MsgHandler) SendMsgToTaskQueue(request iface.IRequest) {
workerId := request.GetConnection().GetConnID() % mh.WorkerPoolSize
fmt.Println("add connId = ", request.GetConnection().GetConnID(),
"request msgId = ", request.GetMsgId(), "to workerId = ", workerId)
mh.TaskQueue[workerId] <- request

}
4 changes: 4 additions & 0 deletions net/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (s *Server) Start() {
fmt.Printf("[Start] Server Listenner at IP :%s , Port %d \n", s.IP, s.Port)

go func() {

s.MsgHandler.StartWorkerPool()


addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr error :", err)
Expand Down
28 changes: 16 additions & 12 deletions utils/globalobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (

type GlobalObj struct {
TcpServer iface.IServer `yaml:"TcpServer"`
Host string `yaml:"Host"`
TcpPort int `yaml:"TcpPort"`
Name string `yaml:"Name"`
Host string `yaml:"Host"`
TcpPort int `yaml:"TcpPort"`
Name string `yaml:"Name"`

Version string `yaml:"Version"`
MaxConn int `yaml:"MaxConn"`
MaxPackageSize uint32 `yaml:"MaxPackageSize"`
Version string `yaml:"Version"`
MaxConn int `yaml:"MaxConn"`
MaxPackageSize uint32 `yaml:"MaxPackageSize"`
WorkerPoolSize uint32 `yaml:"WorkerPoolSize"`
MaxWorkerTaskLen uint32 `yaml:"MaxWorkerTaskLen"`
}

var GlobalObject *GlobalObj
Expand All @@ -32,12 +34,14 @@ func (g *GlobalObj) Reload() {

func init() {
GlobalObject = &GlobalObj{
Name: "LiuLIServerApp",
Version: "V0.7",
TcpPort: 9527,
Host: "0.0.0.0",
MaxConn: 1000,
MaxPackageSize: 4096,
Name: "LiuLIServerApp",
Version: "V0.8",
TcpPort: 9527,
Host: "0.0.0.0",
MaxConn: 1000,
MaxPackageSize: 4096,
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
}

GlobalObject.Reload()
Expand Down

0 comments on commit c6f0edc

Please sign in to comment.