[Tiramisu Splitting Up Work] Tiramisu disk tier policies #23

Open · wants to merge 18 commits into base: tiramisu-stats
BytesReference.java:
@@ -45,6 +45,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;

@@ -54,7 +55,8 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment {
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment, Serializable {
// TODO: Remove "Serializable" once we merge in the serializer PR!

/**
* Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder,
IndicesRequestCacheDiskTierIT.java:
Expand Up @@ -35,8 +35,10 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.TierType;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.cache.request.ShardRequestCache;
@@ -54,7 +56,9 @@ public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
public void testDiskTierStats() throws Exception {
int heapSizeBytes = 4729;
String node = internalCluster().startNode(
Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
Settings.builder()
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

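Because DISK_TOOKTIME_THRESHOLD_SETTING is declared dynamic, the threshold can also be changed on a live cluster rather than only at node startup. A minimal sketch, reusing the `client` from the test above and assuming the standard cluster-settings update API:

    // Raise the took-time threshold at runtime so only queries slower than
    // 50ms are admitted to the disk tier on eviction from the heap tier.
    client.admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(
            Settings.builder()
                .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.timeValueMillis(50))
        )
        .get();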
CachePolicyInfoWrapper.java (new file):
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

/**
* A class containing information needed for all CacheTierPolicy objects to decide whether to admit
* a given BytesReference. This spares us from having to create an entire short-lived QuerySearchResult object
* just to read a few values.
*/
public class CachePolicyInfoWrapper implements Writeable {
private final Long tookTimeNanos;
public CachePolicyInfoWrapper(Long tookTimeNanos) {
this.tookTimeNanos = tookTimeNanos;
// Add more values here as they are needed for future cache tier policies
}

public CachePolicyInfoWrapper(StreamInput in) throws IOException {
this.tookTimeNanos = in.readOptionalLong();
}

public Long getTookTimeNanos() {
return tookTimeNanos;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalLong(tookTimeNanos);
}
}
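A quick round-trip shows the intended wire format; a minimal sketch, assuming the standard BytesStreamOutput utility from the core serialization package:

    // Serialize a wrapper and read it back from the resulting bytes.
    BytesStreamOutput out = new BytesStreamOutput();
    new CachePolicyInfoWrapper(123_456_789L).writeTo(out); // took time in nanoseconds
    CachePolicyInfoWrapper roundTripped = new CachePolicyInfoWrapper(out.bytes().streamInput());
    assert roundTripped.getTookTimeNanos() == 123_456_789L;

Because the field is written with writeOptionalLong, a null took time (for example, from an older node) round-trips as null as well.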
CacheTierPolicy.java (new file):
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.common.cache.tier;

/**
 * A policy which decides whether data may be admitted into a cache tier.
 *
 * @param <T> The type of data checked by the policy
 */
public interface CacheTierPolicy<T> {
/**
* Determines whether this policy allows the data into its cache tier.
* @param data The data to check
* @return true if accepted, otherwise false
*/
boolean checkData(T data);
}
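Policies are intentionally generic over the cached value type, so a tier can compose several of them. As a hypothetical illustration (not part of this PR), a size-based admission policy for the disk tier could implement the same interface:

    // Hypothetical policy: only admit values whose serialized size is small enough.
    public class MaxSizePolicy implements CacheTierPolicy<BytesReference> {
        private final long maxSizeBytes;

        public MaxSizePolicy(long maxSizeBytes) {
            this.maxSizeBytes = maxSizeBytes;
        }

        @Override
        public boolean checkData(BytesReference data) {
            return data.length() <= maxSizeBytes;
        }
    }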
DiskTierTookTimePolicy.java (new file):
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;

import java.util.function.Function;

/**
* A cache tier policy which accepts queries whose took time is at least some threshold,
* which is specified as a dynamic cluster-level setting. The threshold should be set to approximately
* the time it takes to get a result from the cache tier.
* The policy expects to be able to read a CachePolicyInfoWrapper from the start of the BytesReference.
*/
public class DiskTierTookTimePolicy implements CacheTierPolicy<BytesReference> {
public static final Setting<TimeValue> DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"indices.requests.cache.disk.tooktime.threshold",
new TimeValue(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
); // Set this to TimeValue.ZERO to let all data through

private TimeValue threshold;
private final Function<BytesReference, CachePolicyInfoWrapper> getPolicyInfoFn;

public DiskTierTookTimePolicy(Settings settings, ClusterSettings clusterSettings, Function<BytesReference, CachePolicyInfoWrapper> getPolicyInfoFn) {
this.threshold = DISK_TOOKTIME_THRESHOLD_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold);
this.getPolicyInfoFn = getPolicyInfoFn;
}

protected void setThreshold(TimeValue threshold) { // protected so that we can manually set value in unit test
this.threshold = threshold;
}

@Override
public boolean checkData(BytesReference data) {
if (threshold.equals(TimeValue.ZERO)) {
return true;
}
Long tookTimeNanos;
try {
tookTimeNanos = getPolicyInfoFn.apply(data).getTookTimeNanos();
} catch (Exception e) {
// If we can't retrieve the took time for whatever reason, admit the data to be safe
return true;
}
if (tookTimeNanos == null) {
// A null took time means this QSR is from an older version that lacks took time; accept it
return true;
}
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold
return false;
}
return true;
}
}
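A unit-test style sketch of the policy in isolation; the stub parsing function and the threshold values are illustrative:

    // Threshold of 10ms; the stub parser reports a 5ms took time, so data is rejected.
    Settings settings = Settings.builder()
        .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.timeValueMillis(10))
        .build();
    ClusterSettings clusterSettings = new ClusterSettings(
        settings,
        Set.<Setting<?>>of(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING)
    );
    Function<BytesReference, CachePolicyInfoWrapper> stubParser =
        bytes -> new CachePolicyInfoWrapper(TimeValue.timeValueMillis(5).nanos());
    DiskTierTookTimePolicy policy = new DiskTierTookTimePolicy(settings, clusterSettings, stubParser);
    assert policy.checkData(BytesArray.EMPTY) == false; // 5ms took time < 10ms threshold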
TieredCacheSpilloverStrategyService.java:
@@ -8,10 +8,15 @@

package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.search.query.QuerySearchResult;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -38,13 +43,15 @@ public class TieredCacheSpilloverStrategyService<K, V> implements TieredCacheSer
* Maintains caching tiers in order of get calls.
*/
private final List<CachingTier<K, V>> cachingTierList;
private final List<CacheTierPolicy<V>> policies;

private TieredCacheSpilloverStrategyService(Builder<K, V> builder) {
this.onHeapCachingTier = Objects.requireNonNull(builder.onHeapCachingTier);
this.diskCachingTier = Optional.ofNullable(builder.diskCachingTier);
this.tieredCacheEventListener = Objects.requireNonNull(builder.tieredCacheEventListener);
this.cachingTierList = this.diskCachingTier.map(diskTier -> Arrays.asList(onHeapCachingTier, diskTier))
.orElse(List.of(onHeapCachingTier));
this.policies = Objects.requireNonNull(builder.policies);
setRemovalListeners();
}

@@ -130,10 +137,12 @@ public void onRemoval(RemovalNotification<K, V> notification) {
if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) {
switch (notification.getTierType()) {
case ON_HEAP:
diskCachingTier.ifPresent(diskTier -> {
diskTier.put(notification.getKey(), notification.getValue());
tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK);
});
if (checkPolicies(notification.getValue())) {
diskCachingTier.ifPresent(diskTier -> {
diskTier.put(notification.getKey(), notification.getValue());
tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK);
});
}
break;
default:
break;
@@ -152,6 +161,15 @@ public Optional<DiskCachingTier<K, V>> getDiskCachingTier() {
return this.diskCachingTier;
}

boolean checkPolicies(V value) {
for (CacheTierPolicy<V> policy : policies) {
if (!policy.checkData(value)) {
return false;
}
}
return true;
}

/**
* Register this service as a listener to removal events from different caching tiers.
*/
@@ -190,6 +208,7 @@ public static class Builder<K, V> {
private OnHeapCachingTier<K, V> onHeapCachingTier;
private DiskCachingTier<K, V> diskCachingTier;
private TieredCacheEventListener<K, V> tieredCacheEventListener;
private ArrayList<CacheTierPolicy<V>> policies = new ArrayList<>();

public Builder() {}

@@ -208,6 +227,17 @@ public Builder<K, V> setTieredCacheEventListener(TieredCacheEventListener<K, V>
return this;
}

public Builder<K, V> withPolicy(CacheTierPolicy<V> policy) {
this.policies.add(policy);
return this;
}

// Add multiple policies at once
public Builder<K, V> withPolicies(List<CacheTierPolicy<V>> policiesList) {
this.policies.addAll(policiesList);
return this;
}

public TieredCacheSpilloverStrategyService<K, V> build() {
return new TieredCacheSpilloverStrategyService<K, V>(this);
}
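Putting the builder additions together: evictions from the on-heap tier now spill to disk only when every registered policy accepts the value. A minimal wiring sketch (the settings, tier, listener, and parser instances are assumed to exist; the on-heap setter name is assumed, while setOnDiskCachingTier, setTieredCacheEventListener, and withPolicy appear in this diff):

    // Build a tiered service whose disk tier is guarded by the took-time policy.
    TieredCacheService<Key, BytesReference> tieredCacheService =
        new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>()
            .setOnHeapCachingTier(onHeapTier)       // assumed pre-built on-heap tier
            .setOnDiskCachingTier(diskTier)         // assumed pre-built disk tier
            .setTieredCacheEventListener(listener)  // assumed pre-built listener
            .withPolicy(new DiskTierTookTimePolicy(settings, clusterSettings, policyInfoParser))
            .build();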
ClusterSettings.java:
@@ -79,6 +79,7 @@
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
@@ -675,7 +676,10 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

// Tiered caching
DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING
)
)
);
IndicesRequestCache.java:
@@ -40,6 +40,8 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.tier.CachePolicyInfoWrapper;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.BytesReferenceSerializer;
import org.opensearch.common.cache.tier.CacheValue;
import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
@@ -51,6 +53,7 @@
import org.opensearch.common.cache.tier.TieredCacheService;
import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
@@ -61,6 +64,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.search.query.QuerySearchResult;

import java.io.Closeable;
import java.io.IOException;
@@ -71,6 +75,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
* The indices request cache allows caching shard-level request stage responses, helping with improving
@@ -120,7 +125,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
private final IndicesService indicesService;
private final Settings settings;

IndicesRequestCache(Settings settings, IndicesService indicesService) {
IndicesRequestCache(Settings settings, IndicesService indicesService, ClusterSettings clusterSettings) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
@@ -133,6 +138,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build();

// Initialize tiered cache service.
TieredCacheSpilloverStrategyService.Builder<Key, BytesReference> tieredCacheServiceBuilder =
new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>()
@@ -142,6 +148,16 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic

EhCacheDiskCachingTier<Key, BytesReference> ehcacheDiskTier = createNewDiskTier();
tieredCacheServiceBuilder.setOnDiskCachingTier(ehcacheDiskTier);

// Function to allow took-time policy to inspect took time on cached data.
Function<BytesReference, CachePolicyInfoWrapper> transformationFunction = (data) -> {
try {
return getPolicyInfo(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
tieredCacheServiceBuilder.withPolicy(new DiskTierTookTimePolicy(settings, clusterSettings, transformationFunction));
tieredCacheService = tieredCacheServiceBuilder.build();
}

@@ -223,6 +239,12 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference
tieredCacheService.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId));
}

public static CachePolicyInfoWrapper getPolicyInfo(BytesReference data) throws IOException {
// Reads the policy info corresponding to this QSR, written in IndicesService#loadIntoContext,
// without having to create a potentially large short-lived QSR object just for this purpose
return new CachePolicyInfoWrapper(data.streamInput());
}

/**
* Loader for the request cache
*
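The read path in IndicesRequestCache#getPolicyInfo only works if the producer wrote a CachePolicyInfoWrapper immediately before the serialized QuerySearchResult. A minimal producer-side sketch (simplified from what IndicesService#loadIntoContext would do; the tookTimeNanos value and the queryResult variable are stand-ins):

    // Prepend the policy header so getPolicyInfo(data) can read it back
    // without deserializing the whole QuerySearchResult.
    Long tookTimeNanos = 123_456_789L;                       // reported by the query phase
    BytesStreamOutput out = new BytesStreamOutput();
    new CachePolicyInfoWrapper(tookTimeNanos).writeTo(out);  // header read by getPolicyInfo
    queryResult.writeToNoId(out);                            // then the full QuerySearchResult
    BytesReference cacheValue = out.bytes();                 // the value stored in the cache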