Skip to content

Commit

Permalink
Avro Schema registry kafka pubsub implementation (dapr#3292)
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Assuied <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
  • Loading branch information
passuied and ItalyPaleAle authored Jan 10, 2024
1 parent 7d39c46 commit 419f03f
Show file tree
Hide file tree
Showing 18 changed files with 1,108 additions and 24 deletions.
3 changes: 1 addition & 2 deletions bindings/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"sync"
"sync/atomic"

"github.com/dapr/kit/logger"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/common/component/kafka"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)

const (
Expand Down
29 changes: 28 additions & 1 deletion bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,31 @@ metadata:
This is potentially insecure and not recommended for use in production.
example: "true"
default: "false"
type: bool
type: bool
- name: schemaRegistryURL
type: string
description: |
The Schema Registry URL.
example: '"http://localhost:8081"'
- name: schemaRegistryAPIKey
type: string
description: |
The Schema Registry credentials API Key.
example: '"XYAXXAZ"'
- name: schemaRegistryAPISecret
type: string
description: |
The Schema Registry credentials API Secret.
example: '"ABCDEFGMEADFF"'
- name: schemaCachingEnabled
type: bool
description: |
Enables caching for schemas.
example: '"true"'
default: '"true"'
- name: schemaLatestVersionCacheTTL
type: duration
description: |
The TTL for schema caching when publishing a message with the latest schema available.
example: '"5m"'
default: '"5m"'
17 changes: 15 additions & 2 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,17 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
for i, message := range messages {
if message != nil {
metadata := GetEventMetadata(message)
handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
if err != nil {
return err
}
messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
childMessage := KafkaBulkMessageEntry{
EntryId: strconv.Itoa(i),
Event: message.Value,
Event: messageVal,
Metadata: metadata,
}
messageValues[i] = childMessage
Expand Down Expand Up @@ -184,9 +192,14 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
if !handlerConfig.IsBulkSubscribe && handlerConfig.Handler == nil {
return errors.New("invalid handler config for subscribe call")
}

messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
event := NewEvent{
Topic: message.Topic,
Data: message.Value,
Data: messageVal,
}
event.Metadata = GetEventMetadata(message)

Expand Down
186 changes: 186 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ package kafka

import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"

"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
kitmd "github.com/dapr/kit/metadata"
"github.com/dapr/kit/retry"
)

Expand All @@ -42,6 +48,14 @@ type Kafka struct {
subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex

// schema registry settings
srClient srclient.ISchemaRegistryClient
schemaCachingEnabled bool
latestSchemaCache map[string]SchemaCacheEntry
latestSchemaCacheTTL time.Duration
latestSchemaCacheWriteLock sync.RWMutex
latestSchemaCacheReadLock sync.Mutex

// used for background logic that cannot use the context passed to the Init function
internalContext context.Context
internalContextCancel func()
Expand All @@ -55,6 +69,39 @@ type Kafka struct {
consumeRetryInterval time.Duration
}

// SchemaType identifies the schema format applied to a message value.
type SchemaType int

const (
// None means no schema handling: SerializeValue and DeserializeValue return
// the bytes they were given unchanged. It is the zero value of SchemaType.
None SchemaType = iota
// Avro selects Avro encoding/decoding using schemas fetched from the
// schema registry client.
Avro
)

// SchemaCacheEntry is one entry of the latest-schema cache, stored per
// schema subject by getLatestSchema.
type SchemaCacheEntry struct {
// schema is the schema returned by the registry's GetLatestSchema call.
schema *srclient.Schema
// codec is the Avro codec built from schema, kept so it is not rebuilt on
// every cache hit.
codec *goavro.Codec
// expirationTime is the instant after which the entry is no longer served
// from the cache.
expirationTime time.Time
}

// GetValueSchemaType reads the value schema type from the given metadata.
//
// When the valueSchemaType property is absent, None is returned with a nil
// error. When it is present, its value is parsed by parseSchemaType, and any
// parsing error is returned to the caller.
func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
	raw, found := kitmd.GetMetadataProperty(metadata, valueSchemaType)
	if !found {
		return None, nil
	}
	return parseSchemaType(raw)
}

// parseSchemaType converts the textual schema type into a SchemaType.
//
// Matching is case-insensitive: "avro" maps to Avro and "none" maps to None.
// Any other value (including the empty string) yields None together with an
// error naming the rejected value.
func parseSchemaType(sVal string) (SchemaType, error) {
	normalized := strings.ToLower(sVal)
	if normalized == "avro" {
		return Avro, nil
	}
	if normalized == "none" {
		return None, nil
	}
	return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
}

func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{
logger: logger,
Expand Down Expand Up @@ -146,6 +193,18 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
k.consumeRetryInterval = meta.ConsumeRetryInterval

if meta.SchemaRegistryURL != "" {
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
// Empty password is a possibility
if meta.SchemaRegistryAPIKey != "" {
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
}
k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
if meta.SchemaCachingEnabled {
k.latestSchemaCache = make(map[string]SchemaCacheEntry)
k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
}
}
k.logger.Debug("Kafka message bus initialization complete")

return nil
Expand All @@ -166,6 +225,132 @@ func (k *Kafka) Close() (err error) {
return err
}

// getSchemaSubject returns the schema registry subject used for the values of
// the given topic.
//
// The subject is currently assumed to be the topic name followed by "-value"
// (e.g. topic `my-topic` maps to subject `my-topic-value`).
func getSchemaSubject(topic string) string {
	const valueSuffix = "-value"
	return topic + valueSuffix
}

// DeserializeValue converts the value of a consumed message into the bytes
// handed to the subscription handler.
//
// For any schema type other than Avro the raw message value is returned as-is.
// For Avro, the value must be at least 5 bytes long: the first byte is
// skipped, bytes 1-4 are read as a big-endian schema ID, and the remainder is
// the Avro binary payload. The schema is fetched from the schema registry by
// ID and the payload is returned re-encoded as standard JSON text.
//
// An error is returned when the schema registry client is not configured, the
// value is shorter than 5 bytes, the schema cannot be fetched, or decoding
// fails.
func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
	// Everything except Avro is passed through untouched.
	if config.ValueSchemaType != Avro {
		return message.Value, nil
	}

	registry, err := k.getSchemaRegistyClient()
	if err != nil {
		return nil, err
	}

	payload := message.Value
	if len(payload) < 5 {
		return nil, fmt.Errorf("value is too short")
	}

	// Bytes 1-4 carry the schema ID; byte 0 is not inspected.
	id := binary.BigEndian.Uint32(payload[1:5])
	registered, err := registry.GetSchema(int(id))
	if err != nil {
		return nil, err
	}

	// The consumer side works with standard JSON, which the codec bundled
	// with srclient does not produce yet, so a dedicated goavro codec is
	// built from the schema text instead.
	avroCodec, err := goavro.NewCodecForStandardJSONFull(registered.Schema())
	if err != nil {
		return nil, err
	}

	decoded, _, err := avroCodec.NativeFromBinary(payload[5:])
	if err != nil {
		return nil, err
	}

	jsonText, err := avroCodec.TextualFromNative(nil, decoded)
	if err != nil {
		return nil, err
	}
	return jsonText, nil
}

// getLatestSchema returns the latest schema registered for the value subject
// of the given topic, together with an Avro codec built from that schema.
//
// When schema caching is enabled, a non-expired cache entry for the subject is
// returned without contacting the registry; otherwise the schema is fetched,
// a codec is built, and the result is stored in the cache with a TTL of
// latestSchemaCacheTTL.
//
// An error is returned when the schema registry client is not configured,
// when the registry lookup fails, or when the codec cannot be built.
func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
	srClient, err := k.getSchemaRegistyClient()
	if err != nil {
		return nil, nil, err
	}

	subject := getSchemaSubject(topic)
	if k.schemaCachingEnabled {
		// Reads and writes of the cache map must go through the SAME lock.
		// Guarding reads with one mutex and writes with another provides no
		// mutual exclusion and lets concurrent publishes race on the map.
		k.latestSchemaCacheWriteLock.RLock()
		cacheEntry, ok := k.latestSchemaCache[subject]
		k.latestSchemaCacheWriteLock.RUnlock()

		// Cache present and not expired
		if ok && cacheEntry.expirationTime.After(time.Now()) {
			return cacheEntry.schema, cacheEntry.codec, nil
		}
	}

	schema, err := srClient.GetLatestSchema(subject)
	if err != nil {
		return nil, nil, err
	}
	// New JSON standard serialization/deserialization is not integrated in
	// srclient yet. Since standard JSON is passed from dapr, a dedicated
	// goavro codec is needed.
	codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
	if err != nil {
		return nil, nil, err
	}

	if k.schemaCachingEnabled {
		k.latestSchemaCacheWriteLock.Lock()
		// Writing to a nil map panics; create it lazily in case caching was
		// switched on without the map having been allocated.
		if k.latestSchemaCache == nil {
			k.latestSchemaCache = make(map[string]SchemaCacheEntry)
		}
		k.latestSchemaCache[subject] = SchemaCacheEntry{
			schema:         schema,
			codec:          codec,
			expirationTime: time.Now().Add(k.latestSchemaCacheTTL),
		}
		k.latestSchemaCacheWriteLock.Unlock()
	}

	return schema, codec, nil
}

// getSchemaRegistyClient returns the schema registry client held by this
// Kafka instance, or an error when no client has been set.
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
	if client := k.srClient; client != nil {
		return client, nil
	}
	return nil, errors.New("schema registry details not set")
}

// SerializeValue encodes data for publishing to the given topic according to
// the value schema type found in metadata.
//
// When the schema type is not Avro, data is returned unchanged. For Avro,
// data is parsed as JSON text against the latest schema of the topic's value
// subject and re-encoded as Avro binary. The returned record is laid out as:
// one zero byte, the schema ID as 4 big-endian bytes, then the Avro payload.
//
// An error is returned when the schema type in metadata is invalid, when the
// latest schema cannot be obtained, or when data cannot be converted with the
// schema's codec.
func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
	schemaType, err := GetValueSchemaType(metadata)
	if err != nil {
		return nil, err
	}

	// Only Avro needs any transformation.
	if schemaType != Avro {
		return data, nil
	}

	schema, codec, err := k.getLatestSchema(topic)
	if err != nil {
		return nil, err
	}

	native, _, err := codec.NativeFromTextual(data)
	if err != nil {
		return nil, err
	}

	encoded, err := codec.BinaryFromNative(nil, native)
	if err != nil {
		return nil, err
	}

	// Header: leading zero byte (already zeroed by make) followed by the
	// big-endian schema ID; the Avro payload is appended right after it.
	const headerLen = 5
	record := make([]byte, headerLen, headerLen+len(encoded))
	binary.BigEndian.PutUint32(record[1:headerLen], uint32(schema.ID()))
	record = append(record, encoded...)
	return record, nil
}

// EventHandler is the handler used to handle the subscribed event.
// It takes a context and a pointer to the NewEvent being handled, and
// returns an error.
type EventHandler func(ctx context.Context, msg *NewEvent) error

Expand All @@ -178,6 +363,7 @@ type SubscriptionHandlerConfig struct {
SubscribeConfig pubsub.BulkSubscribeConfig
BulkHandler BulkEventHandler
Handler EventHandler
ValueSchemaType SchemaType
}

// NewEvent is an event arriving from a message bus instance.
Expand Down
Loading

0 comments on commit 419f03f

Please sign in to comment.