Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#434] feat: added create topic endpoint #929

Merged
merged 58 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
ed1591f
[#434] feat: added create topic endpoint
ilkerkocatepe Sep 7, 2024
58093e6
[#434] feat: added create topic endpoint - logger edit
ilkerkocatepe Sep 7, 2024
b0275f8
[#434] feat: added create topic endpoint - logger edit
ilkerkocatepe Sep 7, 2024
5e16a8d
[#434] feat: added create topic endpoint - openapiv2.json edits and t…
ilkerkocatepe Sep 21, 2024
4acf8ab
[#434] feat: added create topic endpoint - path changed
ilkerkocatepe Sep 28, 2024
4fdb8ce
[#434] feat: added create topic endpoint - javadoc fix
ilkerkocatepe Sep 28, 2024
cc80b1d
[#434] Update generated documentation
ilkerkocatepe Oct 5, 2024
ee9d6c0
Bumped Snakeyaml to 2.2 (#930)
ppatierno Oct 6, 2024
52c3cd5
Update Kubernetes Config Provider to 1.2.0 (#931)
scholzj Oct 6, 2024
2933bc7
Excluded android related dependency (#933)
ppatierno Oct 12, 2024
c0b57b9
[#434] fix: after review - partitions and replication_factor paramete…
ilkerkocatepe Oct 13, 2024
84c5127
[#434] Update generated documentation
ilkerkocatepe Oct 13, 2024
156223f
[#434] fix: after review - partitions and replication_factor paramete…
ilkerkocatepe Oct 13, 2024
9d3106b
[#434] Update generated documentation
ilkerkocatepe Oct 13, 2024
51ff0ff
[#434] docs: after review fix
ilkerkocatepe Oct 13, 2024
fb496d4
[#434] fix: checkstyle edits
ilkerkocatepe Oct 13, 2024
cd691f8
[#434] fix: checkstyle edits
ilkerkocatepe Oct 13, 2024
0d8f1f1
Merge branch 'strimzi:main' into ISSUE-434
ilkerkocatepe Oct 19, 2024
9cf0039
[#434] docs: method description detailed
ilkerkocatepe Oct 20, 2024
9857748
[#434] docs: removed unnecessary response code
ilkerkocatepe Oct 20, 2024
815d954
[#434] docs: removed unnecessary response code
ilkerkocatepe Oct 20, 2024
6fdaefb
[#434] fix: docs edited after review
ilkerkocatepe Oct 23, 2024
9e9f096
[#434] feat: edits for api standards
ilkerkocatepe Oct 29, 2024
4d70979
Bump Kafka 3.8.1 (#935)
ppatierno Oct 30, 2024
d923836
[#434] fix: edits after review
ilkerkocatepe Oct 30, 2024
ed89a87
Bumped Kafka 3.9.0 (#936)
ppatierno Nov 11, 2024
34c2918
Bump io.netty:netty-common from 4.1.111.Final to 4.1.115.Final (#937)
dependabot[bot] Nov 14, 2024
ea03491
Bumped Vert.x 4.5.11 (#938)
ppatierno Nov 15, 2024
1d76d08
[#434] docs: edits after review
ilkerkocatepe Nov 18, 2024
faeb691
[#434] docs: edits after review
ilkerkocatepe Nov 18, 2024
51c1e0c
[#434] docs: edits after review & changelog
ilkerkocatepe Nov 19, 2024
4446e47
[#434] feat: added create topic endpoint
ilkerkocatepe Sep 7, 2024
36c2fa0
[#434] feat: added create topic endpoint - logger edit
ilkerkocatepe Sep 7, 2024
b17bf93
[#434] feat: added create topic endpoint - logger edit
ilkerkocatepe Sep 7, 2024
06f50ae
[#434] feat: added create topic endpoint - openapiv2.json edits and t…
ilkerkocatepe Sep 21, 2024
7f28846
[#434] feat: added create topic endpoint - path changed
ilkerkocatepe Sep 28, 2024
67528df
[#434] feat: added create topic endpoint - javadoc fix
ilkerkocatepe Sep 28, 2024
d50cb87
[#434] Update generated documentation
ilkerkocatepe Oct 5, 2024
4fa81ca
[#434] fix: after review - partitions and replication_factor paramete…
ilkerkocatepe Oct 13, 2024
cb10851
[#434] Update generated documentation
ilkerkocatepe Oct 13, 2024
be81321
[#434] fix: after review - partitions and replication_factor paramete…
ilkerkocatepe Oct 13, 2024
970ac87
[#434] Update generated documentation
ilkerkocatepe Oct 13, 2024
ddd74d0
[#434] docs: after review fix
ilkerkocatepe Oct 13, 2024
f264f96
[#434] fix: checkstyle edits
ilkerkocatepe Oct 13, 2024
fd00053
[#434] fix: checkstyle edits
ilkerkocatepe Oct 13, 2024
f24b55c
[#434] docs: method description detailed
ilkerkocatepe Oct 20, 2024
bb116ac
[#434] docs: removed unnecessary response code
ilkerkocatepe Oct 20, 2024
9671b29
[#434] docs: removed unnecessary response code
ilkerkocatepe Oct 20, 2024
3f0f72e
[#434] fix: docs edited after review
ilkerkocatepe Oct 23, 2024
1514090
[#434] feat: edits for api standards
ilkerkocatepe Oct 29, 2024
4be7bbe
[#434] fix: edits after review
ilkerkocatepe Oct 30, 2024
603214a
[#434] docs: edits after review
ilkerkocatepe Nov 18, 2024
11c8a7e
[#434] docs: edits after review
ilkerkocatepe Nov 18, 2024
e591f0d
[#434] docs: edits after review & changelog
ilkerkocatepe Nov 19, 2024
6299bf6
Merge remote-tracking branch 'origin/ISSUE-434' into ISSUE-434
ilkerkocatepe Nov 19, 2024
cee7d21
[#434] docs: changelog
ilkerkocatepe Nov 19, 2024
093833e
Revert "Bumped Vert.x 4.5.11 (#938)"
ilkerkocatepe Nov 21, 2024
841d6c5
[#434] docs: changelog
ilkerkocatepe Nov 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## 0.31.0

* Dependency updates (Vert.x 4.5.10)
* Dependency updates (Kafka 3.9.0, Vert.x 4.5.10)
* Added support for creating a new topic via endpoint.

## 0.30.0

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e4a70d99cdac5e8dddb45ce30c39817fa9402463074478e0fd3a05d6a7f849d5
cf88d8909114896517ba4027596382126bca594d55dd9a924330232407e603b2
117 changes: 117 additions & 0 deletions documentation/book/api/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2289,6 +2289,87 @@ endif::internal-generation[]
=== Topics


[.createTopic]
==== createTopic

`POST /admin/topics`



===== Description

Creates a topic with given name, partitions count, and replication factor.


// markup not found, no include::{specDir}admin/topics/POST/spec.adoc[opts=optional]



===== Parameters



[cols="2,3,1,1,1"]
.Body Parameter
|===
|Name| Description| Required| Default| Pattern

| NewTopic
| Creates a topic with given name, partitions count, and replication factor. <<NewTopic>>
| X
|
|

|===





===== Return Type




-


===== Responses

.HTTP Response Codes
[cols="2,3,1"]
|===
| Code | Message | Datatype


| 201
| Created
| <<>>

|===

===== Samples


include::{snippetDir}admin/topics/POST/http-request.adoc[opts=optional]


// markup not found, no include::{snippetDir}admin/topics/POST/http-response.adoc[opts=optional]



// file not found, no * wiremock data link :admin/topics/POST/POST.json[]


ifdef::internal-generation[]
===== Implementation

// markup not found, no include::{specDir}admin/topics/POST/implementation.adoc[opts=optional]


endif::internal-generation[]


[.getOffsets]
==== getOffsets

Expand Down Expand Up @@ -3209,6 +3290,42 @@ Information about Kafka Bridge instance.



[#NewTopic]
=== _NewTopic_ NewTopic




[.fields-NewTopic]
[cols="2,1,1,2,4,1"]
|===
| Field Name| Required| Nullable | Type| Description | Format

| topic_name
| X
|
| String
| Name of the topic to create.
|

| partitions_count
|
| X
| Integer
| Number of partitions for the topic.
|

| replication_factor
|
| X
| Integer
| Number of replicas for each partition.
|

|===



[#OffsetCommitSeek]
=== _OffsetCommitSeek_ OffsetCommitSeek

Expand Down
11 changes: 11 additions & 0 deletions documentation/book/api/snippet/admin/topics/POST/http-request.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
==== Example HTTP request

===== Request body
[source,json]
----
{
"topic_name" : "my-topic",
"partitions_count" : 1,
"replication_factor" : 2,
}
----
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@
<log4j.version>2.17.2</log4j.version>
<vertx.version>4.5.10</vertx.version>
<vertx-testing.version>4.5.10</vertx-testing.version>
<netty.version>4.1.111.Final</netty.version>
<kafka.version>3.8.0</kafka.version>
<netty.version>4.1.115.Final</netty.version>
<kafka.version>3.9.0</kafka.version>
<kafka-kubernetes-config-provider.version>1.2.0</kafka-kubernetes-config-provider.version>
<kafka-env-var-config-provider.version>1.1.0</kafka-env-var-config-provider.version>
<maven.checkstyle.version>3.3.0</maven.checkstyle.version>
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -84,6 +87,31 @@ public CompletionStage<Set<String>> listTopics() {
return promise;
}

/**
* Creates a topic with given name and number of partitions (optional) and replication factor (optional).
*
* @param topicName topic name to create
* @param partitions number of partitions
* @param replicationFactor replication factor
* @return a CompletionStage Void
*/
public CompletionStage<Void> createTopic(String topicName, Optional<Integer> partitions, Optional<Short> replicationFactor) {
LOGGER.trace("Create topic thread {}", Thread.currentThread());
LOGGER.info("Create topic {}, partitions {}, replicationFactor {}", topicName, partitions, replicationFactor);
CompletableFuture<Void> promise = new CompletableFuture<>();
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor)))
.all()
.whenComplete((topic, exception) -> {
LOGGER.trace("Create topic callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(topic);
} else {
promise.completeExceptionally(exception);
}
});
return promise;
}

/**
* Returns the description of the specified topics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
Expand All @@ -32,6 +33,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -81,6 +83,10 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
doGetTopic(routingContext);
break;

case CREATE_TOPIC:
doCreateTopic(routingContext);
break;

case LIST_PARTITIONS:
doListPartitions(routingContext);
break;
Expand Down Expand Up @@ -173,6 +179,47 @@ public void doGetTopic(RoutingContext routingContext) {
});
}

/**
* Create a topic with described name, partitions count, and replication factor in the body of the HTTP request
*
* @param routingContext the routing context
*/
public void doCreateTopic(RoutingContext routingContext) {
JsonObject jsonBody = routingContext.body().asJsonObject();

if (jsonBody.isEmpty()) {
HttpBridgeError error = new HttpBridgeError(
HttpResponseStatus.UNPROCESSABLE_ENTITY.code(),
"Request body must be a JSON object"
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
);
HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
return;
}

String topicName = jsonBody.getString("topic_name");
Optional<Integer> partitionsCount = Optional.ofNullable(jsonBody.getInteger("partitions_count"));
Optional<Short> replicationFactor = Optional.ofNullable(jsonBody.getInteger("replication_factor"))
.map(Integer::shortValue);

this.kafkaBridgeAdmin.createTopic(topicName, partitionsCount, replicationFactor)
.whenComplete(((topic, exception) -> {
LOGGER.trace("Create topic handler thread {}", Thread.currentThread());
if (exception == null) {
JsonNode root = JsonUtils.createObjectNode();
HttpUtils.sendResponse(routingContext, HttpResponseStatus.CREATED.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root));
} else {
HttpBridgeError error = new HttpBridgeError(
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
exception.getMessage()
);
HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
}));
}

/**
* Get partitions information related to the topic in the HTTP request
*
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void start(Promise<Void> startPromise) {
routerBuilder.operation(this.SEEK_TO_END.getOperationId().toString()).handler(this.SEEK_TO_END);
routerBuilder.operation(this.LIST_TOPICS.getOperationId().toString()).handler(this.LIST_TOPICS);
routerBuilder.operation(this.GET_TOPIC.getOperationId().toString()).handler(this.GET_TOPIC);
routerBuilder.operation(this.CREATE_TOPIC.getOperationId().toString()).handler(this.CREATE_TOPIC);
routerBuilder.operation(this.LIST_PARTITIONS.getOperationId().toString()).handler(this.LIST_PARTITIONS);
routerBuilder.operation(this.GET_PARTITION.getOperationId().toString()).handler(this.GET_PARTITION);
routerBuilder.operation(this.GET_OFFSETS.getOperationId().toString()).handler(this.GET_OFFSETS);
Expand Down Expand Up @@ -381,6 +382,11 @@ private void getTopic(RoutingContext routingContext) {
processAdminClient(routingContext);
}

private void createTopic(RoutingContext routingContext) {
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC);
processAdminClient(routingContext);
}

private void listPartitions(RoutingContext routingContext) {
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS);
processAdminClient(routingContext);
Expand Down Expand Up @@ -726,6 +732,14 @@ public void process(RoutingContext routingContext) {
}
};

final HttpOpenApiOperation CREATE_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC) {

@Override
public void process(RoutingContext routingContext) {
createTopic(routingContext);
}
};

final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public enum HttpOpenApiOperations {
LIST_TOPICS("listTopics"),
/** get information for a specific topic */
GET_TOPIC("getTopic"),
/** creates a topic with specified name */
CREATE_TOPIC("createTopic"),
/** list partitions for a specific topic */
LIST_PARTITIONS("listPartitions"),
/** get partition information for a specific topic */
Expand Down
Loading
Loading