diff --git a/.gitignore b/.gitignore index 796b21a..657d773 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,6 @@ classes/ # MacOS .DS_Store +# config files +vertx-proxy/src/main/resources/config.json + diff --git a/README.md b/README.md index 15ed937..b24e900 100644 --- a/README.md +++ b/README.md @@ -6,26 +6,17 @@ The goal of this project is to provide proxy-based, topic-level encryption-at-rest for [Apache Kafka®](https://kafka.apache.org/). -To learn more about the background and architecture of topic encryption, see our [overview document](doc/README.md). -The next planned milestones in the project are: +### Documentation +To learn more about the background and architecture of topic encryption, see our [overview document](doc/README.md). -## M1, May 14: Foundation -- Technical specification of the project -- Assessment of viable proxy - - Envoy vs. a custom-developed proxy (in golang or Java) +The [getting started guide](doc/getting-started.md) explains how to compile and run the encrypting proxy for testing and evaluation. -## M2, June 04: Alpha proxy -- Initial implementation of selected proxy architecture - - stand-alone, not yet integrated +### Project structure +The project consists of two nested projects: +- [encmod](encmod/), the topic encryption module +- [vertx-proxy](vertx-proxy/), an experimental Kafka proxy for developing and testing the encryption module. -## M3, June 18: Proxy integration evaluation -- First version of the software encryption module -- Integration of encryption module with proxy -- Evaluation of proxy integration into Strimzi and build environment -## M4, July 02: Alpha Strimzi integration -- Integrate proxy with the Strimzi project -- Integrate encryption module diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 0000000..81795ef --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,6 @@ + + 4.0.0 + strimzi.io + common + 0.0.1-SNAPSHOT + \ No newline at end of file diff --git a/doc/README.md b/doc/README.md index c37649e..064b484 100644 --- a/doc/README.md +++ b/doc/README.md @@ -4,7 +4,7 @@ # Proxy-Based Topic-level Encryption at Rest for Kakfka -The goal of this project is to provide proxy-based, topic-level encryption-at-rest for [Apache Kafka](https://kafka.apache.org/). +The goal of this project is to provide proxy-based, topic-level encryption-at-rest for [Apache Kafka](https://kafka.apache.org/). This document provides an overview of the motivation and architecture of the encrypting proxy. For more details, see the [references below](#references) for links to our research paper [1] and project proposal [2]. Although Kafka provides multiple authentication methods and encrypted communication over [TLS](https://en.wikipedia.org/wiki/Transport_Layer_Security), it does not encrypt data at rest. Yet Kafka is increasingly used as a store of data, not just as a means of transferring data from one location to another. @@ -28,12 +28,13 @@ The diagram below depicts the main components of the proposal, illustrating clie One core component, the _Encryption Module_, provides the encryption functionality. A second core component, the _Proxy_, intercepts Kafka connections and delegates message processing to the Encryption Module. -Topic can be encrypted by different keys, allowing brokers to store a mix of encrypted and unencrypted data, with data owners managing the keys to their topics. 
+Topics can be encrypted by different keys, allowing brokers to store a mix of encrypted and unencrypted data, with data owners managing the keys to their topics. Keys will be stored in an external key management system with access policies and logging. -In the coming weeks we will be providing the specification for the core components along with a roadmap. We look forward to engaging with the Community in developing this exciting extension to Strimzi and Kafka! -P.S. The original [Strimzi proposal #17](https://github.com/strimzi/proposals/blob/master/017-kafka-topic-encryption.md) provides additional background. +## References +1. [Securing Kafka with Encryption-at-Rest](https://ieeexplore.ieee.org/abstract/document/9671388/), Chris Giblin, Sean Rooney, Pascal Vetsch, and Adrian Preston, 2021 IEEE International Conference on Big Data (Big Data) +2. The original [Strimzi proposal #17](https://github.com/strimzi/proposals/blob/master/017-kafka-topic-encryption.md) provides additional background. diff --git a/doc/getting-started.md b/doc/getting-started.md new file mode 100644 index 0000000..c0278f8 --- /dev/null +++ b/doc/getting-started.md @@ -0,0 +1,125 @@ +# Getting started + +Requirements: +- a Kafka instance, version 2.8.0 or older, which you can configure +- Java 17 +- Apache maven installed in your command line environment +- git command + + +The steps for getting started with this initial version of topic encryption are outlined below: +1. Clone the repository and set your working path. +2. Compile +3. Configure the Kafka advertised address +4. Configure the proxy +5. Run the proxy +6. Start kafka +7. Run kafka clients + +Each of these steps is described in detail below with an example. + +## Scenario + +In the scenario to get started, all components run on the same system, `localhost`. The Kafka broker can also run remotely. The minimum requirement is that one can update the broker configuration file and restart the broker. In this example, however, we run the broker locally. + +The proxy will listen on port 1234 and the broker listens on its standard port 9092 as depicted below: + +``` + Kafka client Proxy Kafka broker + o------------o 1234 o------------o 9092 +``` + +The clients are reconfigured to use port 1234 (details below). + +A policy to encrypt all topics with the same key, along with a test key management system (KMS) which returns a hard-coded AES key, is used. + +The following sections provide details for each step in running the encrypting proxy. + +### 1. Clone the repository and set your working path +``` +git clone git@github.com:strimzi/topic-encryption.git +cd topic-encryption +``` + +### 2. Compile + +``` +mvn install +``` + +### 3. Configure the Kafka broker's listeners +The address advertised by Kafka must be that of the proxy, not the broker itself. + +Modify the `advertised.listeners` property in `$KAFKA_HOME/config/server.properties` to point to the proxy host and port, as shown in the snippet below: + +``` +# The address the socket server listens on. It will get the value returned from +# java.net.InetAddress.getCanonicalHostName() if not configured. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +listeners=PLAINTEXT://:9092 + +# Hostname and port the broker will advertise to producers and consumers. If not set, +# it uses the value for "listeners" if configured. Otherwise, it will use the value +# returned from java.net.InetAddress.getCanonicalHostName(). 
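+#
+# In this example the advertised address is the proxy (127.0.0.1:1234),
+# not the broker's own listener on port 9092, so that clients connect
+# through the encrypting proxy: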
+advertised.listeners=PLAINTEXT://127.0.0.1:1234 +``` +Stop the Kafka broker and start it after the proxy is running. + +### 4. Configure the proxy +Set the working directory to the proxy's target folder: +``` +$ cd vertx-proxy/target/ +``` + +Create a configuration file, `config.json` and add the following JSON contents: + +``` +{ + "listening_port" : 1234, + "kafka_broker" : "localhost:9092", + "policy_repo" : "test" +} +``` +### 5. Run the proxy +With the current path set to the target directory, run the proxy with the following Java invocation: + +``` +$ java -cp vertx-proxy-0.0.1-SNAPSHOT-fat.jar io.strimzi.kafka.proxy.vertx.VertRunner +``` + +If successfully started, the following output appears: +``` +$ java -cp vertx-proxy-0.0.1-SNAPSHOT-fat.jar io.strimzi.kafka.proxy.vertx.VertRunner +WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance. +2022-04-13 10:30:12 INFO KafkaProxyVerticle:46 35 - Kafka version: 2.8.0 +2022-04-13 10:30:12 INFO KafkaProxyVerticle:75 35 - Listening on port 1234 +``` + +### 6. Start Kafka broker + +Now start the Kafka broker, for example: +``` +$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties +``` + +### 7. Run Kafka clients +Start the Kafka console producer (note the proxy address in the broker list): + +``` +$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:1234 --topic enctest --producer.config config/producer.properties +``` + +Start the Kafka console consumer, like the producer, specifying the proxy host and port: +``` +$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:1234 --consumer.config config/consumer.properties --topic enctest --from-beginning +``` + +Enter arbitry data in the producer and verify that it appears in consumer. + +Inspect the topic segment files and verify they indeed are encrypted. +``` +$KAFKA_HOME/kafka-dump-log.sh --files /tmp/kafka-logs/enctest-0/00000000000000000000.log --value-decoder-class kafka.serializer.StringDecoder +``` diff --git a/encmod/README.md b/encmod/README.md new file mode 100644 index 0000000..4377af0 --- /dev/null +++ b/encmod/README.md @@ -0,0 +1,5 @@ +# Topic Encryption Module + +This component is concerned strictly with the encryption and decryption of Kafka records. + + diff --git a/encmod/pom.xml b/encmod/pom.xml new file mode 100644 index 0000000..9b6fbba --- /dev/null +++ b/encmod/pom.xml @@ -0,0 +1,41 @@ + + + + io.strimzi + topic-encryption + 0.0.1-SNAPSHOT + + 4.0.0 + encmod + encryption module + desc + + + + org.apache.kafka + kafka-clients + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + slf4j-api + + + junit + junit + test + + + + diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/EncModControl.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncModControl.java new file mode 100644 index 0000000..f38bada --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncModControl.java @@ -0,0 +1,28 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc; + +/** + * This defines the interface to the Encryption Module to functions + * controlling its internal state. So, for example, can an implementation + * receiving events from a key management system (KMS), notify the module + * to purge a key because it has expired. 
If we consider the
+ * Encryption Module's encrypt() and decrypt() functions to comprise
+ * the data path, this interface describes its control path.
+ *
+ * Currently this interface is a placeholder but will be continually
+ * extended as the implementation matures.
+ */
+public interface EncModControl {
+
+    /**
+     * Purge the key, indicated by the keyref argument, from any
+     * internal state such that the key in question is no longer used.
+     * This supports key revocation.
+     *
+     * @param keyref A key reference, understood by the Encryption Module and its KMS, identifying the key to purge.
+     */
+    void purgeKey(String keyref);
+}
diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java
new file mode 100644
index 0000000..0302db3
--- /dev/null
+++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.topicenc;
+
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.SecretKey;
+
+import org.apache.kafka.common.message.FetchResponseData.FetchablePartitionResponse;
+import org.apache.kafka.common.message.FetchResponseData.FetchableTopicResponse;
+import org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData;
+import org.apache.kafka.common.message.ProduceRequestData.TopicProduceData;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.strimzi.kafka.topicenc.enc.AesGcmEncrypter;
+import io.strimzi.kafka.topicenc.enc.EncData;
+import io.strimzi.kafka.topicenc.enc.EncrypterDecrypter;
+import io.strimzi.kafka.topicenc.kms.KeyMgtSystem;
+import io.strimzi.kafka.topicenc.policy.PolicyRepository;
+import io.strimzi.kafka.topicenc.policy.TopicPolicy;
+import io.strimzi.kafka.topicenc.ser.AesGcmV1SerDer;
+import io.strimzi.kafka.topicenc.ser.EncSerDer;
+import io.strimzi.kafka.topicenc.ser.EncSerDerException;
+
+
+/**
+ * This class is the encompassing, deployable component containing
+ * the Kafka topic encryption implementation.
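+ *
+ * The module is constructed with a policy repository and a KMS client and
+ * caches one encrypter per topic. A minimal usage sketch, using the test
+ * policy repository and test KMS included in this module (variable names
+ * are illustrative):
+ * <pre>
+ *   EncryptionModule encMod = new EncryptionModule(new TestPolicyRepo(), new TestKms());
+ *   encMod.encrypt(topicProduceData);       // encrypts Produce request data in place
+ *   encMod.decrypt(fetchableTopicResponse); // decrypts Fetch response data in place
+ * </pre>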
+ */ +public class EncryptionModule implements EncModControl { + + private static final Logger LOGGER = LoggerFactory.getLogger(EncryptionModule.class); + + private PolicyRepository policyRepo; + private KeyMgtSystem kms; + private Map keyCache; + private EncSerDer encSerDer; + + public EncryptionModule (PolicyRepository policyRepo, KeyMgtSystem kms) { + this.policyRepo = policyRepo; + this.kms = kms; + keyCache = new HashMap<>(); + encSerDer = new AesGcmV1SerDer(); + // init kms connection + // init policy Repo + // init kms cache + // create enc/dec + // init encdec cache + } + + public boolean encrypt(TopicProduceData topicData) + throws EncSerDerException, GeneralSecurityException { + + final EncrypterDecrypter encrypter; + try { + encrypter = getTopicEncrypter(topicData.name()); + } catch (Exception e1) { + LOGGER.error("Error obtaining encrypter for topic " + topicData.name()); + return false; + } + + if (encrypter == null) { + LOGGER.debug( + "No encryption - topic {} is not configured for encryption",topicData.name()); + return false; + } + + // If this far, the data should be encrypted. + // Navigate into each record and encrypt. + for (PartitionProduceData partitionData : topicData.partitionData()) { + + MemoryRecords recs = (MemoryRecords) partitionData.records(); + MemoryRecordsBuilder builder = createMemoryRecsBuilder(recs.buffer().capacity()); + for (org.apache.kafka.common.record.Record record : recs.records()) { + if (record.hasValue()) { + // encrypt record value: + byte[] plaintext = new byte[record.valueSize()]; + record.value().get(plaintext); + EncData ciphertext = encrypter.encrypt(plaintext); + + // serialize the ciphertext and metadata, add to the builder: + encSerDer.serialize(builder, record, ciphertext); + } + } + // overwrite the partition's memoryrecords with the encrypted records: + partitionData.setRecords(builder.build()); + } + return true; + } + + public boolean decrypt(FetchableTopicResponse fetchRsp) + throws EncSerDerException, GeneralSecurityException { + + String topicName = fetchRsp.topic(); + final EncrypterDecrypter encrypter; + try { + encrypter = getTopicEncrypter(topicName); + } catch (Exception e) { + LOGGER.error("Error obtaining encrypter for topic " + topicName, e); + return false; + } + + if (encrypter == null) { + LOGGER.debug( + "No decryption - topic {} is not configured for encryption", topicName); + return false; + } + + // If this far, the data was encrypted. + // Navigate into each record and decrypt. 
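+        // Each partition's batch is rebuilt: record values are deserialized
+        // into IV and ciphertext, decrypted, and appended to a new batch which
+        // replaces the original record set.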
+        for (FetchablePartitionResponse partitionData : fetchRsp.partitionResponses()) {
+
+            if (LOGGER.isDebugEnabled()) {
+                String msg = String.format(
+                        "partition: %d, partition leader epoch: %04X, "
+                        + "logStartOffset: %08X, lastStableOffset: %08X",
+                        partitionData.partition(),
+                        partitionData.currentLeader().leaderEpoch(),
+                        partitionData.logStartOffset(),
+                        partitionData.lastStableOffset());
+                LOGGER.debug(msg);
+            }
+
+            MemoryRecords recs = (MemoryRecords) partitionData.recordSet();
+
+            long firstOffset = getFirstOffset(recs);
+            MemoryRecordsBuilder builder =
+                    createMemoryRecsBuilder(recs.sizeInBytes(),
+                            partitionData.currentLeader().leaderEpoch(),
+                            firstOffset);
+            for (org.apache.kafka.common.record.Record record : recs.records()) {
+                if (record.hasValue()) {
+                    byte[] ciphertext = new byte[record.valueSize()];
+                    record.value().get(ciphertext);
+
+                    // deserialize value into version, iv, ciphertext:
+                    EncData md = encSerDer.deserialize(ciphertext);
+
+                    // decrypt, add to records builder:
+                    byte[] plaintext = encrypter.decrypt(md);
+
+                    SimpleRecord newRec = new SimpleRecord(record.timestamp(),
+                            record.key(),
+                            ByteBuffer.wrap(plaintext),
+                            record.headers());
+                    builder.append(newRec);
+                }
+            }
+            // overwrite the partition's memoryrecords with the decrypted records:
+            MemoryRecords newRecs = builder.build();
+            partitionData.setRecordSet(newRecs);
+        }
+        return true;
+    }
+
+    /**
+     * EncMod control interface. Empty, placeholder implementation
+     * for the time being.
+     */
+    @Override
+    public void purgeKey(String keyref) {
+    }
+
+    /**
+     * Consults the policy db to determine whether a topic is to be encrypted.
+     * If the topic is not to be encrypted, returns null.
+     * @throws Exception
+     */
+    protected EncrypterDecrypter getTopicEncrypter(String topicName) throws Exception {
+
+        String topicKey = topicName.toLowerCase();
+
+        // first check cache
+        EncrypterDecrypter enc = keyCache.get(topicKey);
+        if (enc != null) {
+            return enc;
+        }
+
+        // query policy db for a policy for this topic:
+        TopicPolicy policy = policyRepo.getTopicPolicy(topicKey);
+        if (policy == null) {
+            return null; // no encryption policy for this topic. return null
+        }
+
+        // encryption policy exists for this topic. Retrieve key
+        SecretKey key = getKey(policy);
+
+        // instantiate the encrypter/decrypter for this topic
+        // todo: factory for creating type of encrypter - comes from policy
+        enc = new AesGcmEncrypter(key);
+
+        // add to cache and return
+        keyCache.put(topicKey, enc);
+        return enc;
+    }
+
+    private long getFirstOffset(MemoryRecords recs) {
+        for (org.apache.kafka.common.record.Record r : recs.records()) {
+            if (r.hasValue()) {
+                return r.offset();
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Given an encryption policy, retrieve and return the encryption key.
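+     * The policy's key reference is resolved through the configured KMS,
+     * which returns the secret key material.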
+ * @param policy + * @return + * @throws Exception + */ + private SecretKey getKey(TopicPolicy policy) throws Exception { + return kms.getKey(policy.getKeyReference()); + } + + private MemoryRecordsBuilder createMemoryRecsBuilder(int bufSize) { + return createMemoryRecsBuilder(bufSize, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + private MemoryRecordsBuilder createMemoryRecsBuilder(int bufSize, int partitionEpoch) { + return createMemoryRecsBuilder(bufSize, partitionEpoch, 0L); + } + + private MemoryRecordsBuilder createMemoryRecsBuilder(int bufSize, int partitionEpoch, long baseOffset) { + ByteBuffer buffer = ByteBuffer.allocate(10); // will be expanded as needed + return new MemoryRecordsBuilder( + buffer, + RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + baseOffset, + RecordBatch.NO_TIMESTAMP, // log appendTime + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + 0, // baseSequence. partitionEpoch > 0 ? (partitionEpoch-1) : 0, RecordBatch.NO_SEQUENCE, + false, // isTransactional + false, // isBatch + partitionEpoch, // RecordBatch.NO_PARTITION_LEADER_EPOCH, + bufSize); + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/LogUtils.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/LogUtils.java new file mode 100644 index 0000000..1f8bc66 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/LogUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc; + +import java.util.Base64; + +public class LogUtils { + + public static String base64Encode(byte[] rawBytes) { + return Base64.getEncoder().encodeToString(rawBytes); + } + + public static byte[] base64Decode(String base64Str) { + return Base64.getDecoder().decode(base64Str); + } + + public static void hexDump(String title, byte[] buffer) { + + if (buffer == null) { + return; + } + if (title != null) { + title = String.format("%s (buffer.length=%d %04X bytes)", title, buffer.length, buffer.length); + System.out.println(title); + } + final String MID_FILLER = " "; + StringBuilder hex = new StringBuilder(); + StringBuilder chars = new StringBuilder(); + int numBytes = buffer.length; + int i = 0; + for (i = 0; i < numBytes; i++) { + + if ((i > 0) && (i % 16 == 0)) { + hex.append(MID_FILLER); + hex.append(chars); + hex.append('\n'); + chars = new StringBuilder(); + } + byte b = buffer[i]; + hex.append(String.format("%02X ", b)); + if (b >= 0x20 && b < 0x7F) { + chars.append((char) b); + } else { + chars.append('.'); + } + } + + // loop over. add remainders + if (chars.length() > 0) { + for (int j = i % 16; j < 16; j++) { + hex.append(" "); + } + hex.append(MID_FILLER); + hex.append(chars); + hex.append('\n'); + } + // for now, write to stdout + System.out.println(hex); + } + +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/AesGcmEncrypter.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/AesGcmEncrypter.java new file mode 100644 index 0000000..a38b551 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/AesGcmEncrypter.java @@ -0,0 +1,70 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.kafka.topicenc.enc; + +import java.security.GeneralSecurityException; + +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import javax.crypto.spec.GCMParameterSpec; + +/** + * An Encrypter/Decrypter for AES GCM. + */ +public class AesGcmEncrypter implements EncrypterDecrypter { + + public static final int IV_SIZE = 16; // bytes + public static final int KEY_SIZE = 128; // for now + private static final String JCE_PROVIDER = "SunJCE"; // for now + + private final String transformation; + private final SecretKey key; + + public AesGcmEncrypter(SecretKey key) { + this.key = key; + this.transformation = CryptoUtils.AES256_GCM_NOPADDING; + } + + @Override + public EncData encrypt(byte[] plaintext) throws GeneralSecurityException { + byte[] iv = CryptoUtils.createRandom(IV_SIZE); + return encrypt(plaintext, iv); + } + + @Override + public EncData encrypt(byte[] plaintext, byte[] iv) throws GeneralSecurityException { + Cipher encCipher = createEncryptionCipher(transformation, key, iv); + byte[] ciphertext = encCipher.doFinal(plaintext); + return new EncData(iv, ciphertext); + } + + @Override + public byte[] decrypt(EncData encData) throws GeneralSecurityException { + // every encryption assumed to have its own IV + Cipher decCipher = createDecryptionCipher(transformation, key, encData.getIv()); + return decCipher.doFinal(encData.getCiphertext()); + } + + private static Cipher createEncryptionCipher(String transformation, SecretKey key, byte[] iv) + throws GeneralSecurityException { + return createCipher(Cipher.ENCRYPT_MODE, transformation, key, iv); + } + + private static Cipher createDecryptionCipher(String transformation, SecretKey key, byte[] iv) + throws GeneralSecurityException { + return createCipher(Cipher.DECRYPT_MODE, transformation, key, iv); + } + + private static Cipher createCipher(int mode, String transformation, SecretKey key, byte[] iv) + throws GeneralSecurityException { + if (iv == null || iv.length == 0) { + throw new GeneralSecurityException("Initialization vector either null or empty."); + } + Cipher cipher = Cipher.getInstance(transformation, JCE_PROVIDER); + GCMParameterSpec gcmSpec = new GCMParameterSpec(KEY_SIZE, iv); + cipher.init(mode, key, gcmSpec); + return cipher; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/CryptoUtils.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/CryptoUtils.java new file mode 100644 index 0000000..b82c5ed --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/CryptoUtils.java @@ -0,0 +1,65 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.enc; + +import java.security.NoSuchAlgorithmException; +import java.security.Provider; +import java.security.SecureRandom; +import java.security.Security; +import java.util.Map; + +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Commonly general purpose cryptographic functions and definitions. 
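+ * Helpers include random IV/nonce generation (createRandom), AES key
+ * generation (generateKey, generateAesKey), and debug logging of the
+ * GCM-capable ciphers offered by the installed JCE providers (logCiphers).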
+ */ +public class CryptoUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(CryptoUtils.class); + + public static final String AES = "AES"; + public static final String AES_GCM_PADDING = AES + "/GCM/PKCS5Padding"; + public static final String AES256_GCM_NOPADDING = "AES_256/GCM/NoPadding"; + + /** + * Create an array of bytes with random bits, suitable for use + * as nonce or initialization vector. + * + * @param sizeBytes + * @return + */ + public static byte[] createRandom(int numBytes) { + byte[] buf = new byte[numBytes]; + new SecureRandom().nextBytes(buf); + return buf; + } + + public static SecretKey generateKey(String algo, int keySize) throws NoSuchAlgorithmException { + KeyGenerator kgen = KeyGenerator.getInstance(algo); + kgen.init(keySize); + return kgen.generateKey(); + } + + public static SecretKey generateAesKey(int keySize) throws NoSuchAlgorithmException { + return generateKey(AES, keySize); + } + + public static void logCiphers() { + for (Provider provider : Security.getProviders()) { + LOGGER.debug("Cipher provider: {}", provider.getName()); + for (Map.Entry entry : provider.entrySet()) { + if (((String) entry.getValue()).contains("GCM")) { + LOGGER.debug("key: [%s] value: [%s]%n", + entry.getKey(), + entry.getValue()); + } + } + } + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncData.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncData.java new file mode 100644 index 0000000..4de31ff --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncData.java @@ -0,0 +1,28 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.enc; + +/** + * Encrypted data. Contains the ciphertext and IV. + * This class covers symmetric block ciphers such as AES. + * Other types of encrypted data classes are possible in the future. + */ +public class EncData { + private final byte[] iv; + private final byte[] ciphertext; + + public EncData(byte[] iv, byte[] ciphertext) { + this.iv = iv; + this.ciphertext = ciphertext; + } + + public byte[] getIv() { + return iv; + } + + public byte[] getCiphertext() { + return ciphertext; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncrypterDecrypter.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncrypterDecrypter.java new file mode 100644 index 0000000..4a03e52 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/enc/EncrypterDecrypter.java @@ -0,0 +1,26 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.enc; + +import java.security.GeneralSecurityException; + +/** + * As the name suggests, an Encrypter/Decrypter is a component which encrypts and decrypts messages. + * With this interface, implementers can develop a variety of encryption functions. + */ +public interface EncrypterDecrypter { + + /** + * Encrypt, internally generating a nonce/IV. 
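+     * Implementations generate a fresh random IV for each call and return
+     * it together with the ciphertext in the resulting EncData.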
+ * @param plaintext + * @return + * @throws Exception + */ + EncData encrypt(byte[] plaintext) throws GeneralSecurityException; + + EncData encrypt(byte[] plaintext, byte[] iv) throws GeneralSecurityException; + + byte[] decrypt(EncData encMetadata) throws GeneralSecurityException; +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/KeyMgtSystem.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/KeyMgtSystem.java new file mode 100644 index 0000000..dcee839 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/KeyMgtSystem.java @@ -0,0 +1,14 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.kms; + +import javax.crypto.SecretKey; + +public interface KeyMgtSystem { + + void setCredential(String cred); + + SecretKey getKey(String keyReference); +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/TestKms.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/TestKms.java new file mode 100644 index 0000000..e494263 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/kms/TestKms.java @@ -0,0 +1,33 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.kms; + +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +/** For test only. This class to be moved into test source path */ +public class TestKms implements KeyMgtSystem { + + private static String TEST_KEY = "bfUup8fs92bnOHlghWXegCJleHhbnNaf31RZL0d6r/I="; + + SecretKey key; + + public TestKms() throws NoSuchAlgorithmException { + byte[] decodedKey = Base64.getDecoder().decode(TEST_KEY); + key = new SecretKeySpec(decodedKey, 0, decodedKey.length, "AES"); + } + + @Override + public void setCredential(String cred) { + } + + @Override + public SecretKey getKey(String keyReference) { + return key; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/EncryptionPolicy.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/EncryptionPolicy.java new file mode 100644 index 0000000..b8237c1 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/EncryptionPolicy.java @@ -0,0 +1,24 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.policy; + +import java.util.ArrayList; +import java.util.List; + +public class EncryptionPolicy { + + public static final String ALL_TOPICS = "*"; + + List kmsDefinitions = new ArrayList<>(); + List topicPolicies = new ArrayList<>(); + + + public List getKmsDefinitions() { + return kmsDefinitions; + } + public List getTopicPolicies() { + return topicPolicies; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/KmsDefinition.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/KmsDefinition.java new file mode 100644 index 0000000..f8de539 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/KmsDefinition.java @@ -0,0 +1,54 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.kafka.topicenc.policy; + +import java.net.URL; + +public class KmsDefinition { + + private URL url; + private String name; + private String instance; + private String description; + private String type; + private String credential; + + public URL getUrl() { + return url; + } + public void setUrl(URL url) { + this.url = url; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public String getInstance() { + return instance; + } + public void setInstance(String instance) { + this.instance = instance; + } + public String getDescription() { + return description; + } + public void setDescription(String description) { + this.description = description; + } + public String getType() { + return type; + } + public void setType(String type) { + this.type = type; + } + public String getCredential() { + return credential; + } + public void setCredential(String credential) { + this.credential = credential; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/PolicyRepository.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/PolicyRepository.java new file mode 100644 index 0000000..35a86cc --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/PolicyRepository.java @@ -0,0 +1,23 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.policy; + +/** + * This is the interface to an implementation of a + * policy repository containing information on + * topics to encrypt. Such a repository can + * take many forms such as a JSON file, a REST service, + * a database, etc. The encryption module uses this interface + * to retrieve policy information without being concerned with + * the implementation details on repository housing policy. + * A trivial implementation of this interface is TestPolicyRepo + * used for testing. + */ +public interface PolicyRepository { + + TopicPolicy getTopicPolicy(String topicName); + + KmsDefinition getKmsDefinition(String kmsName); +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TestPolicyRepo.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TestPolicyRepo.java new file mode 100644 index 0000000..0ff0234 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TestPolicyRepo.java @@ -0,0 +1,48 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.policy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An implementation of a policy repository used for testing. 
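+ * It installs a single wildcard policy covering all topics, using the
+ * AesGcmV1 encryption method and the key reference "test".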
+ */ +public class TestPolicyRepo implements PolicyRepository { + + + List kmsDefinitions = new ArrayList<>(); + List topicPolicies = new ArrayList<>(); + + Map policyMap = new HashMap<>(); + Map kmsMap = new HashMap<>(); + + public TestPolicyRepo() { + TopicPolicy policy = new TopicPolicy(); + policy.setEncMethod("AesGcmV1"); + policy.setKeyReference("test"); + policy.setTopic(EncryptionPolicy.ALL_TOPICS); + + policyMap.put(EncryptionPolicy.ALL_TOPICS, policy); + } + + @Override + public TopicPolicy getTopicPolicy(String topicName) { + + // wildcard has priority: + TopicPolicy policy = policyMap.get(EncryptionPolicy.ALL_TOPICS); + if (policy != null) { + return policy; + } + return policyMap.get(topicName.toLowerCase()); + } + + @Override + public KmsDefinition getKmsDefinition(String kmsName) { + return kmsMap.get(kmsName.toLowerCase()); + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TopicPolicy.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TopicPolicy.java new file mode 100644 index 0000000..ad102be --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/policy/TopicPolicy.java @@ -0,0 +1,45 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.policy; + +public class TopicPolicy { + + private String topic; + private String encMethod; + private String keyReference; + private KmsDefinition kms; + private String credential; + + public String getTopic() { + return topic; + } + public void setTopic(String topic) { + this.topic = topic; + } + public String getEncMethod() { + return encMethod; + } + public void setEncMethod(String encMethod) { + this.encMethod = encMethod; + } + public String getKeyReference() { + return keyReference; + } + public void setKeyReference(String keyReference) { + this.keyReference = keyReference; + } + public KmsDefinition getKms() { + return kms; + } + public void setKms(KmsDefinition kms) { + this.kms = kms; + } + public String getCredential() { + return credential; + } + public void setCredential(String credential) { + this.credential = credential; + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/AesGcmV1SerDer.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/AesGcmV1SerDer.java new file mode 100644 index 0000000..d471c40 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/AesGcmV1SerDer.java @@ -0,0 +1,93 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.ser; + +import java.nio.ByteBuffer; + +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; + +import io.strimzi.kafka.topicenc.enc.EncData; + +/** + * Serializes and deserializes messages encrypted with AES GCM. + * Actually we need access to requests so we can access headers if needed. 
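+ *
+ * The version 1 wire format written by serialize() is:
+ * <pre>
+ *   int16   version (currently 1)
+ *   int16   IV length
+ *   byte[]  IV
+ *   int32   ciphertext length
+ *   byte[]  ciphertext
+ * </pre>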
+ */ +public class AesGcmV1SerDer implements EncSerDer { + + public static final short VERSION = 1; + private static final String VERSION_ERRMSG = "Unsupported serialization version: %d, expected %d"; + + @Override + public byte[] serialize(EncData md) throws EncSerDerException { + int len = Short.BYTES + // version + Short.BYTES + // iv length + md.getIv().length + // iv + Integer.BYTES + // data len + md.getCiphertext().length; // data + + ByteBuffer buf = ByteBuffer.allocate(len); + buf.putShort(VERSION); + buf.putShort((short) md.getIv().length); + buf.put(md.getIv()); + buf.putInt(md.getCiphertext().length); + buf.put(md.getCiphertext()); + return buf.array(); + } + + @Override + public void serialize(MemoryRecordsBuilder builder, Record r, EncData encResult) throws EncSerDerException { + byte[] serializedBuf = serialize(encResult); + builder.append(r.timestamp(), + r.key() != null ? r.key().array() : null, + serializedBuf, + r.headers()); + } + + @Override + public EncData deserialize(byte[] msg) throws EncSerDerException { + ByteBuffer buf = ByteBuffer.wrap(msg); + int bufLen = buf.remaining(); + + if (bufLen < 2*Short.BYTES) { + throw new EncSerDerException("Message too small, cannot deserialize."); + } + short version = buf.getShort(); + if (version != VERSION) { + String errMsg = createVersionErrMsg(version, VERSION); + throw new EncSerDerException(errMsg); + } + + short ivLen = buf.getShort(); + if (ivLen == 0) { + throw new EncSerDerException("Invalid message: IV length is 0."); + } + if (ivLen > bufLen) { + throw new EncSerDerException("Invalid message: IV length exceeds message length."); + } + + byte[] iv = new byte[ivLen]; + buf.get(iv); + + if ((buf.position() + Integer.BYTES) > bufLen) { + throw new EncSerDerException("Invalid message: message too short."); + } + int ciphertextLen = buf.getInt(); + if (ciphertextLen < 0) { + throw new EncSerDerException("Invalid message: negative ciphertext length."); + } else if ((buf.position() + ciphertextLen) > bufLen) { + throw new EncSerDerException("Invalid message: ciphertext length exceeds message length"); + } + + byte[] ciphertext = new byte[ciphertextLen]; + buf.get(ciphertext); + EncData result = new EncData(iv, ciphertext); + return result; + } + + private static String createVersionErrMsg(short rcvd, short expected) { + return String.format(VERSION_ERRMSG, rcvd, expected); + } +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDer.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDer.java new file mode 100644 index 0000000..bfde0e0 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDer.java @@ -0,0 +1,19 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.kafka.topicenc.ser; + +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; + +import io.strimzi.kafka.topicenc.enc.EncData; + +public interface EncSerDer { + + byte[] serialize(EncData md) throws EncSerDerException; + + void serialize(MemoryRecordsBuilder builder, Record r, EncData md) throws EncSerDerException; + + EncData deserialize(byte[] msg) throws EncSerDerException; +} diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDerException.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDerException.java new file mode 100644 index 0000000..7033aa7 --- /dev/null +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/ser/EncSerDerException.java @@ -0,0 +1,14 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.ser; + +public class EncSerDerException extends Exception { + + private static final long serialVersionUID = 8654177305331008275L; + + public EncSerDerException(String msg) { + super(msg); + } +} diff --git a/encmod/src/test/java/io/strimzi/kafka/topicenc/enc/AesGcmEncTests.java b/encmod/src/test/java/io/strimzi/kafka/topicenc/enc/AesGcmEncTests.java new file mode 100644 index 0000000..dac4456 --- /dev/null +++ b/encmod/src/test/java/io/strimzi/kafka/topicenc/enc/AesGcmEncTests.java @@ -0,0 +1,143 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.topicenc.enc; + +import static org.junit.Assert.fail; + +import java.nio.charset.StandardCharsets; +import java.security.NoSuchAlgorithmException; +import java.util.Random; + +import javax.crypto.SecretKey; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import io.strimzi.kafka.topicenc.kms.TestKms; +import io.strimzi.kafka.topicenc.ser.AesGcmV1SerDer; +import io.strimzi.kafka.topicenc.ser.EncSerDerException; + +public class AesGcmEncTests { + + private static final String TEST_MSG = "abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_-=+[]{}"; + + TestKms kms; + AesGcmEncrypter enc; + + @Before + public void testsInit() throws NoSuchAlgorithmException { + kms = new TestKms(); + SecretKey key = kms.getKey("test"); + enc = new AesGcmEncrypter(key); + } + + /** + * Basic test of encryption, decryption of AES GCM encrypter. + */ + @Test + public void basicTestAesGcm() { + + byte[] testMsg = TEST_MSG.getBytes(StandardCharsets.UTF_8); + + EncData encData; + try { + encData = enc.encrypt(testMsg); + } catch (Exception e) { + fail("Error encrypting test message: " + e.toString()); + return; + } + try { + byte[] plaintext = enc.decrypt(encData); + Assert.assertArrayEquals(testMsg, plaintext); + + String plaintextStr = new String(plaintext, "UTF-8"); + Assert.assertEquals(TEST_MSG, plaintextStr); + + } catch (Exception e) { + fail("Error deencrypting test message: " + e.toString()); + } + } + + /** + * Basic test of serialization, deserialization of encrypted data. + */ + @Test + public void basicTestSerDer() { + byte[] testMsg = TEST_MSG.getBytes(StandardCharsets.UTF_8); + testSerDer(testMsg); + } + + /** + * Tests serialization, deserialization of encrypted data + * using buffers of random size and data. 
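+     * Five size classes are exercised, from 1-2 byte buffers up to 1-5 MB,
+     * with 200 random buffers per class.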
+ */ + @Test + public void randomTestSerDer() { + int numTests = 200; + + // tiny + testRandomSerDer(1, 2, numTests); + // small + testRandomSerDer(10, 100, numTests); + // medium + testRandomSerDer(500, 2500, numTests); + // large + testRandomSerDer(100000, 500000, numTests); + // extra large + testRandomSerDer(1000000, 5000000, numTests); + } + + private void testRandomSerDer(int minBufSize, int maxBufSize, int iterations) { + Random rand = new Random(); + + for (int i=0; i< iterations; i++) { + int bufSize = rand.nextInt(maxBufSize - minBufSize) + minBufSize; + byte[] buf = CryptoUtils.createRandom(bufSize); + + testSerDer(buf); + } + } + + private void testSerDer(byte[] testMsg) { + + // encrypt test message + EncData encData; + try { + encData = enc.encrypt(testMsg); + } catch (Exception e) { + fail("Error encrypting test message: " + e.toString()); + return; + } + + // serialize encrypted message + AesGcmV1SerDer serder = new AesGcmV1SerDer(); + byte[] serialized; + try { + serialized = serder.serialize(encData); + } catch (EncSerDerException e) { + fail("Error serializing encrypted test message: " + e.toString()); + return; + } + + // deserialize serialized buf + EncData deserialized; + try { + deserialized = serder.deserialize(serialized); + } catch (EncSerDerException e) { + fail("Error deserializing serialized test message: " + e.toString()); + return; + } + + // decrypt deserialized data, assert plaintext equals original message. + try { + byte[] decrypted = enc.decrypt(deserialized); + Assert.assertArrayEquals(testMsg, decrypted); + + } catch (Exception e) { + fail("Error decrypting test message: " + e.toString()); + } + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..57ab473 --- /dev/null +++ b/pom.xml @@ -0,0 +1,149 @@ + + + 4.0.0 + io.strimzi + topic-encryption + pom + 0.0.1-SNAPSHOT + + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + + + + Strimzi Topic Encryption + Provides encryption-at-rest at the granularity of a topic through an encrypting proxy. 
+ https://github.com/strimzi/topic-encryption + + + scm:git:git://github.com/strimzi/topic-encryption.git + scm:git:ssh://github.com:strimzi/topic-encryption.git + https://github.com/strimzi/topic-encryption + + + + GitHub + https://github.com/strimzi/topic-encryption/issues + + + + + Chris Giblin + cgi@zurich.ibm.com + IBM Research - Zurich + https://www.zurich.ibm.com/ + + + + + 17 + 17 + 3.8.1 + 3.0.0-M5 + 3.0.0-M5 + 3.3.0 + 3.1.0 + 3.0.1 + 3.1.1 + 3.1.0 + 3.8.1 + 3.2.4 + 2.22.2 + 1.6.0 + + 4.2.4 + + 2.17.2 + 1.7.36 + + 5.8.2 + 4.13.1 + 1.8.2 + 1.3.2 + + 2.8.0 + + + + + encmod + vertx-proxy + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + junit + junit + ${junit.version} + test + + + io.vertx + vertx-core + ${vertx.version} + + + io.vertx + vertx-config + ${vertx.version} + + + org.junit.jupiter + junit-jupiter-engine + ${jupiter.version} + test + + + org.apiguardian + apiguardian-api + + + + + org.junit.jupiter + junit-jupiter-api + ${jupiter.version} + test + + + org.junit.platform + junit-platform-commons + ${junit.platform.version} + + + org.junit.platform + junit-platform-launcher + ${junit.platform.version} + + + + + diff --git a/vertx-proxy/README.md b/vertx-proxy/README.md new file mode 100644 index 0000000..4dd8295 --- /dev/null +++ b/vertx-proxy/README.md @@ -0,0 +1,6 @@ +# vertx-proxy + +This component implements a minimal Kafka proxy providing a runtime context for development and testing of the [topic encryption module](../encmod). The proxy implementation is currently considered experimental and subject to change. As the name reveals, the [vert.x framework](https://vertx.io/) has been used as the foundation for this proxy. + + + diff --git a/vertx-proxy/pom.xml b/vertx-proxy/pom.xml new file mode 100644 index 0000000..8ad0329 --- /dev/null +++ b/vertx-proxy/pom.xml @@ -0,0 +1,103 @@ + + + + io.strimzi + topic-encryption + 0.0.1-SNAPSHOT + + 4.0.0 + vertx-proxy + proxy + Experimental kafka proxy based on vert.x + https://github.com/strimzi/topic-encryption + + + 3.8.1 + 3.2.4 + 2.22.2 + + + + + junit + junit + test + + + io.vertx + vertx-core + + + io.vertx + vertx-config + + + org.apache.kafka + kafka-clients + + + io.strimzi + encmod + 0.0.1-SNAPSHOT + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + slf4j-api + + + + + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + 11 + + + + maven-shade-plugin + ${maven-shade-plugin.version} + + + package + + shade + + + + + + ${launcher.class} + ${main.verticle} + + + + + ${project.build.directory}/${project.artifactId}-${project.version}-fat.jar + + + + + + + maven-surefire-plugin + ${maven-surefire-plugin.version} + + + + + \ No newline at end of file diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/AbstractKafkaMsg.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/AbstractKafkaMsg.java new file mode 100644 index 0000000..c95afb5 --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/AbstractKafkaMsg.java @@ -0,0 +1,39 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.kafka.proxy.vertx; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import io.vertx.core.buffer.Buffer; + +public class AbstractKafkaMsg { + + protected final static int MSG_SIZE_LEN = 4; + + Buffer rawMsg; + ByteBuffer payload; + + public AbstractKafkaMsg(Buffer rawMsg) { + this.rawMsg = rawMsg; + } + + public Buffer getRawMsg() { + return rawMsg; + } + + public ByteBuffer getPayload() { + if (payload == null) { + payload = extractKafkaPayload(rawMsg); + } + return payload; + } + + protected ByteBuffer extractKafkaPayload(Buffer kafkaMsg) { + // copy bytes after leading 4 bytes containing message len: + byte[] dataBytes = Arrays.copyOfRange(kafkaMsg.getBytes(), 4, kafkaMsg.getBytes().length); + return ByteBuffer.wrap(dataBytes); + } +} diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/Config.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/Config.java new file mode 100644 index 0000000..d12860c --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/Config.java @@ -0,0 +1,75 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.proxy.vertx; + +import io.strimzi.kafka.topicenc.policy.PolicyRepository; +import io.strimzi.kafka.topicenc.policy.TestPolicyRepo; +import io.vertx.core.json.JsonObject; + +public class Config { + + public final class PropertyNames { + public static final String LISTENING_PORT = "listening_port"; + public static final String KAFKA_BROKERS = "kafka_broker"; + public static final String POLICY_REPO = "policy_repo"; + private PropertyNames() {} + } + + private final String brokers; + private final PolicyRepository policyRepo; + private final int listeningPort; + + public Config (int listeningPort, String kafkaHostname, PolicyRepository policyRepo) { + this.listeningPort = listeningPort; + this.brokers = kafkaHostname; + this.policyRepo = policyRepo; + } + + public int getListeningPort() { + return listeningPort; + } + + public String kafkaHostname() { + return brokers; + } + + public PolicyRepository policyRepo() { + return policyRepo; + } + + public static Config toConfig(JsonObject jsonConfig) { + String brokers = getParam(jsonConfig, PropertyNames.KAFKA_BROKERS); + if (brokers.indexOf(':') == -1) { + throw new IllegalArgumentException("Broker must be specified as 'hostname:port'"); + } + int listeningPort = getIntParam(jsonConfig, PropertyNames.LISTENING_PORT); + + String policyRepo = getParam(jsonConfig, PropertyNames.POLICY_REPO); + if (!policyRepo.equalsIgnoreCase("test")) { + // only test repo supported currently. + throw new IllegalArgumentException("Unsupported policy repo"); + } + return new Config(listeningPort, brokers, new TestPolicyRepo()); + } + + private static String getParam(JsonObject jsonConfig, String paramName) { + String param = jsonConfig.getString(paramName); + if (isEmpty(param)) { + throw new IllegalArgumentException("Configuration missing field, " + paramName); + } + return param; + } + + private static int getIntParam(JsonObject jsonConfig, String paramName) { + if (! 
jsonConfig.containsKey(paramName)) { + throw new IllegalArgumentException("Configuration missing field, " + paramName); + } + return jsonConfig.getInteger(paramName); + } + + private static boolean isEmpty(String s) { + return s == null || s.isBlank(); + } +} diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaProxyVerticle.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaProxyVerticle.java new file mode 100644 index 0000000..b27489c --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaProxyVerticle.java @@ -0,0 +1,80 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.proxy.vertx; + + +import org.apache.kafka.common.utils.AppInfoParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.strimzi.kafka.topicenc.EncryptionModule; +import io.strimzi.kafka.topicenc.kms.TestKms; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; + +public class KafkaProxyVerticle extends AbstractVerticle { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyVerticle.class); + + public static final String CTX_KEY_CONFIG = "topicenc.config"; + public static final String CTX_KEY_ENCMOD = "topicenc.encmod"; + + private Config config; + private EncryptionModule encMod; + + @Override + public void init(Vertx vertx, Context context) { + super.init(vertx, context); + JsonObject jsonConfig = config(); + + try { + config = Config.toConfig(jsonConfig); + } catch (Exception e) { + LOGGER.error("Configuration error", e); + System.exit(1); + } + + LOGGER.info("Kafka version: " + AppInfoParser.getVersion()); + + // currently "manual" config: + context.put(CTX_KEY_CONFIG, config); + try { + encMod = new EncryptionModule(config.policyRepo(), new TestKms()); + context.put(CTX_KEY_ENCMOD, encMod); + } catch (Exception e) { + LOGGER.error("Error creating encryption module", e); + System.exit(1); + } + } + + @Override + public void start(Promise promise) { + LOGGER.debug("starting"); + + NetServerOptions opts = new NetServerOptions(); + opts.setPort(config.getListeningPort()); + + vertx.createNetServer(opts) + .connectHandler(new TopicEncryptingSocketHandler(context)) + .listen(new Handler>() { + @Override + public void handle(AsyncResult event) { + if (event.failed()) { + LOGGER.info("Listen failed: " + event.cause().toString()); + } + else if (event.succeeded()) { + LOGGER.info("Listening on port " + opts.getPort()); + } + } + }); + } +} diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaReqMsg.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaReqMsg.java new file mode 100644 index 0000000..7ba7b5d --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaReqMsg.java @@ -0,0 +1,49 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.kafka.proxy.vertx; + +import java.util.Arrays; + +import org.apache.kafka.common.requests.RequestHeader; + +import io.vertx.core.buffer.Buffer; + +public class KafkaReqMsg extends AbstractKafkaMsg { + + private final static int FIXED_HEADER_LEN = 10; + + private RequestHeader header; + private byte[] headerBytes; + + public KafkaReqMsg(Buffer rawMsg) { + super(rawMsg); + } + + public RequestHeader getHeader() { + if (header == null) { + + header = RequestHeader.parse(getPayload()); + } + return header; + } + + public byte[] getHeaderBytes() { + if (headerBytes == null) { + // to do: clarify +1 + int headerSize = FIXED_HEADER_LEN + getClientIdLen(); + int destIndex = MSG_SIZE_LEN + headerSize + 1; + headerBytes = Arrays.copyOfRange(rawMsg.getBytes(), + MSG_SIZE_LEN, + destIndex); + } + return headerBytes; + } + + private int getClientIdLen() { + String clientId = header.clientId(); + int clientIdLen = clientId != null ? clientId.length() : 0; + return clientIdLen; + } +} diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaRspMsg.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaRspMsg.java new file mode 100644 index 0000000..5006c24 --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/KafkaRspMsg.java @@ -0,0 +1,61 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.proxy.vertx; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.kafka.common.requests.ResponseHeader; + +import io.vertx.core.buffer.Buffer; + +public class KafkaRspMsg extends AbstractKafkaMsg { + + private final static int FIXED_HEADER_LEN = 4; + + private ResponseHeader header; + private byte[] headerBytes; + private final short version; + + public KafkaRspMsg(Buffer rawMsg, short version) { + super(rawMsg); + this.version = version; + } + + public short getVersion() { + return version; + } + + public ResponseHeader getHeader() { + if (header == null) { + header = ResponseHeader.parse(getPayload(), version); + } + return header; + } + + public byte[] getHeaderBytes() { + + if (headerBytes == null) { + // to do: test, not correct + int headerSize = FIXED_HEADER_LEN; + int destIndex = MSG_SIZE_LEN + headerSize + 1; + headerBytes = Arrays.copyOfRange(rawMsg.getBytes(), + MSG_SIZE_LEN, + destIndex); + } + return headerBytes; + } + + @Override + protected ByteBuffer extractKafkaPayload(Buffer kafkaMsg) { + // copy bytes after leading 4 bytes containing message len: + int msgLen = kafkaMsg.length(); + int headerLen = MSG_SIZE_LEN; // + FIXED_HEADER_LEN; + return ByteBuffer.wrap(kafkaMsg.getBytes(headerLen, msgLen)); + //byte[] dataBytes = Arrays.copyOfRange(kafkaMsg.getBytes(), 0, kafkaMsg.getBytes().length); + //return ByteBuffer.wrap(dataBytes); + } + +} diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java new file mode 100644 index 0000000..4ae8b0a --- /dev/null +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.proxy.vertx; + +import io.vertx.core.buffer.Buffer; + +/** + * Quick and dirty hex dump methods for troubleshooting. 
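+ * Output is written to stdout, 16 bytes per row, with hex values on the
+ * left and printable ASCII characters on the right.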
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java
new file mode 100644
index 0000000..4ae8b0a
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/LogUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Quick and dirty hex dump methods for troubleshooting.
+ */
+public class LogUtils {
+
+    private static final String MID_COLUMN = "  ";
+
+    public static void hexDump(String title, Buffer buffer) {
+        hexDump(title, buffer.getBytes());
+    }
+
+    public static void hexDump(String title, byte[] buffer) {
+
+        if (buffer == null) {
+            return;
+        }
+        if (title != null) {
+            title = String.format("%s (buffer.length=%d %04X bytes)", title, buffer.length, buffer.length);
+            System.out.println(title);
+        }
+        StringBuilder hex = new StringBuilder();
+        StringBuilder chars = new StringBuilder();
+        int i;
+        for (i = 0; i < buffer.length; i++) {
+            if ((i > 0) && (i % 16 == 0)) {
+                hex.append(MID_COLUMN);
+                hex.append(chars);
+                hex.append('\n');
+                chars = new StringBuilder();
+            }
+            byte b = buffer[i];
+            hex.append(String.format("%02X ", b));
+            if (b >= 0x20 && b < 0x7F) {
+                chars.append((char) b);
+            } else {
+                chars.append('.');
+            }
+        }
+
+        // loop done - pad the last row ("%02X " is 3 chars wide) and append any remainder:
+        if (chars.length() > 0) {
+            if (i % 16 != 0) {
+                for (int j = i % 16; j < 16; j++) {
+                    hex.append("   ");
+                }
+            }
+            hex.append(MID_COLUMN);
+            hex.append(chars);
+            hex.append('\n');
+        }
+        // for now, write to stdout
+        System.out.println(hex);
+    }
+}
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageAccumulator.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageAccumulator.java
new file mode 100644
index 0000000..1c5744e
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageAccumulator.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Receives and appends Kafka message fragments and answers
+ * whether the message is complete, based on the message length
+ * in the first 4 bytes of the message.
+ */
+public class MessageAccumulator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageAccumulator.class);
+
+    private Buffer buffer;
+
+    public MessageAccumulator() {
+        buffer = Buffer.buffer(0);
+    }
+
+    public void append(Buffer buffer) {
+        if (LOGGER.isDebugEnabled()) {
+            LogUtils.hexDump("Msg append", buffer);
+        }
+        this.buffer.appendBuffer(buffer);
+    }
+
+    public boolean isComplete() {
+        return MsgUtil.isBufferComplete(buffer);
+    }
+
+    public Buffer getBuffer() {
+        return buffer;
+    }
+
+    public void reset() {
+        buffer = Buffer.buffer(0);
+    }
+
+    public boolean isEmpty() {
+        return buffer.length() == 0;
+    }
+}
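A single TCP read may carry a fragment of a Kafka frame, or several frames' worth of bytes, so callers feed every fragment to the accumulator and act only once the 4-byte length prefix is satisfied. A minimal usage sketch (same package as `MessageAccumulator`):

```
import io.vertx.core.buffer.Buffer;

public class AccumulatorExample {
    public static void main(String[] args) {
        MessageAccumulator acc = new MessageAccumulator();

        // A 6-byte Kafka frame: 4-byte length prefix (value 2) + 2 payload bytes,
        // arriving split across two TCP reads:
        acc.append(Buffer.buffer(new byte[] {0, 0, 0, 2, 0x01}));
        System.out.println(acc.isComplete()); // false - one byte still missing

        acc.append(Buffer.buffer(new byte[] {0x02}));
        System.out.println(acc.isComplete()); // true - process, then reset
        acc.reset();
    }
}
```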
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java
new file mode 100644
index 0000000..3980808
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.security.GeneralSecurityException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchResponseData.FetchableTopicResponse;
+import org.apache.kafka.common.message.ProduceRequestData.TopicProduceData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.strimzi.kafka.topicenc.EncryptionModule;
+import io.strimzi.kafka.topicenc.ser.EncSerDerException;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetSocket;
+
+/**
+ * Handles message flow between a single socket with a Kafka client
+ * and a single socket to the broker. As such, class variables are
+ * scoped to a socket pair.
+ * Global objects such as the Encryption Module are passed through the
+ * vert.x context argument to the constructor.
+ */
+public class MessageHandler implements Handler<Buffer> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
+
+    private Context context;
+    private final Config config;
+    private final EncryptionModule encMod;
+    private NetSocket clientSocket;
+    private NetClient brokerClient;
+    private Future<NetSocket> brokerSocketFuture;
+    private Map<Integer, RequestHeader> fetchHeaderCache = new HashMap<>();
+    private MessageAccumulator currBrokerRsp = new MessageAccumulator();
+    private MessageAccumulator currClientReq = new MessageAccumulator();
+
+    /**
+     * The real constructor, as opposed to the test constructor.
+     *
+     * @param context the vert.x context holding config and encryption module
+     * @param clientSocket the accepted client connection
+     */
+    public MessageHandler(Context context, NetSocket clientSocket) {
+        this.context = context;
+        this.clientSocket = clientSocket;
+
+        this.config = context.get(KafkaProxyVerticle.CTX_KEY_CONFIG);
+        if (Objects.isNull(config)) {
+            throw new NullPointerException("No config object");
+        }
+
+        this.encMod = context.get(KafkaProxyVerticle.CTX_KEY_ENCMOD);
+        if (Objects.isNull(encMod)) {
+            throw new NullPointerException("No encryption module");
+        }
+
+        connectToBroker(clientSocket);
+        LOGGER.info("MessageHandler created, broker connected: {}", brokerSocketFuture.isComplete());
+    }
+
+    private void connectToBroker(NetSocket clientSocket) {
+
+        this.brokerClient = context.owner().createNetClient();
+        String broker = config.kafkaHostname();
+        String[] tokens = broker.split(":");
+        if (tokens.length != 2) {
+            throw new IllegalArgumentException("Broker must be specified as 'hostname:port'");
+        }
+        int port = Integer.parseInt(tokens[1]);
+        String hostname = tokens[0];
+
+        LOGGER.debug("Connecting to broker {}", broker);
+        this.brokerSocketFuture = brokerClient.connect(port, hostname);
+        brokerSocketFuture
+            .onSuccess(socket -> {
+                LOGGER.debug("broker connected. Thread = {}", Thread.currentThread().getName());
+                socket
+                    .handler(buffer -> {
+                        try {
+                            processBrokerResponse(buffer);
+                        } catch (EncSerDerException | GeneralSecurityException e) {
+                            LOGGER.error("Error decrypting broker response", e);
+                            // to do: forward error msg to client
+                        }
+                    })
+                    .closeHandler(brokerClose -> {
+                        LOGGER.debug("connection closed by broker");
+                        clientSocket.close();
+                    });
+            })
+            .onFailure(e -> {
+                LOGGER.error("Error connecting to broker", e);
+            });
+    }
+
+    /**
+     * This constructor is used for testing.
+     *
+     * @param encMod the encryption module under test
+     * @param config proxy configuration
+     */
+    public MessageHandler(EncryptionModule encMod, Config config) {
+        if (Objects.isNull(encMod)) {
+            throw new NullPointerException("Null encryption module"); // different exception?
+        }
+        this.encMod = encMod;
+        this.config = config;
+    }
+
+    /**
+     * Close and clear resources so the message handler does not hang
+     * around the heap after the client socket has been closed.
+     */
+    public void close() {
+        LOGGER.debug("Closing Message Handler");
+        clientSocket = null;
+        context = null;
+        fetchHeaderCache.clear();
+        fetchHeaderCache = null;
+        NetSocket brokerSocket = brokerSocketFuture.result();
+        if (brokerSocket != null) {
+            brokerSocket.close();
+        }
+        brokerSocketFuture = null;
+    }
+
+    /**
+     * This is the main entry to message handling, triggered by the
+     * arrival of data from the Kafka client. The received buffer
+     * may not be a complete Kafka message, so we assemble
+     * fragments until we have a complete message.
+     */
+    @Override
+    public void handle(Buffer buffer) {
+
+        LOGGER.debug("Request buffer from client arrived");
+        currClientReq.append(buffer);
+
+        boolean isComplete = currClientReq.isComplete();
+        LOGGER.debug("Kafka msg complete {}", isComplete);
+        if (!isComplete) {
+            return;
+        }
+
+        // We have a complete Kafka msg - process it, forward to broker.
+        Buffer sendBuffer;
+        try {
+            sendBuffer = processRequest(currClientReq.getBuffer());
+        } catch (EncSerDerException | GeneralSecurityException e) {
+            LOGGER.error("Encryption error processing request", e);
+            // to do: send back a Kafka error msg
+            return;
+        }
+        currClientReq.reset();
+        forwardToBroker(sendBuffer);
+    }
+
+    /**
+     * Inspects the incoming Kafka request message and dispatches it
+     * depending on apikey (i.e., Kafka message type). If it is not a request
+     * type we are interested in, returns the unaltered buffer
+     * so it is forwarded to the broker as-is.
+     *
+     * @param buffer a complete Kafka request message
+     * @return the buffer to forward to the broker
+     * @throws GeneralSecurityException
+     * @throws EncSerDerException
+     */
+    public Buffer processRequest(Buffer buffer) throws EncSerDerException, GeneralSecurityException {
+        if (buffer.length() < 10) {
+            LOGGER.debug("processRequest(): buffer too small, ignoring.");
+            return buffer;
+        }
+        short apikey = MsgUtil.getApiKey(buffer);
+
+        if (LOGGER.isDebugEnabled()) {
+            int corrId = MsgUtil.getReqCorrId(buffer);
+            String apikeyName = ApiKeys.forId(apikey).name();
+            LOGGER.debug("Request: apikey = {}, corrid = {}, socket = {}",
+                    apikeyName, corrId,
+                    clientSocket == null ? "null"
+                            : clientSocket.remoteAddress().toString());
+        }
+        // dispatch based on apikey. Currently PRODUCE and FETCH only:
+        if (apikey == ApiKeys.PRODUCE.id) {
+            return processProduceRequest(buffer);
+        } else if (apikey == ApiKeys.FETCH.id) {
+            return processFetchRequest(buffer);
+        } else {
+            // not interested in the msg type - pass back as-is.
+            return buffer;
+        }
+    }
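+
+    // Note on the dispatch above: for every apikey other than PRODUCE and
+    // FETCH the original Buffer is returned by reference, so pass-through
+    // traffic is never copied or re-serialized. A quick way to check that
+    // contract from a test (TestKms and TestPolicyRepo are the test helpers
+    // used in EncModTests later in this diff; sketch only):
+    //
+    //   MessageHandler handler = new MessageHandler(
+    //           new EncryptionModule(new TestPolicyRepo(), new TestKms()),
+    //           new Config(0, "localhost", null));
+    //   Buffer metadataReq = ...;   // any complete non-PRODUCE/FETCH request
+    //   assert handler.processRequest(metadataReq) == metadataReq;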
+
+    /**
+     * Process a produce request. We introspect the request and determine
+     * whether it contains topic data which should be encrypted. If so,
+     * the encrypted records are re-computed and overwrite the original
+     * plaintext.
+     *
+     * @param buffer a complete PRODUCE request message
+     * @return the (possibly re-serialized) buffer to forward to the broker
+     * @throws GeneralSecurityException
+     * @throws EncSerDerException
+     */
+    public Buffer processProduceRequest(Buffer buffer)
+            throws EncSerDerException, GeneralSecurityException {
+
+        if (LOGGER.isDebugEnabled()) {
+            LogUtils.hexDump("client->proxy: PRODUCE request", buffer);
+        }
+        // create a KafkaReqMsg instance - this gives access to the header and API version
+        KafkaReqMsg kafkaMsg;
+        try {
+            kafkaMsg = new KafkaReqMsg(buffer);
+        } catch (IllegalArgumentException e) {
+            LOGGER.error("Error parsing PRODUCE request", e);
+            if (LOGGER.isDebugEnabled()) {
+                LogUtils.hexDump("Request causing error", buffer);
+            }
+            return buffer;
+        }
+
+        // deserialize the msg to a ProduceRequest instance:
+        ProduceRequest req = ProduceRequest.parse(kafkaMsg.getPayload(),
+                kafkaMsg.getHeader().apiVersion());
+
+        // iterate over the request's partitions, passing them to the
+        // encryption module where they are encrypted, if required.
+        int numEncryptions = 0;
+        if (req.data() != null && req.data().topicData() != null) {
+            for (TopicProduceData topicData : req.data().topicData()) {
+                boolean wasEncrypted = encMod.encrypt(topicData);
+                if (wasEncrypted) {
+                    numEncryptions++;
+                }
+            }
+        }
+
+        if (numEncryptions == 0) {
+            // no encryptions performed, return original buffer as-is
+            return buffer;
+        }
+        // records were altered by encrypting. Serialize and return the modified message.
+        return MsgUtil.toSendBuffer(kafkaMsg.getHeaderBytes(), req);
+    }
+
+    /**
+     * Process fetch requests. We cache the fetch request headers
+     * in order to identify fetch responses on the back flow.
+     *
+     * @param buffer a complete FETCH request message
+     * @return the buffer to forward to the broker
+     */
+    private Buffer processFetchRequest(Buffer buffer) {
+        try {
+            // cache the request header which we need later for response processing
+            KafkaReqMsg req = new KafkaReqMsg(buffer);
+            fetchHeaderCache.put(req.getHeader().correlationId(),
+                    req.getHeader());
+
+            if (LOGGER.isDebugEnabled()) {
+                FetchRequest fetch = FetchRequest.parse(req.getPayload(),
+                        req.getHeader().apiVersion());
+
+                String msg = String.format("FETCH epoch = %04X, session = %04X",
+                        fetch.metadata().epoch(), fetch.metadata().sessionId());
+                LOGGER.debug(msg);
+            }
+            return req.rawMsg;
+
+        } catch (Exception e) {
+            LOGGER.error("Error in processFetchRequest()", e);
+            if (LOGGER.isDebugEnabled()) {
+                LogUtils.hexDump("processFetchRequest: Error parsing buffer", buffer);
+            }
+            return buffer;
+        }
+    }
+
+    /**
+     * Once a client request is processed, it is forwarded to the broker.
+     *
+     * @param sendBuffer the message to write to the broker socket
+     */
+    private void forwardToBroker(Buffer sendBuffer) {
+
+        if (!brokerSocketFuture.isComplete()) {
+            // Broker socket not ready: the request is currently dropped.
+            // TODO: queue pending requests and flush them once the connection
+            // completes (see the sketch below).
+            LOGGER.info("broker socket not ready. Thread = {}", Thread.currentThread().getName());
+            return;
+        }
+        if (brokerSocketFuture.failed()) {
+            LOGGER.error("broker connection failed", brokerSocketFuture.cause());
+            return;
+        }
+        NetSocket brokerSocket = brokerSocketFuture.result();
+        brokerSocket.write(sendBuffer);
+        LOGGER.debug("Forwarded message to broker");
+    }
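+
+    // A possible replacement for the hack above (sketch only, not wired in):
+    // queue outbound requests while the broker connection is pending and
+    // flush the queue from the connect callback. On a single event loop no
+    // extra synchronization is needed.
+    //
+    //   private final Deque<Buffer> pendingWrites = new ArrayDeque<>();
+    //
+    //   // in forwardToBroker(), instead of dropping the request:
+    //   if (!brokerSocketFuture.isComplete()) {
+    //       pendingWrites.add(sendBuffer);
+    //       return;
+    //   }
+    //
+    //   // in connectToBroker(), inside onSuccess():
+    //   Buffer b;
+    //   while ((b = pendingWrites.poll()) != null) {
+    //       socket.write(b);
+    //   }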
+
+    /**
+     * This method is the handler for broker responses.
+     * We check whether a response matches a cached Fetch request,
+     * based on correlation ID. If so, it is passed to the encryption module
+     * to check whether decryption is needed.
+     *
+     * @param brokerRsp a buffer (possibly a fragment) received from the broker
+     * @throws GeneralSecurityException
+     * @throws EncSerDerException
+     */
+    public void processBrokerResponse(Buffer brokerRsp)
+            throws EncSerDerException, GeneralSecurityException {
+        // accumulate message fragments
+        currBrokerRsp.append(brokerRsp);
+        if (!currBrokerRsp.isComplete()) {
+            return;
+        }
+
+        // If we are this far, we have a complete message. Process it.
+        Buffer brokerRspMsg = currBrokerRsp.getBuffer();
+
+        int corrId = MsgUtil.getRspCorrId(brokerRspMsg);
+        if (corrId != -1) {
+            RequestHeader reqHeader = fetchHeaderCache.get(corrId);
+            if (reqHeader != null) {
+                // The response matches a recently cached fetch request.
+                LOGGER.debug("Broker response matches cached FETCH req header corrId={}", corrId);
+                fetchHeaderCache.remove(corrId);
+
+                // processFetchResponse calls the enc module for decryption:
+                brokerRspMsg = processFetchResponse(brokerRspMsg, reqHeader);
+            }
+        }
+
+        // Finished with broker response processing.
+        // Forward to the Kafka client.
+        Future<Void> writeFuture = clientSocket.write(brokerRspMsg);
+
+        // logging:
+        final Buffer rspBuf = brokerRspMsg;
+        writeFuture.onSuccess(h -> {
+            if (LOGGER.isDebugEnabled()) {
+                String msg = String.format("proxy->client: broker response corrId=%d (%02X), thread = %s, socket=%s",
+                        corrId, corrId, Thread.currentThread().getName(), clientSocket.remoteAddress().toString());
+                LogUtils.hexDump(msg, rspBuf);
+            }
+        });
+        // reset the broker rsp buffer:
+        currBrokerRsp.reset();
+    }
+
+    /**
+     * Fetch responses are processed here. We navigate the topic responses,
+     * passing them to the encryption module, which determines if they are to be decrypted.
+     *
+     * @param buffer a complete FETCH response message
+     * @param reqHeader the header of the matching FETCH request
+     * @return the (possibly re-serialized) buffer to forward to the client
+     * @throws GeneralSecurityException
+     * @throws EncSerDerException
+     */
+    public Buffer processFetchResponse(Buffer buffer, RequestHeader reqHeader)
+            throws EncSerDerException, GeneralSecurityException {
+        // instantiate a FetchResponse instance
+        KafkaRspMsg rsp = new KafkaRspMsg(buffer, reqHeader.apiVersion());
+        FetchResponse fetch =
+                (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), reqHeader);
+
+        // iterate through response records, decrypting where needed
+        FetchResponseData data = fetch.data();
+        if (data == null) {
+            return buffer;
+        }
+        List<FetchableTopicResponse> responses = data.responses();
+        int numDecryptions = 0;
+        for (FetchableTopicResponse topicRsp : responses) {
+            boolean wasDecrypted = encMod.decrypt(topicRsp);
+            if (wasDecrypted) {
+                numDecryptions++;
+            }
+        }
+
+        if (numDecryptions == 0) {
+            // no decryptions were performed, return original buffer
+            return buffer;
+        } else {
+            // records were decrypted. Serialize and return the modified message.
+            return MsgUtil.toSendBuffer(fetch, reqHeader);
+        }
+    }
+}
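A Kafka response header carries only the correlation id (plus tagged fields in flexible versions); the apikey and api version are not echoed back. That is why MessageHandler caches each FETCH request header and reuses its apiVersion to parse the matching response. The correlation pattern in isolation, as a sketch:

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.requests.RequestHeader;

/** Sketch of the request/response correlation used by MessageHandler. */
public class FetchCorrelator {
    private final Map<Integer, RequestHeader> inFlight = new ConcurrentHashMap<>();

    public void onFetchRequest(RequestHeader header) {
        inFlight.put(header.correlationId(), header);
    }

    /** Returns the cached header, or null if the response matches no cached FETCH. */
    public RequestHeader onResponse(int correlationId) {
        return inFlight.remove(correlationId); // remove-and-return: one response per request
    }
}
```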
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MsgUtil.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MsgUtil.java
new file mode 100644
index 0000000..7d11809
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MsgUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
+import org.apache.kafka.common.requests.ResponseHeader;
+
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Utility methods for processing Kafka messages.
+ */
+public class MsgUtil {
+
+    /**
+     * Given a Kafka request in the form of a vert.x Buffer, return the apikey.
+     * This is intended as a lightweight way of identifying request types without
+     * instantiating/deserializing the bytes into a full request instance.
+     * (Vert.x Buffer accessors are big-endian, matching Kafka's network byte order.)
+     *
+     * @param buffer a buffered Kafka request message
+     * @return the apikey, or -1 if there is something wrong with the input.
+     */
+    public static short getApiKey(Buffer buffer) {
+        if (Objects.isNull(buffer) || buffer.length() < 6) {
+            return -1;
+        }
+        return buffer.getShort(4);
+    }
+
+    /**
+     * Given a Kafka response in the form of a vert.x Buffer, return the correlation ID.
+     * This is intended as a lightweight way of identifying a response without
+     * instantiating/deserializing the bytes into a full response instance.
+     *
+     * @param buffer a buffered Kafka response message
+     * @return the correlation ID, or -1 if there is something wrong with the input.
+     */
+    public static int getRspCorrId(Buffer buffer) {
+        if (Objects.isNull(buffer) || buffer.length() < 8) {
+            return -1;
+        }
+        return buffer.getInt(4);
+    }
+
+    public static int getReqCorrId(Buffer buffer) {
+        if (Objects.isNull(buffer) || buffer.length() < 12) {
+            return -1;
+        }
+        return buffer.getInt(8);
+    }
+
+    public static int getMsgLen(Buffer buffer) {
+        if (Objects.isNull(buffer) || buffer.length() < 4) {
+            return -1;
+        }
+        return buffer.getInt(0);
+    }
+
+    /**
+     * Serialize a request into a Kafka send buffer.
+     *
+     * @param header the serialized request header bytes
+     * @param req the request to serialize
+     * @return a buffer ready to be written to the broker socket
+     */
+    public static Buffer toSendBuffer(byte[] header, AbstractRequest req) {
+        ByteBuffer serializedReq = req.serialize();
+        ByteBuffer msgLenBuf = ByteBuffer.allocate(Integer.BYTES);
+        msgLenBuf.putInt(header.length + serializedReq.array().length);
+
+        Buffer sendBuffer = Buffer.buffer(msgLenBuf.array());
+        sendBuffer.appendBytes(header);
+        sendBuffer.appendBytes(serializedReq.array());
+        return sendBuffer;
+    }
+
+    /**
+     * Serialize a fetch response into a Kafka send buffer.
+     *
+     * @param fetchRsp the fetch response to serialize
+     * @param reqHeader the header of the matching fetch request
+     * @return a buffer ready to be written to the client socket
+     */
+    public static Buffer toSendBuffer(FetchResponse fetchRsp, RequestHeader reqHeader) {
+        ByteBuffer serializedRsp = serialize(fetchRsp, reqHeader);
+        byte[] rspBytes = serializedRsp.array();
+        int bufLen = Integer.BYTES + rspBytes.length;
+        ByteBuffer bufTmp = ByteBuffer.allocate(bufLen);
+        bufTmp.putInt(rspBytes.length);
+        bufTmp.put(rspBytes);
+        return Buffer.buffer(bufTmp.array());
+    }
+
+    /**
+     * Unlike requests, the serialize() method in the base AbstractResponse
+     * class is not public, so we cannot call it. Therefore we have to do
+     * the work here ourselves.
+     *
+     * @param fetchRsp the fetch response to serialize
+     * @param reqHeader the header of the matching fetch request
+     * @return the serialized response, without the 4-byte length prefix
+     */
+    private static ByteBuffer serialize(FetchResponse fetchRsp, RequestHeader reqHeader) {
+        // TODO: verify the header version argument (apiVersion vs. response header version)
+        ResponseHeader rspHeader = new ResponseHeader(reqHeader.correlationId(), reqHeader.apiVersion());
+        return RequestUtils.serialize(rspHeader.data(), rspHeader.headerVersion(), fetchRsp.data(), reqHeader.apiVersion());
+    }
+
+    public static boolean isBufferComplete(Buffer buffer) {
+        if (buffer.length() < 4) {
+            return false;
+        }
+        int msgLen = buffer.getInt(0);
+        return isBufferComplete(buffer, msgLen);
+    }
+
+    public static boolean isBufferComplete(Buffer buffer, int msgLen) {
+        return msgLen == buffer.length() - 4;
+    }
+}
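All of these helpers revolve around Kafka's framing invariant: a big-endian INT32 length prefix followed by exactly that many bytes. A quick check of MsgUtil against a hand-built frame (sketch, assuming it runs in the same package as MsgUtil):

```
import java.nio.ByteBuffer;

import io.vertx.core.buffer.Buffer;

public class FramingCheck {
    public static void main(String[] args) {
        byte[] payload = {0, 0, 0, 0x4B}; // e.g. just a correlation id
        ByteBuffer frame = ByteBuffer.allocate(Integer.BYTES + payload.length);
        frame.putInt(payload.length);     // length prefix, big-endian by default
        frame.put(payload);

        Buffer buf = Buffer.buffer(frame.array());
        System.out.println(MsgUtil.getMsgLen(buf));        // 4
        System.out.println(MsgUtil.isBufferComplete(buf)); // true
        System.out.println(MsgUtil.getRspCorrId(buf));     // 75 (0x4B)
    }
}
```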
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/TopicEncryptingSocketHandler.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/TopicEncryptingSocketHandler.java
new file mode 100644
index 0000000..b6f7abf
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/TopicEncryptingSocketHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.strimzi.kafka.topicenc.EncryptionModule;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+import io.vertx.core.net.NetSocket;
+
+/**
+ * This handler integrates the Encryption Module
+ * into the vert.x proxy framework. Here we receive client requests,
+ * process them if needed, forward them to the broker, process
+ * the response, and forward it to the client.
+ * The philosophy is to deserialize only the requests we are interested in.
+ * All messages which we either are not interested in or which we do
+ * not change are passed on to the broker as-is, in original form.
+ * The encryption module is configured and instantiated outside of this
+ * handler and passed here by means of the vert.x Context.
+ */
+public class TopicEncryptingSocketHandler implements Handler<NetSocket> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TopicEncryptingSocketHandler.class);
+
+    final Context context;
+    final Map<NetSocket, MessageHandler> activeHandlers = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor. The handler retrieves config and encryption module
+     * instances from the provided context.
+     *
+     * @param context the vert.x context holding config and encryption module
+     */
+    public TopicEncryptingSocketHandler(Context context) {
+        this.context = context;
+
+        // validate context contents at this early stage:
+        EncryptionModule encMod = context.get(KafkaProxyVerticle.CTX_KEY_ENCMOD);
+        if (Objects.isNull(encMod)) {
+            throw new NullPointerException("No encryption module");
+        }
+        Config config = context.get(KafkaProxyVerticle.CTX_KEY_CONFIG);
+        if (Objects.isNull(config)) {
+            throw new NullPointerException("No config object");
+        }
+    }
+
+    /**
+     * Here client sockets are received.
+     */
+    @Override
+    public void handle(NetSocket clientSocket) {
+        LOGGER.info("New client socket {}", clientSocket.remoteAddress().toString());
+
+        // create a message handler and store it in the activeHandlers map:
+        MessageHandler msgHandler = new MessageHandler(context, clientSocket);
+        activeHandlers.put(clientSocket, msgHandler);
+
+        // assign the socket's handlers, most notably the msgHandler
+        clientSocket.pause();
+        clientSocket
+            .handler(msgHandler)
+            .exceptionHandler(e -> {
+                LOGGER.error("Client socket exception", e);
+            })
+            .closeHandler(x -> {
+                LOGGER.info("Client socket closed: {}", clientSocket.remoteAddress().toString());
+                MessageHandler h = activeHandlers.remove(clientSocket);
+                if (h != null) {
+                    h.close();
+                }
+            });
+        clientSocket.resume();
+    }
+}
+
diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/VertRunner.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/VertRunner.java
new file mode 100644
index 0000000..d308ab3
--- /dev/null
+++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/VertRunner.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */ +package io.strimzi.kafka.proxy.vertx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.config.ConfigRetriever; +import io.vertx.config.ConfigRetrieverOptions; +import io.vertx.config.ConfigStoreOptions; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +public class VertRunner { + + private static final Logger LOGGER = LoggerFactory.getLogger(VertRunner.class); + + public static void main(String[] args) { + + ConfigStoreOptions fileStore = new ConfigStoreOptions() + .setType("file") + .setConfig(new JsonObject().put("path", "config.json")); + ConfigRetrieverOptions retrOptions = new ConfigRetrieverOptions(); + retrOptions.addStore(fileStore); + + Vertx vertx = Vertx.vertx(); + ConfigRetriever retriever = ConfigRetriever.create(vertx, retrOptions); + retriever.getConfig(ar -> { + if (ar.failed()) { + LOGGER.error("Unable to load configuration", ar.cause()); + System.exit(1); + } + JsonObject config = ar.result(); + KafkaProxyVerticle proxy = new KafkaProxyVerticle(); + vertx.deployVerticle(proxy, new DeploymentOptions().setConfig(config)); + }); + } +} diff --git a/vertx-proxy/src/main/resources/log4j2.properties b/vertx-proxy/src/main/resources/log4j2.properties new file mode 100644 index 0000000..363da4d --- /dev/null +++ b/vertx-proxy/src/main/resources/log4j2.properties @@ -0,0 +1,11 @@ +name = TestConfig + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %tid - %m%n + +rootLogger.level = ${env:STRIMZI_LOG_LEVEL:-INFO} +rootLogger.appenderRefs = stdout +rootLogger.appenderRef.console.ref = STDOUT +rootLogger.additivity = false \ No newline at end of file diff --git a/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTests.java b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTests.java new file mode 100644 index 0000000..0380a04 --- /dev/null +++ b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTests.java @@ -0,0 +1,120 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchResponseData.FetchableTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.strimzi.kafka.topicenc.EncryptionModule;
+import io.strimzi.kafka.topicenc.kms.TestKms;
+import io.strimzi.kafka.topicenc.policy.TestPolicyRepo;
+import io.strimzi.kafka.topicenc.ser.EncSerDerException;
+import io.vertx.core.buffer.Buffer;
+
+public class EncModTests {
+
+    EncryptionModule encMod;
+    Config dummyConfig;
+
+    @Test
+    public void testEncryption() throws IOException, NoSuchAlgorithmException {
+        encMod = new EncryptionModule(new TestPolicyRepo(), new TestKms());
+        dummyConfig = createDummyConfig();
+        var handler = new MessageHandler(encMod, dummyConfig);
+
+        File reqDataFile = new File("src/test/resources/produce_request.hex");
+        byte[] prodReq = TestDataFileUtil.hexToBin(reqDataFile);
+
+        var reqBuf = Buffer.buffer(prodReq);
+
+        try {
+            Buffer sendBuf = handler.processProduceRequest(reqBuf);
+            boolean equal = Arrays.equals(prodReq, sendBuf.getBytes());
+            Assert.assertFalse("Message was not encrypted", equal);
+
+        } catch (EncSerDerException | GeneralSecurityException e) {
+            Assert.fail("Error processing produce request: " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDecryption() throws IOException, NoSuchAlgorithmException {
+        testDecryption(new File("src/test/resources/fetch_response.hex"));
+        testDecryption(new File("src/test/resources/fetch_multi_response.hex"));
+    }
+
+    private void testDecryption(File rspMsgFile) throws IOException, NoSuchAlgorithmException {
+        byte[] fetchRsp = TestDataFileUtil.hexToBin(rspMsgFile);
+        LogUtils.hexDump("FETCH response encrypted", fetchRsp);
+
+        // set up so we can call the handler
+        encMod = new EncryptionModule(new TestPolicyRepo(), new TestKms());
+        dummyConfig = createDummyConfig();
+        var handler = new MessageHandler(encMod, dummyConfig);
+        var rspBuf = Buffer.buffer(fetchRsp);
+        int corrId = MsgUtil.getRspCorrId(rspBuf);
+        var reqHeader = new RequestHeader(ApiKeys.FETCH, (short) 12, "console-producer", corrId);
+
+        // decrypt:
+        Buffer fetchRspBuf;
+        try {
+            fetchRspBuf = handler.processFetchResponse(rspBuf, reqHeader);
+        } catch (EncSerDerException | GeneralSecurityException e) {
+            Assert.fail("Error processing fetch response: " + e.getMessage());
+            return;
+        }
+        LogUtils.hexDump("FETCH response decrypted", fetchRspBuf);
+
+        // instantiate the decrypted fetch response
+        KafkaRspMsg rsp = new KafkaRspMsg(fetchRspBuf, reqHeader.apiVersion());
+        FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), reqHeader);
+
+        FetchResponseData data = fetch.data();
+        navigate(data);
+    }
+
+    private void navigate(FetchResponseData data) {
+        // Navigate into the decrypted response.
+        // This tests the integrity of the decrypted response.
+        List<FetchableTopicResponse> responses = data.responses();
+        for (FetchableTopicResponse topicRsp : responses) {
+            topicRsp.partitionResponses().forEach(pd -> {
+                MemoryRecords recs = (MemoryRecords) pd.recordSet();
+                recs.records().forEach(r -> {
+                    if (r.hasValue()) {
+                        byte[] recordData = new byte[r.valueSize()];
+                        r.value().get(recordData);
+                        LogUtils.hexDump("Record data", recordData);
+                    }
+                });
+            });
+        }
+    }
+
+    private Config createDummyConfig() {
+        return new Config(0, "localhost", null);
+    }
+}
diff --git a/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/MsgUtilTests.java b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/MsgUtilTests.java
new file mode 100644
index 0000000..8a12dac
--- /dev/null
+++ b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/MsgUtilTests.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.buffer.Buffer;
+
+public class MsgUtilTests {
+
+    @Test
+    public void testMsgUtil() throws FileNotFoundException, IOException {
+
+        // produce request
+        File testDataFile = new File("src/test/resources/produce_request.hex");
+        byte[] prodReq = TestDataFileUtil.hexToBin(testDataFile);
+        LogUtils.hexDump("PRODUCE request", prodReq);
+
+        Buffer buf = Buffer.buffer(prodReq);
+        short apikey = MsgUtil.getApiKey(buf);
+        Assert.assertEquals(ApiKeys.PRODUCE.id, apikey);
+
+        int expectedMsgLen = buf.length() - 4;
+        int msgLen = MsgUtil.getMsgLen(buf);
+        Assert.assertEquals(expectedMsgLen, msgLen);
+
+        boolean isCompleted = MsgUtil.isBufferComplete(buf);
+        Assert.assertTrue(isCompleted);
+
+        isCompleted = MsgUtil.isBufferComplete(buf, expectedMsgLen);
+        Assert.assertTrue(isCompleted);
+
+        // fetch response
+        File rspDataFile = new File("src/test/resources/fetch_response.hex");
+        byte[] fetchRsp = TestDataFileUtil.hexToBin(rspDataFile);
+
+        var rspBuf = Buffer.buffer(fetchRsp);
+        int corrId = MsgUtil.getRspCorrId(rspBuf);
+        Assert.assertEquals("Correlation ID", 0x4B, corrId);
+    }
+}
diff --git a/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/TestDataFileUtil.java b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/TestDataFileUtil.java
new file mode 100644
index 0000000..aea7636
--- /dev/null
+++ b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/TestDataFileUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
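The .hex fixtures in src/test/resources follow the layout LogUtils.hexDump prints (a hex column padded to 16 bytes per row, a gap, then an ASCII column), which TestDataFileUtil in the next file parses back into bytes. A sketch for capturing new fixtures from a raw message, assuming that 16-bytes-per-row layout:

```
import java.io.IOException;
import java.io.PrintWriter;

public class FixtureWriter {
    /** Write bytes in the 'XX XX .. XX  ascii' layout the tests read back. */
    public static void writeHex(byte[] msg, String path) throws IOException {
        try (PrintWriter out = new PrintWriter(path)) {
            for (int i = 0; i < msg.length; i += 16) {
                StringBuilder hex = new StringBuilder();
                StringBuilder chars = new StringBuilder();
                for (int j = i; j < Math.min(i + 16, msg.length); j++) {
                    hex.append(String.format("%02X ", msg[j]));
                    chars.append(msg[j] >= 0x20 && msg[j] < 0x7F ? (char) msg[j] : '.');
                }
                // pad the hex column to 48 chars so short rows stay aligned:
                out.printf("%-48s %s%n", hex, chars);
            }
        }
    }
}
```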
+ */
+package io.strimzi.kafka.proxy.vertx;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDataFileUtil {
+
+    public static byte[] hexToBin(File file) throws FileNotFoundException, IOException {
+        try (BufferedReader in = new BufferedReader(new FileReader(file))) {
+            List<Byte> data = new ArrayList<>();
+            String line;
+            while ((line = in.readLine()) != null) {
+                line = line.trim();
+                if (line.length() == 0 || line.charAt(0) == '#') {
+                    continue;
+                }
+                int numBytes = 0;
+                for (int i = 0; i < line.length() && numBytes < 16; i += 3, numBytes++) {
+                    if (line.charAt(i) != ' ') {
+                        byte b = hexToByte(line.charAt(i), line.charAt(i + 1));
+                        data.add(b);
+                    }
+                }
+            }
+            return toByteArray(data);
+        }
+    }
+
+    public static byte hexToByte(char c1, char c2) {
+        int n1 = hexCharToInt(c1);
+        int n2 = hexCharToInt(c2);
+        return (byte) ((n1 << 4) + n2);
+    }
+
+    public static int hexCharToInt(char hexChar) {
+        int n = Character.digit(hexChar, 16);
+        if (n == -1) {
+            throw new IllegalArgumentException(
+                    "Invalid hexadecimal character: " + hexChar);
+        }
+        return n;
+    }
+
+    private static byte[] toByteArray(List<Byte> list) {
+        byte[] array = new byte[list.size()];
+        for (int i = 0; i < list.size(); i++) {
+            array[i] = list.get(i).byteValue();
+        }
+        return array;
+    }
+}
diff --git a/vertx-proxy/src/test/resources/fetch_multi_response.hex b/vertx-proxy/src/test/resources/fetch_multi_response.hex
new file mode 100644
index 0000000..6edbbe3
--- /dev/null
+++ b/vertx-proxy/src/test/resources/fetch_multi_response.hex
@@ -0,0 +1,19 @@
+00 00 01 29 00 00 00 91 00 00 00 00 00 00 00 76  ...)...........v
+CA 9F BE 02 06 74 65 73 74 33 02 00 00 00 00 00  .....test3......
+00 00 00 00 00 00 00 00 06 00 00 00 00 00 00 00  ................
+06 00 00 00 00 00 00 00 00 00 FF FF FF FF EB 01  ................
+00 00 00 00 00 00 00 04 00 00 00 6A 00 00 00 00  ...........j....
+02 25 05 53 C8 00 00 00 00 00 00 00 00 01 7F F5  .%.S............
+26 E2 97 00 00 01 7F F5 26 E2 97 FF FF FF FF FF  &.......&.......
+FF FF FF FF FF FF FF FF FF 00 00 00 01 70 00 00  .............p..
+00 01 64 00 01 00 10 D4 E5 58 65 2B 56 36 01 B4  ..d......Xe+V6..
+C1 FA 0B 9D 92 D9 E3 00 00 00 1A FA EF 00 EC 9F  ................
+49 AA 74 E6 C7 B4 84 3F 4C 3A D8 44 1E C6 D0 8E  I.t....?L:.D....
+F0 0A 42 61 5A 00 00 00 00 00 00 00 00 05 00 00  ..BaZ...........
+00 68 00 00 00 00 02 A0 2A 32 7C 00 00 00 00 00  .h......*2|.....
+00 00 00 01 7F F5 29 3E 02 00 00 01 7F F5 29 3E  ......)>......)>
+02 FF FF FF FF FF FF FF FF FF FF FF FF FF FF 00  ................
+00 00 01 6C 00 00 00 01 60 00 01 00 10 B1 D8 9F  ...l....`.......
+7B 63 D9 F5 DC 57 CD EA CB DD 3D FC 98 00 00 00  {c...W....=.....
+18 54 78 A4 97 E9 54 DE 94 DD 85 60 29 B3 D3 FD  .Tx...T....`)...
+CC EB 22 2A E7 8F D9 77 A8 00 00 00 00           .."*...w.....
diff --git a/vertx-proxy/src/test/resources/fetch_response.hex b/vertx-proxy/src/test/resources/fetch_response.hex
new file mode 100644
index 0000000..43531c3
--- /dev/null
+++ b/vertx-proxy/src/test/resources/fetch_response.hex
@@ -0,0 +1,13 @@
+00 00 00 C7 00 00 00 4B 00 00 00 00 00 00 00 22  .......K......."
+65 67 82 02 06 74 65 73 74 33 02 00 00 00 00 00  eg...test3......
+00 00 00 00 00 00 00 00 02 00 00 00 00 00 00 00  ................
+02 00 00 00 00 00 00 00 00 00 FF FF FF FF 89 01  ................
+00 00 00 00 00 00 00 01 00 00 00 7C 00 00 00 00  ...........|....
+02 DB 37 04 A9 00 00 00 00 00 00 00 00 01 7F EB  ..7.............
+B6 77 F6 00 00 01 7F EB B6 77 F6 FF FF FF FF FF  .w.......w......
+FF FF FF FF FF FF FF FF FF 00 00 00 01 92 01 00  ................
+00 00 01 84 01 00 01 00 10 BE 70 D6 64 2C DC 47  ..........p.d,.G
+3E 5D 90 44 E8 C7 C7 B2 BD 00 00 00 2A 51 2C C1  >].D........*Q,.
+9A DA 24 65 3F 96 D2 27 D2 6D 44 59 EB 9A B4 DE  ..$e?..'.mDY....
+42 F3 65 CE 1E 2D BC A9 84 D4 8C 8B F1 81 A9 E5  B.e..-..........
+F3 CC 58 27 6A DF FE 00 00 00 00                 ..X'j......
diff --git a/vertx-proxy/src/test/resources/produce_request.hex b/vertx-proxy/src/test/resources/produce_request.hex
new file mode 100644
index 0000000..9d5d173
--- /dev/null
+++ b/vertx-proxy/src/test/resources/produce_request.hex
@@ -0,0 +1,9 @@
+00 00 00 81 00 00 00 09 00 00 00 73 00 10 63 6F  ...........s..co
+6E 73 6F 6C 65 2D 70 72 6F 64 75 63 65 72 00 00  nsole-producer..
+00 01 00 00 05 DC 02 06 74 65 73 74 33 02 00 00  ........test3...
+00 00 50 00 00 00 00 00 00 00 00 00 00 00 43 FF  ..P...........C.
+FF FF FF 02 C8 70 22 F0 00 00 00 00 00 00 00 00  .....p".........
+01 7F EA 9E 20 02 00 00 01 7F EA 9E 20 02 FF FF  .... ....... ...
+FF FF FF FF FF FF FF FF FF FF FF FF 00 00 00 01  ................
+22 00 00 00 01 16 30 31 32 33 34 35 36 37 38 39  ".....0123456789
+30 00 00 00 00                                   0....
\ No newline at end of file
diff --git a/vertx-proxy/src/test/resources/produce_request2.hex b/vertx-proxy/src/test/resources/produce_request2.hex
new file mode 100644
index 0000000..d76bca7
--- /dev/null
+++ b/vertx-proxy/src/test/resources/produce_request2.hex
@@ -0,0 +1,12 @@
+00 00 00 B1 00 00 00 64 00 00 00 00 00 00 00 13  .......d........
+D7 33 0F 02 06 74 65 73 74 33 02 00 00 00 00 00  .3...test3......
+00 00 00 00 00 00 00 00 04 00 00 00 00 00 00 00  ................
+04 00 00 00 00 00 00 00 00 00 FF FF FF FF 74 00  ..............t.
+00 00 00 00 00 00 03 00 00 00 67 00 00 00 00 02  ..........g.....
+FD CF 72 6F 00 00 00 00 00 00 00 00 01 7F E5 A9  ..ro............
+DA 79 00 00 01 7F E5 A9 DA 79 FF FF FF FF FF FF  .y.......y......
+FF FF FF FF FF FF FF FF 00 00 00 01 6A 00 00 00  ............j...
+01 5E 68 6A 6B 68 68 68 68 68 68 68 68 68 68 68  .^hjkhhhhhhhhhhh
+68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68  hhhhhhhhhhhhhhhh
+68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68  hhhhhhhhhhhhhhhh
+68 00 00 00 00                                   h....