Skip to content

Commit

Permalink
Fix selectResult readAt and deleteStale methods (#8)
Browse files Browse the repository at this point in the history
* added delete by keys
added delete by read/write methods

* - Added options on deleteState overloads
- Bug: Added readAt to be set when being deserialized (as it has just been read)

* fix merge
  • Loading branch information
chrisjenx authored Jan 15, 2025
1 parent ce4e7c0 commit cf552d3
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 18 deletions.
5 changes: 1 addition & 4 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ org.gradle.caching=true
org.gradle.configuration-cache=true
org.gradle.daemon=true
org.gradle.parallel=true

# Maven
GROUP=com.mercury.sqkon
VERSION_NAME=1.0.0-alpha02
VERSION_NAME=1.0.0-alpha04
POM_NAME=Sqkon
POM_INCEPTION_YEAR=2024
POM_URL=https://github.com/MercuryTechnologies/sqkon/
Expand All @@ -19,11 +18,9 @@ POM_SCM_CONNECTION=scm:git:git://github.com/MercuryTechnologies/sqkon.git
POM_SCM_DEV_CONNECTION=scm:git:ssh://[email protected]/MercuryTechnologies/sqkon.git
POM_DEVELOPER_NAME=MercuryTechnologies
POM_DEVELOPER_URL=https://github.com/MercuryTechnologies/

#Kotlin
kotlin.code.style=official
kotlin.daemon.jvmargs=-Xmx4G

#Android
android.useAndroidX=true
android.nonTransitiveRClass=true
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ androidx-runner = "1.6.2"
kotlin = "2.1.0"
agp = "8.8.0"
kotlinx-coroutines = "1.10.1"
kotlinx-serialization = { require = "1.7.3" }
kotlinx-serialization = { require = "1.8.0" }
kotlinx-datetime = "0.6.1"
paging = "3.3.0-alpha02-0.5.1"
sqlDelight = "2.0.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,15 @@ class EntityQueries(
parameters = 1,
bindArgs = { bindString(entityName) }
))
when(entityKeys?.size) {
when (entityKeys?.size) {
null, 0 -> {}

1 -> add(SqlQuery(
where = "entity_key = ?",
parameters = 1,
bindArgs = { bindString(entityKeys.first()) }
))

else -> add(SqlQuery(
where = "entity_key IN (${entityKeys.joinToString(",") { "?" }})",
parameters = entityKeys.size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ open class KeyValueStorage<T : Any>(
entity.deserialize<T>()?.let { v -> ResultRow(entity, v) }
}
}
.distinctUntilChanged()
}

/**
Expand Down Expand Up @@ -402,17 +403,40 @@ open class KeyValueStorage<T : Any>(
* hours. You would call this function with `Clock.System.now().minus(1.days)`. This is not the same as
* [deleteExpired] which is based on the `expires_at` field.
*
* @param writeInstant if set, will delete rows that have not been written to before this time.
* @param readInstant if set, will delete rows that have not been read before this time.
*
* @see deleteExpired
*/
suspend fun deleteStale(
writeInstant: Instant = Clock.System.now(),
readInstant: Instant = Clock.System.now()
writeInstant: Instant? = Clock.System.now(),
readInstant: Instant? = Clock.System.now()
) = transaction {
metadataQueries.purgeStale(
entity_name = entityName,
writeInstant = writeInstant.toEpochMilliseconds(),
readInstant = readInstant.toEpochMilliseconds()
)
when {
writeInstant != null && readInstant != null -> {
metadataQueries.purgeStale(
entity_name = entityName,
writeInstant = writeInstant.toEpochMilliseconds(),
readInstant = readInstant.toEpochMilliseconds()
)
}

writeInstant != null -> {
metadataQueries.purgeStaleWrite(
entity_name = entityName,
writeInstant = writeInstant.toEpochMilliseconds()
)
}

readInstant != null -> {
metadataQueries.purgeStaleRead(
entity_name = entityName,
readInstant = readInstant.toEpochMilliseconds()
)
}

else -> return@transaction
}
updateWriteAt(
currentCoroutineContext()[RequestHash.Key]?.hash
?: (writeInstant.hashCode() + readInstant.hashCode())
Expand All @@ -429,7 +453,7 @@ open class KeyValueStorage<T : Any>(
*
* @see deleteExpired
*/
suspend fun deleteState(instant: Instant = Clock.System.now()) {
suspend fun deleteState(instant: Instant) {
deleteStale(instant, instant)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mercury.sqkon.db

import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

data class ResultRow<T : Any>(
Expand All @@ -14,7 +15,8 @@ data class ResultRow<T : Any>(
addedAt = Instant.fromEpochMilliseconds(entity.added_at),
updatedAt = Instant.fromEpochMilliseconds(entity.updated_at),
expiresAt = entity.expires_at?.let { Instant.fromEpochMilliseconds(it) },
readAt = entity.read_at?.let { Instant.fromEpochMilliseconds(it) },
readAt = Clock.System.now(), // By reading this value, we are marking it as read, we just
// update the db async
writeAt = Instant.fromEpochMilliseconds(entity.write_at),
value = value,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ DELETE FROM entity
WHERE entity_name = :entity_name
AND write_at < :writeInstant
AND (read_at IS NULL OR read_at < :readInstant);

purgeStaleWrite:
DELETE FROM entity WHERE entity_name = :entity_name AND write_at < :writeInstant;

purgeStaleRead:
DELETE FROM entity WHERE entity_name = :entity_name AND read_at < :readInstant;
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Clock
import org.junit.After
import org.junit.Test
import java.lang.Thread.sleep
import kotlin.test.AfterTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

class KeyValueStorageStaleTest {

Expand All @@ -21,7 +22,7 @@ class KeyValueStorageStaleTest {
"test-object", entityQueries, metadataQueries, mainScope
)

@After
@AfterTest
fun tearDown() {
mainScope.cancel()
}
Expand Down Expand Up @@ -53,6 +54,38 @@ class KeyValueStorageStaleTest {
assertEquals(0, actualAfterDelete.size)
}

@Test
fun insertAll_staleWrite_purgeReadNotStale() = runTest {
val expected = (0..10).map { TestObject() }
.associateBy { it.id }
.toSortedMap()
testObjectStorage.insertAll(expected)
sleep(1)
val now = Clock.System.now()
sleep(1)
testObjectStorage.selectAll().first()
// Clean up older than now
testObjectStorage.deleteStale(writeInstant = null, readInstant = now)
val actualAfterDelete = testObjectStorage.selectAll().first()
assertEquals(expected.size, actualAfterDelete.size)
}

@Test
fun insertAll_staleWrite_purgeStaleWrite() = runTest {
val expected = (0..10).map { TestObject() }
.associateBy { it.id }
.toSortedMap()
testObjectStorage.insertAll(expected)
sleep(1)
val now = Clock.System.now()
sleep(1)
testObjectStorage.selectAll().first()
// Clean up older than now
testObjectStorage.deleteStale(writeInstant = now, readInstant = null)
val actualAfterDelete = testObjectStorage.selectAll().first()
assertEquals(0, actualAfterDelete.size)
}

@Test
fun insertAll_readInPast() = runTest {
val expected = (0..10).map { TestObject() }
Expand All @@ -70,6 +103,41 @@ class KeyValueStorageStaleTest {
assertEquals(expected.size, actualAfterDelete.size)
}

@Test
fun insertAll_readInPast_purgeStaleRead() = runTest {
val expected = (0..10).map { TestObject() }
.associateBy { it.id }
.toSortedMap()
testObjectStorage.insertAll(expected)
testObjectStorage.selectAll().first()
sleep(1)
val now = Clock.System.now()
sleep(1)
// write again so read is in the past
testObjectStorage.updateAll(expected)
// Read in the past write is after now
testObjectStorage.deleteStale(writeInstant = null, readInstant = now)
val actualAfterDelete = testObjectStorage.selectAll().first()
assertEquals(0, actualAfterDelete.size)
}

@Test
fun insertAll_readInPast_purgeWriteNotStale() = runTest {
val expected = (0..10).map { TestObject() }
.associateBy { it.id }
.toSortedMap()
testObjectStorage.insertAll(expected)
testObjectStorage.selectAll().first()
val now = Clock.System.now()
sleep(10)
// write again so read is in the past
testObjectStorage.updateAll(expected)
// Read in the past write is after now
testObjectStorage.deleteStale(writeInstant = now, readInstant = null)
val actualAfterDelete = testObjectStorage.selectAll().first()
assertEquals(expected.size, actualAfterDelete.size)
}

@Test
fun insertAll_staleRead() = runTest {
val expected = (0..10).map { TestObject() }
Expand All @@ -86,4 +154,17 @@ class KeyValueStorageStaleTest {
assertEquals(0, actualAfterDelete.size)
}

@Test
fun selectResult_readWriteSet() = runTest {
val expected = (0..10).map { TestObject() }
.associateBy { it.id }
.toSortedMap()
testObjectStorage.insertAll(expected)
val actual = testObjectStorage.selectResult().first()
actual.forEach { result ->
assertNotNull(result.readAt)
assertNotNull(result.value)
}
}

}

0 comments on commit cf552d3

Please sign in to comment.