Skip to content

Commit

Permalink
[C#] Added NeedCopyUpdate to IFunctions for RMW (#345)
Browse files Browse the repository at this point in the history
* Added NeedCopyUpdate in functions, to avoid allocation if we do not need to CopyUpdate.
* Adding default TryAddFunctions for users to use directly, to perform TryAdd.
* Updated HelloWorld sample to show use of TryAdd session
* Fix async object retrieval when tombstones set on objects
  • Loading branch information
badrishc authored Oct 9, 2020
1 parent 031aece commit f2b3483
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 2 deletions.
21 changes: 21 additions & 0 deletions cs/samples/HelloWorld/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using System;
using System.Data;
using System.IO;

namespace HelloWorld
Expand Down Expand Up @@ -72,6 +73,26 @@ static void Main()
// End session
session.Dispose();

// (4) Perform TryAdd using RMW
using (var tryAddSession = store.NewSession(new TryAddFunctions<long, long>()))
{
key = 3; input1 = 30; input2 = 31;

// First TryAdd - success; status should be NOTFOUND (does not already exist)
status = tryAddSession.RMW(ref key, ref input1);

// Second TryAdd - failure; status should be OK (already exists)
var status2 = tryAddSession.RMW(ref key, ref input2);

// Read, result should be input1 (first TryAdd)
var status3 = session.Read(ref key, ref output);

if (status == Status.NOTFOUND && status2 == Status.OK && status3 == Status.OK && output == input1)
Console.WriteLine("(4) Success!");
else
Console.WriteLine("(3) Error!");
}

// Dispose store
store.Dispose();

Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,9 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)
return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst);
}

public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue)
=> _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue);

public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue)
{
_clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue);
Expand Down
23 changes: 21 additions & 2 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,25 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
#region Create new record
CreateNewRecord:
{
if (logicalAddress >= hlog.HeadAddress && !hlog.GetInfo(physicalAddress).Tombstone)
{
if (!fasterSession.NeedCopyUpdate(ref key, ref input, ref hlog.GetValue(physicalAddress)))
{
status = OperationStatus.SUCCESS;
goto LatchRelease;
}
}

recordSize = (logicalAddress < hlog.BeginAddress) ?
hlog.GetInitialRecordSize(ref key, ref input, fasterSession) :
hlog.GetRecordSize(physicalAddress, ref input, fasterSession);
BlockAllocate(recordSize, out long newLogicalAddress, sessionCtx, fasterSession);
var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), sessionCtx.version,
true, false, false,
latestLogicalAddress);
true, false, false,
latestLogicalAddress);
hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress));

if (logicalAddress < hlog.BeginAddress)
{
fasterSession.InitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress));
Expand Down Expand Up @@ -1378,6 +1388,15 @@ internal OperationStatus InternalContinuePendingRMW<Input, Output, Context, Fast
}

#region Create record in mutable region

if ((request.logicalAddress >= hlog.BeginAddress) && !hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)
{
if (!fasterSession.NeedCopyUpdate(ref key, ref pendingContext.input, ref hlog.GetContextRecordValue(ref request)))
{
return OperationStatus.SUCCESS;
}
}

if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone))
{
recordSize = hlog.GetInitialRecordSize(ref key, ref pendingContext.input, fasterSession);
Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)
return _fasterKV._functions.ConcurrentWriter(ref key, ref src, ref dst);
}

public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue)
=> _fasterKV._functions.NeedCopyUpdate(ref key, ref input, ref oldValue);

public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue)
{
_fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue);
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/FASTER/LogCompactionFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public LogVariableCompactFunctions(VariableLengthBlittableAllocator<Key, Value>
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint) { }
public void ConcurrentReader(ref Key key, ref Empty input, ref Value value, ref Empty dst) { }
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) { return _functions.CopyInPlace(ref src, ref dst, _allocator.ValueLength); }
public bool NeedCopyUpdate(ref Key key, ref Empty input, ref Value oldValue) => true;
public void CopyUpdater(ref Key key, ref Empty input, ref Value oldValue, ref Value newValue) { }
public void InitialUpdater(ref Key key, ref Empty input, ref Value value) { }
public bool InPlaceUpdater(ref Key key, ref Empty input, ref Value value) => false;
Expand All @@ -47,6 +48,7 @@ public LogCompactFunctions(CompactionFunctions functions)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint) { }
public void ConcurrentReader(ref Key key, ref Empty input, ref Value value, ref Empty dst) { }
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) { return _functions.CopyInPlace(ref src, ref dst, null); }
public bool NeedCopyUpdate(ref Key key, ref Empty input, ref Value oldValue) => true;
public void CopyUpdater(ref Key key, ref Empty input, ref Value oldValue, ref Value newValue) { }
public void InitialUpdater(ref Key key, ref Empty input, ref Value value) { }
public bool InPlaceUpdater(ref Key key, ref Empty input, ref Value value) { return true; }
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/Interfaces/FunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public virtual void SingleReader(ref Key key, ref Input input, ref Value value,
public virtual void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src;

public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value) { }
public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) => true;
public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) { }
public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) { return true; }

