Skip to content

Commit

Permalink
Google Java Format
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Jan 15, 2021
1 parent 4443ac4 commit 65785c6
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 47 deletions.
25 changes: 9 additions & 16 deletions src/main/java/de/nerden/kafka/streams/MoreTransformers.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
Predicate<AsyncMessage<K, V>> retryDecider) {
return Async(materialized, fn, retryDecider,
DEFAULT_ASYNC_MAX_INFLIGHT,
DEFAULT_ASYNC_TIMEOUT_MS);
return Async(
materialized, fn, retryDecider, DEFAULT_ASYNC_MAX_INFLIGHT, DEFAULT_ASYNC_TIMEOUT_MS);
}

public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized,
Function<KeyValue<K, V>, CompletableFuture<KeyValue<K, V>>> fn,
Predicate<AsyncMessage<K, V>> retryDecider,
int maxInflight,
int timeoutMs
) {
int timeoutMs) {
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized);

Expand All @@ -51,8 +49,7 @@ public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
maxInflight,
timeoutMs,
fn,
retryDecider
);
retryDecider);
} else {
return new AsyncTransformerSupplier<>(
"async",
Expand All @@ -63,25 +60,21 @@ public static <K, V> TransformerSupplier<K, V, KeyValue<K, V>> Async(
maxInflight,
timeoutMs,
fn,
retryDecider
);
retryDecider);
}

}


public static <K, V> TransformerSupplier<K, V, KeyValue<K, List<V>>> Batch(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return Batch(materialized, DEFAULT_MAX_BATCH_DURATION_MILLIS, DEFAULT_MAX_BATCH_SIZE_PER_KEY);
}

/**
* @param materialized Materialization information for the state stored used for
* batching.
* @param materialized Materialization information for the state stored used for batching.
* @param maxBatchDurationMillis Every time maxBatchDurationMillis passed, batches are released
* for all keys.
* @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a
* specific key, the batch is forwarded be waited for.
* for all keys.
* @param maxBatchSizePerKey When matchBatchSizePerKey records have been collected for a specific
* key, the batch is forwarded be waited for.
* @return
*/
public static <K, V> TransformerSupplier<K, V, KeyValue<K, List<V>>> Batch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

public class AsyncTransformerSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> {


private final String storeName;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
Expand Down Expand Up @@ -55,8 +54,8 @@ public AsyncTransformerSupplier(

@Override
public Transformer<K, V, KeyValue<K, V>> get() {
return new AsyncTransformer<>(fn, retryDecider, this.storeName, this.maxInflight,
this.timeoutMs);
return new AsyncTransformer<>(
fn, retryDecider, this.storeName, this.maxInflight, this.timeoutMs);
}

@Override
Expand All @@ -65,7 +64,8 @@ public Set<StoreBuilder<?>> stores() {
if (this.storeSupplier != null) {
builder =
Stores.keyValueStoreBuilder(
this.storeSupplier, Serdes.Long(),
this.storeSupplier,
Serdes.Long(),
new AsyncMessageSerde<>(this.keySerde, this.valueSerde));
} else {
builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,30 @@ class AsyncTransformerRetryTest {
public void setUp() {
StreamsBuilder bldr = new StreamsBuilder();
bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
kv ->
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (retryNo > 0) {
return kv;
}
retryNo++;
throw new RuntimeException("random network fail");
}),
decider -> true, 1, 5000), Named.as("async-transform"))
.transform(
MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
kv ->
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (retryNo > 0) {
return kv;
}
retryNo++;
throw new RuntimeException("random network fail");
}),
decider -> true,
1,
5000),
Named.as("async-transform"))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Topology topology = bldr.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ class AsyncTransformerTest {
public void setUp() {
StreamsBuilder bldr = new StreamsBuilder();
bldr.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
msg -> CompletableFuture.supplyAsync(() -> msg),
decider -> false), Named.as("async-transform"))
.transform(
MoreTransformers.Async(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("async")
.withLoggingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
msg -> CompletableFuture.supplyAsync(() -> msg),
decider -> false),
Named.as("async-transform"))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Topology topology = bldr.build();
Expand Down

0 comments on commit 65785c6

Please sign in to comment.