Skip to content

Commit

Permalink
all: upstream statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
schzhn committed Jan 14, 2025
1 parent 712d38b commit 6ada226
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 77 deletions.
12 changes: 7 additions & 5 deletions fastip/fastest.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func New(c *Config) (f *FastestAddr) {
func (f *FastestAddr) ExchangeFastest(
req *dns.Msg,
ups []upstream.Upstream,
) (resp *dns.Msg, u upstream.Upstream, err error) {
replies, err := upstream.ExchangeAll(ups, req)
) (resp *dns.Msg, u upstream.Upstream, us []*upstream.Statistics, err error) {
replies, us, err := upstream.ExchangeAll(ups, req)
if err != nil {
return nil, nil, err
return nil, nil, us, err
}

ipSet := container.NewMapSet[netip.Addr]()
Expand All @@ -130,12 +130,14 @@ func (f *FastestAddr) ExchangeFastest(
ips := ipSet.Values()
host := strings.ToLower(req.Question[0].Name)
if pingRes := f.pingAll(host, ips); pingRes != nil {
return f.prepareReply(pingRes, replies)
resp, u, err = f.prepareReply(pingRes, replies)

return resp, u, us, err
}

f.logger.Debug("no fastest ip found, using the first response", "host", host)

return replies[0].Resp, replies[0].Upstream, nil
return replies[0].Resp, replies[0].Upstream, us, nil
}

// prepareReply converts replies into the DNS answer message according to res.
Expand Down
9 changes: 6 additions & 3 deletions fastip/fastest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestFastestAddr_ExchangeFastest(t *testing.T) {
PingWaitTimeout: DefaultPingWaitTimeout,
})

resp, up, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{u})
resp, up, us, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{u})
require.NotNil(t, us)
require.Error(t, err)

assert.ErrorIs(t, err, errDesired)
Expand Down Expand Up @@ -55,7 +56,8 @@ func TestFastestAddr_ExchangeFastest(t *testing.T) {
recs: []*dns.A{newTestRec(t, netip.MustParseAddr("192.0.2.1"))},
}

rep, ups, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{dead, alive})
rep, ups, us, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{dead, alive})
require.NotNil(t, us)
require.NoError(t, err)

assert.Equal(t, ups, alive)
Expand Down Expand Up @@ -84,7 +86,8 @@ func TestFastestAddr_ExchangeFastest(t *testing.T) {
},
}

resp, _, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{ups})
resp, _, us, err := f.ExchangeFastest(newTestReq(t), []upstream.Upstream{ups})
require.NotNil(t, us)
require.NoError(t, err)

