From 12eb1cfda1e42f60945caea6b639b8dda47b8ed5 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 8 Jan 2025 14:34:37 +0200 Subject: [PATCH] Metrics for indexing failures due to version conflicts (#119067) This exposes new OTel node and index based metrics for indexing failures due to version conflicts. In addition, the /_cat/shards, /_cat/indices and /_cat/nodes APIs also expose the same metric, under the newly added column iifvc. Relates: #107601 --- docs/changelog/119067.yaml | 5 + docs/reference/cat/nodes.asciidoc | 3 + docs/reference/cat/shards.asciidoc | 3 + .../datastreams/DataStreamAutoshardingIT.java | 2 +- .../test/cat.shards/10_basic.yml | 1 + .../monitor/metrics/IndicesMetricsIT.java | 39 +++- .../metrics/NodeIndexingMetricsIT.java | 211 +++++++++++++++++- .../org/elasticsearch/TransportVersions.java | 1 + .../index/shard/IndexingStats.java | 22 +- .../index/shard/InternalIndexingStats.java | 7 + .../monitor/metrics/IndicesMetrics.java | 10 +- .../monitor/metrics/NodeMetrics.java | 16 ++ .../rest/action/cat/RestIndicesAction.java | 16 ++ .../rest/action/cat/RestNodesAction.java | 6 + .../rest/action/cat/RestShardsAction.java | 6 + .../cluster/node/stats/NodeStatsTests.java | 1 + .../metadata/IndexMetadataStatsTests.java | 16 +- .../index/shard/IndexShardTests.java | 81 +++++++ .../action/cat/RestShardsActionTests.java | 160 +++++++++---- .../index/shard/IndexShardTestCase.java | 19 +- .../indices/IndexStatsMonitoringDocTests.java | 16 +- .../IndicesStatsMonitoringDocTests.java | 2 +- .../node/NodeStatsMonitoringDocTests.java | 1 + 23 files changed, 578 insertions(+), 66 deletions(-) create mode 100644 docs/changelog/119067.yaml diff --git a/docs/changelog/119067.yaml b/docs/changelog/119067.yaml new file mode 100644 index 0000000000000..c7ddd570bea18 --- /dev/null +++ b/docs/changelog/119067.yaml @@ -0,0 +1,5 @@ +pr: 119067 +summary: Metrics for indexing failures due to version conflicts +area: CRUD +type: feature +issues: [] diff --git a/docs/reference/cat/nodes.asciidoc b/docs/reference/cat/nodes.asciidoc index c52315423f87e..a5a813e8d37d5 100644 --- a/docs/reference/cat/nodes.asciidoc +++ b/docs/reference/cat/nodes.asciidoc @@ -239,6 +239,9 @@ Number of indexing operations, such as `1`. `indexing.index_failed`, `iif`, `indexingIndexFailed`:: Number of failed indexing operations, such as `0`. +`indexing.index_failed_due_to_version_conflict`, `iifvc`, `indexingIndexFailedDueToVersionConflict`:: +Number of failed indexing operations due to version conflict, such as `0`. + `merges.current`, `mc`, `mergesCurrent`:: Number of current merge operations, such as `0`. diff --git a/docs/reference/cat/shards.asciidoc b/docs/reference/cat/shards.asciidoc index f73ac6e263cd2..2d3859e74c87e 100644 --- a/docs/reference/cat/shards.asciidoc +++ b/docs/reference/cat/shards.asciidoc @@ -162,6 +162,9 @@ Number of indexing operations, such as `1`. `indexing.index_failed`, `iif`, `indexingIndexFailed`:: Number of failed indexing operations, such as `0`. +`indexing.index_failed_due_to_version_conflict`, `iifvc`, `indexingIndexFailedDueToVersionConflict`:: +Number of failed indexing operations due to version conflict, such as `0`. + `merges.current`, `mc`, `mergesCurrent`:: Number of current merge operations, such as `0`. diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java index ac73385a97d70..91f18ad3573fd 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -493,7 +493,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex, CommonStats stats = new CommonStats(); stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); stats.store = new StoreStats(); - stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1)); + stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1)); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.shards/10_basic.yml index 03d8b2068d23e..45f381eab80b1 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -45,6 +45,7 @@ indexing.index_time .+ \n indexing.index_total .+ \n indexing.index_failed .+ \n + indexing.index_failed_due_to_version_conflict .+ \n merges.current .+ \n merges.current_docs .+ \n merges.current_size .+ \n diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java index 31c7720ffec1c..fee2c0494365e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -76,6 +77,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { static final String STANDARD_INDEXING_COUNT = "es.indices.standard.indexing.total"; static final String STANDARD_INDEXING_TIME = "es.indices.standard.indexing.time"; static final String STANDARD_INDEXING_FAILURE = "es.indices.standard.indexing.failure.total"; + static final String STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT = "es.indices.standard.indexing.failure.version_conflict.total"; static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total"; static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.size"; @@ -89,6 +91,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { static final String TIME_SERIES_INDEXING_COUNT = "es.indices.time_series.indexing.total"; static final String TIME_SERIES_INDEXING_TIME = "es.indices.time_series.indexing.time"; static final String TIME_SERIES_INDEXING_FAILURE = "es.indices.time_series.indexing.failure.total"; + static final String TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT = + "es.indices.time_series.indexing.failure.version_conflict.total"; static final String LOGSDB_INDEX_COUNT = "es.indices.logsdb.total"; static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.size"; @@ -102,6 +106,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { static final String LOGSDB_INDEXING_COUNT = "es.indices.logsdb.indexing.total"; static final String LOGSDB_INDEXING_TIME = "es.indices.logsdb.indexing.time"; static final String LOGSDB_INDEXING_FAILURE = "es.indices.logsdb.indexing.failure.total"; + static final String LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT = "es.indices.logsdb.indexing.failure.version_conflict.total"; public void testIndicesMetrics() { String indexNode = internalCluster().startNode(); @@ -132,7 +137,9 @@ public void testIndicesMetrics() { STANDARD_INDEXING_TIME, greaterThanOrEqualTo(0L), STANDARD_INDEXING_FAILURE, - equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexCount()) + equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexFailedCount()), + STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, + equalTo(indexing1.getIndexFailedDueToVersionConflictCount() - indexing0.getIndexFailedDueToVersionConflictCount()) ) ); @@ -155,7 +162,9 @@ public void testIndicesMetrics() { TIME_SERIES_INDEXING_TIME, greaterThanOrEqualTo(0L), TIME_SERIES_INDEXING_FAILURE, - equalTo(indexing2.getIndexFailedCount() - indexing1.getIndexFailedCount()) + equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexFailedCount()), + TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, + equalTo(indexing1.getIndexFailedDueToVersionConflictCount() - indexing0.getIndexFailedDueToVersionConflictCount()) ) ); @@ -177,13 +186,14 @@ public void testIndicesMetrics() { LOGSDB_INDEXING_TIME, greaterThanOrEqualTo(0L), LOGSDB_INDEXING_FAILURE, - equalTo(indexing3.getIndexFailedCount() - indexing2.getIndexFailedCount()) + equalTo(indexing3.getIndexFailedCount() - indexing2.getIndexFailedCount()), + LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, + equalTo(indexing3.getIndexFailedDueToVersionConflictCount() - indexing2.getIndexFailedDueToVersionConflictCount()) ) ); // already collected indexing stats - collectThenAssertMetrics( - telemetry, - 4, + Map> zeroMatchers = new HashMap<>(); + zeroMatchers.putAll( Map.of( STANDARD_INDEXING_COUNT, equalTo(0L), @@ -191,22 +201,35 @@ public void testIndicesMetrics() { equalTo(0L), STANDARD_INDEXING_FAILURE, equalTo(0L), - + STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, + equalTo(0L) + ) + ); + zeroMatchers.putAll( + Map.of( TIME_SERIES_INDEXING_COUNT, equalTo(0L), TIME_SERIES_INDEXING_TIME, equalTo(0L), TIME_SERIES_INDEXING_FAILURE, equalTo(0L), - + TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, + equalTo(0L) + ) + ); + zeroMatchers.putAll( + Map.of( LOGSDB_INDEXING_COUNT, equalTo(0L), LOGSDB_INDEXING_TIME, equalTo(0L), LOGSDB_INDEXING_FAILURE, + equalTo(0L), + LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT, equalTo(0L) ) ); + collectThenAssertMetrics(telemetry, 4, zeroMatchers); String searchNode = internalCluster().startDataOnlyNode(); indicesService = internalCluster().getInstance(IndicesService.class, searchNode); telemetry = internalCluster().getInstance(PluginsService.class, searchNode) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java index 1635a08e1768b..04130d176b9e5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java @@ -9,14 +9,17 @@ package org.elasticsearch.monitor.metrics; +import org.apache.lucene.analysis.TokenStream; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -25,6 +28,13 @@ import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.analysis.AbstractTokenFilterFactory; +import org.elasticsearch.index.analysis.TokenFilterFactory; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.indices.analysis.AnalysisModule; +import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.RestStatus; @@ -43,13 +53,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import static java.util.Collections.singletonMap; import static org.elasticsearch.index.IndexingPressure.MAX_COORDINATING_BYTES; import static org.elasticsearch.index.IndexingPressure.MAX_PRIMARY_BYTES; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) @@ -66,7 +79,7 @@ public List> getSettings() { @Override protected Collection> nodePlugins() { - return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class); + return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class, TestAnalysisPlugin.class); } @Override @@ -77,6 +90,197 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } + public void testZeroMetricsForVersionConflictsForNonIndexingOperations() { + final String dataNode = internalCluster().startNode(); + ensureStableCluster(1); + + final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNode) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + plugin.resetMeter(); + + assertAcked(prepareCreate("index_no_refresh", Settings.builder().put("index.refresh_interval", "-1"))); + assertAcked(prepareCreate("index_with_default_refresh")); + + for (String indexName : List.of("index_no_refresh", "index_with_default_refresh")) { + String docId = randomUUID(); + client(dataNode).index(new IndexRequest(indexName).id(docId).source(Map.of())).actionGet(); + // test version conflicts are counted when getting from the translog + if (randomBoolean()) { + // this get has the side effect of tracking translog location in the live version map, + // which potentially influences the engine conflict exception path + client(dataNode).get(new GetRequest(indexName, docId).realtime(randomBoolean())).actionGet(); + } + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).get( + new GetRequest(indexName, docId).version(10).versionType(randomFrom(VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)) + ).actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + if (randomBoolean()) { + client(dataNode).get(new GetRequest(indexName, docId).realtime(false)).actionGet(); + } + client(dataNode).admin().indices().prepareRefresh(indexName).get(); + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).get( + new GetRequest(indexName, docId).version(5) + .versionType(randomFrom(VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)) + .realtime(false) + ).actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + // updates + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).update( + new UpdateRequest(indexName, docId).setIfPrimaryTerm(1) + .setIfSeqNo(randomIntBetween(2, 5)) + .doc(Map.of(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))) + ).actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + // deletes + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).delete( + new DeleteRequest(indexName, docId).setIfPrimaryTerm(randomIntBetween(2, 5)).setIfSeqNo(0) + ).actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + } + + // simulate async apm `polling` call for metrics + plugin.collect(); + + // there are no indexing (version conflict) failures reported because only gets/updates/deletes generated the conflicts + // and those are not "indexing" operations + var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total"); + assertThat(indexingFailedTotal.getLong(), equalTo(0L)); + var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric( + plugin::getLongAsyncCounterMeasurement, + "es.indexing.indexing.failed.version_conflict.total" + ); + assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(0L)); + } + + public void testMetricsForIndexingVersionConflicts() { + final String dataNode = internalCluster().startNode(); + ensureStableCluster(1); + + final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNode) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + plugin.resetMeter(); + + assertAcked( + prepareCreate( + "test", + Settings.builder() + .put("index.refresh_interval", "-1") + .put("index.analysis.analyzer.test_analyzer.type", "custom") + .put("index.analysis.analyzer.test_analyzer.tokenizer", "standard") + .putList("index.analysis.analyzer.test_analyzer.filter", "test_token_filter") + ).setMapping(Map.of("properties", Map.of("test_field", Map.of("type", "text", "analyzer", "test_analyzer")))).get() + ); + + String docId = randomUUID(); + // successful index (with version) + client(dataNode).index( + new IndexRequest("test").id(docId) + .version(10) + .versionType(randomFrom(VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)) + .source(Map.of()) + ).actionGet(); + // if_primary_term conflict + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).index(new IndexRequest("test").id(docId).source(Map.of()).setIfSeqNo(0).setIfPrimaryTerm(2)) + .actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + // if_seq_no conflict + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).index(new IndexRequest("test").id(docId).source(Map.of()).setIfSeqNo(1).setIfPrimaryTerm(1)) + .actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + // version conflict + { + var e = expectThrows( + VersionConflictEngineException.class, + () -> client(dataNode).index( + new IndexRequest("test").id(docId) + .source(Map.of()) + .version(3) + .versionType(randomFrom(VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)) + ).actionGet() + ); + assertThat(e.getMessage(), containsString("version conflict")); + assertThat(e.status(), is(RestStatus.CONFLICT)); + } + // indexing failure that is NOT a version conflict + PluginsService pluginService = internalCluster().getInstance(PluginsService.class, dataNode); + pluginService.filterPlugins(TestAnalysisPlugin.class).forEach(p -> p.throwParsingError.set(true)); + { + var e = expectThrows( + MapperParsingException.class, + () -> client(dataNode).index(new IndexRequest("test").id(docId + "other").source(Map.of("test_field", "this will error"))) + .actionGet() + ); + assertThat(e.status(), is(RestStatus.BAD_REQUEST)); + } + + plugin.collect(); + + var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total"); + assertThat(indexingFailedTotal.getLong(), equalTo(4L)); + var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric( + plugin::getLongAsyncCounterMeasurement, + "es.indexing.indexing.failed.version_conflict.total" + ); + assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(3L)); + } + + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { + final AtomicBoolean throwParsingError = new AtomicBoolean(false); + + @Override + public Map> getTokenFilters() { + return singletonMap("test_token_filter", (indexSettings, environment, name, settings) -> new AbstractTokenFilterFactory(name) { + @Override + public TokenStream create(TokenStream tokenStream) { + if (throwParsingError.get()) { + throw new MapperParsingException("simulate mapping parsing error"); + } + return tokenStream; + } + }); + } + } + public void testNodeIndexingMetricsArePublishing() { final String dataNode = internalCluster().startNode(); @@ -116,6 +320,11 @@ public void testNodeIndexingMetricsArePublishing() { var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total"); assertThat(indexingFailedTotal.getLong(), equalTo(0L)); + var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric( + plugin::getLongAsyncCounterMeasurement, + "es.indexing.indexing.failed.version_conflict.total" + ); + assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(0L)); var deletionTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.deletion.docs.total"); assertThat(deletionTotal.getLong(), equalTo((long) deletesCount)); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0587ab8b46edc..39f9a1f34af3c 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -152,6 +152,7 @@ static TransportVersion def(int id) { public static final TransportVersion TEXT_EMBEDDING_QUERY_VECTOR_BUILDER_INFER_MODEL_ID = def(8_817_00_0); public static final TransportVersion ESQL_ENABLE_NODE_LEVEL_REDUCTION = def(8_818_00_0); public static final TransportVersion JINA_AI_INTEGRATION_ADDED = def(8_819_00_0); + public static final TransportVersion TRACK_INDEX_FAILED_DUE_TO_VERSION_CONFLICT_METRIC = def(8_820_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java index b0a4d333ba77f..62e456d95f467 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -34,6 +34,7 @@ public static class Stats implements Writeable, ToXContentFragment { private long indexTimeInMillis; private long indexCurrent; private long indexFailedCount; + private long indexFailedDueToVersionConflictCount; private long deleteCount; private long deleteTimeInMillis; private long deleteCurrent; @@ -50,6 +51,9 @@ public Stats(StreamInput in) throws IOException { indexTimeInMillis = in.readVLong(); indexCurrent = in.readVLong(); indexFailedCount = in.readVLong(); + if (in.getTransportVersion().onOrAfter(TransportVersions.TRACK_INDEX_FAILED_DUE_TO_VERSION_CONFLICT_METRIC)) { + indexFailedDueToVersionConflictCount = in.readVLong(); + } deleteCount = in.readVLong(); deleteTimeInMillis = in.readVLong(); deleteCurrent = in.readVLong(); @@ -67,6 +71,7 @@ public Stats( long indexTimeInMillis, long indexCurrent, long indexFailedCount, + long indexFailedDueToVersionConflictCount, long deleteCount, long deleteTimeInMillis, long deleteCurrent, @@ -80,6 +85,7 @@ public Stats( this.indexTimeInMillis = indexTimeInMillis; this.indexCurrent = indexCurrent; this.indexFailedCount = indexFailedCount; + this.indexFailedDueToVersionConflictCount = indexFailedDueToVersionConflictCount; this.deleteCount = deleteCount; this.deleteTimeInMillis = deleteTimeInMillis; this.deleteCurrent = deleteCurrent; @@ -96,6 +102,7 @@ public void add(Stats stats) { indexTimeInMillis += stats.indexTimeInMillis; indexCurrent += stats.indexCurrent; indexFailedCount += stats.indexFailedCount; + indexFailedDueToVersionConflictCount += stats.indexFailedDueToVersionConflictCount; deleteCount += stats.deleteCount; deleteTimeInMillis += stats.deleteTimeInMillis; @@ -124,6 +131,13 @@ public long getIndexFailedCount() { return indexFailedCount; } + /** + * The number of indexing operations that failed because of a version conflict (a subset of all index failed operations) + */ + public long getIndexFailedDueToVersionConflictCount() { + return indexFailedDueToVersionConflictCount; + } + /** * The total amount of time spend on executing index operations. */ @@ -191,6 +205,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexTimeInMillis); out.writeVLong(indexCurrent); out.writeVLong(indexFailedCount); + if (out.getTransportVersion().onOrAfter(TransportVersions.TRACK_INDEX_FAILED_DUE_TO_VERSION_CONFLICT_METRIC)) { + out.writeVLong(indexFailedDueToVersionConflictCount); + } out.writeVLong(deleteCount); out.writeVLong(deleteTimeInMillis); out.writeVLong(deleteCurrent); @@ -209,6 +226,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.INDEX_TIME_IN_MILLIS, Fields.INDEX_TIME, getIndexTime()); builder.field(Fields.INDEX_CURRENT, indexCurrent); builder.field(Fields.INDEX_FAILED, indexFailedCount); + builder.field(Fields.INDEX_FAILED_DUE_TO_VERSION_CONFLICT, indexFailedDueToVersionConflictCount); builder.field(Fields.DELETE_TOTAL, deleteCount); builder.humanReadableField(Fields.DELETE_TIME_IN_MILLIS, Fields.DELETE_TIME, getDeleteTime()); @@ -232,6 +250,7 @@ public boolean equals(Object o) { && indexTimeInMillis == that.indexTimeInMillis && indexCurrent == that.indexCurrent && indexFailedCount == that.indexFailedCount + && indexFailedDueToVersionConflictCount == that.indexFailedDueToVersionConflictCount && deleteCount == that.deleteCount && deleteTimeInMillis == that.deleteTimeInMillis && deleteCurrent == that.deleteCurrent @@ -249,6 +268,7 @@ public int hashCode() { indexTimeInMillis, indexCurrent, indexFailedCount, + indexFailedDueToVersionConflictCount, deleteCount, deleteTimeInMillis, deleteCurrent, @@ -323,12 +343,12 @@ public int hashCode() { static final class Fields { static final String INDEXING = "indexing"; - static final String TYPES = "types"; static final String INDEX_TOTAL = "index_total"; static final String INDEX_TIME = "index_time"; static final String INDEX_TIME_IN_MILLIS = "index_time_in_millis"; static final String INDEX_CURRENT = "index_current"; static final String INDEX_FAILED = "index_failed"; + static final String INDEX_FAILED_DUE_TO_VERSION_CONFLICT = "index_failed_due_to_version_conflict"; static final String DELETE_TOTAL = "delete_total"; static final String DELETE_TIME = "delete_time"; static final String DELETE_TIME_IN_MILLIS = "delete_time_in_millis"; diff --git a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index d0bb7629c3969..13d270ba36786 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -9,9 +9,11 @@ package org.elasticsearch.index.shard; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.VersionConflictEngineException; import java.util.concurrent.TimeUnit; @@ -78,6 +80,9 @@ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) { if (index.origin().isRecovery() == false) { totalStats.indexCurrent.dec(); totalStats.indexFailed.inc(); + if (ExceptionsHelper.unwrapCause(ex) instanceof VersionConflictEngineException) { + totalStats.indexFailedDueToVersionConflicts.inc(); + } } } @@ -124,6 +129,7 @@ static class StatsHolder { private final MeanMetric deleteMetric = new MeanMetric(); private final CounterMetric indexCurrent = new CounterMetric(); private final CounterMetric indexFailed = new CounterMetric(); + private final CounterMetric indexFailedDueToVersionConflicts = new CounterMetric(); private final CounterMetric deleteCurrent = new CounterMetric(); private final CounterMetric noopUpdates = new CounterMetric(); @@ -140,6 +146,7 @@ IndexingStats.Stats stats( TimeUnit.NANOSECONDS.toMillis(totalIndexingTimeInNanos), indexCurrent.count(), indexFailed.count(), + indexFailedDueToVersionConflicts.count(), deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java index 99011d101d342..dfaf6535e4d85 100644 --- a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java @@ -53,7 +53,7 @@ public IndicesMetrics(MeterRegistry meterRegistry, IndicesService indicesService } private static List registerAsyncMetrics(MeterRegistry registry, IndicesStatsCache cache) { - final int TOTAL_METRICS = 48; + final int TOTAL_METRICS = 52; List metrics = new ArrayList<>(TOTAL_METRICS); for (IndexMode indexMode : IndexMode.values()) { String name = indexMode.getName(); @@ -156,6 +156,14 @@ private static List registerAsyncMetrics(MeterRegistry registry, diffGauge(() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedCount()) ) ); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".indexing.failure.version_conflict.total", + "current indexing failures due to version conflict of " + name + " indices", + "unit", + diffGauge(() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedDueToVersionConflictCount()) + ) + ); } assert metrics.size() == TOTAL_METRICS : "total number of metrics has changed"; return metrics; diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/NodeMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/NodeMetrics.java index 94395193622e0..2fabd03970f05 100644 --- a/server/src/main/java/org/elasticsearch/monitor/metrics/NodeMetrics.java +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/NodeMetrics.java @@ -365,6 +365,22 @@ private void registerAsyncMetrics(MeterRegistry registry) { ) ); + metrics.add( + registry.registerLongAsyncCounter( + "es.indexing.indexing.failed.version_conflict.total", + "Total number of failed indexing operations due to version conflict", + "operations", + () -> new LongWithAttributes( + Optional.ofNullable(stats.getOrRefresh()) + .map(o -> o.getIndices()) + .map(o -> o.getIndexing()) + .map(o -> o.getTotal()) + .map(o -> o.getIndexFailedDueToVersionConflictCount()) + .orElse(0L) + ) + ) + ); + metrics.add( registry.registerLongAsyncCounter( "es.indexing.deletion.docs.total", diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index a968ea4520f40..703f1f2c18408 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -303,6 +303,15 @@ protected Table getTableWithHeader(final RestRequest request) { "sibling:pri;alias:iif,indexingIndexFailed;default:false;text-align:right;desc:number of failed indexing ops" ); table.addCell("pri.indexing.index_failed", "default:false;text-align:right;desc:number of failed indexing ops"); + table.addCell( + "indexing.index_failed_due_to_version_conflict", + "sibling:pri;alias:iifvc,indexingIndexFailedDueToVersionConflict;default:false;text-align:right;" + + "desc:number of failed indexing ops due to version conflict" + ); + table.addCell( + "pri.indexing.index_failed_due_to_version_conflict", + "default:false;text-align:right;desc:number of failed indexing ops due to version conflict" + ); table.addCell("merges.current", "sibling:pri;alias:mc,mergesCurrent;default:false;text-align:right;desc:number of current merges"); table.addCell("pri.merges.current", "default:false;text-align:right;desc:number of current merges"); @@ -670,6 +679,13 @@ Table buildTable( table.addCell(totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getIndexFailedCount()); table.addCell(primaryStats.getIndexing() == null ? null : primaryStats.getIndexing().getTotal().getIndexFailedCount()); + table.addCell( + totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getIndexFailedDueToVersionConflictCount() + ); + table.addCell( + primaryStats.getIndexing() == null ? null : primaryStats.getIndexing().getTotal().getIndexFailedDueToVersionConflictCount() + ); + table.addCell(totalStats.getMerge() == null ? null : totalStats.getMerge().getCurrent()); table.addCell(primaryStats.getMerge() == null ? null : primaryStats.getMerge().getCurrent()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 39e679f2c0ad0..5560bd6dc49c8 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -230,6 +230,11 @@ protected Table getTableWithHeader(final RestRequest request) { "indexing.index_failed", "alias:iif,indexingIndexFailed;default:false;text-align:right;desc:number of failed indexing ops" ); + table.addCell( + "indexing.index_failed_due_to_version_conflict", + "alias:iifvc,indexingIndexFailedDueToVersionConflict;default:false;text-align:right;" + + "desc:number of failed indexing ops due to version conflict" + ); table.addCell("merges.current", "alias:mc,mergesCurrent;default:false;text-align:right;desc:number of current merges"); table.addCell( @@ -466,6 +471,7 @@ Table buildTable( table.addCell(indexingStats == null ? null : indexingStats.getTotal().getIndexTime()); table.addCell(indexingStats == null ? null : indexingStats.getTotal().getIndexCount()); table.addCell(indexingStats == null ? null : indexingStats.getTotal().getIndexFailedCount()); + table.addCell(indexingStats == null ? null : indexingStats.getTotal().getIndexFailedDueToVersionConflictCount()); MergeStats mergeStats = indicesStats == null ? null : indicesStats.getMerge(); table.addCell(mergeStats == null ? null : mergeStats.getCurrent()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 8b5e21de2d741..717e6e2d8f35b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -169,6 +169,11 @@ protected Table getTableWithHeader(final RestRequest request) { "indexing.index_failed", "alias:iif,indexingIndexFailed;default:false;text-align:right;desc:number of failed indexing ops" ); + table.addCell( + "indexing.index_failed_due_to_version_conflict", + "alias:iifvc,indexingIndexFailedDueToVersionConflict;default:false;text-align:right;" + + "desc:number of failed indexing ops due to version conflict" + ); table.addCell("merges.current", "alias:mc,mergesCurrent;default:false;text-align:right;desc:number of current merges"); table.addCell( @@ -373,6 +378,7 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexTime())); table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexCount())); table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexFailedCount())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexFailedDueToVersionConflictCount())); table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrent)); table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentNumDocs)); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index dccdbee23c775..0e80b70c4a0fd 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -587,6 +587,7 @@ private static CommonStats createShardLevelCommonStats() { ++iota, ++iota, ++iota, + ++iota, false, ++iota, ++iota, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java index 2885673ba5539..1709e2d4d372f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java @@ -113,7 +113,21 @@ private ShardStats createShardStats( commonStats.getIndexing() .getTotal() .add( - new IndexingStats.Stats(0, 0, 0, 0, 0, 0, 0, 0, false, 0, totalIndexingTimeSinceShardStartedInNanos, totalActiveTimeInNanos) + new IndexingStats.Stats( + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + false, + 0, + totalIndexingTimeSinceShardStartedInNanos, + totalActiveTimeInNanos + ) ); return new ShardStats(shardRouting, commonStats, null, null, null, null, null, false, false, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index eacb4cf35a422..7d436ab5d8d22 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -84,6 +84,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.IndexFieldData; @@ -1729,6 +1730,86 @@ public String[] listAll() throws IOException { } } + public void testIndexingErrors() throws IOException { + AtomicBoolean throwOnIndex = new AtomicBoolean(); + IndexShard shard = newStartedShard(randomBoolean(), Settings.EMPTY, config -> new InternalEngine(config) { + @Override + public IndexResult index(Index index) throws IOException { + if (throwOnIndex.get()) { + throw new IOException("test indexing errors"); + } else { + return super.index(index); + } + } + }); + long nbIndexedDocs = randomIntBetween(1, 10); + AtomicLong nbFailed = new AtomicLong(); + for (int id = 0; id < nbIndexedDocs; id++) { + throwOnIndex.set(randomBoolean()); + if (throwOnIndex.get()) { + nbFailed.incrementAndGet(); + int finalId = id; + expectThrows(IOException.class, () -> indexDoc(shard, "_doc", "test" + finalId)); + } else { + Engine.IndexResult indexResult = indexDoc(shard, "_doc", "test" + id); + assertThat(indexResult.isCreated(), is(true)); + } + } + assertThat(shard.indexingStats().getTotal().getIndexFailedCount(), equalTo(nbFailed.get())); + assertThat(shard.indexingStats().getTotal().getIndexFailedDueToVersionConflictCount(), equalTo(0L)); + assertThat(shard.indexingStats().getTotal().getIndexCount(), equalTo(nbIndexedDocs - nbFailed.get())); + closeShards(shard); + } + + public void testIndexingErrorsDueToVersionConflict() throws IOException { + AtomicBoolean throwOnIndex = new AtomicBoolean(); + IndexShard shard = newStartedShard(true, Settings.EMPTY, config -> new InternalEngine(config) { + @Override + public IndexResult index(Index index) throws IOException { + if (throwOnIndex.get()) { + throw new IOException("test indexing errors"); + } else { + return super.index(index); + } + } + }); + long nbIndexedDocs = randomIntBetween(1, 10); + AtomicLong indexingFailedCount = new AtomicLong(); + AtomicLong indexingFailedWithVersionConflictCount = new AtomicLong(); + AtomicLong indexingSuccessCount = new AtomicLong(); + for (int id = 0; id < nbIndexedDocs; id++) { + if (randomBoolean()) { + // version conflict + throwOnIndex.set(false); + indexingFailedWithVersionConflictCount.incrementAndGet(); + indexingFailedCount.incrementAndGet(); + Engine.IndexResult indexResult = indexDoc(shard, "test" + id, 10L, "{}", XContentType.JSON, null); + assertThat(indexResult.isCreated(), is(false)); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + } else { + throwOnIndex.set(randomBoolean()); + int finalId = id; + if (throwOnIndex.get()) { + // indexing failure + indexingFailedCount.incrementAndGet(); + expectThrows(IOException.class, () -> indexDoc(shard, "_doc", "test" + finalId)); + } else { + // indexing successful + indexingSuccessCount.incrementAndGet(); + Engine.IndexResult indexResult = indexDoc(shard, "_doc", "test" + id); + assertThat(indexResult.isCreated(), is(true)); + } + } + } + assertThat(shard.indexingStats().getTotal().getIndexCount(), equalTo(indexingSuccessCount.get())); + assertThat(shard.indexingStats().getTotal().getIndexFailedCount(), equalTo(indexingFailedCount.get())); + assertThat( + shard.indexingStats().getTotal().getIndexFailedDueToVersionConflictCount(), + equalTo(indexingFailedWithVersionConflictCount.get()) + ); + closeShards(shard); + } + public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); // refresh on: finalize and end of recovery diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java index 29857ef4a519f..b1da067e2f7e6 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.CommonStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.ClusterState; @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Table; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; @@ -40,11 +41,91 @@ public class RestShardsActionTests extends ESTestCase { + private DiscoveryNode localNode; + private List shardRoutings; + private ClusterStateResponse clusterStateResponse; + private IndicesStatsResponse indicesStatsResponse; + public void testBuildTable() { - final int numShards = randomIntBetween(1, 5); - DiscoveryNode localNode = DiscoveryNodeUtils.create("local"); + mockShardStats(randomBoolean()); + + final RestShardsAction action = new RestShardsAction(); + final Table table = action.buildTable(new FakeRestRequest(), clusterStateResponse, indicesStatsResponse); + + // now, verify the table is correct + List headers = table.getHeaders(); + assertThat(headers.get(0).value, equalTo("index")); + assertThat(headers.get(1).value, equalTo("shard")); + assertThat(headers.get(2).value, equalTo("prirep")); + assertThat(headers.get(3).value, equalTo("state")); + assertThat(headers.get(4).value, equalTo("docs")); + assertThat(headers.get(5).value, equalTo("store")); + assertThat(headers.get(6).value, equalTo("dataset")); + assertThat(headers.get(7).value, equalTo("ip")); + assertThat(headers.get(8).value, equalTo("id")); + assertThat(headers.get(9).value, equalTo("node")); + assertThat(headers.get(10).value, equalTo("unassigned.reason")); + + final List> rows = table.getRows(); + assertThat(rows.size(), equalTo(shardRoutings.size())); + + Iterator shardRoutingsIt = shardRoutings.iterator(); + for (final List row : rows) { + ShardRouting shardRouting = shardRoutingsIt.next(); + ShardStats shardStats = indicesStatsResponse.asMap().get(shardRouting); + assertThat(row.get(0).value, equalTo(shardRouting.getIndexName())); + assertThat(row.get(1).value, equalTo(shardRouting.getId())); + assertThat(row.get(2).value, equalTo(shardRouting.primary() ? "p" : "r")); + assertThat(row.get(3).value, equalTo(shardRouting.state())); + assertThat(row.get(7).value, equalTo(localNode.getHostAddress())); + assertThat(row.get(8).value, equalTo(localNode.getId())); + assertThat(row.get(70).value, equalTo(shardStats.getDataPath())); + assertThat(row.get(71).value, equalTo(shardStats.getStatePath())); + } + } + + public void testShardStatsForIndexing() { + mockShardStats(true); + + final RestShardsAction action = new RestShardsAction(); + final Table table = action.buildTable(new FakeRestRequest(), clusterStateResponse, indicesStatsResponse); + + // now, verify the table is correct + List headers = table.getHeaders(); + assertThat(headers.get(29).value, equalTo("indexing.delete_current")); + assertThat(headers.get(30).value, equalTo("indexing.delete_time")); + assertThat(headers.get(31).value, equalTo("indexing.delete_total")); + assertThat(headers.get(32).value, equalTo("indexing.index_current")); + assertThat(headers.get(33).value, equalTo("indexing.index_time")); + assertThat(headers.get(34).value, equalTo("indexing.index_total")); + assertThat(headers.get(35).value, equalTo("indexing.index_failed")); + assertThat(headers.get(36).value, equalTo("indexing.index_failed_due_to_version_conflict")); + + final List> rows = table.getRows(); + assertThat(rows.size(), equalTo(shardRoutings.size())); - List shardRoutings = new ArrayList<>(numShards); + Iterator shardRoutingsIt = shardRoutings.iterator(); + for (final List row : rows) { + ShardRouting shardRouting = shardRoutingsIt.next(); + ShardStats shardStats = indicesStatsResponse.asMap().get(shardRouting); + assertThat(row.get(29).value, equalTo(shardStats.getStats().getIndexing().getTotal().getDeleteCurrent())); + assertThat(row.get(30).value, equalTo(shardStats.getStats().getIndexing().getTotal().getDeleteTime())); + assertThat(row.get(31).value, equalTo(shardStats.getStats().getIndexing().getTotal().getDeleteCount())); + assertThat(row.get(32).value, equalTo(shardStats.getStats().getIndexing().getTotal().getIndexCurrent())); + assertThat(row.get(33).value, equalTo(shardStats.getStats().getIndexing().getTotal().getIndexTime())); + assertThat(row.get(34).value, equalTo(shardStats.getStats().getIndexing().getTotal().getIndexCount())); + assertThat(row.get(35).value, equalTo(shardStats.getStats().getIndexing().getTotal().getIndexFailedCount())); + assertThat( + row.get(36).value, + equalTo(shardStats.getStats().getIndexing().getTotal().getIndexFailedDueToVersionConflictCount()) + ); + } + } + + private void mockShardStats(boolean includeCommonStats) { + final int numShards = randomIntBetween(1, 5); + this.localNode = DiscoveryNodeUtils.create("local"); + this.shardRoutings = new ArrayList<>(numShards); Map shardStatsMap = new HashMap<>(); String index = "index"; for (int i = 0; i < numShards; i++) { @@ -53,10 +134,33 @@ public void testBuildTable() { Path path = createTempDir().resolve("indices") .resolve(shardRouting.shardId().getIndex().getUUID()) .resolve(String.valueOf(shardRouting.shardId().id())); + CommonStats commonStats = null; + if (includeCommonStats) { + commonStats = new CommonStats(randomFrom(CommonStatsFlags.ALL, new CommonStatsFlags(CommonStatsFlags.Flag.Indexing))); + commonStats.indexing.add( + new IndexingStats( + new IndexingStats.Stats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomBoolean(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong() + ) + ) + ); + } ShardStats shardStats = new ShardStats( shardRouting, new ShardPath(false, path, path, shardRouting.shardId()), - null, + commonStats, null, null, null, @@ -67,56 +171,18 @@ public void testBuildTable() { shardRoutings.add(shardRouting); } - IndexStats indexStats = mock(IndexStats.class); - when(indexStats.getPrimaries()).thenReturn(new CommonStats()); - when(indexStats.getTotal()).thenReturn(new CommonStats()); - - IndicesStatsResponse stats = mock(IndicesStatsResponse.class); - when(stats.asMap()).thenReturn(shardStatsMap); + this.indicesStatsResponse = mock(IndicesStatsResponse.class); + when(this.indicesStatsResponse.asMap()).thenReturn(shardStatsMap); DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); when(discoveryNodes.get(localNode.getId())).thenReturn(localNode); - ClusterStateResponse state = mock(ClusterStateResponse.class); + this.clusterStateResponse = mock(ClusterStateResponse.class); RoutingTable routingTable = mock(RoutingTable.class); when(routingTable.allShardsIterator()).thenReturn(shardRoutings); ClusterState clusterState = mock(ClusterState.class); when(clusterState.routingTable()).thenReturn(routingTable); when(clusterState.nodes()).thenReturn(discoveryNodes); - when(state.getState()).thenReturn(clusterState); - - final RestShardsAction action = new RestShardsAction(); - final Table table = action.buildTable(new FakeRestRequest(), state, stats); - - // now, verify the table is correct - List headers = table.getHeaders(); - assertThat(headers.get(0).value, equalTo("index")); - assertThat(headers.get(1).value, equalTo("shard")); - assertThat(headers.get(2).value, equalTo("prirep")); - assertThat(headers.get(3).value, equalTo("state")); - assertThat(headers.get(4).value, equalTo("docs")); - assertThat(headers.get(5).value, equalTo("store")); - assertThat(headers.get(6).value, equalTo("dataset")); - assertThat(headers.get(7).value, equalTo("ip")); - assertThat(headers.get(8).value, equalTo("id")); - assertThat(headers.get(9).value, equalTo("node")); - assertThat(headers.get(10).value, equalTo("unassigned.reason")); - - final List> rows = table.getRows(); - assertThat(rows.size(), equalTo(numShards)); - - Iterator shardRoutingsIt = shardRoutings.iterator(); - for (final List row : rows) { - ShardRouting shardRouting = shardRoutingsIt.next(); - ShardStats shardStats = shardStatsMap.get(shardRouting); - assertThat(row.get(0).value, equalTo(shardRouting.getIndexName())); - assertThat(row.get(1).value, equalTo(shardRouting.getId())); - assertThat(row.get(2).value, equalTo(shardRouting.primary() ? "p" : "r")); - assertThat(row.get(3).value, equalTo(shardRouting.state())); - assertThat(row.get(7).value, equalTo(localNode.getHostAddress())); - assertThat(row.get(8).value, equalTo(localNode.getId())); - assertThat(row.get(69).value, equalTo(shardStats.getDataPath())); - assertThat(row.get(70).value, equalTo(shardStats.getStatePath())); - } + when(clusterStateResponse.getState()).thenReturn(clusterState); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 5b9a9e7a4efee..2ae4bb0343101 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -993,12 +993,23 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) } protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException { - return indexDoc(shard, id, source, XContentType.JSON, null); + return indexDoc(shard, id, Versions.MATCH_ANY, source, XContentType.JSON, null); } - // Uses an auto-generated ID if `id` is null/empty protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source, XContentType xContentType, String routing) throws IOException { + return indexDoc(shard, id, Versions.MATCH_ANY, source, xContentType, routing); + } + + // Uses an auto-generated ID if `id` is null/empty + protected Engine.IndexResult indexDoc( + IndexShard shard, + String id, + long version, + String source, + XContentType xContentType, + String routing + ) throws IOException { long autoGeneratedTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; if (Strings.isEmpty(id)) { id = UUIDs.base64UUID(); @@ -1008,7 +1019,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source Engine.IndexResult result; if (shard.routingEntry().primary()) { result = shard.applyIndexOperationOnPrimary( - Versions.MATCH_ANY, + version, VersionType.INTERNAL, sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -1024,7 +1035,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source .build() ); result = shard.applyIndexOperationOnPrimary( - Versions.MATCH_ANY, + version, VersionType.INTERNAL, sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index b6c059b7a0dcc..4811d65e6ed85 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -389,7 +389,21 @@ private static CommonStats mockCommonStats() { commonStats.getStore().add(new StoreStats(++iota, no, no)); commonStats.getRefresh().add(new RefreshStats(no, ++iota, no, ++iota, (int) no)); - final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota, no, no); + final IndexingStats.Stats indexingStats = new IndexingStats.Stats( + ++iota, + ++iota, + no, + no, + no, + no, + no, + no, + no, + false, + ++iota, + no, + no + ); commonStats.getIndexing().add(new IndexingStats(indexingStats)); final SearchStats.Stats searchStats = new SearchStats.Stats(++iota, ++iota, no, no, no, no, no, no, no, no, no, no, no, no); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 6822f54633bdc..ca7651ce84497 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -183,7 +183,7 @@ private CommonStats mockCommonStats() { commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong() >> 8)); // >> 8 to avoid overflow - we add these things up commonStats.getStore().add(new StoreStats(2L, 0L, 0L)); - final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0); + final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0); commonStats.getIndexing().add(new IndexingStats(indexingStats)); final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 3d7f843358646..be87b92479c21 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -344,6 +344,7 @@ private static NodeStats mockNodeStats() { no, no, no, + no, false, ++iota, no,