Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy optimizations and wireguard tweaks #89

Merged
merged 4 commits into from
May 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -94,13 +94,14 @@ func RunWarp(ctx context.Context, l *slog.Logger, opts WarpOptions) error {
}

func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint string) error {
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoint)
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"))
if err != nil {
return err
}
conf.Interface.MTU = singleMTU

for i, peer := range conf.Peers {
peer.Endpoint = endpoint
peer.Trick = true
peer.KeepAlive = 3
conf.Peers[i] = peer
@@ -122,13 +123,14 @@ func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint str
}

func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint string) error {
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoint)
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"))
if err != nil {
return err
}
conf.Interface.MTU = singleMTU

for i, peer := range conf.Peers {
peer.Endpoint = endpoint
peer.Trick = true
peer.KeepAlive = 3
conf.Peers[i] = peer
@@ -157,13 +159,14 @@ func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, e

func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoints []string) error {
// Run outer warp
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"), endpoints[0])
conf, err := wiresocks.ParseConfig(path.Join(opts.CacheDir, "primary", "wgcf-profile.ini"))
if err != nil {
return err
}
conf.Interface.MTU = singleMTU

for i, peer := range conf.Peers {
peer.Endpoint = endpoints[0]
peer.Trick = true
peer.KeepAlive = 3
conf.Peers[i] = peer
@@ -181,13 +184,14 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi
}

// Run inner warp
conf, err = wiresocks.ParseConfig(path.Join(opts.CacheDir, "secondary", "wgcf-profile.ini"), addr.String())
conf, err = wiresocks.ParseConfig(path.Join(opts.CacheDir, "secondary", "wgcf-profile.ini"))
if err != nil {
return err
}
conf.Interface.MTU = doubleMTU

for i, peer := range conf.Peers {
peer.Endpoint = addr.String()
peer.KeepAlive = 10
conf.Peers[i] = peer
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ replace github.com/Psiphon-Labs/psiphon-tunnel-core => github.com/bepass-org/psi

require (
github.com/Psiphon-Labs/psiphon-tunnel-core v2.0.28+incompatible
github.com/adrg/xdg v0.4.0
github.com/carlmjohnson/versioninfo v0.22.5
github.com/fatih/color v1.16.0
github.com/flynn/noise v1.1.0
github.com/frankban/quicktest v1.14.6
@@ -16,6 +18,7 @@ require (
github.com/quic-go/quic-go v0.40.1
github.com/refraction-networking/utls v1.3.3
github.com/rodaine/table v1.1.1
github.com/things-go/go-socks5 v0.0.5
golang.org/x/crypto v0.21.0
golang.org/x/net v0.22.0
golang.org/x/sys v0.18.0
@@ -31,11 +34,9 @@ require (
github.com/Psiphon-Labs/goptlib v0.0.0-20200406165125-c0e32a7a3464 // indirect
github.com/Psiphon-Labs/psiphon-tls v0.0.0-20240305020009-09f917290799 // indirect
github.com/Psiphon-Labs/quic-go v0.0.0-20240305203241-7c4a760d03cc // indirect
github.com/adrg/xdg v0.4.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/armon/go-proxyproto v0.0.0-20180202201750-5b7edb60ff5f // indirect
github.com/bifurcation/mint v0.0.0-20180306135233-198357931e61 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/cheekybits/genny v0.0.0-20170328200008-9127e812e1e9 // indirect
github.com/cognusion/go-cache-lru v0.0.0-20170419142635-f73e2280ecea // indirect
github.com/dchest/siphash v1.2.3 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -204,6 +204,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/things-go/go-socks5 v0.0.5 h1:qvKaGcBkfDrUL33SchHN93srAmYGzb4CxSM2DPYufe8=
github.com/things-go/go-socks5 v0.0.5/go.mod h1:mtzInf8v5xmsBpHZVbIw2YQYhc4K0jRwzfsH64Uh0IQ=
github.com/wader/filtertransport v0.0.0-20200316221534-bdd9e61eee78 h1:9sreu9e9KOihf2Y0NbpyfWhd1XFDcL4GTkPYL4IvMrg=
github.com/wader/filtertransport v0.0.0-20200316221534-bdd9e61eee78/go.mod h1:HazXTRLhXFyq80TQp7PUXi6BKE6mS+ydEdzEqNBKopQ=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
24 changes: 9 additions & 15 deletions proxy/pkg/mixed/proxy.go
Original file line number Diff line number Diff line change
@@ -63,20 +63,20 @@ type Option func(*Proxy)
// SwitchConn wraps a net.Conn and a bufio.Reader
type SwitchConn struct {
net.Conn
reader *bufio.Reader
*bufio.Reader
}

// NewSwitchConn creates a new SwitchConn
func NewSwitchConn(conn net.Conn) *SwitchConn {
return &SwitchConn{
Conn: conn,
reader: bufio.NewReader(conn),
Reader: bufio.NewReaderSize(conn, 2048),
}
}

// Read reads data into p, first from the bufio.Reader, then from the net.Conn
func (c *SwitchConn) Read(p []byte) (n int, err error) {
return c.reader.Read(p)
return c.Reader.Read(p)
}

func (p *Proxy) ListenAndServe() error {
@@ -116,6 +116,7 @@ func (p *Proxy) ListenAndServe() error {
// Start a new goroutine to handle each connection
// This way, the server can handle multiple connections concurrently
go func() {
defer conn.Close()
err := p.handleConnection(conn)
if err != nil {
p.logger.Error(err.Error()) // Log errors from ServeConn
@@ -129,23 +130,16 @@ func (p *Proxy) handleConnection(conn net.Conn) error {
// Create a SwitchConn
switchConn := NewSwitchConn(conn)

// Read one byte to determine the protocol
buf := make([]byte, 1)
_, err := switchConn.Read(buf)
// Peek one byte to determine the protocol
buf, err := switchConn.Peek(1)
if err != nil {
return err
}

// Unread the byte so it's available for the next read
err = switchConn.reader.UnreadByte()
if err != nil {
return err
}

switch {
case buf[0] == 5:
switch buf[0] {
case 5:
err = p.socks5Proxy.ServeConn(switchConn)
case buf[0] == 4:
case 4:
err = p.socks4Proxy.ServeConn(switchConn)
default:
err = p.httpProxy.ServeConn(switchConn)
17 changes: 8 additions & 9 deletions proxy/pkg/socks5/common.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
@@ -20,7 +19,7 @@ var (
)

const (
maxUdpPacket = math.MaxUint16 - 28
maxUdpPacket = 2048
)

const (
@@ -321,8 +320,8 @@ func (cc *udpCustomConn) RemoteAddr() net.Addr {

func (cc *udpCustomConn) asyncReadPackets() {
go func() {
tempBuf := make([]byte, maxUdpPacket)
for {
tempBuf := make([]byte, maxUdpPacket)
n, addr, err := cc.ReadFrom(tempBuf)
if err != nil {
cc.packetQueue <- &readStruct{
@@ -331,18 +330,18 @@ func (cc *udpCustomConn) asyncReadPackets() {
}
break
}
if cc.sourceAddr == nil {
cc.sourceAddr = addr
}
packetData := tempBuf[:n]
if len(packetData) < 3 {
if n < 3 {
cc.packetQueue <- &readStruct{
data: nil,
err: err,
}
break
}
reader := bytes.NewBuffer(packetData[3:])
if cc.sourceAddr == nil {
cc.sourceAddr = addr
}

reader := bytes.NewBuffer(tempBuf[3:n])
targetAddr, err := readAddr(reader)

if err != nil {
8 changes: 1 addition & 7 deletions wireguard/device/peer.go
Original file line number Diff line number Diff line change
@@ -53,8 +53,7 @@ type Peer struct {
inbound *autodrainingInboundQueue // sequential ordering of tun writing
}

trick bool
stopCh chan int
trick bool

cookieGenerator CookieGenerator
trieEntries list.List
@@ -80,7 +79,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {

// create peer
peer := new(Peer)
peer.stopCh = make(chan int, 1)
peer.cookieGenerator.Init(pk)
peer.device = device
peer.queue.outbound = newAutodrainingOutboundQueue(device)
@@ -267,10 +265,6 @@ func (peer *Peer) Stop() {
return
}

select {
case peer.stopCh <- 1:
default:
}
peer.device.log.Verbosef("%v - Stopping", peer)

peer.timersStop()
23 changes: 11 additions & 12 deletions wireguard/device/send.go
Original file line number Diff line number Diff line change
@@ -90,28 +90,27 @@ func randomInt(min, max int) int {
func (peer *Peer) sendRandomPackets() {
// Generate a random number of packets between 5 and 10
numPackets := randomInt(8, 15)
randomPacket := make([]byte, 100)
for i := 0; i < numPackets; i++ {
if peer.device.isClosed() || !peer.isRunning.Load() {
return
}

// Generate a random packet size between 10 and 40 bytes
packetSize := randomInt(40, 100)
randomPacket := make([]byte, packetSize)
_, err := rand.Read(randomPacket)
_, err := rand.Read(randomPacket[:packetSize])
if err != nil {
return
}

// Send the random packet
err = peer.SendBuffers([][]byte{randomPacket})
err = peer.SendBuffers([][]byte{randomPacket[:packetSize]})
if err != nil {
return
}

if i < numPackets-1 && peer.isRunning.Load() && !peer.device.isClosed() {
select {
case <-peer.stopCh:
// Wait for a random duration between 20 and 250 milliseconds
case <-time.After(time.Duration(randomInt(20, 250)) * time.Millisecond):
}
}
// Wait for a random duration between 20 and 250 milliseconds
<-time.After(time.Duration(randomInt(20, 250)) * time.Millisecond)
}
}

@@ -122,7 +121,7 @@ func (peer *Peer) SendKeepalive() {
// Send some random packets on every keepalive
if peer.trick {
peer.device.log.Verbosef("%v - Running tricks! (keepalive)", peer)
peer.sendRandomPackets()
go peer.sendRandomPackets()
}

elem := peer.device.NewOutboundElement()
@@ -161,7 +160,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
// send some random packets on handshake
if peer.trick {
peer.device.log.Verbosef("%v - Running tricks! (handshake)", peer)
peer.sendRandomPackets()
go peer.sendRandomPackets()
}

peer.handshake.lastSentHandshake = time.Now()
14 changes: 9 additions & 5 deletions wiresocks/config.go
Original file line number Diff line number Diff line change
@@ -158,14 +158,22 @@ func ParsePeers(cfg *ini.File) ([]PeerConfig, error) {
peer.Endpoint = sectionKey.String()
}

if sectionKey, err := section.GetKey("Trick"); err == nil {
value, err := sectionKey.Bool()
if err != nil {
return nil, err
}
peer.Trick = value
}

peers[i] = peer
}

return peers, nil
}

// ParseConfig takes the path of a configuration file and parses it into Configuration
func ParseConfig(path string, endpoint string) (*Configuration, error) {
func ParseConfig(path string) (*Configuration, error) {
iniOpt := ini.LoadOptions{
Insensitive: true,
AllowShadows: true,
@@ -186,10 +194,6 @@ func ParseConfig(path string, endpoint string) (*Configuration, error) {
if err != nil {
return nil, err
}
for i, peer := range peers {
peer.Endpoint = endpoint
peers[i] = peer
}

return &Configuration{Interface: &iface, Peers: peers}, nil
}
9 changes: 6 additions & 3 deletions wiresocks/config_test.go
Original file line number Diff line number Diff line change
@@ -15,11 +15,14 @@ PrivateKey = aK8FWhiV1CtKFbKUPssL13P+Tv+c5owmYcU5PCP6yFw=
DNS = 8.8.8.8
Address = 172.16.0.2/24
Address = 2606:4700:110:8cc0:1ad3:9155:6742:ea8d/128
MTU = 1500
[Peer]
PublicKey = bmXOC+F1FxEMF9dyiK2H5/1SUtzH0JuVo51h2wPfgyo=
AllowedIPs = 0.0.0.0/0
AllowedIPs = ::/0
Endpoint = engage.cloudflareclient.com:2408
PersistentKeepalive = 3
Trick = true
`
const (
privateKeyBase64 = "68af055a1895d42b4a15b2943ecb0bd773fe4eff9ce68c2661c5393c23fac85c"
@@ -47,7 +50,7 @@ func TestParseInterface(t *testing.T) {
netip.MustParseAddr("2606:4700:110:8cc0:1ad3:9155:6742:ea8d"),
},
DNS: []netip.Addr{netip.MustParseAddr("8.8.8.8")},
MTU: 0,
MTU: 1500,
}
qt.Assert(t, device, qt.CmpEquals(cmpopts.EquateComparable(netip.Addr{})), want)
t.Logf("%+v", device)
@@ -70,12 +73,12 @@ func TestParsePeers(t *testing.T) {
PublicKey: publicKeyBase64,
PreSharedKey: presharedKeyBase64,
Endpoint: "engage.cloudflareclient.com:2408",
KeepAlive: 0,
KeepAlive: 3,
AllowedIPs: []netip.Prefix{
netip.MustParsePrefix("0.0.0.0/0"),
netip.MustParsePrefix("::/0"),
},
Trick: false,
Trick: true,
}}
qt.Assert(t, peers, qt.CmpEquals(cmpopts.EquateComparable(netip.Prefix{})), want)
t.Logf("%+v", peers)
66 changes: 57 additions & 9 deletions wiresocks/proxy.go
Original file line number Diff line number Diff line change
@@ -2,23 +2,27 @@ package wiresocks

import (
"context"
"errors"
"io"
"log/slog"
"net"
"net/netip"
"time"

"github.com/bepass-org/warp-plus/proxy/pkg/mixed"
"github.com/bepass-org/warp-plus/proxy/pkg/statute"
"github.com/bepass-org/warp-plus/wireguard/device"
"github.com/bepass-org/warp-plus/wireguard/tun/netstack"
"github.com/things-go/go-socks5/bufferpool"
)

// VirtualTun stores a reference to netstack network and DNS configuration
type VirtualTun struct {
Tnet *netstack.Net
Logger *slog.Logger
Dev *device.Device
Ctx context.Context
Tnet *netstack.Net
Logger *slog.Logger
Dev *device.Device
Ctx context.Context
pool bufferpool.BufPool
}

// StartProxy spawns a socks5 server.
@@ -60,12 +64,18 @@ func (vt *VirtualTun) generalHandler(req *statute.ProxyRequest) error {
done := make(chan error, 1)
// Copy data from req.Conn to conn
go func() {
_, err := io.Copy(conn, req.Conn)
req.Conn.SetReadDeadline(time.Now().Add(15 * time.Second))
buf1 := vt.pool.Get()
defer vt.pool.Put(buf1)
_, err := copyConnTimeout(conn, req.Conn, buf1[:cap(buf1)], 15*time.Second)
done <- err
}()
// Copy data from conn to req.Conn
go func() {
_, err := io.Copy(req.Conn, conn)
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
buf2 := vt.pool.Get()
defer vt.pool.Put(buf2)
_, err := copyConnTimeout(req.Conn, conn, buf2[:cap(buf2)], 15*time.Second)
done <- err
}()
// Wait for one of the copy operations to finish
@@ -75,10 +85,7 @@ func (vt *VirtualTun) generalHandler(req *statute.ProxyRequest) error {
}

// Close connections and wait for the other copy operation to finish
conn.Close()
req.Conn.Close()
<-done

return nil
}

@@ -89,3 +96,44 @@ func (vt *VirtualTun) Stop() {
}
}
}

var errInvalidWrite = errors.New("invalid write result")

func copyConnTimeout(dst net.Conn, src net.Conn, buf []byte, timeout time.Duration) (written int64, err error) {
if buf != nil && len(buf) == 0 {
panic("empty buffer in CopyBuffer")
}

for {
if err := src.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return 0, err
}

nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
ew = errInvalidWrite
}
}
written += int64(nw)
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return written, err
}
10 changes: 6 additions & 4 deletions wiresocks/wiresocks.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/bepass-org/warp-plus/wireguard/conn"
"github.com/bepass-org/warp-plus/wireguard/device"
"github.com/bepass-org/warp-plus/wireguard/tun/netstack"
"github.com/things-go/go-socks5/bufferpool"
)

// StartWireguard creates a tun interface on netstack given a configuration
@@ -46,9 +47,10 @@ func StartWireguard(ctx context.Context, l *slog.Logger, conf *Configuration) (*
}

return &VirtualTun{
Tnet: tnet,
Logger: l.With("subsystem", "vtun"),
Dev: dev,
Ctx: ctx,
Tnet: tnet,
Logger: l.With("subsystem", "vtun"),
Dev: dev,
Ctx: ctx,
pool: bufferpool.NewPool(256 * 1024),
}, nil
}