Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong committed Jun 26, 2024
1 parent aa090be commit bfe57d5
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
57 changes: 57 additions & 0 deletions pulsaradmin/pkg/admin/brokers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package admin

import (
"encoding/json"
"net/http"
"net/url"
"os"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -91,3 +96,55 @@ func TestUpdateDynamicConfiguration(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, configurations)
}

func TestUpdateDynamicConfigurationWithCustomURL(t *testing.T) {
readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token")
assert.NoError(t, err)
cfg := &config.Config{
WebServiceURL: DefaultWebServiceURL,
Token: string(readFile),
}

authProvider, err := auth.GetAuthProvider(cfg)
assert.NoError(t, err)

client := rest.Client{
ServiceURL: cfg.WebServiceURL,
VersionInfo: ReleaseVersion,
HTTPClient: &http.Client{
Timeout: DefaultHTTPTimeOutDuration,
Transport: authProvider,
},
}
u, err := url.Parse(cfg.WebServiceURL)
assert.NoError(t, err)

// example config value with '/'
value := `{"key/123":"https://example.com/"}`
encoded := url.QueryEscape(value)

resp, err := client.MakeRequestWithURL(http.MethodPost, &url.URL{
Scheme: u.Scheme,
User: u.User,
Host: u.Host,
Path: "/admin/v2/brokers/configuration/allowAutoSubscriptionCreation/" + value,
RawPath: "/admin/v2/brokers/configuration/allowAutoSubscriptionCreation/" + encoded,
})
assert.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)

// get the config, check if it's updated
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

configurations, err := admin.Brokers().GetAllDynamicConfigurations()
assert.NoError(t, err)
assert.NotEmpty(t, configurations)

var m map[string]interface{}
err = json.Unmarshal([]byte(configurations["allowAutoSubscriptionCreation"]), &m)
assert.NoError(t, err)
assert.Equal(t, "https://example.com/", m["key/123"])
}
23 changes: 23 additions & 0 deletions pulsaradmin/pkg/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func (c *Client) newRequest(method, path string) (*request, error) {
return req, nil
}

func (c *Client) newRequestWithURL(method string, urlOpt *url.URL) (*request, error) {
req := &request{
method: method,
url: urlOpt,
params: make(url.Values),
}
return req, nil
}

func (c *Client) doRequest(r *request) (*http.Response, error) {
req, err := r.toHTTP()
if err != nil {
Expand Down Expand Up @@ -104,6 +113,20 @@ func (c *Client) MakeRequest(method, endpoint string) (*http.Response, error) {
return resp, nil
}

func (c *Client) MakeRequestWithURL(method string, urlOpt *url.URL) (*http.Response, error) {
req, err := c.newRequestWithURL(method, urlOpt)
if err != nil {
return nil, err
}

resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
return nil, err
}

return resp, nil
}

func (c *Client) Get(endpoint string, obj interface{}) error {
_, err := c.GetWithQueryParams(endpoint, obj, nil, true)
return err
Expand Down

0 comments on commit bfe57d5

Please sign in to comment.