Skip to content

Commit

Permalink
add Async to MoreTransformers
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Jan 15, 2021
1 parent b08cf59 commit 4443ac4
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 39 deletions.
65 changes: 61 additions & 4 deletions src/main/java/de/nerden/kafka/streams/MoreTransformers.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package de.nerden.kafka.streams;

import de.nerden.kafka.streams.processor.AsyncTransformer.AsyncMessage;
import de.nerden.kafka.streams.processor.AsyncTransformerSupplier;
import de.nerden.kafka.streams.processor.BatchTransformerSupplier;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Materialized;
Expand All @@ -14,17 +19,69 @@ public class MoreTransformers {
public static final long DEFAULT_MAX_BATCH_DURATION_MILLIS = 10000L;
public static final long DEFAULT_MAX_BATCH_SIZE_PER_KEY = 1000L;

public static final int DEFAULT_ASYNC_MAX_INFLIGHT = 10;
public static final int DEFAULT_ASYNC_TIMEOUT_MS = 10000;

public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
Predicate<AsyncMessage<K, V>> retryDecider) {
return Async(materialized, fn, retryDecider,
DEFAULT_ASYNC_MAX_INFLIGHT,
DEFAULT_ASYNC_TIMEOUT_MS);
}

public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
Predicate<AsyncMessage<K, V>> retryDecider,
int maxInflight,
int timeoutMs
) {
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized);

if (materializedInternal.storeName() != null) {
return new AsyncTransformerSupplier<>(
materializedInternal.storeName(),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
null,
materializedInternal.loggingEnabled(),
maxInflight,
timeoutMs,
fn,
retryDecider
);
} else {
return new AsyncTransformerSupplier<>(
"async",
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
null,
materializedInternal.loggingEnabled(),
maxInflight,
timeoutMs,
fn,
retryDecider
);
}

}


public static <K, V> TransformerSupplier<K, V, KeyValue<K, List<V>>> Batch(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return Batch(materialized, DEFAULT_MAX_BATCH_DURATION_MILLIS, DEFAULT_MAX_BATCH_SIZE_PER_KEY);
}

/**
* @param materialized Materialization information for the state stored used for batching.
* @param materialized Materialization information for the state stored used for
* batching.
* @param maxBatchDurationMillis Every time maxBatchDurationMillis passed, batches are released
* for all keys.
* @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a specific
* key, the batch is forwarded be waited for.
* for all keys.
* @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a
* specific key, the batch is forwarded be waited for.
* @return
*/
public static <K, V> TransformerSupplier<K, V, KeyValue<K, List<V>>> Batch(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package de.nerden.kafka.streams.processor;

import de.nerden.kafka.streams.processor.AsyncTransformer.AsyncMessage;
import de.nerden.kafka.streams.serde.AsyncMessageSerde;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class AsyncTransformerSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> {


private final String storeName;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final KeyValueBytesStoreSupplier storeSupplier;
private final boolean changeLoggingEnabled;
private final int maxInflight;
private final int timeoutMs;

private final Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn;
private final Predicate<AsyncMessage<K, V>> retryDecider;

public AsyncTransformerSupplier(
String storeName,
Serde<K> keySerde,
Serde<V> valueSerde,
KeyValueBytesStoreSupplier storeSupplier,
boolean changeLoggingEnabled,
int maxInflight,
int timeoutMs,
Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
Predicate<AsyncMessage<K, V>> retryDecider) {
this.storeName = storeName;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.storeSupplier = storeSupplier;
this.changeLoggingEnabled = changeLoggingEnabled;
this.maxInflight = maxInflight;
this.timeoutMs = timeoutMs;
this.fn = fn;
this.retryDecider = retryDecider;
}

@Override
public Transformer<K, V, KeyValue<K, V>> get() {
return new AsyncTransformer<>(fn, retryDecider, this.storeName, this.maxInflight,
this.timeoutMs);
}

@Override
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<Long, AsyncMessage<K, V>>> builder;
if (this.storeSupplier != null) {
builder =
Stores.keyValueStoreBuilder(
this.storeSupplier, Serdes.Long(),
new AsyncMessageSerde<>(this.keySerde, this.valueSerde));
} else {
builder =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(storeName),
Serdes.Long(),
new AsyncMessageSerde<>(this.keySerde, this.valueSerde));
}

return Collections.singleton(
changeLoggingEnabled
? builder.withLoggingEnabled(Map.of())
: builder.withLoggingDisabled());
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package de.nerden.kafka.streams.processor;

import com.google.common.truth.Truth;
import de.nerden.kafka.streams.MoreTransformers;
import de.nerden.kafka.streams.serde.AsyncMessageSerde;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -34,28 +38,26 @@ class AsyncTransformerRetryTest {
public void setUp() {
StreamsBuilder bldr = new StreamsBuilder();
bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(
() ->
new AsyncTransformer<String, String>(
kv ->
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (retryNo > 0) {
return kv;
}
retryNo++;
throw new RuntimeException("random network fail");
}),
(retryMessage) -> true,
"inflight",
1,
5000),
Named.as("async-transform"))
.transform(MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
kv ->
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (retryNo > 0) {
return kv;
}
retryNo++;
throw new RuntimeException("random network fail");
}),
decider -> true, 1, 5000), Named.as("async-transform"))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Topology topology = bldr.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package de.nerden.kafka.streams.processor;

import com.google.common.truth.Truth;
import de.nerden.kafka.streams.MoreTransformers;
import de.nerden.kafka.streams.serde.AsyncMessageSerde;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -30,19 +34,13 @@ class AsyncTransformerTest {
public void setUp() {
StreamsBuilder bldr = new StreamsBuilder();
bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(
() ->
new AsyncTransformer<String, String>(
kv ->
CompletableFuture.supplyAsync(
() -> {
return kv;
}),
failed -> false,
"inflight",
1,
5000),
Named.as("async-transform"))
.transform(MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
msg -> CompletableFuture.supplyAsync(() -> msg),
decider -> false), Named.as("async-transform"))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Topology topology = bldr.build();
Expand Down

0 comments on commit 4443ac4

Please sign in to comment.