From 3a750ec5caa74de3d2e225e77d916fd7d1bd26c1 Mon Sep 17 00:00:00 2001 From: Yisrael Union Date: Sun, 24 Nov 2024 18:27:50 -0500 Subject: [PATCH] additional pubsub commands (#919) --- .../redis4cats/pubsub/PubSubCommands.scala | 6 ++- .../pubsub/internals/LivePubSubCommands.scala | 14 ++++++- .../pubsub/internals/LivePubSubStats.scala | 42 +++++++++++++++++-- .../pubsub/internals/Publisher.scala | 13 +++++- 4 files changed, 68 insertions(+), 7 deletions(-) diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala index a3966a6a..97479b6c 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala @@ -21,9 +21,13 @@ import dev.profunktor.redis4cats.data._ import dev.profunktor.redis4cats.pubsub.data.Subscription trait PubSubStats[F[_], K] { - def pubSubChannels: F[List[K]] + def numPat: F[Long] + def numSub: F[List[Subscription[K]]] + def pubSubChannels: F[List[RedisChannel[K]]] + def pubSubShardChannels: F[List[RedisChannel[K]]] def pubSubSubscriptions(channel: RedisChannel[K]): F[Subscription[K]] def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] + def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] } trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] { diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala index 31e054d2..1a41b86f 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala @@ -58,13 +58,25 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V]( Stream.eval(FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void) } - override def pubSubChannels: Stream[F, List[K]] = + override def numPat: Stream[F, Long] = + pubSubStats.numPat + + override def numSub: Stream[F, List[Subscription[K]]] = + pubSubStats.numSub + + override def pubSubChannels: Stream[F, List[RedisChannel[K]]] = pubSubStats.pubSubChannels + override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] = + pubSubStats.pubSubShardChannels + override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] = pubSubStats.pubSubSubscriptions(channel) override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = pubSubStats.pubSubSubscriptions(channels) + override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = + pubSubStats.shardNumSub(channels) + } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala index 4966ac22..23cd128e 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala @@ -25,19 +25,42 @@ import dev.profunktor.redis4cats.effect.FutureLift import dev.profunktor.redis4cats.pubsub.data.Subscription import fs2.Stream import io.lettuce.core.pubsub.StatefulRedisPubSubConnection - import dev.profunktor.redis4cats.JavaConversions._ +import dev.profunktor.redis4cats.pubsub.internals.LivePubSubStats.toSubscription +import java.{ util => ju } +import java.lang.{ Long => JLong } private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V]( pubConnection: StatefulRedisPubSubConnection[K, V] ) extends PubSubStats[Stream[F, *], K] { - override def pubSubChannels: Stream[F, List[K]] = + override def numPat: Stream[F, Long] = + Stream + .eval { + FutureLift[F].lift(pubConnection.async().pubsubNumpat()) + } + .map(Long.unbox) + + override def numSub: Stream[F, List[Subscription[K]]] = + Stream + .eval { + FutureLift[F].lift(pubConnection.async().pubsubNumsub()) + } + .map(toSubscription[K]) + + override def pubSubChannels: Stream[F, List[RedisChannel[K]]] = Stream .eval { FutureLift[F].lift(pubConnection.async().pubsubChannels()) } - .map(_.asScala.toList) + .map(_.asScala.toList.map(RedisChannel[K])) + + override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] = + Stream + .eval { + FutureLift[F].lift(pubConnection.async().pubsubShardChannels()) + } + .map(_.asScala.toList.map(RedisChannel[K])) override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] = pubSubSubscriptions(List(channel)).map(_.headOption).unNone @@ -46,7 +69,18 @@ private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V]( Stream.eval { FutureLift[F] .lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*)) - .map(_.asScala.toList.map { case (k, n) => Subscription(RedisChannel[K](k), n) }) + .map(toSubscription[K]) } + override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = + Stream + .eval { + FutureLift[F].lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*)) + } + .map(toSubscription[K]) + +} +object LivePubSubStats { + private def toSubscription[K](map: ju.Map[K, JLong]): List[Subscription[K]] = + map.asScala.toList.map { case (k, n) => Subscription(RedisChannel[K](k), Long.unbox(n)) } } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala index befa32ae..667eb6fa 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala @@ -35,7 +35,7 @@ private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V]( override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] = _.evalMap(message => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void) - override def pubSubChannels: Stream[F, List[K]] = + override def pubSubChannels: Stream[F, List[RedisChannel[K]]] = pubSubStats.pubSubChannels override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] = @@ -44,4 +44,15 @@ private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V]( override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = pubSubStats.pubSubSubscriptions(channels) + override def numPat: Stream[F, Long] = + pubSubStats.numPat + + override def numSub: Stream[F, List[Subscription[K]]] = + pubSubStats.numSub + + override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] = + pubSubStats.pubSubShardChannels + + override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = + pubSubStats.shardNumSub(channels) }