diff --git a/app/app.go b/app/app.go index 1b1020350..6198696fd 100644 --- a/app/app.go +++ b/app/app.go @@ -94,13 +94,14 @@ func RunWarp(ctx context.Context, l *slog.Logger, opts WarpOptions) error { } func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint string) error { - conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoint) + conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini")) if err != nil { return err } conf.Interface.MTU = singleMTU for i, peer := range conf.Peers { + peer.Endpoint = endpoint peer.Trick = true peer.KeepAlive = 3 conf.Peers[i] = peer @@ -122,13 +123,14 @@ func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint str } func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint string) error { - conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoint) + conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini")) if err != nil { return err } conf.Interface.MTU = singleMTU for i, peer := range conf.Peers { + peer.Endpoint = endpoint peer.Trick = true peer.KeepAlive = 3 conf.Peers[i] = peer @@ -157,13 +159,14 @@ func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, e func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoints []string) error { // Run outer warp - conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoints[0]) + conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini")) if err != nil { return err } conf.Interface.MTU = singleMTU for i, peer := range conf.Peers { + peer.Endpoint = endpoints[0] peer.Trick = true peer.KeepAlive = 3 conf.Peers[i] = peer @@ -181,13 +184,14 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi } // Run inner warp - conf, err = wiresocks.ParseConfig(path.Join(opts.CacheDir, "secondary", "wgcf-profile.ini"), addr.String()) + conf, err = wiresocks.ParseConfig(path.Join(opts.CacheDir, "secondary", "wgcf-profile.ini")) if err != nil { return err } conf.Interface.MTU = doubleMTU for i, peer := range conf.Peers { + peer.Endpoint = addr.String() peer.KeepAlive = 10 conf.Peers[i] = peer } diff --git a/go.mod b/go.mod index a5e551125..ea52258a4 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ replace github.com/Psiphon-Labs/psiphon-tunnel-core => github.com/bepass-org/psi require ( github.com/Psiphon-Labs/psiphon-tunnel-core v2.0.28+incompatible + github.com/adrg/xdg v0.4.0 + github.com/carlmjohnson/versioninfo v0.22.5 github.com/fatih/color v1.16.0 github.com/flynn/noise v1.1.0 github.com/frankban/quicktest v1.14.6 @@ -16,6 +18,7 @@ require ( github.com/quic-go/quic-go v0.40.1 github.com/refraction-networking/utls v1.3.3 github.com/rodaine/table v1.1.1 + github.com/things-go/go-socks5 v0.0.5 golang.org/x/crypto v0.21.0 golang.org/x/net v0.22.0 golang.org/x/sys v0.18.0 @@ -31,11 +34,9 @@ require ( github.com/Psiphon-Labs/goptlib v0.0.0-20200406165125-c0e32a7a3464 // indirect github.com/Psiphon-Labs/psiphon-tls v0.0.0-20240305020009-09f917290799 // indirect github.com/Psiphon-Labs/quic-go v0.0.0-20240305203241-7c4a760d03cc // indirect - github.com/adrg/xdg v0.4.0 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-proxyproto v0.0.0-20180202201750-5b7edb60ff5f // indirect github.com/bifurcation/mint v0.0.0-20180306135233-198357931e61 // indirect - github.com/carlmjohnson/versioninfo v0.22.5 // indirect github.com/cheekybits/genny v0.0.0-20170328200008-9127e812e1e9 // indirect github.com/cognusion/go-cache-lru v0.0.0-20170419142635-f73e2280ecea // indirect github.com/dchest/siphash v1.2.3 // indirect diff --git a/go.sum b/go.sum index a530a56a4..1b3a86dcc 100644 --- a/go.sum +++ b/go.sum @@ -204,6 +204,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= +github.com/things-go/go-socks5 v0.0.5 h1:qvKaGcBkfDrUL33SchHN93srAmYGzb4CxSM2DPYufe8= +github.com/things-go/go-socks5 v0.0.5/go.mod h1:mtzInf8v5xmsBpHZVbIw2YQYhc4K0jRwzfsH64Uh0IQ= github.com/wader/filtertransport v0.0.0-20200316221534-bdd9e61eee78 h1:9sreu9e9KOihf2Y0NbpyfWhd1XFDcL4GTkPYL4IvMrg= github.com/wader/filtertransport v0.0.0-20200316221534-bdd9e61eee78/go.mod h1:HazXTRLhXFyq80TQp7PUXi6BKE6mS+ydEdzEqNBKopQ= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/proxy/pkg/mixed/proxy.go b/proxy/pkg/mixed/proxy.go index c8c45db7f..28a9b57d3 100644 --- a/proxy/pkg/mixed/proxy.go +++ b/proxy/pkg/mixed/proxy.go @@ -63,20 +63,20 @@ type Option func(*Proxy) // SwitchConn wraps a net.Conn and a bufio.Reader type SwitchConn struct { net.Conn - reader *bufio.Reader + *bufio.Reader } // NewSwitchConn creates a new SwitchConn func NewSwitchConn(conn net.Conn) *SwitchConn { return &SwitchConn{ Conn: conn, - reader: bufio.NewReader(conn), + Reader: bufio.NewReaderSize(conn, 2048), } } // Read reads data into p, first from the bufio.Reader, then from the net.Conn func (c *SwitchConn) Read(p []byte) (n int, err error) { - return c.reader.Read(p) + return c.Reader.Read(p) } func (p *Proxy) ListenAndServe() error { @@ -116,6 +116,7 @@ func (p *Proxy) ListenAndServe() error { // Start a new goroutine to handle each connection // This way, the server can handle multiple connections concurrently go func() { + defer conn.Close() err := p.handleConnection(conn) if err != nil { p.logger.Error(err.Error()) // Log errors from ServeConn @@ -129,23 +130,16 @@ func (p *Proxy) handleConnection(conn net.Conn) error { // Create a SwitchConn switchConn := NewSwitchConn(conn) - // Read one byte to determine the protocol - buf := make([]byte, 1) - _, err := switchConn.Read(buf) + // Peek one byte to determine the protocol + buf, err := switchConn.Peek(1) if err != nil { return err } - // Unread the byte so it's available for the next read - err = switchConn.reader.UnreadByte() - if err != nil { - return err - } - - switch { - case buf[0] == 5: + switch buf[0] { + case 5: err = p.socks5Proxy.ServeConn(switchConn) - case buf[0] == 4: + case 4: err = p.socks4Proxy.ServeConn(switchConn) default: err = p.httpProxy.ServeConn(switchConn) diff --git a/proxy/pkg/socks5/common.go b/proxy/pkg/socks5/common.go index bffccf645..014ceee80 100644 --- a/proxy/pkg/socks5/common.go +++ b/proxy/pkg/socks5/common.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math" "net" "strconv" "strings" @@ -20,7 +19,7 @@ var ( ) const ( - maxUdpPacket = math.MaxUint16 - 28 + maxUdpPacket = 2048 ) const ( @@ -321,8 +320,8 @@ func (cc *udpCustomConn) RemoteAddr() net.Addr { func (cc *udpCustomConn) asyncReadPackets() { go func() { + tempBuf := make([]byte, maxUdpPacket) for { - tempBuf := make([]byte, maxUdpPacket) n, addr, err := cc.ReadFrom(tempBuf) if err != nil { cc.packetQueue <- &readStruct{ @@ -331,18 +330,18 @@ func (cc *udpCustomConn) asyncReadPackets() { } break } - if cc.sourceAddr == nil { - cc.sourceAddr = addr - } - packetData := tempBuf[:n] - if len(packetData) < 3 { + if n < 3 { cc.packetQueue <- &readStruct{ data: nil, err: err, } break } - reader := bytes.NewBuffer(packetData[3:]) + if cc.sourceAddr == nil { + cc.sourceAddr = addr + } + + reader := bytes.NewBuffer(tempBuf[3:n]) targetAddr, err := readAddr(reader) if err != nil { diff --git a/wireguard/device/peer.go b/wireguard/device/peer.go index 1c518cfbc..457b43ec1 100644 --- a/wireguard/device/peer.go +++ b/wireguard/device/peer.go @@ -53,8 +53,7 @@ type Peer struct { inbound *autodrainingInboundQueue // sequential ordering of tun writing } - trick bool - stopCh chan int + trick bool cookieGenerator CookieGenerator trieEntries list.List @@ -80,7 +79,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { // create peer peer := new(Peer) - peer.stopCh = make(chan int, 1) peer.cookieGenerator.Init(pk) peer.device = device peer.queue.outbound = newAutodrainingOutboundQueue(device) @@ -267,10 +265,6 @@ func (peer *Peer) Stop() { return } - select { - case peer.stopCh <- 1: - default: - } peer.device.log.Verbosef("%v - Stopping", peer) peer.timersStop() diff --git a/wireguard/device/send.go b/wireguard/device/send.go index 9d0eb91dd..7b1180af8 100644 --- a/wireguard/device/send.go +++ b/wireguard/device/send.go @@ -90,28 +90,27 @@ func randomInt(min, max int) int { func (peer *Peer) sendRandomPackets() { // Generate a random number of packets between 5 and 10 numPackets := randomInt(8, 15) + randomPacket := make([]byte, 100) for i := 0; i < numPackets; i++ { + if peer.device.isClosed() || !peer.isRunning.Load() { + return + } + // Generate a random packet size between 10 and 40 bytes packetSize := randomInt(40, 100) - randomPacket := make([]byte, packetSize) - _, err := rand.Read(randomPacket) + _, err := rand.Read(randomPacket[:packetSize]) if err != nil { return } // Send the random packet - err = peer.SendBuffers([][]byte{randomPacket}) + err = peer.SendBuffers([][]byte{randomPacket[:packetSize]}) if err != nil { return } - if i < numPackets-1 && peer.isRunning.Load() && !peer.device.isClosed() { - select { - case <-peer.stopCh: - // Wait for a random duration between 20 and 250 milliseconds - case <-time.After(time.Duration(randomInt(20, 250)) * time.Millisecond): - } - } + // Wait for a random duration between 20 and 250 milliseconds + <-time.After(time.Duration(randomInt(20, 250)) * time.Millisecond) } } @@ -122,7 +121,7 @@ func (peer *Peer) SendKeepalive() { // Send some random packets on every keepalive if peer.trick { peer.device.log.Verbosef("%v - Running tricks! (keepalive)", peer) - peer.sendRandomPackets() + go peer.sendRandomPackets() } elem := peer.device.NewOutboundElement() @@ -161,7 +160,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { // send some random packets on handshake if peer.trick { peer.device.log.Verbosef("%v - Running tricks! (handshake)", peer) - peer.sendRandomPackets() + go peer.sendRandomPackets() } peer.handshake.lastSentHandshake = time.Now() diff --git a/wiresocks/config.go b/wiresocks/config.go index 0ddff54d8..5d42a49e5 100644 --- a/wiresocks/config.go +++ b/wiresocks/config.go @@ -158,6 +158,14 @@ func ParsePeers(cfg *ini.File) ([]PeerConfig, error) { peer.Endpoint = sectionKey.String() } + if sectionKey, err := section.GetKey("Trick"); err == nil { + value, err := sectionKey.Bool() + if err != nil { + return nil, err + } + peer.Trick = value + } + peers[i] = peer } @@ -165,7 +173,7 @@ func ParsePeers(cfg *ini.File) ([]PeerConfig, error) { } // ParseConfig takes the path of a configuration file and parses it into Configuration -func ParseConfig(path string, endpoint string) (*Configuration, error) { +func ParseConfig(path string) (*Configuration, error) { iniOpt := ini.LoadOptions{ Insensitive: true, AllowShadows: true, @@ -186,10 +194,6 @@ func ParseConfig(path string, endpoint string) (*Configuration, error) { if err != nil { return nil, err } - for i, peer := range peers { - peer.Endpoint = endpoint - peers[i] = peer - } return &Configuration{Interface: &iface, Peers: peers}, nil } diff --git a/wiresocks/config_test.go b/wiresocks/config_test.go index 3b4d62c54..cb61a99fe 100644 --- a/wiresocks/config_test.go +++ b/wiresocks/config_test.go @@ -15,11 +15,14 @@ PrivateKey = aK8FWhiV1CtKFbKUPssL13P+Tv+c5owmYcU5PCP6yFw= DNS = 8.8.8.8 Address = 172.16.0.2/24 Address = 2606:4700:110:8cc0:1ad3:9155:6742:ea8d/128 +MTU = 1500 [Peer] PublicKey = bmXOC+F1FxEMF9dyiK2H5/1SUtzH0JuVo51h2wPfgyo= AllowedIPs = 0.0.0.0/0 AllowedIPs = ::/0 Endpoint = engage.cloudflareclient.com:2408 +PersistentKeepalive = 3 +Trick = true ` const ( privateKeyBase64 = "68af055a1895d42b4a15b2943ecb0bd773fe4eff9ce68c2661c5393c23fac85c" @@ -47,7 +50,7 @@ func TestParseInterface(t *testing.T) { netip.MustParseAddr("2606:4700:110:8cc0:1ad3:9155:6742:ea8d"), }, DNS: []netip.Addr{netip.MustParseAddr("8.8.8.8")}, - MTU: 0, + MTU: 1500, } qt.Assert(t, device, qt.CmpEquals(cmpopts.EquateComparable(netip.Addr{})), want) t.Logf("%+v", device) @@ -70,12 +73,12 @@ func TestParsePeers(t *testing.T) { PublicKey: publicKeyBase64, PreSharedKey: presharedKeyBase64, Endpoint: "engage.cloudflareclient.com:2408", - KeepAlive: 0, + KeepAlive: 3, AllowedIPs: []netip.Prefix{ netip.MustParsePrefix("0.0.0.0/0"), netip.MustParsePrefix("::/0"), }, - Trick: false, + Trick: true, }} qt.Assert(t, peers, qt.CmpEquals(cmpopts.EquateComparable(netip.Prefix{})), want) t.Logf("%+v", peers) diff --git a/wiresocks/proxy.go b/wiresocks/proxy.go index 8b9d7bcb6..53b3801e4 100644 --- a/wiresocks/proxy.go +++ b/wiresocks/proxy.go @@ -2,23 +2,27 @@ package wiresocks import ( "context" + "errors" "io" "log/slog" "net" "net/netip" + "time" "github.com/bepass-org/warp-plus/proxy/pkg/mixed" "github.com/bepass-org/warp-plus/proxy/pkg/statute" "github.com/bepass-org/warp-plus/wireguard/device" "github.com/bepass-org/warp-plus/wireguard/tun/netstack" + "github.com/things-go/go-socks5/bufferpool" ) // VirtualTun stores a reference to netstack network and DNS configuration type VirtualTun struct { - Tnet *netstack.Net - Logger *slog.Logger - Dev *device.Device - Ctx context.Context + Tnet *netstack.Net + Logger *slog.Logger + Dev *device.Device + Ctx context.Context + pool bufferpool.BufPool } // StartProxy spawns a socks5 server. @@ -60,12 +64,18 @@ func (vt *VirtualTun) generalHandler(req *statute.ProxyRequest) error { done := make(chan error, 1) // Copy data from req.Conn to conn go func() { - _, err := io.Copy(conn, req.Conn) + req.Conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + buf1 := vt.pool.Get() + defer vt.pool.Put(buf1) + _, err := copyConnTimeout(conn, req.Conn, buf1[:cap(buf1)], 15*time.Second) done <- err }() // Copy data from conn to req.Conn go func() { - _, err := io.Copy(req.Conn, conn) + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + buf2 := vt.pool.Get() + defer vt.pool.Put(buf2) + _, err := copyConnTimeout(req.Conn, conn, buf2[:cap(buf2)], 15*time.Second) done <- err }() // Wait for one of the copy operations to finish @@ -75,10 +85,7 @@ func (vt *VirtualTun) generalHandler(req *statute.ProxyRequest) error { } // Close connections and wait for the other copy operation to finish - conn.Close() - req.Conn.Close() <-done - return nil } @@ -89,3 +96,44 @@ func (vt *VirtualTun) Stop() { } } } + +var errInvalidWrite = errors.New("invalid write result") + +func copyConnTimeout(dst net.Conn, src net.Conn, buf []byte, timeout time.Duration) (written int64, err error) { + if buf != nil && len(buf) == 0 { + panic("empty buffer in CopyBuffer") + } + + for { + if err := src.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return 0, err + } + + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw < 0 || nr < nw { + nw = 0 + if ew == nil { + ew = errInvalidWrite + } + } + written += int64(nw) + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + return written, err +} diff --git a/wiresocks/wiresocks.go b/wiresocks/wiresocks.go index b75d4acd4..b50587c42 100644 --- a/wiresocks/wiresocks.go +++ b/wiresocks/wiresocks.go @@ -9,6 +9,7 @@ import ( "github.com/bepass-org/warp-plus/wireguard/conn" "github.com/bepass-org/warp-plus/wireguard/device" "github.com/bepass-org/warp-plus/wireguard/tun/netstack" + "github.com/things-go/go-socks5/bufferpool" ) // StartWireguard creates a tun interface on netstack given a configuration @@ -46,9 +47,10 @@ func StartWireguard(ctx context.Context, l *slog.Logger, conf *Configuration) (* } return &VirtualTun{ - Tnet: tnet, - Logger: l.With("subsystem", "vtun"), - Dev: dev, - Ctx: ctx, + Tnet: tnet, + Logger: l.With("subsystem", "vtun"), + Dev: dev, + Ctx: ctx, + pool: bufferpool.NewPool(256 * 1024), }, nil }