Skip to content

Commit

Permalink
Expose operation and request counts separately in repository stats (#…
Browse files Browse the repository at this point in the history
…117530)

Relates ES-9767
Fixes #104443
  • Loading branch information
nicktindall authored Dec 3, 2024
1 parent 447c4dc commit 7d43d8a
Show file tree
Hide file tree
Showing 21 changed files with 498 additions and 211 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/117530.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 117530
summary: Expose operation and request counts separately in repository stats
area: Snapshot/Restore
type: enhancement
issues:
- 104443
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.repositories.azure;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

Expand All @@ -34,7 +33,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -48,6 +46,7 @@
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -61,7 +60,7 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
);
private static final int MAX_RETRIES = 3;

private final Queue<RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
Expand Down Expand Up @@ -106,7 +105,8 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {

// Queue up some throttle responses
final int numThrottles = randomIntBetween(1, MAX_RETRIES);
IntStream.range(0, numThrottles).forEach(i -> requestHandlers.offer(new FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
IntStream.range(0, numThrottles)
.forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));

// Check that the blob exists
blobContainer.blobExists(purpose, blobName);
Expand All @@ -131,7 +131,13 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {
clearMetrics(dataNodeName);

// Queue up a range-not-satisfied error
requestHandlers.offer(new FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE));
requestHandlers.offer(
new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
null,
GET_BLOB_REQUEST_PREDICATE
)
);

// Attempt to read the blob
assertThrows(RequestedRangeNotSatisfiedException.class, () -> blobContainer.readBlob(purpose, blobName));
Expand Down Expand Up @@ -163,7 +169,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
if (status == RestStatus.TOO_MANY_REQUESTS) {
throttles.incrementAndGet();
}
requestHandlers.offer(new FixedRequestHandler(status));
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
});

// Check that the blob exists
Expand Down Expand Up @@ -259,7 +265,7 @@ public void testBatchDeleteFailure() throws IOException {
clearMetrics(dataNodeName);

// Handler will fail one or more of the batch requests
final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);

// Exhaust the retries
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
Expand Down Expand Up @@ -287,35 +293,6 @@ private long getLongCounterTotal(String dataNodeName, String metricKey) {
.reduce(0L, Long::sum);
}

/**
 * Builds a {@link RequestHandler} that keeps failing the first <code>numberToFail</code> distinct requests it
 * encounters, where a request is identified by the first value of its <code>X-ms-client-request-id</code> header.
 * Any later request carrying one of those ids is failed again with a 500; every other request is handed to the delegate.
 *
 * @param numberToFail The number of distinct request ids to fail
 * @return the handler
 */
private static RequestHandler createFailNRequestsHandler(int numberToFail) {
    final List<String> doomedRequestIds = new ArrayList<>(numberToFail);
    return (exchange, delegate) -> {
        final String requestId = exchange.getRequestHeaders().get("X-ms-client-request-id").get(0);
        final boolean shouldFail;
        synchronized (doomedRequestIds) {
            // Ids that are already recorded keep failing; an unseen id is recorded (and failed) only while
            // fewer than numberToFail ids have been recorded. List#add always returns true here.
            shouldFail = doomedRequestIds.contains(requestId)
                || (doomedRequestIds.size() < numberToFail && doomedRequestIds.add(requestId));
        }
        if (shouldFail == false) {
            delegate.handle(exchange);
            return;
        }
        exchange.sendResponseHeaders(500, -1);
    };
}

private void clearMetrics(String discoveryNode) {
internalCluster().getInstance(PluginsService.class, discoveryNode)
.filterPlugins(TestTelemetryPlugin.class)
Expand Down Expand Up @@ -480,80 +457,4 @@ private void assertMatchingMetricRecorded(MetricType metricType, String metricNa
assertion.accept(measurement);
}
}

/**
 * An {@link HttpHandler} that answers a request with the {@link RequestHandler} at the head of a shared queue when
 * that handler matches the request, and otherwise forwards the request to the delegate.
 */
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
private static class ResponseInjectingAzureHttpHandler implements DelegatingHttpHandler {

    private final HttpHandler delegate;
    private final Queue<RequestHandler> requestHandlerQueue;

    /**
     * @param requestHandlerQueue queue of handlers to consult, head first; also used as the lock for claiming a handler
     * @param delegate            handler that receives every request not claimed by a queued handler
     */
    ResponseInjectingAzureHttpHandler(Queue<RequestHandler> requestHandlerQueue, HttpHandler delegate) {
        this.delegate = delegate;
        this.requestHandlerQueue = requestHandlerQueue;
    }

    /**
     * Lets the queued head handler write the response if it matches this request; otherwise delegates.
     *
     * @param exchange the exchange to respond to
     * @throws IOException if the selected handler fails while responding
     */
    @Override
    public void handle(HttpExchange exchange) throws IOException {
        final RequestHandler injected = claimMatchingHandler(exchange);
        if (injected != null) {
            // Respond outside the lock so slow responses do not serialize other requests
            injected.writeResponse(exchange, delegate);
        } else {
            delegate.handle(exchange);
        }
    }

