Skip to content

Commit

Permalink
Add protobuf deserialization to consumer and serialization to producer
Browse files Browse the repository at this point in the history
  • Loading branch information
xakep666 committed Dec 1, 2021
1 parent 9cb9f72 commit 67a1674
Show file tree
Hide file tree
Showing 20 changed files with 904 additions and 92 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ A command-line interface for interaction with Apache Kafka
- support for avro schemas
- Configuration of different contexts
- directly access kafka clusters inside your kubernetes cluster
- support for consuming and producing protobuf-encoded messages:
- from `.proto` IDL
- from a compiled `.protoset` file. Note that it must be compiled with the `--include_imports` flag.

[![asciicast](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg)](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr)

Expand Down Expand Up @@ -119,6 +122,18 @@ contexts:
avro:
schemaRegistry: localhost:8081

# optional: default protobuf messages search paths
protobuf:
importPaths:
- "/usr/include/protobuf"
protoFiles:
- "someMessage.proto"
- "otherMessage.proto"
protosetPaths:
- "/home/user/protosets"
protosetFiles:
- "/usr/include/protoset/other.protoset"

# optional: changes the default partitioner
defaultPartitioner: "hash"

Expand Down Expand Up @@ -303,6 +318,11 @@ The following example prints keys in hex and values in base64:
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
```

The consumer can decode protobuf-encoded message values (and, optionally, keys) and print them as JSON:
```bash
kafkactl consume my-topic --value-proto-message MyTopicValue --key-proto-message MyTopicKey --proto-file kafkamsg.proto
```

### Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
Expand Down Expand Up @@ -379,6 +399,11 @@ Producing null values (tombstone record) is also possible:
kafkactl produce my-topic --null-value
```

Producing a protobuf message converted from JSON:
```bash
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-message MyKeyMessage --value='{"valueField":"value"}' --value-proto-message MyValueMessage --proto-file kafkamsg.proto
```

### Avro support

In order to enable avro support you just have to add the schema registry to your configuration:
Expand Down
5 changes: 5 additions & 0 deletions cmd/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func NewConsumeCmd() *cobra.Command {
cmdConsume.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml")
cmdConsume.Flags().StringVarP(&flags.EncodeKey, "key-encoding", "", flags.EncodeKey, "key encoding (auto-detected by default). One of: none|hex|base64")
cmdConsume.Flags().StringVarP(&flags.EncodeValue, "value-encoding", "", flags.EncodeValue, "value encoding (auto-detected by default). One of: none|hex|base64")
cmdConsume.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
cmdConsume.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
cmdConsume.Flags().StringVarP(&flags.KeyProtoMessage, "key-proto-message", "", flags.KeyProtoMessage, "key protobuf message type")
cmdConsume.Flags().StringVarP(&flags.ValueProtoMessage, "value-proto-message", "", flags.ValueProtoMessage, "value protobuf message type")

return cmdConsume
}
116 changes: 116 additions & 0 deletions cmd/consume/consume_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package consume_test

