Skip to content

Commit

Permalink
[tools] Pulsar Client: add ability to produce KV messages (apache#11303)
Browse files Browse the repository at this point in the history
### Motivation

Currently (Pulsar 2.8.0) it is not easy to produce messages with KeyValue encoding, because command line tools do not provide such support.
With this change the user will be able to set the schema while using `pulsar-client produce` 

We are adding three parameters:
* "--key-schema" : this is the schema for the Key (default :"string")
* "--value-schema": this is the schema for the Value (default: "bytes")
* "--key-value-encoding-type": this is the type of encoding with values: none,separated,inline
With key-value-encoding-type=none (the default behaviour), KV encoding is not used.

The command is 100% compatible with previous versions

### Modifications

Add support for the properties listed above.
We are using "Schema.AUTO_PRODUCE_BYTES" in order to deal with the Schema registry.
The user will pass the raw value as message and we are passing it without modifications to Pulsar.

Example command to send a KV message with JSON key and value:
`bin/pulsar-client produce --key-value-encoding-type separated -k '{"a":"b"}' -m '{"a":"b"}' --key-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} '  --value-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} ' test`

For AVRO and JSON, the schema definition is written inline after the prefix "avro:" or "json:".

### Verifying this change

This change added unit tests
  • Loading branch information
eolivelli authored Aug 13, 2021
1 parent 3dd9ec5 commit 17ab040
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.gson.JsonParseException;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -38,21 +39,26 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand All @@ -76,6 +82,9 @@ public class CmdProduce {

private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
private static final int MAX_MESSAGES = 1000;
// Sentinel meaning "no key-value encoding requested"; run() substitutes it when the
// --key-value-encoding-type option was left null. Package-private so the unit test can pass it.
static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = "";
// The two values accepted for --key-value-encoding-type.
private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED = "separated";
private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline";

@Parameter(description = "TopicName", required = true)
private List<String> mainOptions;
Expand Down Expand Up @@ -114,6 +123,15 @@ public class CmdProduce {
@Parameter(names = { "-k", "--key"}, description = "message key to add ")
private String key;

// Schema specification for the message value. buildComponentSchema accepts "bytes", "string",
// or an inline definition prefixed with "avro:" or "json:". Defaults to raw bytes.
@Parameter(names = { "-vs", "--value-schema"}, description = "Schema type (can be bytes,avro,json,string...)")
private String valueSchema = "bytes";

// Schema specification for the message key, in the same format as the value schema.
// Only passed to Schema.KeyValue when a key-value encoding type is selected.
@Parameter(names = { "-ks", "--key-schema"}, description = "Schema type (can be bytes,avro,json,string...)")
private String keySchema = "string";

// Left null when the option is absent; run() then replaces it with
// KEY_VALUE_ENCODING_TYPE_NOT_SET and rejects anything other than "separated" or "inline".
@Parameter(names = { "-kvet", "--key-value-encoding-type"}, description = "Key Value Encoding Type (it can be separated or inline)")
private String keyValueEncodingType = null;

@Parameter(names = { "-ekn", "--encryption-key-name" }, description = "The public key name to encrypt payload")
private String encKeyName = null;

Expand Down Expand Up @@ -190,6 +208,18 @@ public int run() throws PulsarClientException {
throw (new ParameterException("Please supply message content with either --messages or --files"));
}

if (keyValueEncodingType == null) {
keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET;
} else {
switch (keyValueEncodingType) {
case KEY_VALUE_ENCODING_TYPE_SEPARATED:
case KEY_VALUE_ENCODING_TYPE_INLINE:
break;
default:
throw (new ParameterException("--key-value-encoding-type "+keyValueEncodingType+" is not valid, only 'separated' or 'inline'"));
}
}

int totalMessages = (messages.size() + messageFileNames.size()) * numTimesProduce;
if (totalMessages > MAX_MESSAGES) {
String msg = "Attempting to send " + totalMessages + " messages. Please do not send more than "
Expand All @@ -212,7 +242,8 @@ private int publish(String topic) {

try {
PulsarClient client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
Schema<?> schema = buildSchema(this.keySchema, this.valueSchema, this.keyValueEncodingType);
ProducerBuilder<?> producerBuilder = client.newProducer(schema).topic(topic);
if (this.chunkingAllowed) {
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
Expand All @@ -221,7 +252,7 @@ private int publish(String topic) {
producerBuilder.addEncryptionKey(this.encKeyName);
producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
}
Producer<byte[]> producer = producerBuilder.create();
Producer<?> producer = producerBuilder.create();

List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
Expand All @@ -238,17 +269,33 @@ private int publish(String topic) {
limiter.acquire();
}

TypedMessageBuilder<byte[]> message = producer.newMessage();
TypedMessageBuilder message = producer.newMessage();

if (!kvMap.isEmpty()) {
message.properties(kvMap);
}

if (key != null && !key.isEmpty()) {
message.key(key);
switch (keyValueEncodingType) {
case KEY_VALUE_ENCODING_TYPE_NOT_SET:
if (key != null && !key.isEmpty()) {
message.key(key);
}
message.value(content);
break;
case KEY_VALUE_ENCODING_TYPE_SEPARATED:
case KEY_VALUE_ENCODING_TYPE_INLINE:
KeyValue kv = new KeyValue<>(
// TODO: support AVRO encoded key
key != null ? key.getBytes(StandardCharsets.UTF_8) : null,
content);
message.value(kv);
break;
default:
throw new IllegalStateException();
}

message.value(content).send();
message.send();


numMessagesSent++;
}
Expand All @@ -265,6 +312,51 @@ private int publish(String topic) {
return returnCode;
}

/**
 * Builds the schema handed to the producer from the command-line schema options.
 *
 * @param keySchema key schema specification; only used for the key-value encodings
 * @param schema value schema specification
 * @param keyValueEncodingType {@link #KEY_VALUE_ENCODING_TYPE_NOT_SET} (the empty string),
 *        {@code "separated"} or {@code "inline"}; must not be {@code null}, since switching
 *        on a null string throws {@link NullPointerException}
 * @return the value schema alone when no encoding type is set, otherwise a KeyValue schema
 *         combining the key and value component schemas with the requested encoding
 * @throws IllegalArgumentException if the encoding type is not one of the supported values
 */
static Schema<?> buildSchema(String keySchema, String schema, String keyValueEncodingType) {
    switch (keyValueEncodingType) {
        case KEY_VALUE_ENCODING_TYPE_NOT_SET:
            // No KV encoding requested: the key schema is ignored.
            return buildComponentSchema(schema);
        case KEY_VALUE_ENCODING_TYPE_SEPARATED:
            return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema),
                    KeyValueEncodingType.SEPARATED);
        case KEY_VALUE_ENCODING_TYPE_INLINE:
            return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema),
                    KeyValueEncodingType.INLINE);
        default:
            // The literal "none" is not an accepted value (the not-set sentinel is the empty
            // string), so the message lists only the values a caller can actually pass.
            throw new IllegalArgumentException("Invalid KeyValueEncodingType " + keyValueEncodingType
                    + ", only: 'separated' and 'inline'");
    }
}

/**
 * Translates a single schema specification into a Pulsar schema.
 *
 * <p>{@code "bytes"} maps directly to {@code Schema.BYTES}. {@code "string"} and the inline
 * {@code "avro:<definition>"} / {@code "json:<definition>"} forms are wrapped with
 * {@code Schema.AUTO_PRODUCE_BYTES}.
 *
 * @param schema the schema specification
 * @return the schema for this key or value component
 * @throws IllegalArgumentException if the specification is not recognised
 */
private static Schema<?> buildComponentSchema(String schema) {
    // Raw bytes need no AUTO_PRODUCE_BYTES wrapper.
    if ("bytes".equals(schema)) {
        return Schema.BYTES;
    }

    final Schema<?> wrapped;
    if ("string".equals(schema)) {
        wrapped = Schema.STRING;
    } else if (schema.startsWith("avro:")) {
        wrapped = buildGenericSchema(SchemaType.AVRO, schema.substring("avro:".length()));
    } else if (schema.startsWith("json:")) {
        wrapped = buildGenericSchema(SchemaType.JSON, schema.substring("json:".length()));
    } else {
        throw new IllegalArgumentException("Invalid schema type: "+schema);
    }
    return Schema.AUTO_PRODUCE_BYTES(wrapped);
}

/**
 * Creates a generic schema of the given type from an inline schema definition.
 *
 * @param type the schema type to record in the schema info
 * @param definition the schema definition text, stored as UTF-8 bytes
 * @return a generic schema built from a schema info named {@code "client"} with no properties
 */
private static Schema<?> buildGenericSchema(SchemaType type, String definition) {
    final byte[] definitionBytes = definition.getBytes(StandardCharsets.UTF_8);
    final SchemaInfoImpl schemaInfo = SchemaInfoImpl.builder()
            .schema(definitionBytes)
            .name("client")
            .properties(new HashMap<>())
            .type(type)
            .build();
    return Schema.generic(schemaInfo);
}

@SuppressWarnings("deprecation")
@VisibleForTesting
public String getProduceBaseEndPoint(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
*/
package org.apache.pulsar.client.cli;

import org.testng.Assert;

import static org.testng.Assert.assertEquals;

import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestCmdProduce {

CmdProduce cmdProduce;
Expand All @@ -35,10 +41,35 @@ public void setUp() {
/**
 * Checks the websocket producer endpoint that getProduceBaseEndPoint returns for a
 * V1 topic name (with a cluster component) and for a V2 topic name.
 */
// NOTE(review): this is a rendered diff, so each assertion appears twice below: the removed
// `Assert.assertEquals(` line followed by its replacement `assertEquals(` line.
@Test
public void testGetProduceBaseEndPoint() {
String topicNameV1 = "persistent://public/cluster/default/issue-11067";
Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
"ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067");
// V2 topic names have no cluster component and map to the /ws/v2/ path.
String topicNameV2 = "persistent://public/default/issue-11067";
Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
"ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067");
}

/**
 * Verifies that CmdProduce.buildSchema maps the command-line schema options to the expected
 * schema types, both without key-value encoding and with the "separated" / "inline" encodings.
 */
@Test
public void testBuildSchema() {
    // Record definition shared by the JSON and AVRO cases below.
    final String recordDefinition = "{\"type\": \"record\",\"namespace\": \"com.example\",\"name\": \"FullName\", "
            + "\"fields\": [{ \"name\": \"a\", \"type\": \"string\" }]}";

    // TestNG's assertEquals takes (actual, expected); keeping that order makes failure
    // messages report the right "expected" value.

    // default
    assertEquals(CmdProduce.buildSchema("string", "bytes", CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET)
            .getSchemaInfo().getType(), SchemaType.BYTES);

    // simple key value
    assertEquals(CmdProduce.buildSchema("string", "string", "separated").getSchemaInfo().getType(),
            SchemaType.KEY_VALUE);
    assertEquals(CmdProduce.buildSchema("string", "string", "inline").getSchemaInfo().getType(),
            SchemaType.KEY_VALUE);

    // string key with a JSON value
    KeyValueSchema<?, ?> composite1 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema("string",
            "json:" + recordDefinition,
            "inline");
    assertEquals(composite1.getKeyValueEncodingType(), KeyValueEncodingType.INLINE);
    assertEquals(composite1.getKeySchema().getSchemaInfo().getType(), SchemaType.STRING);
    assertEquals(composite1.getValueSchema().getSchemaInfo().getType(), SchemaType.JSON);

    // JSON key with an AVRO value
    KeyValueSchema<?, ?> composite2 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema(
            "json:" + recordDefinition,
            "avro:" + recordDefinition,
            "inline");
    assertEquals(composite2.getKeyValueEncodingType(), KeyValueEncodingType.INLINE);
    assertEquals(composite2.getKeySchema().getSchemaInfo().getType(), SchemaType.JSON);
    assertEquals(composite2.getValueSchema().getSchemaInfo().getType(), SchemaType.AVRO);
}
}

0 comments on commit 17ab040

Please sign in to comment.