Expand Down
12 changes: 12 additions & 0 deletions cs/src/core/Index/Interfaces/IFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ public interface IFunctions<Key, Value, Input, Output, Context>
/// <param name="value"></param>
void InitialUpdater(ref Key key, ref Input input, ref Value value);

/// <summary>
/// Whether we need to invoke copy-update for RMW
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="oldValue"></param>
bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue)
#if NETSTANDARD21
=> true
#endif
;

/// <summary>
/// Copy-update for RMW
/// </summary>
Expand Down
30 changes: 30 additions & 0 deletions cs/src/core/Index/Interfaces/TryAddFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

namespace FASTER.core
{
/// <summary>
/// Functions that make RMW behave as an atomic TryAdd operation, where Input is the value being added.
/// Return Status.NOTFOUND => TryAdd succeededed (item added).
/// Return Status.OK => TryAdd failed (item not added, key was already present).
/// </summary>
/// <typeparam name="Key"></typeparam>
/// <typeparam name="Value"></typeparam>
/// <typeparam name="Context"></typeparam>
public class TryAddFunctions<Key, Value, Context> : SimpleFunctions<Key, Value, Context>
{
/// <inheritdoc />
public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value) => true;
/// <inheritdoc />
public override bool NeedCopyUpdate(ref Key key, ref Value input, ref Value oldValue) => false;
}

/// <summary>
/// Functions that make RMW behave as an atomic TryAdd operation, where Input is the value being added.
/// Return Status.NOTFOUND => TryAdd succeededed (item added)
/// Return Status.OK => TryAdd failed (item not added, key was already present)
/// </summary>
/// <typeparam name="Key"></typeparam>
/// <typeparam name="Value"></typeparam>
public class TryAddFunctions<Key, Value> : TryAddFunctions<Key, Value, Empty> { }
}
2 changes: 2 additions & 0 deletions cs/test/AsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value)
return true;
}

public bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true;

