diff --git a/canary/canary.properties b/canary/canary.properties
index 64d6086db..c9d79fd8d 100644
--- a/canary/canary.properties
+++ b/canary/canary.properties
@@ -11,6 +11,8 @@ lh.canary.aggregator.enable=true
 lh.canary.metronome.enable=true
 lh.canary.metronome.worker.enable=true
 lh.canary.metronome.frequency.ms=1000
-lh.canary.metronome.run.threads=1
-lh.canary.metronome.run.requests=300
-lh.canary.metronome.run.sample.rate=1
+lh.canary.metronome.run.threads=10
+lh.canary.metronome.run.requests=1
+lh.canary.metronome.run.sample.percentage=100
+lh.canary.metronome.get.retries=3
+lh.canary.aggregator.export.frequency.ms=1000
diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java
index f5b2d9ad9..2d6113c6b 100644
--- a/canary/src/main/java/io/littlehorse/canary/Main.java
+++ b/canary/src/main/java/io/littlehorse/canary/Main.java
@@ -1,9 +1,13 @@
 package io.littlehorse.canary;
 
+import io.javalin.http.HandlerType;
 import io.littlehorse.canary.aggregator.Aggregator;
 import io.littlehorse.canary.config.CanaryConfig;
 import io.littlehorse.canary.config.ConfigLoader;
+import io.littlehorse.canary.infra.ShutdownHook;
+import io.littlehorse.canary.infra.WebServer;
 import io.littlehorse.canary.kafka.TopicCreator;
+import io.littlehorse.canary.littlehorse.LHClient;
 import io.littlehorse.canary.metronome.MetronomeGetWfRunExecutor;
 import io.littlehorse.canary.metronome.MetronomeRunWfExecutor;
 import io.littlehorse.canary.metronome.MetronomeWorker;
@@ -11,11 +15,7 @@ import io.littlehorse.canary.metronome.internal.BeatProducer;
 import io.littlehorse.canary.metronome.internal.LocalRepository;
 import io.littlehorse.canary.prometheus.PrometheusExporter;
-import io.littlehorse.canary.prometheus.PrometheusServerExporter;
-import io.littlehorse.canary.util.LHClient;
-import io.littlehorse.canary.util.ShutdownHook;
 import io.littlehorse.sdk.common.config.LHConfig;
-import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -27,7 +27,13 @@ public class Main {
     public static void main(final String[] args) throws InterruptedException {
         try {
-            initialize(args);
+            final CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
+            final PrometheusExporter exporter = new PrometheusExporter(config.getCommonTags());
+
+            maybeCreateTopics(config);
+            maybeStartMetronome(config);
+            maybeStartAggregator(config, exporter);
+            startWebServer(config, exporter);
         } catch (Exception e) {
             log.error("Error starting application", e);
             System.exit(-1);
         }
@@ -38,83 +44,92 @@ public static void main(final String[] args) throws InterruptedException {
         latch.await();
     }
 
-    private static void initialize(final String[] args) throws IOException {
-        // dependencies
-        final CanaryConfig canaryConfig = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
+    private static void maybeStartMetronome(final CanaryConfig config) {
+        if (!config.isMetronomeEnabled() && !config.isMetronomeWorkerEnabled()) return;
 
-        final PrometheusExporter prometheusExporter = new PrometheusExporter(canaryConfig.getCommonTags());
-        // create topics
-        if (canaryConfig.isTopicCreationEnabled()) {
-            final NewTopic topic = new NewTopic(
-                    canaryConfig.getTopicName(), canaryConfig.getTopicPartitions(), canaryConfig.getTopicReplicas());
+        final LHConfig lhConfig = new LHConfig(config.toLittleHorseConfig().toMap());
 
-            new TopicCreator(
-                    canaryConfig.toKafkaConfig().toMap(), canaryConfig.getTopicCreationTimeout(), List.of(topic));
-        }
-        final boolean metronomeOrWorkerEnabled =
-                canaryConfig.isMetronomeEnabled() || canaryConfig.isMetronomeWorkerEnabled();
-
-        if (metronomeOrWorkerEnabled) {
-            final LHConfig lhConfig =
-                    new LHConfig(canaryConfig.toLittleHorseConfig().toMap());
-            final LHClient lhClient = new LHClient(
-                    lhConfig,
-                    canaryConfig.getWorkflowName(),
-                    canaryConfig.getWorkflowVersion(),
-                    canaryConfig.getWorkflowRevision());
-            prometheusExporter.addMeasurable(lhClient);
-            final BeatProducer producer = new BeatProducer(
-                    lhConfig.getApiBootstrapHost(),
-                    lhConfig.getApiBootstrapPort(),
-                    lhClient.getServerVersion(),
-                    canaryConfig.getTopicName(),
-                    canaryConfig.toKafkaConfig().toMap(),
-                    canaryConfig.getMetronomeBeatExtraTags());
-
-            // start worker
-            if (canaryConfig.isMetronomeWorkerEnabled()) {
-                new MetronomeWorker(producer, lhConfig);
-            }
-
-            // start metronome client
-            if (canaryConfig.isMetronomeEnabled()) {
-
-                // register wf
-                if (canaryConfig.isWorkflowCreationEnabled()) {
-                    new MetronomeWorkflow(lhClient, canaryConfig.getWorkflowName());
-                }
-
-                final LocalRepository repository = new LocalRepository(canaryConfig.getMetronomeDataPath());
-
-                new MetronomeRunWfExecutor(
-                        producer,
-                        lhClient,
-                        canaryConfig.getMetronomeRunFrequency(),
-                        canaryConfig.getMetronomeRunThreads(),
-                        canaryConfig.getMetronomeRunRequests(),
-                        canaryConfig.getMetronomeSampleRate(),
-                        repository);
-
-                new MetronomeGetWfRunExecutor(
-                        producer,
-                        lhClient,
-                        canaryConfig.getMetronomeGetFrequency(),
-                        canaryConfig.getMetronomeGetThreads(),
-                        canaryConfig.getMetronomeGetRetries(),
-                        repository);
-            }
-        }
+        final LHClient lhClient = new LHClient(
+                lhConfig, config.getWorkflowName(), config.getWorkflowVersion(), config.getWorkflowRevision());
 
-        // start the aggregator
-        if (canaryConfig.isAggregatorEnabled()) {
-            new PrometheusServerExporter(
-                    canaryConfig.getMetricsPort(), canaryConfig.getMetricsPath(), prometheusExporter);
-            final Aggregator aggregator = new Aggregator(
-                    canaryConfig.toKafkaConfig().toMap(),
-                    canaryConfig.getTopicName(),
-                    canaryConfig.getAggregatorStoreRetention(),
-                    canaryConfig.getAggregatorExportFrequency());
-            prometheusExporter.addMeasurable(aggregator);
-        }
+        final BeatProducer producer = new BeatProducer(
+                lhConfig.getApiBootstrapHost(),
+                lhConfig.getApiBootstrapPort(),
+                lhClient.getServerVersion(),
+                config.getTopicName(),
+                config.toKafkaConfig().toMap(),
+                config.getMetronomeBeatExtraTags());
+
+        maybeStartMetronomeWorker(config, producer, lhConfig);
+        maybeRegisterWorkflow(config, lhClient);
+        maybeStartMetronomeExecutors(config, producer, lhClient);
+    }
+
+    private static void maybeStartMetronomeExecutors(
+            final CanaryConfig config, final BeatProducer producer, final LHClient lhClient) {
+        if (!config.isMetronomeEnabled()) return;
+
+        final LocalRepository repository = new LocalRepository(config.getMetronomeDataPath());
+
+        final MetronomeRunWfExecutor runWfExecutor = new MetronomeRunWfExecutor(
+                producer,
+                lhClient,
+                config.getMetronomeRunFrequency(),
+                config.getMetronomeRunThreads(),
+                config.getMetronomeRunRequests(),
+                config.getMetronomeSamplePercentage(),
+                repository);
+        runWfExecutor.start();
+
+        final MetronomeGetWfRunExecutor getWfRunExecutor = new MetronomeGetWfRunExecutor(
+                producer, lhClient, config.getMetronomeGetFrequency(), config.getMetronomeGetRetries(), repository);
+        getWfRunExecutor.start();
+    }
+
+    private static void maybeRegisterWorkflow(final CanaryConfig config, final LHClient lhClient) {
+        if (!config.isWorkflowCreationEnabled()) return;
+
+        final MetronomeWorkflow workflow =
+                new MetronomeWorkflow(lhClient, config.getWorkflowName(), config.getWorkflowRetention());
+        workflow.register();
+    }
+
+    private static void maybeStartMetronomeWorker(
+            final CanaryConfig config, final BeatProducer producer, final LHConfig lhConfig) {
+        if (!config.isMetronomeWorkerEnabled()) return;
+
+        final MetronomeWorker worker = new MetronomeWorker(producer, lhConfig);
+        worker.start();
+    }
+
+    private static void startWebServer(final CanaryConfig config, final PrometheusExporter prometheusExporter) {
+        final WebServer webServer = new WebServer(config.getMetricsPort());
+        webServer.addHandler(HandlerType.GET, config.getMetricsPath(), prometheusExporter);
+        webServer.start();
+    }
+
+    private static void maybeStartAggregator(final CanaryConfig config, final PrometheusExporter prometheusExporter) {
+        if (!config.isAggregatorEnabled()) return;
+
+        final Aggregator aggregator = new Aggregator(
+                config.toKafkaConfig().toMap(),
+                config.getTopicName(),
+                config.getAggregatorStoreRetention(),
+                config.getAggregatorExportFrequency());
+
+        prometheusExporter.addMeasurable(aggregator);
+        aggregator.start();
+    }
+
+    private static void maybeCreateTopics(final CanaryConfig config) {
+        if (!config.isTopicCreationEnabled()) return;
+
+        final NewTopic topic =
+                new NewTopic(config.getTopicName(), config.getTopicPartitions(), config.getTopicReplicas());
+
+        final TopicCreator topicCreator =
+                new TopicCreator(config.toKafkaConfig().toMap(), config.getTopicCreationTimeout());
+
+        topicCreator.create(List.of(topic));
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java
index f68632f0f..e783dd3de 100644
--- a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java
+++ b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java
@@ -4,7 +4,7 @@ import io.littlehorse.canary.aggregator.prometheus.MetricStoreExporter;
 import io.littlehorse.canary.aggregator.topology.MetricsTopology;
-import io.littlehorse.canary.util.ShutdownHook;
+import io.littlehorse.canary.infra.ShutdownHook;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.binder.MeterBinder;
 import java.time.Duration;
@@ -29,8 +29,10 @@ public Aggregator(
         kafkaStreams = new KafkaStreams(metricsTopology.toTopology(), new StreamsConfig(kafkaStreamsConfig));
 
         ShutdownHook.add("Aggregator Topology", kafkaStreams);
-        kafkaStreams.start();
+    }
 
+    public void start() {
+        kafkaStreams.start();
         log.info("Aggregator Started");
     }
diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/prometheus/MetricStoreExporter.java b/canary/src/main/java/io/littlehorse/canary/aggregator/prometheus/MetricStoreExporter.java
index be3c44d07..495ea1da2 100644
--- a/canary/src/main/java/io/littlehorse/canary/aggregator/prometheus/MetricStoreExporter.java
+++ b/canary/src/main/java/io/littlehorse/canary/aggregator/prometheus/MetricStoreExporter.java
@@ -1,9 +1,9 @@
 package io.littlehorse.canary.aggregator.prometheus;
 
 import com.google.common.util.concurrent.AtomicDouble;
+import io.littlehorse.canary.infra.ShutdownHook;
 import io.littlehorse.canary.proto.MetricKey;
 import io.littlehorse.canary.proto.MetricValue;
-import io.littlehorse.canary.util.ShutdownHook;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Meter;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -53,7 +53,7 @@ private static List toMetricTags(final MetricKey key) {
     }
 
     private static String toMetricId(final MetricKey key, final String suffix) {
-        return "%s_%s".formatted(key.getId(), suffix);
+        return "%s_%s".formatted(key.getName(), suffix);
     }
 
     @Override
diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java
index 071942b0c..c630acb50 100644
--- a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java
+++ b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java
@@ -1,6 +1,5 @@
 package io.littlehorse.canary.aggregator.topology;
 
-import com.google.common.base.Strings;
 import io.littlehorse.canary.aggregator.internal.BeatTimeExtractor;
 import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
 import io.littlehorse.canary.proto.*;
@@ -39,9 +38,9 @@ public Topology toTopology() {
         // build latency metric stream
         final KStream latencyMetricsStream = beatsStream
-                // remove GET_WF_RUN_EXHAUSTED_RETRIES
-                .filterNot(MetricsTopology::isExhaustedRetries)
-                // remove the id
+                // remove messages without latency
+                .filter(MetricsTopology::hasLatency)
+                // remove id
                 .groupBy(
                         MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
                 // reset aggregator every minute
@@ -57,7 +56,7 @@ public Topology toTopology() {
         // build count metric stream
         final KStream countMetricStream = beatsStream
-                // remove the id
+                // remove id
                 .groupBy(
                         MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
                 // count all
@@ -112,8 +111,8 @@ private static MetricValue initializeMetricAggregator() {
         return MetricValue.newBuilder().build();
     }
 
-    private static boolean isExhaustedRetries(final BeatKey key, final BeatValue value) {
-        return key.getType().equals(BeatType.GET_WF_RUN_EXHAUSTED_RETRIES);
+    private static boolean hasLatency(final BeatKey key, final BeatValue value) {
+        return value.hasLatency();
     }
 
     private static boolean selectDuplicatedTaskRun(final BeatKey key, final Long value) {
@@ -159,8 +158,8 @@ private Materialized> initializeBeat
     }
 
     private static KeyValue mapBeatToMetricCount(final BeatKey key, final Long count) {
-        final String metricIdPrefix = key.getType().toString().toLowerCase();
-        return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapLongToMetricValue(count));
+        final String metricNamePrefix = key.getType().toString().toLowerCase();
+        return KeyValue.pair(buildMetricKey(key, metricNamePrefix), mapLongToMetricValue(count));
     }
 
     private static Consumed initializeSerdes() {
@@ -178,27 +177,19 @@ private Materialized> initi
     private static KeyValue mapBeatToMetricLatency(
             final BeatKey key, final AverageAggregator value) {
-        final String metricIdPrefix = key.getType().toString().toLowerCase();
-        return KeyValue.pair(buildMetricKey(key, metricIdPrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
+        final String metricNamePrefix = key.getType().toString().toLowerCase();
+        return KeyValue.pair(
+                buildMetricKey(key, metricNamePrefix), mapAvgToMetricValue(value.getAvg(), value.getMax()));
     }
 
-    private static MetricKey buildMetricKey(final BeatKey key, final String id) {
-        final MetricKey.Builder builder = MetricKey.newBuilder()
+    private static MetricKey buildMetricKey(final BeatKey key, final String name) {
+        return MetricKey.newBuilder()
                 .setServerVersion(key.getServerVersion())
                 .setServerPort(key.getServerPort())
                 .setServerHost(key.getServerHost())
-                .setId("canary_%s".formatted(id));
-
-        if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) {
-            builder.addTags(
-                    Tag.newBuilder().setKey("status").setValue(key.getStatus().toLowerCase()));
-        }
-
-        if (key.getTagsCount() > 0) {
-            builder.addAllTags(key.getTagsList());
-        }
-
-        return builder.build();
+                .setName("canary_%s".formatted(name))
+                .addAllTags(key.getTagsList())
+                .build();
     }
 
     private Materialized> initializeLatencyStore(
@@ -219,7 +210,6 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
                 .setServerVersion(key.getServerVersion())
                 .setServerHost(key.getServerHost())
                 .setServerPort(key.getServerPort())
-                .setStatus(key.getStatus())
                 .addAllTags(key.getTagsList())
                 .build();
     }
diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
index 634c06424..e44788a58 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
@@ -21,12 +21,13 @@ public class CanaryConfig implements Config {
     public static final String WORKFLOW_CREATION_ENABLE = "workflow.creation.enable";
     public static final String WORKFLOW_VERSION = "workflow.version";
     public static final String WORKFLOW_REVISION = "workflow.revision";
+    public static final String WORKFLOW_RETENTION_MS = "workflow.retention.ms";
     public static final String METRONOME_ENABLE = "metronome.enable";
     public static final String METRONOME_RUN_FREQUENCY_MS = "metronome.run.frequency.ms";
     public static final String METRONOME_RUN_THREADS = "metronome.run.threads";
     public static final String METRONOME_RUN_REQUESTS = "metronome.run.requests";
-    public static final String METRONOME_RUN_SAMPLE_RATE = "metronome.run.sample.rate";
+    public static final String METRONOME_RUN_SAMPLE_PERCENTAGE = "metronome.run.sample.percentage";
     public static final String METRONOME_GET_FREQUENCY_MS = "metronome.get.frequency.ms";
     public static final String METRONOME_GET_THREADS = "metronome.get.threads";
     public static final String METRONOME_GET_RETRIES = "metronome.get.retries";
@@ -123,6 +124,10 @@ public Duration getAggregatorExportFrequency() {
         return Duration.ofMillis(Long.parseLong(getConfig(AGGREGATOR_EXPORT_FREQUENCY_MS)));
     }
 
+    public Duration getWorkflowRetention() {
+        return Duration.ofMillis(Long.parseLong(getConfig(WORKFLOW_RETENTION_MS)));
+    }
+
     public Duration getMetronomeRunFrequency() {
         return Duration.ofMillis(Long.parseLong(getConfig(METRONOME_RUN_FREQUENCY_MS)));
     }
@@ -135,8 +140,8 @@ public int getMetronomeRunRequests() {
         return Integer.parseInt(getConfig(METRONOME_RUN_REQUESTS));
     }
 
-    public int getMetronomeSampleRate() {
-        return Integer.parseInt(getConfig(METRONOME_RUN_SAMPLE_RATE));
+    public int getMetronomeSamplePercentage() {
+        return Integer.parseInt(getConfig(METRONOME_RUN_SAMPLE_PERCENTAGE));
     }
 
     public Duration getMetronomeGetFrequency() {
         return Duration.ofMillis(Long.parseLong(getConfig(METRONOME_GET_FREQUENCY_MS)));
     }
@@ -172,7 +177,7 @@ public boolean isTopicCreationEnabled() {
     }
 
     public boolean isWorkflowCreationEnabled() {
-        return Boolean.parseBoolean(getConfig(WORKFLOW_CREATION_ENABLE));
+        return isMetronomeEnabled() && Boolean.parseBoolean(getConfig(WORKFLOW_CREATION_ENABLE));
     }
 
     public String getWorkflowName() {
diff --git a/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
index c53b962f6..e36c83226 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
@@ -1,7 +1,5 @@
 package io.littlehorse.canary.config;
 
-import static io.littlehorse.canary.config.CanaryConfig.LH_CANARY_PREFIX;
-
 import com.google.common.collect.Streams;
 import io.smallrye.config.PropertiesConfigSource;
 import io.smallrye.config.SmallRyeConfig;
@@ -86,6 +84,6 @@ private static Map toMap(final Config config) {
     }
 
     private static Map defaultValues() {
-        return Map.of(LH_CANARY_PREFIX + CanaryConfig.METRONOME_RUN_SAMPLE_RATE, "100");
+        return Map.of();
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/util/ShutdownHook.java b/canary/src/main/java/io/littlehorse/canary/infra/ShutdownHook.java
similarity index 81%
rename from canary/src/main/java/io/littlehorse/canary/util/ShutdownHook.java
rename to canary/src/main/java/io/littlehorse/canary/infra/ShutdownHook.java
index 248e1dcbf..044f8dd2f 100644
--- a/canary/src/main/java/io/littlehorse/canary/util/ShutdownHook.java
+++ b/canary/src/main/java/io/littlehorse/canary/infra/ShutdownHook.java
@@ -1,4 +1,4 @@
-package io.littlehorse.canary.util;
+package io.littlehorse.canary.infra;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -13,7 +13,7 @@ public static void add(final String message, final AutoCloseable closeable) {
             } catch (Exception e) {
                 log.error("Error in ShutdownHook '{}'", message, e);
             }
-            log.trace("Shutdown process for '{}' was completed", message);
+            log.debug("Shutdown process for '{}' was completed", message);
         }));
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/infra/WebServer.java b/canary/src/main/java/io/littlehorse/canary/infra/WebServer.java
new file mode 100644
index 000000000..6b02c0b7b
--- /dev/null
+++ b/canary/src/main/java/io/littlehorse/canary/infra/WebServer.java
@@ -0,0 +1,28 @@
+package io.littlehorse.canary.infra;
+
+import io.javalin.Javalin;
+import io.javalin.http.Handler;
+import io.javalin.http.HandlerType;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WebServer {
+
+    private final int webPort;
+    private final Javalin server;
+
+    public WebServer(final int webPort) {
+        this.webPort = webPort;
+        this.server = Javalin.create();
+        ShutdownHook.add("Web Server", server::stop);
+    }
+
+    public void addHandler(final HandlerType type, final String path, final Handler handler) {
+        server.addHttpHandler(type, path, handler);
+    }
+
+    public void start() {
+        server.start(webPort);
+        log.info("Metrics Server Exporter Started");
+    }
+}
diff --git a/canary/src/main/java/io/littlehorse/canary/kafka/TopicCreator.java b/canary/src/main/java/io/littlehorse/canary/kafka/TopicCreator.java
index 51ecfc7c5..ad82536c0 100644
--- a/canary/src/main/java/io/littlehorse/canary/kafka/TopicCreator.java
+++ b/canary/src/main/java/io/littlehorse/canary/kafka/TopicCreator.java
@@ -1,7 +1,7 @@
 package io.littlehorse.canary.kafka;
 
 import io.littlehorse.canary.CanaryException;
-import io.littlehorse.canary.util.ShutdownHook;
+import io.littlehorse.canary.infra.ShutdownHook;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -15,11 +15,16 @@
 @Slf4j
 public class TopicCreator {
 
-    public TopicCreator(
-            final Map kafkaAdminClient, final Duration timeout, final List topics) {
-        final AdminClient adminClient = KafkaAdminClient.create(kafkaAdminClient);
+    private final AdminClient adminClient;
+    private final Duration timeout;
+
+    public TopicCreator(final Map config, final Duration timeout) {
+        this.timeout = timeout;
+        this.adminClient = KafkaAdminClient.create(config);
         ShutdownHook.add("Topics Creator", adminClient);
+    }
 
+    public void create(final List topics) {
         try {
             adminClient.createTopics(topics).all().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
             log.info("Topics {} created", topics);
diff --git a/canary/src/main/java/io/littlehorse/canary/util/LHClient.java b/canary/src/main/java/io/littlehorse/canary/littlehorse/LHClient.java
similarity index 68%
rename from canary/src/main/java/io/littlehorse/canary/util/LHClient.java
rename to canary/src/main/java/io/littlehorse/canary/littlehorse/LHClient.java
index 30641934a..d8d5e1f98 100644
--- a/canary/src/main/java/io/littlehorse/canary/util/LHClient.java
+++ b/canary/src/main/java/io/littlehorse/canary/littlehorse/LHClient.java
@@ -1,4 +1,4 @@
-package io.littlehorse.canary.util;
+package io.littlehorse.canary.littlehorse;
 
 import static io.littlehorse.canary.metronome.MetronomeWorkflow.SAMPLE_ITERATION_VARIABLE;
 import static io.littlehorse.canary.metronome.MetronomeWorkflow.START_TIME_VARIABLE;
@@ -9,21 +9,15 @@ import io.littlehorse.sdk.common.proto.*;
 import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseFutureStub;
 import io.littlehorse.sdk.wfsdk.Workflow;
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.binder.MeterBinder;
 import java.time.Instant;
-import lombok.NonNull;
 
-public class LHClient implements MeterBinder {
+public class LHClient {
 
     private final LittleHorseFutureStub futureStub;
     private final LittleHorseGrpc.LittleHorseBlockingStub blockingStub;
     private final String workflowName;
    private final int workflowRevision;
     private final int workflowVersion;
-    private static final String WF_RUN_COUNTER_NAME = "canary_metronome_wf_run";
-    private final CounterMetric wfRunCounter = new CounterMetric(WF_RUN_COUNTER_NAME);
 
     public LHClient(
             final LHConfig lhConfig, final String workflowName, final int workflowVersion, final int workflowRevision) {
@@ -66,33 +60,4 @@ public ListenableFuture runCanaryWf(final String id, final Instant start,
     public WfRun getCanaryWfRun(final String id) {
         return blockingStub.getWfRun(WfRunId.newBuilder().setId(id).build());
     }
-
-    @Override
-    public void bindTo(@NonNull final MeterRegistry registry) {
-        wfRunCounter.bindTo(registry);
-    }
-
-    public void incrementWfRunCountMetric() {
-        wfRunCounter.increment();
-    }
-
-    private static class CounterMetric implements MeterBinder {
-        private final String metricName;
-        private Counter counter;
-
-        private CounterMetric(final String metricName) {
-            this.metricName = metricName;
-        }
-
-        @Override
-        public void bindTo(@NonNull final MeterRegistry registry) {
-            counter = Counter.builder(metricName).register(registry);
-        }
-
-        public void increment() {
-            if (counter != null) {
-                counter.increment();
-            }
-        }
-    }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeGetWfRunExecutor.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeGetWfRunExecutor.java
index b7077ba3c..03115f344 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeGetWfRunExecutor.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeGetWfRunExecutor.java
@@ -1,15 +1,16 @@
 package io.littlehorse.canary.metronome;
 
 import com.google.protobuf.util.Timestamps;
+import io.grpc.StatusRuntimeException;
+import io.littlehorse.canary.infra.ShutdownHook;
+import io.littlehorse.canary.littlehorse.LHClient;
 import io.littlehorse.canary.metronome.internal.BeatProducer;
 import io.littlehorse.canary.metronome.internal.LocalRepository;
+import io.littlehorse.canary.metronome.model.Beat;
+import io.littlehorse.canary.metronome.model.BeatStatus;
 import io.littlehorse.canary.proto.Attempt;
-import io.littlehorse.canary.proto.BeatStatus;
 import io.littlehorse.canary.proto.BeatType;
-import io.littlehorse.canary.util.LHClient;
-import io.littlehorse.canary.util.ShutdownHook;
 import io.littlehorse.sdk.common.proto.LHStatus;
-import io.littlehorse.sdk.common.proto.WfRun;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
@@ -21,10 +22,11 @@
 @Slf4j
 public class MetronomeGetWfRunExecutor {
 
+    public static final String EXHAUSTED_RETRIES = "EXHAUSTED_RETRIES";
     private final ScheduledExecutorService mainExecutor;
-    private final ExecutorService requestsExecutor;
     private final BeatProducer producer;
     private final LHClient lhClient;
+    private final Duration frequency;
     private final LocalRepository repository;
     private final long retries;
@@ -32,21 +34,30 @@ public MetronomeGetWfRunExecutor(
             final BeatProducer producer,
             final LHClient lhClient,
             final Duration frequency,
-            final int threads,
             final int retries,
             final LocalRepository repository) {
         this.producer = producer;
         this.lhClient = lhClient;
+        this.frequency = frequency;
         this.repository = repository;
         this.retries = retries;
 
         mainExecutor = Executors.newSingleThreadScheduledExecutor();
         ShutdownHook.add("Metronome: GetWfRun Main Executor Thread", () -> closeExecutor(mainExecutor));
-        mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency.toMillis(), TimeUnit.MILLISECONDS);
-
-        requestsExecutor = Executors.newFixedThreadPool(threads);
-        ShutdownHook.add("Metronome: GetWfRun Request Executor Thread", () -> closeExecutor(requestsExecutor));
+    }
 
+    public void start() {
+        mainExecutor.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        scheduledRun();
+                    } catch (Exception e) {
+                        log.error("Error when executing workflow run", e);
+                    }
+                },
+                0,
+                frequency.toMillis(),
+                TimeUnit.MILLISECONDS);
         log.info("GetWfRun Metronome Started");
     }
@@ -56,58 +67,105 @@ private void closeExecutor(final ExecutorService executor) throws InterruptedExc
     }
 
     private void scheduledRun() {
-        log.trace("Executing GetWfRun metronome");
+        log.debug("Executing GetWfRun metronome");
         final Instant searchCriteria = Instant.now().minus(Duration.ofMinutes(1));
         final Map attempts = repository.getAttemptsBefore(searchCriteria);
-        attempts.forEach(this::executeRun);
+        attempts.forEach((id, attempt) -> {
+            try {
+                executeRun(id, attempt);
+            } catch (Exception e) {
+                sendError(id, e);
+            }
+        });
     }
 
     private void executeRun(final String id, final Attempt attempt) {
+        log.debug("GetWfRun {}", id);
+
+        // exit if it gets exhausted
         if (attempt.getAttempt() >= retries) {
-            repository.delete(id);
-            producer.sendFuture(id, BeatType.GET_WF_RUN_EXHAUSTED_RETRIES);
+            sendExhaustedRetries(id);
             return;
         }
 
+        // update attempt number
         updateAttempt(id, attempt);
 
+        // get status
         final Instant start = Instant.now();
-        final WfRun currentStatus = getCurrentStatus(id, start);
+        final LHStatus status = lhClient.getCanaryWfRun(id).getStatus();
+        final Duration latency = Duration.between(start, Instant.now());
+
+        // send beat and exit
+        sendBeat(id, status, latency);
 
-        if (currentStatus == null) {
+        if (status.equals(LHStatus.COMPLETED)) {
+            repository.delete(id);
             return;
         }
 
-        producer.sendFuture(
-                id,
-                BeatType.GET_WF_RUN_REQUEST,
-                currentStatus.getStatus().name(),
-                Duration.between(start, Instant.now()));
+        // log in case of error
+        log.error("GetWfRun returns workflow error {} {}", id, status);
     }
 
-        log.debug("GetWfRun {} {}", id, currentStatus.getStatus());
-        if (currentStatus.getStatus().equals(LHStatus.COMPLETED)) {
-            repository.delete(id);
-        }
+    private void sendBeat(final String id, final LHStatus status, final Duration latency) {
+        final BeatStatus beatStatus = BeatStatus.builder(
+                        status.equals(LHStatus.COMPLETED) ? BeatStatus.Code.OK : BeatStatus.Code.ERROR)
+                .source(BeatStatus.Source.WORKFLOW)
+                .reason(status.name())
+                .build();
+
+        final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
+                .id(id)
+                .latency(latency)
+                .status(beatStatus)
+                .build();
+
+        producer.send(beat);
+    }
+
+    private void sendExhaustedRetries(final String id) {
+        repository.delete(id);
+
+        final BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR)
+                .reason(EXHAUSTED_RETRIES)
+                .build();
+
+        final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
+                .id(id)
+                .status(beatStatus)
+                .build();
+
+        producer.send(beat);
     }
 
     private void updateAttempt(final String id, final Attempt attempt) {
-        repository.save(
-                id,
-                Attempt.newBuilder()
-                        .setStart(attempt.getStart())
-                        .setLastAttempt(Timestamps.now())
-                        .setAttempt(attempt.getAttempt() + 1)
-                        .build());
+        final Attempt newAttempt = Attempt.newBuilder()
+                .setStart(attempt.getStart())
+                .setLastAttempt(Timestamps.now())
+                .setAttempt(attempt.getAttempt() + 1)
+                .build();
+        log.debug("GetWfRun {} Retry {}", id, newAttempt.getAttempt());
+        repository.save(id, newAttempt);
     }
 
-    private WfRun getCurrentStatus(final String id, final Instant start) {
-        try {
-            return lhClient.getCanaryWfRun(id);
-        } catch (Exception e) {
-            log.error("Error executing getWfRun {}", e.getMessage(), e);
-            producer.sendFuture(
-                    id, BeatType.GET_WF_RUN_REQUEST, BeatStatus.ERROR.name(), Duration.between(start, Instant.now()));
-            return null;
+    private void sendError(final String id, final Exception e) {
+        log.error("Error executing getWfRun {}", e.getMessage(), e);
+
+        final BeatStatus.BeatStatusBuilder statusBuilder =
+                BeatStatus.builder(BeatStatus.Code.ERROR).reason(e.getClass().getSimpleName());
+
+        if (e instanceof StatusRuntimeException statusException) {
+            statusBuilder
+                    .source(BeatStatus.Source.GRPC)
+                    .reason(statusException.getStatus().getCode().name());
        }
+
+        final Beat beat = Beat.builder(BeatType.GET_WF_RUN_REQUEST)
+                .id(id)
+                .status(statusBuilder.build())
+                .build();
+
+        producer.send(beat);
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeRunWfExecutor.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeRunWfExecutor.java
index 91c39034a..e7563fa04 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeRunWfExecutor.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeRunWfExecutor.java
@@ -3,12 +3,15 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.littlehorse.canary.infra.ShutdownHook;
+import io.littlehorse.canary.littlehorse.LHClient;
 import io.littlehorse.canary.metronome.internal.BeatProducer;
 import io.littlehorse.canary.metronome.internal.LocalRepository;
-import io.littlehorse.canary.proto.BeatStatus;
+import io.littlehorse.canary.metronome.model.Beat;
+import io.littlehorse.canary.metronome.model.BeatStatus;
 import io.littlehorse.canary.proto.BeatType;
-import io.littlehorse.canary.util.LHClient;
-import io.littlehorse.canary.util.ShutdownHook;
 import io.littlehorse.sdk.common.proto.WfRun;
 import java.time.Duration;
 import java.time.Instant;
@@ -31,11 +34,10 @@ public class MetronomeRunWfExecutor {
     private final ScheduledExecutorService mainExecutor;
     private final ExecutorService requestsExecutor;
     private final LHClient lhClient;
+    private final Duration frequency;
     private final int runs;
     private final LocalRepository repository;
     private final int sampleRate;
-    private final boolean sampleDataEnabled;
-    private final int sampleSize;
 
     public MetronomeRunWfExecutor(
             final BeatProducer producer,
@@ -47,19 +49,28 @@ public MetronomeRunWfExecutor(
             final LocalRepository repository) {
         this.producer = producer;
         this.lhClient = lhClient;
+        this.frequency = frequency;
         this.runs = runs;
         this.repository = repository;
         this.sampleRate = sampleRate;
-        this.sampleDataEnabled = sampleRate > 0;
-        this.sampleSize = (int) (runs * (sampleRate / 100.0));
 
         mainExecutor = Executors.newSingleThreadScheduledExecutor();
         ShutdownHook.add("Metronome: RunWf Main Executor Thread", () -> closeExecutor(mainExecutor));
-        mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency.toMillis(), TimeUnit.MILLISECONDS);
 
         requestsExecutor = Executors.newFixedThreadPool(threads);
         ShutdownHook.add("Metronome: RunWf Request Executor Thread", () -> closeExecutor(requestsExecutor));
+    }
 
+    public int getSampleSize() {
+        return (int) (runs * (sampleRate / 100.0));
+    }
+
+    public boolean isSamplingEnabled() {
+        return sampleRate > 0;
+    }
+
+    public void start() {
+        mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency.toMillis(), TimeUnit.MILLISECONDS);
         log.info("RunWf Metronome Started");
     }
@@ -78,12 +89,8 @@ private void executeRun(final boolean isSampleIteration) {
         Futures.addCallback(future, new MetronomeCallback(wfId, start, isSampleIteration), requestsExecutor);
     }
 
-    private void sendMetricBeat(final String wfId, final Instant start, final String status) {
-        producer.sendFuture(wfId, BeatType.WF_RUN_REQUEST, status, Duration.between(start, Instant.now()));
-    }
-
     private void scheduledRun() {
-        log.trace("Executing run wf metronome");
+        log.debug("Executing run wf metronome");
         final HashSet sample = createSampleRuns();
         for (int i = 0; i < runs; i++) {
             final boolean isSampleIteration = sample.contains(i);
@@ -92,13 +99,13 @@ private void scheduledRun() {
     }
 
     private HashSet createSampleRuns() {
-        if (!sampleDataEnabled) {
+        if (!isSamplingEnabled()) {
             return new HashSet<>();
         }
 
         final List range = new ArrayList<>(IntStream.range(0, runs).boxed().toList());
         Collections.shuffle(range);
-        final List sample = range.subList(0, sampleSize);
+        final List sample = range.subList(0, getSampleSize());
         return new HashSet<>(sample);
     }
@@ -115,18 +122,42 @@ private MetronomeCallback(final String wfRunId, final Instant startedAt, final b
         @Override
         public void onSuccess(final WfRun result) {
-            lhClient.incrementWfRunCountMetric();
-            if (isSampleIteration) {
-                sendMetricBeat(wfRunId, startedAt, BeatStatus.OK.name());
-                repository.save(wfRunId, 0);
-            }
+            if (!isSampleIteration) return;
+
+            final Beat beat = Beat.builder(BeatType.WF_RUN_REQUEST)
+                    .id(wfRunId)
+                    .latency(Duration.between(startedAt, Instant.now()))
+                    .status(BeatStatus.builder(BeatStatus.Code.OK).build())
+                    .build();
+            producer.send(beat);
+            repository.save(wfRunId, 0);
         }
 
         @Override
         public void onFailure(final Throwable t) {
-            lhClient.incrementWfRunCountMetric();
             log.error("Error executing runWf {}", t.getMessage(), t);
-            sendMetricBeat(wfRunId, startedAt, BeatStatus.ERROR.name());
+
+            final BeatStatus.BeatStatusBuilder statusBuilder = BeatStatus.builder(BeatStatus.Code.ERROR)
+                    .reason(t.getClass().getSimpleName());
+
+            if (t instanceof StatusRuntimeException statusException) {
+                statusBuilder
+                        .source(BeatStatus.Source.GRPC)
+                        .reason(statusException.getStatus().getCode().name());
+            }
+
+            if (t instanceof StatusException statusException) {
+                statusBuilder
+                        .source(BeatStatus.Source.GRPC)
+                        .reason(statusException.getStatus().getCode().name());
+            }
+
+            final Beat beat = Beat.builder(BeatType.WF_RUN_REQUEST)
+                    .id(wfRunId)
+                    .status(statusBuilder.build())
+                    .build();
+
+            producer.send(beat);
         }
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
index f15047a99..488f2638a 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
@@ -1,39 +1,57 @@
 package io.littlehorse.canary.metronome;
 
+import io.littlehorse.canary.infra.ShutdownHook;
 import io.littlehorse.canary.metronome.internal.BeatProducer;
+import io.littlehorse.canary.metronome.model.Beat;
 import io.littlehorse.canary.proto.BeatType;
-import io.littlehorse.canary.util.ShutdownHook;
 import io.littlehorse.sdk.common.config.LHConfig;
 import io.littlehorse.sdk.worker.LHTaskMethod;
 import io.littlehorse.sdk.worker.LHTaskWorker;
 import io.littlehorse.sdk.worker.WorkerContext;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class MetronomeWorker {
 
     private final BeatProducer producer;
+    private final LHTaskWorker worker;
 
     public MetronomeWorker(final BeatProducer producer, final LHConfig lhConfig) {
         this.producer = producer;
-
-        final LHTaskWorker worker = new LHTaskWorker(this, MetronomeWorkflow.TASK_NAME, lhConfig);
+        this.worker = new LHTaskWorker(this, MetronomeWorkflow.TASK_NAME, lhConfig);
         ShutdownHook.add("Metronome: LH Task Worker", worker);
+    }
+
+    public void start() {
         worker.registerTaskDef();
         worker.start();
-
         log.info("Worker Started");
     }
 
     @LHTaskMethod(MetronomeWorkflow.TASK_NAME)
-    public void executeTask(final long startTime, final boolean sampleIteration, final WorkerContext context) {
-        final String id = "%s/%s".formatted(context.getIdempotencyKey(), context.getAttemptNumber());
-        log.debug("Executing task {} {}", MetronomeWorkflow.TASK_NAME, id);
+    public void executeTask(final long startTime, final boolean sampleIteration, final WorkerContext context)
+            throws ExecutionException, InterruptedException {
+        log.debug(
+                "Executing task {} {}/{}",
+                MetronomeWorkflow.TASK_NAME,
+                context.getIdempotencyKey(),
+                context.getAttemptNumber());
+
         if (sampleIteration) {
-            producer.send(
-                    id, BeatType.TASK_RUN_EXECUTION, Duration.between(Instant.ofEpochMilli(startTime), Instant.now()));
+            final String id = "%s/%s".formatted(context.getIdempotencyKey(), context.getAttemptNumber());
+            final Duration latency = Duration.between(Instant.ofEpochMilli(startTime), Instant.now());
+            sendBeat(id, latency);
         }
     }
+
+    private void sendBeat(final String id, final Duration latency) throws InterruptedException, ExecutionException {
+        final Beat beat = Beat.builder(BeatType.TASK_RUN_EXECUTION)
+                .id(id)
+                .latency(latency)
+                .build();
+        producer.send(beat).get();
+    }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java
index 68af5e903..c31703e02 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java
@@ -1,9 +1,10 @@
 package io.littlehorse.canary.metronome;
 
-import io.littlehorse.canary.util.LHClient;
+import io.littlehorse.canary.littlehorse.LHClient;
 import io.littlehorse.sdk.common.proto.VariableType;
 import io.littlehorse.sdk.common.proto.WorkflowRetentionPolicy;
 import io.littlehorse.sdk.wfsdk.Workflow;
+import java.time.Duration;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -12,22 +13,24 @@ public class MetronomeWorkflow {
     public static final String TASK_NAME = "canary-worker-task";
     public static final String START_TIME_VARIABLE = "start-time";
     public static final String SAMPLE_ITERATION_VARIABLE = "sample-iteration";
+    private final Workflow workflow;
+    private final LHClient lhClient;
 
-    public MetronomeWorkflow(final LHClient lhClient, final String workflowName) {
-        final Workflow workflow = Workflow.newWorkflow(
-                workflowName,
-                thread -> thread.execute(
-                        TASK_NAME,
-                        thread.addVariable(START_TIME_VARIABLE, VariableType.INT),
-                        thread.addVariable(SAMPLE_ITERATION_VARIABLE, VariableType.BOOL)));
-
-        // TODO: This should be a configuration later.
-        workflow.withRetentionPolicy(WorkflowRetentionPolicy.newBuilder()
-                .setSecondsAfterWfTermination(60 * 60 * 24)
-                .build());
+    public MetronomeWorkflow(final LHClient lhClient, final String workflowName, final Duration workflowRetention) {
+        this.lhClient = lhClient;
+        this.workflow = Workflow.newWorkflow(
+                        workflowName,
+                        thread -> thread.execute(
+                                TASK_NAME,
+                                thread.addVariable(START_TIME_VARIABLE, VariableType.INT),
+                                thread.addVariable(SAMPLE_ITERATION_VARIABLE, VariableType.BOOL)))
+                .withRetentionPolicy(WorkflowRetentionPolicy.newBuilder()
+                        .setSecondsAfterWfTermination(workflowRetention.getSeconds())
+                        .build());
+    }
 
+    public void register() {
         lhClient.registerWorkflow(workflow);
-
-        log.info("Workflow {} Registered", workflowName);
+        log.info("Workflow {} Registered", workflow.getName());
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java
index 96671335c..c0702aea1 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java
@@ -1,16 +1,12 @@
 package io.littlehorse.canary.metronome.internal;
 
-import com.google.protobuf.util.Timestamps;
-import io.littlehorse.canary.CanaryException;
+import io.littlehorse.canary.infra.ShutdownHook;
+import io.littlehorse.canary.metronome.model.Beat;
 import io.littlehorse.canary.proto.BeatKey;
-import io.littlehorse.canary.proto.BeatType;
 import io.littlehorse.canary.proto.BeatValue;
 import io.littlehorse.canary.proto.Tag;
-import io.littlehorse.canary.util.ShutdownHook;
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.*;
@@ -43,68 +39,32 @@ public BeatProducer(
         ShutdownHook.add("Beat Producer", producer);
     }
 
-    public Future sendFuture(final String id, final BeatType type) {
-        return sendFuture(id, type, null, null);
-    }
-
-    public Future sendFuture(
-            final String id, final BeatType type, final String status, final Duration latency) {
-
-        final BeatKey beatKey = buildKey(id, type, status);
-        final BeatValue beatValue = buildValue(latency);
+    public Future send(final Beat beat) {
+        final BeatKey beatKey = beat.toBeatKey().toBuilder()
+                .setServerHost(lhServerHost)
+                .setServerPort(lhServerPort)
+                .setServerVersion(lhServerVersion)
+                .addAllTags(getExtraTags())
+                .build();
+        final BeatValue beatValue = beat.toBeatValue();
+        final ProducerRecord record =
+                new ProducerRecord<>(topicName, Bytes.wrap(beatKey.toByteArray()), Bytes.wrap(beatValue.toByteArray()));
 
-        return producer.send(buildRecord(beatKey, beatValue), (metadata, exception) -> {
+        return producer.send(record, (metadata, exception) -> {
             if (exception == null) {
-                log.trace("Producing message {}", beatKey.getType());
+                log.debug("Producing message {}", beatKey.getType());
             } else {
                 log.error("Producing message {}", beatKey.getType(), exception);
             }
         });
     }
 
-    private ProducerRecord buildRecord(final BeatKey beatKey, final BeatValue beatValue) {
-        return new ProducerRecord<>(topicName, Bytes.wrap(beatKey.toByteArray()), Bytes.wrap(beatValue.toByteArray()));
-    }
-
-    public RecordMetadata send(final String id, final BeatType type, final Duration latency) {
-        try {
-            return sendFuture(id, type, null, latency).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new CanaryException(e);
-        }
-    }
-
-    private BeatValue buildValue(final Duration latency) {
-        final BeatValue.Builder builder = BeatValue.newBuilder().setTime(Timestamps.now());
-
-        if (latency != null) {
-            builder.setLatency(latency.toMillis());
-        }
-
-        return builder.build();
-    }
-
-    private BeatKey buildKey(final String id, final BeatType type, final String status) {
-        final BeatKey.Builder builder = BeatKey.newBuilder()
-                .setServerHost(lhServerHost)
-                .setServerPort(lhServerPort)
-                .setServerVersion(lhServerVersion)
-                .setId(id)
-                .setType(type);
-
-        if (status != null) {
-            builder.setStatus(status);
-        }
-
-        final List tags = extraTags.entrySet().stream()
+    private List getExtraTags() {
+        return extraTags.entrySet().stream()
                 .map(entry -> Tag.newBuilder()
                         .setKey(entry.getKey())
                         .setValue(entry.getValue())
                         .build())
                 .toList();
-
-        builder.addAllTags(tags);
-
-        return builder.build();
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/internal/LocalRepository.java b/canary/src/main/java/io/littlehorse/canary/metronome/internal/LocalRepository.java
index 889c80df9..a11fee825 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/internal/LocalRepository.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/internal/LocalRepository.java
@@ -4,8 +4,8 @@
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
 import io.littlehorse.canary.CanaryException;
+import io.littlehorse.canary.infra.ShutdownHook;
 import io.littlehorse.canary.proto.Attempt;
-import io.littlehorse.canary.util.ShutdownHook;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/model/Beat.java b/canary/src/main/java/io/littlehorse/canary/metronome/model/Beat.java
new file mode 100644
index 000000000..465cc6a3c
--- /dev/null
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/model/Beat.java
@@ -0,0 +1,55 @@
+package io.littlehorse.canary.metronome.model;
+
+import com.google.protobuf.util.Timestamps;
+import io.littlehorse.canary.proto.BeatKey;
+import io.littlehorse.canary.proto.BeatType;
+import io.littlehorse.canary.proto.BeatValue;
+import java.time.Duration;
+import java.time.Instant;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@Builder
+public class Beat {
+    @NonNull
+    private BeatType type;
+
+    private String id;
+    private BeatStatus status;
+    private Duration latency;
+    private Instant time;
+
+    public static BeatBuilder builder(final BeatType type) {
+        return new BeatBuilder().type(type);
+    }
+
+    public BeatValue toBeatValue() {
+        final BeatValue.Builder builder = BeatValue.newBuilder().setTime(Timestamps.now());
+
+        if (getLatency() != null) {
+            builder.setLatency(getLatency().toMillis());
+        }
+
+        if (getTime() != null) {
+            builder.setTime(Timestamps.fromMillis(getTime().toEpochMilli()));
+        }
+
+        return builder.build();
+    }
+
+    public BeatKey toBeatKey() {
+        final BeatKey.Builder builder = BeatKey.newBuilder().setType(type);
+
+        if (id != null) {
+            builder.setId(id);
+        }
+
+        if (status != null) {
+            builder.addAllTags(status.toTags());
+        }
+
+        return builder.build();
+    }
+}
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/model/BeatStatus.java b/canary/src/main/java/io/littlehorse/canary/metronome/model/BeatStatus.java
new file mode 100644
index 000000000..bf4e9ef5a
--- /dev/null
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/model/BeatStatus.java
@@ -0,0 +1,56 @@
+package io.littlehorse.canary.metronome.model;
+
+import io.littlehorse.canary.proto.Tag;
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@Builder
+public class BeatStatus {
+
+    private static final String STATUS = "status";
+    private static final String REASON = "reason";
+    private static final String DEFAULT_REASON = "";
+
+    @NonNull
+    private Code code;
+
+    private Source source;
+    private String reason;
+
+    public static BeatStatusBuilder builder(final Code code) {
+        return new BeatStatusBuilder().code(code).source(Source.CANARY);
+    }
+
+    public List toTags() {
+        final Tag statusTag = Tag.newBuilder()
+                .setKey(STATUS)
+                .setValue(code.name().toLowerCase())
+                .build();
+        final Tag reasonTag =
+                Tag.newBuilder().setKey(REASON).setValue(getReason()).build();
+
+        return List.of(statusTag, reasonTag);
+    }
+
+    private String getReason() {
+        return reason == null
+                ? DEFAULT_REASON
+                : "%s_%s"
+                        .formatted(source.name(), reason.replaceAll("([a-z])([A-Z]+)", "$1_$2"))
+                        .toLowerCase();
+    }
+
+    public enum Code {
+        OK,
+        ERROR
+    }
+
+    public enum Source {
+        CANARY,
+        GRPC,
+        WORKFLOW
+    }
+}
diff --git a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java
index 4e8e4707e..36e063d88 100644
--- a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java
+++ b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java
@@ -1,13 +1,18 @@
 package io.littlehorse.canary.prometheus;
 
-import io.littlehorse.canary.util.ShutdownHook;
+import io.javalin.http.ContentType;
+import io.javalin.http.Context;
+import io.javalin.http.Handler;
+import io.littlehorse.canary.infra.ShutdownHook;
 import io.micrometer.core.instrument.Tag;
 import io.micrometer.core.instrument.binder.MeterBinder;
 import io.micrometer.prometheusmetrics.PrometheusConfig;
 import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 
-public class PrometheusExporter {
+@Slf4j
+public class PrometheusExporter implements Handler {
 
     private final PrometheusMeterRegistry prometheusRegistry;
@@ -26,7 +31,9 @@ public void addMeasurable(final MeterBinder measurable) {
         measurable.bindTo(prometheusRegistry);
     }
 
-    public String scrape() {
-        return prometheusRegistry.scrape();
+    @Override
+    public void handle(final Context ctx) {
+        log.debug("Processing metrics request");
+        ctx.contentType(ContentType.PLAIN).result(prometheusRegistry.scrape());
     }
 }
diff --git a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java
deleted file mode 100644
index 740e8e613..000000000
--- a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package io.littlehorse.canary.prometheus;
-
-import io.javalin.Javalin;
-import io.javalin.http.ContentType;
-import io.javalin.http.Context;
-import io.littlehorse.canary.util.ShutdownHook;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class PrometheusServerExporter {
-
-    private final PrometheusExporter prometheusExporter;
-
-    public PrometheusServerExporter(
-            final int webPort, final String webPath, final PrometheusExporter prometheusExporter) {
-        this.prometheusExporter = prometheusExporter;
-        final Javalin server = Javalin.create().get(webPath, this::printMetrics).start(webPort);
-        ShutdownHook.add("Prometheus Exporter: Web Server", server::stop);
-
-        log.info("Metrics Server Exporter Started");
-    }
-
-    private void printMetrics(final Context context) {
-        log.trace("Processing metrics request");
-        context.contentType(ContentType.PLAIN).result(prometheusExporter.scrape());
-    }
-}
diff --git a/canary/src/main/proto/beats.proto b/canary/src/main/proto/beats.proto
index b0a2e7261..e3ef34a49 100644
--- a/canary/src/main/proto/beats.proto
+++ b/canary/src/main/proto/beats.proto
@@ -11,12 +11,6 @@ enum BeatType {
   WF_RUN_REQUEST = 0;
   GET_WF_RUN_REQUEST = 1;
   TASK_RUN_EXECUTION = 2;
-  GET_WF_RUN_EXHAUSTED_RETRIES = 3;
-}
-
-enum BeatStatus {
-  OK = 0;
-  ERROR = 1;
 }
 
 message BeatKey {
@@ -24,9 +18,8 @@ message BeatKey {
   string server_host = 1;
   int32 server_port = 2;
   string server_version = 3;
   BeatType type = 4;
-  optional string status = 5;
-  optional string id = 6;
-  repeated Tag tags = 7;
+  optional string id = 5;
+  repeated Tag tags = 6;
 }
 
 message BeatValue {
diff --git a/canary/src/main/proto/metrics.proto b/canary/src/main/proto/metrics.proto
index 0d876e540..2fb4e88e5 100644
--- a/canary/src/main/proto/metrics.proto
+++ b/canary/src/main/proto/metrics.proto
@@ -8,7 +8,7 @@ message MetricKey {
   string server_host = 1;
   int32 server_port = 2;
   string server_version = 3;
-  string id = 4;
+  string name = 4;
   repeated Tag tags = 5;
 }
diff --git a/canary/src/main/resources/META-INF/microprofile-config.properties b/canary/src/main/resources/META-INF/microprofile-config.properties
index e714af8b3..6b0a01c8f 100644
--- a/canary/src/main/resources/META-INF/microprofile-config.properties
+++ b/canary/src/main/resources/META-INF/microprofile-config.properties
@@ -13,6 +13,7 @@ lh.canary.workflow.creation.enable=true
 lh.canary.workflow.name=canary-workflow
 lh.canary.workflow.version=0
 lh.canary.workflow.revision=0
+lh.canary.workflow.retention.ms=86400000
 
 # Metrics settings
 lh.canary.metrics.port=4023
@@ -30,6 +31,7 @@ lh.canary.metronome.worker.enable=true
 lh.canary.metronome.run.frequency.ms=1000
 lh.canary.metronome.run.threads=1
 lh.canary.metronome.run.requests=1
+lh.canary.metronome.run.sample.percentage=100
 lh.canary.metronome.get.frequency.ms=30000
 lh.canary.metronome.get.threads=10
 lh.canary.metronome.get.retries=10
diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/BeatTimeExtractorTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/BeatTimeExtractorTest.java
index 86e440e5b..7927462e7 100644
--- a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/BeatTimeExtractorTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/BeatTimeExtractorTest.java
@@ -17,7 +17,7 @@ private static ConsumerRecord newRecord(Object value) {
     }
 
     @Test
-    void returnDefaultTimeIfMetadataIsNotPresent() {
+    void shouldReturnDefaultTimeIfMetadataIsNotPresent() {
         BeatTimeExtractor extractor = new BeatTimeExtractor();
         long expectedTime = faker.number().randomNumber();
@@ -28,7 +28,7 @@ void returnDefaultTimeIfMetadataIsNotPresent() {
     }
 
     @Test
-    void returnTheRightTimestamp() {
+    void shouldExtractTimestampFromValue() {
         BeatTimeExtractor extractor = new BeatTimeExtractor();
         long expectedTime = faker.number().randomNumber();
@@ -43,7 +43,7 @@ void returnTheRightTimestamp() {
     }
 
     @Test
-    void returnDefaultTimeIfWrongClassWassPassed() {
+    void shouldReturnDefaultTimeIfWrongClassWasPassed() {
         BeatTimeExtractor extractor = new BeatTimeExtractor();
         long expectedTime = faker.number().randomNumber();
diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java
index 9e7214c0d..4976cc267 100644
--- a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java
@@ -128,7 +128,7 @@ private static MetricKey createMetricsKey(String host, List tags) {
                 .setServerHost(host)
                 .setServerPort(2023)
                 .setServerVersion("test")
-                .setId("my_metric")
+                .setName("my_metric")
                 .addAllTags(tags)
                 .build();
     }
diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerializerTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerializerTest.java
index c796d6d61..3c1541805 100644
--- a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerializerTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerializerTest.java
@@ -13,13 +13,13 @@ class ProtobufSerializerTest {
     Faker faker = new Faker();
 
     @Test
-    void returnNullInCaseOfNull() {
+    void shouldReturnNullInCaseOfNull() {
         ProtobufSerializer serializer = new ProtobufSerializer();
         assertNull(serializer.serialize(null, null));
     }
 
     @Test
-    void beatSerialization() {
+    void shouldSerializeProtobufMessage() {
         ProtobufSerializer serializer = new ProtobufSerializer();
         BeatValue metric = BeatValue.newBuilder().setTime(Timestamps.now()).build();
diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java
index b819aaa68..c6af09f3a 100644
--- a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java
@@ -69,7 +69,7 @@ private static MetricKey newMetricKey(String id, String status, Map tags) {
         MetricKey.Builder builder =
-                MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id);
+                MetricKey.newBuilder().setServerHost(host).setServerPort(port).setName(id);
 
         if (status != null) {
             builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build());
@@ -109,6 +109,7 @@ private static TestRecord newBeat(
             Long latency,
             String beatStatus,
             Map tags) {
+
         BeatKey.Builder keyBuilder = BeatKey.newBuilder()
                 .setServerHost(host)
                 .setServerPort(port)
@@ -117,7 +118,8 @@ private static TestRecord newBeat(
         BeatValue.Builder valueBuilder = BeatValue.newBuilder().setTime(Timestamps.now());
 
         if (beatStatus != null) {
-            keyBuilder.setStatus(beatStatus);
+            keyBuilder.addTags(
+                    Tag.newBuilder().setKey("status").setValue(beatStatus).build());
         }
 
         if (tags != null) {
@@ -164,7 +166,7 @@ void afterEach() {
     }
 
     @Test
-    void calculateCountAndLatencyForWfRunRequest() {
+    void shouldCalculateCountAndLatencyForWfRunRequest() {
         BeatType expectedType = BeatType.WF_RUN_REQUEST;
         String expectedTypeName = expectedType.name().toLowerCase();
@@ -177,7 +179,7 @@ void calculateCountAndLatencyForWfRunRequest() {
     }
 
     @Test
-    void includeBeatTagsIntoMetrics() {
+    void shouldIncludeBeatTagsIntoMetrics() {
         BeatType expectedType = BeatType.WF_RUN_REQUEST;
         String expectedTypeName = expectedType.name().toLowerCase();
         Map expectedTags = Map.of("my_tag", "value");
@@ -193,19 +195,22 @@ void includeBeatTagsIntoMetrics() {
     }
 
     @Test
-    void calculateCountForExhaustedRetries() {
-        BeatType expectedType = BeatType.GET_WF_RUN_EXHAUSTED_RETRIES;
+    void shouldCalculateCountForExhaustedRetries() {
+        BeatType expectedType = BeatType.GET_WF_RUN_REQUEST;
         String expectedTypeName = expectedType.name().toLowerCase();
+        Map reason = Map.of("reason", "canary_exhausted_retries");
 
-        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
-        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
-        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
+        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));
+        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));
+        inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));
 
-        assertThat(store.get(newMetricKey("canary_" + expectedTypeName))).isEqualTo(newMetricValue(3.));
+        assertThat(getCount()).isEqualTo(1);
+        assertThat(store.get(newMetricKey("canary_" + expectedTypeName, "error", reason)))
+                .isEqualTo(newMetricValue(3.));
     }
 
     @Test
-    void calculateCountAndLatencyForWfRunRequestForTwoStatus() {
+    void shouldCalculateCountAndLatencyForWfRunRequestForTwoStatus() {
         BeatType expectedType = BeatType.WF_RUN_REQUEST;
         String expectedTypeName = expectedType.name().toLowerCase();
@@ -226,7 +231,7 @@ void calculateCountAndLatencyForWfRunRequestForTwoStatus() {
     }
 
     @Test
-    void calculateCountAndLatencyForGetWfRunRequest() {
+    void shouldCalculateCountAndLatencyForGetWfRunRequest() {
         BeatType expectedType = BeatType.GET_WF_RUN_REQUEST;
         String expectedTypeName = expectedType.name().toLowerCase();
@@ -240,7 +245,7 @@ void calculateCountAndLatencyForGetWfRunRequest() {
     }
 
     @Test
-    void calculateCountAndLatencyForTaskRunWithNoDuplicated() {
+    void shouldCalculateCountAndLatencyForTaskRunWithNoDuplicated() {
         BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
         String expectedTypeName = expectedType.name().toLowerCase();
@@ -253,7 +258,7 @@ void calculateCountAndLatencyForTaskRunWithNoDuplicated() {
     }
 
     @Test
-    void calculateCountAndLatencyForTaskRunWithDuplicated() {
+    void shouldCalculateCountAndLatencyForTaskRunWithDuplicated() {
         BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
         String expectedTypeName = expectedType.name().toLowerCase();
         String expectedUniqueId = getRandomId();
@@ -268,7 +273,7 @@ void calculateCountAndLatencyForTaskRunWithDuplicated() {
     }
 
     @Test
-    void calculateCountAndLatencyForTaskRunWithTwoDuplicated() {
+    void shouldCalculateCountAndLatencyForTaskRunWithTwoDuplicated() {
         BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
         String expectedTypeName = expectedType.name().toLowerCase();
         String expectedUniqueId1 = getRandomId();
@@ -288,7 +293,7 @@ void calculateCountAndLatencyForTaskRunWithTwoDuplicated() {
     }
 
     @Test
-    void calculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
+    void shouldCalculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
         BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
         String expectedTypeName = expectedType.name().toLowerCase();
         String expectedUniqueId = getRandomId();
diff --git a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java
index 75d20f053..ed70a5003 100644
--- a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java
@@ -14,7 +14,7 @@ class CanaryConfigTest {
     public static final String EXPECTED_VALUE = "test";
 
     @Test
-    void toMapMustCreateCopy() {
+    void shouldCreateCopyOfInputMap() {
         Map input = Map.of(KEY, EXPECTED_VALUE);
 
         CanaryConfig canaryConfig = new CanaryConfig(input);
@@ -24,7 +24,7 @@ void toMapMustCreateCopy() {
     }
 
     @Test
-    void filterMap() {
+    void shouldFilterInvalidConfigurations() {
         Map input = Map.of(KEY, EXPECTED_VALUE, "not.a.valid.key", "to be filtered");
 
         CanaryConfig canaryConfig = new CanaryConfig(input);
@@ -34,7 +34,7 @@ void filterMap() {
     }
 
     @Test
-    void getCommonTags() {
+    void shouldParseCommonTags() {
         Map input = Map.of(
                 "lh.canary.metrics.common.tags.application_id", "my_id",
                 "lh.canary.metrics.common.tags.extra", "extra_tag");
@@ -47,7 +47,7 @@ void getCommonTags() {
     }
 
     @Test
-    void getMetronomeExtraTags() {
+    void shouldParseMetronomeExtraTags() {
         Map input = Map.of("lh.canary.metronome.beat.extra.tags.my_tag", "extra_tag");
 
         CanaryConfig canaryConfig = new CanaryConfig(input);
@@ -58,7 +58,7 @@ void getMetronomeExtraTags() {
     }
 
     @Test
-    void getEmptyMetronomeExtraTags() {
+    void shouldInitializeMetronomeExtraTags() {
         CanaryConfig canaryConfig = new CanaryConfig(Map.of());
 
         Map output = canaryConfig.getMetronomeBeatExtraTags();
@@ -68,7 +68,7 @@ void getEmptyMetronomeExtraTags() {
     }
 
     @Test
-    void throwsExceptionIfConfigurationIsNotFound() {
+    void shouldThrowExceptionIfConfigurationIsNotFound() {
         CanaryConfig canaryConfig = new CanaryConfig(Map.of());
 
         IllegalArgumentException result =
diff --git a/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java b/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java
index 99063a490..fc6b86163 100644
--- a/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java
@@ -23,7 +23,7 @@ class ConfigLoaderTest {
     @Test
     @ClearEnvironmentVariable(key = ENV_VARIABLE_NAME)
-    void loadFromFile() throws IOException {
+    void shouldLoadConfigurationsFromFile() throws IOException {
         Path configPath = createTemporaryProperties();
 
         CanaryConfig canaryConfig = ConfigLoader.load(configPath);
@@ -33,7 +33,7 @@ void loadFromFile() throws IOException {
     }
 
     @Test
     @SetEnvironmentVariable(key = ENV_VARIABLE_NAME, value = EXPECTED_ENV_VALUE)
-    void overwriteFileConfigWithEnvConfigs() throws IOException {
+    void shouldOverwriteFileConfigWithEnvConfigs() throws IOException {
         Path configPath = createTemporaryProperties();
 
         CanaryConfig canaryConfig = ConfigLoader.load(configPath);
@@ -43,7 +43,7 @@ void overwriteFileConfigWithEnvConfigs() throws IOException {
     }
 
     @Test
     @ClearEnvironmentVariable(key = ENV_VARIABLE_NAME)
-    void loadFromPropertiesObject() {
+    void shouldLoadFromPropertiesObject() {
         Properties properties = new Properties();
         properties.put(KEY, EXPECTED_PROPERTIES_VALUE);
@@ -54,7 +54,7 @@ void loadFromPropertiesObject() {
     }
 
     @Test
     @SetEnvironmentVariable(key = ENV_VARIABLE_NAME, value = EXPECTED_ENV_VALUE)
-    void overwritePropertiesWithEnvConfigs() {
+    void shouldOverwritePropertiesWithEnvConfigs() {
         Properties properties = new Properties();
         properties.put(EXPECTED_KEY, EXPECTED_PROPERTIES_VALUE);
diff --git a/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java
index 0bce2425c..689cfb4f5 100644
--- a/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java
+++ b/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java
@@ -4,28 +4,25 @@
 import static org.assertj.core.api.Assertions.entry;
 
 import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
-@Slf4j class KafkaProducerConfigTest { public static final String EXPECTED_KEY = "bootstrap.servers"; public static final String EXPECTED_VALUE = "localhost:9092"; @Test - void toMapMustCreateCopy() { + void shouldCreateCopyOfInputMap() { Map input = Map.of("kafka." + EXPECTED_KEY, EXPECTED_VALUE); KafkaConfig kafkaAdminConfig = new KafkaConfig(input); Map output = kafkaAdminConfig.toMap(); - log.info("Configs: {}", output); assertThat(output).isNotSameAs(input); } @Test - void filterMap() { + void shouldFilterInvalidConfigurations() { Map input = Map.of("kafka." + EXPECTED_KEY, EXPECTED_VALUE, "not.a.valid.key", "To be filtered"); KafkaConfig kafkaAdminConfig = new KafkaConfig(input); @@ -36,7 +33,7 @@ void filterMap() { } @Test - void mustKeepKafkaConfigs() { + void shouldKeepKafkaConfigs() { Map input = Map.of( "kafka.key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "kafka.value.serializer", "org.apache.kafka.common.serialization.BytesSerializer", @@ -53,7 +50,6 @@ void mustKeepKafkaConfigs() { KafkaConfig kafkaAdminConfig = new KafkaConfig(input); Map output = kafkaAdminConfig.toMap(); - log.info("Configs: {}", output); assertThat(output).isEqualTo(expected); } diff --git a/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java index f96571f36..cf7742129 100644 --- a/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java +++ b/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java @@ -16,7 +16,7 @@ class WorkerConfigs { public static final String EXPECTED_LHW_VALUE = "MY-ID-123"; @Test - void toMapMustCreateCopy() { + void shouldCreateCopyOfInputMap() { Map input = Map.of(INPUT_LHW_KEY, EXPECTED_LHW_VALUE); LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); @@ -26,7 +26,7 @@ void toMapMustCreateCopy() { } @Test - void filterMapForLHW() { + void shouldFilterInvalidConfigurations() { Map input = Map.of(INPUT_LHW_KEY, EXPECTED_LHW_VALUE, "not.a.valid.key", "To be filtered"); LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); @@ -43,7 +43,7 @@ class ClientConfigs { public static final String EXPECTED_LHC_VALUE = "localhost"; @Test - void toMapMustCreateCopy() { + void shouldCreateCopyOfInputMap() { Map input = Map.of(INPUT_LHC_KEY, EXPECTED_LHC_VALUE); LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); @@ -53,7 +53,7 @@ void toMapMustCreateCopy() { } @Test - void filterMapForLHC() { + void shouldFilterInvalidConfigurations() { Map input = Map.of(INPUT_LHC_KEY, EXPECTED_LHC_VALUE, "not.a.valid.key", "To be filtered"); LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); diff --git a/canary/src/test/java/io/littlehorse/canary/metronome/internal/LocalRepositoryTest.java b/canary/src/test/java/io/littlehorse/canary/metronome/internal/LocalRepositoryTest.java index 5226f45c1..9ae8d9163 100644 --- a/canary/src/test/java/io/littlehorse/canary/metronome/internal/LocalRepositoryTest.java +++ b/canary/src/test/java/io/littlehorse/canary/metronome/internal/LocalRepositoryTest.java @@ -21,19 +21,13 @@ void beforeEach() throws IOException { } @Test - void save() { + void shouldSaveAttemptRecord() { String id = UUID.randomUUID().toString(); - Attempt attempt = newAttempt(9); + Attempt attempt = + Attempt.newBuilder().setAttempt(9).setStart(Timestamps.now()).build(); repository.save(id, attempt); assertThat(repository.get(id)).isEqualTo(attempt); } - - private static 
Attempt newAttempt(int attempt) { - return Attempt.newBuilder() - .setAttempt(attempt) - .setStart(Timestamps.now()) - .build(); - } } diff --git a/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatStatusTest.java b/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatStatusTest.java new file mode 100644 index 000000000..0d516d091 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatStatusTest.java @@ -0,0 +1,50 @@ +package io.littlehorse.canary.metronome.model; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.grpc.Status; +import io.littlehorse.canary.proto.Tag; +import org.junit.jupiter.api.Test; + +class BeatStatusTest { + + @Test + void shouldInitializeStatus() { + BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.OK).build(); + assertThat(beatStatus.toTags()) + .containsExactly( + Tag.newBuilder().setKey("status").setValue("ok").build(), + Tag.newBuilder().setKey("reason").build()); + } + + @Test + void shouldFormatReasonAsSnakeCaseException() { + BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR) + .reason(NullPointerException.class.getSimpleName()) + .build(); + + assertThat(beatStatus.toTags()) + .containsExactly( + Tag.newBuilder().setKey("status").setValue("error").build(), + Tag.newBuilder() + .setKey("reason") + .setValue("canary_null_pointer_exception") + .build()); + } + + @Test + void shouldFormatReasonForGrpc() { + BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR) + .source(BeatStatus.Source.GRPC) + .reason(Status.Code.ALREADY_EXISTS.name()) + .build(); + + assertThat(beatStatus.toTags()) + .containsExactly( + Tag.newBuilder().setKey("status").setValue("error").build(), + Tag.newBuilder() + .setKey("reason") + .setValue("grpc_already_exists") + .build()); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatTest.java b/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatTest.java new file mode 100644 index 000000000..17ae6eac8 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/metronome/model/BeatTest.java @@ -0,0 +1,51 @@ +package io.littlehorse.canary.metronome.model; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.protobuf.util.Timestamps; +import io.grpc.Status; +import io.littlehorse.canary.proto.BeatKey; +import io.littlehorse.canary.proto.BeatType; +import io.littlehorse.canary.proto.Tag; +import java.time.Instant; +import org.junit.jupiter.api.Test; + +class BeatTest { + + @Test + void shouldSetDefaultTimestamp() { + Beat beat = Beat.builder(BeatType.WF_RUN_REQUEST).build(); + + assertThat(beat.toBeatValue().hasTime()).isTrue(); + } + + @Test + void shouldSetCustomTimestamp() { + Instant now = Instant.now(); + Beat beat = Beat.builder(BeatType.WF_RUN_REQUEST).time(now).build(); + + assertThat(Timestamps.toMillis(beat.toBeatValue().getTime())).isEqualTo(now.toEpochMilli()); + } + + @Test + void shouldSetStatusTags() { + BeatStatus beatStatus = BeatStatus.builder(BeatStatus.Code.ERROR) + .source(BeatStatus.Source.GRPC) + .reason(Status.Code.UNAVAILABLE.name()) + .build(); + Beat beat = Beat.builder(BeatType.WF_RUN_REQUEST).status(beatStatus).build(); + + assertThat(beat.toBeatKey()) + .isEqualTo(BeatKey.newBuilder() + .setType(BeatType.WF_RUN_REQUEST) + .addTags(Tag.newBuilder() + .setKey("status") + .setValue("error") + .build()) + .addTags(Tag.newBuilder() + .setKey("reason") + .setValue("grpc_unavailable") + .build()) + .build()); + } +} diff --git 
a/dashboard/src/app/(authenticated)/[tenantId]/(diagram)/components/NodeTypes/Task/TaskDetails.tsx b/dashboard/src/app/(authenticated)/[tenantId]/(diagram)/components/NodeTypes/Task/TaskDetails.tsx index e34de7924..5e4e15a26 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/(diagram)/components/NodeTypes/Task/TaskDetails.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/(diagram)/components/NodeTypes/Task/TaskDetails.tsx @@ -84,7 +84,7 @@ export const TaskDetails: FC<{ } -
+
diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/OverflowText.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/OverflowText.tsx index c25e67c0a..28192f498 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/OverflowText.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/OverflowText.tsx @@ -1,11 +1,11 @@ 'use client' -import { FC, useEffect, useRef, useState } from 'react' -import { cn } from '@/components/utils' +import { tryFormatAsJson } from '@/app/utils/tryFormatAsJson' import { Button } from '@/components/ui/button' -import { ChevronRight } from 'lucide-react' import { Dialog, DialogContent, DialogTrigger } from '@/components/ui/dialog' +import { cn } from '@/components/utils' +import { ChevronRight } from 'lucide-react' +import { FC, useEffect, useRef, useState } from 'react' import { CopyButton } from './CopyButton' -import { tryFormatAsJson } from '@/app/utils/tryFormatAsJson' type OverflowTextProps = { text: string @@ -37,7 +37,7 @@ export const OverflowText: FC = ({ text, className, variant } className )} > - {formattedText} + {formattedText}
View @@ -51,14 +51,14 @@ export const OverflowText: FC = ({ text, className, variant } 'bg-status-failed text-red-500': variant === 'error', })} > -
{formattedText}
+
{formattedText}
) } return ( -
+
{formattedText}
) diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/Principal.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/Principal.tsx index 434bd47c7..18b02a4ef 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/Principal.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/Principal.tsx @@ -1,20 +1,22 @@ -import { useWhoAmI } from '@/contexts/WhoAmIContext' import { DropdownMenu, - DropdownMenuSeparator, - DropdownMenuTrigger, DropdownMenuContent, DropdownMenuItem, DropdownMenuLabel, + DropdownMenuSeparator, + DropdownMenuTrigger, } from '@/components/ui/dropdown-menu' +import { useWhoAmI } from '@/contexts/WhoAmIContext' import { signOut } from 'next-auth/react' -import { FC, Fragment } from 'react' +import { FC } from 'react' function classNames(...classes: Array) { return classes.filter(Boolean).join(' ') } export const Principal: FC = () => { const { user } = useWhoAmI() + const isAuthEnabled = process.env.LHD_OAUTH_ENABLED === 'true' + return ( @@ -25,9 +27,11 @@ export const Principal: FC = () => { {user?.name} - signOut()} className="block w-full px-4 py-2 text-left text-sm"> - Sign out - + {isAuthEnabled && ( + signOut()} className="block w-full px-4 py-2 text-left text-sm"> + Sign out + + )} ) diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/Search.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/Search.tsx index 93440c3c1..188ad0fd7 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/Search.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/Search.tsx @@ -1,9 +1,9 @@ 'use client' import { SEARCH_DEFAULT_LIMIT, SEARCH_ENTITIES, SearchType } from '@/app/constants' -import useSWRInfinite from 'swr/infinite' import { RefreshCwIcon } from 'lucide-react' import { useParams, useSearchParams } from 'next/navigation' import { FC, useState } from 'react' +import useSWRInfinite from 'swr/infinite' import { SearchFooter } from './SearchFooter' import { SearchHeader } from './SearchHeader' import { SearchResponse, search } from './searchAction' diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/VersionTag.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/VersionTag.tsx new file mode 100644 index 000000000..db73143b3 --- /dev/null +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/VersionTag.tsx @@ -0,0 +1,9 @@ +import { TagIcon } from "lucide-react"; + +export default function VersionTag({ label }: { label: string }) { + return ( +
+ {label} +
+ ) +} \ No newline at end of file diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/ExternalEventDefTable.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/ExternalEventDefTable.tsx index f267f2c41..0b68db321 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/ExternalEventDefTable.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/ExternalEventDefTable.tsx @@ -1,10 +1,10 @@ import { ExternalEventDefId } from 'littlehorse-client/proto' -import LinkWithTenant from '../LinkWithTenant' import { FC, Fragment } from 'react' import { SearchResultProps } from '.' +import { SelectionLink } from '../SelectionLink' export const ExternalEventDefTable: FC = ({ pages = [] }) => { - if (pages.length === 0) { + if (pages.every(page => page.results.length === 0)) { return
No ExternalEventDefs
} @@ -13,11 +13,9 @@ export const ExternalEventDefTable: FC = ({ pages = [] }) => {pages.map((page, i) => ( {page.results.map(({ name }: ExternalEventDefId) => ( -
- - {name} - -
+ +

{name}

+
))}
))} diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/TaskDefTable.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/TaskDefTable.tsx index 69626ff19..57da5c5b6 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/TaskDefTable.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/TaskDefTable.tsx @@ -1,13 +1,13 @@ import { TaskDefId } from 'littlehorse-client/proto' +import { useParams } from 'next/navigation' import { FC, Fragment } from 'react' import { SearchResultProps } from '.' -import { useParams } from 'next/navigation' -import LinkWithTenant from '../LinkWithTenant' +import { SelectionLink } from '../SelectionLink' export const TaskDefTable: FC = ({ pages = [] }) => { const { tenantId } = useParams() - if (pages.length === 0) { + if (pages.every(page => page.results.length === 0)) { return
No TaskDefs
} @@ -16,11 +16,9 @@ export const TaskDefTable: FC = ({ pages = [] }) => { {pages.map((page, i) => ( {page.results.map(({ name }: TaskDefId) => ( -
- - {name} - -
+ +

{name}

+
))}
))} diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/UserTaskDefTable.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/UserTaskDefTable.tsx index 84c9b2eb7..7ba79ac8a 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/UserTaskDefTable.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/UserTaskDefTable.tsx @@ -1,12 +1,11 @@ -import { Separator } from '@/components/ui/separator' import { UserTaskDefId } from 'littlehorse-client/proto' -import { TagIcon } from 'lucide-react' import { FC, Fragment } from 'react' import { SearchResultProps } from '.' -import LinkWithTenant from '../LinkWithTenant' +import { SelectionLink } from '../SelectionLink' +import VersionTag from '../VersionTag' export const UserTaskDefTable: FC = ({ pages = [] }) => { - if (pages.length === 0) { + if (pages.every(page => page.results.length === 0)) { return
No UserTaskDefs
} @@ -15,16 +14,10 @@ export const UserTaskDefTable: FC = ({ pages = [] }) => { {pages.map((page, i) => ( {page.results.map(({ name, version }: UserTaskDefId) => ( - -
- - {name} - -
- v{version} -
-
-
+ +

{name}

+ +
))}
))} diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WfSpecTable.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WfSpecTable.tsx index f417546dc..db63d11ce 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WfSpecTable.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WfSpecTable.tsx @@ -1,12 +1,11 @@ import { getLatestWfSpecs } from '@/app/actions/getLatestWfSpec' -import { Separator } from '@/components/ui/separator' import { WfSpecData } from '@/types' -import { TagIcon } from 'lucide-react' import { useParams, useRouter } from 'next/navigation' -import { FC, Fragment, useEffect, useState } from 'react' +import { FC, useEffect, useState } from 'react' import { SearchResultProps } from '.' import { SelectionLink } from '../SelectionLink' +import VersionTag from '../VersionTag' export const WfSpecTable: FC = ({ pages = [] }) => { const router = useRouter() const tenantId = useParams().tenantId as string @@ -17,7 +16,7 @@ export const WfSpecTable: FC = ({ pages = [] }) => { getLatestWfSpecs(tenantId, wfSpecNames).then(setWfSpecs) }, [pages, tenantId]) - if (pages.length === 0) { + if (pages.every(page => page.results.length === 0)) { return
No WfSpecs
} @@ -25,16 +24,10 @@ export const WfSpecTable: FC = ({ pages = [] }) => {
{wfSpecs.map(wfSpec => ( - - -

{wfSpec.name}

-
- - Latest: v{wfSpec.latestVersion} -
-
- -
+ +

{wfSpec.name}

+ +
))}
diff --git a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WorkflowEventDefTable.tsx b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WorkflowEventDefTable.tsx index 5e60506b2..1e4767187 100644 --- a/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WorkflowEventDefTable.tsx +++ b/dashboard/src/app/(authenticated)/[tenantId]/components/tables/WorkflowEventDefTable.tsx @@ -1,13 +1,14 @@ import { WorkflowEventDefId } from 'littlehorse-client/proto' +import { useParams } from 'next/navigation' import { FC, Fragment } from 'react' import { SearchResultProps } from '.' -import { useParams } from 'next/navigation' import LinkWithTenant from '../LinkWithTenant' +import { SelectionLink } from '../SelectionLink' export const WorkflowEventDefTable: FC = ({ pages = [] }) => { const { tenantId } = useParams() - if (pages.length === 0) { + if (pages.every(page => page.results.length === 0)) { return
No WorkflowEventDefs
} @@ -16,11 +17,9 @@ export const WorkflowEventDefTable: FC = ({ pages = [] }) => {pages.map((page, i) => ( {page.results.map(({ name }: WorkflowEventDefId) => ( -
- - {name} - -
+ +

{name}

+
))}
))} diff --git a/dashboard/src/app/(authenticated)/layout.tsx b/dashboard/src/app/(authenticated)/layout.tsx index 29484f8fe..e610db672 100644 --- a/dashboard/src/app/(authenticated)/layout.tsx +++ b/dashboard/src/app/(authenticated)/layout.tsx @@ -2,11 +2,11 @@ import { Toaster } from '@/components/ui/sonner' import { WhoAmIContext } from '@/contexts/WhoAmIContext' import type { Metadata } from 'next' import { Inter } from 'next/font/google' -import { Header } from './[tenantId]/components/Header' -import { QueryProvider } from './[tenantId]/components/QueryProvider' +import { SWRConfig } from 'swr' import getWhoAmI from '../getWhoami' import '../globals.css' -import { SWRConfig } from 'swr' +import { Header } from './[tenantId]/components/Header' +import { QueryProvider } from './[tenantId]/components/QueryProvider' const inter = Inter({ subsets: ['latin'] }) diff --git a/dashboard/tailwind.config.ts b/dashboard/tailwind.config.ts index a24595ffe..668bf3efd 100644 --- a/dashboard/tailwind.config.ts +++ b/dashboard/tailwind.config.ts @@ -5,91 +5,94 @@ const config = { content: ['./pages/**/*.{ts,tsx}', './components/**/*.{ts,tsx}', './app/**/*.{ts,tsx}', './src/**/*.{ts,tsx}'], prefix: '', theme: { - container: { - center: true, - padding: '2rem', - screens: { - '2xl': '1400px' - } - }, - extend: { - colors: { - border: 'hsl(var(--border))', - input: 'hsl(var(--input))', - ring: 'hsl(var(--ring))', - background: 'hsl(var(--background))', - foreground: 'hsl(var(--foreground))', - primary: { - DEFAULT: 'hsl(var(--primary))', - foreground: 'hsl(var(--primary-foreground))' - }, - secondary: { - DEFAULT: 'hsl(var(--secondary))', - foreground: 'hsl(var(--secondary-foreground))' - }, - destructive: { - DEFAULT: 'hsl(var(--destructive))', - foreground: 'hsl(var(--destructive-foreground))' - }, - muted: { - DEFAULT: 'hsl(var(--muted))', - foreground: 'hsl(var(--muted-foreground))' - }, - accent: { - DEFAULT: 'hsl(var(--accent))', - foreground: 'hsl(var(--accent-foreground))' - }, - popover: { - DEFAULT: 'hsl(var(--popover))', - foreground: 'hsl(var(--popover-foreground))' - }, - card: { - DEFAULT: 'hsl(var(--card))', - foreground: 'hsl(var(--card-foreground))' - }, - status: { - running: 'theme("colors.blue.300")', - success: 'theme("colors.green.300")', - failed: 'theme("colors.red.300")', - exception: 'theme("colors.orange.300")', - halting: 'theme("colors.purple.300")' - }, - chart: { - '1': 'hsl(var(--chart-1))', - '2': 'hsl(var(--chart-2))', - '3': 'hsl(var(--chart-3))', - '4': 'hsl(var(--chart-4))', - '5': 'hsl(var(--chart-5))' - } - }, - borderRadius: { - lg: 'var(--radius)', - md: 'calc(var(--radius) - 2px)', - sm: 'calc(var(--radius) - 4px)' - }, - keyframes: { - 'accordion-down': { - from: { - height: '0' - }, - to: { - height: 'var(--radix-accordion-content-height)' - } - }, - 'accordion-up': { - from: { - height: 'var(--radix-accordion-content-height)' - }, - to: { - height: '0' - } - } - }, - animation: { - 'accordion-down': 'accordion-down 0.2s ease-out', - 'accordion-up': 'accordion-up 0.2s ease-out' - } - } + container: { + center: true, + padding: '2rem', + screens: { + '2xl': '1400px', + }, + }, + extend: { + colors: { + border: 'hsl(var(--border))', + input: 'hsl(var(--input))', + ring: 'hsl(var(--ring))', + background: 'hsl(var(--background))', + foreground: 'hsl(var(--foreground))', + primary: { + DEFAULT: 'hsl(var(--primary))', + foreground: 'hsl(var(--primary-foreground))', + }, + secondary: { + DEFAULT: 'hsl(var(--secondary))', + foreground: 
'hsl(var(--secondary-foreground))', + }, + destructive: { + DEFAULT: 'hsl(var(--destructive))', + foreground: 'hsl(var(--destructive-foreground))', + }, + muted: { + DEFAULT: 'hsl(var(--muted))', + foreground: 'hsl(var(--muted-foreground))', + }, + accent: { + DEFAULT: 'hsl(var(--accent))', + foreground: 'hsl(var(--accent-foreground))', + }, + popover: { + DEFAULT: 'hsl(var(--popover))', + foreground: 'hsl(var(--popover-foreground))', + }, + card: { + DEFAULT: 'hsl(var(--card))', + foreground: 'hsl(var(--card-foreground))', + }, + status: { + running: 'theme("colors.blue.300")', + success: 'theme("colors.green.300")', + failed: 'theme("colors.red.300")', + exception: 'theme("colors.orange.300")', + halting: 'theme("colors.purple.300")', + }, + chart: { + '1': 'hsl(var(--chart-1))', + '2': 'hsl(var(--chart-2))', + '3': 'hsl(var(--chart-3))', + '4': 'hsl(var(--chart-4))', + '5': 'hsl(var(--chart-5))', + }, + }, + borderRadius: { + lg: 'var(--radius)', + md: 'calc(var(--radius) - 2px)', + sm: 'calc(var(--radius) - 4px)', + }, + keyframes: { + 'accordion-down': { + from: { + height: '0', + }, + to: { + height: 'var(--radix-accordion-content-height)', + }, + }, + 'accordion-up': { + from: { + height: 'var(--radix-accordion-content-height)', + }, + to: { + height: '0', + }, + }, + }, + animation: { + 'accordion-down': 'accordion-down 0.2s ease-out', + 'accordion-up': 'accordion-up 0.2s ease-out', + }, + fontFamily: { + code: ['Fira Code', 'sans-serif'], + }, + }, }, plugins: [require('tailwindcss-animate')], } satisfies Config diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadErrorsAndExceptionsTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadErrorsAndExceptionsTest.cs index fa0c5c242..a50b34cc8 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadErrorsAndExceptionsTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadErrorsAndExceptionsTest.cs @@ -44,59 +44,96 @@ void EntryPointAction(WorkflowThread wf) var compiledWfThread = workflowThread.Compile(); - var expectedSpec = new ThreadSpec(); - var entrypoint = new Node - { - Entrypoint = new EntrypointNode(), - OutgoingEdges = + var expectedSpec = GetExpectedThreadSpec( + new FailureHandlerDef { - new Edge { SinkNodeName = "1-fail-TASK" } - } - }; + HandlerSpecName = "exn-handler-1-fail-TASK-FAILURE_TYPE_ERROR", + AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError + }); - var failTask = new Node + var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; + Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); + Assert.Equal(expectedSpec, compiledWfThread); + } + + [Fact] + public void WfThread_WithSpecificError_ShouldCompileErrorHandling() + { + var numberOfExitNodes = 1; + var numberOfEntrypointNodes = 1; + var numberOfTasks = 2; + var workflowName = "TestWorkflow"; + var mockParentWorkflow = new Mock(workflowName, _action); + + void EntryPointAction(WorkflowThread wf) { - Task = new TaskNode - { - TaskDefId = new TaskDefId { Name = "fail" } - }, - OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } }, - FailureHandlers = - { - new FailureHandlerDef + NodeOutput node = wf.Execute("fail"); + wf.HandleError( + node, + LHErrorType.Timeout, + handler => { - HandlerSpecName = "exn-handler-1-fail-TASK-FAILURE_TYPE_ERROR", - AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeError + handler.Execute("my-task"); } - } - }; + ); + wf.Execute("my-task"); + } + var 
workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); - var myTask = new Node - { - Task = new TaskNode + var compiledWfThread = workflowThread.Compile(); + + var expectedSpec = GetExpectedThreadSpec( + new FailureHandlerDef { - TaskDefId = new TaskDefId { Name = "my-task" } - }, - OutgoingEdges = { new Edge { SinkNodeName = "3-exit-EXIT" } } - }; + HandlerSpecName = "exn-handler-1-fail-TASK-TIMEOUT", + SpecificFailure = "TIMEOUT" + }); - var exitNode = new Node + var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; + Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); + Assert.Equal(expectedSpec, compiledWfThread); + } + + [Fact] + public void WfThread_WithExceptionName_ShouldCompileExceptionHandling() + { + var numberOfExitNodes = 1; + var numberOfEntrypointNodes = 1; + var numberOfTasks = 2; + var workflowName = "TestWorkflow"; + var mockParentWorkflow = new Mock(workflowName, _action); + + void EntryPointAction(WorkflowThread wf) { - Exit = new ExitNode() - }; + NodeOutput node = wf.Execute("fail"); + wf.HandleException( + node, + "any-business-exception", + handler => + { + handler.Execute("my-task"); + } + ); + wf.Execute("my-task"); + } + var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); - expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint); - expectedSpec.Nodes.Add("1-fail-TASK", failTask); - expectedSpec.Nodes.Add("2-my-task-TASK", myTask); - expectedSpec.Nodes.Add("3-exit-EXIT", exitNode); + var compiledWfThread = workflowThread.Compile(); + + var expectedSpec = GetExpectedThreadSpec( + new FailureHandlerDef + { + HandlerSpecName = "exn-handler-1-fail-TASK-any-business-exception", + SpecificFailure = "any-business-exception" + }); var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); Assert.Equal(expectedSpec, compiledWfThread); - } - + } + [Fact] - public void WfThread_WithSpecificError_ShouldCompileErrorHandling() + public void WfThread_WithoutExceptionName_ShouldCompileExceptionHandling() { var numberOfExitNodes = 1; var numberOfEntrypointNodes = 1; @@ -107,9 +144,8 @@ public void WfThread_WithSpecificError_ShouldCompileErrorHandling() void EntryPointAction(WorkflowThread wf) { NodeOutput node = wf.Execute("fail"); - wf.HandleError( + wf.HandleException( node, - LHErrorType.Timeout, handler => { handler.Execute("my-task"); @@ -120,7 +156,57 @@ void EntryPointAction(WorkflowThread wf) var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); var compiledWfThread = workflowThread.Compile(); + + var expectedSpec = GetExpectedThreadSpec( + new FailureHandlerDef + { + HandlerSpecName = "exn-handler-1-fail-TASK", + AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeException + }); + + var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; + Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); + Assert.Equal(expectedSpec, compiledWfThread); + } + + [Fact] + public void WfThread_WithBusinessException_ShouldCompileAnyFailureHandling() + { + var numberOfExitNodes = 1; + var numberOfEntrypointNodes = 1; + var numberOfTasks = 2; + var workflowName = "TestWorkflow"; + var mockParentWorkflow = new Mock(workflowName, _action); + + void EntryPointAction(WorkflowThread wf) + { + NodeOutput node = wf.Execute("fail"); + wf.HandleAnyFailure( + node, + handler => + { + 
handler.Execute("my-task"); + } + ); + wf.Execute("my-task"); + } + var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); + var compiledWfThread = workflowThread.Compile(); + + var expectedSpec = GetExpectedThreadSpec( + new FailureHandlerDef + { + HandlerSpecName = "exn-handler-1-fail-TASK-any-failure", + }); + + var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; + Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); + Assert.Equal(expectedSpec, compiledWfThread); + } + + private ThreadSpec GetExpectedThreadSpec(FailureHandlerDef failureHandlerDef) + { var expectedSpec = new ThreadSpec(); var entrypoint = new Node { @@ -140,11 +226,7 @@ void EntryPointAction(WorkflowThread wf) OutgoingEdges = { new Edge { SinkNodeName = "2-my-task-TASK" } }, FailureHandlers = { - new FailureHandlerDef - { - HandlerSpecName = "exn-handler-1-fail-TASK-TIMEOUT", - SpecificFailure = "TIMEOUT" - } + failureHandlerDef } }; @@ -167,8 +249,6 @@ void EntryPointAction(WorkflowThread wf) expectedSpec.Nodes.Add("2-my-task-TASK", myTask); expectedSpec.Nodes.Add("3-exit-EXIT", exitNode); - var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; - Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); - Assert.Equal(expectedSpec, compiledWfThread); + return expectedSpec; } } \ No newline at end of file diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs index 7a1ba576e..0345d66af 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs @@ -661,9 +661,10 @@ public void Fail(string failureName, string message) Fail(null, failureName, message); } - private FailureHandlerDef BuildFailureHandlerDef(NodeOutput node, string error, Action handler) + private FailureHandlerDef BuildFailureHandlerDef(NodeOutput node, string error, Action handler) { - string threadName = $"exn-handler-{node.NodeName}-{error}"; + string suffix = !string.IsNullOrEmpty(error) ? $"-{error}" : string.Empty; + string threadName = $"exn-handler-{node.NodeName}{suffix}"; threadName = _parent.AddSubThread(threadName, handler); @@ -677,4 +678,61 @@ private void AddFailureHandlerDef(FailureHandlerDef handlerDef, NodeOutput node) lastNode.FailureHandlers.Add(handlerDef); } + + /// + /// Attaches an Exception Handler to the specified NodeOutput, enabling it to handle specific + /// types of exceptions as defined by the 'exceptionName' parameter. If 'exceptionName' is null, + /// the handler will catch all exceptions. + /// + /// + /// The NodeOutput instance to which the Exception Handler will be attached. + /// + /// + /// The name of the specific exception to handle. If set to null, the handler will catch all exceptions. + /// + /// + /// A ThreadFunction defining a ThreadSpec that specifies how to handle the exception. + /// + public void HandleException(NodeOutput node, String exceptionName, Action handler) + { + CheckIfWorkflowThreadIsActive(); + var handlerDef = BuildFailureHandlerDef(node, exceptionName, handler); + handlerDef.SpecificFailure = exceptionName; + AddFailureHandlerDef(handlerDef, node); + } + + /// + /// Attaches an Exception Handler to the specified NodeOutput, enabling it to handle any + /// types of exceptions. + /// + /// + /// The NodeOutput instance to which the Exception Handler will be attached. 
+ /// + /// + /// A ThreadFunction defining a ThreadSpec that specifies how to handle the exception. + /// + public void HandleException(NodeOutput node, Action handler) + { + CheckIfWorkflowThreadIsActive(); + var handlerDef = BuildFailureHandlerDef(node, null!, handler); + handlerDef.AnyFailureOfType = FailureHandlerDef.Types.LHFailureType.FailureTypeException; + AddFailureHandlerDef(handlerDef, node); + } + + /// + /// Attaches a Failure Handler to the specified NodeOutput, allowing it to handle any type of + /// error or exception. + /// + /// + /// The NodeOutput instance to which the Failure Handler will be attached. + /// + /// + /// A ThreadFunction defining a ThreadSpec that specifies how to handle the failure. + /// + public void HandleAnyFailure(NodeOutput node, Action handler) + { + CheckIfWorkflowThreadIsActive(); + var handlerDef = BuildFailureHandlerDef(node, "any-failure", handler); + AddFailureHandlerDef(handlerDef, node); + } } \ No newline at end of file diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/taskworkergroup/TaskWorkerMetadataModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/taskworkergroup/TaskWorkerMetadataModel.java index e463f1c8d..9b080ef37 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/taskworkergroup/TaskWorkerMetadataModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/taskworkergroup/TaskWorkerMetadataModel.java @@ -10,7 +10,6 @@ import java.util.*; import java.util.stream.Collectors; import lombok.Getter; -import org.jetbrains.annotations.NotNull; public class TaskWorkerMetadataModel extends LHSerializable implements Comparable { @@ -49,7 +48,7 @@ public List hostsToProto() { } @Override - public int compareTo(@NotNull TaskWorkerMetadataModel o) { + public int compareTo(TaskWorkerMetadataModel o) { if (taskWorkerId == null) return -1; return taskWorkerId.compareTo(o.taskWorkerId); } diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/InstanceState.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/InstanceState.java index 0c2fe5acd..43999d7d9 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/InstanceState.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/InstanceState.java @@ -17,7 +17,6 @@ import org.apache.kafka.streams.TaskMetadata; import org.apache.kafka.streams.ThreadMetadata; import org.apache.kafka.streams.processor.TaskId; -import org.jetbrains.annotations.NotNull; @Slf4j public class InstanceState implements MeterBinder, KafkaStreams.StateListener, Closeable { @@ -43,7 +42,7 @@ public InstanceState(KafkaStreams streams, BackendInternalComms internalComms) { } @Override - public void bindTo(@NotNull MeterRegistry registry) { + public void bindTo(MeterRegistry registry) { Gauge.builder(METRIC_NAME + "_global", activeTaskBySubTopology, value -> { return value.getOrDefault(GLOBAL_SUB_TOPOLOGY_ID, 0); }) diff --git a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java index 2f779d237..68d40a4d1 100644 --- a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java +++ b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java @@ -50,7 +50,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; -import 
org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -147,7 +146,6 @@ void deleteAllByPrefix() { assertThat(storedTaskRunModel).isNull(); } - @NotNull private List getAllKeys(KeyValueStore store) { KeyValueIterator all = store.all(); List keys = new LinkedList<>();
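For reference, a minimal sketch of how the new WorkflowThread failure-handling overloads added in sdk-dotnet above might be used when defining a workflow thread. The task names "fail" and "my-task" and the exception name "any-business-exception" mirror the test fixtures in this change; a real workflow would substitute its own registered TaskDefs.

void EntryPointAction(WorkflowThread wf)
{
    NodeOutput node = wf.Execute("fail");

    // Catch only the named business exception thrown by the task.
    wf.HandleException(node, "any-business-exception", handler => handler.Execute("my-task"));

    // Catch any exception, regardless of its name.
    wf.HandleException(node, handler => handler.Execute("my-task"));

    // Catch any failure at all: technical errors as well as business exceptions.
    wf.HandleAnyFailure(node, handler => handler.Execute("my-task"));

    wf.Execute("my-task");
}

Each call appends a FailureHandlerDef to the "fail" task node; per the unit tests above, the compiled handler spec names would be exn-handler-1-fail-TASK-any-business-exception, exn-handler-1-fail-TASK, and exn-handler-1-fail-TASK-any-failure respectively.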