From 4443ac460f9bb5b9c0c431f5bc05835062b3f2d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20Br=C3=BCderl?=
Date: Sat, 16 Jan 2021 00:40:04 +0100
Subject: [PATCH] add Async to MoreTransformers

---
 .../kafka/streams/MoreTransformers.java       | 65 ++++++++++++++-
 .../processor/AsyncTransformerSupplier.java   | 83 +++++++++++++++++++
 .../processor/AsyncTransformerRetryTest.java  | 46 +++++-----
 .../processor/AsyncTransformerTest.java       | 24 +++---
 4 files changed, 179 insertions(+), 39 deletions(-)
 create mode 100644 src/main/java/de/nerden/kafka/streams/processor/AsyncTransformerSupplier.java

diff --git a/src/main/java/de/nerden/kafka/streams/MoreTransformers.java b/src/main/java/de/nerden/kafka/streams/MoreTransformers.java
index 2df90f0..275534f 100644
--- a/src/main/java/de/nerden/kafka/streams/MoreTransformers.java
+++ b/src/main/java/de/nerden/kafka/streams/MoreTransformers.java
@@ -1,7 +1,12 @@
 package de.nerden.kafka.streams;
 
+import de.nerden.kafka.streams.processor.AsyncTransformer.AsyncMessage;
+import de.nerden.kafka.streams.processor.AsyncTransformerSupplier;
 import de.nerden.kafka.streams.processor.BatchTransformerSupplier;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -14,17 +19,69 @@ public class MoreTransformers {
 
   public static final long DEFAULT_MAX_BATCH_DURATION_MILLIS = 10000L;
   public static final long DEFAULT_MAX_BATCH_SIZE_PER_KEY = 1000L;
+  public static final int DEFAULT_ASYNC_MAX_INFLIGHT = 10;
+  public static final int DEFAULT_ASYNC_TIMEOUT_MS = 10000;
+
+  public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
+      Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
+      Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
+      Predicate<AsyncMessage<K, V>> retryDecider) {
+    return Async(materialized, fn, retryDecider,
+        DEFAULT_ASYNC_MAX_INFLIGHT,
+        DEFAULT_ASYNC_TIMEOUT_MS);
+  }
+
+  public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
+      Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
+      Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
+      Predicate<AsyncMessage<K, V>> retryDecider,
+      int maxInflight,
+      int timeoutMs
+  ) {
+    final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+        new MaterializedInternal<>(materialized);
+
+    if (materializedInternal.storeName() != null) {
+      return new AsyncTransformerSupplier<>(
+          materializedInternal.storeName(),
+          materializedInternal.keySerde(),
+          materializedInternal.valueSerde(),
+          null,
+          materializedInternal.loggingEnabled(),
+          maxInflight,
+          timeoutMs,
+          fn,
+          retryDecider
+      );
+    } else {
+      return new AsyncTransformerSupplier<>(
+          "async",
+          materializedInternal.keySerde(),
+          materializedInternal.valueSerde(),
+          null,
+          materializedInternal.loggingEnabled(),
+          maxInflight,
+          timeoutMs,
+          fn,
+          retryDecider
+      );
+    }
+
+  }
+
+
   public static <K, V> TransformerSupplier<K, V, List<KeyValue<K, V>>> Batch(
       Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
     return Batch(materialized, DEFAULT_MAX_BATCH_DURATION_MILLIS, DEFAULT_MAX_BATCH_SIZE_PER_KEY);
   }
 
   /**
-   * @param materialized Materialization information for the state stored used for batching.
+   * @param materialized Materialization information for the state stored used for
+   *     batching.
    * @param maxBatchDurationMillis Every time maxBatchDurationMillis passed, batches are released
-   *     for all keys.
-   * @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a specific
-   *     key, the batch is forwarded be waited for.
+   *     for all keys.
+   * @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a
+   *     specific key, the batch is forwarded be waited for.
    * @return
    */
   public static <K, V> TransformerSupplier<K, V, List<KeyValue<K, V>>> Batch(
diff --git a/src/main/java/de/nerden/kafka/streams/processor/AsyncTransformerSupplier.java b/src/main/java/de/nerden/kafka/streams/processor/AsyncTransformerSupplier.java
new file mode 100644
index 0000000..550f3d7
--- /dev/null
+++ b/src/main/java/de/nerden/kafka/streams/processor/AsyncTransformerSupplier.java
@@ -0,0 +1,83 @@
+package de.nerden.kafka.streams.processor;
+
+import de.nerden.kafka.streams.processor.AsyncTransformer.AsyncMessage;
+import de.nerden.kafka.streams.serde.AsyncMessageSerde;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+public class AsyncTransformerSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> {
+
+
+  private final String storeName;
+  private final Serde<K> keySerde;
+  private final Serde<V> valueSerde;
+  private final KeyValueBytesStoreSupplier storeSupplier;
+  private final boolean changeLoggingEnabled;
+  private final int maxInflight;
+  private final int timeoutMs;
+
+  private final Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn;
+  private final Predicate<AsyncMessage<K, V>> retryDecider;
+
+  public AsyncTransformerSupplier(
+      String storeName,
+      Serde<K> keySerde,
+      Serde<V> valueSerde,
+      KeyValueBytesStoreSupplier storeSupplier,
+      boolean changeLoggingEnabled,
+      int maxInflight,
+      int timeoutMs,
+      Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
+      Predicate<AsyncMessage<K, V>> retryDecider) {
+    this.storeName = storeName;
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    this.storeSupplier = storeSupplier;
+    this.changeLoggingEnabled = changeLoggingEnabled;
+    this.maxInflight = maxInflight;
+    this.timeoutMs = timeoutMs;
+    this.fn = fn;
+    this.retryDecider = retryDecider;
+  }
+
+  @Override
+  public Transformer<K, V, KeyValue<K, V>> get() {
+    return new AsyncTransformer<>(fn, retryDecider, this.storeName, this.maxInflight,
+        this.timeoutMs);
+  }
+
+  @Override
+  public Set<StoreBuilder<?>> stores() {
+    StoreBuilder<KeyValueStore<Long, AsyncMessage<K, V>>> builder;
+    if (this.storeSupplier != null) {
+      builder =
+          Stores.keyValueStoreBuilder(
+              this.storeSupplier, Serdes.Long(),
+              new AsyncMessageSerde<>(this.keySerde, this.valueSerde));
+    } else {
+      builder =
+          Stores.keyValueStoreBuilder(
+              Stores.inMemoryKeyValueStore(storeName),
+              Serdes.Long(),
+              new AsyncMessageSerde<>(this.keySerde, this.valueSerde));
+    }
+
+    return Collections.singleton(
+        changeLoggingEnabled
+            ? builder.withLoggingEnabled(Map.of())
+            : builder.withLoggingDisabled());
+  }
+}
diff --git a/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerRetryTest.java b/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerRetryTest.java
index 85badb1..b1418c9 100644
--- a/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerRetryTest.java
+++ b/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerRetryTest.java
@@ -1,12 +1,14 @@
 package de.nerden.kafka.streams.processor;
 
 import com.google.common.truth.Truth;
+import de.nerden.kafka.streams.MoreTransformers;
 import de.nerden.kafka.streams.serde.AsyncMessageSerde;
 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TestInputTopic;
@@ -14,8 +16,10 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -34,28 +38,26 @@ class AsyncTransformerRetryTest {
   public void setUp() {
     StreamsBuilder bldr = new StreamsBuilder();
     bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
-        .transform(
-            () ->
-                new AsyncTransformer<String, String>(
-                    kv ->
-                        CompletableFuture.supplyAsync(
-                            () -> {
-                              try {
-                                Thread.sleep(500);
-                              } catch (InterruptedException e) {
-                                e.printStackTrace();
-                              }
-                              if (retryNo > 0) {
-                                return kv;
-                              }
-                              retryNo++;
-                              throw new RuntimeException("random network fail");
-                            }),
-                    (retryMessage) -> true,
-                    "inflight",
-                    1,
-                    5000),
-            Named.as("async-transform"))
+        .transform(MoreTransformers.Async(
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
+                .withLoggingDisabled()
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()),
+            kv ->
+                CompletableFuture.supplyAsync(
+                    () -> {
+                      try {
+                        Thread.sleep(500);
+                      } catch (InterruptedException e) {
+                        e.printStackTrace();
+                      }
+                      if (retryNo > 0) {
+                        return kv;
+                      }
+                      retryNo++;
+                      throw new RuntimeException("random network fail");
+                    }),
+            decider -> true, 1, 5000), Named.as("async-transform"))
         .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
 
     Topology topology = bldr.build();
diff --git a/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerTest.java b/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerTest.java
index d7dfeeb..473c76e 100644
--- a/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerTest.java
+++ b/src/test/java/de/nerden/kafka/streams/processor/AsyncTransformerTest.java
@@ -1,11 +1,13 @@
 package de.nerden.kafka.streams.processor;
 
 import com.google.common.truth.Truth;
+import de.nerden.kafka.streams.MoreTransformers;
 import de.nerden.kafka.streams.serde.AsyncMessageSerde;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TestInputTopic;
@@ -13,8 +15,10 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
@@ -30,19 +34,13 @@ class AsyncTransformerTest {
   public void setUp() {
     StreamsBuilder bldr = new StreamsBuilder();
     bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
-        .transform(
-            () ->
-                new AsyncTransformer<String, String>(
-                    kv ->
-                        CompletableFuture.supplyAsync(
-                            () -> {
-                              return kv;
-                            }),
-                    failed -> false,
-                    "inflight",
-                    1,
-                    5000),
-            Named.as("async-transform"))
+        .transform(MoreTransformers.Async(
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
+                .withLoggingDisabled()
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()),
+            msg -> CompletableFuture.supplyAsync(() -> msg),
+            decider -> false), Named.as("async-transform"))
         .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
 
     Topology topology = bldr.build();
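
Usage sketch (not part of the commit above): the updated tests show how the new MoreTransformers.Async factory is wired into a topology. It takes a Materialized describing the in-flight state store, a function mapping each record to a CompletableFuture, and a retry decider, with an optional maxInflight/timeoutMs overload (defaults 10 and 10000 ms). The topic names and the uppercase-enrichment lambda below are illustrative assumptions, not code from the patch; imports match those added to the test files.

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(
            MoreTransformers.Async(
                // State store holding in-flight async messages.
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()),
                // Asynchronous operation per record, e.g. a remote lookup (illustrative).
                kv ->
                    CompletableFuture.supplyAsync(
                        () -> KeyValue.pair(kv.key, kv.value.toUpperCase())),
                // Retry decider: returning true means a failed message is retried.
                failed -> true,
                // maxInflight and timeoutMs, matching the defaults from MoreTransformers.
                10, 10000),
            Named.as("async-transform"))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));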