Skip to content

Commit

Permalink
misc
Browse files Browse the repository at this point in the history
  • Loading branch information
gunlee01 committed Jan 18, 2021
1 parent cf524bb commit e0c081c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 9 deletions.
2 changes: 2 additions & 0 deletions scouterx/conf/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Configure struct {
lastModified time.Time

_trace bool
TraceObjSend bool

SendQueueSize int
ObjHash int32
Expand Down Expand Up @@ -238,6 +239,7 @@ func (conf *Configure) addToConf(props *properties.Properties) {

conf.resetObjNameAndType(props)
conf._trace = props.GetBool("_trace", false)
conf.TraceObjSend = props.GetBool("trace_obj_send", false)

conf.SendQueueSize = props.GetInt("send_queue_size", 3000)

Expand Down
2 changes: 1 addition & 1 deletion scouterx/netio/tcpclient/tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *TCPClient) Prepare() bool {
logger.Info.Printf("[scouter]tcp prepare %s, %d", c.host, c.port)
conn0, err := net.DialTimeout("tcp", c.host+":"+strconv.Itoa(c.port), time.Duration(c.connectionTimeout)*time.Millisecond)
if err != nil {
logger.Error.Printf("[scouter][err]%v", err)
logger.Error.Printf("[scouter][err]%v\n", err)
c.conn = nil
return false
}
Expand Down
2 changes: 1 addition & 1 deletion scouterx/netio/tcpclient/tcpmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func startTcp() {
if client.Prepare() {
err := client.Process()
if err != nil {
logger.Error.Printf("[scouter][err]connection to collector: %v", err)
logger.Error.Printf("[scouter][err]connection to collector: %v\n", err)
time.Sleep(time.Duration(min(sleep, maxSleep)) * time.Millisecond)
sleep = sleep * 2
if sleep > maxSleep {
Expand Down
9 changes: 5 additions & 4 deletions scouterx/netio/udpclient/udpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ func (udpClient *UDPClient) open() error {
s, err := net.ResolveUDPAddr("udp", address)

if err != nil {
logger.Error.Printf("can't initialize udp client. %s\n", err.Error())
logger.Error.Printf("[scouter] can't resolve udp client. %s\n", err.Error())
return err
}
udpClient.Conn, err = net.DialUDP("udp", nil, s)
if err != nil {
logger.Error.Printf("can't initialize udp client. %s\n", err.Error())
logger.Error.Printf("[scouter] can't dialup udp client. %s\n", err.Error())
return err
}
return nil
}

func (udpClient *UDPClient) close() {
udpClient.Conn.Close()
if udpClient.Conn != nil {
udpClient.Conn.Close()
}
}


func (udpClient *UDPClient) writeMTU(data []byte, packetSize int) bool {
if udpClient.Conn == nil {
return false
Expand Down
41 changes: 38 additions & 3 deletions scouterx/netio/udpsender/udpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
var once sync.Once
var ac = conf.GetInstance()

var udpObjHash int32
var serverAddr string
var udpServerPort int
var udpMaxBytes int

type UDPSender struct {
udpChannel chan []byte
running bool
Expand All @@ -25,17 +30,42 @@ var udpSender *UDPSender

func GetInstance() *UDPSender {
once.Do(func() {
serverAddr = ac.NetCollectorIP
udpServerPort = ac.NetCollectorUDPPort
udpMaxBytes = ac.UDPMaxBytes

udpSender = new(UDPSender)
udpSender.udpChannel = channelfactory.GetUDPChannel()
udpSender.running = true
udpSender.udpClient = udpclient.New(ac.NetCollectorIP, ac.NetCollectorUDPPort)
udpSender.udpClient.SetUDPMaxBytes(ac.UDPMaxBytes)
udpSender.running = true
udpSender.udpClient = udpclient.New(serverAddr, udpServerPort)
udpSender.udpClient.SetUDPMaxBytes(udpMaxBytes)
go udpSender.run()
go reloadUdpSender()
})
return udpSender
}

func reloadUdpSender() {
for {
time.Sleep(1000)
if serverAddr != ac.NetCollectorIP || udpServerPort != ac.NetCollectorUDPPort || udpMaxBytes != ac.UDPMaxBytes {
serverAddr = ac.NetCollectorIP
udpServerPort = ac.NetCollectorUDPPort
udpMaxBytes = ac.UDPMaxBytes

udpSender.running = true
prevClient := udpSender.udpClient
udpClient := udpclient.New(serverAddr, udpServerPort)
udpClient.SetUDPMaxBytes(udpMaxBytes)
udpSender.udpClient = udpClient

if prevClient.Conn != nil {
prevClient.Conn.Close()
}
}
}
}

func (udpSender *UDPSender) AddPack(pack netdata.Pack) {
writePack, _ := netdata.NewDataOutputX(nil).WritePack(pack)
bytes := writePack.Bytes()
Expand All @@ -55,6 +85,11 @@ func (udpSender *UDPSender) AddBuffer(buffer []byte) {
}

func (udpSender *UDPSender) SendPackDirect(pack netdata.Pack) {
if ac.TraceObjSend {
if p, ok := pack.(*netdata.ObjectPack); ok {
logger.Info.Printf("[scouter] SendPackDirect[ObjPack], to:%s, pack:%s", udpSender.udpClient.Conn.RemoteAddr(), p.ToString())
}
}
writePack, _ := netdata.NewDataOutputX(nil).WritePack(pack)
bytes := writePack.Bytes()
go udpSender.udpClient.WriteBuffer(bytes)
Expand Down
5 changes: 5 additions & 0 deletions scouterx/task/agenttask/agenttask.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agenttask

import (
"github.com/scouter-contrib/scouter-agent-golang/scouterx/common/logger"
"github.com/scouter-contrib/scouter-agent-golang/scouterx/conf"
"github.com/scouter-contrib/scouter-agent-golang/scouterx/netio"
"github.com/scouter-contrib/scouter-agent-golang/scouterx/common/netdata"
Expand Down Expand Up @@ -28,5 +29,9 @@ func SendObjPack() {
objPack.ObjHash = objHash
objPack.ObjType = ac.ObjType
objPack.Version = "0.0.0"

if ac.TraceObjSend {
logger.Info.Printf("[scouter] SendObjPack: %s, %d, %s", objName, objHash, ac.ObjType)
}
netio.SendPackDirect(objPack)
}

0 comments on commit e0c081c

Please sign in to comment.