import (
"encoding/hex"
"fmt"
"path/filepath"
"strings"
"testing"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers"
"github.com/deviceinsight/kafkactl/testutil"
"github.com/jhump/protoreflect/dynamic"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestConsumeWithKeyAndValueIntegration(t *testing.T) {
Expand Down Expand Up @@ -179,3 +185,113 @@ func TestAvroDeserializationErrorHandlingIntegration(t *testing.T) {
t.Fatalf("expected consumer to fail")
}
}

// TestProtobufConsumeProtoFileIntegration produces a protobuf-encoded record
// and verifies that the consumer decodes it to JSON when the message type is
// resolved from a .proto IDL file via an import path.
func TestProtobufConsumeProtoFileIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	topic := testutil.CreateTopic(t, "proto-file")
	kafkaCtl := testutil.CreateKafkaCtlCommand()

	importDir := filepath.Join(testutil.RootDir, "testutil", "testdata")
	producedAt := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)

	descriptor := helpers.ResolveProtoMessage(helpers.ProtoSearch{
		ProtoImportPaths: []string{importDir},
		ProtoFiles:       []string{"msg.proto"},
	}, "TopicMessage")

	msg := dynamic.NewMessage(descriptor)
	msg.SetFieldByNumber(1, timestamppb.New(producedAt))
	msg.SetFieldByNumber(2, int64(1))

	encoded, err := msg.Marshal()
	if err != nil {
		t.Fatalf("Failed to marshal proto message: %s", err)
	}

	// produce valid pb message
	if _, err := kafkaCtl.Execute("produce", topic,
		"--key", "test-key",
		"--value", hex.EncodeToString(encoded), "--value-encoding", "hex",
		"-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit",
		"--proto-import-path", importDir, "--proto-file", "msg.proto",
		"--value-proto-message", "TopicMessage"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

// TestProtobufConsumeProtosetFileIntegration produces a protobuf-encoded
// record and verifies that the consumer decodes it to JSON when the message
// type is resolved from a compiled .protoset descriptor file.
func TestProtobufConsumeProtosetFileIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	topic := testutil.CreateTopic(t, "proto-file")
	kafkaCtl := testutil.CreateKafkaCtlCommand()

	protosetFile := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset")
	producedAt := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)

	descriptor := helpers.ResolveProtoMessage(helpers.ProtoSearch{
		ProtosetFiles: []string{protosetFile},
	}, "TopicMessage")

	msg := dynamic.NewMessage(descriptor)
	msg.SetFieldByNumber(1, timestamppb.New(producedAt))
	msg.SetFieldByNumber(2, int64(1))

	encoded, err := msg.Marshal()
	if err != nil {
		t.Fatalf("Failed to marshal proto message: %s", err)
	}

	// produce valid pb message
	if _, err := kafkaCtl.Execute("produce", topic,
		"--key", "test-key",
		"--value", hex.EncodeToString(encoded), "--value-encoding", "hex",
		"-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit",
		"--protoset-file", protosetFile,
		"--value-proto-message", "TopicMessage"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

// TestProtobufConsumeProtoFileErrNoMessageIntegration verifies that the
// consumer fails with a clear error when the requested protobuf message type
// does not exist in the provided proto files.
func TestProtobufConsumeProtoFileErrNoMessageIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	pbTopic := testutil.CreateTopic(t, "proto-file")

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	// Fix: use the testdata directory as the import path. The original passed
	// the compiled msg.protoset FILE as --proto-import-path, so msg.proto could
	// never be resolved and the test passed for the wrong reason (any lookup
	// failure contains "not found"). With a valid import path the test now
	// exercises the intended failure: a missing message type.
	protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata")

	if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-message", "NonExisting"); err != nil {
		testutil.AssertErrorContains(t, "not found in provided files", err)
	} else {
		t.Fatal("Expected consumer to fail")
	}
}

// TestProtobufConsumeProtoFileErrDecodeIntegration verifies that the consumer
// reports a decode error when the topic contains bytes that are not a valid
// protobuf encoding of the configured message type.
func TestProtobufConsumeProtoFileErrDecodeIntegration(t *testing.T) {
	testutil.StartIntegrationTest(t)

	topic := testutil.CreateTopic(t, "proto-file")
	kafkaCtl := testutil.CreateKafkaCtlCommand()

	importDir := filepath.Join(testutil.RootDir, "testutil", "testdata")

	// produce invalid pb message
	if _, err := kafkaCtl.Execute("produce", topic,
		"--key", "test-key",
		"--value", "nonpb",
		"-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	_, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit",
		"--proto-import-path", importDir, "--proto-file", "msg.proto",
		"--value-proto-message", "TopicMessage")
	if err == nil {
		t.Fatal("Expected consumer to fail")
	}
	testutil.AssertErrorContains(t, "value decode failed: proto: bad wiretype", err)
}
5 changes: 5 additions & 0 deletions cmd/produce/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func NewProduceCmd() *cobra.Command {
cmdProduce.Flags().StringVarP(&flags.ValueEncoding, "value-encoding", "", flags.ValueEncoding, "value encoding (none by default). One of: none|hex|base64")
cmdProduce.Flags().BoolVarP(&flags.Silent, "silent", "s", false, "do not write to standard output")
cmdProduce.Flags().IntVarP(&flags.RateInSeconds, "rate", "r", -1, "amount of messages per second to produce on the topic")
cmdProduce.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
cmdProduce.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
cmdProduce.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
cmdProduce.Flags().StringVarP(&flags.KeyProtoMessage, "key-proto-message", "", flags.KeyProtoMessage, "key protobuf message type")
cmdProduce.Flags().StringVarP(&flags.ValueProtoMessage, "value-proto-message", "", flags.ValueProtoMessage, "value protobuf message type")

return cmdProduce
}
Loading

0 comments on commit 67a1674

Please sign in to comment.