From 154ad97db4bf687fc0ba8e0d7db6f1904f19bcb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Wed, 29 May 2024 20:40:54 +0800 Subject: [PATCH] feat:update to dubbo-getty-v1.4.12 --- transport/client.go | 18 ++++++++++++------ transport/client_test.go | 12 ++++++------ transport/session.go | 1 - 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/transport/client.go b/transport/client.go index ceed74f..4b17f94 100644 --- a/transport/client.go +++ b/transport/client.go @@ -422,8 +422,10 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { - var num, max, times, interval int - + var ( + num, max, times, interval int + maxDuration int64 + ) max = c.number interval = c.reconnectInterval if interval == 0 { @@ -436,15 +438,18 @@ func (c *client) reConnect() { } num = c.sessionNum() - if max <= num { + if max <= num || max < times { + //Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers. break } c.connect() times++ - if maxTimes < times { - times = maxTimes + if times > maxTimes { + maxDuration = int64(maxTimes) * int64(interval) + } else { + maxDuration = int64(times) * int64(interval) } - <-gxtime.After(time.Duration(int64(times) * int64(interval))) + <-gxtime.After(time.Duration(maxDuration)) } } @@ -458,6 +463,7 @@ func (c *client) stop() { c.Lock() for s := range c.ssMap { s.RemoveAttribute(sessionClientKey) + s.RemoveAttribute(ignoreReconnectKey) s.Close() } c.ssMap = nil diff --git a/transport/client_test.go b/transport/client_test.go index 50f4cda..05af4ad 100644 --- a/transport/client_test.go +++ b/transport/client_test.go @@ -140,7 +140,7 @@ func TestTCPClient(t *testing.T) { assert.True(t, conn.compress == CompressNone) beforeWriteBytes := conn.writeBytes beforeWritePkgNum := conn.writePkgNum - l, err := conn.send([]byte("hello")) + l, err := conn.Send([]byte("hello")) assert.Nil(t, err) assert.True(t, l == 5) beforeWritePkgNum.Add(1) @@ -156,7 +156,7 @@ func TestTCPClient(t *testing.T) { assert.Equal(t, beforeWritePkgNum, conn.writePkgNum) var pkgs [][]byte pkgs = append(pkgs, []byte("hello"), []byte("hello")) - l, err = conn.send(pkgs) + l, err = conn.Send(pkgs) assert.Nil(t, err) assert.True(t, l == 10) beforeWritePkgNum.Add(2) @@ -275,11 +275,11 @@ func TestUDPClient(t *testing.T) { } t.Logf("udp context:%s", udpCtx) udpConn := ss.(*session).Connection.(*gettyUDPConn) - _, err = udpConn.send(udpCtx) + _, err = udpConn.Send(udpCtx) assert.NotNil(t, err) udpCtx.Pkg = []byte("hello") beforeWriteBytes := udpConn.writeBytes - _, err = udpConn.send(udpCtx) + _, err = udpConn.Send(udpCtx) beforeWriteBytes.Add(5) assert.Equal(t, beforeWriteBytes, udpConn.writeBytes) assert.Nil(t, err) @@ -338,7 +338,7 @@ func TestNewWSClient(t *testing.T) { assert.True(t, conn.compress == CompressNone) err := conn.handlePing("hello") assert.Nil(t, err) - l, err := conn.send("hello") + l, err := conn.Send("hello") assert.NotNil(t, err) assert.True(t, l == 0) ss.SetSession(ss) @@ -347,7 +347,7 @@ func TestNewWSClient(t *testing.T) { active := ss.GetActive() assert.NotNil(t, active) beforeWriteBytes := conn.writeBytes - _, err = conn.send([]byte("hello")) + _, err = conn.Send([]byte("hello")) assert.Nil(t, err) beforeWriteBytes.Add(5) assert.Equal(t, beforeWriteBytes, conn.writeBytes) diff --git a/transport/session.go b/transport/session.go index 03128e4..4173dd7 100644 --- a/transport/session.go +++ b/transport/session.go @@ -571,7 +571,6 @@ func (s *session) addTask(pkg interface{}) { log.Errorf("[Id:%d, name=%s, endpoint=%s] Session is closed", s.ID(), s.name, s.EndPoint()) return } - s.listener.OnMessage(s, pkg) s.IncReadPkgNum() }