Skip to content

Commit

Permalink
RNET-1083: Add support for progress estimate on progress notifications (
Browse files Browse the repository at this point in the history
#3479)

* Added stub for progress notifications

* Added test for flexible sync

* Using correct core branch

* Fixed test

* Small fix

* Small fixes

* Corrected changelog

* Removed extra space

* Removed obsolete

* Last fix

* Fixed changelog

* Various fixes

* Small fix

* Fix for E2E tests

* Fixes

* Updated to latest commit

* Removed ignored

* Fixed test [skip-ci]

* Various fixes

* Added ignore

* Removed unused

* Removed unused test

* Fixes

* Corrected documentation and fixed linting
  • Loading branch information
papafe authored Apr 16, 2024
1 parent 772738a commit cc537e6
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Breaking Changes
* Added automatic serialization and deserialization of Realm classes when using methods on `MongoClient.Collection`, without the need to annotate classes with `MongoDB.Bson`attributes. This feature required to change the default serialization for various types (including `DateTimeOffset`). If you prefer to use the previous serialization, you need to call `Realm.SetLegacySerialization` before any kind of serialization is done, otherwise it may not work as epxected. [#3459](https://github.com/realm/realm-dotnet/pull/3459)
* `SyncProgress.TransferredBytes` and `SyncProgress.TransferableBytes` have been removed in favour of `SyncProgress.ProgressEstimate`, a double value between 0.0 and 1.0 that expresses the percentage estimate of the current progress. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))
* Support for upgrading from Realm files produced by RealmCore v5.23.9 (Realm .NET v5.0.1) or earlier is no longer supported. (Core 14.0.0)
* `String` and `byte[]` are now strongly typed for comparisons and queries. This change is especially relevant when querying for a string constant on a `RealmValue` property, as now only strings will be returned. If searching for binary data is desired, then that type must be specified by the constant. In RQL (`.Filter()`) the new way to specify a binary constant is to use `RealmValueProp = bin('xyz')` or `RealmValueProp = binary('xyz')`. (Core 14.0.0)
* Sorting order of strings has changed to use standard unicode codepoint order instead of grouping similar english letters together. A noticeable change will be from "aAbBzZ" to "ABZabz". (Core 14.0.0)
Expand Down Expand Up @@ -114,6 +115,7 @@
* Automatic client reset recovery now does a better job of recovering changes when changesets were downloaded from the server after the unuploaded local changes were committed. If the local Realm happened to be fully up to date with the server prior to the client reset, automatic recovery should now always produce exactly the same state as if no client reset was involved. (Core 13.24.1)
* Exceptions thrown during bootstrap application will now be surfaced to the user rather than terminating the program with an unhandled exception. (Core 13.25.0)
* Allow the using `>`, `>=`, `<`, `<=` operators in `Realm.Filter()` queries for string constants. This is a case sensitive lexicographical comparison. Improved performance of RQL (`.Filter()`) queries on a non-linked string property using: >, >=, <, <=, operators and fixed behaviour that a null string should be evaluated as less than everything, previously nulls were not matched. (Core 13.26.0-14-gdf25f)
* `Session.GetProgressObservable` can now be used with Flexible Sync. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))

### Fixed
* Automatic client reset recovery would duplicate insertions in a list when recovering a write which made an unrecoverable change to a list (i.e. modifying or deleting a pre-existing entry), followed by a subscription change, followed by a write which added an entry to the list. (Core 13.24.0)
Expand Down
9 changes: 6 additions & 3 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public delegate void SessionErrorCallback(IntPtr session_handle_ptr,
IntPtr managed_sync_config_handle);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes);
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes, double progressEstimate);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionWaitCallback(IntPtr task_completion_source, int error_code, PrimitiveValue message);
Expand Down Expand Up @@ -405,10 +405,13 @@ private static IntPtr NotifyAfterClientReset(IntPtr beforeFrozen, IntPtr after,
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionProgressCallback))]
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes)
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes, double progressEstimate)
{
var token = (ProgressNotificationToken?)GCHandle.FromIntPtr(tokenPtr).Target;
token?.Notify(transferredBytes, transferableBytes);

// This is used to provide a reasonable progress estimate until the core work is done
double managedProgressEstimate = transferableBytes > 0.0 ? transferredBytes / transferableBytes : 1.0;
token?.Notify(managedProgressEstimate);
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionWaitCallback))]
Expand Down
11 changes: 5 additions & 6 deletions Realm/Realm/Sync/ProgressNotifications/ProgressMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ public enum ProgressMode
{
/// <summary>
/// The callback will be called forever, or until it is unregistered by disposing the subscription token.
/// Notifications will always report the latest number of transferred bytes, and the most up-to-date number of
/// total transferable bytes.
/// </summary>
ReportIndefinitely,

/// <summary>
/// The callback will, upon registration, store the total number of bytes to be transferred. When invoked, it will
/// always report the most up-to-date number of transferable bytes out of that original number of transferable bytes.
/// When the number of transferred bytes reaches or exceeds the number of transferable bytes, the callback will
/// be unregistered.
/// The callback will be active:
/// <list type="bullet">
/// <item><description>For uploads, until all the unsynced data available at the moment of the registration of the callback is sent to the server.</description></item>
/// <item><description>For downloads, until the client catches up to the current data (available at the moment of callback registration) or the next batch of data sent from the server.</description></item>
/// </list>
/// </summary>
ForCurrentlyOutstandingWork
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public ProgressNotificationToken(Action<SyncProgress> observer, Func<GCHandle, u
}
}

