diff --git a/build.sbt b/build.sbt index b1c2254e3..2201144fe 100644 --- a/build.sbt +++ b/build.sbt @@ -42,6 +42,7 @@ lazy val rpc = project %%("frees-core", freesV), %%("frees-async", freesV), %%("frees-async-guava", freesV), + %%("frees-async-cats-effect", freesV), %%("frees-config", freesV), %%("frees-logging", freesV), %%("frees-tagless", freesV), diff --git a/rpc/src/main/scala/RPCAsyncImplicits.scala b/rpc/src/main/scala/RPCAsyncImplicits.scala index c27640d28..49b434313 100644 --- a/rpc/src/main/scala/RPCAsyncImplicits.scala +++ b/rpc/src/main/scala/RPCAsyncImplicits.scala @@ -17,6 +17,7 @@ package freestyle package rpc +import cats.effect.IO import cats.{~>, Comonad} import freestyle.rpc.client.handlers._ import journal.Logger @@ -27,11 +28,23 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ -trait RPCAsyncImplicits extends freestyle.async.Implicits { +trait IOCapture { + + implicit val ioCapture: Capture[IO] = new Capture[IO] { + override def capture[A](a: => A): IO[A] = IO(a) + } + +} + +trait BaseAsync extends freestyle.async.Implicits { protected[this] val asyncLogger: Logger = Logger[this.type] protected[this] val atMostDuration: FiniteDuration = 10.seconds +} + +trait FutureAsyncInstances extends BaseAsync { + implicit def futureComonad(implicit EC: ExecutionContext): Comonad[Future] = new Comonad[Future] { def extract[A](x: Future[A]): A = { @@ -45,6 +58,18 @@ trait RPCAsyncImplicits extends freestyle.async.Implicits { fa.map(f) } + implicit val future2Task: Future ~> Task = + new (Future ~> Task) { + override def apply[A](fa: Future[A]): Task[A] = { + asyncLogger.info(s"${Thread.currentThread().getName} Deferring Future to Task...") + Task.deferFuture(fa) + } + } + +} + +trait TaskAsyncInstances extends BaseAsync { + implicit def taskComonad(implicit S: Scheduler): Comonad[Task] = new Comonad[Task] { def extract[A](x: Task[A]): A = { @@ -58,19 +83,32 @@ trait RPCAsyncImplicits extends freestyle.async.Implicits { fa.map(f) } - implicit def task2Future(implicit S: Scheduler): FSHandler[Task, Future] = + implicit def task2Future(implicit S: Scheduler): Task ~> Future = new TaskMHandler[Future] - implicit val future2Task: Future ~> Task = - new (Future ~> Task) { - override def apply[A](fa: Future[A]): Task[A] = { - asyncLogger.info(s"${Thread.currentThread().getName} Deferring Future to Task...") - Task.deferFuture(fa) - } - } - implicit val task2Task: Task ~> Task = new (Task ~> Task) { override def apply[A](fa: Task[A]): Task[A] = fa } + implicit def task2IO(implicit S: Scheduler): Task ~> IO = new (Task ~> IO) { + override def apply[A](fa: Task[A]): IO[A] = fa.toIO + } } + +trait IOAsyncInstances extends BaseAsync { + + implicit val ioComonad: Comonad[IO] = new Comonad[IO] { + + override def extract[A](x: IO[A]): A = x.unsafeRunSync() + + override def coflatMap[A, B](fa: IO[A])(f: IO[A] => B): IO[B] = IO.pure(f(fa)) + + override def map[A, B](fa: IO[A])(f: A => B): IO[B] = fa.map(f) + } + + implicit val io2Task: IO ~> Task = new (IO ~> Task) { + override def apply[A](fa: IO[A]): Task[A] = fa.to[Task] + } +} + +trait RPCAsyncImplicits extends FutureAsyncInstances with TaskAsyncInstances with IOAsyncInstances diff --git a/rpc/src/main/scala/internal/service/calls.scala b/rpc/src/main/scala/internal/service/calls.scala index 3f209179d..4a93659e5 100644 --- a/rpc/src/main/scala/internal/service/calls.scala +++ b/rpc/src/main/scala/internal/service/calls.scala @@ -19,8 +19,10 @@ package rpc package internal package service +import cats.effect.{Effect, IO} +import cats.effect.implicits._ import cats.implicits._ -import cats.{~>, Comonad, MonadError} +import cats.{~>, Comonad} import io.grpc.stub.ServerCalls.{ BidiStreamingMethod, ClientStreamingMethod, @@ -41,16 +43,18 @@ object calls { import converters._ def unaryMethod[F[_], Req, Res](f: (Req) => F[Res])( - implicit ME: MonadError[F, Throwable]): UnaryMethod[Req, Res] = + implicit EFF: Effect[F]): UnaryMethod[Req, Res] = new UnaryMethod[Req, Res] { - override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit = { - ME.attempt(f(request)).map(completeObserver(responseObserver)) - (): Unit - } + override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit = + EFF + .attempt(f(request)) + .map(completeObserver(responseObserver)) + .runAsync(_ => IO.pure(())) + .unsafeRunAsync(_ => ()) } def clientStreamingMethod[F[_], Req, Res](f: (Observable[Req]) => F[Res])( - implicit ME: MonadError[F, Throwable], + implicit EFF: Effect[F], HTask: F ~> Task, S: Scheduler): ClientStreamingMethod[Req, Res] = new ClientStreamingMethod[Req, Res] { @@ -63,8 +67,7 @@ object calls { } def serverStreamingMethod[F[_], Req, Res](f: (Req) => F[Observable[Res]])( - implicit ME: MonadError[F, Throwable], - C: Comonad[F], + implicit C: Comonad[F], S: Scheduler): ServerStreamingMethod[Req, Res] = new ServerStreamingMethod[Req, Res] { override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit = { @@ -74,8 +77,7 @@ object calls { } def bidiStreamingMethod[F[_], Req, Res](f: (Observable[Req]) => F[Observable[Res]])( - implicit ME: MonadError[F, Throwable], - C: Comonad[F], + implicit C: Comonad[F], S: Scheduler): BidiStreamingMethod[Req, Res] = new BidiStreamingMethod[Req, Res] { override def invoke(responseObserver: StreamObserver[Res]): StreamObserver[Req] = { diff --git a/rpc/src/main/scala/internal/service/service.scala b/rpc/src/main/scala/internal/service/service.scala index 04f7ec68a..1cbb262d3 100644 --- a/rpc/src/main/scala/internal/service/service.scala +++ b/rpc/src/main/scala/internal/service/service.scala @@ -92,7 +92,7 @@ trait RPCService { val serviceBindings: Defn.Def = { val args: Seq[Term.Tuple] = requests.map(_.call) q""" - def bindService[F[_]](implicit algebra: $algName[F], HTask: _root_.freestyle.FSHandler[F, _root_.monix.eval.Task], ME: _root_.cats.MonadError[F, Throwable], C: _root_.cats.Comonad[F], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition = + def bindService[F[_]](implicit algebra: $algName[F], EFF: _root_.cats.effect.Effect[F], HTask: _root_.freestyle.FSHandler[F, _root_.monix.eval.Task], C: _root_.cats.Comonad[F], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition = new _root_.freestyle.rpc.internal.service.GRPCServiceDefBuilder(${Lit.String( algName.value)}, ..$args).apply """ diff --git a/rpc/src/main/scala/server/implicits.scala b/rpc/src/main/scala/server/implicits.scala index 79103fb4f..86d0d82e3 100644 --- a/rpc/src/main/scala/server/implicits.scala +++ b/rpc/src/main/scala/server/implicits.scala @@ -65,6 +65,7 @@ trait Helpers { object implicits extends CaptureInstances + with IOCapture with RPCAsyncImplicits with Syntax with Helpers diff --git a/rpc/src/test/scala/TaglessUtils.scala b/rpc/src/test/scala/TaglessUtils.scala index 390fb97eb..f4ffb27b4 100644 --- a/rpc/src/test/scala/TaglessUtils.scala +++ b/rpc/src/test/scala/TaglessUtils.scala @@ -16,8 +16,10 @@ package freestyle.rpc +import cats.effect.IO import cats.{~>, Monad, MonadError} import freestyle._ +import freestyle.asyncCatsEffect.implicits._ import freestyle.rpc.client._ import freestyle.rpc.protocol._ import freestyle.rpc.server._ @@ -26,11 +28,13 @@ import monix.eval.Task import monix.reactive.Observable import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.Await import scala.util.{Failure, Success, Try} object TaglessUtils { + type ConcurrentMonad[A] = IO[A] + object service { @message @@ -264,7 +268,7 @@ object TaglessUtils { val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true)) val managedChannelInterpreter = - new ManagedChannelInterpreter[Future](channelFor, channelConfigList) + new ManagedChannelInterpreter[ConcurrentMonad](channelFor, channelConfigList) managedChannelInterpreter.build(channelFor, channelConfigList) } @@ -314,41 +318,41 @@ object TaglessUtils { import freestyle.rpc.server.implicits._ import freestyle.rpc.server.handlers._ - implicit val ec: ExecutionContext = ExecutionContext.Implicits.global implicit val S: monix.execution.Scheduler = monix.execution.Scheduler.Implicits.global ////////////////////////////////// // Server Runtime Configuration // ////////////////////////////////// - implicit val taglessRPCHandler: TaglessRPCService.Handler[Future] = - new TaglessRPCServiceServerHandler[Future] + implicit val taglessRPCHandler: TaglessRPCService.Handler[ConcurrentMonad] = + new TaglessRPCServiceServerHandler[ConcurrentMonad] val grpcConfigs: List[GrpcConfig] = List( - AddService(TaglessRPCService.bindService[Future]) + AddService(TaglessRPCService.bindService[ConcurrentMonad]) ) - implicit val grpcServerHandler: GrpcServer.Op ~> Future = - new GrpcServerHandler[Future] andThen - new GrpcKInterpreter[Future](createServerConf(grpcConfigs).server) + implicit val grpcServerHandler: GrpcServer.Op ~> ConcurrentMonad = + new GrpcServerHandler[ConcurrentMonad] andThen + new GrpcKInterpreter[ConcurrentMonad](createServerConf(grpcConfigs).server) ////////////////////////////////// // Client Runtime Configuration // ////////////////////////////////// - implicit val taglessRPCServiceClient: TaglessRPCService.Client[Future] = - TaglessRPCService.client[Future](createManagedChannel) + implicit val taglessRPCServiceClient: TaglessRPCService.Client[ConcurrentMonad] = + TaglessRPCService.client[ConcurrentMonad](createManagedChannel) - implicit val taglessRPCServiceClientHandler: TaglessRPCServiceClientHandler[Future] = - new TaglessRPCServiceClientHandler[Future] + implicit val taglessRPCServiceClientHandler: TaglessRPCServiceClientHandler[ConcurrentMonad] = + new TaglessRPCServiceClientHandler[ConcurrentMonad] //////////// // Syntax // //////////// - implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(implicit H: FSHandler[F, Future]) { + implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])( + implicit H: FSHandler[F, ConcurrentMonad]) { - def runF: A = Await.result(fs.interpret[Future], Duration.Inf) + def runF: A = Await.result(fs.interpret[ConcurrentMonad].unsafeToFuture(), Duration.Inf) } diff --git a/rpc/src/test/scala/Utils.scala b/rpc/src/test/scala/Utils.scala index 6438a55af..747ac271a 100644 --- a/rpc/src/test/scala/Utils.scala +++ b/rpc/src/test/scala/Utils.scala @@ -16,9 +16,11 @@ package freestyle.rpc +import cats.effect.IO import cats.{~>, Monad, MonadError} import freestyle._ import freestyle.rpc.Utils.database.a4 +import freestyle.asyncCatsEffect.implicits._ import freestyle.rpc.client._ import freestyle.rpc.protocol._ import freestyle.rpc.server._ @@ -27,11 +29,13 @@ import monix.eval.Task import monix.reactive.Observable import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.Await import scala.util.{Failure, Success, Try} object Utils { + type ConcurrentMonad[A] = IO[A] + object service { @message @@ -263,7 +267,7 @@ object Utils { val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true)) val managedChannelInterpreter = - new ManagedChannelInterpreter[Future](channelFor, channelConfigList) + new ManagedChannelInterpreter[ConcurrentMonad](channelFor, channelConfigList) managedChannelInterpreter.build(channelFor, channelConfigList) } @@ -313,41 +317,41 @@ object Utils { import freestyle.rpc.server.implicits._ import freestyle.rpc.server.handlers._ - implicit val ec: ExecutionContext = ExecutionContext.Implicits.global implicit val S: monix.execution.Scheduler = monix.execution.Scheduler.Implicits.global ////////////////////////////////// // Server Runtime Configuration // ////////////////////////////////// - implicit val freesRPCHandler: ServerRPCService[Future] = - new ServerRPCService[Future] + implicit val freesRPCHandler: ServerRPCService[ConcurrentMonad] = + new ServerRPCService[ConcurrentMonad] val grpcConfigs: List[GrpcConfig] = List( - AddService(RPCService.bindService[Future]) + AddService(RPCService.bindService[ConcurrentMonad]) ) - implicit val grpcServerHandler: GrpcServer.Op ~> Future = - new GrpcServerHandler[Future] andThen - new GrpcKInterpreter[Future](createServerConf(grpcConfigs).server) + implicit val grpcServerHandler: GrpcServer.Op ~> ConcurrentMonad = + new GrpcServerHandler[ConcurrentMonad] andThen + new GrpcKInterpreter[ConcurrentMonad](createServerConf(grpcConfigs).server) ////////////////////////////////// // Client Runtime Configuration // ////////////////////////////////// - implicit val freesRPCServiceClient: RPCService.Client[Future] = - RPCService.client[Future](createManagedChannel) + implicit val freesRPCServiceClient: RPCService.Client[ConcurrentMonad] = + RPCService.client[ConcurrentMonad](createManagedChannel) - implicit val freesRPCServiceClientHandler: FreesRPCServiceClientHandler[Future] = - new FreesRPCServiceClientHandler[Future] + implicit val freesRPCServiceClientHandler: FreesRPCServiceClientHandler[ConcurrentMonad] = + new FreesRPCServiceClientHandler[ConcurrentMonad] //////////// // Syntax // //////////// - implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(implicit H: FSHandler[F, Future]) { + implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])( + implicit H: FSHandler[F, ConcurrentMonad]) { - def runF: A = Await.result(fs.interpret[Future], Duration.Inf) + def runF: A = Await.result(fs.interpret[ConcurrentMonad].unsafeToFuture(), Duration.Inf) } diff --git a/version.sbt b/version.sbt index a6c955bea..22edbfc8c 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.4.1-SNAPSHOT" \ No newline at end of file +version in ThisBuild := "0.4.1" \ No newline at end of file