Skip to content

Commit

Permalink
rpadmin: add AdminAddressesFromK8SDNS to rpadmin package (#41)
Browse files Browse the repository at this point in the history
* rpadmin: add AdminAddressesFromK8SDNS to rpadmin package

* rpadmin: add ErrNoSRVRecordsFound

* rpadmin: add UpdateAPIUrlsFromKubernetesDNS

* rpadmin: fix https in TestAdminAddressesFromK8SDNS and add test case

* rpadmin: add https test for UpdateAPIUrlsFromKubernetesDNS

* rpadmin: add mutex to protech a.urls

* rpadmin: add comment on how to check for kubernetes
  • Loading branch information
bojand authored Dec 5, 2024
1 parent 0fc2550 commit 7c14761
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 9 deletions.
128 changes: 119 additions & 9 deletions rpadmin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"math/rand"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -33,9 +35,14 @@ import (
commonnet "github.com/redpanda-data/common-go/net"
)

// ErrNoAdminAPILeader happen when there's no leader for the Admin API.
// ErrNoAdminAPILeader happens when there's no leader for the Admin API.
var ErrNoAdminAPILeader = errors.New("no Admin API leader found")

// ErrNoSRVRecordsFound happens when we try to deduce Admin API URLs
// from Kubernetes SRV DNS records, but no records were returned by
// the DNS query.
var ErrNoSRVRecordsFound = errors.New("not SRV DNS records found")

// HTTPResponseError is the error response.
type HTTPResponseError struct {
Method string
Expand Down Expand Up @@ -81,6 +88,7 @@ type GenericErrorBody struct {

// AdminAPI is a client to interact with Redpanda's admin server.
type AdminAPI struct {
urlsMutex sync.RWMutex
urls []string
brokerIDToUrlsMutex sync.Mutex
brokerIDToUrls map[int]string
Expand Down Expand Up @@ -190,20 +198,40 @@ func newAdminAPI(urls []string, auth Auth, tlsConfig *tls.Config, dialer DialCon
a.retryClient.Transport = transport
a.oneshotClient.Transport = transport

if err := a.initURLs(urls, tlsConfig, forCloud); err != nil {
return nil, err
}

return a, nil
}

const (
schemeHTTP = "http"
schemeHTTPS = "https"
)

func (a *AdminAPI) initURLs(urls []string, tlsConfig *tls.Config, forCloud bool) error {
a.urlsMutex.Lock()
defer a.urlsMutex.Unlock()

if len(a.urls) != len(urls) {
a.urls = make([]string, len(urls))
}

for i, u := range urls {
scheme, host, err := commonnet.ParseHostMaybeScheme(u)
if err != nil {
return nil, err
return err
}
switch scheme {
case "", "http":
scheme = "http"
case "", schemeHTTP:
scheme = schemeHTTP
if tlsConfig != nil {
scheme = "https"
scheme = schemeHTTPS
}
case "https":
case schemeHTTPS:
default:
return nil, fmt.Errorf("unrecognized scheme %q in host %q", scheme, u)
return fmt.Errorf("unrecognized scheme %q in host %q", scheme, u)
}
full := fmt.Sprintf("%s://%s", scheme, host)
if forCloud {
Expand All @@ -212,7 +240,7 @@ func newAdminAPI(urls []string, auth Auth, tlsConfig *tls.Config, dialer DialCon
a.urls[i] = full
}

return a, nil
return nil
}

// SetAuth sets the auth in the client.
Expand All @@ -225,10 +253,14 @@ func (a *AdminAPI) newAdminForSingleHost(host string) (*AdminAPI, error) {
}

func (a *AdminAPI) urlsWithPath(path string) []string {
a.urlsMutex.RLock()
defer a.urlsMutex.RUnlock()

urls := make([]string, len(a.urls))
for i := 0; i < len(a.urls); i++ {
urls[i] = fmt.Sprintf("%s%s", a.urls[i], path)
}

return urls
}

Expand All @@ -249,8 +281,13 @@ func (a *AdminAPI) mapBrokerIDsToURLs(ctx context.Context) {
if err != nil {
return err
}

a.urlsMutex.RLock()
url := aa.urls[0]
a.urlsMutex.RUnlock()

a.brokerIDToUrlsMutex.Lock()
a.brokerIDToUrls[nc.NodeID] = aa.urls[0]
a.brokerIDToUrls[nc.NodeID] = url
a.brokerIDToUrlsMutex.Unlock()
return nil
})
Expand Down Expand Up @@ -280,9 +317,12 @@ func (a *AdminAPI) GetLeaderID(ctx context.Context) (*int, error) {
func (a *AdminAPI) sendAny(ctx context.Context, method, path string, body, into any) error {
// Shuffle the list of URLs
rng := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec // old rpk code.

a.urlsMutex.RLock()
shuffled := make([]string, len(a.urls))
copy(shuffled, a.urls)
rng.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
a.urlsMutex.RUnlock()

// After a 503 or 504, wait a little for an election
const unavailableBackoff = 1500 * time.Millisecond
Expand Down Expand Up @@ -418,10 +458,12 @@ func (a *AdminAPI) getURLFromBrokerID(brokerID int) (string, bool) {
func (a *AdminAPI) sendOne(
ctx context.Context, method, path string, body, into any, retryable bool,
) error {
a.urlsMutex.RLock()
if len(a.urls) != 1 {
return fmt.Errorf("unable to issue a single-admin-endpoint request to %d admin endpoints", len(a.urls))
}
url := a.urls[0] + path
a.urlsMutex.RUnlock()
res, err := a.sendAndReceive(ctx, method, url, body, retryable)
if err != nil {
return err
Expand Down Expand Up @@ -494,6 +536,7 @@ func (a *AdminAPI) sendAll(rootCtx context.Context, method, path string, body, i
// for each of them in a go routine.
func (a *AdminAPI) eachBroker(fn func(aa *AdminAPI) error) error {
var grp multierror.Group
a.urlsMutex.RLock()
for _, url := range a.urls {
aURL := url
grp.Go(func() error {
Expand All @@ -504,6 +547,8 @@ func (a *AdminAPI) eachBroker(fn func(aa *AdminAPI) error) error {
return fn(aa)
})
}
a.urlsMutex.RUnlock()

return grp.Wait().ErrorOrNil()
}

Expand Down Expand Up @@ -630,3 +675,68 @@ func MaxRetries(r int) Opt {
cl.MaxRetries = r
}}
}

// AdminAddressesFromK8SDNS attempts to deduce admin API URLs
// based on Kubernetes DNS resolution.
// https://github.com/kubernetes/dns/blob/master/docs/specification.md
// Assume that Admin API URL configured is a Kubernetes Service URL.
// This Admin API URL is passed in as the function argument.
// Since it's a Kubernetes service, Kubernetes DNS creates a DNS SRV record
// for the admin port mapping.
// We can query the DNS record to get the target host names and ports.
// To check if a workload is running inside a kubernetes pod test for
// KUBERNETES_SERVICE_HOST or KUBERNETES_SERVICE_PORT env vars.
func AdminAddressesFromK8SDNS(adminAPIURL string) ([]string, error) {
adminURL, err := url.Parse(adminAPIURL)
if err != nil {
return nil, err
}

_, records, err := net.LookupSRV("admin", "tcp", adminURL.Hostname())
if err != nil {
return nil, err
}

if len(records) == 0 {
return nil, ErrNoSRVRecordsFound
}

// targets may be in the form
// redpanda-1.redpanda.redpanda.svc.cluster.local.
// take advantage of ordinals and order them accordingly
sort.Slice(records, func(i, j int) bool {
return records[i].Target < records[j].Target
})

urls := make([]string, 0, len(records))

proto := "http://"
if adminURL.Scheme == schemeHTTPS {
proto = "https://"
}

for _, r := range records {
urls = append(urls, proto+r.Target+":"+strconv.Itoa(int(r.Port)))
}

return urls, nil
}

// UpdateAPIUrlsFromKubernetesDNS updates the client's internal URLs to admin addresses from Kubernetes DNS.
// See AdminAddressesFromK8SDNS.
func (a *AdminAPI) UpdateAPIUrlsFromKubernetesDNS() error {
a.urlsMutex.RLock()
if len(a.urls) == 0 {
return errors.New("at least one url is required for the admin api")
}

baseURL := a.urls[0]
a.urlsMutex.RUnlock()

urls, err := AdminAddressesFromK8SDNS(baseURL)
if err != nil {
return err
}

return a.initURLs(urls, a.tlsConfig, a.forCloud)
}
106 changes: 106 additions & 0 deletions rpadmin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package rpadmin
import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"

"github.com/foxcpp/go-mockdns"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -222,3 +226,105 @@ func callsForNodeID(calls []testCall, nodeID int) []testCall {
}
return filtered
}

func TestAdminAddressesFromK8SDNS(t *testing.T) {
schemes := []string{"http", "https"}

for _, scheme := range schemes {
t.Run(scheme, func(t *testing.T) {
adminAPIURL := scheme + "://" + "redpanda-api.cluster.local:19644"

adminAPIHostURL, err := url.Parse(adminAPIURL)
require.NoError(t, err)

srv, err := mockdns.NewServer(map[string]mockdns.Zone{
"_admin._tcp." + adminAPIHostURL.Hostname() + ".": {
SRV: []net.SRV{
{
Target: "rp-id123-0.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
{
Target: "rp-id123-1.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
{
Target: "rp-id123-2.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
},
},
}, false)
require.NoError(t, err)

defer srv.Close()

srv.PatchNet(net.DefaultResolver)
defer mockdns.UnpatchNet(net.DefaultResolver)

brokerURLs, err := AdminAddressesFromK8SDNS(adminAPIURL)
assert.NoError(t, err)
require.Len(t, brokerURLs, 3)
assert.Equal(t, scheme+"://"+"rp-id123-0.rp-id123.redpanda.svc.cluster.local.:9644", brokerURLs[0])
assert.Equal(t, scheme+"://"+"rp-id123-1.rp-id123.redpanda.svc.cluster.local.:9644", brokerURLs[1])
assert.Equal(t, scheme+"://"+"rp-id123-2.rp-id123.redpanda.svc.cluster.local.:9644", brokerURLs[2])
})
}
}

func TestUpdateAPIUrlsFromKubernetesDNS(t *testing.T) {
schemes := []string{"http", "https"}

for _, scheme := range schemes {
t.Run(scheme, func(t *testing.T) {
adminAPIURL := scheme + "://" + "redpanda-api.cluster.local:19644"

adminAPIHostURL, err := url.Parse(adminAPIURL)
require.NoError(t, err)

srv, err := mockdns.NewServer(map[string]mockdns.Zone{
"_admin._tcp." + adminAPIHostURL.Hostname() + ".": {
SRV: []net.SRV{
{
Target: "rp-id123-0.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
{
Target: "rp-id123-1.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
{
Target: "rp-id123-2.rp-id123.redpanda.svc.cluster.local.",
Port: 9644,
Weight: 33,
},
},
},
}, false)
require.NoError(t, err)

defer srv.Close()

srv.PatchNet(net.DefaultResolver)
defer mockdns.UnpatchNet(net.DefaultResolver)

cl, err := NewClient([]string{adminAPIURL}, nil, nil, false)
require.NoError(t, err)
require.NotNil(t, cl)
assert.Len(t, cl.urls, 1)

err = cl.UpdateAPIUrlsFromKubernetesDNS()
require.NoError(t, err)
require.NotNil(t, cl)
assert.Len(t, cl.urls, 3)
assert.Equal(t, scheme+"://"+"rp-id123-0.rp-id123.redpanda.svc.cluster.local.:9644", cl.urls[0])
assert.Equal(t, scheme+"://"+"rp-id123-1.rp-id123.redpanda.svc.cluster.local.:9644", cl.urls[1])
assert.Equal(t, scheme+"://"+"rp-id123-2.rp-id123.redpanda.svc.cluster.local.:9644", cl.urls[2])
})
}
}
6 changes: 6 additions & 0 deletions rpadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/redpanda-data/common-go/rpadmin
go 1.22.2

require (
github.com/foxcpp/go-mockdns v1.1.0
github.com/hashicorp/go-multierror v1.1.1
github.com/redpanda-data/common-go/net v0.1.0
github.com/sethgrid/pester v1.2.0
Expand All @@ -13,7 +14,12 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/miekg/dns v1.1.57 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 7c14761

Please sign in to comment.