diff --git a/cache/src/commonMain/kotlin/org/mobilenativefoundation/store/cache5/StoreMultiCache.kt b/cache/src/commonMain/kotlin/org/mobilenativefoundation/store/cache5/StoreMultiCache.kt index 7003bc582..8e4007d6a 100644 --- a/cache/src/commonMain/kotlin/org/mobilenativefoundation/store/cache5/StoreMultiCache.kt +++ b/cache/src/commonMain/kotlin/org/mobilenativefoundation/store/cache5/StoreMultiCache.kt @@ -2,6 +2,7 @@ package org.mobilenativefoundation.store.cache5 +import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.core5.KeyProvider import org.mobilenativefoundation.store.core5.StoreData import org.mobilenativefoundation.store.core5.StoreKey @@ -12,6 +13,7 @@ import org.mobilenativefoundation.store.core5.StoreKey * Depends on [StoreMultiCacheAccessor] for internal data management. * @see [Cache]. */ +@OptIn(ExperimentalStoreApi::class) class StoreMultiCache<Id : Any, Key : StoreKey<Id>, Single : StoreData.Single<Id>, Collection : StoreData.Collection<Id, Single>, Output : StoreData<Id>>( private val keyProvider: KeyProvider<Id, Single>, singlesCache: Cache<StoreKey.Single<Id>, Single> = CacheBuilder<StoreKey.Single<Id>, Single>().build(), diff --git a/paging/src/commonMain/kotlin/org/mobilenativefoundation/store/paging5/RealPager.kt b/paging/src/commonMain/kotlin/org/mobilenativefoundation/store/paging5/RealPager.kt index f8acbb795..bfe2ed15a 100644 --- a/paging/src/commonMain/kotlin/org/mobilenativefoundation/store/paging5/RealPager.kt +++ b/paging/src/commonMain/kotlin/org/mobilenativefoundation/store/paging5/RealPager.kt @@ -16,7 +16,21 @@ import org.mobilenativefoundation.store.core5.StoreData import org.mobilenativefoundation.store.core5.StoreKey import org.mobilenativefoundation.store.store5.StoreReadResponse - +/** + * An internal class that implements the [Pager] interface. + * It manages the paging of data items based on the given [StoreKey]. + * It also synchronizes updates to single items and collections. + * + * @param Id The type of the identifier that uniquely identifies data items. + * @param SK The subtype of [StoreKey.Single] that represents keys for single items. + * @param K The type of [StoreKey] used by the Store this pager delegates to. + * @param SO The subtype of [StoreData.Single] representing the data model for single items. + * @param O The type of [StoreData] representing the output of the Store this pager delegates to. + * @param scope The [CoroutineScope] within which the pager operates. Used to launch coroutines for data streaming and joining. + * @param streamer A [Streamer] function type that provides a flow of [StoreReadResponse] for a given key. + * @param joiner A [Joiner] function type that combines multiple paging data into a single [StateFlow]. + * @param keyFactory A [KeyFactory] to create new [StoreKey] instances for single items based on their identifiers. + */ @ExperimentalStoreApi internal class RealPager<Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, SO : StoreData.Single<Id>, O : StoreData<Id>>( private val scope: CoroutineScope, @@ -25,50 +39,64 @@ internal class RealPager<Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, S private val keyFactory: KeyFactory<Id, SK> ) : Pager<Id, K, SO> { + // StateFlow to emit updates of PagingData. private val mutableStateFlow = MutableStateFlow(emptyPagingData()) override val state: StateFlow<PagingData<Id, SO>> = mutableStateFlow.asStateFlow() + // Maps to keep track of all PagingData and corresponding streams. private val allPagingData: MutableMap<K, PagingData<Id, SO>> = mutableMapOf() private val allStreams: MutableMap<K, Job> = mutableMapOf() + // Mutexes for thread-safe access to maps. private val mutexForAllPagingData = Mutex() private val mutexForAllStreams = Mutex() + override fun load(key: K) { if (key !is StoreKey.Collection<*>) { throw IllegalArgumentException("Invalid key type.") } + // Creating a child scope for coroutines. val childScope = scope + Job() + // Launching a coroutine within the child scope for data loading. childScope.launch { + // Locking the streams map to check and manage stream jobs. mutexForAllStreams.withLock { + // Checking if there's no active stream for the key. if (allStreams[key]?.isActive != true) { + // Initializing the PagingData for the key. allPagingData[key] = emptyPagingData() val childrenKeys = mutableListOf<K>() + // Main job to stream data for the key. val mainJob = launch { streamer(key).collect { response -> if (response is StoreReadResponse.Data<O>) { + // Handling collection data response. (response as? StoreReadResponse.Data<StoreData.Collection<Id, SO>>)?.let { dataWithCollection -> + // Updating paging data and state flow inside a locked block for thread safety. mutexForAllPagingData.withLock { allPagingData[key] = pagingDataFrom(dataWithCollection.value.items) val joinedData = joiner(allPagingData) mutableStateFlow.value = joinedData } + // For each item in the collection, initiate streaming and handling of single data. dataWithCollection.value.items.forEach { single -> val childKey = keyFactory.createFor(single.id) (childKey as? K)?.let { + // Launching a coroutine for each single item. val childJob = launch { initStreamAndHandleSingle(single, childKey, key) } + // Keeping track of child keys and jobs. childrenKeys.add(childKey) - mutexForAllStreams.withLock { allStreams[childKey] = childJob } @@ -79,9 +107,11 @@ internal class RealPager<Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, S } } + // Storing the main job and handling its completion. allStreams[key] = mainJob mainJob.invokeOnCompletion { + // On completion, cancel and remove all child streams and the main stream. childrenKeys.forEach { childKey -> allStreams[childKey]?.cancel() allStreams.remove(childKey) @@ -95,20 +125,24 @@ internal class RealPager<Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, S } } + // Handles streaming and updating of single item data within a collection. private suspend fun initStreamAndHandleSingle(single: SO, childKey: K, parentKey: K) { streamer(childKey).collect { response -> if (response is StoreReadResponse.Data<O>) { (response as? StoreReadResponse.Data<SO>)?.let { dataWithSingle -> mutexForAllPagingData.withLock { allPagingData[parentKey]?.items?.let { items -> + // Finding and updating the single item within the parent collection. val indexOfSingle = items.indexOfFirst { it.id == single.id } val updatedItems = items.toMutableList() if (updatedItems[indexOfSingle] != dataWithSingle.value) { updatedItems[indexOfSingle] = dataWithSingle.value + // Creating and updating the paging data with the updated item list. val updatedPagingData = allPagingData[parentKey]!!.copy(updatedItems) allPagingData[parentKey] = updatedPagingData + // Updating the state flow with the joined data. val joinedData = joiner(allPagingData) mutableStateFlow.value = joinedData } diff --git a/paging/src/commonTest/kotlin/org/mobilenativefoundation/store/paging5/util/PostStoreFactory.kt b/paging/src/commonTest/kotlin/org/mobilenativefoundation/store/paging5/util/PostStoreFactory.kt index 8ed9b2011..603272e2e 100644 --- a/paging/src/commonTest/kotlin/org/mobilenativefoundation/store/paging5/util/PostStoreFactory.kt +++ b/paging/src/commonTest/kotlin/org/mobilenativefoundation/store/paging5/util/PostStoreFactory.kt @@ -3,19 +3,14 @@ package org.mobilenativefoundation.store.paging5.util import kotlinx.coroutines.flow.flow -import org.mobilenativefoundation.store.cache5.Cache -import org.mobilenativefoundation.store.cache5.StoreMultiCache -import org.mobilenativefoundation.store.core5.KeyProvider -import org.mobilenativefoundation.store.core5.StoreKey -import org.mobilenativefoundation.store.store5.Converter import org.mobilenativefoundation.store.core5.ExperimentalStoreApi +import org.mobilenativefoundation.store.store5.Converter import org.mobilenativefoundation.store.store5.Fetcher import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.SourceOfTruth import org.mobilenativefoundation.store.store5.StoreBuilder import org.mobilenativefoundation.store.store5.Updater import org.mobilenativefoundation.store.store5.UpdaterResult -import kotlin.math.floor class PostStoreFactory(private val api: PostApi, private val db: PostDatabase) { @@ -106,29 +101,9 @@ class PostStoreFactory(private val api: PostApi, private val db: PostDatabase) { } ) - private fun createPagingCacheKeyProvider(): KeyProvider<String, PostData.Post> = - object : KeyProvider<String, PostData.Post> { - override fun fromCollection( - key: StoreKey.Collection<String>, - value: PostData.Post - ): StoreKey.Single<String> { - return PostKey.Single(value.postId) - } - - override fun fromSingle(key: StoreKey.Single<String>, value: PostData.Post): StoreKey.Collection<String> { - val id = value.postId.toInt() - val cursor = (floor(id.toDouble() / 10) * 10) + 1 - return PostKey.Cursor(cursor.toInt().toString(), 10) - } - } - - private fun createMemoryCache(): Cache<PostKey, PostData> = - StoreMultiCache(createPagingCacheKeyProvider()) - fun create(): MutableStore<PostKey, PostData> = StoreBuilder.from( fetcher = createFetcher(), sourceOfTruth = createSourceOfTruth(), - memoryCache = createMemoryCache() ).toMutableStoreBuilder( converter = createConverter() ).build(