Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add ad task stats #332

Merged
merged 3 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public Collection<Object> createComponents(
new ADStat<>(true, new IndexStatusSupplier(indexUtils, DetectorInternalState.DETECTOR_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_CANCELED_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_TOTAL_BATCH_TASK_EXECUTION_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_BATCH_TASK_FAILURE_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.build();

adStats = new ADStats(indexUtils, modelManager, stats);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.stats;

/**
* Enum containing names of all internal stats which will not be returned
* in AD stats REST API.
*/
public enum InternalStatNames {
JVM_HEAP_USAGE("jvm_heap_usage");

private String name;

InternalStatNames(String name) {
this.name = name;
}

/**
* Get internal stat name
*
* @return name
*/
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import java.util.Set;

/**
* Enum containing names of all stats
* Enum containing names of all external stats which will be returned in
* AD stats REST API.
*/
public enum StatNames {
AD_EXECUTE_REQUEST_COUNT("ad_execute_request_count"),
Expand All @@ -32,7 +33,12 @@ public enum StatNames {
MODELS_CHECKPOINT_INDEX_STATUS("models_checkpoint_index_status"),
ANOMALY_DETECTION_JOB_INDEX_STATUS("anomaly_detection_job_index_status"),
ANOMALY_DETECTION_STATE_STATUS("anomaly_detection_state_status"),
MODEL_INFORMATION("models");
MODEL_INFORMATION("models"),
HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT("historical_single_entity_detector_count"),
AD_EXECUTING_BATCH_TASK_COUNT("ad_executing_batch_task_count"),
AD_CANCELED_BATCH_TASK_COUNT("ad_canceled_batch_task_count"),
AD_TOTAL_BATCH_TASK_EXECUTION_COUNT("ad_total_batch_task_execution_count"),
AD_BATCH_TASK_FAILURE_COUNT("ad_batch_task_failure_count");

private String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.InternalStatNames;

/**
* ADStatsNodesTransportAction contains the logic to extract the stats from the nodes
Expand All @@ -39,6 +41,7 @@ public class ADStatsNodesTransportAction extends
TransportNodesAction<ADStatsRequest, ADStatsNodesResponse, ADStatsNodeRequest, ADStatsNodeResponse> {

private ADStats adStats;
private final JvmService jvmService;

/**
* Constructor
Expand All @@ -55,7 +58,8 @@ public ADStatsNodesTransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
ADStats adStats
ADStats adStats,
JvmService jvmService
) {
super(
ADStatsNodesAction.NAME,
Expand All @@ -69,6 +73,7 @@ public ADStatsNodesTransportAction(
ADStatsNodeResponse.class
);
this.adStats = adStats;
this.jvmService = jvmService;
}

@Override
Expand Down Expand Up @@ -99,6 +104,11 @@ private ADStatsNodeResponse createADStatsNodeResponse(ADStatsRequest adStatsRequ
Map<String, Object> statValues = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();

if (statsToBeRetrieved.contains(InternalStatNames.JVM_HEAP_USAGE.getName())) {
long heapUsedPercent = jvmService.stats().getMem().getHeapUsedPercent();
statValues.put(InternalStatNames.JVM_HEAP_USAGE.getName(), heapUsedPercent);
}

for (String statName : adStats.getNodeStats().keySet()) {
if (statsToBeRetrieved.contains(statName)) {
statValues.put(statName, adStats.getStats().get(statName).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
adStatsResponse.toXContent(builder, params);
return builder;
}

protected ADStatsResponse getAdStatsResponse() {
return adStatsResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.transport;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -31,16 +32,22 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorType;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;

public class StatsAnomalyDetectorTransportAction extends HandledTransportAction<ADStatsRequest, StatsAnomalyDetectorResponse> {
public static final String DETECTOR_TYPE_AGG = "detector_type_agg";
private final Logger logger = LogManager.getLogger(StatsAnomalyDetectorTransportAction.class);

private final Client client;
Expand Down Expand Up @@ -120,23 +127,36 @@ private void getClusterStats(
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
final SearchRequest request = client
.prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.setSize(0)
.setTrackTotalHits(true)
.request();
client.search(request, ActionListener.wrap(indicesStatsResponse -> {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
if ((adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())
|| adStatsRequest.getStatsToBeRetrieved().contains(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName()))
&& clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {

TermsAggregationBuilder termsAgg = AggregationBuilders.terms(DETECTOR_TYPE_AGG).field(AnomalyDetector.DETECTOR_TYPE_FIELD);
SearchRequest request = new SearchRequest()
.indices(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.source(new SearchSourceBuilder().aggregation(termsAgg).size(0).trackTotalHits(true));

client.search(request, ActionListener.wrap(r -> {
StringTerms aggregation = r.getAggregations().get(DETECTOR_TYPE_AGG);
List<StringTerms.Bucket> buckets = aggregation.getBuckets();
long totalDetectors = r.getHits().getTotalHits().value;
long totalHistoricalSingleEntityDetectors = 0;
for (StringTerms.Bucket b : buckets) {
if (AnomalyDetectorType.HISTORICAL_SINGLE_ENTITY.name().equals(b.getKeyAsString())) {
totalHistoricalSingleEntityDetectors += b.getDocCount();
}
}
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(totalDetectors);
}
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName())) {
adStats
.getStat(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName())
.setValue(totalHistoricalSingleEntityDetectors);
}
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}, e -> listener.onFailure(e)));
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.ad;

import static org.apache.http.entity.ContentType.APPLICATION_JSON;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -41,6 +42,7 @@
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -49,11 +51,13 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -627,6 +631,11 @@ public static ThreadPool createThreadPool() {
return pool;
}

public static CreateIndexResponse createIndex(AdminClient adminClient, String indexName, String indexMapping) {
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(AnomalyDetector.TYPE, indexMapping, XContentType.JSON);
return adminClient.indices().create(request).actionGet(5_000);
}

public static void createIndex(RestClient client, String indexName, HttpEntity data) throws IOException {
TestHelpers
.makeRequest(
Expand Down Expand Up @@ -757,4 +766,17 @@ public static ADTask randomAdTask(String taskId, ADTaskState state, Instant exec
.build();
return task;
}

public static HttpEntity toHttpEntity(ToXContentObject object) throws IOException {
return new StringEntity(toJsonString(object), APPLICATION_JSON);
}

public static HttpEntity toHttpEntity(String jsonString) throws IOException {
return new StringEntity(jsonString, APPLICATION_JSON);
}

public static String toJsonString(ToXContentObject object) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
return TestHelpers.xContentBuilderToString(object.toXContent(builder, ToXContent.EMPTY_PARAMS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -40,9 +42,11 @@
import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStat;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.InternalStatNames;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
Expand Down Expand Up @@ -87,17 +91,26 @@ public void setUp() throws Exception {
put(nodeStatName2, new ADStat<>(false, new ModelsOnNodeSupplier(modelManager, cacheProvider)));
put(clusterStatName1, new ADStat<>(true, new IndexStatusSupplier(indexUtils, "index1")));
put(clusterStatName2, new ADStat<>(true, new IndexStatusSupplier(indexUtils, "index2")));
put(InternalStatNames.JVM_HEAP_USAGE.getName(), new ADStat<>(true, new SettableSupplier()));
}
};

adStats = new ADStats(indexUtils, modelManager, statsMap);
JvmService jvmService = mock(JvmService.class);
JvmStats jvmStats = mock(JvmStats.class);
JvmStats.Mem mem = mock(JvmStats.Mem.class);

when(jvmService.stats()).thenReturn(jvmStats);
when(jvmStats.getMem()).thenReturn(mem);
when(mem.getHeapUsedPercent()).thenReturn(randomShort());

action = new ADStatsNodesTransportAction(
client().threadPool(),
clusterService(),
mock(TransportService.class),
mock(ActionFilters.class),
adStats
adStats,
jvmService
);
}

Expand Down Expand Up @@ -133,4 +146,26 @@ public void testNodeOperation() {
assertTrue(statsToBeRetrieved.contains(statName));
}
}

@Test
public void testNodeOperationWithJvmHeapUsage() {
String nodeId = clusterService().localNode().getId();
ADStatsRequest adStatsRequest = new ADStatsRequest((nodeId));
adStatsRequest.clear();

Set<String> statsToBeRetrieved = new HashSet<>(Arrays.asList(nodeStatName1, InternalStatNames.JVM_HEAP_USAGE.getName()));

for (String stat : statsToBeRetrieved) {
adStatsRequest.addStat(stat);
}

ADStatsNodeResponse response = action.nodeOperation(new ADStatsNodeRequest(adStatsRequest));

Map<String, Object> stats = response.getStatsMap();

assertEquals(statsToBeRetrieved.size(), stats.size());
for (String statName : stats.keySet()) {
assertTrue(statsToBeRetrieved.contains(statName));
}
}
}
Loading