require.NotNil(t, resp)
Expand Down
3 changes: 2 additions & 1 deletion proxy/dns64.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func (p *Proxy) performDNS64(
host := origReq.Question[0].Name
p.logger.Debug("received an empty aaaa response, checking dns64", "host", host)

dns64Resp, u, err := p.exchangeUpstreams(dns64Req, upstreams)
// TODO!! upstream statistics
dns64Resp, u, _, err := p.exchangeUpstreams(dns64Req, upstreams)
if err != nil {
p.logger.Error("dns64 request failed", slogutil.KeyError, err)

Expand Down
50 changes: 41 additions & 9 deletions proxy/dnscontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"net"
"net/http"
"net/netip"
"time"

"github.com/AdguardTeam/dnsproxy/upstream"
"github.com/ameshkov/dnscrypt/v2"
Expand Down Expand Up @@ -52,11 +51,11 @@ type DNSContext struct {
// Res is the response message.
Res *dns.Msg

Proto Proto
// queryStatistics contains DNS query statistics for both the upstream and
// fallback DNS servers.
queryStatistics *QueryStatistics

// CachedUpstreamAddr is the address of the upstream which the answer was
// cached with. It's empty for responses resolved by the upstream server.
CachedUpstreamAddr string
Proto Proto

// RequestedPrivateRDNS is the subnet extracted from the ARPA domain of
// request's question if it's a PTR, SOA, or NS query for a private IP
Expand All @@ -69,10 +68,6 @@ type DNSContext struct {
// Addr is the address of the client.
Addr netip.AddrPort

// QueryDuration is the duration of a successful query to an upstream
// server or, if the upstream server is unavailable, to a fallback server.
QueryDuration time.Duration

// DoQVersion is the DoQ protocol version. It can (and should) be read from
// ALPN, but in the current version we also use the way DNS messages are
// encoded as a signal.
Expand Down Expand Up @@ -115,6 +110,16 @@ func (p *Proxy) newDNSContext(proto Proto, req *dns.Msg, addr netip.AddrPort) (d
}
}

// QueryStatistics returns DNS query statistics for both the upstream and
// fallback DNS servers.
func (dctx *DNSContext) QueryStatistics() (s *QueryStatistics) {
if dctx == nil {
return nil
}

return dctx.queryStatistics
}

// calcFlagsAndSize lazily calculates some values required for Resolve method.
func (dctx *DNSContext) calcFlagsAndSize() {
if dctx.udpSize != 0 || dctx.Req == nil {
Expand Down Expand Up @@ -231,3 +236,30 @@ func (c *CustomUpstreamConfig) ClearCache() {
c.cache.clearItems()
c.cache.clearItemsWithSubnet()
}

// QueryStatistics contains DNS query statistics for both the upstream and
// fallback DNS servers.
type QueryStatistics struct {
main []*upstream.Statistics
fallback []*upstream.Statistics
}

// cachedQueryStatistics returns DNS query statistics for cached queries.
func cachedQueryStatistics(addr string) (s *QueryStatistics) {
return &QueryStatistics{
main: []*upstream.Statistics{{
Address: addr,
IsCached: true,
}},
}
}

// Main returns DNS query statistics for the upstream DNS server.
func (s *QueryStatistics) Main() (us []*upstream.Statistics) {
return s.main
}

// Fallback returns DNS query statistics for the fallback DNS server.
func (s *QueryStatistics) Fallback() (us []*upstream.Statistics) {
return s.fallback
}
33 changes: 25 additions & 8 deletions proxy/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func (p *Proxy) exchangeUpstreams(
req *dns.Msg,
ups []upstream.Upstream,
) (resp *dns.Msg, u upstream.Upstream, err error) {
) (resp *dns.Msg, u upstream.Upstream, us []*upstream.Statistics, err error) {
switch p.UpstreamMode {
case UpstreamModeParallel:
return upstream.ExchangeParallel(ups, req)
Expand All @@ -32,12 +32,22 @@ func (p *Proxy) exchangeUpstreams(
// Go on to the load-balancing mode.
}

return p.exchangeLoadBalancing(req, ups)
}

// exchangeLoadBalancing resolves req using the given upstreams while performing
// load balancing.
func (p *Proxy) exchangeLoadBalancing(
req *dns.Msg,
ups []upstream.Upstream,
) (resp *dns.Msg, u upstream.Upstream, us []*upstream.Statistics, err error) {
var s *upstream.Statistics
if len(ups) == 1 {
u = ups[0]
resp, _, err = p.exchange(u, req, p.time)
resp, _, s, err = p.exchange(u, req, p.time)
// TODO(e.burkov): p.updateRTT(u.Address(), elapsed)

return resp, u, err
return resp, u, []*upstream.Statistics{s}, err
}

w := sampleuv.NewWeighted(p.calcWeights(ups), p.randSrc)
Expand All @@ -46,11 +56,12 @@ func (p *Proxy) exchangeUpstreams(
u = ups[i]

var elapsed time.Duration
resp, elapsed, err = p.exchange(u, req, p.time)
resp, elapsed, s, err = p.exchange(u, req, p.time)
us = append(us, s)
if err == nil {
p.updateRTT(u.Address(), elapsed)

return resp, u, nil
return resp, u, us, nil
}

errs = append(errs, err)
Expand All @@ -62,7 +73,7 @@ func (p *Proxy) exchangeUpstreams(

err = fmt.Errorf("all upstreams failed to exchange request: %w", errors.Join(errs...))

return nil, nil, err
return nil, nil, us, err
}

// exchange returns the result of the DNS request exchange with the given
Expand All @@ -72,7 +83,7 @@ func (p *Proxy) exchange(
u upstream.Upstream,
req *dns.Msg,
c clock,
) (resp *dns.Msg, dur time.Duration, err error) {
) (resp *dns.Msg, dur time.Duration, s *upstream.Statistics, err error) {
startTime := c.Now()
resp, err = u.Exchange(req)

Expand All @@ -81,7 +92,12 @@ func (p *Proxy) exchange(

addr := u.Address()
q := &req.Question[0]
s = &upstream.Statistics{
Address: addr,
}

if err != nil {
s.QueryDuration = dur
p.logger.Error(
"exchange failed",
"upstream", addr,
Expand All @@ -90,6 +106,7 @@ func (p *Proxy) exchange(
slogutil.KeyError, err,
)
} else {
s.Error = err
p.logger.Debug(
"exchange successfully finished",
"upstream", addr,
Expand All @@ -98,7 +115,7 @@ func (p *Proxy) exchange(
)
}

return resp, dur, err
return resp, dur, s, err
}

// upstreamRTTStats is the statistics for a single upstream's round-trip time.
Expand Down
14 changes: 7 additions & 7 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,13 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
p.recDetector.add(d.Req)
}

start := time.Now()
src := "upstream"

// Perform the DNS request.
resp, u, err := p.exchangeUpstreams(req, upstreams)
resp, u, us, err := p.exchangeUpstreams(req, upstreams)
d.queryStatistics = &QueryStatistics{
main: us,
}
if dns64Ups := p.performDNS64(req, resp, upstreams); dns64Ups != nil {
u = dns64Ups
} else if p.isBogusNXDomain(resp) {
Expand All @@ -569,24 +571,22 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
if err != nil && !isPrivate && p.Fallbacks != nil {
p.logger.Debug("using fallback", slogutil.KeyError, err)

// Reset the timer.
start = time.Now()
src = "fallback"

// upstreams mustn't appear empty since they have been validated when
// creating proxy.
upstreams = p.Fallbacks.getUpstreamsForDomain(req.Question[0].Name)

resp, u, err = upstream.ExchangeParallel(upstreams, req)
resp, u, us, err = upstream.ExchangeParallel(upstreams, req)
d.queryStatistics.fallback = us
}

if err != nil {
p.logger.Debug("resolving err", "src", src, slogutil.KeyError, err)
}

if resp != nil {
d.QueryDuration = time.Since(start)
p.logger.Debug("resolved", "src", src, "rtt", d.QueryDuration)
p.logger.Debug("resolved", "src", src)
}

p.handleExchangeResult(d, req, resp, u)
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p *Proxy) replyFromCache(d *DNSContext) (hit bool) {
}

d.Res = ci.m
d.CachedUpstreamAddr = ci.u
d.queryStatistics = cachedQueryStatistics(ci.u)

p.logger.Debug(
"replying from cache",
Expand Down
Loading

0 comments on commit 6ada226

Please sign in to comment.