Move TSC took-time policy to guard both heap and disk tier (#17190) (#17478)

* Move TSC took-time policy to guard both heap and disk tier
* changelog
* spotless apply
* Addressed Sagar's comment
* Add missing javadoc
* address round 2 of comments
* Add removal notification to put()
* Fix incorrect stats hit when cache entry rejected by policy
* rerun gradle
* Fixed more broken stats
* rerun gradle
* Addressed more comments
* make policy rejections count as neither hit or miss
* rerun gradle
* remove potential double-loading
* rerun gradle
* remove removalNotification
* rerun gradle

---------


(cherry picked from commit b1e66b3)

Signed-off-by: Peter Alfonsi <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Peter Alfonsi <[email protected]>
3 people authored Feb 28, 2025
1 parent ec52148 commit 427f220
Showing 9 changed files with 579 additions and 208 deletions.
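The integration-test diffs below exercise the two took-time thresholds through the cluster settings API. A minimal sketch of that pattern, mirroring the test code (the client, the surrounding imports, and the 10 ms value are illustrative assumptions, not part of this change):

// Keep the overall took-time threshold (which now also guards the heap tier) at zero so entries
// are admitted to the heap tier, but require at least 10 ms of took-time before evicted entries
// may spill over to the disk tier.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
    Settings.builder()
        .put(
            TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
            TimeValue.ZERO
        )
        .put(
            TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
                .getKey(),
            new TimeValue(10, TimeUnit.MILLISECONDS)
        )
        .build()
);
assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).get());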
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190)

### Deprecated

@@ -118,7 +118,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedException
);
}

public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
public void testWithDynamicDiskTookTimePolicyWithMultiSegments() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
// just a bit higher so that each segment can at least hold 1 entry.
@@ -139,12 +139,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
)
.get()
);
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// Set a very high value for took time disk policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
@@ -182,12 +183,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
@@ -206,7 +208,7 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
}

public void testWithDynamicTookTimePolicy() throws Exception {
public void testWithDynamicHeapTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
@@ -224,8 +226,7 @@ public void testWithDynamicTookTimePolicy() throws Exception {
)
.get()
);
// Step 1 : Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
// Set a high threshold for the overall cache took time policy so nothing will enter the cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
@@ -245,6 +246,57 @@
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(0, requestCacheStats.getEvictions());
}

public void testWithDynamicDiskTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Step 1 : Set a very high value for disk took time policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = randomIntBetween(6, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
@@ -282,12 +334,13 @@ public void testWithDynamicTookTimePolicy() throws Exception {
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Step 3: Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// Step 3: Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
@@ -352,11 +405,12 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
@@ -437,11 +491,12 @@ public void testWithExplicitCacheClear() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
@@ -512,11 +567,12 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
@@ -62,16 +62,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Test aggregating by indices
*/
public void testIndicesLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

@@ -115,16 +106,7 @@ public void testIndicesLevelAggregation() throws Exception {
* Test aggregating by indices and tier
*/
public void testIndicesAndTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

@@ -195,16 +177,7 @@ public void testIndicesAndTierLevelAggregation() throws Exception {
* Test aggregating by tier only
*/
public void testTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);
// Get values for tiers alone and check they add correctly across indices
@@ -236,16 +209,7 @@ public void testTierLevelAggregation() throws Exception {
}

public void testInvalidLevelsAreIgnored() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

@@ -287,16 +251,7 @@ public void testInvalidLevelsAreIgnored() throws Exception {
* Check the new stats API returns the same values as the old stats API.
*/
public void testStatsMatchOldApi() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
startNodesDefaultSettings();
String index = "index";
Client client = client();
startIndex(client, index);
@@ -354,7 +309,12 @@ public void testStatsWithMultipleSegments() throws Exception {
.put(defaultSettings(heap_cache_size_per_segment * numberOfSegments + "B", numberOfSegments))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
TimeValue.ZERO
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.build()
);
@@ -429,6 +389,11 @@ public void testClosingShard() throws Exception {
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
@@ -631,4 +596,22 @@ private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client,
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE);
}

private void startNodesDefaultSettings() {
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
TimeValue.ZERO
)
.put(
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
.getKey(),
TimeValue.ZERO
)
.build()
);
}
}
@@ -13,16 +13,14 @@

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
@@ -46,20 +44,20 @@ public class TookTimePolicy<V> implements Predicate<V> {
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
* @param clusterSettings cluster settings
* @param cacheType cache type
* @param targetSetting the cluster setting to register a consumer with
*/
public TookTimePolicy(
TimeValue threshold,
Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
ClusterSettings clusterSettings,
CacheType cacheType
Setting<TimeValue> targetSetting
) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(targetSetting, this::setThreshold);
}

private void setThreshold(TimeValue threshold) {
@@ -72,6 +70,10 @@ private void setThreshold(TimeValue threshold) {
* @return whether to admit the data
*/
public boolean test(V data) {
if (threshold.equals(TimeValue.ZERO)) {
// Skip parsing the took time if this threshold is zero.
return true;
}
long tookTimeNanos;
try {
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
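Because the policy now registers its update consumer against whichever setting it is handed, rather than looking one up by CacheType, the tiered cache can presumably construct one policy instance per tier. A hypothetical sketch of that wiring, not taken from this diff (cacheType, settings, cachedResultParser, and clusterSettings are assumed to be in scope):

// One TookTimePolicy per tier, each bound to its own dynamic threshold setting.
Setting<TimeValue> heapSetting = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType);
Setting<TimeValue> diskSetting = TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType);

TookTimePolicy<V> heapTierPolicy = new TookTimePolicy<>(heapSetting.get(settings), cachedResultParser, clusterSettings, heapSetting);
TookTimePolicy<V> diskTierPolicy = new TookTimePolicy<>(diskSetting.get(settings), cachedResultParser, clusterSettings, diskSetting);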