Skip to content

Commit

Permalink
Add cache size unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Jul 4, 2024
1 parent 5d3207a commit d04be34
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 25 deletions.
6 changes: 3 additions & 3 deletions core/ProcessorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public class ProcessorContext
/// Returns the state directory for the partition.
/// </summary>
public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}";

// FOR TESTING

internal ProcessorContext()
{

}

internal ProcessorContext(AbstractTask task, IStreamConfig configuration, IStateManager stateManager,
StreamMetricsRegistry streamMetricsRegistry)
{
Expand Down
39 changes: 20 additions & 19 deletions core/State/Cache/CachingKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ internal class CachingKeyValueStore :
ICachedStateStore<byte[], byte[]>
{
private readonly CacheSize _cacheSize;
private MemoryCache<Bytes, CacheEntryValue> cache;
private Action<KeyValuePair<byte[], Change<byte[]>>> flushListener;
private bool sendOldValue;
private bool cachingEnabled;

private Sensor hitRatioSensor = NoRunnableSensor.Empty;
private Sensor totalCacheSizeSensor = NoRunnableSensor.Empty;


internal MemoryCache<Bytes, CacheEntryValue> Cache { get; private set; }

public CachingKeyValueStore(IKeyValueStore<Bytes, byte[]> wrapped, CacheSize cacheSize)
: base(wrapped)
{
Expand Down Expand Up @@ -58,7 +59,7 @@ internal void CreateCache(ProcessorContext context)
cachingEnabled = context.Configuration.DefaultStateStoreCacheMaxBytes > 0 ||
_cacheSize is { CacheSizeBytes: > 0 };
if(cachingEnabled)
cache = new MemoryCache<Bytes, CacheEntryValue>(new MemoryCacheOptions {
Cache = new MemoryCache<Bytes, CacheEntryValue>(new MemoryCacheOptions {
SizeLimit = _cacheSize is { CacheSizeBytes: > 0 } ? _cacheSize.CacheSizeBytes : context.Configuration.DefaultStateStoreCacheMaxBytes,
CompactionPercentage = .20
}, new BytesComparer());
Expand All @@ -70,7 +71,7 @@ private byte[] GetInternal(Bytes key)
{
byte[] value;

if (cache.TryGetValue(key, out CacheEntryValue priorEntry))
if (Cache.TryGetValue(key, out CacheEntryValue priorEntry))
value = priorEntry.Value;
else
{
Expand All @@ -79,7 +80,7 @@ private byte[] GetInternal(Bytes key)
PutInternal(key, new CacheEntryValue(value), true);
}

var currentStat = cache.GetCurrentStatistics();
var currentStat = Cache.GetCurrentStatistics();
hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));

return value;
Expand All @@ -97,7 +98,7 @@ public override void Init(ProcessorContext context, IStateStore root)

private void UpdateRatioSensor()
{
var currentStat = cache.GetCurrentStatistics();
var currentStat = Cache.GetCurrentStatistics();
hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));
}

Expand Down Expand Up @@ -131,14 +132,14 @@ private void CacheEntryEviction(Bytes key, CacheEntryValue value, EvictionReason
context.SetRecordMetaData(currentContext);
}

totalCacheSizeSensor.Record(cache.Size);
totalCacheSizeSensor.Record(Cache.Size);
}

public override void Flush()
{
if (cachingEnabled)
{
cache.Compact(1); // Compact 100% of the cache
Cache.Compact(1); // Compact 100% of the cache
base.Flush();
}
else
Expand All @@ -155,8 +156,8 @@ public IKeyValueEnumerator<Bytes, byte[]> Range(Bytes from, Bytes to)
var storeEnumerator = wrapped.Range(from, to);
var cacheEnumerator =
new CacheEnumerator<Bytes, CacheEntryValue>(
cache.KeyRange(from, to, true, true),
cache,
Cache.KeyRange(from, to, true, true),
Cache,
UpdateRatioSensor);

return new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, true);
Expand All @@ -171,8 +172,8 @@ public IKeyValueEnumerator<Bytes, byte[]> ReverseRange(Bytes from, Bytes to)
var storeEnumerator = wrapped.ReverseRange(from, to);
var cacheEnumerator =
new CacheEnumerator<Bytes, CacheEntryValue>(
cache.KeyRange(from, to, true, false),
cache,
Cache.KeyRange(from, to, true, false),
Cache,
UpdateRatioSensor);

return new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, false);
Expand All @@ -185,8 +186,8 @@ private IEnumerable<KeyValuePair<Bytes, byte[]>> InternalAll(bool reverse)
{
var storeEnumerator = new WrapEnumerableKeyValueEnumerator<Bytes, byte[]>(wrapped.All());
var cacheEnumerator = new CacheEnumerator<Bytes, CacheEntryValue>(
cache.KeySetEnumerable(reverse),
cache,
Cache.KeySetEnumerable(reverse),
Cache,
UpdateRatioSensor);

var mergedEnumerator = new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, reverse);
Expand All @@ -210,7 +211,7 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
return wrapped.ReverseAll();
}

public long ApproximateNumEntries() => cachingEnabled ? cache.Count : wrapped.ApproximateNumEntries();
public long ApproximateNumEntries() => cachingEnabled ? Cache.Count : wrapped.ApproximateNumEntries();

