Skip to content

Commit

Permalink
Merge branch 'master' into fix/server-concurrent-modification-exception
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin authored Feb 25, 2025
2 parents a9927e6 + a11ac46 commit 1262ea9
Show file tree
Hide file tree
Showing 52 changed files with 1,002 additions and 609 deletions.
8 changes: 5 additions & 3 deletions canary/canary.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ lh.canary.aggregator.enable=true
lh.canary.metronome.enable=true
lh.canary.metronome.worker.enable=true
lh.canary.metronome.frequency.ms=1000
lh.canary.metronome.run.threads=1
lh.canary.metronome.run.requests=300
lh.canary.metronome.run.sample.rate=1
lh.canary.metronome.run.threads=10
lh.canary.metronome.run.requests=1
lh.canary.metronome.run.sample.percentage=100
lh.canary.metronome.get.retries=3
lh.canary.aggregator.export.frequency.ms=1000
175 changes: 95 additions & 80 deletions canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package io.littlehorse.canary;

import io.javalin.http.HandlerType;
import io.littlehorse.canary.aggregator.Aggregator;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.config.ConfigLoader;
import io.littlehorse.canary.infra.ShutdownHook;
import io.littlehorse.canary.infra.WebServer;
import io.littlehorse.canary.kafka.TopicCreator;
import io.littlehorse.canary.littlehorse.LHClient;
import io.littlehorse.canary.metronome.MetronomeGetWfRunExecutor;
import io.littlehorse.canary.metronome.MetronomeRunWfExecutor;
import io.littlehorse.canary.metronome.MetronomeWorker;
import io.littlehorse.canary.metronome.MetronomeWorkflow;
import io.littlehorse.canary.metronome.internal.BeatProducer;
import io.littlehorse.canary.metronome.internal.LocalRepository;
import io.littlehorse.canary.prometheus.PrometheusExporter;
import io.littlehorse.canary.prometheus.PrometheusServerExporter;
import io.littlehorse.canary.util.LHClient;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.sdk.common.config.LHConfig;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -27,7 +27,13 @@ public class Main {

public static void main(final String[] args) throws InterruptedException {
try {
initialize(args);
final CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
final PrometheusExporter exporter = new PrometheusExporter(config.getCommonTags());

maybeCreateTopics(config);
maybeStartMetronome(config);
maybeStartAggregator(config, exporter);
startWebServer(config, exporter);
} catch (Exception e) {
log.error("Error starting application", e);
System.exit(-1);
Expand All @@ -38,83 +44,92 @@ public static void main(final String[] args) throws InterruptedException {
latch.await();
}

private static void initialize(final String[] args) throws IOException {
// dependencies
final CanaryConfig canaryConfig = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
private static void maybeStartMetronome(final CanaryConfig config) {
if (!config.isMetronomeEnabled() && !config.isMetronomeWorkerEnabled()) return;

final PrometheusExporter prometheusExporter = new PrometheusExporter(canaryConfig.getCommonTags());
// create topics
if (canaryConfig.isTopicCreationEnabled()) {
final NewTopic topic = new NewTopic(
canaryConfig.getTopicName(), canaryConfig.getTopicPartitions(), canaryConfig.getTopicReplicas());
final LHConfig lhConfig = new LHConfig(config.toLittleHorseConfig().toMap());

new TopicCreator(
canaryConfig.toKafkaConfig().toMap(), canaryConfig.getTopicCreationTimeout(), List.of(topic));
}
final boolean metronomeOrWorkerEnabled =
canaryConfig.isMetronomeEnabled() || canaryConfig.isMetronomeWorkerEnabled();

if (metronomeOrWorkerEnabled) {
final LHConfig lhConfig =
new LHConfig(canaryConfig.toLittleHorseConfig().toMap());
final LHClient lhClient = new LHClient(
lhConfig,
canaryConfig.getWorkflowName(),
canaryConfig.getWorkflowVersion(),
canaryConfig.getWorkflowRevision());
prometheusExporter.addMeasurable(lhClient);
final BeatProducer producer = new BeatProducer(
lhConfig.getApiBootstrapHost(),
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
canaryConfig.getTopicName(),
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getMetronomeBeatExtraTags());

// start worker
if (canaryConfig.isMetronomeWorkerEnabled()) {
new MetronomeWorker(producer, lhConfig);
}

// start metronome client
if (canaryConfig.isMetronomeEnabled()) {

// register wf
if (canaryConfig.isWorkflowCreationEnabled()) {
new MetronomeWorkflow(lhClient, canaryConfig.getWorkflowName());
}

final LocalRepository repository = new LocalRepository(canaryConfig.getMetronomeDataPath());

new MetronomeRunWfExecutor(
producer,
lhClient,
canaryConfig.getMetronomeRunFrequency(),
canaryConfig.getMetronomeRunThreads(),
canaryConfig.getMetronomeRunRequests(),
canaryConfig.getMetronomeSampleRate(),
repository);

new MetronomeGetWfRunExecutor(
producer,
lhClient,
canaryConfig.getMetronomeGetFrequency(),
canaryConfig.getMetronomeGetThreads(),
canaryConfig.getMetronomeGetRetries(),
repository);
}
}
final LHClient lhClient = new LHClient(
lhConfig, config.getWorkflowName(), config.getWorkflowVersion(), config.getWorkflowRevision());

// start the aggregator
if (canaryConfig.isAggregatorEnabled()) {
new PrometheusServerExporter(
canaryConfig.getMetricsPort(), canaryConfig.getMetricsPath(), prometheusExporter);
final Aggregator aggregator = new Aggregator(
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getTopicName(),
canaryConfig.getAggregatorStoreRetention(),
canaryConfig.getAggregatorExportFrequency());
prometheusExporter.addMeasurable(aggregator);
}
final BeatProducer producer = new BeatProducer(
lhConfig.getApiBootstrapHost(),
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
config.getTopicName(),
config.toKafkaConfig().toMap(),
config.getMetronomeBeatExtraTags());

maybeStartMetronomeWorker(config, producer, lhConfig);
maybeRegisterWorkflow(config, lhClient);
maybeStartMetronomeExecutors(config, producer, lhClient);
}

private static void maybeStartMetronomeExecutors(
final CanaryConfig config, final BeatProducer producer, final LHClient lhClient) {
if (!config.isMetronomeEnabled()) return;

final LocalRepository repository = new LocalRepository(config.getMetronomeDataPath());

final MetronomeRunWfExecutor runWfExecutor = new MetronomeRunWfExecutor(
producer,
lhClient,
config.getMetronomeRunFrequency(),
config.getMetronomeRunThreads(),
config.getMetronomeRunRequests(),
config.getMetronomeSamplePercentage(),
repository);
runWfExecutor.start();

final MetronomeGetWfRunExecutor getWfRunExecutor = new MetronomeGetWfRunExecutor(
producer, lhClient, config.getMetronomeGetFrequency(), config.getMetronomeGetRetries(), repository);
getWfRunExecutor.start();
}

private static void maybeRegisterWorkflow(final CanaryConfig config, final LHClient lhClient) {
if (!config.isWorkflowCreationEnabled()) return;

final MetronomeWorkflow workflow =
new MetronomeWorkflow(lhClient, config.getWorkflowName(), config.getWorkflowRetention());
workflow.register();
}

private static void maybeStartMetronomeWorker(
final CanaryConfig config, final BeatProducer producer, final LHConfig lhConfig) {
if (!config.isMetronomeWorkerEnabled()) return;

final MetronomeWorker worker = new MetronomeWorker(producer, lhConfig);
worker.start();
}

private static void startWebServer(final CanaryConfig config, final PrometheusExporter prometheusExporter) {
final WebServer webServer = new WebServer(config.getMetricsPort());
webServer.addHandler(HandlerType.GET, config.getMetricsPath(), prometheusExporter);
webServer.start();
}

private static void maybeStartAggregator(final CanaryConfig config, final PrometheusExporter prometheusExporter) {
if (!config.isAggregatorEnabled()) return;

final Aggregator aggregator = new Aggregator(
config.toKafkaConfig().toMap(),
config.getTopicName(),
config.getAggregatorStoreRetention(),
config.getAggregatorExportFrequency());

prometheusExporter.addMeasurable(aggregator);
aggregator.start();
}

private static void maybeCreateTopics(final CanaryConfig config) {
if (!config.isTopicCreationEnabled()) return;

final NewTopic topic =
new NewTopic(config.getTopicName(), config.getTopicPartitions(), config.getTopicReplicas());

final TopicCreator topicCreator =
new TopicCreator(config.toKafkaConfig().toMap(), config.getTopicCreationTimeout());

topicCreator.create(List.of(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.littlehorse.canary.aggregator.prometheus.MetricStoreExporter;
import io.littlehorse.canary.aggregator.topology.MetricsTopology;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.canary.infra.ShutdownHook;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.time.Duration;
Expand All @@ -29,8 +29,10 @@ public Aggregator(

kafkaStreams = new KafkaStreams(metricsTopology.toTopology(), new StreamsConfig(kafkaStreamsConfig));
ShutdownHook.add("Aggregator Topology", kafkaStreams);
kafkaStreams.start();
}

public void start() {
kafkaStreams.start();
log.info("Aggregator Started");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.littlehorse.canary.aggregator.prometheus;

import com.google.common.util.concurrent.AtomicDouble;
import io.littlehorse.canary.infra.ShutdownHook;
import io.littlehorse.canary.proto.MetricKey;
import io.littlehorse.canary.proto.MetricValue;
import io.littlehorse.canary.util.ShutdownHook;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -53,7 +53,7 @@ private static List<Tag> toMetricTags(final MetricKey key) {
}

private static String toMetricId(final MetricKey key, final String suffix) {
return "%s_%s".formatted(key.getId(), suffix);
return "%s_%s".formatted(key.getName(), suffix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.littlehorse.canary.aggregator.topology;

import com.google.common.base.Strings;
import io.littlehorse.canary.aggregator.internal.BeatTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.proto.*;
Expand Down Expand Up @@ -39,9 +38,9 @@ public Topology toTopology() {

// build latency metric stream
final KStream<MetricKey, MetricValue> latencyMetricsStream = beatsStream
// remove GET_WF_RUN_EXHAUSTED_RETRIES
.filterNot(MetricsTopology::isExhaustedRetries)
// remove the id
// remove messages without latency
.filter(MetricsTopology::hasLatency)
// remove id
.groupBy(
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// reset aggregator every minute
Expand All @@ -57,7 +56,7 @@ public Topology toTopology() {

// build count metric stream
final KStream<MetricKey, MetricValue> countMetricStream = beatsStream
// remove the id
// remove id
.groupBy(
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// count all
Expand Down Expand Up @@ -112,8 +111,8 @@ private static MetricValue initializeMetricAggregator() {
return MetricValue.newBuilder().build();
}

private static boolean isExhaustedRetries(final BeatKey key, final BeatValue value) {
return key.getType().equals(BeatType.GET_WF_RUN_EXHAUSTED_RETRIES);
private static boolean hasLatency(final BeatKey key, final BeatValue value) {
return value.hasLatency();
}

private static boolean selectDuplicatedTaskRun(final BeatKey key, final Long value) {
Expand Down Expand Up @@ -159,8 +158,8 @@ private Materialized<BeatKey, Long, KeyValueStore<Bytes, byte[]>> initializeBeat
}

private static KeyValue<MetricKey, MetricValue> mapBeatToMetricCount(final BeatKey key, final Long count) {
final String metricIdPrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapLongToMetricValue(count));
final String metricNamePrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricNamePrefix), mapLongToMetricValue(count));
}

private static Consumed<BeatKey, BeatValue> initializeSerdes() {
Expand All @@ -178,27 +177,19 @@ private Materialized<MetricKey, MetricValue, KeyValueStore<Bytes, byte[]>> initi

private static KeyValue<MetricKey, MetricValue> mapBeatToMetricLatency(
final BeatKey key, final AverageAggregator value) {
final String metricIdPrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
final String metricNamePrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(
buildMetricKey(key, metricNamePrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
}

private static MetricKey buildMetricKey(final BeatKey key, final String id) {
final MetricKey.Builder builder = MetricKey.newBuilder()
private static MetricKey buildMetricKey(final BeatKey key, final String name) {
return MetricKey.newBuilder()
.setServerVersion(key.getServerVersion())
.setServerPort(key.getServerPort())
.setServerHost(key.getServerHost())
.setId("canary_%s".formatted(id));

if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) {
builder.addTags(
Tag.newBuilder().setKey("status").setValue(key.getStatus().toLowerCase()));
}

if (key.getTagsCount() > 0) {
builder.addAllTags(key.getTagsList());
}

return builder.build();
.setName("canary_%s".formatted(name))
.addAllTags(key.getTagsList())
.build();
}

private Materialized<BeatKey, AverageAggregator, WindowStore<Bytes, byte[]>> initializeLatencyStore(
Expand All @@ -219,7 +210,6 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
.setServerVersion(key.getServerVersion())
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.addAllTags(key.getTagsList())
.build();
}
Expand Down
Loading

0 comments on commit 1262ea9

Please sign in to comment.