Skip to content

Commit

Permalink
refactor(canary): enable canary topology tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 24, 2025
1 parent 83e3a73 commit cb66457
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Topology toTopology() {
// build latency metric stream
final KStream<MetricKey, MetricValue> latencyMetricsStream = beatsStream
// remove messages without latency
.filterNot(MetricsTopology::hasLatency)
.filter(MetricsTopology::hasLatency)
// remove id
.groupBy(
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

// TODO: remove this
@Disabled("Work in progress")
class MetricsTopologyTest {

public static final String HOST_1 = "localhost";
Expand Down Expand Up @@ -112,6 +109,7 @@ private static TestRecord<BeatKey, BeatValue> newBeat(
Long latency,
String beatStatus,
Map<String, String> tags) {

BeatKey.Builder keyBuilder = BeatKey.newBuilder()
.setServerHost(host)
.setServerPort(port)
Expand All @@ -120,7 +118,8 @@ private static TestRecord<BeatKey, BeatValue> newBeat(
BeatValue.Builder valueBuilder = BeatValue.newBuilder().setTime(Timestamps.now());

if (beatStatus != null) {
// keyBuilder.setStatus(beatStatus);
keyBuilder.addTags(
Tag.newBuilder().setKey("status").setValue(beatStatus).build());
}

if (tags != null) {
Expand Down Expand Up @@ -167,7 +166,7 @@ void afterEach() {
}

@Test
void calculateCountAndLatencyForWfRunRequest() {
void shouldCalculateCountAndLatencyForWfRunRequest() {
BeatType expectedType = BeatType.WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();

Expand All @@ -180,7 +179,7 @@ void calculateCountAndLatencyForWfRunRequest() {
}

@Test
void includeBeatTagsIntoMetrics() {
void shouldIncludeBeatTagsIntoMetrics() {
BeatType expectedType = BeatType.WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();
Map<String, String> expectedTags = Map.of("my_tag", "value");
Expand All @@ -196,20 +195,22 @@ void includeBeatTagsIntoMetrics() {
}

@Test
void calculateCountForExhaustedRetries() {
// TODO: this
// BeatType expectedType = BeatType.GET_WF_RUN_EXHAUSTED_RETRIES;
// String expectedTypeName = expectedType.name().toLowerCase();

// inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
// inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
// inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null));
//
// assertThat(store.get(newMetricKey("canary_" + expectedTypeName))).isEqualTo(newMetricValue(3.));
void shouldCalculateCountForExhaustedRetries() {
BeatType expectedType = BeatType.GET_WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();
Map<String, String> reason = Map.of("reason", "canary_exhausted_retries");

inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));
inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));
inputTopic.pipeInput(newBeat(expectedType, getRandomId(), null, "error", reason));

assertThat(getCount()).isEqualTo(1);
assertThat(store.get(newMetricKey("canary_" + expectedTypeName, "error", reason)))
.isEqualTo(newMetricValue(3.));
}

@Test
void calculateCountAndLatencyForWfRunRequestForTwoStatus() {
void shouldCalculateCountAndLatencyForWfRunRequestForTwoStatus() {
BeatType expectedType = BeatType.WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();

Expand All @@ -230,7 +231,7 @@ void calculateCountAndLatencyForWfRunRequestForTwoStatus() {
}

@Test
void calculateCountAndLatencyForGetWfRunRequest() {
void shouldCalculateCountAndLatencyForGetWfRunRequest() {
BeatType expectedType = BeatType.GET_WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();

Expand All @@ -244,7 +245,7 @@ void calculateCountAndLatencyForGetWfRunRequest() {
}

@Test
void calculateCountAndLatencyForTaskRunWithNoDuplicated() {
void shouldCalculateCountAndLatencyForTaskRunWithNoDuplicated() {
BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
String expectedTypeName = expectedType.name().toLowerCase();

Expand All @@ -257,7 +258,7 @@ void calculateCountAndLatencyForTaskRunWithNoDuplicated() {
}

@Test
void calculateCountAndLatencyForTaskRunWithDuplicated() {
void shouldCalculateCountAndLatencyForTaskRunWithDuplicated() {
BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
String expectedTypeName = expectedType.name().toLowerCase();
String expectedUniqueId = getRandomId();
Expand All @@ -272,7 +273,7 @@ void calculateCountAndLatencyForTaskRunWithDuplicated() {
}

@Test
void calculateCountAndLatencyForTaskRunWithTwoDuplicated() {
void shouldCalculateCountAndLatencyForTaskRunWithTwoDuplicated() {
BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
String expectedTypeName = expectedType.name().toLowerCase();
String expectedUniqueId1 = getRandomId();
Expand All @@ -292,7 +293,7 @@ void calculateCountAndLatencyForTaskRunWithTwoDuplicated() {
}

@Test
void calculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
void shouldCalculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
BeatType expectedType = BeatType.TASK_RUN_EXECUTION;
String expectedTypeName = expectedType.name().toLowerCase();
String expectedUniqueId = getRandomId();
Expand Down

0 comments on commit cb66457

Please sign in to comment.