From 0dcdbfb53ce08a6a48d03043b7db9ef4b4990244 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Claus=20R=C3=B8rbech?= <claus.rorbech@mongodb.com>
Date: Thu, 9 Nov 2023 10:20:01 +0100
Subject: [PATCH] Guard notifying outdated scheduler (#1559)

---
 CHANGELOG.md                                  | 29 ++++++++++++++++++
 dependencies.list                             |  6 ++--
 .../kotlin/internal/interop/RealmInterop.kt   | 14 ++++++++-
 .../kotlin/internal/interop/RealmInterop.kt   | 27 +++++++++++++----
 packages/external/core                        |  2 +-
 .../src/main/jni/realm_api_helpers.cpp        | 30 +++++++++++--------
 .../kotlin/io/realm/kotlin/test/util/Utils.kt | 14 ++++++---
 .../test/common/VersionTrackingTests.kt       |  4 ++-
 .../realm/kotlin/test/darwin/MemoryTests.kt   | 14 +++++----
 .../test/mongodb/common/SyncedRealmTests.kt   |  8 +++--
 10 files changed, 112 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 372727e2bd..f8a2bb01ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,32 @@
+## 1.12.1-SNAPSHOT (YYYY-MM-DD)
+
+### Breaking Changes
+* None.
+
+### Enhancements
+* None.
+
+### Fixed
+* Fix crashes caused by posting to a released scheduler. (Issue [#1543](https://github.com/realm/realm-kotlin/issues/1543))
+
+### Compatibility
+* File format: Generates Realms with file format v23.
+* Realm Studio 13.0.0 or above is required to open Realms created by this version.
+* This release is compatible with the following Kotlin releases:
+  * Kotlin 1.8.0 and above. The K2 compiler is not supported yet.
+  * Ktor 2.1.2 and above.
+  * Coroutines 1.7.0 and above.
+  * AtomicFu 0.18.3 and above.
+  * The new memory model only. See https://github.com/realm/realm-kotlin#kotlin-memory-model-and-coroutine-compatibility
+* Minimum Kbson 0.3.0.
+* Minimum Gradle version: 6.8.3.
+* Minimum Android Gradle Plugin version: 4.1.3.
+* Minimum Android SDK: 16.
+
+### Internal
+* Updated to Realm Core 13.23.3, commit 7556b535aa7b27d49c13444894f7e9db778b3203.
+
+
 ## 1.12.0 (2023-11-02)
 
 This release upgrades the Sync metadata in a way that is not compatible with older versions. To downgrade a Sync app from this version, you'll need to manually delete the metadata folder located at `$[SYNC-ROOT-DIRECTORY]/mongodb-realm/[APP-ID]/server-utility/metadata/`. This will log out all users.
diff --git a/dependencies.list b/dependencies.list
index abe16e6f95..15db1c3303 100644
--- a/dependencies.list
+++ b/dependencies.list
@@ -1,11 +1,11 @@
 # Version of MongoDB Realm used by integration tests
 # See https://github.com/realm/ci/packages/147854 for available versions
-MONGODB_REALM_SERVER=2023-10-10
+MONGODB_REALM_SERVER=2023-11-07
 
 # `BAAS` and `BAAS-UI` projects commit hashes matching MONGODB_REALM_SERVER image version
 # note that the MONGODB_REALM_SERVER image is a nightly build, find the matching commits
 # for that date within the following repositories:
 # https://github.com/10gen/baas/
 # https://github.com/10gen/baas-ui/
-REALM_BAAS_GIT_HASH=8246fc548763eb908b8090df864e9924e3330a0d
-REALM_BAAS_UI_GIT_HASH=8a1843be2bf24f2faa705c5470a5bdd8d954f7ea
+REALM_BAAS_GIT_HASH=41fa6cdbca47826c20a64f756e21b2c184393e90
+REALM_BAAS_UI_GIT_HASH=b97a27ac858e0e8126aeb63f6ff9734d11029a91
diff --git a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
index 053bc8dc1c..754b1dba70 100644
--- a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
+++ b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
@@ -2138,10 +2138,22 @@ fun ObjectId.asRealmObjectIdT(): realm_object_id_t {
 
 private class JVMScheduler(dispatcher: CoroutineDispatcher) {
     val scope: CoroutineScope = CoroutineScope(dispatcher)
+    val lock = SynchronizableObject()
+    var cancelled = false
 
     fun notifyCore(schedulerPointer: Long) {
         scope.launch {
-            realmc.invoke_core_notify_callback(schedulerPointer)
+            lock.withLock {
+                if (!cancelled) {
+                    realmc.invoke_core_notify_callback(schedulerPointer)
+                }
+            }
+        }
+    }
+
+    fun cancel() {
+        lock.withLock {
+            cancelled = true
         }
     }
 }
diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
index 2adeb9b0ee..4ac37661cb 100644
--- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
+++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt
@@ -124,6 +124,7 @@ import realm_wrapper.realm_user_t
 import realm_wrapper.realm_value_t
 import realm_wrapper.realm_value_type
 import realm_wrapper.realm_version_id_t
+import realm_wrapper.realm_work_queue_t
 import kotlin.collections.set
 import kotlin.native.internal.createCleaner
 
@@ -560,17 +561,19 @@ actual object RealmInterop {
                 // free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
                 staticCFunction<COpaquePointer?, Unit> { userdata ->
                     printlntid("free")
-                    userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
+                    val stableSchedulerRef: StableRef<SingleThreadDispatcherScheduler>? = userdata?.asStableRef<SingleThreadDispatcherScheduler>()
+                    stableSchedulerRef?.get()?.cancel()
+                    stableSchedulerRef?.dispose()
                 },
 
                 // notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
-                staticCFunction<COpaquePointer?, Unit> { userdata ->
+                staticCFunction<COpaquePointer?, CPointer<realm_work_queue_t>?, Unit> { userdata, work_queue ->
                     // Must be thread safe
                     val scheduler =
                         userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
                     printlntid("$scheduler notify")
                     try {
-                        scheduler.notify()
+                        scheduler.notify(work_queue)
                     } catch (e: Exception) {
                         // Should never happen, but is included for development to get some indicators
                         // on errors instead of silent crashes.
@@ -3392,7 +3395,7 @@ actual object RealmInterop {
     }
 
     interface Scheduler {
-        fun notify()
+        fun notify(work_queue: CPointer<realm_work_queue_t>?)
     }
 
     class SingleThreadDispatcherScheduler(
@@ -3402,16 +3405,22 @@ actual object RealmInterop {
         private val scope: CoroutineScope = CoroutineScope(dispatcher)
         val ref: CPointer<out CPointed> = StableRef.create(this).asCPointer()
         private lateinit var scheduler: CPointer<realm_scheduler_t>
+        private val lock = SynchronizableObject()
+        private var cancelled = false
 
         fun setScheduler(scheduler: CPointer<realm_scheduler_t>) {
             this.scheduler = scheduler
         }
 
-        override fun notify() {
+        override fun notify(work_queue: CPointer<realm_work_queue_t>?) {
             scope.launch {
                 try {
                     printlntid("on dispatcher")
-                    realm_wrapper.realm_scheduler_perform_work(scheduler)
+                    lock.withLock {
+                        if (!cancelled) {
+                            realm_wrapper.realm_scheduler_perform_work(work_queue)
+                        }
+                    }
                 } catch (e: Exception) {
                     // Should never happen, but is included for development to get some indicators
                     // on errors instead of silent crashes.
@@ -3419,6 +3428,12 @@ actual object RealmInterop {
                 }
             }
         }
+
+        fun cancel() {
+            lock.withLock {
+                cancelled = true
+            }
+        }
     }
 }
 
diff --git a/packages/external/core b/packages/external/core
index e6271d7230..7556b535aa 160000
--- a/packages/external/core
+++ b/packages/external/core
@@ -1 +1 @@
-Subproject commit e6271d72308b40399890060f58a88cf568c2ee22
+Subproject commit 7556b535aa7b27d49c13444894f7e9db778b3203
diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
index fbc9c746d7..03783aaf80 100644
--- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
+++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
@@ -264,6 +264,7 @@ class CustomJVMScheduler {
         JNIEnv *jenv = get_env();
         jclass jvm_scheduler_class = jenv->FindClass("io/realm/kotlin/internal/interop/JVMScheduler");
         m_notify_method = jenv->GetMethodID(jvm_scheduler_class, "notifyCore", "(J)V");
+        m_cancel_method = jenv->GetMethodID(jvm_scheduler_class, "cancel", "()V");
         m_jvm_dispatch_scheduler = jenv->NewGlobalRef(dispatchScheduler);
     }
 
@@ -271,18 +272,14 @@ class CustomJVMScheduler {
         get_env(true)->DeleteGlobalRef(m_jvm_dispatch_scheduler);
     }
 
-    void set_scheduler(realm_scheduler_t* scheduler) {
-        m_scheduler = scheduler;
-    }
-
-    void notify() {
+    void notify(realm_work_queue_t* work_queue) {
         // There is currently no signaling of creation/tear down of the core notifier thread, so we
         // just attach it as a daemon thread here on first notification to allow the JVM to
         // shutdown propertly. See https://github.com/realm/realm-core/issues/6429
         auto jenv = get_env(true, true, "core-notifier");
         jni_check_exception(jenv);
         jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_notify_method,
-                             reinterpret_cast<jlong>(m_scheduler));
+                             reinterpret_cast<jlong>(work_queue));
     }
 
     bool is_on_thread() const noexcept {
@@ -293,12 +290,18 @@ class CustomJVMScheduler {
         return true;
     }
 
+    void cancel() {
+        auto jenv = get_env(true, true, "core-notifier");
+        jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_cancel_method);
+        jni_check_exception(jenv);
+    }
+
 
 private:
     std::thread::id m_id;
     jmethodID m_notify_method;
+    jmethodID m_cancel_method;
     jobject m_jvm_dispatch_scheduler;
-    realm_scheduler_t *m_scheduler;
 };
 
 // Note: using jlong here will create a linker issue
@@ -309,8 +312,8 @@ class CustomJVMScheduler {
 //
 // I suspect this could be related to the fact that jni.h defines jlong differently between Android (typedef int64_t)
 // and JVM which is a (typedef long long) resulting in a different signature of the method that could be found by the linker.
-void invoke_core_notify_callback(int64_t scheduler) {
-    realm_scheduler_perform_work(reinterpret_cast<realm_scheduler_t *>(scheduler));
+void invoke_core_notify_callback(int64_t work_queue) {
+    realm_scheduler_perform_work(reinterpret_cast<realm_work_queue_t *>(work_queue));
 }
 
 realm_scheduler_t*
@@ -319,13 +322,16 @@ realm_create_scheduler(jobject dispatchScheduler) {
         auto jvmScheduler = new CustomJVMScheduler(dispatchScheduler);
         auto scheduler = realm_scheduler_new(
                 jvmScheduler,
-                [](void *userdata) { delete(static_cast<CustomJVMScheduler *>(userdata)); },
-                [](void *userdata) { static_cast<CustomJVMScheduler *>(userdata)->notify(); },
+                [](void *userdata) {
+                    auto jvmScheduler = static_cast<CustomJVMScheduler *>(userdata);
+                    jvmScheduler->cancel();
+                    delete(jvmScheduler);
+                },
+                [](void *userdata, realm_work_queue_t* work_queue) { static_cast<CustomJVMScheduler *>(userdata)->notify(work_queue); },
                 [](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->is_on_thread(); },
                 [](const void *userdata, const void *userdata_other) { return userdata == userdata_other; },
                 [](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->can_invoke(); }
         );
-        jvmScheduler->set_scheduler(scheduler);
         return scheduler;
     }
     throw std::runtime_error("Null dispatchScheduler");
diff --git a/packages/test-base/src/commonMain/kotlin/io/realm/kotlin/test/util/Utils.kt b/packages/test-base/src/commonMain/kotlin/io/realm/kotlin/test/util/Utils.kt
index 5e72a273dd..f07e4676bf 100644
--- a/packages/test-base/src/commonMain/kotlin/io/realm/kotlin/test/util/Utils.kt
+++ b/packages/test-base/src/commonMain/kotlin/io/realm/kotlin/test/util/Utils.kt
@@ -21,8 +21,10 @@ import io.realm.kotlin.Realm
 import io.realm.kotlin.test.platform.PlatformUtils
 import io.realm.kotlin.types.RealmInstant
 import io.realm.kotlin.types.RealmObject
+import kotlinx.coroutines.TimeoutCancellationException
 import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.withTimeout
+import kotlinx.coroutines.selects.onTimeout
+import kotlinx.coroutines.selects.select
 import kotlinx.datetime.Instant
 import kotlin.time.Duration
 import kotlin.time.Duration.Companion.minutes
@@ -92,8 +94,12 @@ fun Instant.toRealmInstant(): RealmInstant {
 }
 
 // Variant of `Channel.receiveOrFail()` that will will throw if a timeout is hit.
-suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes): T {
-    return withTimeout(timeout) {
-        receive()
+suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes, message: String? = null): T {
+    return select {
+        this@receiveOrFail.onReceive { it }
+        onTimeout(timeout) {
+            @Suppress("invisible_member")
+            throw TimeoutCancellationException("Timeout: $message")
+        }
     }
 }
diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt
index 6a28982495..e51f286ce7 100644
--- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt
+++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt
@@ -45,6 +45,7 @@ import kotlin.test.Test
 import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertNull
+import kotlin.test.assertTrue
 
 class VersionTrackingTests {
     private lateinit var initialLogLevel: LogLevel
@@ -150,7 +151,8 @@ class VersionTrackingTests {
         realm.write<Unit> { copyToRealm(Sample()) }
         realm.write<Unit> { copyToRealm(Sample()) }
         realm.activeVersions().run {
-            assertEquals(1, allTracked.size, toString())
+            // Initially tracked version from user facing realm might have been released by now
+            assertTrue(allTracked.size <= 1, toString())
             assertNotNull(notifier, toString())
             assertEquals(0, notifier?.active?.size, toString())
             assertNotNull(writer, toString())
diff --git a/packages/test-base/src/nativeDarwinTest/kotlin/io/realm/kotlin/test/darwin/MemoryTests.kt b/packages/test-base/src/nativeDarwinTest/kotlin/io/realm/kotlin/test/darwin/MemoryTests.kt
index 3c111a3891..ff29ef5503 100644
--- a/packages/test-base/src/nativeDarwinTest/kotlin/io/realm/kotlin/test/darwin/MemoryTests.kt
+++ b/packages/test-base/src/nativeDarwinTest/kotlin/io/realm/kotlin/test/darwin/MemoryTests.kt
@@ -61,6 +61,15 @@ class MemoryTests {
         @OptIn(ExperimentalStdlibApi::class)
         println("NEW_MEMORY_MODEL: " + isExperimentalMM())
 
+        // Things like
+        //   NSProcessInfo.Companion.processInfo().operatingSystemVersionString
+        //   platform.Foundation.NSFileManager.defaultManager
+        // referenced in Darwin SystemUtils.kt are initialized lazily, so do a full realm-lifecycle
+        // first to only measure increases over the actual test
+        // - Ensure that we clean up any released memory to get a nice baseline
+        platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references
+        triggerGC()
+        // - Record the baseline
         val initialAllocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))
 
         val referenceHolder = mutableListOf<Sample>();
@@ -91,11 +100,6 @@ class MemoryTests {
         triggerGC()
         platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references
 
-        // Referencing things like
-        //   NSProcessInfo.Companion.processInfo().operatingSystemVersionString
-        //   platform.Foundation.NSFileManager.defaultManager
-        // as done in Darwin SystemUtils.kt cause allocations so we just assert the increase over
-        // the test
         val allocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))
         assertEquals(initialAllocation, allocation, "mmap allocation exceeds expectations: initial=$initialAllocation current=$allocation")
     }
diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt
index fe2851414b..d79f7f157a 100644
--- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt
+++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt
@@ -739,7 +739,7 @@ class SyncedRealmTests {
             realm.writeBlocking { copyToRealm(masterObject) }
             realm.syncSession.uploadAllLocalChanges()
         }
-        assertEquals(42, counterValue.receiveOrFail(), "Failed to receive 42")
+        assertEquals(42, counterValue.receiveOrFail(message = "Failed to receive 42"))
 
         // Increment counter asynchronously after download initial data (1)
         val increment1 = async {
@@ -753,9 +753,10 @@ class SyncedRealmTests {
                         .mutableRealmIntField
                         .increment(1)
                 }
+                realm.syncSession.uploadAllLocalChanges(10.seconds)
             }
         }
-        assertEquals(43, counterValue.receiveOrFail(), "Failed to receive 43")
+        assertEquals(43, counterValue.receiveOrFail(message = "Failed to receive 43"))
 
         // Increment counter asynchronously after download initial data (2)
         val increment2 = async {
@@ -769,9 +770,10 @@ class SyncedRealmTests {
                         .mutableRealmIntField
                         .increment(1)
                 }
+                realm.syncSession.uploadAllLocalChanges(10.seconds)
             }
         }
-        assertEquals(44, counterValue.receiveOrFail(), "Failed to receive 44")
+        assertEquals(44, counterValue.receiveOrFail(message = "Failed to receive 44"))
 
         increment1.cancel()
         increment2.cancel()