diff --git a/go.mod b/go.mod index e19a22af36..1da39f7727 100644 --- a/go.mod +++ b/go.mod @@ -10,11 +10,12 @@ require ( github.com/gofrs/uuid v4.2.0+incompatible github.com/gorilla/websocket v1.5.0 github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84 + github.com/jpillora/backoff v1.0.0 github.com/kentik/patricia v1.0.0 github.com/miekg/dns v1.1.50 github.com/oschwald/geoip2-golang v1.8.0 - github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f - github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f + github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea + github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.0 go.etcd.io/bbolt v1.3.6 @@ -23,7 +24,7 @@ require ( golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde - golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 + golang.org/x/sys v0.0.0-20220913120320-3275c407cedc golang.zx2c4.com/wireguard/windows v0.5.3 gopkg.in/yaml.v3 v3.0.1 gvisor.dev/gvisor v0.0.0-20220506231117-8ef340c14150 diff --git a/go.sum b/go.sum index 899c65ad8c..8bb6952a30 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hugelgupf/socketpair v0.0.0-20190730060125-05d35a94e714/go.mod h1:2Goc3h8EklBH5mspfHFxBnEoURQCGzQQH1ga9Myjvis= github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84 h1:MJTy6H+EpXLeAn0P5WAWeLk6dJA3V0ik6S3VJfUyQuI= github.com/insomniacslk/dhcp v0.0.0-20220822114210-de18a9d48e84/go.mod h1:h+MxyHxRg9NH3terB1nfRIUaQEcI0XOVkdR9LNBlp8E= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ= github.com/jsimonetti/rtnetlink v0.0.0-20201009170750-9c6f07d100c1/go.mod h1:hqoO/u39cqLeBLebZ8fWdE96O7FxrAsRYhnVOdgHxok= @@ -54,10 +56,10 @@ github.com/oschwald/maxminddb-golang v1.10.0/go.mod h1:Y2ELenReaLAZ0b400URyGwvYx github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= -github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f h1:w1TJq7Lw3It35tDyMsZLtYz4T2msf1UK9JxC85L5+sk= -github.com/sagernet/sing v0.0.0-20220910144724-62c4ebdbcb3f/go.mod h1:kZvzh1VDa/Dg/Bt5WaYKU0jl5ept8KKDpl3Ay4gRtRQ= -github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f h1:6l9aXZqAl1JqXJWi89KHpWnM/moQUPGG+XiwMc+yD0A= -github.com/sagernet/sing-vmess v0.0.0-20220907073918-72d7fdf6825f/go.mod h1:u66Vv7NHXJWfeAmhh7JuJp/cwxmuQlM56QoZ7B7Mmd0= +github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea h1:ZAWvZdeByPBBz3Vs+w3Erbh+DDo7D4biokoPhXl0nNU= +github.com/sagernet/sing v0.0.0-20220916071326-834794b006ea/go.mod h1:x3NHUeJBQwV75L51zwmLKQdLtRvR+M4PmXkfQtU1vIY= +github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12 h1:4HYGbTDDemgBVTmaspXbkgjJlXc3hYVjNxSddJndq8Y= +github.com/sagernet/sing-vmess v0.0.0-20220913015714-c4ab86d40e12/go.mod h1:u66Vv7NHXJWfeAmhh7JuJp/cwxmuQlM56QoZ7B7Mmd0= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -125,8 +127,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM= -golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220913120320-3275c407cedc h1:dpclq5m2YrqPGStKmtw7IcNbKLfbIqKXvNxDJKdIKYc= +golang.org/x/sys v0.0.0-20220913120320-3275c407cedc/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/listener/vmess/server.go b/listener/vmess/server.go index ffe565e6d4..d86f8fd07e 100644 --- a/listener/vmess/server.go +++ b/listener/vmess/server.go @@ -5,7 +5,6 @@ import ( "net" "strings" "sync" - "sync/atomic" "github.com/Dreamacro/clash/adapter/inbound" C "github.com/Dreamacro/clash/constant" @@ -56,11 +55,13 @@ func (h *handler) NewConnection(ctx context.Context, conn net.Conn, metadata met func (h *handler) NewPacketConnection(ctx context.Context, conn network.PacketConn, metadata metadata.Metadata) error { defer func() { _ = conn.Close() }() - wg := &sync.WaitGroup{} - defer wg.Wait() // this goroutine must exit after all conn.WritePacket() is not running - allow := &atomic.Bool{} - defer allow.Store(false) // set writeBackAllow before wg.Wait() - allow.Store(true) + mutex := sync.Mutex{} + conn2 := conn // a new interface to set nil in defer + defer func() { + mutex.Lock() // this goroutine must exit after all conn.WritePacket() is not running + defer mutex.Unlock() + conn2 = nil + }() for { buff := buf.NewPacket() // do not use stack buffer dest, err := conn.ReadPacket(buff) @@ -70,11 +71,11 @@ func (h *handler) NewPacketConnection(ctx context.Context, conn network.PacketCo } target := socks5.ParseAddr(dest.String()) packet := &packet{ - conn: conn, - rAddr: metadata.Source.UDPAddr(), - buff: buff, - writeBackWg: wg, - writeBackAllow: allow, + conn: &conn2, + mutex: &mutex, + rAddr: metadata.Source.UDPAddr(), + lAddr: conn.LocalAddr(), + buff: buff, } select { case h.udpIn <- inbound.NewPacket(target, packet, C.VMESS): diff --git a/listener/vmess/utils.go b/listener/vmess/utils.go index 42cf2a09e2..cb66bca7c1 100644 --- a/listener/vmess/utils.go +++ b/listener/vmess/utils.go @@ -5,7 +5,6 @@ import ( "net" "net/url" "sync" - "sync/atomic" "github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/metadata" @@ -13,12 +12,11 @@ import ( ) type packet struct { - conn network.PacketConn + conn *network.PacketConn + mutex *sync.Mutex rAddr net.Addr + lAddr net.Addr buff *buf.Buffer - - writeBackWg *sync.WaitGroup - writeBackAllow *atomic.Bool } func (c *packet) Data() []byte { @@ -38,13 +36,13 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { return } - c.writeBackWg.Add(1) - defer c.writeBackWg.Done() - if !c.writeBackAllow.Load() { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.conn == nil { err = errors.New("writeBack to closed connection") return } - err = c.conn.WritePacket(buff, metadata.ParseSocksaddr(addr.String())) + err = (*c.conn).WritePacket(buff, metadata.ParseSocksaddr(addr.String())) return } @@ -58,7 +56,7 @@ func (c *packet) Drop() { } func (c *packet) InAddr() net.Addr { - return c.conn.LocalAddr() + return c.lAddr } func parseVmessURL(s string) (addr, username, password string, err error) { diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 58e634774c..d964e0f7e3 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/jpillora/backoff" + "github.com/Dreamacro/clash/adapter/inbound" "github.com/Dreamacro/clash/common/channel" "github.com/Dreamacro/clash/component/inner_dialer" @@ -253,8 +255,9 @@ func handleUDPConn(packet *inbound.PacketAdapter) { ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout) defer cancel() - rawPc, err := proxy.ListenPacketContext(ctx, metadata.Pure()) - if err != nil { + rawPc, err := retry(ctx, func(ctx context.Context) (C.PacketConn, error) { + return proxy.ListenPacketContext(ctx, metadata.Pure()) + }, func(err error) { if rule == nil { log.Warnln( "[%s][UDP] dial %s(%s) %s --> %s error: %s", @@ -268,6 +271,8 @@ func handleUDPConn(packet *inbound.PacketAdapter) { } else { log.Warnln("[%s][UDP] dial %s(%s) (match %s/%s) %s --> %s error: %s", metadata.Type.String(), proxy.Name(), metadata.Process, rule.RuleType().String(), rule.Payload(), metadata.SourceAddress(), metadata.RemoteAddress(), err.Error()) } + }) + if err != nil { return } pCtx.InjectPacketConn(rawPc) @@ -330,8 +335,9 @@ func handleTCPConn(connCtx C.ConnContext) { ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout) defer cancel() - remoteConn, err := proxy.DialContext(ctx, metadata.Pure()) - if err != nil { + remoteConn, err := retry(ctx, func(ctx context.Context) (C.Conn, error) { + return proxy.DialContext(ctx, metadata.Pure()) + }, func(err error) { if rule == nil { log.Warnln( "[%s][TCP] dial %s(%s) %s --> %s error: %s", @@ -345,6 +351,8 @@ func handleTCPConn(connCtx C.ConnContext) { } else { log.Warnln("[%s][TCP] dial %s(%s) (match %s/%s) %s --> %s error: %s", metadata.Type.String(), proxy.Name(), metadata.Process, rule.RuleType().String(), rule.Payload(), metadata.SourceAddress(), metadata.RemoteAddress(), err.Error()) } + }) + if err != nil { return } remoteConn = statistic.NewTCPTracker(remoteConn, statistic.DefaultManager, metadata, rule) @@ -473,3 +481,29 @@ func match(metadata *C.Metadata) (C.Proxy, C.Rule, error) { return proxies["DIRECT"], nil, nil } + +func retry[T any](ctx context.Context, ft func(context.Context) (T, error), fe func(err error)) (t T, err error) { + b := &backoff.Backoff{ + Min: 10 * time.Millisecond, + Max: 1 * time.Second, + Factor: 2, + Jitter: true, + } + for i := 0; i < 10; i++ { + t, err = ft(ctx) + if err != nil { + if fe != nil { + fe(err) + } + select { + case <-time.After(b.Duration()): + continue + case <-ctx.Done(): + return + } + } else { + break + } + } + return +}