-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.go
183 lines (152 loc) · 4.75 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package main
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"github.com/Shopify/sarama"
"io/ioutil"
"khajiit/env"
"log"
"path/filepath"
"strings"
)
// tlsConfig enables TLS on conf and installs a root CA pool built from the
// PEM-encoded certBytes.
//
// conf is modified in place and also returned. If certBytes yields no
// certificates the process is terminated through log.Fatalln.
func tlsConfig(conf *sarama.Config, certBytes []byte) *sarama.Config {
	roots, loaded := poolFromPEMBytes(certBytes)

	conf.Net.TLS.Enable = true
	conf.Net.TLS.Config = &tls.Config{RootCAs: roots}

	if !loaded {
		log.Fatalln("no certs read")
	}
	return conf
}
// gssapiKeytabConfig fills in the GSSAPI (Kerberos) section of conf for
// keytab-based authentication.
//
// krb5Bytes and keytabBytes are written to "krb5.conf" and "keytab" under
// processTmpDir (declared outside this block) with mode 0600, and the resulting
// paths are recorded in conf. username and realm are copied into conf as given;
// the service name is fixed to "kafka".
//
// conf is modified in place and also returned. A failure to write either file
// terminates the process through log.Fatalln.
func gssapiKeytabConfig(conf *sarama.Config, krb5Bytes []byte, username, realm string, keytabBytes []byte) *sarama.Config {
	confPath := filepath.Join(processTmpDir, "krb5.conf")
	if writeErr := ioutil.WriteFile(confPath, krb5Bytes, 0600); writeErr != nil {
		log.Fatalln(writeErr)
	}

	ktPath := filepath.Join(processTmpDir, "keytab")
	if writeErr := ioutil.WriteFile(ktPath, keytabBytes, 0600); writeErr != nil {
		log.Fatalln(writeErr)
	}

	gss := &conf.Net.SASL.GSSAPI
	gss.AuthType = sarama.KRB5_KEYTAB_AUTH
	gss.KerberosConfigPath = confPath
	gss.ServiceName = "kafka"
	gss.Username = username
	gss.Realm = realm
	gss.KeyTabPath = ktPath
	return conf
}
// gssapiUserConfig fills in the GSSAPI (Kerberos) section of conf for
// username/password authentication.
//
// krb5Bytes is written to "krb5.conf" under processTmpDir (declared outside
// this block) with mode 0600, and that path is recorded in conf. username,
// realm and password are copied into conf as given; the service name is fixed
// to "kafka".
//
// conf is modified in place and also returned. A failure to write the file
// terminates the process through log.Fatalln.
func gssapiUserConfig(conf *sarama.Config, krb5Bytes []byte, username, realm, password string) *sarama.Config {
	confPath := filepath.Join(processTmpDir, "krb5.conf")
	if writeErr := ioutil.WriteFile(confPath, krb5Bytes, 0600); writeErr != nil {
		log.Fatalln(writeErr)
	}

	gss := &conf.Net.SASL.GSSAPI
	gss.AuthType = sarama.KRB5_USER_AUTH
	gss.KerberosConfigPath = confPath
	gss.ServiceName = "kafka"
	gss.Username = username
	gss.Realm = realm
	gss.Password = password
	return conf
}
// poolFromPEMFiles builds a certificate pool from the PEM files at paths.
//
// Files are read in the order given. The process is terminated through the log
// package if a file cannot be read or if a file contributes no certificates.
// With no paths, an empty (non-nil) pool is returned.
func poolFromPEMFiles(paths ...string) (pool *x509.CertPool) {
	certs := x509.NewCertPool()
	for i := range paths {
		contents, readErr := ioutil.ReadFile(paths[i])
		if readErr != nil {
			log.Fatalln(readErr)
		}
		if added := certs.AppendCertsFromPEM(contents); !added {
			log.Fatalf("no certs added from PEM file %s\n", paths[i])
		}
	}
	return certs
}
// poolFromPEMBytes builds a certificate pool from PEM-encoded data.
//
// A fresh pool is always returned; ok is the result of AppendCertsFromPEM on
// that pool, so it is false when pemBytes contributed no certificates.
func poolFromPEMBytes(pemBytes []byte) (pool *x509.CertPool, ok bool) {
	certs := x509.NewCertPool()
	return certs, certs.AppendCertsFromPEM(pemBytes)
}
// createKafkaClient assembles the broker address list and the sarama
// configuration from environment variables. It does not open a connection; it
// only returns the inputs a caller needs to construct one.
//
// Environment variables read:
//   - KAFKA_TLS_ENABLE (default "0"): when IsTruthy accepts it, KAFKA_TLS_ROOT_CA
//     is base64-decoded and used as the TLS root CA bundle.
//   - KAFKA_SASL_ENABLE (default "0"): when IsTruthy accepts it, SASL is enabled.
//   - KAFKA_SASL_TYPE (default "PLAIN"): only "GSSAPI" is configured further.
//   - KAFKA_SASL_GSSAPI_AUTH (default "KEYTAB"): "KEYTAB" or "PASSWORD".
//   - KAFKA_SASL_GSSAPI_KRB5CONF, KAFKA_SASL_GSSAPI_KEYTAB: base64-encoded.
//   - KAFKA_SASL_GSSAPI_USERNAME, KAFKA_SASL_GSSAPI_REALM,
//     KAFKA_SASL_GSSAPI_PASSWORD: used as given.
//   - KAFKA_SERVERS (default "localhost:9092"): comma-separated broker list.
//
// Variables fetched with env.GetOrFail / env.GetSecretOrFail are presumably
// mandatory and abort when missing; confirm against the env package.
//
// An unrecognized SASL mechanism or GSSAPI credential type is only logged: the
// function still returns a nil error, with SASL enabled but no credentials
// configured.
//
// Returns the broker addresses, the configuration, and a non-nil error only
// when a base64 value fails to decode. On error addrs is nil and conf holds
// whatever had been configured up to that point.
func createKafkaClient() (addrs []string, conf *sarama.Config, err error) {
	conf = sarama.NewConfig()
	conf.ClientID = "core-eventing-ds-adapter"
	conf.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	conf.Consumer.Offsets.AutoCommit.Enable = true
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest

	if IsTruthy(env.GetOrDefault("KAFKA_TLS_ENABLE", "0")) {
		log.Println("Kafka TLS enabled")
		certBytes, decodeErr := base64.StdEncoding.DecodeString(env.GetOrFail("KAFKA_TLS_ROOT_CA"))
		if decodeErr != nil {
			return nil, conf, decodeErr
		}
		tlsConfig(conf, certBytes)
	}

	if IsTruthy(env.GetOrDefault("KAFKA_SASL_ENABLE", "0")) {
		log.Printf("Kafka SASL enabled\n")
		saslType := env.GetOrDefault("KAFKA_SASL_TYPE", "PLAIN")
		conf.Net.SASL.Enable = true
		conf.Net.SASL.Mechanism = sarama.SASLTypePlaintext

		switch saslType {
		case "GSSAPI":
			log.Printf("Kafka SASL mechanism = %s\n", saslType)
			conf.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI

			credType := env.GetOrDefault("KAFKA_SASL_GSSAPI_AUTH", "KEYTAB")
			switch credType {
			case "KEYTAB":
				log.Printf("Kafka SASL GSSAPI credentials = %s\n", credType)
				krb5Bytes, decodeErr := base64.StdEncoding.DecodeString(env.GetOrFail("KAFKA_SASL_GSSAPI_KRB5CONF"))
				if decodeErr != nil {
					return nil, conf, decodeErr
				}
				username := env.GetOrFail("KAFKA_SASL_GSSAPI_USERNAME")
				realm := env.GetOrFail("KAFKA_SASL_GSSAPI_REALM")
				keytabBytes, decodeErr := base64.StdEncoding.DecodeString(env.GetSecretOrFail("KAFKA_SASL_GSSAPI_KEYTAB"))
				if decodeErr != nil {
					return nil, conf, decodeErr
				}
				gssapiKeytabConfig(conf, krb5Bytes, username, realm, keytabBytes)
			case "PASSWORD":
				log.Printf("Kafka SASL GSSAPI credentials = %s\n", credType)
				krb5Bytes, decodeErr := base64.StdEncoding.DecodeString(env.GetOrFail("KAFKA_SASL_GSSAPI_KRB5CONF"))
				if decodeErr != nil {
					return nil, conf, decodeErr
				}
				username := env.GetOrFail("KAFKA_SASL_GSSAPI_USERNAME")
				realm := env.GetOrFail("KAFKA_SASL_GSSAPI_REALM")
				password := env.GetSecretOrFail("KAFKA_SASL_GSSAPI_PASSWORD")
				gssapiUserConfig(conf, krb5Bytes, username, realm, password)
			default:
				log.Printf("Kafka SASL GSSAPI credentials = %s; unrecognized credential type\n", credType)
			}
		default:
			// Every non-GSSAPI value lands here, including the default "PLAIN":
			// the mechanism stays SASLTypePlaintext but no credentials are set.
			// TODO: add env vars for Username/Password for PLAINTEXT
			log.Printf("Kafka SASL mechanism = %s; unrecognized mechanism\n", saslType)
		}
	} else {
		log.Printf("Kafka SASL disabled\n")
	}

	addrs = splitBrokerAddrs(env.GetOrDefault("KAFKA_SERVERS", "localhost:9092"))
	return addrs, conf, nil
}

// splitBrokerAddrs turns a comma-separated broker list into individual
// addresses, trimming surrounding whitespace and dropping empty entries so that
// values such as "a:9092, b:9092," do not produce undialable addresses.
func splitBrokerAddrs(raw string) []string {
	fields := strings.Split(raw, ",")
	brokers := make([]string, 0, len(fields))
	for _, field := range fields {
		if addr := strings.TrimSpace(field); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	return brokers
}