Skip to content

Commit

Permalink
[Improve] Add admin topic api CreateWithProperties (apache#1226)
Browse files Browse the repository at this point in the history
### Motivation

To keep consistent with the [Java client](apache/pulsar#12818).


### Modifications

- Add admin topic api CreateWithProperties
-  Add admin topic api GetProperties
  • Loading branch information
crossoverJie authored Jun 12, 2024
1 parent 6876346 commit a1ce5e6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
37 changes: 34 additions & 3 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strconv"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

Expand All @@ -35,6 +37,20 @@ type Topics interface {
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error

// CreateWithProperties Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
// @param meta
// topic properties
CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error

// GetProperties returns the properties of a topic
GetProperties(topic utils.TopicName) (map[string]string, error)

// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
Expand Down Expand Up @@ -392,14 +408,29 @@ func (c *pulsarClient) Topics() Topics {
}

func (t *topics) Create(topic utils.TopicName, partitions int) error {
return t.CreateWithProperties(topic, partitions, nil)
}
func (t *topics) CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
data := &partitions
if partitions == 0 {
endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath())
data = nil
return t.pulsar.Client.Put(endpoint, meta)
}
data := struct {
Meta map[string]string `json:"properties"`
Partitions int `json:"partitions"`
}{
Meta: meta,
Partitions: partitions,
}
return t.pulsar.Client.PutWithCustomMediaType(endpoint, &data, nil, nil, rest.PartitionedTopicMetaJSON)
}

return t.pulsar.Client.Put(endpoint, data)
func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
var properties map[string]string
err := t.pulsar.Client.Get(endpoint, &properties)
return properties, err
}

func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error {
Expand Down
33 changes: 33 additions & 0 deletions pulsaradmin/pkg/admin/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,39 @@ func TestCreateTopic(t *testing.T) {
t.Error("Couldn't find topic: " + topic)
}

func TestTopics_CreateWithProperties(t *testing.T) {
topic := newTopicName()
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

// Create non-partition topic
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().CreateWithProperties(*topicName, 0, map[string]string{
"key1": "value1",
})
assert.NoError(t, err)

properties, err := admin.Topics().GetProperties(*topicName)
assert.NoError(t, err)
assert.Equal(t, properties["key1"], "value1")

// Create partition topic
topic = newTopicName()
topicName, err = utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().CreateWithProperties(*topicName, 4, map[string]string{
"key2": "value2",
})
assert.NoError(t, err)

properties, err = admin.Topics().GetProperties(*topicName)
assert.NoError(t, err)
assert.Equal(t, properties["key2"], "value2")
}

func TestPartitionState(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
Expand Down
22 changes: 20 additions & 2 deletions pulsaradmin/pkg/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ import (
"path"
)

type MediaType string

const (
ApplicationJSON MediaType = "application/json"
PartitionedTopicMetaJSON MediaType = "application/vnd.partitioned-topic-metadata+json"
)

func (m MediaType) String() string {
return string(m)
}

// Client is a base client that is used to make http request to the ServiceURL
type Client struct {
ServiceURL string
Expand Down Expand Up @@ -65,10 +76,10 @@ func (c *Client) doRequest(r *request) (*http.Response, error) {
if r.contentType != "" {
req.Header.Set("Content-Type", r.contentType)
} else if req.Body != nil {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", ApplicationJSON.String())
}

req.Header.Set("Accept", "application/json")
req.Header.Set("Accept", ApplicationJSON.String())
req.Header.Set("User-Agent", c.useragent())
hc := c.HTTPClient
if hc == nil {
Expand Down Expand Up @@ -160,10 +171,17 @@ func (c *Client) Put(endpoint string, in interface{}) error {
}

func (c *Client) PutWithQueryParams(endpoint string, in, obj interface{}, params map[string]string) error {
return c.PutWithCustomMediaType(endpoint, in, obj, params, "")
}
func (c *Client) PutWithCustomMediaType(endpoint string, in, obj interface{}, params map[string]string,
mediaType MediaType) error {
req, err := c.newRequest(http.MethodPut, endpoint)
if err != nil {
return err
}
if mediaType != "" {
req.contentType = mediaType.String()
}
req.obj = in

if params != nil {
Expand Down

0 comments on commit a1ce5e6

Please sign in to comment.