public void Notify(ulong transferredBytes, ulong transferableBytes)
public void Notify(double progressEstimate)
{
Task.Run(() =>
{
try
{
_observer(new SyncProgress(transferredBytes, transferableBytes));
_observer(new SyncProgress(progressEstimate));
}
catch (Exception ex)
{
Expand Down
24 changes: 7 additions & 17 deletions Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,19 @@ namespace Realms.Sync
/// <summary>
/// A struct containing information about the progress state at a given instant.
/// </summary>
public struct SyncProgress
public readonly struct SyncProgress
{
/// <summary>
/// Gets the number of bytes that have been transferred since subscribing for progress notifications.
/// Gets the percentage estimate of the current progress, expressed as a double between 0.0 and 1.0.
/// </summary>
/// <value>The number of transferred bytes.</value>
public ulong TransferredBytes { get; }
/// <value>A percentage estimate of the progress.</value>
public double ProgressEstimate { get; }

/// <summary>
/// Gets the total number of bytes that have to be transferred since subscribing for progress notifications.
/// The difference between that number and <see cref="TransferredBytes"/> gives you the number of bytes not yet
/// transferred. If the difference is 0, then all changes at the instant the callback fires have been
/// successfully transferred.
/// </summary>
/// <value>The number of transferable bytes.</value>
public ulong TransferableBytes { get; }

internal SyncProgress(ulong transferred, ulong transferable)
internal SyncProgress(double progressEstimate)
{
TransferredBytes = transferred;
TransferableBytes = transferable;
ProgressEstimate = progressEstimate;
}

internal bool IsComplete => TransferableBytes == TransferredBytes;
internal bool IsComplete => ProgressEstimate >= 1.0;
}
}
3 changes: 1 addition & 2 deletions Realm/Realm/Sync/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ public event PropertyChangedEventHandler? PropertyChanged
/// var observable = session.GetProgressObservable(ProgressDirection.Upload, ProgressMode.ReportIndefinitely);
/// notificationToken = observable.Subscribe(progress =>
/// {
/// // Update relevant properties by accessing
/// // progress.TransferredBytes and progress.TransferableBytes
/// // Update relevant properties by accessing progress.ProgressEstimate
/// });
/// }
///
Expand Down
95 changes: 56 additions & 39 deletions Tests/Realm.Tests/Sync/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,25 @@ namespace Realms.Tests.Sync
[TestFixture, Preserve(AllMembers = true)]
public class SessionTests : SyncTestBase
{
public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

public static readonly object[] AllClientResetHandlers = new object[]
{
typeof(DiscardUnsyncedChangesHandler),
typeof(RecoverUnsyncedChangesHandler),
typeof(RecoverOrDiscardUnsyncedChangesHandler),
};

public static readonly ProgressMode[] ProgressModeTypes = new ProgressMode[]
{
ProgressMode.ForCurrentlyOutstandingWork,
ProgressMode.ReportIndefinitely,
};

[Preserve]
static SessionTests()
{
Expand All @@ -79,12 +91,6 @@ static SessionTests()
};
}

public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

[Test]
public void Realm_SyncSession_WhenSyncedRealm()
{
Expand Down Expand Up @@ -750,18 +756,34 @@ public void Session_OnSessionError()
});
}

[TestCase(ProgressMode.ForCurrentlyOutstandingWork)]
[TestCase(ProgressMode.ReportIndefinitely)]
public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
// This test needs to be revisited when the work on progress notification is finished.
[Test]
public void SessionIntegrationTest_ProgressObservable(
[ValueSource(nameof(AppTypes))] string appType,
[ValueSource(nameof(ProgressModeTypes))] ProgressMode mode)
{
const int objectSize = 1_000_000;
const int objectsToRecord = 2;

SyncTestHelpers.RunBaasTestAsync(async () =>
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
using var realm = GetRealm(config);
Realm realm;
if (appType == AppConfigType.Default)
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
realm = GetRealm(config);
}
else
{
var config = await GetFLXIntegrationConfigAsync();
config.PopulateInitialSubscriptions = (r) =>
{
r.Subscriptions.Add(r.All<HugeSyncObject>());
};
realm = await GetRealmAsync(config);
}

var completionTcs = new TaskCompletionSource<ulong>();
var completionTcs = new TaskCompletionSource();
var callbacksInvoked = 0;

var session = GetSession(realm);
Expand All @@ -776,59 +798,54 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
});
}

