Add support for AWS MSK IAM #513

Open · wants to merge 3 commits into base: master
28 changes: 19 additions & 9 deletions README.md
@@ -7,7 +7,7 @@
[![Language grade: Java](https://img.shields.io/lgtm/grade/java/g/obsidiandynamics/kafdrop.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/obsidiandynamics/kafdrop/context:java)


<em>Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups.</em> The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.

![Overview Screenshot](docs/images/overview.png?raw=true)

@@ -65,14 +65,14 @@ Finally, a default message and key format (e.g. to deserialize Avro messages or
--message.format=AVRO
--message.keyFormat=DEFAULT
```
Valid format values are `DEFAULT`, `AVRO`, `PROTOBUF`. This can also be configured at the topic level via a dropdown when viewing messages.
If the key format is unspecified, the message format is used for the key as well.

## Configure Protobuf message type
### Option 1: Using Protobuf Descriptor
For the Protobuf message type, the message definition can be compiled and transmitted using a descriptor file.
For Kafdrop to recognize the message, the application needs access to the descriptor file(s).
Kafdrop lets the user select a descriptor, as well as specify the name of one of the message types provided by the descriptor, at runtime.

To configure a folder with protobuf descriptor file(s) (.desc), follow:
```
@@ -289,7 +289,7 @@ docker run -d --rm -p 9000:9000 \
|`SERVER_PORT` |The web server port to listen on. Defaults to `9000`.
|`SCHEMAREGISTRY_CONNECT` |The endpoint of Schema Registry for Avro or Protobuf messages
|`SCHEMAREGISTRY_AUTH` |Optional basic auth credentials in the form `username:password`.
|`CMD_ARGS` |Command line arguments to Kafdrop, e.g. `--message.format` or `--protobufdesc.directory` or `--server.port`.

##### Advanced configuration
| Name |Description
@@ -306,6 +306,16 @@ docker run -d --rm -p 9000:9000 \
| `SSL_KEY_STORE_PASSWORD` | Keystore password
| `SSL_KEY_ALIAS` | Key alias

##### AWS SASL IAM
| Name |Description
|--------------------------|-------------------------------
| `KAFKA_IAM_ENABLED` |Set to `true` to use AWS SASL IAM
| `KAFKA_SASL_MECHANISM` |Set to `AWS_MSK_IAM`
| `KAFKA_SECURITY_PROTOCOL`|Set to `SASL_SSL`
| `KAFKA_SASL_JAAS_CONFIG` |Set to `software.amazon.msk.auth.iam.IAMLoginModule required;`
| `KAFKA_SASL_CLIENT_CALLBACK`|Set to `software.amazon.msk.auth.iam.IAMClientCallbackHandler`
| `KAFKA_IS_SECURED` |Set to `true`
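For reference, a sketch of the Kafka client properties the table above is expected to produce. This assumes a one-to-one mapping onto the standard client property keys; the `software.amazon.msk.auth.iam.*` classes ship in the `aws-msk-iam-auth` dependency this PR adds, and the JAAS line follows AWS's documented form.

```java
import java.util.Properties;

// Sketch only: the client-side properties the env vars above should end up as.
// Property keys are standard Kafka client config names; the
// software.amazon.msk.auth.iam.* classes come from the aws-msk-iam-auth library.
public class MskIamPropertiesSketch {
  static Properties mskIamClientProperties() {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SASL_SSL");
    props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
    props.setProperty("sasl.jaas.config",
        "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.setProperty("sasl.client.callback.handler.class",
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    return props;
  }

  public static void main(String[] args) {
    mskIamClientProperties().forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

Actual credentials are then picked up by the callback handler from the usual AWS credential chain (environment, profile, or instance role), so no username/password is configured here.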

### Using Helm
Like in the Docker example, supply the files in base-64 form:

@@ -342,7 +352,7 @@ Add a logout page in `/usr/local/opt/nginx/html/401.html`:
Use the following snippet for `/usr/local/etc/nginx/nginx.conf`:
```
worker_processes 4;

events {
  worker_connections 1024;
}
@@ -402,7 +412,7 @@ See [here](CONTRIBUTING.md).

To cut an official release, these are the steps:

1. Commit a new version on master that has the `-SNAPSHOT` suffix stripped (see `pom.xml`). Once the commit is merged, the CI will treat it as a release build and publish more artifacts than the regular (non-release/snapshot) build, including a Docker Hub push to the specific version and "latest" tags. (The regular build doesn't update "latest".)

2. You can then edit the release description in GitHub to describe what went into the release.

13 changes: 13 additions & 0 deletions pom.xml
@@ -23,6 +23,8 @@
    <protobuf.version>3.22.3</protobuf.version>
    <testcontainers.version>1.18.0</testcontainers.version>
    <kafka-libs.version>7.3.3</kafka-libs.version>
    <msk.auth.version>1.0.0</msk.auth.version>
    <sts.sdk.version>1.11.704</sts.sdk.version>
Comment on lines +26 to +27 (Collaborator):
If next comment is accepted, this can be removed.

  </properties>

  <scm>
@@ -76,6 +78,11 @@
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>software.amazon.msk</groupId>
      <artifactId>aws-msk-iam-auth</artifactId>
      <version>${msk.auth.version}</version>
Comment on lines +82 to +84 (Collaborator):

@davideicardi What's your view on this? I'd rather not become dependent on Amazon JARs. The code doesn't need them, but the dependency exists when `sasl.client.callback.handler.class` (`SASL_CLIENT_CALLBACK_HANDLER_CLASS`) is set to `software.amazon.msk.auth.iam.IAMClientCallbackHandler`.

I propose to resolve this by changing kafdrop.sh to allow mounting a folder with extra classes. I recently did that for another project, see hapifhir/hapi-fhir-jpaserver-starter#514

Collaborator (reply):

Yes, it seems a good idea to me. Loading extra classes could be a nice feature for other cases too.

We just have to write good documentation for this kind of thing.

    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
@@ -143,6 +150,12 @@
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-sts</artifactId>
      <version>${sts.sdk.version}</version>
    </dependency>
Comment on lines +154 to +158 (Collaborator):
Same comment as above.
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-undertow</artifactId>
17 changes: 15 additions & 2 deletions src/main/java/kafdrop/config/KafkaConfiguration.java
@@ -28,15 +28,22 @@ public final class KafkaConfiguration {
  private String truststoreFile;
  private String propertiesFile;
  private String keystoreFile;
  private String saslJaasConfig;
  private String saslClientCallback;
  private boolean iamEnabled = false;

  public void applyCommon(Properties properties) {
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerConnect);

    if (isSecured) {
      LOG.warn("The 'isSecured' property is deprecated; consult README.md on the preferred way to configure security");
      properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
    }

    LOG.info("Setting sasl mechanism to {}", saslMechanism);
    properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);

    if (isSecured || securityProtocol.equals("SSL") || securityProtocol.equals("SASL_SSL")) {
      LOG.info("Setting security protocol to {}", securityProtocol);
      properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    }

@@ -45,6 +52,12 @@ public void applyCommon(Properties properties) {
      LOG.info("Assigning truststore location to {}", truststoreFile);
      properties.put("ssl.truststore.location", truststoreFile);
    }

    LOG.info("Is IAM enabled : {}", iamEnabled);
    if (iamEnabled) {
      LOG.info("Setting SASL client callback {} and JAAS config to {}", saslClientCallback, saslJaasConfig);
      properties.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, saslClientCallback);
      properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    }

    LOG.info("Checking keystore file {}", keystoreFile);
    if (new File(keystoreFile).isFile()) {
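Condensed, the property-selection logic in `applyCommon` behaves roughly like the following sketch. This is simplified illustration, not the PR's actual code: string keys stand in for the `SaslConfigs`/`CommonClientConfigs` constants, and logging and file checks are omitted.

```java
import java.util.Properties;

// Simplified sketch of the branching in applyCommon shown in the diff above.
public class ApplyCommonSketch {
  static Properties build(boolean isSecured, String securityProtocol, String saslMechanism,
                          boolean iamEnabled, String saslClientCallback, String saslJaasConfig) {
    Properties props = new Properties();
    props.setProperty("sasl.mechanism", saslMechanism);
    // Protocol is applied for the legacy isSecured flag or explicit (SASL_)SSL.
    if (isSecured || securityProtocol.equals("SSL") || securityProtocol.equals("SASL_SSL")) {
      props.setProperty("security.protocol", securityProtocol);
    }
    // IAM mode additionally wires the MSK callback handler and JAAS config.
    if (iamEnabled) {
      props.setProperty("sasl.client.callback.handler.class", saslClientCallback);
      props.setProperty("sasl.jaas.config", saslJaasConfig);
    }
    return props;
  }

  public static void main(String[] args) {
    System.out.println(build(false, "SASL_SSL", "AWS_MSK_IAM", true,
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
        "software.amazon.msk.auth.iam.IAMLoginModule required;"));
  }
}
```

The point of the new `SASL_SSL` branch is that MSK IAM can set the security protocol without flipping the deprecated `isSecured` flag.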
9 changes: 6 additions & 3 deletions src/main/resources/application.yml
@@ -38,9 +38,12 @@ kafdrop.monitor:

 kafka:
   brokerConnect: localhost:9092
-  isSecured: false
-  saslMechanism: "PLAIN"
-  securityProtocol: "SASL_PLAINTEXT"
+  isSecured: "${KAFKA_IS_SECURED:false}"
+  saslMechanism: "${KAFKA_SASL_MECHANISM:PLAIN}"
+  securityProtocol: "${KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}"
   truststoreFile: "${KAFKA_TRUSTSTORE_FILE:kafka.truststore.jks}"
   propertiesFile: "${KAFKA_PROPERTIES_FILE:kafka.properties}"
   keystoreFile: "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"
+  iamEnabled: "${KAFKA_IAM_ENABLED:false}"
+  saslJaasConfig: "${KAFKA_SASL_JAAS_CONFIG}"
+  saslClientCallback: "${KAFKA_SASL_CLIENT_CALLBACK}"
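The `${NAME:default}` placeholders are resolved by Spring Boot against environment variables at startup. A minimal sketch of that fallback-after-colon behaviour (Spring does the real resolution; this just illustrates it):

```java
import java.util.Map;

// Minimal sketch of Spring-style "${NAME:default}" placeholder resolution
// against an environment map, as used by application.yml above.
public class PlaceholderSketch {
  static String resolve(String placeholder, Map<String, String> env) {
    if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
      return placeholder;                          // literal value, no substitution
    }
    String body = placeholder.substring(2, placeholder.length() - 1);
    int colon = body.indexOf(':');
    String name = colon >= 0 ? body.substring(0, colon) : body;
    String fallback = colon >= 0 ? body.substring(colon + 1) : null;
    String value = env.get(name);
    return value != null ? value : fallback;       // null when unset and no default
  }

  public static void main(String[] args) {
    System.out.println(resolve("${KAFKA_IAM_ENABLED:false}", Map.of()));
  }
}
```

Note that `saslJaasConfig` and `saslClientCallback` have no defaults after the colon, so `KAFKA_SASL_JAAS_CONFIG` and `KAFKA_SASL_CLIENT_CALLBACK` must be set in the environment whenever IAM is enabled.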