Skip to content

Commit

Permalink
refactor(canary): add reason tag to beats
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 22, 2025
1 parent 014b0f6 commit e79d3e1
Show file tree
Hide file tree
Showing 32 changed files with 424 additions and 195 deletions.
8 changes: 5 additions & 3 deletions canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import io.littlehorse.canary.aggregator.Aggregator;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.config.ConfigLoader;
import io.littlehorse.canary.infra.ShutdownHook;
import io.littlehorse.canary.kafka.TopicCreator;
import io.littlehorse.canary.littlehorse.LHClient;
import io.littlehorse.canary.metronome.MetronomeGetWfRunExecutor;
import io.littlehorse.canary.metronome.MetronomeRunWfExecutor;
import io.littlehorse.canary.metronome.MetronomeWorker;
Expand All @@ -12,8 +14,6 @@
import io.littlehorse.canary.metronome.internal.LocalRepository;
import io.littlehorse.canary.prometheus.PrometheusExporter;
import io.littlehorse.canary.prometheus.PrometheusServerExporter;
import io.littlehorse.canary.util.LHClient;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.sdk.common.config.LHConfig;
import java.io.IOException;
import java.nio.file.Paths;
Expand All @@ -38,6 +38,7 @@ public static void main(final String[] args) throws InterruptedException {
latch.await();
}

