Skip to content

Commit

Permalink
Server Endpoints and Effect Monad (#95)
Browse files Browse the repository at this point in the history
* Server endpoints are runAsync based on the cats.effect.Effect instance
* Fixes IO Capture
  • Loading branch information
juanpedromoreno authored Dec 5, 2017
1 parent 72e19c5 commit f57aa95
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 53 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ lazy val rpc = project
%%("frees-core", freesV),
%%("frees-async", freesV),
%%("frees-async-guava", freesV),
%%("frees-async-cats-effect", freesV),
%%("frees-config", freesV),
%%("frees-logging", freesV),
%%("frees-tagless", freesV),
Expand Down
58 changes: 48 additions & 10 deletions rpc/src/main/scala/RPCAsyncImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package freestyle
package rpc

import cats.effect.IO
import cats.{~>, Comonad}
import freestyle.rpc.client.handlers._
import journal.Logger
Expand All @@ -27,11 +28,23 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

trait RPCAsyncImplicits extends freestyle.async.Implicits {
trait IOCapture {

implicit val ioCapture: Capture[IO] = new Capture[IO] {
override def capture[A](a: => A): IO[A] = IO(a)
}

}

trait BaseAsync extends freestyle.async.Implicits {

protected[this] val asyncLogger: Logger = Logger[this.type]
protected[this] val atMostDuration: FiniteDuration = 10.seconds

}

trait FutureAsyncInstances extends BaseAsync {

implicit def futureComonad(implicit EC: ExecutionContext): Comonad[Future] =
new Comonad[Future] {
def extract[A](x: Future[A]): A = {
Expand All @@ -45,6 +58,18 @@ trait RPCAsyncImplicits extends freestyle.async.Implicits {
fa.map(f)
}

implicit val future2Task: Future ~> Task =
new (Future ~> Task) {
override def apply[A](fa: Future[A]): Task[A] = {
asyncLogger.info(s"${Thread.currentThread().getName} Deferring Future to Task...")
Task.deferFuture(fa)
}
}

}

trait TaskAsyncInstances extends BaseAsync {

implicit def taskComonad(implicit S: Scheduler): Comonad[Task] =
new Comonad[Task] {
def extract[A](x: Task[A]): A = {
Expand All @@ -58,19 +83,32 @@ trait RPCAsyncImplicits extends freestyle.async.Implicits {
fa.map(f)
}

implicit def task2Future(implicit S: Scheduler): FSHandler[Task, Future] =
implicit def task2Future(implicit S: Scheduler): Task ~> Future =
new TaskMHandler[Future]

implicit val future2Task: Future ~> Task =
new (Future ~> Task) {
override def apply[A](fa: Future[A]): Task[A] = {
asyncLogger.info(s"${Thread.currentThread().getName} Deferring Future to Task...")
Task.deferFuture(fa)
}
}

implicit val task2Task: Task ~> Task = new (Task ~> Task) {
override def apply[A](fa: Task[A]): Task[A] = fa
}

implicit def task2IO(implicit S: Scheduler): Task ~> IO = new (Task ~> IO) {
override def apply[A](fa: Task[A]): IO[A] = fa.toIO
}
}

trait IOAsyncInstances extends BaseAsync {

implicit val ioComonad: Comonad[IO] = new Comonad[IO] {

override def extract[A](x: IO[A]): A = x.unsafeRunSync()

override def coflatMap[A, B](fa: IO[A])(f: IO[A] => B): IO[B] = IO.pure(f(fa))

override def map[A, B](fa: IO[A])(f: A => B): IO[B] = fa.map(f)
}

implicit val io2Task: IO ~> Task = new (IO ~> Task) {
override def apply[A](fa: IO[A]): Task[A] = fa.to[Task]
}
}

trait RPCAsyncImplicits extends FutureAsyncInstances with TaskAsyncInstances with IOAsyncInstances
24 changes: 13 additions & 11 deletions rpc/src/main/scala/internal/service/calls.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package rpc
package internal
package service

import cats.effect.{Effect, IO}
import cats.effect.implicits._
import cats.implicits._
import cats.{~>, Comonad, MonadError}
import cats.{~>, Comonad}
import io.grpc.stub.ServerCalls.{
BidiStreamingMethod,
ClientStreamingMethod,
Expand All @@ -41,16 +43,18 @@ object calls {
import converters._

def unaryMethod[F[_], Req, Res](f: (Req) => F[Res])(
implicit ME: MonadError[F, Throwable]): UnaryMethod[Req, Res] =
implicit EFF: Effect[F]): UnaryMethod[Req, Res] =
new UnaryMethod[Req, Res] {
override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit = {
ME.attempt(f(request)).map(completeObserver(responseObserver))
(): Unit
}
override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit =
EFF
.attempt(f(request))
.map(completeObserver(responseObserver))
.runAsync(_ => IO.pure(()))
.unsafeRunAsync(_ => ())
}

def clientStreamingMethod[F[_], Req, Res](f: (Observable[Req]) => F[Res])(
implicit ME: MonadError[F, Throwable],
implicit EFF: Effect[F],
HTask: F ~> Task,
S: Scheduler): ClientStreamingMethod[Req, Res] = new ClientStreamingMethod[Req, Res] {

Expand All @@ -63,8 +67,7 @@ object calls {
}

def serverStreamingMethod[F[_], Req, Res](f: (Req) => F[Observable[Res]])(
implicit ME: MonadError[F, Throwable],
C: Comonad[F],
implicit C: Comonad[F],
S: Scheduler): ServerStreamingMethod[Req, Res] = new ServerStreamingMethod[Req, Res] {

override def invoke(request: Req, responseObserver: StreamObserver[Res]): Unit = {
Expand All @@ -74,8 +77,7 @@ object calls {
}

def bidiStreamingMethod[F[_], Req, Res](f: (Observable[Req]) => F[Observable[Res]])(
implicit ME: MonadError[F, Throwable],
C: Comonad[F],
implicit C: Comonad[F],
S: Scheduler): BidiStreamingMethod[Req, Res] = new BidiStreamingMethod[Req, Res] {

override def invoke(responseObserver: StreamObserver[Res]): StreamObserver[Req] = {
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/main/scala/internal/service/service.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ trait RPCService {
val serviceBindings: Defn.Def = {
val args: Seq[Term.Tuple] = requests.map(_.call)
q"""
def bindService[F[_]](implicit algebra: $algName[F], HTask: _root_.freestyle.FSHandler[F, _root_.monix.eval.Task], ME: _root_.cats.MonadError[F, Throwable], C: _root_.cats.Comonad[F], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition =
def bindService[F[_]](implicit algebra: $algName[F], EFF: _root_.cats.effect.Effect[F], HTask: _root_.freestyle.FSHandler[F, _root_.monix.eval.Task], C: _root_.cats.Comonad[F], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition =
new _root_.freestyle.rpc.internal.service.GRPCServiceDefBuilder(${Lit.String(
algName.value)}, ..$args).apply
"""
Expand Down
1 change: 1 addition & 0 deletions rpc/src/main/scala/server/implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ trait Helpers {

object implicits
extends CaptureInstances
with IOCapture
with RPCAsyncImplicits
with Syntax
with Helpers
Expand Down
34 changes: 19 additions & 15 deletions rpc/src/test/scala/TaglessUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package freestyle.rpc

import cats.effect.IO
import cats.{~>, Monad, MonadError}
import freestyle._
import freestyle.asyncCatsEffect.implicits._
import freestyle.rpc.client._
import freestyle.rpc.protocol._
import freestyle.rpc.server._
Expand All @@ -26,11 +28,13 @@ import monix.eval.Task
import monix.reactive.Observable

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.Await
import scala.util.{Failure, Success, Try}

object TaglessUtils {

type ConcurrentMonad[A] = IO[A]

object service {

@message
Expand Down Expand Up @@ -264,7 +268,7 @@ object TaglessUtils {
val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true))

val managedChannelInterpreter =
new ManagedChannelInterpreter[Future](channelFor, channelConfigList)
new ManagedChannelInterpreter[ConcurrentMonad](channelFor, channelConfigList)

managedChannelInterpreter.build(channelFor, channelConfigList)
}
Expand Down Expand Up @@ -314,41 +318,41 @@ object TaglessUtils {
import freestyle.rpc.server.implicits._
import freestyle.rpc.server.handlers._

implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
implicit val S: monix.execution.Scheduler = monix.execution.Scheduler.Implicits.global

//////////////////////////////////
// Server Runtime Configuration //
//////////////////////////////////

implicit val taglessRPCHandler: TaglessRPCService.Handler[Future] =
new TaglessRPCServiceServerHandler[Future]
implicit val taglessRPCHandler: TaglessRPCService.Handler[ConcurrentMonad] =
new TaglessRPCServiceServerHandler[ConcurrentMonad]

val grpcConfigs: List[GrpcConfig] = List(
AddService(TaglessRPCService.bindService[Future])
AddService(TaglessRPCService.bindService[ConcurrentMonad])
)

implicit val grpcServerHandler: GrpcServer.Op ~> Future =
new GrpcServerHandler[Future] andThen
new GrpcKInterpreter[Future](createServerConf(grpcConfigs).server)
implicit val grpcServerHandler: GrpcServer.Op ~> ConcurrentMonad =
new GrpcServerHandler[ConcurrentMonad] andThen
new GrpcKInterpreter[ConcurrentMonad](createServerConf(grpcConfigs).server)

//////////////////////////////////
// Client Runtime Configuration //
//////////////////////////////////

implicit val taglessRPCServiceClient: TaglessRPCService.Client[Future] =
TaglessRPCService.client[Future](createManagedChannel)
implicit val taglessRPCServiceClient: TaglessRPCService.Client[ConcurrentMonad] =
TaglessRPCService.client[ConcurrentMonad](createManagedChannel)

implicit val taglessRPCServiceClientHandler: TaglessRPCServiceClientHandler[Future] =
new TaglessRPCServiceClientHandler[Future]
implicit val taglessRPCServiceClientHandler: TaglessRPCServiceClientHandler[ConcurrentMonad] =
new TaglessRPCServiceClientHandler[ConcurrentMonad]

////////////
// Syntax //
////////////

implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(implicit H: FSHandler[F, Future]) {
implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(
implicit H: FSHandler[F, ConcurrentMonad]) {

def runF: A = Await.result(fs.interpret[Future], Duration.Inf)
def runF: A = Await.result(fs.interpret[ConcurrentMonad].unsafeToFuture(), Duration.Inf)

}

Expand Down
34 changes: 19 additions & 15 deletions rpc/src/test/scala/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package freestyle.rpc

import cats.effect.IO
import cats.{~>, Monad, MonadError}
import freestyle._
import freestyle.rpc.Utils.database.a4
import freestyle.asyncCatsEffect.implicits._
import freestyle.rpc.client._
import freestyle.rpc.protocol._
import freestyle.rpc.server._
Expand All @@ -27,11 +29,13 @@ import monix.eval.Task
import monix.reactive.Observable

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.Await
import scala.util.{Failure, Success, Try}

object Utils {

type ConcurrentMonad[A] = IO[A]

object service {

@message
Expand Down Expand Up @@ -263,7 +267,7 @@ object Utils {
val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true))

val managedChannelInterpreter =
new ManagedChannelInterpreter[Future](channelFor, channelConfigList)
new ManagedChannelInterpreter[ConcurrentMonad](channelFor, channelConfigList)

managedChannelInterpreter.build(channelFor, channelConfigList)
}
Expand Down Expand Up @@ -313,41 +317,41 @@ object Utils {
import freestyle.rpc.server.implicits._
import freestyle.rpc.server.handlers._

implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
implicit val S: monix.execution.Scheduler = monix.execution.Scheduler.Implicits.global

//////////////////////////////////
// Server Runtime Configuration //
//////////////////////////////////

implicit val freesRPCHandler: ServerRPCService[Future] =
new ServerRPCService[Future]
implicit val freesRPCHandler: ServerRPCService[ConcurrentMonad] =
new ServerRPCService[ConcurrentMonad]

val grpcConfigs: List[GrpcConfig] = List(
AddService(RPCService.bindService[Future])
AddService(RPCService.bindService[ConcurrentMonad])
)

implicit val grpcServerHandler: GrpcServer.Op ~> Future =
new GrpcServerHandler[Future] andThen
new GrpcKInterpreter[Future](createServerConf(grpcConfigs).server)
implicit val grpcServerHandler: GrpcServer.Op ~> ConcurrentMonad =
new GrpcServerHandler[ConcurrentMonad] andThen
new GrpcKInterpreter[ConcurrentMonad](createServerConf(grpcConfigs).server)

//////////////////////////////////
// Client Runtime Configuration //
//////////////////////////////////

implicit val freesRPCServiceClient: RPCService.Client[Future] =
RPCService.client[Future](createManagedChannel)
implicit val freesRPCServiceClient: RPCService.Client[ConcurrentMonad] =
RPCService.client[ConcurrentMonad](createManagedChannel)

implicit val freesRPCServiceClientHandler: FreesRPCServiceClientHandler[Future] =
new FreesRPCServiceClientHandler[Future]
implicit val freesRPCServiceClientHandler: FreesRPCServiceClientHandler[ConcurrentMonad] =
new FreesRPCServiceClientHandler[ConcurrentMonad]

////////////
// Syntax //
////////////

implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(implicit H: FSHandler[F, Future]) {
implicit class InterpreterOps[F[_], A](fs: FreeS[F, A])(
implicit H: FSHandler[F, ConcurrentMonad]) {

def runF: A = Await.result(fs.interpret[Future], Duration.Inf)
def runF: A = Await.result(fs.interpret[ConcurrentMonad].unsafeToFuture(), Duration.Inf)

}

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.4.1-SNAPSHOT"
version in ThisBuild := "0.4.1"

0 comments on commit f57aa95

Please sign in to comment.