Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-ramotar committed Feb 17, 2024
1 parent 6764ee2 commit 6af0312
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 317 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package org.mobilenativefoundation.store.core5


@OptIn(ExperimentalStoreApi::class)
interface KeyFactory<Id: Any, Key: StoreKey.Single<Id>> {
fun createSingleFor(id: Id): Key
}


/**
* An interface that defines keys used by Store for data-fetching operations.
* Allows Store to fetch individual items and collections of items.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,187 +13,192 @@ import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.core5.KeyFactory
import org.mobilenativefoundation.store.core5.StoreData
import org.mobilenativefoundation.store.core5.StoreKey
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse

@ExperimentalStoreApi
interface StorePager<Id : Any, Key : StoreKey<Id>, Output : StoreData<Id>> {
val state: StateFlow<StoreReadResponse<Output>>
fun load(key: Key)
}

@ExperimentalStoreApi
interface DataJoiner<Id : Any, Key : StoreKey<Id>, Output : StoreData.Collection<Id, Single>, Single : StoreData.Single<Id>> {
suspend operator fun invoke(
key: Key,
data: Map<Key, StoreReadResponse.Data<Output>?>
): StoreReadResponse.Data<Output>
}
data class PagingData<Id : Any, SO : StoreData.Single<Id>>(
val items: List<SO>
)


/**
* Initializes and returns a [StateFlow] that reflects the state of the [Store], updating by a flow of provided keys.
* @see [launchPagingStore].
*/
@ExperimentalStoreApi
fun <Id : Any, Key : StoreKey<Id>, Output : StoreData<Id>, SingleKey : StoreKey.Single<Id>, Collection : StoreData.Collection<Id, Single>, Single : StoreData.Single<Id>> MutableStore<Key, Output>.launchPagingStore(
scope: CoroutineScope,
keys: Flow<Key>,
joiner: DataJoiner<Id, Key, Collection, Single>,
keyFactory: KeyFactory<Id, SingleKey>
): StateFlow<StoreReadResponse<Output>> {
fun streamer(key: Key): Flow<StoreReadResponse<Output>> {
println("STREAMING FOR KEY $key")
return stream<Any>(StoreReadRequest.fresh(key))
}
interface Pager<Id : Any, K : StoreKey<Id>, SO : StoreData.Single<Id>> {
val state: StateFlow<PagingData<Id, SO>>
fun load(key: K)

companion object {
fun <Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, SO : StoreData.Single<Id>, O : StoreData<Id>> create(
scope: CoroutineScope,
store: Store<K, O>,
joiner: Joiner<Id, K, SO>,
keyFactory: KeyFactory<Id, SK>
): Pager<Id, K, SO> {

val streamer = object : Streamer<Id, K, O> {
override fun invoke(key: K): Flow<StoreReadResponse<O>> {
return store.stream(StoreReadRequest.fresh(key))
}
}

return RealPager(
scope,
streamer,
joiner,
keyFactory
)
}

val pager = RealStorePager(scope, ::streamer, joiner, keyFactory)
fun <Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, SO : StoreData.Single<Id>, O : StoreData<Id>> create(
scope: CoroutineScope,
store: MutableStore<K, O>,
joiner: Joiner<Id, K, SO>,
keyFactory: KeyFactory<Id, SK>
): Pager<Id, K, SO> {

val childScope = scope + Job()
val streamer = object : Streamer<Id, K, O> {
override fun invoke(key: K): Flow<StoreReadResponse<O>> {
return store.stream<Any>(StoreReadRequest.fresh(key))
}
}

childScope.launch {
keys.collect { key ->
pager.load(key)
return RealPager(
scope,
streamer,
joiner,
keyFactory
)
}
}
}

return pager.state
@ExperimentalStoreApi
interface Joiner<Id : Any, K : StoreKey<Id>, SO : StoreData.Single<Id>> {
suspend operator fun invoke(data: Map<K, PagingData<Id, SO>>): PagingData<Id, SO>
}

@ExperimentalStoreApi
class RealStorePager<Id : Any, Key : StoreKey<Id>, SingleKey : StoreKey.Single<Id>, Output : StoreData<Id>, Collection : StoreData.Collection<Id, Single>, Single : StoreData.Single<Id>>(
private val scope: CoroutineScope,
private val streamer: (key: Key) -> Flow<StoreReadResponse<Output>>,
private val joiner: DataJoiner<Id, Key, Collection, Single>,
private val keyFactory: KeyFactory<Id, SingleKey>
) : StorePager<Id, Key, Output> {
private val mutableStateFlow = MutableStateFlow<StoreReadResponse<Output>>(StoreReadResponse.Initial)
override val state: StateFlow<StoreReadResponse<Output>> = mutableStateFlow.asStateFlow()
interface Streamer<Id : Any, K : StoreKey<Id>, O : StoreData<Id>> {
operator fun invoke(key: K): Flow<StoreReadResponse<O>>
}

private val data: MutableMap<Key, StoreReadResponse.Data<Collection>?> = mutableMapOf()
private val streams: MutableMap<Key, Job> = mutableMapOf()
@ExperimentalStoreApi
interface KeyFactory<Id : Any, SK : StoreKey.Single<Id>> {
fun createFor(id: Id): SK
}

private val dataMutex = Mutex()
private val streamsMutex = Mutex()
@ExperimentalStoreApi
class RealPager<Id : Any, SK : StoreKey.Single<Id>, K : StoreKey<Id>, SO : StoreData.Single<Id>, O : StoreData<Id>>(
private val scope: CoroutineScope,
private val streamer: Streamer<Id, K, O>,
private val joiner: Joiner<Id, K, SO>,
private val keyFactory: KeyFactory<Id, SK>
) : Pager<Id, K, SO> {

override fun load(key: Key) {
private val mutableStateFlow = MutableStateFlow(emptyPagingData())
override val state: StateFlow<PagingData<Id, SO>> = mutableStateFlow.asStateFlow()

println("HITTING0 $key")
private val allPagingData: MutableMap<K, PagingData<Id, SO>> = mutableMapOf()
private val allStreams: MutableMap<K, Job> = mutableMapOf()

private val mutexForAllPagingData = Mutex()
private val mutexForAllStreams = Mutex()
override fun load(key: K) {
if (key !is StoreKey.Collection<*>) {
throw IllegalArgumentException("Invalid key type")
throw IllegalArgumentException("Invalid key type.")
}

val childScope = scope + Job()

childScope.launch {
streamsMutex.withLock {
if (streams[key]?.isActive != true) {
data[key] = null
mutexForAllStreams.withLock {
if (allStreams[key]?.isActive != true) {
allPagingData[key] = emptyPagingData()

val nestedKeys = mutableListOf<Key>()
val childrenKeys = mutableListOf<K>()

val job = launch {
val mainJob = launch {
streamer(key).collect { response ->
when (response) {
is StoreReadResponse.Data<Output> -> {
println("HITTING1 $response")
(response as? StoreReadResponse.Data<Collection>)?.let {
dataMutex.withLock {
data[key] = it
val joinedData = joiner(key, data)
(joinedData as? StoreReadResponse.Data<Output>)?.let {
mutableStateFlow.emit(it)
}
}

if (response is StoreReadResponse.Data<O>) {
(response as? StoreReadResponse.Data<StoreData.Collection<Id, SO>>)?.let { dataWithCollection ->

it.value.items.forEach { single ->
// TODO: Start stream for each single
// TODO: When change in single, update paging state
val singleKey = keyFactory.createSingleFor(single.id)
(singleKey as? Key)?.let { k ->

val nestedJob = launch {
streamer(k).collect { singleResponse ->
when (singleResponse) {
is StoreReadResponse.Data<Output> -> {
println("HITTING NESTED $singleResponse")
(singleResponse as? StoreReadResponse.Data<Single>)?.let {
dataMutex.withLock {
data[key]?.value?.items?.let { items ->
val index =
items.indexOfFirst { it.id == single.id }
val updatedItems = items.toMutableList()

if (updatedItems[index] != it.value) {
println("HITTING FOR ${it.value}")
updatedItems[index] = it.value
val updatedCollection =
data[key]!!.value.copyWith(updatedItems) as? Collection

updatedCollection?.let { collection ->
data[key] = data[key]!!.copy(collection)

val joinedData = joiner(key, data)
(joinedData as? StoreReadResponse.Data<Output>)?.let {
mutableStateFlow.emit(it)
}
}
}

}
mutexForAllPagingData.withLock {
allPagingData[key] = pagingDataFrom(dataWithCollection.value.items)
val joinedData = joiner(allPagingData)
mutableStateFlow.value = joinedData
}

}
}
}
dataWithCollection.value.items.forEach { single ->

else -> {}
}
}
val childKey = keyFactory.createFor(single.id)

(childKey as? K)?.let {
val childJob = launch {
initStreamAndHandleSingle(single, childKey, key)
}

}
childrenKeys.add(childKey)

streams[k] = nestedJob
nestedKeys.add(k)
// TODO: This might result in a deadlock
mutexForAllStreams.withLock {
allStreams[childKey] = childJob
}

}
}
}

else -> {
println("HITTING $response")
mutableStateFlow.emit(response)
}
}
}
}

streams[key] = job
allStreams[key] = mainJob

job.invokeOnCompletion {
nestedKeys.forEach {
streams[it]?.cancel()
streams.remove(it)
mainJob.invokeOnCompletion {
childrenKeys.forEach { childKey ->
allStreams[childKey]?.cancel()
allStreams.remove(childKey)
}

streams[key]?.cancel()
streams.remove(key)
allStreams[key]?.cancel()
allStreams.remove(key)
}
}
}
}
}
}

private suspend fun initStreamAndHandleSingle(single: SO, childKey: K, parentKey: K) {
streamer(childKey).collect { response ->
if (response is StoreReadResponse.Data<O>) {
(response as? StoreReadResponse.Data<SO>)?.let { dataWithSingle ->
mutexForAllPagingData.withLock {
allPagingData[parentKey]?.items?.let { items ->
val indexOfSingle = items.indexOfFirst { it.id == single.id }
val updatedItems = items.toMutableList()
if (updatedItems[indexOfSingle] != dataWithSingle.value) {
updatedItems[indexOfSingle] = dataWithSingle.value

val updatedPagingData = allPagingData[parentKey]!!.copy(updatedItems)
allPagingData[parentKey] = updatedPagingData

val joinedData = joiner(allPagingData)
mutableStateFlow.value = joinedData
}
}
}

}
}
}
}

private fun emptyPagingData() = PagingData<Id, SO>(emptyList())
private fun pagingDataFrom(items: List<SO>) = PagingData(items)

}



Loading

0 comments on commit 6af0312

Please sign in to comment.