diff --git a/cs/FASTER.sln b/cs/FASTER.sln index b724c1553..36476020a 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -112,6 +112,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "root", "root", "{CEDB9572-7 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ResizableCacheStore", "samples\ResizableCacheStore\ResizableCacheStore.csproj", "{B4A55211-5457-44B9-8BCB-A5488C994965}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MemoryDb", "playground\MemoryDb\MemoryDb.csproj", "{6315094C-66B1-410E-9ACE-9078AF7A77CA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -335,6 +337,14 @@ Global {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|Any CPU.Build.0 = Release|x64 {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|x64.ActiveCfg = Release|x64 {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|x64.Build.0 = Release|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Debug|Any CPU.ActiveCfg = Debug|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Debug|Any CPU.Build.0 = Debug|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Debug|x64.ActiveCfg = Debug|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Debug|x64.Build.0 = Debug|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Release|Any CPU.ActiveCfg = Release|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Release|Any CPU.Build.0 = Release|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Release|x64.ActiveCfg = Release|x64 + {6315094C-66B1-410E-9ACE-9078AF7A77CA}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -369,6 +379,7 @@ Global {DC3E0640-9A36-43D0-AA37-A1B61B0BFBC9} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} {AF996720-DB6C-4ED7-9693-B9531F0B119A} = {5E4C9997-3350-4761-9FC9-F27649848B1D} {B4A55211-5457-44B9-8BCB-A5488C994965} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} + {6315094C-66B1-410E-9ACE-9078AF7A77CA} = {E6026D6A-01C5-4582-B2C1-64751490DABE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/playground/MemoryDb/MemoryDb.csproj b/cs/playground/MemoryDb/MemoryDb.csproj new file mode 100644 index 000000000..945639462 --- /dev/null +++ b/cs/playground/MemoryDb/MemoryDb.csproj @@ -0,0 +1,19 @@ + + + + Exe + net7.0 + x64 + latest + true + enable + true + true + enable + + + + + + + diff --git a/cs/playground/MemoryDb/Program.cs b/cs/playground/MemoryDb/Program.cs new file mode 100644 index 000000000..f9d63ad2c --- /dev/null +++ b/cs/playground/MemoryDb/Program.cs @@ -0,0 +1,233 @@ +using FASTER.core; +using System.Buffers.Binary; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using System; + +namespace ConsoleApp72; + +using MySession = ClientSession, Memory, Memory, (IMemoryOwner, int), Empty, UserDataFunctions>>; + +internal class Program +{ + static async Task Main() + { + var sourceDbPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "source"); + + WriteSimpleSourceDb(sourceDbPath); + + for (var i = 0; i < 100; i++) + { + var dbPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"faster-{i}"); + try + { + await Write(sourceDbPath, dbPath); + } + finally + { + try + { + if (Directory.Exists(dbPath)) + Directory.Delete(dbPath, true); + } + catch + { + // ignore + } + } + } + } + + private static readonly Random random = new(); + + private static void WriteSimpleSourceDb(string sourceDbPath) + { + Console.WriteLine($"Source: {sourceDbPath}"); + + if (Directory.Exists(sourceDbPath)) + Directory.Delete(sourceDbPath, true); + + using var fasterLogSettings = new FasterLogSettings(sourceDbPath); + using var fasterLog = new FasterLog(fasterLogSettings); + + for (var index = 0; index < 500_000; index++) + { + using var valueBuffer = MemoryPool.Shared.Rent(1024); + + random.NextBytes(valueBuffer.Memory.Span.Slice(0, 1024)); + + fasterLog.Enqueue(valueBuffer.Memory.Span.Slice(0, 1024)); + } + + fasterLog.Commit(true); + } + + private const int SecondaryIndexCnt = 5; + + private static async Task Write(string sourceDbPath, string dbPath) + { + Console.WriteLine(dbPath); + + if (Directory.Exists(dbPath)) + Directory.Delete(dbPath, true); + + var mainDbPath = Path.Combine(dbPath, "main"); + + var tempIndexDbDirectory = Path.Combine(dbPath, "TempIndex"); + + using var mainLogDevice = Devices.CreateLogDevice(mainDbPath); + using var mainKvSettings = new FasterKVSettings, Memory> { LogDevice = mainLogDevice, PageSize = 1 * 1024 * 1024, SegmentSize = 32 * 1024 * 1024, CheckpointDir = mainDbPath }; + using var mainKv = new FasterKV, Memory>(mainKvSettings); + + var indexDevices = new List(); + var indexSettings = new List, Memory>>(); + var indexKvs = new List, Memory>>(); + + for (var i = 0; i < SecondaryIndexCnt; i++) + { + var indexPath = Path.Combine(tempIndexDbDirectory, $"i_{i}"); + + var indexLogDevice = Devices.CreateLogDevice(indexPath); + var indexKvSettings = new FasterKVSettings, Memory> { LogDevice = indexLogDevice, PageSize = 1 * 1024 * 1024, SegmentSize = 32 * 1024 * 1024, CheckpointDir = indexPath }; + var indexKv = new FasterKV, Memory>(indexKvSettings); + + indexDevices.Add(indexLogDevice); + indexSettings.Add(indexKvSettings); + indexKvs.Add(indexKv); + } + + { + using var mainKvSession = mainKv.For(UserDataFunctions>.Instance).NewSession>>(); + + var indexSessions = new List(); + + foreach (var indexKv in indexKvs) + { + indexSessions.Add(indexKv.For(UserDataFunctions>.Instance).NewSession>>()); + } + + using var sourceLogSettings = new FasterLogSettings(sourceDbPath) { ReadOnlyMode = true }; + using var sourceLog = new FasterLog(sourceLogSettings); + + var recordCounter = 0; + + using var sourceIterator = sourceLog.Scan(sourceLog.BeginAddress, long.MaxValue); + + while (sourceIterator.GetNext(MemoryPool.Shared, out var memoryOwner, out var wholeLength, out var address)) + { + recordCounter++; + + using var mo = memoryOwner; + + var wholeValue = memoryOwner.Memory.Slice(0, wholeLength); + + var primaryKey = address; + + using var primaryKeyBuffer = MemoryPool.Shared.Rent(8); + + BinaryPrimitives.TryWriteInt64BigEndian(primaryKeyBuffer.Memory.Span.Slice(0, 8), primaryKey); + + var primaryKeyContent = primaryKeyBuffer.Memory.Slice(0, 8); + + var secondaryKeys = new List(); + + for (var i = 0; i < SecondaryIndexCnt; i++) + { + secondaryKeys.Add(memoryOwner.Memory.Slice(i * 4, 4).ToArray()); + } + + if (TryRead(mainKvSession, primaryKeyContent, out var mainValue)) + { + using (mainValue.MemoryOwner) + { + } + } + + Write(mainKvSession, primaryKeyContent, wholeValue); + + for (var i = 0; i < SecondaryIndexCnt; i++) + { + var secondaryKeyContent = secondaryKeys[i].AsMemory(); + + if (TryRead(indexSessions[i], secondaryKeyContent, out var readItem)) + { + using (readItem.MemoryOwner) + { + } + } + + Write(indexSessions[i], secondaryKeyContent, primaryKeyContent); + } + + if (recordCounter % 100_000 == 0) + { + Console.WriteLine($"{recordCounter}..."); + } + } + + Console.WriteLine($"{recordCounter} done"); + + foreach (var indexSession in indexSessions) indexSession.Dispose(); + + foreach (var indexKv in indexKvs) + { + await indexKv.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver, true); + } + } + + await mainKv.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver, true); + + foreach (var indexKv in indexKvs) indexKv.Dispose(); + foreach (var indexSetting in indexSettings) indexSetting.Dispose(); + foreach (var indexDevice in indexDevices) indexDevice.Dispose(); + } + + private static bool TryRead(ClientSession, Memory, (IMemoryOwner, int), Empty, UserDataFunctions> session, TKey key, out (IMemoryOwner MemoryOwner, int Length) value) + { + var (status, output) = session.Read(key); + + if (status.Found) + { + value = output; + return true; + } + + if (status.IsPending && session.CompletePendingWithOutputs(out var outputs, true)) + { + using (outputs) + { + if (outputs.Next()) + { + value = outputs.Current.Output; + return true; + } + } + } + + value = default; + return false; + } + + private static void Write(ClientSession, Memory, (IMemoryOwner, int), Empty, UserDataFunctions> session, TKey key, Memory value) + { + var status = session.Upsert(key, value); + + if (status.IsPending) + { + session.CompletePending(true); + } + } +} + +internal sealed class UserDataFunctions : MemoryFunctions +{ + + private UserDataFunctions() + { + } + + public static readonly UserDataFunctions Instance = new(); + +} \ No newline at end of file diff --git a/cs/src/core/VarLen/MemoryVarLenStruct.cs b/cs/src/core/VarLen/MemoryVarLenStruct.cs index 2b411bc3b..0549f8076 100644 --- a/cs/src/core/VarLen/MemoryVarLenStruct.cs +++ b/cs/src/core/VarLen/MemoryVarLenStruct.cs @@ -34,10 +34,10 @@ public unsafe ref Memory AsRef(void* source) { if (refCache == null) { - refCache = new (UnmanagedMemoryManager, Memory)[4]; - for (int i = 0; i < 4; i++) refCache[i] = (new UnmanagedMemoryManager(), default); + refCache = new (UnmanagedMemoryManager, Memory)[8]; + for (int i = 0; i < 8; i++) refCache[i] = (new UnmanagedMemoryManager(), default); } - count = (count + 1) % 4; + count = (count + 1) % 8; ref var cache = ref refCache[count]; var len = *(int*)source; cache.Item1.SetDestination((T*)((byte*)source + sizeof(int)), len / sizeof(T)); diff --git a/cs/src/core/VarLen/ReadOnlyMemoryVarLenStruct.cs b/cs/src/core/VarLen/ReadOnlyMemoryVarLenStruct.cs index 122843a20..172d760fd 100644 --- a/cs/src/core/VarLen/ReadOnlyMemoryVarLenStruct.cs +++ b/cs/src/core/VarLen/ReadOnlyMemoryVarLenStruct.cs @@ -34,10 +34,10 @@ public unsafe ref ReadOnlyMemory AsRef(void* source) { if (manager == null) { - manager = new (UnmanagedMemoryManager, ReadOnlyMemory)[4]; - for (int i = 0; i < 4; i++) manager[i] = (new UnmanagedMemoryManager(), default); + manager = new (UnmanagedMemoryManager, ReadOnlyMemory)[8]; + for (int i = 0; i < 8; i++) manager[i] = (new UnmanagedMemoryManager(), default); } - count = (count + 1) % 4; + count = (count + 1) % 8; ref var cache = ref manager[count]; cache.Item1.SetDestination((T*)((byte*)source + sizeof(int)), (*(int*)source) / sizeof(T)); cache.Item2 = cache.Item1.Memory; diff --git a/cs/test/SingleWriterTests.cs b/cs/test/SingleWriterTests.cs index 4ae865d5b..165081802 100644 --- a/cs/test/SingleWriterTests.cs +++ b/cs/test/SingleWriterTests.cs @@ -42,7 +42,7 @@ class SingleWriterTests public void Setup() { DeleteDirectory(MethodTestDir, wait: true); - log = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.log"), deleteOnClose: true); + log = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.log"), deleteOnClose: false); functions = new SingleWriterTestFunctions(); LogSettings logSettings = new LogSettings { LogDevice = log, ObjectLogDevice = null, PageSizeBits = 12, MemorySizeBits = 22, ReadCopyOptions = new(ReadCopyFrom.Device, ReadCopyTo.MainLog) }; @@ -59,7 +59,7 @@ public void Setup() } } - fht = new FasterKV(1L << 20, logSettings); + fht = new FasterKV(1L << 20, logSettings, new CheckpointSettings { CheckpointDir = MethodTestDir }); session = fht.For(functions).NewSession(); } @@ -123,4 +123,150 @@ public void SingleWriterReasonsTest([Values] ReadCopyDestination readCopyDestina Assert.AreEqual(expectedReason, functions.actualReason); } } + + [TestFixture] + class StructWithStringTests + { + public struct StructWithString + { + public int intField; + public string stringField; + + public StructWithString(int intValue, string prefix) + { + this.intField = intValue; + this.stringField = prefix + intValue.ToString(); + } + + public override string ToString() => this.stringField; + + public class Comparer : IFasterEqualityComparer + { + public long GetHashCode64(ref StructWithString k) => Utility.GetHashCode(k.intField); + + public bool Equals(ref StructWithString k1, ref StructWithString k2) + => k1.intField == k2.intField && k1.stringField == k2.stringField; + } + + public class Serializer : BinaryObjectSerializer + { + public override void Deserialize(out StructWithString obj) + { + var intField = this.reader.ReadInt32(); + var stringField = this.reader.ReadString(); + obj = new() { intField = intField, stringField = stringField }; + } + + public override void Serialize(ref StructWithString obj) + { + this.writer.Write(obj.intField); + this.writer.Write(obj.stringField); + } + } + } + + internal class StructWithStringTestFunctions : SimpleFunctions + { + } + + const int numRecords = 1_000; + const string keyPrefix = "key_"; + string valuePrefix = "value_"; + + StructWithStringTestFunctions functions; + + private FasterKV fht; + private ClientSession session; + private IDevice log, objlog; + + [SetUp] + public void Setup() + { + // create a string of size 1024 bytes + valuePrefix = new string('a', 1024); + + DeleteDirectory(MethodTestDir, wait: true); + log = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.log"), deleteOnClose: false); + objlog = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.obj.log"), deleteOnClose: false); + SerializerSettings serializerSettings = new() + { + keySerializer = () => new StructWithString.Serializer(), + valueSerializer = () => new StructWithString.Serializer() + }; + fht = new FasterKV(1L << 20, + new LogSettings { LogDevice = log, ObjectLogDevice = objlog, PageSizeBits = 10, MemorySizeBits = 22, SegmentSizeBits = 16 }, + new CheckpointSettings { CheckpointDir = MethodTestDir }, + serializerSettings: serializerSettings, comparer: new StructWithString.Comparer()); + + functions = new(); + session = fht.For(functions).NewSession(); + } + + [TearDown] + public void TearDown() + { + session?.Dispose(); + session = null; + fht?.Dispose(); + fht = null; + objlog?.Dispose(); + objlog = null; + log?.Dispose(); + log = null; + DeleteDirectory(MethodTestDir); + } + + void Populate() + { + for (int ii = 0; ii < numRecords; ii++) + { + StructWithString key = new(ii, keyPrefix); + StructWithString value = new(ii, valuePrefix); + session.Upsert(ref key, ref value); + if (ii % 3_000 == 0) + { + fht.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver).GetAwaiter().GetResult(); + fht.Recover(); + } + } + } + + [Test] + [Category(FasterKVTestCategory)] + [Category(SmokeTestCategory)] + public void StructWithStringCompactTest([Values] CompactionType compactionType, [Values] bool flush) + { + void readKey(int keyInt) + { + StructWithString key = new(keyInt, keyPrefix); + var (status, output) = session.Read(key); + if (status.IsPending) + { + session.CompletePendingWithOutputs(out var completedOutputs, wait: true); + using (completedOutputs) + (status, output) = GetSinglePendingResult(completedOutputs); + } + Assert.IsTrue(status.Found, status.ToString()); + Assert.AreEqual(key.intField, output.intField); + } + + Populate(); + readKey(12); + if (flush) + { + fht.Log.FlushAndEvict(wait: true); + readKey(24); + } + int count = 0; + using var iter = fht.Log.Scan(0, fht.Log.TailAddress); + while (iter.GetNext(out var _)) + { + count++; + } + Assert.AreEqual(count, numRecords); + + fht.Log.Compact(functions, fht.Log.SafeReadOnlyAddress, compactionType); + readKey(48); + } + } }