cmd/ockafka: Use gNMI client instead of openconfig. #22

Status: Closed · wants to merge 4 commits

2 changes: 1 addition & 1 deletion .travis.yml
@@ -4,7 +4,7 @@ go:
  - 1.x
  - master
 before_install:
-  - go get -v github.com/golang/lint/golint
+  - go get -v golang.org/x/lint/golint
  - go get -v -t -d ./...
 after_success:
  - make coverdata

8 changes: 4 additions & 4 deletions cmd/ockafka/README.md
@@ -9,18 +9,18 @@
 Subscribe to all updates on the Arista device at `10.0.1.2` and stream to a local
 Kafka instance:

 ```
-ockafka -addrs 10.0.1.2
+ockafka -addr 10.0.1.2
 ```

-Subscribe to temperature sensors from 2 switches and stream to a remote Kafka instance:
+Subscribe to temperature sensors and stream to a remote Kafka instance:

 ```
-ockafka -addrs 10.0.1.2,10.0.1.3 -kafkaaddrs kafka:9092 -subscribe /Sysdb/environment/temperature/status/tempSensor
+ockafka -addr 10.0.1.2 -kafkaaddrs kafka:9092 -subscribe /Sysdb/environment/temperature/status/tempSensor
 ```

 Start in a container:
 ```
-docker run aristanetworks/ockafka -addrs 10.0.1.1 -kafkaaddrs kafka:9092
+docker run aristanetworks/ockafka -addr 10.0.1.2 -kafkaaddrs kafka:9092
 ```

 ## Kafka/Elastic integration demo
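
For a quick way to see what ockafka publishes, here is a minimal consumer sketch using the same sarama library this repo already depends on. The topic name `occlient` is taken from the tests and the broker address is an assumption; adjust both for your setup:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Consume a single partition from the newest offset; a real consumer
	// would enumerate partitions or use a consumer group.
	pc, err := consumer.ConsumePartition("occlient", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		fmt.Printf("key=%s value=%s\n", msg.Key, msg.Value)
	}
}
```
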
77 changes: 49 additions & 28 deletions cmd/ockafka/main.go
@@ -7,23 +7,23 @@
 package main

 import (
+	"context"
 	"flag"
 	"fmt"
 	"strings"
-	"sync"

+	"github.com/aristanetworks/goarista/gnmi"
 	"github.com/aristanetworks/goarista/kafka"
 	"github.com/aristanetworks/goarista/kafka/openconfig"
 	"github.com/aristanetworks/goarista/kafka/producer"
-	"github.com/aristanetworks/goarista/openconfig/client"

 	"github.com/Shopify/sarama"
 	"github.com/aristanetworks/glog"
-	"github.com/golang/protobuf/proto"
+	pb "github.com/openconfig/gnmi/proto/gnmi"
 )

-var keysFlag = flag.String("kafkakeys", "",
-	"Keys for kafka messages (comma-separated, default: the value of -addrs")
+var keyFlag = flag.String("kafkakey", "",
+	"Key for kafka messages (default: the value of -addr")

> Member: This removes the ability to have multiple keys

 func newProducer(addresses []string, topic, key, dataset string) (producer.Producer, error) {
 	encodedKey := sarama.StringEncoder(key)
@@ -36,33 +36,54 @@
 }

 func main() {
-	username, password, subscriptions, grpcAddrs, opts := client.ParseFlags()
+	cfg := &gnmi.Config{}
+	flag.StringVar(&cfg.Addr, "addr", "", "Address of gNMI gRPC server with optional VRF name")

> Member: This removes the ability to connect to multiple servers

+	flag.StringVar(&cfg.CAFile, "cafile", "", "Path to server TLS certificate file")
+	flag.StringVar(&cfg.CertFile, "certfile", "", "Path to client TLS certificate file")
+	flag.StringVar(&cfg.KeyFile, "keyfile", "", "Path to client TLS private key file")
+	flag.StringVar(&cfg.Password, "password", "", "Password to authenticate with")
+	flag.StringVar(&cfg.Username, "username", "", "Username to authenticate with")
+	flag.BoolVar(&cfg.TLS, "tls", false, "Enable TLS")
+	subscribePaths := flag.String("subscribe", "/", "Comma-separated list of paths to subscribe to")
+	flag.Parse()

-	if *keysFlag == "" {
-		*keysFlag = strings.Join(grpcAddrs, ",")
-	}
-	keys := strings.Split(*keysFlag, ",")
-	if len(grpcAddrs) != len(keys) {
-		glog.Fatal("Please provide the same number of addresses and Kafka keys")
+	var key string
+	if key = *keyFlag; key == "" {
+		key = cfg.Addr
 	}
 	addresses := strings.Split(*kafka.Addresses, ",")
-	wg := new(sync.WaitGroup)
-	for i, grpcAddr := range grpcAddrs {
-		key := keys[i]
-		p, err := newProducer(addresses, *kafka.Topic, key, grpcAddr)
-		if err != nil {
-			glog.Fatal(err)
-		} else {
-			glog.Infof("Initialized Kafka producer for %s", grpcAddr)
-		}
-		publish := func(addr string, message proto.Message) {
-			p.Write(message)
-		}
-		wg.Add(1)
-		p.Start()
-		defer p.Stop()
-		c := client.New(username, password, grpcAddr, opts)
-		go c.Subscribe(wg, subscriptions, publish)
-	}
-	wg.Wait()
+	p, err := newProducer(addresses, *kafka.Topic, key, cfg.Addr)
+	if err != nil {
+		glog.Fatal(err)
+	} else {
+		glog.Infof("Initialized Kafka producer for %s", cfg.Addr)
+	}
+	p.Start()
+	defer p.Stop()
+
+	subscriptions := strings.Split(*subscribePaths, ",")
+	ctx := gnmi.NewContext(context.Background(), cfg)
+	client, err := gnmi.Dial(cfg)
+	if err != nil {
+		glog.Fatal(err)
+	}
+	respChan := make(chan *pb.SubscribeResponse)
+	errChan := make(chan error)
+	defer close(errChan)
+	subscribeOptions := &gnmi.SubscribeOptions{
+		Mode:       "stream",
+		StreamMode: "target_defined",
+		Paths:      gnmi.SplitPaths(subscriptions),
+	}
+	go gnmi.Subscribe(ctx, client, subscribeOptions, respChan, errChan)
+	for {
+		select {
+		case resp := <-respChan:
+			func(response *pb.SubscribeResponse) {
+				p.Write(response)
+			}(resp)
+		case err := <-errChan:
+			glog.Fatal(err)
+		}
+	}
 }
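
Both inline comments above flag the same regression: the rewrite handles exactly one target. As a rough, hypothetical sketch (not part of this PR), multi-target support could be layered back on top of the gNMI client by cloning the shared config per address. It reuses newProducer from this file and the goarista gnmi helpers shown in the diff; the restored -addrs/-kafkakeys flag plumbing and the subscribeOptions value are assumed to be set up as before:

```go
// Hypothetical sketch: one Kafka producer and one gNMI subscription per
// target. cfg holds the shared TLS/auth settings; addrsCSV/keysCSV would
// come from restored -addrs/-kafkakeys flags.
func runTargets(cfg *gnmi.Config, addrsCSV, keysCSV string, kafkaAddrs []string,
	topic string, subscribeOptions *gnmi.SubscribeOptions) {
	addrs := strings.Split(addrsCSV, ",")
	keys := strings.Split(keysCSV, ",")
	if len(keys) != len(addrs) {
		glog.Fatal("Please provide the same number of addresses and Kafka keys")
	}
	for i, addr := range addrs {
		targetCfg := *cfg // copy the shared settings, then point at this target
		targetCfg.Addr = addr
		p, err := newProducer(kafkaAddrs, topic, keys[i], addr)
		if err != nil {
			glog.Fatal(err)
		}
		p.Start() // runs for the life of the process; no Stop in this sketch
		ctx := gnmi.NewContext(context.Background(), &targetCfg)
		client, err := gnmi.Dial(&targetCfg)
		if err != nil {
			glog.Fatal(err)
		}
		respChan := make(chan *pb.SubscribeResponse)
		errChan := make(chan error)
		go gnmi.Subscribe(ctx, client, subscribeOptions, respChan, errChan)
		go func() {
			for {
				select {
				case resp := <-respChan:
					p.Write(resp)
				case err := <-errChan:
					glog.Fatal(err) // any stream error is fatal, as in main()
				}
			}
		}()
	}
	select {} // block forever; glog.Fatal above is the only exit
}
```
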
10 changes: 4 additions & 6 deletions kafka/encoder.go
@@ -14,13 +14,13 @@ import (

 	"github.com/Shopify/sarama"
 	"github.com/aristanetworks/glog"
-	"github.com/golang/protobuf/proto"
+	pb "github.com/openconfig/gnmi/proto/gnmi"
 )

 // MessageEncoder is an encoder interface
 // which handles encoding proto.Message to sarama.ProducerMessage
 type MessageEncoder interface {
-	Encode(proto.Message) ([]*sarama.ProducerMessage, error)

> Member: I think we can keep the interface generic the way it was intended, so we don't couple ourselves with the types of messages we're handling.

+	Encode(*pb.SubscribeResponse) ([]*sarama.ProducerMessage, error)

> Member: ditto

 	HandleSuccess(*sarama.ProducerMessage)
 	HandleError(*sarama.ProducerError)
 }
@@ -60,11 +60,9 @@ func NewBaseEncoder(typ string) *BaseEncoder {
 	return e
 }

-// Encode encodes the proto message to a sarama.ProducerMessage
-func (e *BaseEncoder) Encode(message proto.Message) ([]*sarama.ProducerMessage,
+// Encode does nothing, but keep it in order for BaseEncoder to implement MessageEncoder interface
+func (e *BaseEncoder) Encode(response *pb.SubscribeResponse) ([]*sarama.ProducerMessage,
 	error) {
-	// doesn't do anything, but keep it in order for BaseEncoder
-	// to implement MessageEncoder interface
 	return nil, nil
 }
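
As a sketch of the reviewer's suggestion above, the interface could stay generic and each implementation could narrow the type itself, the way the old elasticsearchMessageEncoder did. This is an illustrative fragment, not part of the PR, assuming the proto and pb imports as they stood before this change:

```go
// Sketch: keep the interface generic so encoders aren't coupled to gNMI types.
type MessageEncoder interface {
	Encode(proto.Message) ([]*sarama.ProducerMessage, error)
	HandleSuccess(*sarama.ProducerMessage)
	HandleError(*sarama.ProducerError)
}

// Each implementation type-asserts internally, as the old encoder did:
func (e *elasticsearchMessageEncoder) Encode(message proto.Message) (
	[]*sarama.ProducerMessage, error) {
	response, ok := message.(*pb.SubscribeResponse)
	if !ok {
		// Unknown message type: report it rather than silently dropping it.
		return nil, UnhandledMessageError{message: message}
	}
	_ = response // ... encode the *pb.SubscribeResponse as before ...
	return nil, nil
}
```
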
15 changes: 5 additions & 10 deletions kafka/openconfig/encoder.go
@@ -9,14 +9,13 @@ import (
 	"fmt"
 	"time"

-	"github.com/aristanetworks/goarista/elasticsearch"
+	"github.com/aristanetworks/goarista/gnmi"
 	"github.com/aristanetworks/goarista/kafka"
-	"github.com/aristanetworks/goarista/openconfig"

 	"github.com/Shopify/sarama"
 	"github.com/aristanetworks/glog"
-	"github.com/golang/protobuf/proto"
-	pb "github.com/openconfig/reference/rpc/openconfig"
+	pb "github.com/openconfig/gnmi/proto/gnmi"
 )

 // UnhandledMessageError is used for proto messages not matching the handled types
@@ -55,19 +54,15 @@ func NewEncoder(topic string, key sarama.Encoder, dataset string) kafka.MessageEncoder {
 	}
 }

-func (e *elasticsearchMessageEncoder) Encode(message proto.Message) ([]*sarama.ProducerMessage,
+func (e *elasticsearchMessageEncoder) Encode(response *pb.SubscribeResponse) (
+	[]*sarama.ProducerMessage,
 	error) {

-	response, ok := message.(*pb.SubscribeResponse)
-	if !ok {
-		return nil, UnhandledMessageError{message: message}
-	}
 	update := response.GetUpdate()
 	if update == nil {
 		return nil, UnhandledSubscribeResponseError{response: response}
 	}
-	updateMap, err := openconfig.NotificationToMap(e.dataset, update,
-		elasticsearch.EscapeFieldName)
+	updateMap, err := gnmi.NotificationToMap(update)

> 7AC (Member): I don't know if you tested this but gnmi.NotificationToMap() isn't really a drop-in replacement for openconfig.NotificationToMap(). Unlike the openconfig version, the gnmi version flattens the update, so instead of:
>
>     updates: {
>       foo: {
>         bar: {
>     ...
>
> you have:
>
>     updates: {
>       foo/bar: ...
>
> I think eventually we'll end up flattening the schema and consolidating it with what we have in our own backend, but in the meanwhile would you be up for keeping the format the same? If you prefer the format you implemented here, keeping your own fork is also an option.
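
To make the schema difference concrete, here is a minimal, hypothetical sketch (not part of the PR) of re-nesting the flat map that gnmi.NotificationToMap() produces back into the old nested shape. The helper name and the assumption that keys are '/'-separated paths are mine:

```go
package main

import (
	"fmt"
	"strings"
)

// unflatten turns {"/foo/bar": v} into {"foo": {"bar": v}}.
// Hypothetical helper; assumes map keys are '/'-separated gNMI paths.
func unflatten(flat map[string]interface{}) map[string]interface{} {
	nested := make(map[string]interface{})
	for path, value := range flat {
		elems := strings.Split(strings.TrimPrefix(path, "/"), "/")
		node := nested
		for _, elem := range elems[:len(elems)-1] {
			child, ok := node[elem].(map[string]interface{})
			if !ok {
				child = make(map[string]interface{})
				node[elem] = child
			}
			node = child
		}
		node[elems[len(elems)-1]] = value
	}
	return nested
}

func main() {
	flat := map[string]interface{}{"/foo/bar": 42}
	fmt.Println(unflatten(flat)) // map[foo:map[bar:42]]
}
```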

> tamihiro (Author, Nov 22, 2018):
>
> > I don't know if you tested this but gnmi.NotificationToMap() isn't really a drop in replacement for openconfig.NotificationToMap().
>
> No it isn't, and neither is gnmi.Path a drop-in replacement for openconfig.Path. :-)
>
> > I think eventually we'll end up flattening the schema and consolidating it with what we have in our own backend, but in the meanwhile would you be up for keeping the format the same?
>
> @7AC may I ask why we really need to? I see cmd/ocsplunk uses the flattened path value of gnmi.NotificationToMap() directly for writing events, which I believe is because that's how it's supposed to be. Doesn't the same go for the kafka agent?

> 7AC (Member, Nov 26, 2018): The initial use case for ockafka was Kafka -> Logstash -> Elasticsearch -> Kibana. Having an entire device as a single JSON document makes a really nice representation in Elasticsearch, and that's what you see implemented here (before your changes). It does run into some limitations when it comes to scaling (max number of fields, fields changing type, etc.). We have some changes coming to ockafka that work around these limitations, which will cause schema changes. As a result I'm a bit reluctant to make an additional schema change here. So I think we can go two routes:
>
> 1. Hold off merging this and reconcile with the changes on our end once they come in.
>
> What do you think? By the way, sorry for the delayed response; it was a long weekend in the US.

> tamihiro (Author): @7AC many thanks for the clarification. Now I see how you plan to have it grow; route 1 sounds natural to me.
>
> > hold off merging this and reconcile with the changes on our end once they come in
>
> Hope your revision for the workaround will go smoothly. And also hope you enjoyed the holidays!

 	if err != nil {
 		return nil, err
 	}

16 changes: 8 additions & 8 deletions kafka/producer/producer.go
@@ -13,18 +13,18 @@ import (

 	"github.com/Shopify/sarama"
 	"github.com/aristanetworks/glog"
-	"github.com/golang/protobuf/proto"
+	pb "github.com/openconfig/gnmi/proto/gnmi"
 )

 // Producer forwards messages recvd on a channel to kafka.
 type Producer interface {
 	Start()
-	Write(proto.Message)
+	Write(*pb.SubscribeResponse)
 	Stop()
 }

 type producer struct {
-	notifsChan    chan proto.Message
+	notifsChan    chan *pb.SubscribeResponse

> 7AC (Member, Oct 1, 2018): This is failing tests, please run make check

 	kafkaProducer sarama.AsyncProducer
 	encoder       kafka.MessageEncoder
 	done          chan struct{}
@@ -53,7 +53,7 @@ func New(encoder kafka.MessageEncoder,
 	}

 	p := &producer{
-		notifsChan:    make(chan proto.Message),
+		notifsChan:    make(chan *pb.SubscribeResponse),
 		kafkaProducer: kafkaProducer,
 		encoder:       encoder,
 		done:          make(chan struct{}),
@@ -91,9 +91,9 @@ func (p *producer) run() {
 	}
 }

-func (p *producer) Write(msg proto.Message) {
+func (p *producer) Write(response *pb.SubscribeResponse) {
 	select {
-	case p.notifsChan <- msg:
+	case p.notifsChan <- response:
 	case <-p.done:
 		// TODO: This should probably return an EOF error, but that
 		// would change the API
@@ -106,8 +106,8 @@ func (p *producer) Stop() {
 	p.kafkaProducer.Close()
 }

-func (p *producer) produceNotifications(protoMessage proto.Message) error {
-	messages, err := p.encoder.Encode(protoMessage)
+func (p *producer) produceNotifications(response *pb.SubscribeResponse) error {
+	messages, err := p.encoder.Encode(response)
 	if err != nil {
 		return err
 	}
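
One note on Write above: selecting on p.done alongside the channel send is what keeps callers from blocking forever once Stop has closed the producer. A self-contained sketch of the same pattern (illustrative, not repo code):

```go
package main

import "fmt"

// send delivers v unless done is already closed, so writers never block
// on a stopped consumer. Same shape as producer.Write above.
func send(ch chan<- int, done <-chan struct{}, v int) bool {
	select {
	case ch <- v:
		return true
	case <-done:
		return false // producer stopped; drop the message
	}
}

func main() {
	ch := make(chan int)
	done := make(chan struct{})
	go func() { fmt.Println(<-ch) }() // consumer
	fmt.Println(send(ch, done, 42))   // true: delivered
	close(done)
	fmt.Println(send(ch, done, 7)) // false: consumer gone
}
```
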
55 changes: 27 additions & 28 deletions kafka/producer/producer_test.go
@@ -9,13 +9,14 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"

 	"github.com/aristanetworks/goarista/kafka/openconfig"
 	"github.com/aristanetworks/goarista/test"

 	"github.com/Shopify/sarama"
-	"github.com/golang/protobuf/proto"
-	pb "github.com/openconfig/reference/rpc/openconfig"
+	pb "github.com/openconfig/gnmi/proto/gnmi"
+	"github.com/openconfig/gnmi/value"
 )

 type mockAsyncProducer struct {
@@ -53,16 +54,22 @@ func (p *mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
 	return p.errors
 }

-func newPath(path string) *pb.Path {
-	if path == "" {
-		return nil
-	}
-	return &pb.Path{Element: strings.Split(path, "/")}
+func newNotification(path []string, timestamp *time.Time) *pb.Notification {
+	sv, _ := value.FromScalar(timestamp.String())
+	return &pb.Notification{
+		Timestamp: timestamp.UnixNano() / 1e6,
+		Update: []*pb.Update{
+			{
+				Path: &pb.Path{Element: path},
+				Val:  sv,
+			},
+		},
+	}
 }

 func TestKafkaProducer(t *testing.T) {
 	mock := newMockAsyncProducer()
-	toDB := make(chan proto.Message)
+	toDB := make(chan *pb.SubscribeResponse)
 	topic := "occlient"
 	systemID := "Foobar"
 	toDBProducer := &producer{
@@ -75,23 +82,17 @@ func TestKafkaProducer(t *testing.T) {

 	toDBProducer.Start()

+	path := []string{"foo", "bar"}
+	timestamp := time.Now()
 	response := &pb.SubscribeResponse{
 		Response: &pb.SubscribeResponse_Update{
-			Update: &pb.Notification{
-				Timestamp: 0,
-				Prefix:    newPath("/foo/bar"),
-				Update:    []*pb.Update{},
-			},
+			Update: newNotification(path, &timestamp),
 		},
 	}
 	document := map[string]interface{}{
-		"timestamp": int64(0),
-		"update": map[string]interface{}{
-			"": map[string]interface{}{
-				"foo": map[string]interface{}{
-					"bar": map[string]interface{}{},
-				},
-			},
+		"timestamp": timestamp.UnixNano() / 1e6,
+		"updates": map[string]interface{}{
+			"/" + strings.Join(path, "/"): timestamp.String(),
 		},
 	}

@@ -118,11 +119,11 @@ func TestKafkaProducer(t *testing.T) {
 	if err != nil {
 		t.Errorf("Error decoding into JSON: %s", err)
 	}
-	if !test.DeepEqual(document["update"], result.(map[string]interface{})["update"]) {
-		t.Errorf("Protobuf sent from Kafka Producer does not match original.\nOriginal: %v\nNew:%v",
+	if !test.DeepEqual(document["updates"], result.(map[string]interface{})["updates"]) {
+		t.Errorf(
+			"Protobuf sent from Kafka Producer does not match original.\nOriginal: %#v\nNew:%#v",
 			document, result)
 	}
-
 	toDBProducer.Stop()
 }

@@ -136,7 +137,7 @@ func TestProducerStartStop(t *testing.T) {
 	// this test checks that Start() followed by Stop() doesn't cause any race conditions.

 	mock := newMockAsyncProducer()
-	toDB := make(chan proto.Message)
+	toDB := make(chan *pb.SubscribeResponse)
 	topic := "occlient"
 	systemID := "Foobar"
 	p := &producer{
@@ -146,13 +147,11 @@ func TestProducerStartStop(t *testing.T) {
 		done: make(chan struct{}),
 	}

+	path := []string{"foo", "bar"}
+	timestamp := time.Now()
 	msg := &pb.SubscribeResponse{
 		Response: &pb.SubscribeResponse_Update{
-			Update: &pb.Notification{
-				Timestamp: 0,
-				Prefix:    newPath("/foo/bar"),
-				Update:    []*pb.Update{},
-			},
+			Update: newNotification(path, &timestamp),
 		},
 	}