From ef596217e929ada9b81aae677bcb8e0793905458 Mon Sep 17 00:00:00 2001 From: Ravi Suhag Date: Thu, 31 Oct 2024 15:27:16 +0530 Subject: [PATCH] feat(kafka): add authentication and authorization support --- plugins/extractors/kafka/kafka.go | 112 +++++++++--------- plugins/extractors/kafka/kafka_test.go | 47 ++++---- .../kafka/kubernetes_token_provider.go | 67 +++++++++++ 3 files changed, 150 insertions(+), 76 deletions(-) create mode 100644 plugins/extractors/kafka/kubernetes_token_provider.go diff --git a/plugins/extractors/kafka/kafka.go b/plugins/extractors/kafka/kafka.go index 8c87faa0..cd0f5ff4 100644 --- a/plugins/extractors/kafka/kafka.go +++ b/plugins/extractors/kafka/kafka.go @@ -58,6 +58,11 @@ type AuthConfig struct { // certificate authority file for TLS client authentication CAFile string `mapstructure:"ca_file"` } `mapstructure:"tls"` + + SASL struct { + Enabled bool `mapstructure:"enabled"` + Mechanism string `mapstructure:"mechanism"` + } } var sampleConfig = `broker: "localhost:9092"` @@ -74,7 +79,7 @@ var info = plugins.Info{ type Extractor struct { plugins.BaseExtractor // internal states - conn *kafka.Conn + conn sarama.Consumer logger log.Logger config Config clientDurn metric.Int64Histogram @@ -104,69 +109,91 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { return err } - // create default dialer - dialer := &kafka.Dialer{ - Timeout: 10 * time.Second, - DualStack: true, - } + consumerConfig := sarama.NewConfig() if e.config.Auth.TLS.Enabled { tlsConfig, err := e.createTLSConfig() if err != nil { return fmt.Errorf("create tls config: %w", err) } - - dialer.TLS = tlsConfig + consumerConfig.Net.TLS.Enable = true + consumerConfig.Net.TLS.Config = tlsConfig + + if e.config.Auth.SASL.Enabled { + consumerConfig.Net.SASL.Enable = true + if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth { + consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider() + } } - // create connection - e.conn, err = dialer.DialContext(ctx, "tcp", e.config.Broker) + consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig) if err != nil { - return fmt.Errorf("create connection: %w", err) + fmt.Printf("Error is here !! %s", err.Error()) + return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker, + consumerConfig, err.Error()) } - + e.conn = consumer return nil } // Extract checks if the extractor is ready to extract // if so, then extracts metadata from the kafka broker -func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { +func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { defer e.conn.Close() - partitions, err := e.readPartitions(ctx) - if err != nil { - return fmt.Errorf("fetch partitions: %w", err) - } + defer func(start time.Time) { + attributes := []attribute.KeyValue{ + attribute.String("kafka.broker", e.config.Broker), + attribute.Bool("success", err == nil), + } + if err != nil { + errorCode := "UNKNOWN" + var kErr kafka.Error + if errors.As(err, &kErr) { + errorCode = strings.ReplaceAll( + strings.ToUpper(kErr.Title()), " ", "_", + ) + } + attributes = append(attributes, attribute.String("kafka.error_code", errorCode)) + } - // collect topic list from partition list - topics := map[string]int{} - for _, p := range partitions { - topics[p.Topic]++ + e.clientDurn.Record( + ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...), + ) + }(time.Now()) + topics, err := e.conn.Topics() + if err != nil { + return fmt.Errorf("fetch topics: %w", err) } // build and push topics - for topic, numOfPartitions := range topics { + for _, topic := range topics { // skip if topic is a default topic _, isDefaultTopic := defaultTopics[topic] if isDefaultTopic { continue } - asset, err := e.buildAsset(topic, numOfPartitions) + partitions, err := e.conn.Partitions(topic) + if err != nil { + e.logger.Error("failed to fetch partitions for topic", "err", err, "topic", topic) + continue + } + asset, err := e.buildAsset(topic, len(partitions)) if err != nil { e.logger.Error("failed to build asset", "err", err, "topic", topic) continue } emit(models.NewRecord(asset)) } - return nil } func (e *Extractor) createTLSConfig() (*tls.Config, error) { authConfig := e.config.Auth.TLS - if authConfig.CertFile == "" || authConfig.KeyFile == "" || authConfig.CAFile == "" { + if authConfig.CAFile == "" { //nolint:gosec return &tls.Config{ InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify, @@ -178,9 +205,13 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) { return nil, fmt.Errorf("create cert: %w", err) } - caCert, err := os.ReadFile(authConfig.CAFile) - if err != nil { - return nil, fmt.Errorf("read ca cert file: %w", err) + var cert tls.Certificate + var err error + if authConfig.CertFile != "" && authConfig.KeyFile != "" { + cert, err = tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile) + if err != nil { + return nil, fmt.Errorf("create cert: %w", err) + } } caCertPool := x509.NewCertPool() @@ -215,31 +246,6 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2. }, nil } -func (e *Extractor) readPartitions(ctx context.Context) (partitions []kafka.Partition, err error) { - defer func(start time.Time) { - attributes := []attribute.KeyValue{ - attribute.String("kafka.broker", e.config.Broker), - attribute.Bool("success", err == nil), - } - if err != nil { - errorCode := "UNKNOWN" - var kErr kafka.Error - if errors.As(err, &kErr) { - errorCode = strings.ReplaceAll( - strings.ToUpper(kErr.Title()), " ", "_", - ) - } - attributes = append(attributes, attribute.String("kafka.error_code", errorCode)) - } - - e.clientDurn.Record( - ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...), - ) - }(time.Now()) - - return e.conn.ReadPartitions() -} - func init() { if err := registry.Extractors.Register("kafka", func() plugins.Extractor { return New(plugins.GetLog()) diff --git a/plugins/extractors/kafka/kafka_test.go b/plugins/extractors/kafka/kafka_test.go index a978d128..f0f437bf 100644 --- a/plugins/extractors/kafka/kafka_test.go +++ b/plugins/extractors/kafka/kafka_test.go @@ -6,11 +6,13 @@ package kafka_test import ( "context" "errors" + "fmt" "log" - "net" "os" - "strconv" "testing" + "time" + + kafkaLib "github.com/IBM/sarama" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -27,13 +29,12 @@ import ( ) var ( - brokerHost = "localhost:9093" + brokerHost = "0.0.0.0:9093" urnScope = "test-kafka" ) func TestMain(m *testing.M) { - var conn *kafkaLib.Conn - var broker kafkaLib.Broker + var broker *kafkaLib.Broker // setup test opts := dockertest.RunOptions{ @@ -49,25 +50,23 @@ func TestMain(m *testing.M) { }, }, } + retryFn := func(resource *dockertest.Resource) (err error) { - // create client - conn, err = kafkaLib.Dial("tcp", brokerHost) + time.Sleep(30 * time.Second) + conn, err := kafkaLib.NewClient([]string{brokerHost}, nil) if err != nil { return } // healthcheck - brokerList, err := conn.Brokers() - if err != nil { - return - } - if len(brokerList) == 0 { + if len(conn.Brokers()) == 0 { err = errors.New("not ready") return } broker, err = conn.Controller() if err != nil { + fmt.Printf("error fetching controller %s", err.Error()) conn.Close() return } @@ -163,7 +162,7 @@ func TestInit(t *testing.T) { }, }) - assert.ErrorContains(t, err, "create connection") + assert.ErrorContains(t, err, "failed to create kafka consumer") }) } @@ -226,24 +225,26 @@ func TestExtract(t *testing.T) { }) } -func setup(broker kafkaLib.Broker) (err error) { - // create broker connection to create topics - var conn *kafkaLib.Conn - conn, err = kafkaLib.Dial("tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port))) +func setup(broker *kafkaLib.Broker) (err error) { + // create client connection to create topics + conn, err := kafkaLib.NewClient([]string{brokerHost}, nil) if err != nil { + fmt.Printf("error creating client ") return } defer conn.Close() // create topics - topicConfigs := []kafkaLib.TopicConfig{ - {Topic: "meteor-test-topic-1", NumPartitions: 1, ReplicationFactor: 1}, - {Topic: "meteor-test-topic-2", NumPartitions: 1, ReplicationFactor: 1}, - {Topic: "meteor-test-topic-3", NumPartitions: 1, ReplicationFactor: 1}, - {Topic: "__consumer_offsets", NumPartitions: 1, ReplicationFactor: 1}, + topicConfigs := map[string]*kafkaLib.TopicDetail{ + "meteor-test-topic-1": {NumPartitions: 1, ReplicationFactor: 1}, + "meteor-test-topic-2": {NumPartitions: 1, ReplicationFactor: 1}, + "meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1}, + "__consumer_offsets": {NumPartitions: 1, ReplicationFactor: 1}, } - err = conn.CreateTopics(topicConfigs...) + createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs} + _, err = broker.CreateTopics(createTopicRequest) if err != nil { + fmt.Printf("error creating topics! %s", err.Error()) return } diff --git a/plugins/extractors/kafka/kubernetes_token_provider.go b/plugins/extractors/kafka/kubernetes_token_provider.go new file mode 100644 index 00000000..8ffb0293 --- /dev/null +++ b/plugins/extractors/kafka/kubernetes_token_provider.go @@ -0,0 +1,67 @@ +package kafka + +import ( + "fmt" + "os" + "strings" + + "github.com/IBM/sarama" + "github.com/rs/zerolog/log" +) + +const ( + kubernetesServiceAccountTokenPath = "/var/run/secrets/kafka/serviceaccount/token" +) + +// NewKubernetesTokenProvider creates a new TokenProvider that reads the token from kubernetes pod service account +// token file. By default, the token file path for kafka is stored in `/var/run/secrets/kafka/serviceaccount/token`. +// User need to make sure there a valid projected service account token on that path. +func NewKubernetesTokenProvider(opts ...TokenProviderOption) *KubernetesTokenProvider { + options := &TokenProviderOptions{ + FilePath: kubernetesServiceAccountTokenPath, + } + for _, o := range opts { + o(options) + } + log.Info().Str("token_file_path", options.FilePath).Msg("token provider options") + return &KubernetesTokenProvider{ + serviceAccountFilePath: options.FilePath, + } +} + +type KubernetesTokenProvider struct { + serviceAccountFilePath string +} + +// Token returns the token from the service account token file. +func (tp *KubernetesTokenProvider) Token() (*sarama.AccessToken, error) { + token, err := tp.readFile() + if err != nil { + log.Error().Err(err).Msg("failed to read token from service account token file") + return nil, err + } + return &sarama.AccessToken{ + Token: token, + }, nil +} +func (tp *KubernetesTokenProvider) readFile() (string, error) { + token, err := os.ReadFile(tp.serviceAccountFilePath) + if err != nil { + return "", fmt.Errorf("failed to read files: %w", err) + } + tkn := strings.TrimSpace(string(token)) + return tkn, nil +} + +type TokenProviderOptions struct { + // FilePath is the path to the file containing the token. + FilePath string +} +type TokenProviderOption func(*TokenProviderOptions) + +// WithTokenFilePath sets the file path to the token. +func WithTokenFilePath(path string) TokenProviderOption { + return func(o *TokenProviderOptions) { + o.FilePath = path + } +}