feat(canary): add run wf latency and duplicated task metrics (#661)
- Calculates workflow run latency and duplicated-task counts per LH cluster
- Uses functional semantics
- Removes metrics when they are no longer present in the store
sauljabin authored Feb 9, 2024
1 parent 2ff7599 commit 02a431e
Showing 16 changed files with 501 additions and 234 deletions.
@@ -1,52 +1,42 @@
package io.littlehorse.canary.aggregator;

import static io.littlehorse.canary.aggregator.topology.MetricsTopology.METRICS_STORE;

import io.littlehorse.canary.Bootstrap;
import io.littlehorse.canary.aggregator.internal.BeatTimeExtractor;
import io.littlehorse.canary.aggregator.internal.LatencyMetricExporter;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.aggregator.topology.LatencyTopology;
import io.littlehorse.canary.aggregator.topology.MetricsTopology;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.config.KafkaStreamsConfig;
import io.littlehorse.canary.proto.Beat;
import io.littlehorse.canary.proto.BeatKey;
import io.littlehorse.canary.prometheus.PrometheusMetricStoreExporter;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import io.micrometer.core.instrument.binder.system.DiskSpaceMetrics;
import java.io.File;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

@Slf4j
public class AggregatorBootstrap extends Bootstrap implements MeterBinder {

private static final Consumed<BeatKey, Beat> SERDES = Consumed.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.Beat())
.withTimestampExtractor(new BeatTimeExtractor());
private final KafkaStreams kafkaStreams;
private final KafkaStreamsConfig kafkaStreamsConfigMap;

public AggregatorBootstrap(final CanaryConfig config) {
super(config);

kafkaStreamsConfigMap = config.toKafkaStreamsConfig();
kafkaStreams = new KafkaStreams(
buildTopology(config.getTopicName()), new StreamsConfig(kafkaStreamsConfigMap.toMap()));

final MetricsTopology metricsTopology =
new MetricsTopology(config.getTopicName(), config.getAggregatorStoreRetentionMs());
kafkaStreams = new KafkaStreams(metricsTopology.toTopology(), new StreamsConfig(kafkaStreamsConfigMap.toMap()));
Shutdown.addShutdownHook("Aggregator Topology", kafkaStreams);

kafkaStreams.start();
log.trace("Initialized");
}

private static Topology buildTopology(final String metricsTopicName) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<BeatKey, Beat> metricStream = builder.stream(metricsTopicName, SERDES);
new LatencyTopology(metricStream);
return builder.build();
}

@Override
public void bindTo(final MeterRegistry registry) {
final KafkaStreamsMetrics kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams);
@@ -56,7 +46,8 @@ public void bindTo(final MeterRegistry registry) {
final DiskSpaceMetrics diskSpaceMetrics = new DiskSpaceMetrics(new File(kafkaStreamsConfigMap.getStateDir()));
diskSpaceMetrics.bindTo(registry);

final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(kafkaStreams);
latencyMetricExporter.bindTo(registry);
final PrometheusMetricStoreExporter prometheusMetricStoreExporter =
new PrometheusMetricStoreExporter(kafkaStreams, METRICS_STORE);
prometheusMetricStoreExporter.bindTo(registry);
}
}
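
PrometheusMetricStoreExporter itself is not part of this hunk. A minimal sketch of what such a MeterBinder could look like, assuming it reads the queryable METRICS_STORE through the standard Kafka Streams interactive-query API (the class body below is an assumption for illustration, not the committed code; per the commit message, the real exporter also drops gauges whose keys disappear from the store):

import io.littlehorse.canary.proto.MetricKey;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PrometheusMetricStoreExporter implements MeterBinder {

    private final KafkaStreams kafkaStreams;
    private final String storeName;

    public PrometheusMetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName) {
        this.kafkaStreams = kafkaStreams;
        this.storeName = storeName;
    }

    @Override
    public void bindTo(final MeterRegistry registry) {
        // Query the materialized metrics table and register one gauge per metric key.
        // Sketch only: assumes the streams instance is already in the RUNNING state.
        final ReadOnlyKeyValueStore<MetricKey, Double> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
        try (final KeyValueIterator<MetricKey, Double> records = store.all()) {
            records.forEachRemaining(record -> registry.gauge(record.key.getId(), record.value));
        }
    }
}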

This file was deleted.

@@ -0,0 +1,65 @@
package io.littlehorse.canary.aggregator.topology;

import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.proto.Beat;
import io.littlehorse.canary.proto.BeatKey;
import io.littlehorse.canary.proto.MetricKey;
import java.time.Duration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

@Getter
@Slf4j
public class DuplicatedTaskRunTopology {

private final KStream<MetricKey, Double> stream;

public DuplicatedTaskRunTopology(final KStream<BeatKey, Beat> mainStream, final Duration storeRetention) {
stream = mainStream
.filter((key, value) -> value.hasTaskRunBeat())
.groupByKey()
// count all the records with the same idempotency key and attempt number
.count(Materialized.<BeatKey, Long, KeyValueStore<Bytes, byte[]>>with(
ProtobufSerdes.BeatKey(), Serdes.Long())
.withRetention(storeRetention))
// filter by duplicated
.filter((key, value) -> value > 1L)
.toStream()
.mapValues((readOnlyKey, value) -> Double.valueOf(value))
// debug peek aggregate
.peek(DuplicatedTaskRunTopology::peekAggregate)
// re-key from task run to lh cluster
.groupBy((key, value) -> toMetricKey(key), Grouped.with(ProtobufSerdes.MetricKey(), Serdes.Double()))
// count how many task runs were duplicated
.count(Materialized.<MetricKey, Long, KeyValueStore<Bytes, byte[]>>with(
ProtobufSerdes.MetricKey(), Serdes.Long())
.withRetention(storeRetention))
.mapValues((readOnlyKey, value) -> Double.valueOf(value))
.toStream();
}

private static MetricKey toMetricKey(final BeatKey key) {
return MetricKey.newBuilder()
.setId("duplicated_task_run_max_count")
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setServerVersion(key.getServerVersion())
.build();
}

private static void peekAggregate(final BeatKey key, final Double count) {
log.debug(
"server={}:{}, idempotency_key={}, attempt_number={}, count={}",
key.getServerHost(),
key.getServerPort(),
key.getTaskRunBeatKey().getIdempotencyKey(),
key.getTaskRunBeatKey().getAttemptNumber(),
count);
}
}
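
For orientation, here is the same two-stage shape on plain String/Long types (an illustrative analogue, not project code): stage one counts occurrences per task-run attempt, stage two re-keys the duplicated attempts to their cluster and counts again:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("beats")
        .groupByKey()
        // stage 1: occurrences per task-run attempt (the record key)
        .count()
        // keep only attempts seen more than once
        .filter((attempt, occurrences) -> occurrences > 1L)
        .toStream()
        // stage 2: re-key every duplicated attempt to its cluster
        .groupBy((attempt, occurrences) -> "cluster-a", Grouped.with(Serdes.String(), Serdes.Long()))
        // count how many task-run attempts were duplicated per cluster
        .count()
        .toStream()
        .to("duplicated-task-run-counts", Produced.with(Serdes.String(), Serdes.Long()));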
@@ -7,46 +7,40 @@
import io.littlehorse.canary.proto.MetricKey;
import java.time.Duration;
import java.util.List;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

@Getter
@Slf4j
public class LatencyTopology {

public static final String LATENCY_METRICS_STORE = "latency-metrics";
private final KStream<MetricKey, Double> stream;

public LatencyTopology(final KStream<BeatKey, Beat> metricStream) {
metricStream
public LatencyTopology(
final KStream<BeatKey, Beat> mainStream, final TimeWindows windows, final Duration storeRetention) {
stream = mainStream
.filter((key, value) -> value.hasLatencyBeat())
.groupByKey()
// reset aggregator every minute
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)))
.windowedBy(windows)
// calculate average
.aggregate(
() -> AverageAggregator.newBuilder().build(),
(key, value, aggregate) -> aggregate(value, aggregate),
Materialized.<BeatKey, AverageAggregator, WindowStore<Bytes, byte[]>>as("latency-windows")
.withKeySerde(ProtobufSerdes.BeatKey())
.withValueSerde(ProtobufSerdes.AverageAggregator()))
.toStream()
// peek aggregate
.map((key, value) -> KeyValue.pair(key.key(), value))
.peek((key, value) -> peekAggregate(key, value))
Materialized.<BeatKey, AverageAggregator, WindowStore<Bytes, byte[]>>with(
ProtobufSerdes.BeatKey(), ProtobufSerdes.AverageAggregator())
.withRetention(storeRetention))
.toStream((key, value) -> key.key())
// debug peek aggregate
.peek(LatencyTopology::peekAggregate)
// extract metrics
.flatMap((key, value) -> makeMetrics(key, value))
// create store
.toTable(
Named.as(LATENCY_METRICS_STORE),
Materialized.<MetricKey, Double, KeyValueStore<Bytes, byte[]>>as(LATENCY_METRICS_STORE)
.withKeySerde(ProtobufSerdes.MetricKey())
.withValueSerde(Serdes.Double()));
.flatMap(LatencyTopology::makeMetrics);
}

private static List<KeyValue<MetricKey, Double>> makeMetrics(final BeatKey key, final AverageAggregator value) {
@@ -80,9 +74,7 @@ private static AverageAggregator aggregate(final Beat value, final AverageAggregator aggregate) {
final int count = aggregate.getCount() + 1;
final double sum = aggregate.getSum() + value.getLatencyBeat().getLatency();
final double avg = sum / count;
final double max = value.getLatencyBeat().getLatency() > aggregate.getMax()
? value.getLatencyBeat().getLatency()
: aggregate.getMax();
final double max = Math.max(value.getLatencyBeat().getLatency(), aggregate.getMax());

return AverageAggregator.newBuilder()
.setCount(count)
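
A worked run of the aggregate step above, using three assumed latency samples and plain doubles in place of the protobuf builder:

final double[] latencies = {10.0, 20.0, 40.0};
int count = 0;
double sum = 0.0;
double max = 0.0;
for (final double latency : latencies) {
    count++;                      // aggregate.getCount() + 1
    sum += latency;               // aggregate.getSum() + latency
    max = Math.max(latency, max); // Math.max(latency, aggregate.getMax())
}
final double avg = sum / count;   // count=3, sum=70.0, avg=23.33..., max=40.0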
@@ -0,0 +1,52 @@
package io.littlehorse.canary.aggregator.topology;

import io.littlehorse.canary.aggregator.internal.BeatTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.proto.Beat;
import io.littlehorse.canary.proto.BeatKey;
import io.littlehorse.canary.proto.MetricKey;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueStore;

public class MetricsTopology {

private static final Consumed<BeatKey, Beat> BEATS_SERDES = Consumed.with(
ProtobufSerdes.BeatKey(), ProtobufSerdes.Beat())
.withTimestampExtractor(new BeatTimeExtractor());

public static final String METRICS_STORE = "metrics";
private final StreamsBuilder streamsBuilder;

public MetricsTopology(final String inputTopic, final long storeRetention) {
streamsBuilder = new StreamsBuilder();

final KStream<BeatKey, Beat> beatsStream = streamsBuilder.stream(inputTopic, BEATS_SERDES);

final LatencyTopology latencyTopology = new LatencyTopology(
beatsStream,
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)),
Duration.ofMillis(storeRetention));
final DuplicatedTaskRunTopology taskRunTopology =
new DuplicatedTaskRunTopology(beatsStream, Duration.ofMillis(storeRetention));

latencyTopology
.getStream()
.merge(taskRunTopology.getStream())
.toTable(Materialized.<MetricKey, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE)
.withKeySerde(ProtobufSerdes.MetricKey())
.withValueSerde(Serdes.Double())
.withRetention(Duration.ofMillis(storeRetention)));
}

public Topology toTopology() {
return streamsBuilder.build();
}
}
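
A minimal usage sketch mirroring AggregatorBootstrap above; the topic name, retention, and bootstrap address are illustrative values:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

final MetricsTopology metricsTopology = new MetricsTopology("canary-beats", 3_600_000L);

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "canary-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

final KafkaStreams kafkaStreams = new KafkaStreams(metricsTopology.toTopology(), props);
kafkaStreams.start(); // METRICS_STORE becomes queryable once the instance reaches RUNNING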
@@ -23,6 +23,8 @@ public class CanaryConfig implements Config {
public static final String METRICS_PORT = "metrics.port";
public static final String METRICS_PATH = "metrics.path";
public static final String METRICS_FILTER_ENABLE = "metrics.filter.enable";
public static final String METRONOME_ACTIVE_MODE_ENABLE = "metronome.active.mode.enable";
public static final String AGGREGATOR_STORE_RETENTION_MS = "aggregator.store.retention.ms";
private final Map<String, Object> configs;

public CanaryConfig(final Map<String, Object> configs) {
@@ -94,14 +96,22 @@ public boolean isMetronomeEnabled() {
return Boolean.parseBoolean(getConfig(METRONOME_ENABLE));
}

public boolean isMetronomeActiveModeEnabled() {
return Boolean.parseBoolean(getConfig(METRONOME_ACTIVE_MODE_ENABLE));
}

public boolean isAggregatorEnabled() {
return Boolean.parseBoolean(getConfig(AGGREGATOR_ENABLE));
}

public long getMetronomeFrequency() {
public long getMetronomeFrequencyMs() {
return Long.parseLong(getConfig(METRONOME_FREQUENCY_MS));
}

public long getAggregatorStoreRetentionMs() {
return Long.parseLong(getConfig(AGGREGATOR_STORE_RETENTION_MS));
}

public int getMetronomeThreads() {
return Integer.parseInt(getConfig(METRONOME_THREADS));
}
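
The two new keys in action (a sketch; the values are illustrative, and the map-based constructor is the one shown above):

import java.util.Map;

final CanaryConfig config = new CanaryConfig(Map.<String, Object>of(
        CanaryConfig.METRONOME_ACTIVE_MODE_ENABLE, "true",
        CanaryConfig.AGGREGATOR_STORE_RETENTION_MS, "3600000"));

final boolean activeMode = config.isMetronomeActiveModeEnabled(); // true
final long retentionMs = config.getAggregatorStoreRetentionMs();  // 3600000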
@@ -27,10 +27,11 @@ public KafkaTopicBootstrap(final CanaryConfig config) {
Shutdown.addShutdownHook("Topics Creator", adminClient);

try {
final NewTopic canaryTopic =
new NewTopic(config.getTopicName(), config.getTopicPartitions(), config.getTopicReplicas());

adminClient.createTopics(List.of(canaryTopic)).all().get(1, TimeUnit.SECONDS);
adminClient
.createTopics(List.of(new NewTopic(
config.getTopicName(), config.getTopicPartitions(), config.getTopicReplicas())))
.all()
.get(1, TimeUnit.SECONDS);
log.info("Topics {} created", config.getTopicName());
} catch (Exception e) {
if (e.getCause() instanceof TopicExistsException) {