    /**
     * Atomically removes and returns the head of the queue if it matches the request. A bare peek-then-poll lets two
     * concurrent requests both observe the same head, after which the second poll returns an unchecked handler or null.
     *
     * @return the claimed handler, or {@code null} if the queue is empty or its head does not match
     */
    private RequestHandler claimMatchingHandler(HttpExchange exchange) {
        synchronized (requestHandlerQueue) {
            final RequestHandler head = requestHandlerQueue.peek();
            if (head != null && head.matchesRequest(exchange)) {
                return requestHandlerQueue.poll();
            }
            return null;
        }
    }

    @Override
    public HttpHandler getDelegate() {
        return delegate;
    }
}

/**
 * A queued response strategy: decides whether it applies to a request and, if so, produces the response.
 */
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
@FunctionalInterface
private interface RequestHandler {

    /**
     * Whether this handler should be used for the given request. Matches every request unless overridden.
     *
     * @param exchange the incoming request
     * @return {@code true} if this handler applies to the request
     */
    default boolean matchesRequest(HttpExchange exchange) {
        return true;
    }

    /**
     * Responds to the request, either directly or by passing it on to the delegate.
     *
     * @param exchange the exchange to respond to
     * @param delegate the handler that would otherwise have served the request
     * @throws IOException if writing the response fails
     */
    void writeResponse(HttpExchange exchange, HttpHandler delegate) throws IOException;
}

/**
 * A {@link RequestHandler} that always replies with a fixed status and optional UTF-8 body, and that is only
 * applied to requests accepted by its predicate.
 */
@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
private static class FixedRequestHandler implements RequestHandler {

    private final RestStatus status;
    private final String responseBody;
    private final Predicate<HttpExchange> requestMatcher;

    /**
     * Create a handler that matches every request and replies with the given status and no body.
     */
    FixedRequestHandler(RestStatus status) {
        this(status, null, req -> true);
    }

    /**
     * Create a handler that only gets executed for requests that match the supplied predicate. Note
     * that because the errors are stored in a queue this will prevent any subsequently queued errors from
     * being returned until after it returns.
     *
     * @param status         the status to reply with
     * @param responseBody   the body to send, or {@code null} for no body
     * @param requestMatcher selects the requests this handler applies to
     */
    FixedRequestHandler(RestStatus status, String responseBody, Predicate<HttpExchange> requestMatcher) {
        this.status = status;
        this.responseBody = responseBody;
        this.requestMatcher = requestMatcher;
    }

    @Override
    public boolean matchesRequest(HttpExchange exchange) {
        return requestMatcher.test(exchange);
    }

