From a1ce5e6d48fd00d07d45958a5c7482751f0cbb68 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 12 Jun 2024 17:53:12 +0800 Subject: [PATCH] [Improve] Add admin topic api CreateWithProperties (#1226) ### Motivation To keep consistent with the [Java client](https://github.com/apache/pulsar/pull/12818). ### Modifications - Add admin topic api CreateWithProperties - Add admin topic api GetProperties --- pulsaradmin/pkg/admin/topic.go | 37 ++++++++++++++++++++++++++--- pulsaradmin/pkg/admin/topic_test.go | 33 +++++++++++++++++++++++++ pulsaradmin/pkg/rest/client.go | 22 +++++++++++++++-- 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index 7badc63400..82cfc87ead 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -21,6 +21,8 @@ import ( "fmt" "strconv" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) @@ -35,6 +37,20 @@ type Topics interface { // when setting to 0, it will create a non-partitioned topic Create(topic utils.TopicName, partitions int) error + // CreateWithProperties Create a partitioned or non-partitioned topic + // + // @param topic + // topicName struct + // @param partitions + // number of topic partitions, + // when setting to 0, it will create a non-partitioned topic + // @param meta + // topic properties + CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error + + // GetProperties returns the properties of a topic + GetProperties(topic utils.TopicName) (map[string]string, error) + // Delete a topic, this function can delete both partitioned or non-partitioned topic // // @param topic @@ -392,14 +408,29 @@ func (c *pulsarClient) Topics() Topics { } func (t *topics) Create(topic utils.TopicName, partitions int) error { + return t.CreateWithProperties(topic, partitions, nil) +} +func (t *topics) CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions") - data := &partitions if partitions == 0 { endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath()) - data = nil + return t.pulsar.Client.Put(endpoint, meta) + } + data := struct { + Meta map[string]string `json:"properties"` + Partitions int `json:"partitions"` + }{ + Meta: meta, + Partitions: partitions, } + return t.pulsar.Client.PutWithCustomMediaType(endpoint, &data, nil, nil, rest.PartitionedTopicMetaJSON) +} - return t.pulsar.Client.Put(endpoint, data) +func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error) { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties") + var properties map[string]string + err := t.pulsar.Client.Get(endpoint, &properties) + return properties, err } func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error { diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index a3609ff64e..fced05427e 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -65,6 +65,39 @@ func TestCreateTopic(t *testing.T) { t.Error("Couldn't find topic: " + topic) } +func TestTopics_CreateWithProperties(t *testing.T) { + topic := newTopicName() + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create non-partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().CreateWithProperties(*topicName, 0, map[string]string{ + "key1": "value1", + }) + assert.NoError(t, err) + + properties, err := admin.Topics().GetProperties(*topicName) + assert.NoError(t, err) + assert.Equal(t, properties["key1"], "value1") + + // Create partition topic + topic = newTopicName() + topicName, err = utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().CreateWithProperties(*topicName, 4, map[string]string{ + "key2": "value2", + }) + assert.NoError(t, err) + + properties, err = admin.Topics().GetProperties(*topicName) + assert.NoError(t, err) + assert.Equal(t, properties["key2"], "value2") +} + func TestPartitionState(t *testing.T) { randomName := newTopicName() topic := "persistent://public/default/" + randomName diff --git a/pulsaradmin/pkg/rest/client.go b/pulsaradmin/pkg/rest/client.go index a1a79737c1..e6b603f5f8 100644 --- a/pulsaradmin/pkg/rest/client.go +++ b/pulsaradmin/pkg/rest/client.go @@ -26,6 +26,17 @@ import ( "path" ) +type MediaType string + +const ( + ApplicationJSON MediaType = "application/json" + PartitionedTopicMetaJSON MediaType = "application/vnd.partitioned-topic-metadata+json" +) + +func (m MediaType) String() string { + return string(m) +} + // Client is a base client that is used to make http request to the ServiceURL type Client struct { ServiceURL string @@ -65,10 +76,10 @@ func (c *Client) doRequest(r *request) (*http.Response, error) { if r.contentType != "" { req.Header.Set("Content-Type", r.contentType) } else if req.Body != nil { - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Type", ApplicationJSON.String()) } - req.Header.Set("Accept", "application/json") + req.Header.Set("Accept", ApplicationJSON.String()) req.Header.Set("User-Agent", c.useragent()) hc := c.HTTPClient if hc == nil { @@ -160,10 +171,17 @@ func (c *Client) Put(endpoint string, in interface{}) error { } func (c *Client) PutWithQueryParams(endpoint string, in, obj interface{}, params map[string]string) error { + return c.PutWithCustomMediaType(endpoint, in, obj, params, "") +} +func (c *Client) PutWithCustomMediaType(endpoint string, in, obj interface{}, params map[string]string, + mediaType MediaType) error { req, err := c.newRequest(http.MethodPut, endpoint) if err != nil { return err } + if mediaType != "" { + req.contentType = mediaType.String() + } req.obj = in if params != nil {