Skip to content

Commit

Permalink
Merge pull request #624 from getlantern/jovis/add-metric-attributes
Browse files Browse the repository at this point in the history
Add attributes of proxy.connections metric
  • Loading branch information
Jovis7 authored Aug 12, 2024
2 parents ec5cec9 + b238237 commit 839812c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
6 changes: 4 additions & 2 deletions http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error {
p.instrument, err = instrument.NewDefault(
p.CountryLookup,
p.ISPLookup,
p.ProxyName,
)
if err != nil {
return errors.New("Unable to configure instrumentation: %v", err)
Expand Down Expand Up @@ -252,9 +253,10 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error {
Filter: instrumentedFilter,
OKDoesNotWaitForUpstream: !p.ConnectOKWaitsForUpstream,
OnError: instrumentedErrorHandler,
OnActive: func() {
OnActive: func(conn net.Conn) {
clientIP := conn.RemoteAddr().(*net.TCPAddr).IP
// count the connection only when a connection is established and becomes active
p.instrument.Connection(ctx)
p.instrument.Connection(ctx, clientIP)
},
})
stopProxiedBytes := p.configureTeleportProxiedBytes()
Expand Down
19 changes: 13 additions & 6 deletions instrument/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Instrument interface {
XBQHeaderSent(ctx context.Context)
SuspectedProbing(ctx context.Context, fromIP net.IP, reason string)
ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string)
Connection(ctx context.Context)
Connection(ctx context.Context, clientIP net.IP)
ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
ReportProxiedBytes(tp *sdktrace.TracerProvider)
ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
Expand Down Expand Up @@ -75,8 +75,8 @@ func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform
}
func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {}
func (i NoInstrument) Connection(ctx context.Context) {}
func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {}
func (i NoInstrument) Connection(ctx context.Context, clientIP net.IP) {}
func (i NoInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) {}
Expand All @@ -102,9 +102,10 @@ type defaultInstrument struct {
clientStats map[clientDetails]*usage
originStats map[originDetails]*usage
statsMx sync.Mutex
proxyName string
}

func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defaultInstrument, error) {
func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup, proxyName string) (*defaultInstrument, error) {
if err := otelinstrument.Initialize(); err != nil {
return nil, err
}
Expand All @@ -116,6 +117,7 @@ func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defa
errorHandlers: make(map[string]func(conn net.Conn, err error)),
clientStats: make(map[clientDetails]*usage),
originStats: make(map[originDetails]*usage),
proxyName: proxyName,
}

return p, nil
Expand Down Expand Up @@ -301,8 +303,13 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int,
}

// Connection counts the number of incoming connections
func (ins *defaultInstrument) Connection(ctx context.Context) {
otelinstrument.Connections.Add(ctx, 1)
func (ins *defaultInstrument) Connection(ctx context.Context, clientIP net.IP) {
fromCountry := ins.countryLookup.CountryCode(clientIP)
otelinstrument.Connections.Add(ctx, 1,
metric.WithAttributes(
attribute.KeyValue{"country", attribute.StringValue(fromCountry)},
attribute.KeyValue{"proxy.name", attribute.StringValue(ins.proxyName)},
))
}

// quicPackets is used by QuicTracer to update QUIC retransmissions mainly for block detection.
Expand Down
18 changes: 11 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Opts struct {

// OnActive is called only once when a connection is accepted and has done
// either a first Read() or Write()
OnActive func()
OnActive func(conn net.Conn)
}

// Server is an HTTP proxy server.
Expand All @@ -67,7 +67,7 @@ type Server struct {
listenerGenerators []ListenerGenerator
onError func(conn net.Conn, err error)
onAcceptError func(err error) (fatalErr error)
onActive func()
onActive func(conn net.Conn)
}

// New constructs a new HTTP proxy server using the given options
Expand Down Expand Up @@ -99,7 +99,7 @@ func New(opts *Opts) *Server {
opts.OnAcceptError = func(err error) (fatalErr error) { return err }
}
if opts.OnActive == nil {
opts.OnActive = func() {}
opts.OnActive = func(conn net.Conn) {}
}
return &Server{
proxy: p,
Expand Down Expand Up @@ -282,11 +282,11 @@ type onActiveConn struct {
net.Conn

once sync.Once
onActive func()
onActive func(conn net.Conn)
}

// WrapOnActiveConn wraps a net.Conn and calls onActive once after first successful Read or Write
func wrapOnActiveConn(conn net.Conn, onActive func()) net.Conn {
func wrapOnActiveConn(conn net.Conn, onActive func(conn net.Conn)) net.Conn {
wc, _ := conn.(listeners.WrapConnEmbeddable)
return &onActiveConn{wc, conn, sync.Once{}, onActive}
}
Expand All @@ -310,15 +310,19 @@ func (c *onActiveConn) Wrapped() net.Conn {
func (c *onActiveConn) Read(b []byte) (int, error) {
n, err := c.Conn.Read(b)
if err == nil {
c.once.Do(c.onActive)
c.once.Do(func() {
c.onActive(c.Conn)
})
}
return n, err
}

func (c *onActiveConn) Write(b []byte) (int, error) {
n, err := c.Conn.Write(b)
if err == nil {
c.once.Do(c.onActive)
c.once.Do(func() {
c.onActive(c.Conn)
})
}
return n, err
}

0 comments on commit 839812c

Please sign in to comment.