Skip to content

Commit

Permalink
Adding pooling support with keypool.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Garner committed Dec 8, 2023
1 parent f7ff73d commit 23c80b0
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 2 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ val commonSettings = Seq(
libraryDependencies ++= Seq(
Libraries.catsEffectKernel,
Libraries.redisClient,
Libraries.keyPool % Optional,
Libraries.catsEffect % Test,
Libraries.catsLaws % Test,
Libraries.catsTestKit % Test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import cats._
import cats.data.NonEmptyList
import cats.effect.kernel._
import cats.syntax.all._
import dev.profunktor.redis4cats.Redis.Pool.PoolSettings
import dev.profunktor.redis4cats.algebra.BitCommandOperation
import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows
import dev.profunktor.redis4cats.config.Redis4CatsConfig
Expand All @@ -42,22 +43,43 @@ import io.lettuce.core.{
ZAddArgs,
ZAggregateArgs,
ZStoreArgs,
ExpireArgs => JExpireArgs,
GetExArgs => JGetExArgs,
Limit => JLimit,
Range => JRange,
ReadFrom => JReadFrom,
ScanCursor => JScanCursor,
SetArgs => JSetArgs,
ExpireArgs => JExpireArgs
SetArgs => JSetArgs
}
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands }
import org.typelevel.keypool.KeyPool

import scala.concurrent.duration._

object Redis {

object Pool {
final case class PoolSettings(maxTotal: Int, maxIdle: Int, idleTimeAllowedInPool: FiniteDuration)
object PoolSettings {
import scala.language.postfixOps
object Defaults {
val maxTotal: Int = Math.max(10, Runtime.getRuntime.availableProcessors())
val maxIdle: Int = 2
val idleTimeAllowedInPool: FiniteDuration = 60 seconds
}
val default: PoolSettings = PoolSettings(Defaults.maxTotal, Defaults.maxIdle, Defaults.idleTimeAllowedInPool)
}

implicit class PoolOps[F[_], K, V](val pool: KeyPool[F, Unit, RedisCommands[F, K, V]]) extends AnyVal {
@inline def withRedisCommands[A](
fn: RedisCommands[F, K, V] => F[A]
)(implicit M: MonadCancel[F, Throwable]): F[A] =
pool.take(()).use(managed => fn(managed.value))
}
}

private[redis4cats] def acquireAndRelease[F[_]: FutureLift: Log: MonadThrow, K, V](
client: RedisClient,
codec: RedisCodec[K, V],
Expand Down Expand Up @@ -237,6 +259,38 @@ object Redis {
Resource.make(acquire)(release).widen
}

/**
* Creates a [[RedisCommands]] for a single-node connection.
*
* Example:
*
* {{{
* val redis: Resource[IO, RedisCommands[IO, String, String]] =
* for {
* uri <- Resource.eval(RedisURI.make[IO]("redis://localhost"))
* cli <- RedisClient[IO](uri)
* cmd <- Redis[IO].fromClient(cli, RedisCodec.Utf8)
* } yield cmd
* }}}
*
* Note: if you don't need to create multiple connections, you might
* prefer to use either [[utf8]] or `simple` instead.
*/
def pooled[K, V](
client: RedisClient,
codec: RedisCodec[K, V],
poolSettings: PoolSettings = PoolSettings.default
)(implicit T: Temporal[F]): Resource[F, KeyPool[F, Unit, RedisCommands[F, K, V]]] = {
val cmdsResource: Resource[F, RedisCommands[F, K, V]] = fromClient(client, codec)
KeyPool
.Builder[F, Unit, RedisCommands[F, K, V]]((_: Unit) => cmdsResource)
.withMaxPerKey(Function.const(poolSettings.maxTotal))
.withMaxTotal(poolSettings.maxTotal)
.withMaxIdle(poolSettings.maxIdle)
.withIdleTimeAllowedInPool(poolSettings.idleTimeAllowedInPool)
.build
}

/**
* Creates a [[RedisCommands]] for a cluster connection.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2018-2021 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats

import cats.effect.IO
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log.NoOp._
import io.lettuce.core.RedisCommandExecutionException
import org.typelevel.keypool.KeyPool

object RedisPoolDemo extends LoggerIOApp {
import Demo._

val usernameKey = "test"
val numericKey = "numeric"

val showResult: Option[String] => IO[Unit] =
_.fold(IO.println(s"Not found key: $usernameKey"))(IO.println)

// simple strings program
def p1(stringPool: KeyPool[IO, Unit, RedisCommands[IO, String, String]]): IO[Unit] = {
import dev.profunktor.redis4cats.Redis.Pool._
stringPool.withRedisCommands { redis =>
for {
x <- redis.get(usernameKey)
_ <- showResult(x)
_ <- redis.set(usernameKey, "some value")
y <- redis.get(usernameKey)
_ <- showResult(y)
_ <- redis.setNx(usernameKey, "should not happen")
w <- redis.get(usernameKey)
_ <- showResult(w)
} yield ()
}
}

// proof that you can still get it wrong with `incr` and `decr`, even if type-safe
def p2(
stringPool: KeyPool[IO, Unit, RedisCommands[IO, String, String]],
longPool: KeyPool[IO, Unit, RedisCommands[IO, String, Long]]
): IO[Unit] = {
import dev.profunktor.redis4cats.Redis.Pool._
stringPool.withRedisCommands { redis =>
longPool.withRedisCommands { redisN =>
for {
x <- redis.get(numericKey)
_ <- showResult(x)
_ <- redis.set(numericKey, "not a number")
y <- redis.get(numericKey)
_ <- showResult(y)
_ <- redisN.incr(numericKey).attempt.flatMap {
case Left(e: RedisCommandExecutionException) =>
IO(assert(e.getMessage == "ERR value is not an integer or out of range"))
case _ =>
IO.raiseError(new Exception("Expected error"))
}
w <- redis.get(numericKey)
_ <- showResult(w)
} yield ()
}
}
}

val program: IO[Unit] = {
val res: fs2.Stream[IO, Unit] =
for {
cli <- fs2.Stream.resource(RedisClient[IO].from(redisURI))
rd1 <- fs2.Stream.resource(Redis[IO].pooled(cli, stringCodec))
rd2 <- fs2.Stream.resource(Redis[IO].pooled(cli, longCodec))
_ <- fs2.Stream.eval(p1(rd1) *> p2(rd1, rd2))
} yield ()

res.compile.lastOrError

}

}
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object Dependencies {
val circe = "0.14.6"
val fs2 = "3.9.3"
val log4cats = "2.6.0"
val keyPool = "0.4.8"

val lettuce = "6.3.0.RELEASE"
val logback = "1.4.14"
Expand All @@ -23,6 +24,7 @@ object Dependencies {

val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect
val fs2Core = "co.fs2" %% "fs2-core" % V.fs2
val keyPool = "org.typelevel" %% "keypool" % V.keyPool

val log4CatsCore = log4cats("core")

Expand Down

0 comments on commit 23c80b0

Please sign in to comment.