<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script id="security-template" type="text/x-handlebars-template">
<h3 class="anchor-heading"><a id="security_overview" class="anchor-link"></a><a href="#security_overview">7.1 Security Overview</a></h3>
In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster. The following security measures are currently supported:
<ol>
<li>Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
<ul>
<li>SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0</li>
<li>SASL/PLAIN - starting at version 0.10.0.0</li>
<li>SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0</li>
<li>SASL/OAUTHBEARER - starting at version 2.0</li>
</ul></li>
<li>Authentication of connections from brokers to ZooKeeper</li>
<li>Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)</li>
<li>Authorization of read / write operations by clients</li>
<li>Authorization is pluggable and integration with external authorization services is supported</li>
</ol>
It's worth noting that security is optional - non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients.
The guides below explain how to configure and use the security features in both clients and brokers.
<h3 class="anchor-heading"><a id="security_ssl" class="anchor-link"></a><a href="#security_ssl">7.2 Encryption and Authentication using SSL</a></h3>
Apache Kafka allows clients to use SSL for encryption of traffic as well as authentication. By default, SSL is disabled but can be turned on if needed.
The following paragraphs explain in detail how to set up your own PKI infrastructure, use it to create certificates and configure Kafka to use these.
<ol>
<li><h4 class="anchor-heading"><a id="security_ssl_key" class="anchor-link"></a><a href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
The first step of deploying one or more brokers with SSL support is to generate a public/private keypair for every server.
Since Kafka expects all keys and certificates to be stored in keystores, we will use Java's keytool command for this task.
The tool supports two different keystore formats: the Java-specific JKS format, which has since been deprecated, and PKCS12.
PKCS12 is the default format as of Java version 9; to ensure this format is used regardless of the Java version in use, all following
commands explicitly specify the PKCS12 format.
<pre class="line-numbers"><code class="language-bash">> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12</code></pre>
You need to specify two parameters in the above command:
<ol>
<li>keystorefile: the keystore file that stores the keys (and later the certificate) for this broker. The keystore file contains the private
and public keys of this broker; therefore, it needs to be kept safe. Ideally this step is run on the Kafka broker that the key will be
used on, as this key should never be transmitted/leave the server that it is intended for.</li>
<li>validity: the valid time of the key in days. Please note that this differs from the validity period for the certificate, which
will be determined in <a href ="#security_ssl_signing">Signing the certificate</a>. You can use the same key to request multiple
certificates: if your key has a validity of 10 years, but your CA will only sign certificates that are valid for one year, you
can use the same key with 10 certificates over time.</li>
</ol><br>
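For example, a concrete invocation with illustrative values filled in for the placeholders (the file name and validity here are examples only) would be:
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storetype pkcs12</code></pre>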
To obtain a certificate that can be used with the private key that was just created, a certificate signing request needs to be created. This
signing request, when signed by a trusted CA, results in the actual certificate, which can then be installed in the keystore and used for
authentication purposes.<br>
To generate certificate signing requests run the following command for all server keystores created so far.
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
This command assumes that you want to add hostname information to the certificate, if this is not the case, you can omit the extension parameter <code>-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code>. Please see below for more information on this.
<h5>Host Name Verification</h5>
Host name verification, when enabled, is the process of checking attributes from the certificate that is presented by the server you are
connecting to against the actual hostname or IP address of that server to ensure that you are indeed connecting to the correct server.<br>
The main reason for this check is to prevent man-in-the-middle attacks.
For Kafka, this check has been disabled by default for a long time, but as of Kafka 2.0.0 host name verification of servers is enabled by default
for client connections as well as inter-broker connections.<br>
Server host name verification may be disabled by setting <code>ssl.endpoint.identification.algorithm</code> to an empty string.<br>
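For example, in a client or broker properties file this is simply the property set to an empty value:
<pre class="line-numbers"><code class="language-text">ssl.endpoint.identification.algorithm=</code></pre>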
For dynamically configured broker listeners, hostname verification may be disabled using <code>kafka-configs.sh</code>:<br>
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="</code></pre>
<p><b>Note:</b></p>
Normally there is no good reason to disable hostname verification apart from being the quickest way to "just get it to work" followed
by the promise to "fix it later when there is more time"!<br>
Getting hostname verification right is not that hard when done at the right time, but gets much harder once the cluster is up and
running - do yourself a favor and do it now!
<p>If host name verification is enabled, clients will verify the server's fully qualified domain name (FQDN) or IP address against one of the following two fields:
<ol>
<li>Common Name (CN)</li>
<li><a href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">Subject Alternative Name (SAN)</a></li>
</ol><br>
While Kafka checks both fields, usage of the common name field for hostname verification has been
<a href="https://tools.ietf.org/html/rfc2818#section-3.1">deprecated</a> since 2000 and should be avoided if possible. In addition the
SAN field is much more flexible, allowing for multiple DNS and IP entries to be declared in a certificate.<br>
Another advantage is that if the SAN field is used for hostname verification the common name can be set to a more meaningful value for
authorization purposes. Since we need the SAN field to be contained in the signed certificate, it will be specified when generating the
signing request. It can also be specified when generating the keypair, but this will not automatically be copied into the signing request.<br>
To add a SAN field append the following argument <code> -ext SAN=DNS:{FQDN},IP:{IPADDRESS}</code> to the keytool command:
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
</li>
<li><h4 class="anchor-heading"><a id="security_ssl_ca" class="anchor-link"></a><a href="#security_ssl_ca">Creating your own CA</a></h4>
After this step each machine in the cluster has a public/private key pair which can already be used to encrypt traffic and a certificate
signing request, which is the basis for creating a certificate. To add authentication capabilities this signing request needs to be signed
by a trusted authority, which will be created in this step.
<p>A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports - the government
stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is
authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to
forge. Thus, as long as the CA is a genuine and trusted authority, the clients have a strong assurance that they are connecting to the authentic
machines.
<p>For this guide we will act as our own Certificate Authority. When setting up a production cluster in a corporate environment these certificates would
usually be signed by a corporate CA that is trusted throughout the company. Please see <a href="#security_ssl_production">Common Pitfalls in
Production</a> for some things to consider for this case.
<p>Due to a <a href="https://www.openssl.org/docs/man1.1.1/man1/x509.html#BUGS">bug</a> in OpenSSL, the x509 module will not copy requested
extension fields from CSRs into the final certificate. Since we want the SAN extension to be present in our certificate to enable hostname
verification, we'll use the <i>ca</i> module instead. This requires some additional configuration to be in place before we generate our
CA keypair.<br>
Save the following listing into a file called openssl-ca.cnf and adjust the values for validity and common attributes as necessary.
<pre class="line-numbers"><code class="language-bash">HOME = .
RANDFILE = $ENV::HOME/.rnd
####################################################################
[ ca ]
default_ca = CA_default # The default ca section
[ CA_default ]
base_dir = .
certificate = $base_dir/cacert.pem # The CA certificate
private_key = $base_dir/cakey.pem # The CA private key
new_certs_dir = $base_dir # Location for new certs after signing
database = $base_dir/index.txt # Database index file
serial = $base_dir/serial.txt # The current serial number
default_days = 1000 # How long to certify for
default_crl_days = 30 # How long before next CRL
default_md = sha256 # Use public key default MD
preserve = no # Keep passed DN ordering
x509_extensions = ca_extensions # The extensions to add to the cert
email_in_dn = no # Don't concat the email in the DN
copy_extensions = copy # Required to copy SANs from CSR to cert
####################################################################
[ req ]
default_bits = 4096
default_keyfile = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions = ca_extensions
string_mask = utf8only
####################################################################
[ ca_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = DE
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = Test Province
localityName = Locality Name (eg, city)
localityName_default = Test Town
organizationName = Organization Name (eg, company)
organizationName_default = Test Company
organizationalUnitName = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name
emailAddress = Email Address
emailAddress_default = [email protected]
####################################################################
[ ca_extensions ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints = critical, CA:true
keyUsage = keyCertSign, cRLSign
####################################################################
[ signing_policy ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
####################################################################
[ signing_req ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment</code></pre>
Then create a database and serial number file; these will be used to keep track of which certificates were signed with this CA. Both of
these are simply text files that reside in the same directory as your CA keys.
<pre class="line-numbers"><code class="language-bash">> echo 01 > serial.txt
> touch index.txt</code></pre>
With these steps done you are now ready to generate your CA that will be used to sign certificates later.
<pre class="line-numbers"><code class="language-bash">> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM</code></pre>
The CA is simply a public/private key pair and certificate that is signed by itself, and is only intended to sign other certificates.<br>
This keypair should be kept very safe; if someone gains access to it, they can create and sign certificates that will be trusted by your
infrastructure, which means they will be able to impersonate anybody when connecting to any service that trusts this CA.<br>
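As a quick sanity check, the generated CA certificate can be printed with OpenSSL; the output should show <i>CA:TRUE</i> in the Basic Constraints section:
<pre class="line-numbers"><code class="language-bash">> openssl x509 -in cacert.pem -text -noout</code></pre>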
The next step is to add the generated CA to the <b>clients' truststore</b> so that the clients can trust this CA:
<pre class="line-numbers"><code class="language-bash">> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
<b>Note:</b>
If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the
<a href="#brokerconfigs">Kafka brokers config</a> then you must provide a truststore for the Kafka brokers as well and it should have
all the CA certificates that clients' keys were signed by.
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates
that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that
certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This
attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates
in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all
other machines.
</li>
<li><h4 class="anchor-heading"><a id="security_ssl_signing" class="anchor-link"></a><a href="#security_ssl_signing">Signing the certificate</a></h4>
Then sign the certificate signing request generated in step 1 with the CA:
<pre class="line-numbers"><code class="language-bash">> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}</code></pre>
Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
<pre class="line-numbers"><code class="language-bash">> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
> keytool -keystore {keystore} -alias localhost -import -file {server certificate}</code></pre>
The definitions of the parameters are the following:
<ol>
<li>keystore: the location of the keystore</li>
<li>CA certificate: the certificate of the CA</li>
<li>certificate signing request: the csr created with the server key</li>
<li>server certificate: the file to write the signed certificate of the server to</li>
</ol>
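To confirm that both certificates were imported correctly, you can list the keystore contents with keytool; the output should contain the <i>CARoot</i> and <i>localhost</i> entries:
<pre class="line-numbers"><code class="language-bash">> keytool -list -v -keystore {keystore}</code></pre>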
This will leave you with one truststore - it can be the same for all clients and brokers and does not
contain any sensitive information, so there is no need to secure it.<br>
Additionally you will have one <i>server.keystore.jks</i> file per node which contains that node's keys, certificate and your CA's certificate;
please refer to <a href="#security_configbroker">Configuring Kafka Brokers</a> and <a href="#security_configclients">Configuring Kafka Clients</a>
for information on how to use these files.
<p>For some tooling assistance on this topic, please check out the <a href="https://github.com/OpenVPN/easy-rsa">easyRSA</a> project which has
extensive scripting in place to help with these steps.</p>
<h5>SSL key and certificates in PEM format</h5>
From 2.7.0 onwards, SSL key and trust stores can be configured for Kafka brokers and clients directly in the configuration in PEM format.
This avoids the need to store separate files on the file system and benefits from password protection features of Kafka configuration.
PEM may also be used as the store type for file-based key and trust stores in addition to JKS and PKCS12. To configure a PEM key store directly in the
broker or client configuration, the private key in PEM format should be provided in <code>ssl.keystore.key</code> and the certificate chain in PEM format
should be provided in <code>ssl.keystore.certificate.chain</code>. To configure a trust store, trust certificates, e.g. the public certificate of the CA,
should be provided in <code>ssl.truststore.certificates</code>. Since PEM is typically stored as multi-line base-64 strings, the configuration values
can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation.
<p>Store password configs <code>ssl.keystore.password</code> and <code>ssl.truststore.password</code> are not used for PEM.
If the private key is encrypted using a password, the key password must be provided in <code>ssl.key.password</code>. Private keys may be provided
in unencrypted form without a password when PEM is specified directly in the config value. In production deployments, configs should be encrypted or
externalized using the password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption
of encrypted private keys when external tools like OpenSSL are used for encryption. Third party libraries like BouncyCastle may be integrated with a
custom <code>SslEngineFactory</code> to support a wider range of encrypted private keys.</p>
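As an illustration, a minimal PEM-based broker or client SSL configuration could look as follows; the base-64 payloads in braces are placeholders, not real key material:
<pre class="line-numbers"><code class="language-text">ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY----- \
{base-64 encoded private key} \
-----END ENCRYPTED PRIVATE KEY-----
ssl.key.password=test1234
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- \
{base-64 encoded certificate} \
-----END CERTIFICATE-----
ssl.truststore.certificates=-----BEGIN CERTIFICATE----- \
{base-64 encoded CA certificate} \
-----END CERTIFICATE-----</code></pre>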
</li>
<li><h4 class="anchor-heading"><a id="security_ssl_production" class="anchor-link"></a><a href="#security_ssl_production">Common Pitfalls in Production</a></h4>
The above paragraphs show the process to create your own CA and use it to sign certificates for your cluster.
While very useful for sandbox, dev, test, and similar systems, this is usually not the correct process to create certificates for a production
cluster in a corporate environment.
Enterprises will normally operate their own CA and users can send in CSRs to be signed with this CA, which has the benefit of users not being
responsible for keeping the CA secure as well as a central authority that everybody can trust.
However it also takes away a lot of control over the process of signing certificates from the user. Quite often the people operating corporate
CAs will apply tight restrictions on certificates that can cause issues when trying to use these certificates with Kafka.
<ol>
<li><b><a href="https://tools.ietf.org/html/rfc5280#section-4.2.1.12">Extended Key Usage</a></b><br>Certificates may contain an extension
field that controls the purpose for which the certificate can be used. If this field is empty, there are no restrictions on the usage,
but if any usage is specified in here, valid SSL implementations have to enforce these usages.<br>
Relevant usages for Kafka are:
<ul>
<li>Client authentication</li>
<li>Server authentication</li>
</ul>
Kafka brokers need both these usages to be allowed, as for intra-cluster communication every broker will behave as both the client and
the server towards other brokers. It is not uncommon for corporate CAs to have a signing profile for webservers and use this for Kafka as
well, which will only contain the <i>serverAuth</i> usage value and cause the SSL handshake to fail.
</li>
<li><b>Intermediate Certificates</b><br>
Corporate Root CAs are often kept offline for security reasons. To enable day-to-day usage, so called intermediate CAs are created, which
are then used to sign the final certificates. When importing a certificate into the keystore that was signed by an intermediate CA it is
necessary to provide the entire chain of trust up to the root CA. This can be done by simply <i>cat</i>-ing the certificate files into one
combined certificate file and then importing this with keytool.
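For example, assuming illustrative file names for the signed certificate and the intermediate and root CA certificates:
<pre class="line-numbers"><code class="language-bash">> cat signed-cert.pem intermediate-ca.pem root-ca.pem > full-chain.pem
> keytool -keystore server.keystore.jks -alias localhost -import -file full-chain.pem</code></pre>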
</li>
<li><b>Failure to copy extension fields</b><br>
CA operators are often hesitant to copy requested extension fields from CSRs and prefer to specify these themselves, as this makes it
harder for a malicious party to obtain certificates with potentially misleading or fraudulent values.
It is advisable to double-check that signed certificates contain all requested SAN fields to enable proper hostname verification.
The following command can be used to print certificate details to the console, which should be compared with what was originally requested:
<pre class="line-numbers"><code class="language-bash">> openssl x509 -in certificate.crt -text -noout</code></pre>
</li>
</ol>
</li>
<li><h4 class="anchor-heading"><a id="security_configbroker" class="anchor-link"></a><a href="#security_configbroker">Configuring Kafka Brokers</a></h4>
Kafka Brokers support listening for connections on multiple ports.
We need to configure the following property in server.properties, which must have one or more comma-separated values:
<pre><code class="language-text">listeners</code></pre>
If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://host.name:port,SSL://host.name:port</code></pre>
The following SSL configs are needed on the broker side:
<pre class="line-numbers"><code class="language-text">ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234</code></pre>
Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
Optional settings that are worth considering:
<ol>
<li>ssl.client.auth=none ("none" => no client authentication (the default), "required" => client authentication is required, "requested" => client authentication is requested and clients without certs can still connect. The usage of "requested" is discouraged as it provides a false sense of security and misconfigured clients will still connect successfully.)</li>
<li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. (Default is an empty list)</li>
<li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note that SSL is deprecated in favor of TLS and using SSL in production is not recommended)</li>
<li>ssl.keystore.type=JKS</li>
<li>ssl.truststore.type=JKS</li>
<li>ssl.secure.random.implementation=SHA1PRNG</li>
</ol>
If you want to enable SSL for inter-broker communication, add the following to the server.properties file (it defaults to PLAINTEXT)
<pre class="line-numbers"><code class="language-text">security.inter.broker.protocol=SSL</code></pre>
<p>
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the <a href="http://www.oracle.com/technetwork/java/javase/downloads/index.html">JCE Unlimited Strength Jurisdiction Policy Files</a> must be obtained and installed in the JDK/JRE. See the
<a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunProviders.html">JCA Providers Documentation</a> for more information.
</p>
<p>
The JRE/JDK will have a default pseudo-random number generator (PRNG) that is used for cryptography operations, so it is not required to configure the
implementation used via <code>ssl.secure.random.implementation</code>. However, there are performance issues with some implementations (notably, the
default chosen on Linux systems, <code>NativePRNG</code>, utilizes a global lock). In cases where performance of SSL connections becomes an issue,
consider explicitly setting the implementation to be used. The <code>SHA1PRNG</code> implementation is non-blocking, and has shown very good performance
characteristics under heavy load (50 MB/sec of produced messages, plus replication traffic, per-broker).
</p>
Once you start the broker you should be able to see in the server.log
<pre class="line-numbers"><code class="language-text">with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</code></pre>
To quickly check whether the server keystore and truststore are set up properly, you can run the following command
<pre class="line-numbers"><code class="language-bash">> openssl s_client -debug -connect localhost:9093 -tls1</code></pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
In the output of this command you should see the server's certificate:
<pre class="line-numbers"><code class="language-text">-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/[email protected]</code></pre>
If the certificate does not show up or if there are any other error messages then your keystore is not set up properly.</li>
<li><h4 class="anchor-heading"><a id="security_configclients" class="anchor-link"></a><a href="#security_configclients">Configuring Kafka Clients</a></h4>
SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported. The configs for SSL will be the same for both producer and consumer.<br>
If client authentication is not required in the broker, then the following is a minimal configuration example:
<pre class="line-numbers"><code class="language-text">security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234</code></pre>
Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
<pre class="line-numbers"><code class="language-text">ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234</code></pre>
Other configuration settings that may also be needed depending on your requirements and the broker configuration:
<ol>
<li>ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</li>
<li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.</li>
<li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side</li>
<li>ssl.truststore.type=JKS</li>
<li>ssl.keystore.type=JKS</li>
</ol>
<br>
Examples using console-producer and console-consumer:
<pre class="line-numbers"><code class="language-bash">> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre>
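Here <i>client-ssl.properties</i> is simply a file containing the client SSL settings shown above, e.g.:
<pre class="line-numbers"><code class="language-text">security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234</code></pre>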
</li>
</ol>
<h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.3 Authentication using SASL</a></h3>
<ol>
<li><h4 class="anchor-heading"><a id="security_sasl_jaasconfig" class="anchor-link"></a><a href="#security_sasl_jaasconfig">JAAS configuration</a></h4>
<p>Kafka uses the Java Authentication and Authorization Service
(<a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html">JAAS</a>)
for SASL configuration.</p>
<ol>
<li><h5><a id="security_jaas_broker"
href="#security_jaas_broker">JAAS configuration for Kafka brokers</a></h5>
<p><tt>KafkaServer</tt> is the section name in the JAAS file used by each
KafkaServer/Broker. This section provides SASL configuration options
for the broker including any SASL client connections made by the broker
for inter-broker communication. If multiple listeners are configured to use
SASL, the section name may be prefixed with the listener name in lower-case
followed by a period, e.g. <tt>sasl_ssl.KafkaServer</tt>.</p>
<p>The <tt>Client</tt> section is used to authenticate a SASL connection with
ZooKeeper. It also allows the brokers to set SASL ACLs on ZooKeeper
nodes, which locks these nodes down so that only the brokers can
modify them. It is necessary to have the same principal name across all
brokers. If you want to use a section name other than Client, set the
system property <tt>zookeeper.sasl.clientconfig</tt> to the appropriate
name (<i>e.g.</i>, <tt>-Dzookeeper.sasl.clientconfig=ZkClient</tt>).</p>
<p>ZooKeeper uses "zookeeper" as the service name by default. If you
want to change this, set the system property
<tt>zookeeper.sasl.client.username</tt> to the appropriate name
(<i>e.g.</i>, <tt>-Dzookeeper.sasl.client.username=zk</tt>).</p>
<p>Brokers may also configure JAAS using the broker configuration property <code>sasl.jaas.config</code>.
The property name must be prefixed with the listener prefix including the SASL mechanism,
i.e. <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code>. Only one
login module may be specified in the config value. If multiple mechanisms are configured on a
listener, configs must be provided for each mechanism using the listener and mechanism prefix.
For example,</p>
<pre class="line-numbers"><code class="language-text">listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";</code></pre>
If JAAS configuration is defined at different levels, the order of precedence used is:
<ul>
<li>Broker configuration property <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code></li>
<li><code>{listenerName}.KafkaServer</code> section of static JAAS configuration</li>
<li><code>KafkaServer</code> section of static JAAS configuration</li>
</ul>
Note that ZooKeeper JAAS config may only be configured using static JAAS configuration.
<p>See <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
<a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
<a href="#security_sasl_scram_brokerconfig">SCRAM</a> or
<a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a> for example broker configurations.</p></li>
<li><h5><a id="security_jaas_client"
href="#security_jaas_client">JAAS configuration for Kafka clients</a></h5>
<p>Clients may configure JAAS using the client configuration property
<a href="#security_client_dynamicjaas">sasl.jaas.config</a>
or using the <a href="#security_client_staticjaas">static JAAS config file</a>
similar to brokers.</p>
<ol>
<li><h6><a id="security_client_dynamicjaas"
href="#security_client_dynamicjaas">JAAS configuration using client configuration property</a></h6>
<p>Clients may specify JAAS configuration as a producer or consumer property without
creating a physical configuration file. This mode also enables different producers
and consumers within the same JVM to use different credentials by specifying
different properties for each client. If both static JAAS configuration system property
<code>java.security.auth.login.config</code> and client property <code>sasl.jaas.config</code>
are specified, the client property will be used.</p>
<p>See <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
<a href="#security_sasl_plain_clientconfig">PLAIN</a>,
<a href="#security_sasl_scram_clientconfig">SCRAM</a> or
<a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a> for example configurations.</p></li>
<li><h6 class="anchor-heading"><a id="security_client_staticjaas" class="anchor-link"></a><a href="#security_client_staticjaas">JAAS configuration using static config file</a></h6>
To configure SASL authentication on the clients using a static JAAS config file:
<ol>
<li>Add a JAAS config file with a client login section named <tt>KafkaClient</tt>. Configure
a login module in <tt>KafkaClient</tt> for the selected mechanism as described in the examples
for setting up <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
<a href="#security_sasl_plain_clientconfig">PLAIN</a>,
<a href="#security_sasl_scram_clientconfig">SCRAM</a> or
<a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a>.
For example, <a href="#security_sasl_gssapi_clientconfig">GSSAPI</a>
credentials may be configured as:
<pre class="line-numbers"><code class="language-text">KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};</code></pre>
</li>
<li>Pass the JAAS config file location as a JVM parameter to each client JVM. For example:
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code></pre></li>
</ol>
</li>
</ol>
</li>
</ol>
</li>
<li><h4><a id="security_sasl_config"
href="#security_sasl_config">SASL configuration</a></h4>
<p>SASL may be used with PLAINTEXT or SSL as the transport layer using the
security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is
used, then <a href="#security_ssl">SSL must also be configured</a>.</p>
<ol>
<li><h5><a id="security_sasl_mechanism"
href="#security_sasl_mechanism">SASL mechanisms</a></h5>
Kafka supports the following SASL mechanisms:
<ul>
<li><a href="#security_sasl_kerberos">GSSAPI</a> (Kerberos)</li>
<li><a href="#security_sasl_plain">PLAIN</a></li>
<li><a href="#security_sasl_scram">SCRAM-SHA-256</a></li>
<li><a href="#security_sasl_scram">SCRAM-SHA-512</a></li>
<li><a href="#security_sasl_oauthbearer">OAUTHBEARER</a></li>
</ul>
</li>
<li><h5><a id="security_sasl_brokerconfig"
href="#security_sasl_brokerconfig">SASL configuration for Kafka brokers</a></h5>
<ol>
<li>Configure a SASL port in server.properties, by adding at least one of
SASL_PLAINTEXT or SASL_SSL to the <i>listeners</i> parameter, which
contains one or more comma-separated values:
<pre><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port</code></pre>
If you are only configuring a SASL port (or if you want
the Kafka brokers to authenticate each other using SASL) then make sure
you set the same SASL protocol for inter-broker communication:
<pre><code class="language-text">security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)</code></pre></li>
<li>Select one or more <a href="#security_sasl_mechanism">supported mechanisms</a>
to enable in the broker and follow the steps to configure SASL for the mechanism.
To enable multiple mechanisms in the broker, follow the steps
<a href="#security_sasl_multimechanism">here</a>.</li>
</ol>
</li>
<li><h5><a id="security_sasl_clientconfig"
href="#security_sasl_clientconfig">SASL configuration for Kafka clients</a></h5>
<p>SASL authentication is only supported for the new Java Kafka producer and
consumer; the older API is not supported.</p>
<p>To configure SASL authentication on the clients, select a SASL
<a href="#security_sasl_mechanism">mechanism</a> that is enabled in
the broker for client authentication and follow the steps to configure SASL
for the selected mechanism.</p></li>
</ol>
</li>
<li><h4><a id="security_sasl_kerberos" href="#security_sasl_kerberos">Authentication using SASL/Kerberos</a></h4>
<ol>
<li><h5 class="anchor-heading"><a id="security_sasl_kerberos_prereq" class="anchor-link"></a><a href="#security_sasl_kerberos_prereq">Prerequisites</a></h5>
<ol>
<li><b>Kerberos</b><br>
If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (<a href="https://help.ubuntu.com/community/Kerberos">Ubuntu</a>, <a href="https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Managing_Smart_Cards/installing-kerberos.html">Red Hat</a>). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.</li>
<li><b>Create Kerberos Principals</b><br>
If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).<br>
If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
<pre class="line-numbers"><code class="language-bash">> sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
> sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</code></pre></li>
<li><b>Make sure all hosts are reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
</ol>
<li><h5 class="anchor-heading"><a id="security_sasl_kerberos_brokerconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
<ol>
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
<pre class="line-numbers"><code class="language-text">KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/[email protected]";
};
// Zookeeper client authentication
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/[email protected]";
};</code></pre>
The <tt>KafkaServer</tt> section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It
allows the broker to log in using the keytab specified in this section. See <a href="#security_jaas_broker">notes</a> for more details on Zookeeper SASL configuration.
</li>
<li>Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
<pre class="line-numbers"><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre>
</li>
<li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.</li>
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
<pre class="line-numbers"><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI</code></pre>
We must also configure the service name in server.properties, which should match the principal name of the Kafka brokers. In the above example, the principal is "kafka/[email protected]", so:
<pre class="line-numbers"><code class="language-text">sasl.kerberos.service.name=kafka</code></pre>
</li>
</ol></li>
<li><h5 class="anchor-heading"><a id="security_sasl_kerberos_clientconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
To configure SASL authentication on the clients:
<ol>
<li>
Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their
own principal (usually with the same name as the user running the client), so obtain or create
these principals as needed. Then configure the JAAS configuration property for each client.
Different clients within a JVM may run as different users by specifying different principals.
The property <code>sasl.jaas.config</code> in producer.properties or consumer.properties describes
how clients such as the producer and consumer connect to the Kafka broker. The following is an example
configuration for a client using a keytab (recommended for long-running processes):
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/keytabs/kafka_client.keytab" \
principal="[email protected]";</code></pre>
For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used
along with "useTicketCache=true" as in:
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useTicketCache=true;</code></pre>
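In this case a ticket must be obtained via kinit before starting the tool, e.g. (the principal name here is only an example):
<pre class="line-numbers"><code class="language-bash">> kinit [email protected]</code></pre>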
JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</li>
<li>Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.</li>
<li>Optionally pass the krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
<pre class="line-numbers"><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf</code></pre></li>
<li>Configure the following properties in producer.properties or consumer.properties:
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka</code></pre></li>
</ol>
</li>
</ol>
</li>
<li><h4><a id="security_sasl_plain" href="#security_sasl_plain">Authentication using SASL/PLAIN</a></h4>
<p>SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described <a href="#security_sasl_plain_production">here</a>.</p>
The username is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
<ol>
<li><h5 class="anchor-heading"><a id="security_sasl_plain_brokerconfig" class="anchor-link"></a><a href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
<ol>
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
<pre class="line-numbers"><code class="language-text">KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};</code></pre>
This configuration defines two users (<i>admin</i> and <i>alice</i>). The properties <tt>username</tt> and <tt>password</tt>
in the <tt>KafkaServer</tt> section are used by the broker to initiate connections to other brokers. In this example,
<i>admin</i> is the user for inter-broker communication. The set of properties <tt>user_<i>userName</i></tt> defines
the passwords for all users that connect to the broker and the broker validates all client connections including
those from other brokers using these properties.</li>
<li>Pass the JAAS config file location as a JVM parameter to each Kafka broker:
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN</code></pre></li>
</ol>
</li>
<li><h5 class="anchor-heading"><a id="security_sasl_plain_clientconfig" class="anchor-link"></a><a href="#security_sasl_plain_clientconfig">Configuring Kafka Clients</a></h5>
To configure SASL authentication on the clients:
<ol>
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
The login module describes how clients such as the producer and consumer connect to the Kafka broker.
The following is an example configuration for a client for the PLAIN mechanism:
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";</code></pre>
<p>The options <tt>username</tt> and <tt>password</tt> are used by clients to configure
the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
Different clients within a JVM may connect as different users by specifying different user names
and passwords in <code>sasl.jaas.config</code>.</p>
<p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
<li>Configure the following properties in producer.properties or consumer.properties:
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL
sasl.mechanism=PLAIN</code></pre></li>
</ol>
</li>
<li><h5><a id="security_sasl_plain_production" href="#security_sasl_plain_production">Use of SASL/PLAIN in production</a></h5>
<ul>
<li>SASL/PLAIN should be used only with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.</li>
<li>The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown
<a href="#security_sasl_plain_brokerconfig">here</a>. From Kafka version 2.0 onwards, you can avoid storing clear passwords on disk
by configuring your own callback handlers that obtain username and password from an external source using the configuration options
<code>sasl.server.callback.handler.class</code> and <code>sasl.client.callback.handler.class</code>.</li>
<li>In production systems, external authentication servers may implement password authentication. From Kafka version 2.0 onwards,
you can plug in your own callback handlers that use external authentication servers for password verification by configuring
<code>sasl.server.callback.handler.class</code>.</li>
</ul>
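For example, a custom server callback handler would be plugged in via the broker configuration; the class name below is purely hypothetical and stands in for your own implementation:
<pre class="line-numbers"><code class="language-text"># hypothetical implementation class, replace with your own
sasl.server.callback.handler.class=com.example.CustomPlainServerCallbackHandler</code></pre>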
</li>
</ol>
</li>
<li><h4><a id="security_sasl_scram" href="#security_sasl_scram">Authentication using SASL/SCRAM</a></h4>
<p>Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that
addresses the security concerns with traditional mechanisms that perform username/password authentication
like PLAIN and DIGEST-MD5. The mechanism is defined in <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>.
Kafka supports <a href="https://tools.ietf.org/html/rfc7677">SCRAM-SHA-256</a> and SCRAM-SHA-512 which
can be used with TLS to perform secure authentication. The username is used as the authenticated
<code>Principal</code> for configuration of ACLs etc. The default SCRAM implementation in Kafka
stores SCRAM credentials in Zookeeper and is suitable for use in Kafka installations where Zookeeper
is on a private network. Refer to <a href="#security_sasl_scram_security">Security Considerations</a>
for more details.</p>
<ol>
<li><h5 class="anchor-heading"><a id="security_sasl_scram_credentials" class="anchor-link"></a><a href="#security_sasl_scram_credentials">Creating SCRAM Credentials</a></h5>
<p>The SCRAM implementation in Kafka uses Zookeeper as credential store. Credentials can be created in
Zookeeper using <tt>kafka-configs.sh</tt>. For each SCRAM mechanism enabled, credentials must be created
by adding a config with the mechanism name. Credentials for inter-broker communication must be created
before Kafka brokers are started. Client credentials may be created and updated dynamically and updated
credentials will be used to authenticate new connections.</p>
<p>Create SCRAM credentials for user <i>alice</i> with password <i>alice-secret</i>:
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice</code></pre>
<p>The default iteration count of 4096 is used if iterations are not specified. A random salt is created
and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in Zookeeper.
See <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a> for details on SCRAM identity and the individual fields.
<p>The following examples also require a user <i>admin</i> for inter-broker communication which can be created using:
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin</code></pre>
<p>Existing credentials may be listed using the <i>--describe</i> option:
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice</code></pre>
<p>Credentials may be deleted for one or more SCRAM mechanisms using the <i>--alter --delete-config</i> option:
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice</code></pre>
</li>
<li><h5 class="anchor-heading"><a id="security_sasl_scram_brokerconfig" class="anchor-link"></a><a href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
<ol>
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
<pre class="line-numbers"><code class="language-text">KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};</code></pre>
The properties <tt>username</tt> and <tt>password</tt> in the <tt>KafkaServer</tt> section are used by
the broker to initiate connections to other brokers. In this example, <i>admin</i> is the user for
inter-broker communication.</li>
<li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
</ol>
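<p>As an alternative to the static JAAS file in step 1, the broker's SCRAM login module may instead be configured inline in server.properties using the listener-prefixed <tt>sasl.jaas.config</tt> property; a sketch for the SASL_SSL listener:</p>
<pre class="line-numbers"><code class="language-text">listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";</code></pre>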
</li>
<li><h5 class="anchor-heading"><a id="security_sasl_scram_clientconfig" class="anchor-link"></a><a href="#security_sasl_scram_clientconfig">Configuring Kafka Clients</a></h5>
To configure SASL authentication on the clients:
<ol>
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
The login module describes how clients such as the producer and consumer connect to the Kafka broker.
The following is an example client configuration for the SCRAM mechanisms:
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alice" \
password="alice-secret";</code></pre>
<p>The options <tt>username</tt> and <tt>password</tt> are used by clients to configure
the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
Different clients within a JVM may connect as different users by specifying different user names
and passwords in <code>sasl.jaas.config</code>.</p>
<p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
<li>Configure the following properties in producer.properties or consumer.properties:
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
</ol>
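<p>Putting these settings together, the following is a minimal sketch of a Java producer authenticating with SCRAM; the bootstrap server, truststore details and topic name are placeholders for illustration:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ScramProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host.name:port");  // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"alice\" password=\"alice-secret\";");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "truststore-password");            // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Send a single record and close the producer, which flushes the send.
        try (KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props)) {
            producer.send(new ProducerRecord&lt;&gt;("test-topic", "key", "value"));
        }
    }
}</code></pre>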
</li>
<li><h5><a id="security_sasl_scram_security" href="#security_sasl_scram_security">Security Considerations for SASL/SCRAM</a></h5>
<ul>
<li>The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in Zookeeper. This
is suitable for production use in installations where Zookeeper is secure and on a private network.</li>
<li>Kafka supports only the strong hash functions SHA-256 and SHA-512 with a minimum iteration count
of 4096. Strong hash functions combined with strong passwords and high iteration counts protect
against brute force attacks if Zookeeper security is compromised.</li>
<li>SCRAM should be used only with TLS encryption to prevent interception of SCRAM exchanges. This
protects against dictionary or brute force attacks and against impersonation if Zookeeper is compromised.</li>
<li>From Kafka version 2.0 onwards, the default SASL/SCRAM credential store may be overridden using custom callback handlers
by configuring <code>sasl.server.callback.handler.class</code> in installations where Zookeeper is not secure.</li>
<li>For more details on security considerations, refer to
<a href="https://tools.ietf.org/html/rfc5802#section-9">RFC 5802</a>.</li>
</ul>
</li>
</ol>
</li>
<li><h4><a id="security_sasl_oauthbearer" href="#security_sasl_oauthbearer">Authentication using SASL/OAUTHBEARER</a></h4>
<p>The <a href="https://tools.ietf.org/html/rfc6749">OAuth 2 Authorization Framework</a> "enables a third-party application to obtain limited access to an HTTP service,
either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP
service, or by allowing the third-party application to obtain access on its own behalf." The SASL OAUTHBEARER mechanism
enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in <a href="https://tools.ietf.org/html/rfc7628">RFC 7628</a>.
The default OAUTHBEARER implementation in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>
and is only suitable for use in non-production Kafka installations. Refer to <a href="#security_sasl_oauthbearer_security">Security Considerations</a>
for more details.</p>
<ol>
<li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_brokerconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
<ol>
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
<pre class="line-numbers"><code class="language-text">KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="admin";
};</code></pre>
The property <tt>unsecuredLoginStringClaim_sub</tt> in the <tt>KafkaServer</tt> section is used by
the broker when it initiates connections to other brokers. In this example, <i>admin</i> will appear in the
subject (<tt>sub</tt>) claim and will be the user for inter-broker communication.</li>
<li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
</ol>
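<p>For illustration only: the unsecured tokens created by this login module are JSON Web Tokens with the header <tt>{"alg":"none"}</tt> and no signature, so for the configuration above the token payload might decode to something like the following (the <tt>iat</tt> and <tt>exp</tt> claims are generated automatically, shown here with placeholder values):</p>
<pre class="line-numbers"><code class="language-text">{"sub":"admin","iat":1577836800.000,"exp":1577840400.000}</code></pre>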
</li>
<li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_clientconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_clientconfig">Configuring Kafka Clients</a></h5>
To configure SASL authentication on the clients:
<ol>
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
The login module describes how clients such as the producer and consumer connect to the Kafka broker.
The following is an example client configuration for the OAUTHBEARER mechanism:
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="alice";</code></pre>
<p>The option <tt>unsecuredLoginStringClaim_sub</tt> is used by clients to configure
the subject (<tt>sub</tt>) claim, which determines the user for client connections.
In this example, clients connect to the broker as user <i>alice</i>.
Different clients within a JVM may connect as different users by specifying different subject (<tt>sub</tt>)
claims in <code>sasl.jaas.config</code>.</p>
<p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
<li>Configure the following properties in producer.properties or consumer.properties:
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER</code></pre></li>
<li>The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library.
Since it is an optional dependency, users have to declare it explicitly via their build tool, as in the sketch following this list.</li>
</ol>
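<p>For example, with Maven the dependency might be declared as follows; the version shown is a placeholder, and you should use a version compatible with your Kafka client release:</p>
<pre class="line-numbers"><code class="language-text">&lt;dependency&gt;
    &lt;groupId&gt;com.fasterxml.jackson.core&lt;/groupId&gt;
    &lt;artifactId&gt;jackson-databind&lt;/artifactId&gt;
    &lt;version&gt;2.10.5&lt;/version&gt; &lt;!-- placeholder version --&gt;
&lt;/dependency&gt;</code></pre>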
</li>
<li><h5><a id="security_sasl_oauthbearer_unsecured_retrieval" href="#security_sasl_oauthbearer_unsecured_retrieval">Unsecured Token Creation Options for SASL/OAUTHBEARER</a></h5>
<ul>
<li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.</li>
<li>Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol); an example combining several of them follows the table:
<table>
<tr>
<th>JAAS Module Option for Unsecured Token Creation</th>
<th>Documentation</th>
</tr>
<tr>
<td><tt>unsecuredLoginStringClaim_&lt;claimname&gt;="value"</tt></td>
<td>Creates a <tt>String</tt> claim with the given name and value. Any valid
claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
automatically generated).</td>
</tr>
<tr>
<td><tt>unsecuredLoginNumberClaim_&lt;claimname&gt;="value"</tt></td>
<td>Creates a <tt>Number</tt> claim with the given name and value. Any valid
claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
automatically generated).</td>
</tr>
<tr>
<td><tt>unsecuredLoginListClaim_&lt;claimname&gt;="value"</tt></td>
<td>Creates a <tt>String List</tt> claim with the given name and values parsed
from the given value where the first character is taken as the delimiter. For
example: <tt>unsecuredLoginListClaim_fubar="|value1|value2"</tt>. Any valid
claim name can be specified except '<tt>iat</tt>' and '<tt>exp</tt>' (these are
automatically generated).</td>
</tr>
<tr>
<td><tt>unsecuredLoginExtension_&lt;extensionname&gt;="value"</tt></td>
<td>Creates a <tt>String</tt> extension with the given name and value.
For example: <tt>unsecuredLoginExtension_traceId="123"</tt>. A valid extension name
is any sequence of lowercase or uppercase alphabetic characters. In addition, the "auth" extension name is reserved.
A valid extension value is any combination of characters with ASCII codes 1-127.</td>
</tr>
<tr>
<td><tt>unsecuredLoginPrincipalClaimName</tt></td>
<td>Set to a custom claim name if you wish the name of the <tt>String</tt>
claim holding the principal name to be something other than '<tt>sub</tt>'.</td>
</tr>
<tr>
<td><tt>unsecuredLoginLifetimeSeconds</tt></td>
<td>Set to an integer value if the token expiration is to be set to something
other than the default value of 3600 seconds (which is 1 hour). The
'<tt>exp</tt>' claim will be set to reflect the expiration time.</td>
</tr>
<tr>
<td><tt>unsecuredLoginScopeClaimName</tt></td>
<td>Set to a custom claim name if you wish the name of the <tt>String</tt> or
<tt>String List</tt> claim holding any token scope to be something other than
'<tt>scope</tt>'.</td>
</tr>
</table>
</li>
</ul>
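<p>For example, the following client configuration creates a token for user <i>alice</i> with a two-hour lifetime and a scope list parsed using '<tt>|</tt>' as the delimiter:</p>
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="alice" \
    unsecuredLoginListClaim_scope="|producer|consumer" \
    unsecuredLoginLifetimeSeconds="7200";</code></pre>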
</li>
<li><h5><a id="security_sasl_oauthbearer_unsecured_validation" href="#security_sasl_oauthbearer_unsecured_validation">Unsecured Token Validation Options for SASL/OAUTHBEARER</a></h5>
<ul>
<li>Here are the various supported JAAS module options on the broker side for <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Token</a> validation; an example follows the list:
<table>
<tr>
<th>JAAS Module Option for Unsecured Token Validation</th>
<th>Documentation</th>
</tr>
<tr>
<td><tt>unsecuredValidatorPrincipalClaimName="value"</tt></td>
<td>Set to a non-empty value if you wish a particular <tt>String</tt> claim
holding a principal name to be checked for existence; the default is to check
for the existence of the '<tt>sub</tt>' claim.</td>
</tr>
<tr>
<td><tt>unsecuredValidatorScopeClaimName="value"</tt></td>
<td>Set to a custom claim name if you wish the name of the <tt>String</tt> or
<tt>String List</tt> claim holding any token scope to be something other than
'<tt>scope</tt>'.</td>
</tr>
<tr>
<td><tt>unsecuredValidatorRequiredScope="value"</tt></td>
<td>Set to a space-delimited list of scope values if you wish the
<tt>String/String List</tt> claim holding the token scope to be checked to
make sure it contains certain values.</td>
</tr>
<tr>
<td><tt>unsecuredValidatorAllowableClockSkewMs="value"</tt></td>
<td>Set to a positive integer value if you wish to allow up to that many
milliseconds of clock skew (the default is 0).</td>
</tr>
</table>
</li>
<li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
using custom login and SASL Server callback handlers.</li>
<li>For more details on security considerations, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
</ul>
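<p>For example, a broker that uses OAUTHBEARER for inter-broker communication might combine unsecured token creation and validation options in its <tt>KafkaServer</tt> section as sketched below, requiring the '<tt>kafka</tt>' scope and tolerating a small amount of clock skew (values are illustrative):</p>
<pre class="line-numbers"><code class="language-text">KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginStringClaim_sub="admin"
    unsecuredLoginListClaim_scope="|kafka"
    unsecuredValidatorRequiredScope="kafka"
    unsecuredValidatorAllowableClockSkewMs="3000";
};</code></pre>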
</li>
<li><h5><a id="security_sasl_oauthbearer_refresh" href="#security_sasl_oauthbearer_refresh">Token Refresh for SASL/OAUTHBEARER</a></h5>
Kafka periodically refreshes any token before it expires so that the client can continue to make
connections to brokers. The parameters that control how the refresh algorithm operates are specified
as part of the producer/consumer/broker configuration and are listed below; see the documentation of
these properties for details. The default values are usually reasonable, in which case these
configuration parameters do not need to be explicitly set. A sketch with the default values follows the table.
<table>
<tr>
<th>Producer/Consumer/Broker Configuration Property</th>
</tr>
<tr>
<td><tt>sasl.login.refresh.window.factor</tt></td>
</tr>
<tr>
<td><tt>sasl.login.refresh.window.jitter</tt></td>
</tr>
<tr>
<td><tt>sasl.login.refresh.min.period.seconds</tt></td>
</tr>
<tr>
<td><tt>sasl.login.refresh.buffer.seconds</tt></td>
</tr>
</table>
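<p>A sketch of these properties with their default values; with these settings, a token with a one-hour lifetime becomes eligible for refresh at roughly the 48-minute mark (80% of the lifetime, plus up to 5% jitter):</p>
<pre class="line-numbers"><code class="language-text">sasl.login.refresh.window.factor=0.8
sasl.login.refresh.window.jitter=0.05
sasl.login.refresh.min.period.seconds=60
sasl.login.refresh.buffer.seconds=300</code></pre>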
</li>
<li><h5><a id="security_sasl_oauthbearer_prod" href="#security_sasl_oauthbearer_prod">Secure/Production Use of SASL/OAUTHBEARER</a></h5>
Production use cases will require writing an implementation of
<tt>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</tt> that can handle an instance of
<tt>org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback</tt> and declaring it via the
<tt>sasl.login.callback.handler.class</tt> configuration option for a
non-broker client, or via the
<tt>listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class</tt>
configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
protocol).
<p>
Production use cases will also require writing an implementation of
<tt>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</tt> that can handle an instance of
<tt>org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback</tt> and declaring it via the
<tt>listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class</tt>
broker configuration option.
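<p>A minimal compilable sketch of the login-side callback handler is shown below. The class name <tt>com.example.MyOAuthBearerLoginCallbackHandler</tt>, the <tt>fetchToken()</tt> helper and all hardcoded values are placeholders; a real implementation would obtain a signed token from your OAuth 2 authorization server over HTTPS and derive the principal, scope and lifetime from it. The server-side validator callback handler follows the same pattern with <tt>OAuthBearerValidatorCallback</tt>.</p>
<pre class="line-numbers"><code class="language-java">package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

public class MyOAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map&lt;String, ?&gt; configs, String saslMechanism,
                          List&lt;AppConfigurationEntry&gt; jaasConfigEntries) {
        // Read any handler options (e.g. a token endpoint URL) from the JAAS config entries here.
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback)
                ((OAuthBearerTokenCallback) callback).token(fetchToken());
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    // Placeholder: a real implementation would retrieve and parse a signed JWT
    // from the authorization server instead of returning hardcoded values.
    private OAuthBearerToken fetchToken() {
        final long now = System.currentTimeMillis();
        return new OAuthBearerToken() {
            @Override public String value() { return "signed-compact-jwt-goes-here"; }
            @Override public Set&lt;String&gt; scope() { return Collections.singleton("kafka"); }
            @Override public long lifetimeMs() { return now + 3_600_000L; }
            @Override public String principalName() { return "alice"; }
            @Override public Long startTimeMs() { return now; }
        };
    }

    @Override
    public void close() {}
}</code></pre>
<p>The handler would then be registered via, e.g., <tt>sasl.login.callback.handler.class=com.example.MyOAuthBearerLoginCallbackHandler</tt> on a client, or the listener-prefixed variant named above on brokers.</p>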
</li>
<li><h5><a id="security_sasl_oauthbearer_security" href="#security_sasl_oauthbearer_security">Security Considerations for SASL/OAUTHBEARER</a></h5>
<ul>
<li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
This is suitable only for non-production use.</li>
<li>OAUTHBEARER should be used in production environments only with TLS encryption to prevent interception of tokens.</li>
<li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
using custom login and SASL Server callback handlers as described above.</li>
<li>For more details on OAuth 2 security considerations in general, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
</ul>