var lastReportedProgress = 0.0d;

var progressList = new List<SyncProgress>();

using var token = observable.Subscribe(p =>
{
try
{
callbacksInvoked++;

if (p.TransferredBytes > p.TransferableBytes)
progressList.Add(p);

if (p.ProgressEstimate < 0.0 || p.ProgressEstimate > 1.0)
{
// TODO https://github.com/realm/realm-dotnet/issues/2360: this seems to be a regression in Sync.
// throw new Exception($"Expected: {p.TransferredBytes} <= {p.TransferableBytes}");
throw new Exception($"Expected progress estimate to be between 0.0 and 1.0, but was {p.ProgressEstimate}");
}

if (mode == ProgressMode.ForCurrentlyOutstandingWork)
if (p.ProgressEstimate < lastReportedProgress)
{
if (p.TransferableBytes <= objectSize ||
p.TransferableBytes >= (objectsToRecord + 2) * objectSize)
throw new Exception($"Expected progress estimate is expected to be monotonically increasing, but it wasn't.");
}

if (p.IsComplete)
{
if (p.ProgressEstimate != 1.0)
{
throw new Exception($"Expected: {p.TransferableBytes} to be in the ({objectSize}, {(objectsToRecord + 1) * objectSize}) range.");
throw new Exception($"Expected progress estimate to be complete if and only if ProgressEstimate == 1.0");
}

completionTcs.TrySetResult();
}

lastReportedProgress = p.ProgressEstimate;
}
catch (Exception e)
{
completionTcs.TrySetException(e);
}

if (p.TransferredBytes >= p.TransferableBytes)
{
completionTcs.TrySetResult(p.TransferredBytes);
}
});

realm.Write(() =>
{
realm.Add(new HugeSyncObject(objectSize));
});

var totalTransferred = await completionTcs.Task;

if (mode == ProgressMode.ForCurrentlyOutstandingWork)
{
Assert.That(totalTransferred, Is.GreaterThanOrEqualTo(objectSize));

// We add ObjectsToRecord + 1 items, but the last item is added after subscribing
// so in the fixed mode, we should not get updates for it.
Assert.That(totalTransferred, Is.LessThan((objectsToRecord + 5) * objectSize));
}
else
{
Assert.That(totalTransferred, Is.GreaterThanOrEqualTo((objectsToRecord + 1) * objectSize));
}
await completionTcs.Task;

Assert.That(callbacksInvoked, Is.GreaterThan(1));
Assert.That(callbacksInvoked, Is.GreaterThanOrEqualTo(1));
}, timeout: 120_000);
}

Expand Down
12 changes: 2 additions & 10 deletions Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ public void GetInstanceAsync_ReportsProgress()
using var realm = await GetRealmAsync(config);
Assert.That(realm.All<HugeSyncObject>().Count(), Is.EqualTo(NumberOfObjects));
Assert.That(callbacksInvoked, Is.GreaterThan(0));

// We can't validate exact values because there's a reasonable chance that
// the last notification won't be invoked if the Realm is downloaded first.
Assert.That(lastProgress.TransferredBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.TransferableBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.ProgressEstimate, Is.GreaterThan(0.0));
}, 60000);
}

Expand Down Expand Up @@ -163,11 +159,7 @@ public void GetInstanceAsync_WithOnProgress_DoesntThrowWhenOnProgressIsSetToNull

Assert.That(realm.All<HugeSyncObject>().Count(), Is.EqualTo(NumberOfObjects));
Assert.That(callbacksInvoked, Is.GreaterThan(0));

// We can't validate exact values because there's a reasonable chance that
// the last notification won't be invoked if the Realm is downloaded first.
Assert.That(lastProgress.TransferredBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.TransferableBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.ProgressEstimate, Is.GreaterThan(0.0));
}, 60000);
}

Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/async_open_task_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ REALM_EXPORT uint64_t realm_asyncopentask_register_progress_notifier(const Share
{
return handle_errors(ex, [&] {
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable);
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedS
: SyncSession::ProgressDirection::download;

return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable);
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
}, notifier_direction, is_streaming);
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace realm::binding {

using SharedSyncSession = std::shared_ptr<SyncSession>;
using SessionErrorCallbackT = void(SharedSyncSession* session, realm_sync_error error, void* managed_sync_config);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes, double progress_estimate);
using NotifyBeforeClientResetCallbackT = void*(SharedRealm& before_frozen, void* managed_sync_config);
using NotifyAfterClientResetCallbackT = void*(SharedRealm& before_frozen, SharedRealm& after, void* managed_sync_config, bool did_recover);

Expand Down

0 comments on commit cc537e6

Please sign in to comment.