Skip to content

Commit

Permalink
[persistence] improve time mock for quartz scheduler (#1787)
Browse files Browse the repository at this point in the history
* [persistence] improve time mock for quartz scheduler

* fix flaky anomaly resolution test
  • Loading branch information
cyrilou242 authored Feb 7, 2025
1 parent 85278ac commit 8b7d79c
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,6 @@ private static long epoch(final String dateTime) {
return localDateTime.toInstant(UTC).toEpochMilli();
}

private static long jumpToTime(final String dateTime) throws InterruptedException {
final long jumpTime = epoch(dateTime);
CLOCK.useMockTime(jumpTime);
CLOCK.tick(5); // simulate move time forward

// give thread to detectionCronScheduler and to quartz scheduler -
// (quartz idle time is weaved to 100 ms for test speed)
Thread.sleep(1000);
return jumpTime;
}

@BeforeClass
public void beforeClass() throws Exception {
// ensure time is controlled via the TimeProvider CLOCK - ie weaving is working correctly
Expand Down Expand Up @@ -195,134 +184,111 @@ public void testOnboardingTaskRun() throws Exception {
assertThat(alertLastTimestamp).isEqualTo(epoch("2020-02-16 00:00"));
}

/**
* the alert detects anomalies on [feb 3 , feb6), [feb 10, feb 13), [feb 17, feb 20), [feb 24, feb
* 27), etc...
* this test checks the behaviour for the anomaly on [feb 17, feb 20)
* time is currently "2020-02-16 15:00". time is increased day by day from feb 16 to feb 24, and
* the state is checked every day
* the max merge gap is P3D, so once there is no anomaly detected on [feb 23, feb 24), the
* completed anomaly notification should be sent
*/
@Test(dependsOnMethods = "testOnboardingTaskRun", timeOut = TEST_IMEOUT)
public void testDailyFeb21() throws InterruptedException {
// get current number of anomalies
final int numAnomaliesBeforeDetectionRun = client.getAnomalies().size();

final List<AnomalyApi> parentAnomalies = client.getParentAnomalies();
assertThat(parentAnomalies).hasSize(1);

// No notifications sent yet.
assertThat(nsf.getCount()).isZero();
long jumpTime = jumpToTime("2020-02-21 00:00");

waitForDetectionRun();
// wait for new anomalies to be created
while (client.getAnomalies().size() == numAnomaliesBeforeDetectionRun) {
Thread.sleep(1000);
}

// check that lastTimestamp after detection is the runTime of the cron
assertThat(getAlertLastTimestamp()).isEqualTo(jumpTime);

public void testCompletedAnomalyIsSentCorrectly() throws InterruptedException {
// sanity checks after the onboarding task
assertThat(client.getParentAnomalies()).hasSize(2);
// no notification happened yet - time has not increased since subscription group creation
assertThat(nsf.notificationSentCount()).isZero();

// There is at least 1 successful subscription group task
// run the detections every day and check the results
// the cron is at 5 am, so moving to 6 means the cron at 5 will be triggered
// no anomaly on [Feb 16, Feb 17)
CLOCK.useMockTime(epoch("2020-02-17 06:00"));
waitForDetectionRun();
// notification cron is every day at 7 am, so moving to 8 will trigger a notification
CLOCK.useMockTime(epoch("2020-02-17 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.getCount()).isEqualTo(0);

// Move time forward by 5 minutes to make subscription group task run again
jumpToTime("2020-02-21 00:05");
assertThat(nsf.notificationSentCount()).isZero();

// anomaly on [Feb 17, Feb 18)
CLOCK.useMockTime(epoch("2020-02-18 06:00"));
assertThat(nsf.notificationSentCount()).isZero();
waitForDetectionRun();
CLOCK.useMockTime(epoch("2020-02-18 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.getCount()).isEqualTo(1);

final NotificationPayloadApi notificationPayload = nsf.getNotificationPayload();
assertThat(nsf.notificationSentCount()).isEqualTo(1); // a new anomaly is detected and notified
final NotificationPayloadApi notificationPayload = nsf.lastNotificationPayload();
assertThat(notificationPayload.getAnomalyReports()).hasSize(1);

final AnomalyApi anomalyApi = notificationPayload.getAnomalyReports().getFirst().getAnomaly();
assertThat(anomalyApi.getStartTime()).isEqualTo(new Date(epoch("2020-02-17 00:00")));
assertThat(anomalyApi.getEndTime()).isEqualTo(new Date(epoch("2020-02-21 00:00")));
}

@Test(dependsOnMethods = "testDailyFeb21", timeOut = TEST_IMEOUT)
public void testDailyFeb22() throws InterruptedException {
jumpToTimeAndWait("2020-02-22 00:06");
assertThat(client.getParentAnomalies()).hasSize(2);
assertThat(nsf.getCount()).isEqualTo(1); // no new notifications
}

@Test(dependsOnMethods = "testDailyFeb22", timeOut = TEST_IMEOUT)
public void testDailyFeb23() throws InterruptedException {
jumpToTimeAndWait("2020-02-23 00:06");
assertThat(client.getParentAnomalies()).hasSize(2);
assertThat(nsf.getCount()).isEqualTo(1); // no new notifications
}

@Test(dependsOnMethods = "testDailyFeb23", timeOut = TEST_IMEOUT)
public void testDailyFeb24() throws InterruptedException {
jumpToTimeAndWait("2020-02-24 00:06");
assertThat(client.getParentAnomalies()).hasSize(2);
assertThat(nsf.getCount()).isEqualTo(1); // no new notifications
}

@Test(dependsOnMethods = "testDailyFeb24", timeOut = TEST_IMEOUT)
public void testDailyFeb25() throws InterruptedException {
jumpToTimeAndWait("2020-02-25 00:06");

final List<AnomalyApi> parentAnomalies = client.getParentAnomalies();
assertThat(parentAnomalies).hasSize(2);
assertThat(anomalyApi.getEndTime()).isEqualTo(new Date(epoch("2020-02-18 00:00")));

final AnomalyApi ongoingAnomaly = parentAnomalies.stream()
.filter(a -> a.getStartTime().equals(new Date(epoch("2020-02-17 00:00"))))
.findFirst()
.orElseThrow(() -> new AssertionError("Anomaly not found"));

// anomalies are merged here
assertThat(ongoingAnomaly.getEndTime()).isEqualTo(new Date(epoch("2020-02-25 00:00")));
// anomaly on [Feb 18, Feb 19)
CLOCK.useMockTime(epoch("2020-02-19 06:00"));
waitForDetectionRun();
CLOCK.useMockTime(epoch("2020-02-19 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.notificationSentCount()).isEqualTo(
1); // this point is anomalous but merged in current anomaly

// No new notifications yet
assertThat(nsf.getCount()).isEqualTo(1);
}
// anomaly on [Feb 19, Feb 20)
CLOCK.useMockTime(epoch("2020-02-20 06:00"));
waitForDetectionRun();
CLOCK.useMockTime(epoch("2020-02-20 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.notificationSentCount()).isEqualTo(
1); // this point is anomalous but merged in current anomaly

@Test(dependsOnMethods = "testDailyFeb25", timeOut = TEST_IMEOUT)
public void testDailyMar3() throws InterruptedException {
jumpToTimeAndWait("2020-03-03 00:06");
// sanity checks on the detection state
// check that lastTimestamp after detection is the expected runTime of the cron, floored to alert granularity 2020-02-20 06:00 --> 2020-02-20 00:00
assertThat(getAlertLastTimestamp()).isEqualTo(epoch("2020-02-20 00:00"));
assertThat(client.getParentAnomalies()).hasSize(3);
assertThat(nsf.getCount()).isEqualTo(1);
}
final int anomaliesCurrentCount = client.getAnomalies().size();

@Test(dependsOnMethods = "testDailyMar3", timeOut = TEST_IMEOUT)
public void testDailyMar4() throws InterruptedException {
jumpToTimeAndWait("2020-03-04 00:06");

final List<AnomalyApi> parentAnomalies = client.getParentAnomalies();
assertThat(parentAnomalies).hasSize(3);

// No new notifications yet
assertThat(nsf.getCount()).isEqualTo(2);
// no anomaly on [Feb 20, Feb 21)
CLOCK.useMockTime(epoch("2020-02-21 06:00"));
waitForDetectionRun();
// ensure the number of anomalies hasn't changed
assertThat(client.getAnomalies()).hasSize(anomaliesCurrentCount);
CLOCK.useMockTime(epoch("2020-02-21 08:00"));
waitForNotificationTaskRun();
// ensure the number of notification hasn't changed
assertThat(nsf.notificationSentCount()).isEqualTo(1);

final NotificationPayloadApi notificationPayload = nsf.getNotificationPayload();
assertThat(notificationPayload.getAnomalyReports()).hasSize(1);
// no anomaly on [Feb 21, Feb 22) - same checks as above
CLOCK.useMockTime(epoch("2020-02-22 06:00"));
waitForDetectionRun();
assertThat(client.getAnomalies()).hasSize(anomaliesCurrentCount);
CLOCK.useMockTime(epoch("2020-02-22 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.notificationSentCount()).isEqualTo(1);

final AnomalyApi anomalyApi = notificationPayload.getAnomalyReports().getFirst().getAnomaly();
assertThat(anomalyApi.getStartTime()).isEqualTo(new Date(epoch("2020-03-02 00:00")));
assertThat(anomalyApi.getEndTime()).isEqualTo(new Date(epoch("2020-03-03 00:00")));
}
// no anomaly on [Feb 22, Feb 23) - same checks as above
CLOCK.useMockTime(epoch("2020-02-23 06:00"));
waitForDetectionRun();
assertThat(client.getAnomalies()).hasSize(anomaliesCurrentCount);
CLOCK.useMockTime(epoch("2020-02-23 08:00"));
waitForNotificationTaskRun();
assertThat(nsf.notificationSentCount()).isEqualTo(1);

@Test(dependsOnMethods = "testDailyMar4", timeOut = TEST_IMEOUT)
public void testDailyMar5() throws InterruptedException {
jumpToTimeAndWait("2020-03-05 00:06");
// no anomaly on [Feb 23, Feb 24)
CLOCK.useMockTime(epoch("2020-02-24 06:00"));
waitForDetectionRun();
assertThat(client.getAnomalies()).hasSize(anomaliesCurrentCount);
CLOCK.useMockTime(epoch("2020-02-24 08:00"));
waitForNotificationTaskRun();
// maxMergeGap is P3D, so a notification for completed anomaly can be sent now
assertThat(nsf.notificationSentCount()).isEqualTo(2);
// sanity check on number of anomalies
assertThat(client.getParentAnomalies()).hasSize(3);
assertThat(nsf.getCount()).isEqualTo(3);

final NotificationPayloadApi payload = nsf.getNotificationPayload();
// ensure the completed anomaly notification was sent
final NotificationPayloadApi payload = nsf.lastNotificationPayload();
assertThat(payload.getAnomalyReports()).hasSize(0);
assertThat(payload.getCompletedAnomalyReports()).hasSize(1);
}

private void jumpToTimeAndWait(final String dateTime) throws InterruptedException {
jumpToTime(dateTime); // allow both detection and notification to run
waitForDetectionRun();
waitForNotificationTaskRun();
}

private void waitForDetectionRun() throws InterruptedException {
nDetectionTaskRuns = waitFor(alertId, nDetectionTaskRuns);

// Even after the task is complete, anomalies are persisted async. Giving another sec
Thread.sleep(1000);
}

private void waitForNotificationTaskRun() throws InterruptedException {
Expand All @@ -333,11 +299,11 @@ private int waitFor(final Long refId, int currentCount) throws InterruptedExcept
int nTasks;
do {
nTasks = client.getSuccessfulTasks(refId).size();
if (!(nTasks <= currentCount)) {
if (nTasks > currentCount) {
break;
}
// should trigger another task after time jump
Thread.sleep(1000);
// should give time to run tasks
Thread.sleep(500);
} while (true);

return nTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public void testGetAnomaliesCreate() throws InterruptedException {
List<AnomalyApi> anomalies = List.of();
while (anomalies.size() == 0) {
// see taskDriver server config for optimization
Thread.sleep(1000);
Thread.sleep(500);
final Response response = request("api/anomalies?isChild=false").get();
assert200(response);
anomalies = response.readEntity(ANOMALIES_LIST_TYPE);
Expand Down Expand Up @@ -448,7 +448,7 @@ public void testGetAnomaliesAfterUpdate() throws InterruptedException {
long newAlertLastUpdateTime = alertLastUpdateTime;
while (newAlertLastUpdateTime == alertLastUpdateTime) {
// see taskDriver server config for optimization
Thread.sleep(1000);
Thread.sleep(500);
newAlertLastUpdateTime = getAlertLastUpdatedTime();
}
final Response response = request("api/anomalies?isChild=false").get();
Expand Down Expand Up @@ -595,7 +595,7 @@ public void testReplayIsIdemPotent() throws InterruptedException {
assertThat(replayResponse.getStatus()).isEqualTo(200);

while (getAlertLastUpdatedTime() == lastUpdatedTime) {
Thread.sleep(1000);
Thread.sleep(500);
}

final Response afterReplayResponse = request(alertAnomaliesRoute).get();
Expand Down Expand Up @@ -867,7 +867,7 @@ private RcaInvestigationApi mustGetInvestigation(long id) {
private void waitForAnyAnomalies(final long alertId) throws InterruptedException {
List<AnomalyApi> gotAnomalies = mustGetAnomaliesForAlert(alertId);
while (gotAnomalies.size() == 0) {
Thread.sleep(1000);
Thread.sleep(500);
gotAnomalies = mustGetAnomaliesForAlert(alertId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -168,7 +167,7 @@ public void testOnboardingLastTimestamp() throws Exception {
TaskApi onboardingTask = getTask(onboardingTaskId);
while (TASK_PENDING_STATUSES.contains(onboardingTask.getStatus())) {
// see taskDriver server config for optimization
Thread.sleep(1000);
Thread.sleep(500);
onboardingTask = getTask(onboardingTaskId);
}

Expand All @@ -185,12 +184,12 @@ public void testAfterDetectionCronLastTimestamp() throws InterruptedException {
// not exact time should not impact lastTimestamp
CLOCK.tick(5);
// give thread to detectionCronScheduler and to quartz scheduler - (quartz idle time is weaved to 100 ms for test speed)
Thread.sleep(1000);
Thread.sleep(500);

// wait for the new task to be created - proxy to know when the detection is triggered
List<TaskApi> tasks = getTasks();
while (tasks.size() == 1 || TASK_PENDING_STATUSES.contains(tasks.getLast().getStatus())) {
Thread.sleep(1000);
Thread.sleep(500);
tasks = getTasks();
}
assertThat(tasks.getLast().getTaskSubType()).isEqualTo(TaskSubType.DETECTION_TRIGGERED_BY_CRON);
Expand All @@ -211,11 +210,11 @@ public void testSecondAnomalyIsMerged() throws InterruptedException {
// not exact time should not impact lastTimestamp
CLOCK.tick(5);
// give thread to quartz scheduler - (quartz idle time is weaved to 1000 ms for test speed)
Thread.sleep(1000);
Thread.sleep(500);

// wait for a new anomaly to be created - proxy to know when the detection has run
while (anomalies.size() == numAnomaliesBeforeDetectionRun) {
Thread.sleep(1000);
Thread.sleep(500);
anomalies = getAnomalies();
}

Expand All @@ -226,7 +225,7 @@ public void testSecondAnomalyIsMerged() throws InterruptedException {
// find anomalies starting on MARCH 21 - there should be 2
final List<AnomalyApi> march21Anomalies = anomalies.stream()
.filter(a -> a.getStartTime().getTime() == MARCH_21_2020_00H00)
.collect(Collectors.toList());
.toList();
assertThat(march21Anomalies.size()).isEqualTo(2);
// check that one anomaly finishes on MARCH 22: the child anomaly
assertThat(march21Anomalies.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public NotificationService build(final Map<String, Object> params) {
};
}

public int getCount() {
public int notificationSentCount() {
return count;
}

public NotificationPayloadApi getNotificationPayload() {
public NotificationPayloadApi lastNotificationPayload() {
return f.get();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "threshold-pageviews-daily-merge-3-15",
"description": "Daily Threshold Alert with mergeMaxGap = 3 days and mergeMaxDuration = 15 days",
"cron": "0 0 5 1/1 * ? *",
"cron": "0 0 5 * * ? *",
"template": {
"nodes": [
{
Expand Down Expand Up @@ -74,9 +74,9 @@
"monitoringGranularity": "P1D",
"timeColumn": "AUTO",
"timeColumnFormat": "",
"max": "600000",
"max": "680000",
"min": "250000",
"mergeMaxGap": "P3D",
"mergeMaxDuration": "P15D"
"mergeMaxDuration": "P6D"
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sg-anomalyresolution",
"cron": "0 0/5 0 ? * * *",
"cron": "0 0 7 * * ? *",
"active": true,
"notifyHistoricalAnomalies": false,
"specs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ private AnomalyFilter buildAnomalyFilterCompletedAnomalies(final AlertAssociatio
final AlertDTO alert = alertManager.findById(alertId);
final AlertTemplateDTO renderedTemplate = alertTemplateRenderer.renderAlert(alert);
final Period mergeMaxGap = AlertUtils.getMergeMaxGap(renderedTemplate);
// note: a ThirdEye anomaly could also be completed if its mergeMaxDuration is reached,
// but in this case a new ThirdEye anomaly would be created for the same data anomaly just after
// we don't call out these anomalies as completed
final long endTimeIsLt = alert.getLastTimestamp() - mergeMaxGap.toStandardDuration().getMillis();

return new AnomalyFilter()
Expand Down
Loading

0 comments on commit 8b7d79c

Please sign in to comment.