// TODO: refactor this code
private static void initialize(final String[] args) throws IOException {
// dependencies
final CanaryConfig canaryConfig = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
Expand Down Expand Up @@ -81,7 +82,8 @@ private static void initialize(final String[] args) throws IOException {

// register wf
if (canaryConfig.isWorkflowCreationEnabled()) {
new MetronomeWorkflow(lhClient, canaryConfig.getWorkflowName());
new MetronomeWorkflow(
lhClient, canaryConfig.getWorkflowName(), canaryConfig.getWorkflowRetention());
}

final LocalRepository repository = new LocalRepository(canaryConfig.getMetronomeDataPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.littlehorse.canary.aggregator.prometheus.MetricStoreExporter;
import io.littlehorse.canary.aggregator.topology.MetricsTopology;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.canary.infra.ShutdownHook;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.littlehorse.canary.aggregator.prometheus;

import com.google.common.util.concurrent.AtomicDouble;
import io.littlehorse.canary.infra.ShutdownHook;
import io.littlehorse.canary.proto.MetricKey;
import io.littlehorse.canary.proto.MetricValue;
import io.littlehorse.canary.util.ShutdownHook;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -53,7 +53,7 @@ private static List<Tag> toMetricTags(final MetricKey key) {
}

private static String toMetricId(final MetricKey key, final String suffix) {
return "%s_%s".formatted(key.getId(), suffix);
return "%s_%s".formatted(key.getName(), suffix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.littlehorse.canary.aggregator.topology;

import com.google.common.base.Strings;
import io.littlehorse.canary.aggregator.internal.BeatTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.proto.*;
Expand Down Expand Up @@ -39,9 +38,9 @@ public Topology toTopology() {

// build latency metric stream
final KStream<MetricKey, MetricValue> latencyMetricsStream = beatsStream
// remove GET_WF_RUN_EXHAUSTED_RETRIES
.filterNot(MetricsTopology::isExhaustedRetries)
// remove the id
// remove messages without latency
.filterNot(MetricsTopology::hasLatency)
// remove id
.groupBy(
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// reset aggregator every minute
Expand All @@ -57,7 +56,7 @@ public Topology toTopology() {

// build count metric stream
final KStream<MetricKey, MetricValue> countMetricStream = beatsStream
// remove the id
// remove id
.groupBy(
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// count all
Expand Down Expand Up @@ -112,8 +111,8 @@ private static MetricValue initializeMetricAggregator() {
return MetricValue.newBuilder().build();
}

private static boolean isExhaustedRetries(final BeatKey key, final BeatValue value) {
return key.getType().equals(BeatType.GET_WF_RUN_EXHAUSTED_RETRIES);
private static boolean hasLatency(final BeatKey key, final BeatValue value) {
return value.hasLatency();
}

private static boolean selectDuplicatedTaskRun(final BeatKey key, final Long value) {
Expand Down Expand Up @@ -159,8 +158,8 @@ private Materialized<BeatKey, Long, KeyValueStore<Bytes, byte[]>> initializeBeat
}

private static KeyValue<MetricKey, MetricValue> mapBeatToMetricCount(final BeatKey key, final Long count) {
final String metricIdPrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapLongToMetricValue(count));
final String metricNamePrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricNamePrefix), mapLongToMetricValue(count));
}

private static Consumed<BeatKey, BeatValue> initializeSerdes() {
Expand All @@ -178,27 +177,19 @@ private Materialized<MetricKey, MetricValue, KeyValueStore<Bytes, byte[]>> initi

private static KeyValue<MetricKey, MetricValue> mapBeatToMetricLatency(
final BeatKey key, final AverageAggregator value) {
final String metricIdPrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
final String metricNamePrefix = key.getType().toString().toLowerCase();
return KeyValue.pair(
buildMetricKey(key, metricNamePrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
}

private static MetricKey buildMetricKey(final BeatKey key, final String id) {
final MetricKey.Builder builder = MetricKey.newBuilder()
private static MetricKey buildMetricKey(final BeatKey key, final String name) {
return MetricKey.newBuilder()
.setServerVersion(key.getServerVersion())
.setServerPort(key.getServerPort())
.setServerHost(key.getServerHost())
.setId("canary_%s".formatted(id));

if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) {
builder.addTags(
Tag.newBuilder().setKey("status").setValue(key.getStatus().toLowerCase()));
}

if (key.getTagsCount() > 0) {
builder.addAllTags(key.getTagsList());
}

return builder.build();
.setName("canary_%s".formatted(name))
.addAllTags(key.getTagsList())
.build();
}

private Materialized<BeatKey, AverageAggregator, WindowStore<Bytes, byte[]>> initializeLatencyStore(
Expand All @@ -219,7 +210,6 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
.setServerVersion(key.getServerVersion())
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.addAllTags(key.getTagsList())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class CanaryConfig implements Config {
public static final String WORKFLOW_CREATION_ENABLE = "workflow.creation.enable";
public static final String WORKFLOW_VERSION = "workflow.version";
public static final String WORKFLOW_REVISION = "workflow.revision";
public static final String WORKFLOW_RETENTION_MS = "workflow.retention.ms";

public static final String METRONOME_ENABLE = "metronome.enable";
public static final String METRONOME_RUN_FREQUENCY_MS = "metronome.run.frequency.ms";
Expand Down Expand Up @@ -123,6 +124,10 @@ public Duration getAggregatorExportFrequency() {
return Duration.ofMillis(Long.parseLong(getConfig(AGGREGATOR_EXPORT_FREQUENCY_MS)));
}

public Duration getWorkflowRetention() {
return Duration.ofMillis(Long.parseLong(getConfig(WORKFLOW_RETENTION_MS)));
}

public Duration getMetronomeRunFrequency() {
return Duration.ofMillis(Long.parseLong(getConfig(METRONOME_RUN_FREQUENCY_MS)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.littlehorse.canary.util;
package io.littlehorse.canary.infra;

import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.littlehorse.canary.kafka;

import io.littlehorse.canary.CanaryException;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.canary.infra.ShutdownHook;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.littlehorse.canary.util;
package io.littlehorse.canary.littlehorse;

import static io.littlehorse.canary.metronome.MetronomeWorkflow.SAMPLE_ITERATION_VARIABLE;
import static io.littlehorse.canary.metronome.MetronomeWorkflow.START_TIME_VARIABLE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package io.littlehorse.canary.metronome;

import static io.littlehorse.canary.metronome.MetronomeRunWfExecutor.GRPC_STATUS_PREFIX;

import com.google.protobuf.util.Timestamps;
import io.grpc.StatusRuntimeException;
import io.littlehorse.canary.infra.ShutdownHook;
import io.littlehorse.canary.littlehorse.LHClient;
import io.littlehorse.canary.metronome.internal.BeatProducer;
import io.littlehorse.canary.metronome.internal.LocalRepository;
import io.littlehorse.canary.metronome.model.Beat;
import io.littlehorse.canary.metronome.model.BeatStatus;
import io.littlehorse.canary.proto.Attempt;
import io.littlehorse.canary.proto.BeatStatus;
import io.littlehorse.canary.proto.BeatType;
import io.littlehorse.canary.util.LHClient;
import io.littlehorse.canary.util.ShutdownHook;
import io.littlehorse.sdk.common.proto.LHStatus;
import io.littlehorse.sdk.common.proto.WfRun;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
Expand All @@ -24,6 +22,7 @@

@Slf4j
public class MetronomeGetWfRunExecutor {
public static final String EXHAUSTED_RETRIES = "EXHAUSTED_RETRIES";
private final ScheduledExecutorService mainExecutor;
private final ExecutorService requestsExecutor;
private final BeatProducer producer;
Expand Down Expand Up @@ -66,31 +65,79 @@ private void scheduledRun() {
}

private void executeRun(final String id, final Attempt attempt) {
// exist if it gets exhausted
if (attempt.getAttempt() >= retries) {
repository.delete(id);
producer.sendFuture(id, BeatType.GET_WF_RUN_EXHAUSTED_RETRIES);
exhaustedRetries(id);
return;
}

// update retry number
updateAttempt(id, attempt);

final Instant start = Instant.now();
final WfRun currentStatus = getCurrentStatus(id, start);
final LHStatus status = getCurrentStatus(id);
final Instant end = Instant.now();

if (currentStatus == null) {
if (status == null) {
return;
}

producer.sendFuture(
id,
BeatType.GET_WF_RUN_REQUEST,
currentStatus.getStatus().name(),
Duration.between(start, Instant.now()));
log.debug("GetWfRun {} {}", id, status);

log.debug("GetWfRun {} {}", id, currentStatus.getStatus());
if (currentStatus.getStatus().equals(LHStatus.COMPLETED)) {
repository.delete(id);
// check if wf run was successful
if (status.equals(LHStatus.COMPLETED)) {
sendSuccessfulWfRun(id, status, Duration.between(start, end));
return;
}

// this status is not expected, send error
log.error("GetWfRun returns error {} {}", id, status);
sendErrorWfRun(id, status);
}

private void sendErrorWfRun(final String id, final LHStatus status) {
final BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR)
.source(BeatStatus.Source.WORKFLOW)
.reason(status.name())
.build();

final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
.id(id)
.status(beatStatus)
.build();

producer.send(beat);
}

private void sendSuccessfulWfRun(final String id, final LHStatus currentStatus, final Duration latency) {
final BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.OK)
.source(BeatStatus.Source.WORKFLOW)
.reason(currentStatus.name())
.build();

final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
.id(id)
.latency(latency)
.status(beatStatus)
.build();

producer.send(beat);
repository.delete(id);
}

private void exhaustedRetries(final String id) {
repository.delete(id);

final BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR)
.reason(EXHAUSTED_RETRIES)
.build();

final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
.id(id)
.status(beatStatus)
.build();

producer.send(beat);
}

private void updateAttempt(final String id, final Attempt attempt) {
Expand All @@ -103,20 +150,27 @@ private void updateAttempt(final String id, final Attempt attempt) {
.build());
}

private WfRun getCurrentStatus(final String id, final Instant start) {
private LHStatus getCurrentStatus(final String id) {
try {
return lhClient.getCanaryWfRun(id);
return lhClient.getCanaryWfRun(id).getStatus();
} catch (Exception e) {
log.error("Error executing getWfRun {}", e.getMessage(), e);

String status = BeatStatus.CLIENT_ERROR.name();
final BeatStatus.BeatStatusBuilder statusBuilder = BeatStatus.builder(BeatStatus.Code.ERROR)
.reason(e.getClass().getSimpleName());

if (e instanceof StatusRuntimeException statusException) {
status = GRPC_STATUS_PREFIX.formatted(
statusException.getStatus().getCode().name());
statusBuilder
.source(BeatStatus.Source.GRPC)
.reason(statusException.getStatus().getCode().name());
}

producer.sendFuture(id, BeatType.GET_WF_RUN_REQUEST, status, Duration.between(start, Instant.now()));
final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
.id(id)
.status(statusBuilder.build())
.build();

producer.send(beat);
return null;
}
}
Expand Down
Loading

0 comments on commit e79d3e1

Please sign in to comment.