diff --git a/CHANGELOG.md b/CHANGELOG.md index 50922b85a0c0d..7db82b23a256b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,15 +82,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added -- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386)) -- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) -- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) -- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) -- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131)) -- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189)) -- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) -- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) -- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) +- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) +- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) +- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) +- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) +- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) +- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- [Tiered caching] Framework changes ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) @@ -143,4 +142,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 0ebef1556424b..c7c28e13496e5 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -422,68 +422,74 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }); if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + value = compute(key, loader); + } + return value; + } - try (ReleasableLock ignored = segment.writeLock.acquire()) { - future = segment.map.putIfAbsent(key, completableFuture); - } + public V compute(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); - BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { - if (ok != null) { - try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); - } - return ok.value; - } else { - try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - } - } - return null; - } - }; + try (ReleasableLock ignored = segment.writeLock.acquire()) { + future = segment.map.putIfAbsent(key, completableFuture); + } - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + try (ReleasableLock ignored = lruLock.acquire()) { + promote(ok, now); } + return ok.value; } else { - completableValue = future.handle(handler); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + CompletableFuture> sanity = segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + } + } + return null; } + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } return value; } diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java index 6d355b2122460..2152c4917b62d 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java @@ -32,6 +32,8 @@ package org.opensearch.common.cache; +import org.opensearch.common.cache.tier.TierType; + /** * Notification when an element is removed from the cache * @@ -42,11 +44,17 @@ public class RemovalNotification { private final K key; private final V value; private final RemovalReason removalReason; + private final TierType tierType; public RemovalNotification(K key, V value, RemovalReason removalReason) { + this(key, value, removalReason, TierType.ON_HEAP); + } + + public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) { this.key = key; this.value = value; this.removalReason = removalReason; + this.tierType = tierType; } public K getKey() { @@ -60,4 +68,8 @@ public V getValue() { public RemovalReason getRemovalReason() { return removalReason; } + + public TierType getTierType() { + return tierType; + } } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java new file mode 100644 index 0000000000000..48fd5deadc111 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.RemovalListener; + +/** + * Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like + * onHeap, disk etc. + * @param Type of key + * @param Type of value + */ +public interface CachingTier { + + V get(K key); + + void put(K key, V value); + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + void invalidate(K key); + + V compute(K key, TieredCacheLoader loader) throws Exception; + + void setRemovalListener(RemovalListener removalListener); + + void invalidateAll(); + + Iterable keys(); + + int count(); + + TierType getTierType(); + + /** + * Force any outstanding size-based and time-based evictions to occur + */ + default void refresh() {} +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java new file mode 100644 index 0000000000000..4db71b6378a02 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * This is specific to disk caching tier and can be used to add methods which are specific to disk tier. + * @param Type of key + * @param Type of value + */ +public interface DiskCachingTier extends CachingTier { + +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java new file mode 100644 index 0000000000000..127fa6ee8e6b3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * This is specific to onHeap caching tier and can be used to add methods which are specific to this tier. + * @param Type of key + * @param Type of value + */ +public interface OnHeapCachingTier extends CachingTier {} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java new file mode 100644 index 0000000000000..22d2f769507cf --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.unit.TimeValue; + +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.function.ToLongBiFunction; + +/** + * This variant of on-heap cache uses OpenSearch custom cache implementation. + * @param Type of key + * @param Type of value + */ +public class OpenSearchOnHeapCache implements OnHeapCachingTier, RemovalListener { + + private final Cache cache; + private RemovalListener removalListener; + + private OpenSearchOnHeapCache(Builder builder) { + Objects.requireNonNull(builder.weigher); + CacheBuilder cacheBuilder = CacheBuilder.builder() + .setMaximumWeight(builder.maxWeightInBytes) + .weigher(builder.weigher) + .removalListener(this); + if (builder.expireAfterAcess != null) { + cacheBuilder.setExpireAfterAccess(builder.expireAfterAcess); + } + cache = cacheBuilder.build(); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + cache.invalidateAll(); + } + + @Override + public Iterable keys() { + return this.cache.keys(); + } + + @Override + public int count() { + return cache.count(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws ExecutionException { + return cache.computeIfAbsent(key, key1 -> loader.load(key)); + } + + @Override + public void invalidate(K key) { + cache.invalidate(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + return cache.compute(key, key1 -> loader.load(key)); + } + + @Override + public void refresh() { + cache.refresh(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + + /** + * Builder object + * @param Type of key + * @param Type of value + */ + public static class Builder { + private long maxWeightInBytes; + + private ToLongBiFunction weigher; + + private TimeValue expireAfterAcess; + + public Builder() {} + + public Builder setMaximumWeight(long sizeInBytes) { + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public Builder setWeigher(ToLongBiFunction weigher) { + this.weigher = weigher; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public OpenSearchOnHeapCache build() { + return new OpenSearchOnHeapCache(this); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TierType.java b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java new file mode 100644 index 0000000000000..ca61b636c1dda --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * Tier types in cache. + */ +public enum TierType { + + ON_HEAP, + DISK; +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java new file mode 100644 index 0000000000000..05b59bf16b282 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.RemovalNotification; + +/** + * This can be used to listen to tiered caching events + * @param Type of key + * @param Type of value + */ +public interface TieredCacheEventListener { + + void onMiss(K key, TierType tierType); + + void onRemoval(RemovalNotification notification); + + void onHit(K key, V value, TierType tierType); + + void onCached(K key, V value, TierType tierType); +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java new file mode 100644 index 0000000000000..d720feade0609 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * Used to load value in tiered cache if not present. + * @param Type of key + * @param Type of value + */ +public interface TieredCacheLoader { + V load(K key) throws Exception; + + boolean isLoaded(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java new file mode 100644 index 0000000000000..31d58510206f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import java.util.Optional; + +/** + * This service encapsulates all logic to write/fetch to/from appropriate tiers. Can be implemented with different + * flavors like spillover etc. + * @param Type of key + * @param Type of value + */ +public interface TieredCacheService { + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + V get(K key); + + void invalidate(K key); + + void invalidateAll(); + + long count(); + + OnHeapCachingTier getOnHeapCachingTier(); + + Optional> getDiskCachingTier(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java new file mode 100644 index 0000000000000..153dbf9b330f5 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * This service spillover the evicted items from upper tier to lower tier. For now, we are spilling the in-memory + * cache items to disk tier cache. + * @param Type of key + * @param Type of value + */ +public class TieredCacheSpilloverStrategyService implements TieredCacheService, RemovalListener { + + private final OnHeapCachingTier onHeapCachingTier; + + /** + * Optional in case tiered caching is turned off. + */ + private final Optional> diskCachingTier; + private final TieredCacheEventListener tieredCacheEventListener; + + /** + * Maintains caching tiers in order of get calls. + */ + private final List> cachingTierList; + + private TieredCacheSpilloverStrategyService(Builder builder) { + this.onHeapCachingTier = Objects.requireNonNull(builder.onHeapCachingTier); + this.diskCachingTier = Optional.ofNullable(builder.diskCachingTier); + this.tieredCacheEventListener = Objects.requireNonNull(builder.tieredCacheEventListener); + this.cachingTierList = this.diskCachingTier.map(diskTier -> Arrays.asList(onHeapCachingTier, diskTier)) + .orElse(List.of(onHeapCachingTier)); + setRemovalListeners(); + } + + /** + * This method logic is divided into 2 parts: + * 1. First check whether key is present or not in desired tier. If yes, return the value. + * 2. If the key is not present, then add the key/value pair to onHeap cache. + * @param key Key for lookup. + * @param loader Used to load value in case it is not present in any tier. + * @return value + * @throws Exception exception thrown + */ + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. Any items if evicted will be moved to lower tier. + V value = onHeapCachingTier.compute(key, loader); + tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); + return value; + } + return cacheValue.value; + } + + @Override + public V get(K key) { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.value; + } + + /** + * First fetches the tier type which has this key. And then invalidate accordingly. + * @param key key to invalidate + */ + @Override + public void invalidate(K key) { + // We don't need to track hits/misses in this case. + CacheValue cacheValue = getValueFromTierCache(false).apply(key); + if (cacheValue != null) { + switch (cacheValue.source) { + case ON_HEAP: + onHeapCachingTier.invalidate(key); + break; + case DISK: + diskCachingTier.ifPresent(diskTier -> diskTier.invalidate(key)); + break; + default: + break; + } + } + } + + @Override + public void invalidateAll() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.invalidateAll(); + } + } + + /** + * Returns the total count of items present in all cache tiers. + * @return total count of items in cache + */ + @Override + public long count() { + long totalCount = 0; + for (CachingTier cachingTier : cachingTierList) { + totalCount += cachingTier.count(); + } + return totalCount; + } + + /** + * Called whenever an item is evicted from any cache tier. If the item was evicted from onHeap cache, it is moved + * to disk tier cache. In case it was evicted from disk tier cache, it will discarded. + * @param notification Contains info about the removal like reason, key/value etc. + */ + @Override + public void onRemoval(RemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { + switch (notification.getTierType()) { + case ON_HEAP: + diskCachingTier.ifPresent(diskTier -> { + diskTier.put(notification.getKey(), notification.getValue()); + tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK); + }); + break; + default: + break; + } + } + tieredCacheEventListener.onRemoval(notification); + } + + @Override + public OnHeapCachingTier getOnHeapCachingTier() { + return this.onHeapCachingTier; + } + + @Override + public Optional> getDiskCachingTier() { + return this.diskCachingTier; + } + + /** + * Register this service as a listener to removal events from different caching tiers. + */ + private void setRemovalListeners() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.setRemovalListener(this); + } + } + + private Function> getValueFromTierCache(boolean trackStats) { + return key -> { + for (CachingTier cachingTier : cachingTierList) { + V value = cachingTier.get(key); + if (value != null) { + if (trackStats) { + tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + } + return new CacheValue<>(value, cachingTier.getTierType()); + } + if (trackStats) { + tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + } + } + return null; + }; + } + + /** + * Represents a cache value along with its associated tier type where it is stored. + * @param Type of value. + */ + public static class CacheValue { + V value; + TierType source; + + CacheValue(V value, TierType source) { + this.value = value; + this.source = source; + } + } + + /** + * Builder object + * @param Type of key + * @param Type of value + */ + public static class Builder { + private OnHeapCachingTier onHeapCachingTier; + private DiskCachingTier diskCachingTier; + private TieredCacheEventListener tieredCacheEventListener; + + public Builder() {} + + public Builder setOnHeapCachingTier(OnHeapCachingTier onHeapCachingTier) { + this.onHeapCachingTier = onHeapCachingTier; + return this; + } + + public Builder setOnDiskCachingTier(DiskCachingTier diskCachingTier) { + this.diskCachingTier = diskCachingTier; + return this; + } + + public Builder setTieredCacheEventListener(TieredCacheEventListener tieredCacheEventListener) { + this.tieredCacheEventListener = tieredCacheEventListener; + return this; + } + + public TieredCacheSpilloverStrategyService build() { + return new TieredCacheSpilloverStrategyService(this); + } + } + +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/package-info.java b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java new file mode 100644 index 0000000000000..7ad81dbe3073c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for cache tier support. */ +package org.opensearch.common.cache.tier; diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..c9da20f279a3c 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; + private DelegatingCacheHelper delegatingCacheHelper; + private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { super(in, wrapper); this.wrapper = wrapper; this.shardId = shardId; + this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper()); } /** @@ -68,7 +72,61 @@ public ShardId shardId() { @Override public CacheHelper getReaderCacheHelper() { // safe to delegate since this reader does not alter the index - return in.getReaderCacheHelper(); + return this.delegatingCacheHelper; + } + + public DelegatingCacheHelper getDelegatingCacheHelper() { + return this.delegatingCacheHelper; + } + + /** + * Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey. + * @opensearch.internal + */ + public class DelegatingCacheHelper implements CacheHelper { + CacheHelper cacheHelper; + DelegatingCacheKey serializableCacheKey; + + DelegatingCacheHelper(CacheHelper cacheHelper) { + this.cacheHelper = cacheHelper; + this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey()); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ + public class DelegatingCacheKey { + CacheKey cacheKey; + private final String uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID().toString(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public String getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index b13eec79c2be8..795d585d88647 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -49,6 +49,7 @@ public final class ShardRequestCache { final CounterMetric missCount = new CounterMetric(); public RequestCacheStats stats() { + // TODO: Change RequestCacheStats to support disk tier stats. return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count()); } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index bb1201cb910a9..81d4f545e0fd9 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -34,6 +34,7 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.shard.IndexShard; @@ -51,22 +52,26 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach protected abstract ShardRequestCache stats(); @Override - public final void onCached(IndicesRequestCache.Key key, BytesReference value) { + public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) { + // TODO: Handle tierType in stats stats().onCached(key, value); } @Override - public final void onHit() { + public final void onHit(TierType tierType) { + // TODO: Handle tierType in stats stats().onHit(); } @Override - public final void onMiss() { + public final void onMiss(TierType tierType) { + // TODO: Handle tierType in stats stats().onMiss(); } @Override public final void onRemoval(RemovalNotification notification) { + // TODO: Handle tierType in stats stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..f3cac4cbaebcc 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,11 +39,14 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; -import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.tier.OnHeapCachingTier; +import org.opensearch.common.cache.tier.OpenSearchOnHeapCache; +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.cache.tier.TieredCacheEventListener; +import org.opensearch.common.cache.tier.TieredCacheLoader; +import org.opensearch.common.cache.tier.TieredCacheService; +import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -51,6 +54,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import java.io.Closeable; @@ -78,7 +84,7 @@ * * @opensearch.internal */ -public final class IndicesRequestCache implements RemovalListener, Closeable { +public final class IndicesRequestCache implements TieredCacheEventListener, Closeable { private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class); @@ -107,25 +113,29 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; + private final TieredCacheService tieredCacheService; + private final IndicesService indicesService; - IndicesRequestCache(Settings settings) { + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); + this.indicesService = indicesService; + + // Initialize onHeap cache tier first. + OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( + (k, v) -> k.ramBytesUsed() + v.ramBytesUsed() + ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); + + // Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on. + tieredCacheService = new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier( + openSearchOnHeapCache + ).setTieredCacheEventListener(this).build(); } @Override public void close() { - cache.invalidateAll(); + tieredCacheService.invalidateAll(); } void clear(CacheEntity entity) { @@ -133,11 +143,26 @@ void clear(CacheEntity entity) { cleanCache(); } + @Override + public void onMiss(Key key, TierType tierType) { + key.entity.onMiss(tierType); + } + @Override public void onRemoval(RemovalNotification notification) { notification.getKey().entity.onRemoval(notification); } + @Override + public void onHit(Key key, BytesReference value, TierType tierType) { + key.entity.onHit(tierType); + } + + @Override + public void onCached(Key key, BytesReference value, TierType tierType) { + key.entity.onCached(key, value, tierType); + } + BytesReference getOrCompute( CacheEntity cacheEntity, CheckedSupplier loader, @@ -145,22 +170,28 @@ BytesReference getOrCompute( BytesReference cacheKey ) throws Exception { assert reader.getReaderCacheHelper() != null; - final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + assert readerCacheKeyId != null; + final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); - BytesReference value = cache.computeIfAbsent(key, cacheLoader); + BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - } else { - key.entity.onHit(); } + // else { + // key.entity.onHit(); + // } return value; } @@ -172,7 +203,12 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + String readerCacheKeyId = null; + if (reader instanceof OpenSearchDirectoryReader) { + IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); + } + tieredCacheService.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } /** @@ -180,7 +216,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements TieredCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -198,7 +234,6 @@ public boolean isLoaded() { @Override public BytesReference load(Key key) throws Exception { BytesReference value = loader.get(); - entity.onCached(key, value); loaded = true; return value; } @@ -207,12 +242,12 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable { + interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. */ - void onCached(Key key, BytesReference value); + void onCached(Key key, BytesReference value, TierType tierType); /** * Returns true iff the resource behind this entity is still open ie. @@ -229,17 +264,18 @@ interface CacheEntity extends Accountable { /** * Called each time this entity has a cache hit. */ - void onHit(); + void onHit(TierType tierType); /** * Called each time this entity has a cache miss. */ - void onMiss(); + void onMiss(TierType tierType); /** * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,17 +283,23 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + public class Key implements Accountable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; + this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); + } + + Key(StreamInput in) throws IOException { + this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.readerCacheKeyId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override @@ -276,7 +318,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyId, key.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -285,7 +327,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; } @@ -293,11 +335,11 @@ public int hashCode() { private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyId = readerCacheKeyId; } @Override @@ -315,7 +357,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyId, that.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,11 +365,14 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyId); return result; } } + /** + * Logic to clean up in-memory cache. + */ synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); @@ -336,7 +381,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -344,26 +389,25 @@ synchronized void cleanCache() { } } if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { + for (Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { iterator.remove(); } } } } - - cache.refresh(); + tieredCacheService.getOnHeapCachingTier().refresh(); } /** * Returns the current size of the cache */ - int count() { - return cache.count(); + long count() { + return tieredCacheService.count(); } int numRegisteredCloseListeners() { // for testing diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..f5e71327b6e7b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -391,7 +391,7 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, this); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,14 +1746,21 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } + public IndexShardCacheEntity(StreamInput in) throws IOException { + Index index = in.readOptionalWriteable(Index::new); + int shardId = in.readVInt(); + IndexService indexService = indices.get(index.getUUID()); + this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); + } + @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1775,6 +1782,12 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(indexShard.shardId().getIndex()); + out.writeVInt(indexShard.shardId().id()); + } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java new file mode 100644 index 0000000000000..3cd08df649f72 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java @@ -0,0 +1,458 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class TieredCacheSpilloverStrategyServiceTests extends OpenSearchTestCase { + + public void testComputeAndAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + randomIntBetween(1, 4), + eventListener + ); + int numOfItems1 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + List keys = new ArrayList<>(); + // Put values in cache. + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + keys.add(key); + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + int cacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + // Hit cache with stored key + cacheHit++; + int index = randomIntBetween(0, keys.size() - 1); + spilloverStrategyService.computeIfAbsent(keys.get(index), getTieredCacheLoader()); + } else { + // Hit cache with randomized key which is expected to miss cache always. + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), getTieredCacheLoader()); + cacheMiss++; + } + } + assertEquals(cacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(numOfItems1 + cacheMiss, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + } + + public void testComputeAndAbsentWithEvictionsFromOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + + assertEquals( + eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count(), + eventListener.enumMap.get(TierType.DISK).cachedCount.count() + ); + assertEquals(diskTierKeys.size(), eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(50, 200); + int onHeapCacheHit = 0; + int diskCacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { // Hit cache with key stored in onHeap cache. + onHeapCacheHit++; + int index = randomIntBetween(0, onHeapKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(onHeapKeys.get(index), getTieredCacheLoader()); + } else { // Hit cache with key stored in disk cache. + diskCacheHit++; + int index = randomIntBetween(0, diskTierKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(diskTierKeys.get(index), getTieredCacheLoader()); + } + } else { + // Hit cache with randomized key which is expected to miss cache always. + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + cacheMiss++; + } + } + // On heap cache misses would also include diskCacheHits as it means it missed onHeap cache. + assertEquals(numOfItems1 + cacheMiss + diskCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(onHeapCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(cacheMiss + numOfItems1, eventListener.enumMap.get(TierType.DISK).missCount.count()); + assertEquals(diskCacheHit, eventListener.enumMap.get(TierType.DISK).hitCount.count()); + } + + public void testComputeAndAbsentWithEvictionsFromBothTier() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertTrue(eventListener.enumMap.get(TierType.DISK).evictionsMetric.count() > 0); + } + + public void testGetAndCount() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + + for (int iter = 0; iter < numOfItems1; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { + int index = randomIntBetween(0, onHeapKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(onHeapKeys.get(index))); + } else { + int index = randomIntBetween(0, diskTierKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(diskTierKeys.get(index))); + } + } else { + assertNull(spilloverStrategyService.get(UUID.randomUUID().toString())); + } + } + assertEquals(numOfItems1, spilloverStrategyService.count()); + } + + public void testWithDiskTierNull() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = new TieredCacheSpilloverStrategyService.Builder< + String, + String>().setOnHeapCachingTier(new MockOnHeapCacheTier<>(onHeapCacheSize)).setTieredCacheEventListener(eventListener).build(); + int numOfItems = randomIntBetween(onHeapCacheSize + 1, onHeapCacheSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).evictionsMetric.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).missCount.count()); + } + + private TieredCacheLoader getTieredCacheLoader() { + return new TieredCacheLoader() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + + private TieredCacheSpilloverStrategyService intializeTieredCacheService( + int onHeapCacheSize, + int diksCacheSize, + TieredCacheEventListener cacheEventListener + ) { + DiskCachingTier diskCache = new MockDiskCachingTier<>(diksCacheSize); + OnHeapCachingTier openSearchOnHeapCache = new MockOnHeapCacheTier<>(onHeapCacheSize); + return new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier(openSearchOnHeapCache) + .setOnDiskCachingTier(diskCache) + .setTieredCacheEventListener(cacheEventListener) + .build(); + } + + class MockOnHeapCacheTier implements OnHeapCachingTier, RemovalListener { + + Map onHeapCacheTier; + int maxSize; + private RemovalListener removalListener; + + MockOnHeapCacheTier(int size) { + maxSize = size; + this.onHeapCacheTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.onHeapCacheTier.get(key); + } + + @Override + public void put(K key, V value) { + this.onHeapCacheTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() > maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.computeIfAbsent(key, k -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.onHeapCacheTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.compute(key, ((k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.onHeapCacheTier.clear(); + } + + @Override + public Iterable keys() { + return this.onHeapCacheTier.keySet(); + } + + @Override + public int count() { + return this.onHeapCacheTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + } + + class MockTieredCacheEventListener implements TieredCacheEventListener { + + EnumMap enumMap = new EnumMap<>(TierType.class); + + MockTieredCacheEventListener() { + for (TierType tierType : TierType.values()) { + enumMap.put(tierType, new TestStatsHolder()); + } + } + + @Override + public void onMiss(K key, TierType tierType) { + enumMap.get(tierType).missCount.inc(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getRemovalReason().equals(RemovalReason.EVICTED)) { + enumMap.get(notification.getTierType()).evictionsMetric.inc(); + } + } + + @Override + public void onHit(K key, V value, TierType tierType) { + enumMap.get(tierType).hitCount.inc(); + } + + @Override + public void onCached(K key, V value, TierType tierType) { + enumMap.get(tierType).cachedCount.inc(); + } + + class TestStatsHolder { + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); + + final CounterMetric cachedCount = new CounterMetric(); + } + } + + class MockDiskCachingTier implements DiskCachingTier, RemovalListener { + Map diskTier; + private RemovalListener removalListener; + int maxSize; + + MockDiskCachingTier(int size) { + this.maxSize = size; + diskTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.diskTier.get(key); + } + + @Override + public void put(K key, V value) { + if (this.diskTier.size() >= maxSize) { // For simplification + onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED, TierType.DISK)); + return; + } + this.diskTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + return this.diskTier.computeIfAbsent(key, k -> { + try { + return loader.load(k); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.diskTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.diskTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.DISK)); + return loader.load(key); + } + return this.diskTier.compute(key, (k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.diskTier.clear(); + } + + @Override + public Iterable keys() { + return null; + } + + @Override + public int count() { + return this.diskTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } + + @Override + public void onRemoval(RemovalNotification notification) { + this.removalListener.onRemoval(notification); + } + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..0dc4669c27a18 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,28 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexService; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -122,7 +127,7 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -218,7 +223,7 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -244,7 +249,8 @@ public void testEviction() throws Exception { IOUtils.close(reader, secondReader, writer, dir, cache); } IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build() + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), + null ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -281,7 +287,7 @@ public void testEviction() throws Exception { } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -366,7 +372,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -435,20 +441,23 @@ public void testInvalidate() throws Exception { public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); - IndexReader reader1 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + ShardId shardId = new ShardId("foo", "bar", 1); + IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); - IndexReader reader2 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -538,5 +547,8 @@ public Object getCacheIdentity() { public long ramBytesUsed() { return 42; } + + @Override + public void writeTo(StreamOutput out) throws IOException {} } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..5dd4eb504ec2f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -37,10 +37,12 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; @@ -59,6 +61,7 @@ import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.transport.nio.MockNioTransportPlugin; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -315,13 +318,18 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(0L, cache.count()); IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + @Override public long ramBytesUsed() { return 42; } @Override - public void onCached(Key key, BytesReference value) {} + public void onCached(Key key, BytesReference value, TierType tierType) {} @Override public boolean isOpen() { @@ -334,10 +342,10 @@ public Object getCacheIdentity() { } @Override - public void onHit() {} + public void onHit(TierType tierType) {} @Override - public void onMiss() {} + public void onMiss(TierType tierType) {} @Override public void onRemoval(RemovalNotification notification) {}