public void Put(Bytes key, byte[] value)
{
Expand All @@ -236,10 +237,10 @@ private void PutInternal(Bytes key, CacheEntryValue entry, bool fromWrappedCache

var memoryCacheEntryOptions = new MemoryCacheEntryOptions<Bytes, CacheEntryValue>()
.SetSize(totalSize)
.RegisterPostEvictionCallback(CacheEntryEviction, cache);
.RegisterPostEvictionCallback(CacheEntryEviction, Cache);

cache.Set(key, entry, memoryCacheEntryOptions, fromWrappedCache ? EvictionReason.None : EvictionReason.Setted);
totalCacheSizeSensor.Record(cache.Size);
Cache.Set(key, entry, memoryCacheEntryOptions, fromWrappedCache ? EvictionReason.None : EvictionReason.Setted);
totalCacheSizeSensor.Record(Cache.Size);
}

public byte[] PutIfAbsent(Bytes key, byte[] value)
Expand Down Expand Up @@ -282,7 +283,7 @@ public byte[] Delete(Bytes key)
{
if (cachingEnabled)
{
cache.Dispose();
Cache.Dispose();
base.Close();
}
else
Expand Down
4 changes: 3 additions & 1 deletion core/State/Cache/Internal/MemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ internal sealed class MemoryCache<K, V> : IMemoryCache<K, V>
private readonly IComparer<K> _keyComparer;
private readonly IClockTime _clockTime;
internal readonly ILogger Logger;

private readonly MemoryCacheOptions _options;

internal long Capacity => _options.SizeLimit;

private readonly List<WeakReference<Stats>> _allStats;
private readonly Stats _accumulatedStats;
private readonly ThreadLocal<Stats> _stats;
Expand Down
2 changes: 1 addition & 1 deletion core/Table/CacheSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Streamiz.Kafka.Net.Table
{
public class CacheSize
{
internal long CacheSizeBytes { get; private set; }
public long CacheSizeBytes { get; private set; }

public static CacheSize OfB(int bytes) => new CacheSize { CacheSizeBytes = bytes };
public static CacheSize OfKb(int kilobytes) => new CacheSize { CacheSizeBytes = kilobytes * 1024 };
Expand Down
20 changes: 20 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Helpers/MockProcessorContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Mock;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Internal;

namespace Streamiz.Kafka.Net.Tests.Helpers;

public class MockProcessorContext : ProcessorContext
{
public MockProcessorContext(TaskId id, StreamConfig config)
: base(UnassignedStreamTask.Create(), config, new ProcessorStateManager(
id,
new List<TopicPartition>(),
new Dictionary<string, string>(),
new MockChangelogRegister(),
new MockOffsetCheckpointManager()), new StreamMetricsRegistry())
{ }
}
61 changes: 61 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Public/CacheSizeTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using NUnit.Framework;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State.Cache;
using Streamiz.Kafka.Net.State.InMemory;
using Streamiz.Kafka.Net.Table;
using Streamiz.Kafka.Net.Tests.Helpers;

namespace Streamiz.Kafka.Net.Tests.Public;

public class CacheSizeTests
{
[Test]
public void CacheSizeKVStoreEnabledWithDefaultConf()
{
var config = new StreamConfig();
config.DefaultStateStoreCacheMaxBytes = CacheSize.OfMb(1).CacheSizeBytes;
var context = new MockProcessorContext(new TaskId { Id = 0, Partition = 0 }, config);
var inMemoryKeyValue = new InMemoryKeyValueStore("store");
var cache = new CachingKeyValueStore(inMemoryKeyValue, null);
cache.Init(context, cache);
Assert.IsTrue(cache.IsCachedStore);
Assert.AreEqual(CacheSize.OfMb(1).CacheSizeBytes, cache.Cache.Capacity);
}

[Test]
public void CacheSizeKVStoreEnabledWithSpecificConf()
{
var config = new StreamConfig();
config.DefaultStateStoreCacheMaxBytes = CacheSize.OfMb(1).CacheSizeBytes;
var context = new MockProcessorContext(new TaskId { Id = 0, Partition = 0 }, config);
var inMemoryKeyValue = new InMemoryKeyValueStore("store");
var cache = new CachingKeyValueStore(inMemoryKeyValue, CacheSize.OfMb(10));
cache.Init(context, cache);
Assert.IsTrue(cache.IsCachedStore);
Assert.AreEqual(CacheSize.OfMb(10).CacheSizeBytes, cache.Cache.Capacity);
}

[Test]
public void CacheSizeKVStoreDisabled()
{
var config = new StreamConfig();
config.DefaultStateStoreCacheMaxBytes = 0L;
var context = new MockProcessorContext(new TaskId { Id = 0, Partition = 0 }, config);
var inMemoryKeyValue = new InMemoryKeyValueStore("store");
var cache = new CachingKeyValueStore(inMemoryKeyValue, null);
cache.Init(context, cache);
Assert.IsFalse(cache.IsCachedStore);
}

[Test]
public void CacheSizeKVStoreDisabledExplicitConf()
{
var config = new StreamConfig();
config.DefaultStateStoreCacheMaxBytes = 0L;
var context = new MockProcessorContext(new TaskId { Id = 0, Partition = 0 }, config);
var inMemoryKeyValue = new InMemoryKeyValueStore("store");
var cache = new CachingKeyValueStore(inMemoryKeyValue, CacheSize.OfB(0));
cache.Init(context, cache);
Assert.IsFalse(cache.IsCachedStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

namespace Streamiz.Kafka.Net.Tests.Stores
{
// TODO : add test add event from internal wrapped store and flush cache store
public class CacheKeyValueStoreTests
{
private StreamConfig config;
Expand Down

0 comments on commit d04be34

Please sign in to comment.