Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #739 from sundeepsf/develop
Browse files Browse the repository at this point in the history
Made changes to detect metric data lag and disable alert evaluation if there is a data lag.
  • Loading branch information
dilipdevaraj-sfdc authored May 8, 2018
2 parents 550c0d9 + 24582c7 commit 7ca6a60
Show file tree
Hide file tree
Showing 7 changed files with 812 additions and 598 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
package com.salesforce.dva.argus.client;

import com.salesforce.dva.argus.service.AlertService;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
Expand Down Expand Up @@ -84,7 +86,7 @@ public void run() {
Thread.currentThread().interrupt();
break;
} catch (Throwable ex) {
LOGGER.warn("Exception in alerter: {}", ex);
LOGGER.warn("Exception in alerter: {}", ExceptionUtils.getFullStackTrace(ex));
}
}
LOGGER.warn(MessageFormat.format("Alerter thread interrupted. {} alerts evaluated by this thread.", jobCounter.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ public enum JobStatus {

DEQUEUED("Job dequeued from the message queue."),
STARTED("Job started."),
SKIPPED("Job skipped."),
SUCCESS("Job successfully completed."),
FAILURE("Job failed."),
ERROR("Exception occurred."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ public interface MonitorService extends Service {
* @return The runtime dashboard. Will not be null.
*/
Dashboard getRuntimeDashboard();

/**
* Returns boolean to indicate whether the data is lagging currently or not
*
* @return isDataLagging boolean flag
*/
boolean isDataLagging();

//~ Enums ****************************************************************************************************************************************

Expand Down Expand Up @@ -182,6 +189,8 @@ public static enum Counter {
JOBS_MAX("argus.core", "jobs.max"),
ALERTS_SCHEDULED("argus.core", "alerts.scheduled"),
ALERTS_EVALUATED("argus.core", "alerts.evaluated"),
ALERTS_FAILED("argus.core", "alerts.failed"),
ALERTS_SKIPPED("argus.core", "alerts.skipped"),
NOTIFICATIONS_SENT("argus.core", "notifications.sent"),
TRIGGERS_VIOLATED("argus.core", "triggers.violated"),
ALERTS_MAX("argus.core", "alerts.max"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
Expand Down Expand Up @@ -353,8 +354,24 @@ public List<History> executeScheduledAlerts(int alertCount, int timeout) {
long jobEndTime = 0;

String logMessage = null;
History history = new History(addDateToMessage(JobStatus.STARTED.getDescription()), SystemConfiguration.getHostname(), alert.getId(), JobStatus.STARTED);
History history = null;

if(Boolean.valueOf(_configuration.getValue(com.salesforce.dva.argus.system.SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED))){
if(_monitorService.isDataLagging()) {
history = new History(addDateToMessage(JobStatus.SKIPPED.getDescription()), SystemConfiguration.getHostname(), alert.getId(), JobStatus.SKIPPED);
logMessage = MessageFormat.format("Skipping evaluating the alert with id: {0}. because metric data was lagging", alert.getId());
_logger.info(logMessage);
_appendMessageNUpdateHistory(history, logMessage, null, 0);
history = _historyService.createHistory(alert, history.getMessage(), history.getJobStatus(), history.getExecutionTime());
historyList.add(history);
Map<String, String> tags = new HashMap<>();
tags.put(USERTAG, alert.getOwner().getUserName());
_monitorService.modifyCounter(Counter.ALERTS_SKIPPED, 1, tags);
continue;
}
}

history = new History(addDateToMessage(JobStatus.STARTED.getDescription()), SystemConfiguration.getHostname(), alert.getId(), JobStatus.STARTED);
try {
List<Metric> metrics = _metricService.getMetrics(alert.getExpression(), alertEnqueueTimestampsByAlertId.get(alert.getId()));

Expand Down Expand Up @@ -393,8 +410,19 @@ public List<History> executeScheduledAlerts(int alertCount, int timeout) {
}

jobEndTime = System.currentTimeMillis();
_appendMessageNUpdateHistory(history, "Alert was evaluated successfully.", JobStatus.SUCCESS, jobEndTime - jobStartTime);
long evalLatency = jobEndTime - jobStartTime;
_appendMessageNUpdateHistory(history, "Alert was evaluated successfully.", JobStatus.SUCCESS, evalLatency);

// publishing evaluation latency as a metric
Map<Long, Double> datapoints = new HashMap<>();
datapoints.put(1000 * 60 * (System.currentTimeMillis()/(1000 *60)), Double.valueOf(evalLatency));
Metric metric = new Metric("alerts.evaluated", "alert-evaluation-latency-" + alert.getId().toString());
metric.addDatapoints(datapoints);
try {
_tsdbService.putMetrics(Arrays.asList(new Metric[] {metric}));
} catch (Exception ex) {
_logger.error("Exception occurred while pushing alert evaluation latency metric to tsdb - {}", ex.getMessage());
}
} catch (MissingDataException mde) {
jobEndTime = System.currentTimeMillis();
logMessage = MessageFormat.format("Failed to evaluate alert : {0}. Reason: {1}", alert.getId(), mde.getMessage());
Expand All @@ -403,6 +431,9 @@ public List<History> executeScheduledAlerts(int alertCount, int timeout) {
if (alert.isMissingDataNotificationEnabled()) {
_sendNotificationForMissingData(alert);
}
Map<String, String> tags = new HashMap<>();
tags.put(USERTAG, alert.getOwner().getUserName());
_monitorService.modifyCounter(Counter.ALERTS_FAILED, 1, tags);
} catch (Exception ex) {
jobEndTime = System.currentTimeMillis();
logMessage = MessageFormat.format("Failed to evaluate alert : {0}. Reason: {1}", alert.getId(), ex.getMessage());
Expand All @@ -412,7 +443,9 @@ public List<History> executeScheduledAlerts(int alertCount, int timeout) {
if (Boolean.valueOf(_configuration.getValue(SystemConfiguration.Property.EMAIL_EXCEPTIONS))) {
_sendEmailToAdmin(alert, alert.getId(), ex);
}

Map<String, String> tags = new HashMap<>();
tags.put(USERTAG, alert.getOwner().getUserName());
_monitorService.modifyCounter(Counter.ALERTS_FAILED, 1, tags);
} finally {
Map<String, String> tags = new HashMap<>();
tags.put(USERTAG, alert.getOwner().getUserName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.salesforce.dva.argus.service.monitor;

import java.text.MessageFormat;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.dva.argus.entity.Metric;
import com.salesforce.dva.argus.service.MailService;
import com.salesforce.dva.argus.service.MetricService;
import com.salesforce.dva.argus.system.SystemConfiguration;

/*
* This class runs a thread which periodically checks if there is data lag on Argus side.
*
*/
public class DataLagMonitor extends Thread{

private String _dataLagQueryExpression;

private long _dataLagThreshold;

private String _dataLagNotificationEmailId;

private String _hostName;

private boolean isDataLagging = false;

private MetricService _metricService;

private MailService _mailService;

private static final Long SLEEP_INTERVAL_MILLIS = 60*1000L;

private final Logger _logger = LoggerFactory.getLogger(DataLagMonitor.class);

public DataLagMonitor(SystemConfiguration sysConfig, MetricService metricService, MailService mailService) {
_metricService = metricService;
_mailService = mailService;
_dataLagQueryExpression = sysConfig.getValue(com.salesforce.dva.argus.system.SystemConfiguration.Property.DATA_LAG_QUERY_EXPRESSION);
_dataLagThreshold = Long.valueOf(sysConfig.getValue(com.salesforce.dva.argus.system.SystemConfiguration.Property.DATA_LAG_THRESHOLD));
_dataLagNotificationEmailId = sysConfig.getValue(com.salesforce.dva.argus.system.SystemConfiguration.Property.DATA_LAG_NOTIFICATION_EMAIL_ADDRESS);
_hostName = sysConfig.getHostname();
_logger.info("Data lag monitor initialized");
}

@Override
public void run() {
_logger.info("Data lag monitor thread started");
while (!isInterrupted()) {
try {
sleep(SLEEP_INTERVAL_MILLIS);
long currTime = System.currentTimeMillis();
List<Metric> metrics = _metricService.getMetrics(_dataLagQueryExpression, currTime);
if(metrics==null || metrics.isEmpty()) {
_logger.info("Data lag detected as metric list is empty");
if(!isDataLagging) {
isDataLagging=true;
sendDataLagEmailNotification();
}
continue;
}

//assuming only one time series in result
Metric currMetric = metrics.get(0);
if(currMetric.getDatapoints()==null || currMetric.getDatapoints().size()==0) {
_logger.info("Data lag detected as data point list is empty");
if(!isDataLagging) {
isDataLagging=true;
sendDataLagEmailNotification();
}
continue;
}else {
long lastDataPointTime = 0L;
for(Long dataPointTime : currMetric.getDatapoints().keySet()) {
if(dataPointTime > lastDataPointTime) {
lastDataPointTime = dataPointTime;
}
}
if((currTime - lastDataPointTime)> _dataLagThreshold) {
_logger.info("Data lag detected as the last data point recieved is more than the data threshold of " + _dataLagThreshold + " ms");
if(!isDataLagging) {
isDataLagging=true;
sendDataLagEmailNotification();
}
continue;
}
}
if(isDataLagging) {
isDataLagging = false;
sendDataLagEmailNotification();
}
}catch(Exception e) {
_logger.error("Exception thrown in data lag monitor thread - " + ExceptionUtils.getFullStackTrace(e));
}
}
}

private void sendDataLagEmailNotification() {
Set<String> emailAddresseses = new HashSet<String>();
emailAddresseses.add(_dataLagNotificationEmailId);
String subject = "";
if(isDataLagging) {
subject = "Alert evaluation on host - "+ _hostName + " has been stopped due to metric data lag";
}else {
subject = "Alert evaluation on host - "+ _hostName + " has been resumed as the metric data lag has cleared";
}

StringBuilder body = new StringBuilder();
body.append(MessageFormat.format("<b>Evaluated metric expression: </b> {0}<br/>", _dataLagQueryExpression));
body.append(MessageFormat.format("<b>Configured data lag threshold: </b> {0}<br/>", _dataLagThreshold));

_mailService.sendMessage(emailAddresseses, subject, body.toString(), "text/html; charset=utf-8", MailService.Priority.NORMAL);
}

public boolean isDataLagging() {
return isDataLagging;
}
}
Loading

0 comments on commit 7ca6a60

Please sign in to comment.