Add protobuf deserialization to consumer and serialization to producer #111

Merged 1 commit on Dec 3, 2021
8 changes: 7 additions & 1 deletion .golangci.yml
@@ -4,4 +4,10 @@ linters:
  - gofmt
  - goimports
  - revive
  - govet

issues:
  exclude-rules:
    - linters:
        - staticcheck
      text: 'SA1019: package github.com/golang/protobuf/jsonpb is deprecated' # dependency of github.com/jhump/protoreflect, see https://github.com/jhump/protoreflect/issues/463
81 changes: 81 additions & 0 deletions README.md
@@ -12,6 +12,7 @@ A command-line interface for interaction with Apache Kafka
- support for avro schemas
- Configuration of different contexts
- directly access kafka clusters inside your kubernetes cluster
- support for consuming and producing protobuf-encoded messages

[![asciicast](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg)](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr)

@@ -119,6 +120,16 @@ contexts:
    avro:
      schemaRegistry: localhost:8081

    # optional: default protobuf messages search paths
    protobuf:
      importPaths:
        - "/usr/include/protobuf"
      protoFiles:
        - "someMessage.proto"
        - "otherMessage.proto"
      protosetFiles:
        - "/usr/include/protoset/other.protoset"

    # optional: changes the default partitioner
    defaultPartitioner: "hash"

@@ -303,6 +314,11 @@ The following example prints keys in hex and values in base64:
```bash
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
```

The consumer can convert protobuf-encoded messages to JSON, for values and (optionally) keys:
```bash
kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto
```
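
Assuming `MyTopicValue` is a message type with a timestamp and an integer field (a hypothetical schema, mirroring the example in the protobuf section below), the decoded value is printed as its proto3 JSON mapping:
```json
{"producedAt":"2021-12-01T14:10:12Z","num":"1"}
```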

### Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
@@ -379,6 +395,11 @@ Producing null values (tombstone record) is also possible:
```bash
kafkactl produce my-topic --null-value
```

Producing a protobuf message converted from JSON:
```bash
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
```

### Avro support

In order to enable avro support you just have to add the schema registry to your configuration:
@@ -421,6 +442,66 @@ The `consume` command handles this automatically and no configuration is needed.

An additional parameter `print-schema` can be provided to display the schema used for decoding.

### Protobuf support

`kafkactl` can consume and produce protobuf-encoded messages. To enable protobuf serialization/deserialization, add
the `--value-proto-type` flag and, optionally, `--key-proto-type` (if keys are encoded in protobuf format) with the
message type name. Messages are converted to and from JSON using the
[proto3 JSON mapping](https://developers.google.com/protocol-buffers/docs/proto3#json), so `snake_case` field names
appear as `lowerCamelCase` in JSON and `int64` values are represented as strings.

`kafkactl` searches for the requested message types in the following order (see the sketch after this list):
1. Protoset files specified via the `--protoset-file` flag
2. Protoset files specified in the `context.protobuf.protosetFiles` config value
3. Proto files specified via the `--proto-file` flag
4. Proto files specified in the `context.protobuf.protoFiles` config value
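
For example, when both kinds of files are supplied, a type found in the protoset takes precedence (a sketch with hypothetical file names):
```bash
# TopicMessage is looked up in local.protoset first;
# kafkamsg.proto is only consulted if the protoset does not provide the type
kafkactl consume my-topic \
  --protoset-file local.protoset \
  --proto-file kafkamsg.proto \
  --value-proto-type TopicMessage
```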

Proto files may depend on other files via their `import` statements. To add lookup paths for these imports, use the
`--proto-import-path` flag or the `context.protobuf.importPaths` config value.
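
For instance, if `kafkamsg.proto` contains `import "common/types.proto";` (hypothetical files), the directory containing `common/` must be added to the lookup paths:
```bash
kafkactl consume my-topic --proto-import-path /path/to/protos --proto-file kafkamsg.proto --value-proto-type MyMessage
```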

If a provided message type is not found, `kafkactl` returns an error.

Note that using raw proto files does not require `protoc` to be installed.

Also note that protoset files must be compiled with imports included, since resolving a message type requires the descriptors of all transitively imported files:
```bash
protoc -o kafkamsg.protoset --include_imports kafkamsg.proto
```

#### Example
Assume you have the following proto schema in `kafkamsg.proto`:
```protobuf
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TopicMessage {
  google.protobuf.Timestamp produced_at = 1;
  int64 num = 2;
}

message TopicKey {
  float fvalue = 1;
}
```
"well-known" `google/protobuf` types are included so no additional proto files needed.

To produce a message, run
```bash
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicMessage --proto-file kafkamsg.proto
```
or with a protoset
```bash
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicMessage --protoset-file kafkamsg.protoset
```

To consume messages, run
```bash
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicMessage --proto-file kafkamsg.proto
```
or with a protoset
```bash
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicMessage --protoset-file kafkamsg.protoset
```

### Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
5 changes: 5 additions & 0 deletions cmd/consume/consume.go
@@ -39,6 +39,11 @@ func NewConsumeCmd() *cobra.Command {
	cmdConsume.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml")
	cmdConsume.Flags().StringVarP(&flags.EncodeKey, "key-encoding", "", flags.EncodeKey, "key encoding (auto-detected by default). One of: none|hex|base64")
	cmdConsume.Flags().StringVarP(&flags.EncodeValue, "value-encoding", "", flags.EncodeValue, "value encoding (auto-detected by default). One of: none|hex|base64")
	cmdConsume.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
	cmdConsume.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
	cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
	cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type")
	cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type")

	return cmdConsume
}
117 changes: 117 additions & 0 deletions cmd/consume/consume_test.go
@@ -1,11 +1,18 @@
package consume_test

import (
	"encoding/hex"
	"fmt"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/deviceinsight/kafkactl/internal/helpers/protobuf"

	"github.com/deviceinsight/kafkactl/testutil"
	"github.com/jhump/protoreflect/dynamic"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func TestConsumeWithKeyAndValueIntegration(t *testing.T) {
@@ -179,3 +186,113 @@ func TestAvroDeserializationErrorHandlingIntegration(t *testing.T) {
		t.Fatalf("expected consumer to fail")
	}
}

func TestProtobufConsumeProtoFileIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	pbTopic := testutil.CreateTopic(t, "proto-file")

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata")
	now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)
	pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{
		ProtoImportPaths: []string{protoPath},
		ProtoFiles:       []string{"msg.proto"},
	}, "TopicMessage")
	pbMessage := dynamic.NewMessage(pbMessageDesc)
	pbMessage.SetFieldByNumber(1, timestamppb.New(now))
	pbMessage.SetFieldByNumber(2, int64(1))

	value, err := pbMessage.Marshal()
	if err != nil {
		t.Fatalf("Failed to marshal proto message: %s", err)
	}

	// produce valid pb message
	if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex", "-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

func TestProtobufConsumeProtosetFileIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	pbTopic := testutil.CreateTopic(t, "proto-file")

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset")
	now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)
	pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{
		ProtosetFiles: []string{protoPath},
	}, "TopicMessage")
	pbMessage := dynamic.NewMessage(pbMessageDesc)
	pbMessage.SetFieldByNumber(1, timestamppb.New(now))
	pbMessage.SetFieldByNumber(2, int64(1))

	value, err := pbMessage.Marshal()
	if err != nil {
		t.Fatalf("Failed to marshal proto message: %s", err)
	}

	// produce valid pb message
	if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--protoset-file", protoPath, "--value-proto-type", "TopicMessage"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

func TestProtobufConsumeProtoFileErrNoMessageIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	pbTopic := testutil.CreateTopic(t, "proto-file")

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset")

	if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "NonExisting"); err != nil {
		testutil.AssertErrorContains(t, "not found in provided files", err)
	} else {
		t.Fatal("Expected consumer to fail")
	}
}

func TestProtobufConsumeProtoFileErrDecodeIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	pbTopic := testutil.CreateTopic(t, "proto-file")

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata")

	// produce invalid pb message
	if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", "nonpb"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil {
		testutil.AssertErrorContains(t, "value decode failed: proto: bad wiretype", err)
	} else {
		t.Fatal("Expected consumer to fail")
	}
}
5 changes: 5 additions & 0 deletions cmd/produce/produce.go
@@ -42,6 +42,11 @@ func NewProduceCmd() *cobra.Command {
	cmdProduce.Flags().StringVarP(&flags.ValueEncoding, "value-encoding", "", flags.ValueEncoding, "value encoding (none by default). One of: none|hex|base64")
	cmdProduce.Flags().BoolVarP(&flags.Silent, "silent", "s", false, "do not write to standard output")
	cmdProduce.Flags().IntVarP(&flags.RateInSeconds, "rate", "r", -1, "amount of messages per second to produce on the topic")
	cmdProduce.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
	cmdProduce.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
	cmdProduce.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
	cmdProduce.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type")
	cmdProduce.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type")

	return cmdProduce
}