public void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue)
{
newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks;
Expand Down
137 changes: 137 additions & 0 deletions cs/test/NeedCopyUpdateTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using FASTER.core;
using System.IO;
using NUnit.Framework;

namespace FASTER.test
{

[TestFixture]
internal class NeedCopyUpdateTests
{
private FasterKV<int, RMWValue> fht;
private IDevice log, objlog;

[SetUp]
public void Setup()
{
log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\NeedCopyUpdateTests.log", deleteOnClose: true);
objlog = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\NeedCopyUpdateTests.obj.log", deleteOnClose: true);

fht = new FasterKV<int, RMWValue>
(128,
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 15, PageSizeBits = 10 },
checkpointSettings: new CheckpointSettings { CheckPointType = CheckpointType.FoldOver },
serializerSettings: new SerializerSettings<int, RMWValue> { valueSerializer = () => new RMWValueSerializer() }
);
}

[TearDown]
public void TearDown()
{
fht.Dispose();
fht = null;
log.Dispose();
objlog.Dispose();
}


[Test]
public void TryAddTest()
{
using var session = fht.For(new TryAddTestFunctions()).NewSession<TryAddTestFunctions>();

Status status;
var key = 1;
var value1 = new RMWValue { value = 1 };
var value2 = new RMWValue { value = 2 };

status = session.RMW(ref key, ref value1); // InitialUpdater + NOTFOUND
Assert.IsTrue(status == Status.NOTFOUND);
Assert.IsTrue(value1.flag); // InitialUpdater is called

status = session.RMW(ref key, ref value2); // InPlaceUpdater + OK
Assert.IsTrue(status == Status.OK);

fht.Log.Flush(true);
status = session.RMW(ref key, ref value2); // NeedCopyUpdate + OK
Assert.IsTrue(status == Status.OK);

fht.Log.FlushAndEvict(true);
status = session.RMW(ref key, ref value2, Status.OK, 0); // PENDING + NeedCopyUpdate + OK
Assert.IsTrue(status == Status.PENDING);
session.CompletePending(true);

// Test stored value. Should be value1
var output = new RMWValue();
status = session.Read(ref key, ref value1, ref output, Status.OK, 0);
Assert.IsTrue(status == Status.PENDING);
session.CompletePending(true);

status = session.Delete(ref key);
Assert.IsTrue(status == Status.OK);
session.CompletePending(true);
fht.Log.FlushAndEvict(true);
status = session.RMW(ref key, ref value2, Status.NOTFOUND, 0); // PENDING + InitialUpdater + NOTFOUND
Assert.IsTrue(status == Status.PENDING);
session.CompletePending(true);
}
}

internal class RMWValue
{
public int value;
public bool flag;
}

internal class RMWValueSerializer : BinaryObjectSerializer<RMWValue>
{
public override void Serialize(ref RMWValue value)
{
writer.Write(value.value);
}

public override void Deserialize(out RMWValue value)
{
value = new RMWValue
{
value = reader.ReadInt32()
};
}
}

internal class TryAddTestFunctions : TryAddFunctions<int, RMWValue, Status>
{
public override void InitialUpdater(ref int key, ref RMWValue input, ref RMWValue value)
{
input.flag = true;
base.InitialUpdater(ref key, ref input, ref value);
}

public override void CopyUpdater(ref int key, ref RMWValue input, ref RMWValue oldValue, ref RMWValue newValue)
{
Assert.Fail("CopyUpdater");
}

public override void RMWCompletionCallback(ref int key, ref RMWValue input, Status ctx, Status status)
{
Assert.IsTrue(status == ctx);

if (status == Status.NOTFOUND)
Assert.IsTrue(input.flag); // InitialUpdater is called.
}

public override void ReadCompletionCallback(ref int key, ref RMWValue input, ref RMWValue output, Status ctx, Status status)
{
Assert.IsTrue(input.value == output.value);
}
}
}
1 change: 1 addition & 0 deletions cs/test/ObjectRecoveryTest2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ internal void FinalizeRead(ref Status status, ref MyOutput g1)
public class MyFunctions : IFunctions<MyKey, MyValue, MyInput, MyOutput, MyContext>
{
public void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) => value.value = input.value;
public bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true;
public void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) => newValue = oldValue;
public bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value)
{
Expand Down
2 changes: 2 additions & 0 deletions cs/test/ObjectRecoveryTestTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value)
return true;
}

public bool NeedCopyUpdate(ref AdId key, ref Input input, ref NumClicks oldValue) => true;

public void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue)
{
newValue = new NumClicks { numClicks = oldValue.numClicks + input.numClicks.numClicks };
Expand Down
8 changes: 8 additions & 0 deletions cs/test/ObjectTestTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value)
return true;
}

public bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true;

public void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue)
{
newValue = new MyValue { value = oldValue.value + input.value };
Expand Down Expand Up @@ -156,6 +158,8 @@ public bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value)
return true;
}

public bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true;

public void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue)
{
newValue = new MyValue { value = oldValue.value + input.value };
Expand Down Expand Up @@ -235,6 +239,8 @@ public bool InPlaceUpdater(ref int key, ref MyInput input, ref MyValue value)
return true;
}

public bool NeedCopyUpdate(ref int key, ref MyInput input, ref MyValue oldValue) => true;

public void CopyUpdater(ref int key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue)
{
newValue = new MyValue { value = oldValue.value + input.value };
Expand Down Expand Up @@ -346,6 +352,8 @@ public void DeleteCompletionCallback(ref MyKey key, Empty ctx)
{
}

public bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyLargeValue oldValue) => true;

public void CopyUpdater(ref MyKey key, ref MyInput input, ref MyLargeValue oldValue, ref MyLargeValue newValue)
{
}
Expand Down
2 changes: 2 additions & 0 deletions cs/test/RecoverContinueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ public bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value)
return true;
}

public bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true;

public void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue)
{
newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks;
Expand Down
2 changes: 2 additions & 0 deletions cs/test/RecoveryTestTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value)
return true;
}

public bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true;

public void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue)
{
newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks;
Expand Down
Loading

0 comments on commit f2b3483

Please sign in to comment.