Skip to content

Commit

Permalink
add retry in tunnel
Browse files Browse the repository at this point in the history
  • Loading branch information
wwqgtxx committed Sep 17, 2022
1 parent 78b606f commit b65f890
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 34 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ require (
github.com/gofrs/uuid v4.2.0+incompatible
github.com/gorilla/websocket v1.5.0
github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84
github.com/jpillora/backoff v1.0.0
github.com/kentik/patricia v1.0.0
github.com/miekg/dns v1.1.50
github.com/oschwald/geoip2-golang v1.8.0
github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f
github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f
github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea
github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.0
go.etcd.io/bbolt v1.3.6
Expand All @@ -23,7 +24,7 @@ require (
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64
golang.org/x/sys v0.0.0-20220913120320-3275c407cedc
golang.zx2c4.com/wireguard/windows v0.5.3
gopkg.in/yaml.v3 v3.0.1
gvisor.dev/gvisor v0.0.0-20220506231117-8ef340c14150
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/hugelgupf/socketpair v0.0.0-20190730060125-05d35a94e714/go.mod h1:2Goc3h8EklBH5mspfHFxBnEoURQCGzQQH1ga9Myjvis=
github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84 h1:MJTy6H+EpXLeAn0P5WAWeLk6dJA3V0ik6S3VJfUyQuI=
github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84/go.mod h1:h+MxyHxRg9NH3terB1nfRIUaQEcI0XOVkdR9LNBlp8E=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ=
github.com/jsimonetti/rtnetlink v0.0.0-20201009170750-9c6f07d100c1/go.mod h1:hqoO/u39cqLeBLebZ8fWdE96O7FxrAsRYhnVOdgHxok=
Expand All @@ -54,10 +56,10 @@ github.com/oschwald/maxminddb-golang v1.10.0/go.mod h1:Y2ELenReaLAZ0b400URyGwvYx
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f h1:w1TJq7Lw3It35tDyMsZLtYz4T2msf1UK9JxC85L5+sk=
github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f/go.mod h1:kZvzh1VDa/Dg/Bt5WaYKU0jl5ept8KKDpl3Ay4gRtRQ=
github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f h1:6l9aXZqAl1JqXJWi89KHpWnM/moQUPGG+XiwMc+yD0A=
github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f/go.mod h1:u66Vv7NHXJWfeAmhh7JuJp/cwxmuQlM56QoZ7B7Mmd0=
github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea h1:ZAWvZdeByPBBz3Vs+w3Erbh+DDo7D4biokoPhXl0nNU=
github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea/go.mod h1:x3NHUeJBQwV75L51zwmLKQdLtRvR+M4PmXkfQtU1vIY=
github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12 h1:4HYGbTDDemgBVTmaspXbkgjJlXc3hYVjNxSddJndq8Y=
github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12/go.mod h1:u66Vv7NHXJWfeAmhh7JuJp/cwxmuQlM56QoZ7B7Mmd0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
Expand Down Expand Up @@ -125,8 +127,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220913120320-3275c407cedc h1:dpclq5m2YrqPGStKmtw7IcNbKLfbIqKXvNxDJKdIKYc=
golang.org/x/sys v0.0.0-20220913120320-3275c407cedc/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
23 changes: 12 additions & 11 deletions listener/vmess/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net"
"strings"
"sync"
"sync/atomic"

"github.com/Dreamacro/clash/adapter/inbound"
C "github.com/Dreamacro/clash/constant"
Expand Down Expand Up @@ -56,11 +55,13 @@ func (h *handler) NewConnection(ctx context.Context, conn net.Conn, metadata met

func (h *handler) NewPacketConnection(ctx context.Context, conn network.PacketConn, metadata metadata.Metadata) error {
defer func() { _ = conn.Close() }()
wg := &sync.WaitGroup{}
defer wg.Wait() // this goroutine must exit after all conn.WritePacket() is not running
allow := &atomic.Bool{}
defer allow.Store(false) // set writeBackAllow before wg.Wait()
allow.Store(true)
mutex := sync.Mutex{}
conn2 := conn // a new interface to set nil in defer
defer func() {
mutex.Lock() // this goroutine must exit after all conn.WritePacket() is not running
defer mutex.Unlock()
conn2 = nil
}()
for {
buff := buf.NewPacket() // do not use stack buffer
dest, err := conn.ReadPacket(buff)
Expand All @@ -70,11 +71,11 @@ func (h *handler) NewPacketConnection(ctx context.Context, conn network.PacketCo
}
target := socks5.ParseAddr(dest.String())
packet := &packet{
conn: conn,
rAddr: metadata.Source.UDPAddr(),
buff: buff,
writeBackWg: wg,
writeBackAllow: allow,
conn: &conn2,
mutex: &mutex,
rAddr: metadata.Source.UDPAddr(),
lAddr: conn.LocalAddr(),
buff: buff,
}
select {
case h.udpIn <- inbound.NewPacket(target, packet, C.VMESS):
Expand Down
18 changes: 8 additions & 10 deletions listener/vmess/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ import (
"net"
"net/url"
"sync"
"sync/atomic"

"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
)

type packet struct {
conn network.PacketConn
conn *network.PacketConn
mutex *sync.Mutex
rAddr net.Addr
lAddr net.Addr
buff *buf.Buffer

writeBackWg *sync.WaitGroup
writeBackAllow *atomic.Bool
}

func (c *packet) Data() []byte {
Expand All @@ -38,13 +36,13 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) {
return
}

c.writeBackWg.Add(1)
defer c.writeBackWg.Done()
if !c.writeBackAllow.Load() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.conn == nil {
err = errors.New("writeBack to closed connection")
return
}
err = c.conn.WritePacket(buff, metadata.ParseSocksaddr(addr.String()))
err = (*c.conn).WritePacket(buff, metadata.ParseSocksaddr(addr.String()))
return
}

Expand All @@ -58,7 +56,7 @@ func (c *packet) Drop() {
}

func (c *packet) InAddr() net.Addr {
return c.conn.LocalAddr()
return c.lAddr
}

func parseVmessURL(s string) (addr, username, password string, err error) {
Expand Down
42 changes: 38 additions & 4 deletions tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/jpillora/backoff"

"github.com/Dreamacro/clash/adapter/inbound"
"github.com/Dreamacro/clash/common/channel"
"github.com/Dreamacro/clash/component/inner_dialer"
Expand Down Expand Up @@ -253,8 +255,9 @@ func handleUDPConn(packet *inbound.PacketAdapter) {

ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
defer cancel()
rawPc, err := proxy.ListenPacketContext(ctx, metadata.Pure())
if err != nil {
rawPc, err := retry(ctx, func(ctx context.Context) (C.PacketConn, error) {
return proxy.ListenPacketContext(ctx, metadata.Pure())
}, func(err error) {
if rule == nil {
log.Warnln(
"[%s][UDP] dial %s(%s) %s --> %s error: %s",
Expand All @@ -268,6 +271,8 @@ func handleUDPConn(packet *inbound.PacketAdapter) {
} else {
log.Warnln("[%s][UDP] dial %s(%s) (match %s/%s) %s --> %s error: %s", metadata.Type.String(), proxy.Name(), metadata.Process, rule.RuleType().String(), rule.Payload(), metadata.SourceAddress(), metadata.RemoteAddress(), err.Error())
}
})
if err != nil {
return
}
pCtx.InjectPacketConn(rawPc)
Expand Down Expand Up @@ -330,8 +335,9 @@ func handleTCPConn(connCtx C.ConnContext) {

ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
defer cancel()
remoteConn, err := proxy.DialContext(ctx, metadata.Pure())
if err != nil {
remoteConn, err := retry(ctx, func(ctx context.Context) (C.Conn, error) {
return proxy.DialContext(ctx, metadata.Pure())
}, func(err error) {
if rule == nil {
log.Warnln(
"[%s][TCP] dial %s(%s) %s --> %s error: %s",
Expand All @@ -345,6 +351,8 @@ func handleTCPConn(connCtx C.ConnContext) {
} else {
log.Warnln("[%s][TCP] dial %s(%s) (match %s/%s) %s --> %s error: %s", metadata.Type.String(), proxy.Name(), metadata.Process, rule.RuleType().String(), rule.Payload(), metadata.SourceAddress(), metadata.RemoteAddress(), err.Error())
}
})
if err != nil {
return
}
remoteConn = statistic.NewTCPTracker(remoteConn, statistic.DefaultManager, metadata, rule)
Expand Down Expand Up @@ -473,3 +481,29 @@ func match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {

return proxies["DIRECT"], nil, nil
}

func retry[T any](ctx context.Context, ft func(context.Context) (T, error), fe func(err error)) (t T, err error) {
b := &backoff.Backoff{
Min: 10 * time.Millisecond,
Max: 1 * time.Second,
Factor: 2,
Jitter: true,
}
for i := 0; i < 10; i++ {
t, err = ft(ctx)
if err != nil {
if fe != nil {
fe(err)
}
select {
case <-time.After(b.Duration()):
continue
case <-ctx.Done():
return
}
} else {
break
}
}
return
}

0 comments on commit b65f890

Please sign in to comment.