Skip to content

Commit

Permalink
Optimize elasticsearch query performance by using _mGet and physica…
Browse files Browse the repository at this point in the history
…l index name (apache#9339)

Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios:
(a) Metrics aggregation 
(b) Zipkin query
(c) Metrics query
(d) Log query
  • Loading branch information
wankai123 authored Jul 14, 2022
1 parent 63636fb commit 14800cb
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 71 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s).
* Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even it is default value.
* Skip loading UI templates if folder is empty or doesn't exist.
* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios, (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query
* Support the `NETWORK` type of eBPF Profiling task.
* Support `sumHistogram` in `MAL`.
* [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -33,12 +35,13 @@
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.Document;
Expand Down Expand Up @@ -300,7 +303,30 @@ public boolean existDoc(String indexName, String id) {
return es.get().documents().exists(indexName, TYPE, id);
}

public SearchResponse ids(String indexName, Iterable<String> ids) {

/**
* Provide to get documents from multi indices by IDs.
* @param indexIds key: indexName, value: ids list
* @return Documents
* @since 9.2.0
*/
public Optional<Documents> ids(Map<String, List<String>> indexIds) {
Map<String, List<String>> map = new HashMap<>();
indexIds.forEach((indexName, ids) -> {
map.put(indexNameConverter.apply(indexName), ids);
});
return es.get().documents().mGet(TYPE, map);
}

/**
* Search by ids with index alias, when can not locate the physical index.
* Otherwise, recommend use method {@link #ids}
* @param indexName Index alias name or physical name
* @param ids ID list
* @return SearchResponse
* @since 9.2.0 this method was ids
*/
public SearchResponse searchIDs(String indexName, Iterable<String> ids) {
indexName = indexNameConverter.apply(indexName);

return es.get().search(Search.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,7 @@
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;

@Slf4j
@RequiredArgsConstructor
Expand Down Expand Up @@ -83,6 +85,35 @@ public Optional<Document> get(String index, String type, String id) {
return future.get();
}

@SneakyThrows
public Optional<Documents> mGet(String type, Map<String, List<String>> indexIds) {
final CompletableFuture<Optional<Documents>> future =
version.thenCompose(
v -> client.execute(v.requestFactory().document().mget(type, indexIds))
.aggregate().thenApply(response -> {
if (response.status() != HttpStatus.OK) {
throw new RuntimeException(response.contentUtf8());
}

try (final HttpData content = response.content();
final InputStream is = content.toInputStream()) {
return Optional.of(v.codec().decode(is, Documents.class));
} catch (Exception e) {
return Exceptions.throwUnsafely(e);
}
}));
future.whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to get doc by indexIds {}", indexIds, exception);
return;
}
if (log.isDebugEnabled()) {
log.debug("Docs by indexIds {}: {}", indexIds, result);
}
});
return future.get();
}

@SneakyThrows
public void index(IndexRequest request, Map<String, Object> params) {
final CompletableFuture<Void> future = version.thenCompose(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.skywalking.library.elasticsearch.requests.factory;

import com.linecorp.armeria.common.HttpRequest;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
Expand All @@ -39,6 +40,11 @@ public interface DocumentFactory {
*/
HttpRequest mget(String index, String type, Iterable<String> ids);

/**
* Returns a request to get multiple documents of {@code indexIds}.
*/
HttpRequest mget(final String type, final Map<String, List<String>> indexIds);

/**
* Returns a request to index a document with {@link IndexRequest}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestBuilder;
import com.linecorp.armeria.common.MediaType;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -91,6 +94,30 @@ public HttpRequest mget(String index, String type, Iterable<String> ids) {
.build();
}

@SneakyThrows
@Override
public HttpRequest mget(final String type, final Map<String, List<String>> indexIds) {
checkArgument(!isNullOrEmpty(type), "type cannot be null or empty");
checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty");
final List<Map<String, String>> indexIdList = new ArrayList<>();
indexIds.forEach((index, ids) -> {
checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty");
ids.forEach(id -> {
indexIdList.add(ImmutableMap.of("_index", index, "_type", type, "_id", id));
});
});
final Map<String, Iterable<Map<String, String>>> m = ImmutableMap.of("docs", indexIdList);
final byte[] content = version.codec().encode(m);
if (log.isDebugEnabled()) {
log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset()));
}

return HttpRequest.builder()
.get("/_mget")
.content(MediaType.JSON, content)
.build();
}

@SneakyThrows
@Override
public HttpRequest index(IndexRequest request, Map<String, ?> params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestBuilder;
import com.linecorp.armeria.common.MediaType;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -88,6 +91,30 @@ public HttpRequest mget(String index, String type, Iterable<String> ids) {
.build();
}

@SneakyThrows
@Override
public HttpRequest mget(final String type, final Map<String, List<String>> indexIds) {
checkArgument(!isNullOrEmpty(type), "type cannot be null or empty");
checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty");
final List<Map<String, String>> indexIdList = new ArrayList<>();
indexIds.forEach((index, ids) -> {
checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty");
ids.forEach(id -> {
indexIdList.add(ImmutableMap.of("_index", index, "_id", id));
});
});
final Map<String, Iterable<Map<String, String>>> m = ImmutableMap.of("docs", indexIdList);
final byte[] content = version.codec().encode(m);
if (log.isDebugEnabled()) {
log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset()));
}

return HttpRequest.builder()
.get("/_doc/_mget")
.content(MediaType.JSON, content)
.build();
}

@SneakyThrows
@Override
public HttpRequest index(IndexRequest request, Map<String, ?> params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
Expand Down Expand Up @@ -273,6 +277,17 @@ public void testSearch() {
.get("buckets")
).size()
);

//test mGet
Map<String, List<String>> indexIdsGroup = new HashMap<>();
indexIdsGroup.put("test-index", Arrays.asList("id1", "id2"));
Optional<Documents> documents = client.documents().mGet(type, indexIdsGroup);
Map<String, Map<String, Object>> result = new HashMap<>();
for (final Document document : documents.get()) {
result.put(document.getId(), document.getSource());
}
assertEquals(2, result.get("id1").size());
assertEquals(2, result.get("id2").size());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
Expand All @@ -35,11 +38,8 @@
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.joda.time.DateTime;

import static java.util.stream.Collectors.groupingBy;

@Slf4j
public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
protected final StorageBuilder<Metrics> storageBuilder;
Expand All @@ -52,50 +52,48 @@ protected MetricsEsDAO(ElasticSearchClient client,

@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) {
Map<String, List<Metrics>> groupIndices
= metrics.stream()
.collect(
groupingBy(metric -> {
if (model.isTimeRelativeID()) {
// Try to use with timestamp index name(write index),
// if latest cache shows this name doesn't exist,
// then fail back to template alias name.
// This should only happen in very rare case, such as this is the time to create new index
// as a new day comes, and the index cache is pseudo real time.
// This case doesn't affect the result, just has lower performance due to using the alias name.
// Another case is that a removed index showing existing also due to latency,
// which could cause multiGet fails
// but this should not happen in the real runtime, TTL timer only removed the oldest indices,
// which should not have an update/insert.
String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket());
// Format the name to follow the global physical index naming policy.
if (!IndicesMetadataCache.INSTANCE.isExisting(
getClient().formatIndexName(indexName))) {
indexName = IndexController.INSTANCE.getTableName(model);
}
return indexName;
} else {
// Metadata level metrics, always use alias name, due to the physical index of the records
// can't be located through timestamp.
return IndexController.INSTANCE.getTableName(model);
}
})
);

// The groupIndices mostly include one or two group,
// the current day and the T-1 day(if at the edge between days)
Map<String, List<Metrics>> groupIndices = new HashMap<>();
List<Metrics> result = new ArrayList<>(metrics.size());
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
final SearchResponse response = getClient().ids(tableName, ids);
response.getHits().getHits().forEach(hit -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
result.add(source);

if (model.isTimeRelativeID()) {
metrics.forEach(metric -> {
// Try to use with timestamp index name(write index),
String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket());
groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric);
});
});

Map<String, List<String>> indexIdsGroup = new HashMap<>();
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
indexIdsGroup.put(tableName, ids);
});
if (!indexIdsGroup.isEmpty()) {
final Optional<Documents> response = getClient().ids(indexIdsGroup);
response.ifPresent(documents -> documents.forEach(document -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(document.getSource()));
result.add(source);
}));
}
} else {
metrics.forEach(metric -> {
// Metadata level metrics, always use alias name, due to the physical index of the records
// can't be located through timestamp.
String indexName = IndexController.INSTANCE.getTableName(model);
groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric);
});
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
final SearchResponse response = getClient().searchIDs(tableName, ids);
response.getHits().getHits().forEach(hit -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
result.add(source);
});
});
}
return result;
}

Expand Down
Loading

0 comments on commit 14800cb

Please sign in to comment.