    /**
     * Sends the fixed status, followed by the body when there is one. The delegate is never invoked.
     *
     * @throws IOException if sending the headers or body fails
     */
    @Override
    public void writeResponse(HttpExchange exchange, HttpHandler delegateHandler) throws IOException {
        final byte[] responseBytes = responseBody == null ? null : responseBody.getBytes(StandardCharsets.UTF_8);
        if (responseBytes == null || responseBytes.length == 0) {
            // A response length of 0 selects chunked encoding rather than "no body", so an empty body
            // must be signalled with -1 exactly like a missing one.
            exchange.sendResponseHeaders(status.getStatus(), -1);
            return;
        }
        exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
        exchange.getResponseBody().write(responseBytes);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
Expand All @@ -54,13 +53,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_OPERATIONS_TOTAL;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_REQUESTS_TOTAL;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -230,21 +227,13 @@ protected String requestUniqueId(final HttpExchange exchange) {
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
private final Set<String> seenRequestIds = ConcurrentCollections.newConcurrentSet();

private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
super(delegate);
}

@Override
protected void maybeTrack(String request, Headers headers) {
// Same request id is a retry
// https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-ncnbi/817da997-30d2-4cd3-972f-a0073e4e98f7
// Do not count retries since the client side request stats do not track them yet.
// See https://github.com/elastic/elasticsearch/issues/104443
if (false == seenRequestIds.add(headers.getFirst("X-ms-client-request-id"))) {
return;
}
if (GET_BLOB_PATTERN.test(request)) {
trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*/*", request)) {
Expand Down Expand Up @@ -393,14 +382,14 @@ public void testMetrics() throws Exception {
}

final AzureBlobStore blobStore = (AzureBlobStore) blobStoreRepository.blobStore();
final Map<AzureBlobStore.StatsKey, LongAdder> statsCollectors = blobStore.getMetricsRecorder().opsCounters;
final Map<AzureBlobStore.StatsKey, AzureBlobStore.StatsCounter> statsCounters = blobStore.getMetricsRecorder().statsCounters;

final List<Measurement> metrics = Measurement.combine(
getTelemetryPlugin(nodeName).getLongCounterMeasurement(METRIC_OPERATIONS_TOTAL)
getTelemetryPlugin(nodeName).getLongCounterMeasurement(METRIC_REQUESTS_TOTAL)
);

assertThat(
statsCollectors.keySet().stream().map(AzureBlobStore.StatsKey::operation).collect(Collectors.toSet()),
statsCounters.keySet().stream().map(AzureBlobStore.StatsKey::operation).collect(Collectors.toSet()),
equalTo(
metrics.stream()
.map(m -> AzureBlobStore.Operation.fromKey((String) m.attributes().get("operation")))
Expand All @@ -417,8 +406,12 @@ public void testMetrics() throws Exception {
operation,
OperationPurpose.parse((String) metric.attributes().get("purpose"))
);
assertThat(nodeName + "/" + statsKey + " exists", statsCollectors, hasKey(statsKey));
assertThat(nodeName + "/" + statsKey + " has correct sum", metric.getLong(), equalTo(statsCollectors.get(statsKey).sum()));
assertThat(nodeName + "/" + statsKey + " exists", statsCounters, hasKey(statsKey));
assertThat(
nodeName + "/" + statsKey + " has correct sum",
metric.getLong(),
equalTo(statsCounters.get(statsKey).requests().sum())
);
aggregatedMetrics.compute(statsKey.operation(), (k, v) -> v == null ? metric.getLong() : v + metric.getLong());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.OptionalBytesReference;
Expand Down Expand Up @@ -695,7 +696,7 @@ private AzureBlobServiceClient getAzureBlobServiceClientClient(OperationPurpose
}

@Override
public Map<String, Long> stats() {
public Map<String, BlobStoreActionStats> stats() {
return requestMetricsRecorder.statsMap(service.isStateless());
}

Expand Down Expand Up @@ -737,27 +738,39 @@ public String toString() {
}
}

// visible for testing
/**
 * Holds two {@link LongAdder} counters, one for operations and one for requests.
 */
record StatsCounter(LongAdder operations, LongAdder requests) {

    /** Creates a counter pair with both adders starting from zero. */
    StatsCounter() {
        this(new LongAdder(), new LongAdder());
    }

    /**
     * Captures the current sums of both adders.
     *
     * @return a {@link BlobStoreActionStats} built from the operations sum and the requests sum, in that order
     */
    BlobStoreActionStats getEndpointStats() {
        final long operationCount = operations.sum();
        final long requestCount = requests.sum();
        return new BlobStoreActionStats(operationCount, requestCount);
    }
}

// visible for testing
class RequestMetricsRecorder {
private final RepositoriesMetrics repositoriesMetrics;
final Map<StatsKey, LongAdder> opsCounters = new ConcurrentHashMap<>();
final Map<StatsKey, StatsCounter> statsCounters = new ConcurrentHashMap<>();
final Map<StatsKey, Map<String, Object>> opsAttributes = new ConcurrentHashMap<>();

RequestMetricsRecorder(RepositoriesMetrics repositoriesMetrics) {
this.repositoriesMetrics = repositoriesMetrics;
}

Map<String, Long> statsMap(boolean stateless) {
Map<String, BlobStoreActionStats> statsMap(boolean stateless) {
if (stateless) {
return opsCounters.entrySet()
return statsCounters.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> e.getKey().toString(), e -> e.getValue().sum()));
.collect(Collectors.toUnmodifiableMap(e -> e.getKey().toString(), e -> e.getValue().getEndpointStats()));
} else {
Map<String, Long> normalisedStats = Arrays.stream(Operation.values()).collect(Collectors.toMap(Operation::getKey, o -> 0L));
opsCounters.forEach(
Map<String, BlobStoreActionStats> normalisedStats = Arrays.stream(Operation.values())
.collect(Collectors.toMap(Operation::getKey, o -> BlobStoreActionStats.ZERO));
statsCounters.forEach(
(key, value) -> normalisedStats.compute(
key.operation.getKey(),
(k, current) -> Objects.requireNonNull(current) + value.sum()
(k, current) -> value.getEndpointStats().add(Objects.requireNonNull(current))
)
);
return Map.copyOf(normalisedStats);
Expand All @@ -766,13 +779,14 @@ Map<String, Long> statsMap(boolean stateless) {

public void onRequestComplete(Operation operation, OperationPurpose purpose, AzureClientProvider.RequestMetrics requestMetrics) {
final StatsKey statsKey = new StatsKey(operation, purpose);
final LongAdder counter = opsCounters.computeIfAbsent(statsKey, k -> new LongAdder());
final StatsCounter counter = statsCounters.computeIfAbsent(statsKey, k -> new StatsCounter());
final Map<String, Object> attributes = opsAttributes.computeIfAbsent(
statsKey,
k -> RepositoriesMetrics.createAttributesMap(repositoryMetadata, purpose, operation.getKey())
);

counter.add(1);
counter.operations.increment();
counter.requests.add(requestMetrics.getRequestCount());

// range not satisfied is not retried, so we count them by checking the final response
if (requestMetrics.getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
Expand Down
Loading

0 comments on commit 7d43d8a

Please sign in to comment.