Skip to content

Commit

Permalink
Rewrite fetching changes via synchronous watch
Browse files Browse the repository at this point in the history
  • Loading branch information
dieproht committed Jun 25, 2024
1 parent 68a9553 commit 2b56312
Showing 1 changed file with 6 additions and 17 deletions.
23 changes: 6 additions & 17 deletions molly-core/src/main/scala/molly/core/query/SyncWatchQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package molly.core.query
import cats.effect.kernel.Async
import cats.effect.kernel.Sync
import cats.effect.syntax.spawn.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import com.mongodb.client.ChangeStreamIterable
import com.mongodb.client.MongoChangeStreamCursor
import com.mongodb.client.model.changestream.ChangeStreamDocument
import com.mongodb.client.model.changestream.FullDocument
import fs2.Chunk
import fs2.Stream
import molly.core.MollyCodec
import org.bson.BsonDocument
Expand Down Expand Up @@ -42,20 +39,12 @@ final case class SyncWatchQuery[F[_], A] private[core] (private[core] val iterab
private type Cursor = MongoChangeStreamCursor[ChangeStreamDocument[BsonDocument]]

private def fromCursor(cursor: Cursor, bufferSize: Int, timeout: FiniteDuration): Stream[F, ChangeStreamDocument[A]] =
def getNextChunk(cursor: Cursor): F[Option[(Chunk[ChangeStreamDocument[BsonDocument]], Cursor)]] =
val buffer = Vector.newBuilder[ChangeStreamDocument[BsonDocument]]
f.race(
f
.suspend(Sync.Type.Blocking):
var count = 0
while count < bufferSize && cursor.hasNext do
buffer += cursor.next()
count += 1
if count == 0 then None else Some((Chunk.from(buffer.result()), cursor))
.cancelable(f.delay(cursor.close())),
f.sleep(timeout) >> f.delay(Some((Chunk.from(buffer.result()), cursor)))
).map(_.fold(identity, identity))
def getNext(cursor: Cursor): F[Option[(ChangeStreamDocument[BsonDocument], Cursor)]] =
f.suspend(Sync.Type.Blocking)(if cursor.hasNext() then Some(cursor.next() -> cursor) else None)
.cancelable(f.delay(cursor.close()))

Stream
.unfoldChunkEval(cursor)(getNextChunk)
.unfoldEval(cursor)(getNext)
.evalMap(WatchQuery.decodeChangeStreamDocument)
.groupWithin(bufferSize, timeout)
.flatMap(Stream.chunk)

0 comments on commit 2b56312

Please sign in to comment.