Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] otel4s #178

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ curl -i --header "context-id: mycontextid" localhost:9001/api/v1/trace
---
### Example 2: OpenTelemetry agent

* app3 and app4 are Play Framework applications, while app5 is Zio HTTP
* app3 and app4 are Play Framework applications, while app5 is Zio HTTP, and app6 is Http4s
* All apps use OpenTelemetry for monitoring.
* OpenTelemetry agent sends tracing and metrics to OTEL.
* Prometheus:
* http://localhost:9093
* http://localhost:9094
* http://localhost:9095
* http://localhost:9096
* jaeger: http://localhost:16686

```
Expand Down
4 changes: 3 additions & 1 deletion app3/app/service/TraceService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ class TraceService @Inject() (ws: WSClient, config: Configuration)

private lazy val api4URL = config.get[String]("app4.url")
private lazy val api5URL = config.get[String]("app5.url")
private lazy val api6URL = config.get[String]("app6.url")

def getTrace(ctxId: String)(implicit
mc: MarkerContext,
ex: ExecutionContext
): Future[TraceResponse] = {
for {
_ <- getTrace(ctxId, api4URL)
result <- getTrace(ctxId, api5URL)
_ <- getTrace(ctxId, api5URL)
result <- getTrace(ctxId, api6URL)
} yield result
}

Expand Down
5 changes: 5 additions & 0 deletions app3/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ app5 {
url = ${?APP5_URL}
}

app6 {
url = "http://localhost:9006/api/v1/trace"
url = ${?APP6_URL}
}

play.filters.hosts {
allowed = ["."]
}
2 changes: 1 addition & 1 deletion app4/app/controllers/AppController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class AppController(
def trace: Action[AnyContent] = Action.async { implicit request =>
implicit val mc: MarkerContext =
requestHeaderToMarkerContext(request.headers)
logger.info("trace request")
logger.info(s"trace request - headers ${request.headers.headers}")
Future(
Ok(
Json.toJson(
Expand Down
2 changes: 1 addition & 1 deletion app5/src/main/scala/controllers/AppController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object AppController {
case req @ (Method.GET -> !! / "api" / "v1" / "trace") =>
{
for {
_ <- ZIO.logInfo(s"trace request")
_ <- ZIO.logInfo(s"trace request - headers ${req.headersAsList}")
} yield Response.json(
TraceResponse(Span.current().getSpanContext.getTraceId).toJson
)
Expand Down
73 changes: 73 additions & 0 deletions app6/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${application.home:-.}/logs/application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${application.home:-.}/logs/bff-log-%d{yyyy-MM-dd}.gz</fileNamePattern>
<maxHistory>30</maxHistory>
<totalSizeCap>3GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",UTC} [%marker] %logger [%mdc] - %msg%n</pattern>
</encoder>
</appender>


<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",UTC} [%marker] %logger [%mdc] - %msg%n</pattern>
</encoder>
</appender>

<appender name="OPENTELEMETRY"
class="io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender"
captureMdcAttributes="true" >
</appender>


<!--
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>logstash:6000</destination>
<keepAliveDuration>5 minutes</keepAliveDuration>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"appname": "n-sender", "environment": "default"}</customFields>
</encoder>
</appender>
-->

<appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>500</queueSize>
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="FILE" />
</appender>

<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>500</queueSize>
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="STDOUT" />
</appender>

<!--
<appender name="ASYNCLOGSTASH" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="LOGSTASH" />
</appender>
-->


<turboFilter class="ch.qos.logback.classic.turbo.MarkerFilter">
<Name>TRACER_FILTER</Name>
<Marker>TRACER</Marker>
<OnMatch>ACCEPT</OnMatch>
</turboFilter>

<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />

<root level="INFO">
<appender-ref ref="ASYNCFILE" />
<appender-ref ref="ASYNCSTDOUT" />
<appender-ref ref="OPENTELEMETRY"/>
<!--<appender-ref ref="ASYNCLOGSTASH" />-->
</root>
</configuration>
21 changes: 21 additions & 0 deletions app6/src/main/scala/MainApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import cats.effect.*
import controllers.AppController
import org.http4s.blaze.server.BlazeServerBuilder
import utils.{OtelResource, ServerMiddleware}
object MainApp extends IOApp with ServerMiddleware {

override def run(args: List[String]): IO[ExitCode] = {
val serverResource: Resource[IO, Unit] = for {
otel <- OtelResource.apply[IO]
tracer <- Resource.eval(otel.tracerProvider.get("my-tracer"))
appController = new AppController[IO](tracer)
httpApp = appController.httpApp
_ <- BlazeServerBuilder[IO]
.bindHttp(9006, "0.0.0.0")
.withHttpApp(httpApp)
.resource
} yield ()

serverResource.use(_ => IO.never).as(ExitCode.Success)
}
}
81 changes: 81 additions & 0 deletions app6/src/main/scala/controllers/AppController.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package controllers

import cats.effect.Async
import cats.implicits.*
import io.circe.generic.auto.*
import io.circe.syntax.*
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import org.http4s.*
import org.http4s.circe.*
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits.*
import org.typelevel.ci.CIString
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.otel4s.trace.Tracer
import utils.ServerMiddleware.*
import utils.{
ContextId,
ContextualLogger,
HttpRequestContextual,
RequestContext
}

import scala.jdk.CollectionConverters.*

case class TraceResponse(id: String)

object TraceResponse {
implicit def traceResponseEncoder[F[_]]: EntityEncoder[F, TraceResponse] =
jsonEncoderOf[F, TraceResponse]
}

class AppController[F[_]: Async](tracer: Tracer[F])
extends Http4sDsl[F]
with ContextId {
implicit val t: Tracer[F] = tracer

val logger: ContextualLogger[F] =
new ContextualLogger[F](Slf4jLogger.getLogger[F])

// TODO:WIP..
private val propagator = W3CTraceContextPropagator.getInstance()
private val getter = new TextMapGetter[Headers] {
override def keys(carrier: Headers): java.lang.Iterable[String] =
carrier.headers.map(_.name.toString).asJava

override def get(carrier: Headers, key: String): String =
carrier.get(CIString(key)).map(_.head.value).orNull
}

def trace: HttpRoutes[F] = HttpRoutes.of[F] {
case request @ GET -> Root / "api" / "v1" / "trace" =>
HttpRequestContextual.contextual(request) {
implicit ctx: RequestContext =>
val context =
propagator.extract(Context.current(), request.headers, getter)
val span = Span.fromContext(context)
for {
_ <- logger.info(
s"trace request - headers ${request.headers.headers}"
)
_ <- logger.info(s"trace_id ${span.getSpanContext.getTraceId}")
response <- Ok(
TraceResponse(span.getSpanContext.getTraceId).asJson
)
} yield response
}
}

def index: HttpRoutes[F] = HttpRoutes.of[F] { case GET -> Root / "health" =>
Ok(
"<h1>Welcome</h1><p>APP6 is ready.</p>",
Header.Raw(CIString("Content-Type"), "text/html")
)
}

private val routes: HttpRoutes[F] = trace <+> index
val httpApp: HttpApp[F] = routes.orNotFound.traced
}
28 changes: 28 additions & 0 deletions app6/src/main/scala/utils/ContextId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package utils

import org.http4s.Headers
import org.log4s.getLogger
import org.typelevel.ci.CIString
import utils.ContextId.cId

import java.util.UUID

object ContextId {
lazy val cId = CIString(
"context-id"
) // TODO: choose correlation-id in all the solution
}

trait ContextId {

private[this] val logger = getLogger
def getCtxId(requestHeader: Headers): String = {
requestHeader
.get(cId)
.map(_.head.value)
.getOrElse({
logger.warn(s"$cId is missing")
UUID.randomUUID().toString
})
}
}
52 changes: 52 additions & 0 deletions app6/src/main/scala/utils/ContextualLogger.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package utils

import cats.effect.{Async, Sync}
import org.typelevel.log4cats.Logger

class ContextualLogger[F[_]: Sync](logger: Logger[F]) {

private def contextual(message: => String)(implicit
ctx: RequestContext
): String =
s"${ctx.requestId} - $message"

def error(message: => String)(implicit ctx: RequestContext): F[Unit] =
logger.error(contextual(message))

def warn(message: => String)(implicit ctx: RequestContext): F[Unit] =
logger.warn(contextual(message))

def info(message: => String)(implicit ctx: RequestContext): F[Unit] =
logger.info(contextual(message))

def debug(message: => String)(implicit ctx: RequestContext): F[Unit] =
logger.debug(contextual(message))

def trace(message: => String)(implicit ctx: RequestContext): F[Unit] =
logger.trace(contextual(message))

def error(t: Throwable)(message: => String)(implicit
ctx: RequestContext
): F[Unit] =
logger.error(t)(contextual(message))

def warn(t: Throwable)(message: => String)(implicit
ctx: RequestContext
): F[Unit] =
logger.warn(t)(contextual(message))

def info(t: Throwable)(message: => String)(implicit
ctx: RequestContext
): F[Unit] =
logger.info(t)(contextual(message))

def debug(t: Throwable)(message: => String)(implicit
ctx: RequestContext
): F[Unit] =
logger.debug(t)(contextual(message))

def trace(t: Throwable)(message: => String)(implicit
ctx: RequestContext
): F[Unit] =
logger.trace(t)(contextual(message))
}
22 changes: 22 additions & 0 deletions app6/src/main/scala/utils/HttpRequestContextual.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package utils

import cats.effect.Sync
import org.http4s.Request
import org.http4s.Response
import org.http4s.server.middleware.RequestId
import java.util.UUID

final case class RequestContext(requestId: String)

object HttpRequestContextual {
type Contextual[T] = RequestContext => T

def contextual[F[_]: Sync](
request: Request[F]
)(f: Contextual[F[Response[F]]]): F[Response[F]] = {
val context: RequestContext = request.attributes
.lookup(RequestId.requestIdAttrKey)
.fold(RequestContext(UUID.randomUUID().toString))(RequestContext.apply)
f(context)
}
}
14 changes: 14 additions & 0 deletions app6/src/main/scala/utils/OtelResource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package utils

import cats.effect.{Async, LiftIO, Resource}
import io.opentelemetry.api.GlobalOpenTelemetry
import org.typelevel.otel4s.Otel4s
import org.typelevel.otel4s.oteljava.OtelJava

object OtelResource {
def apply[F[_]: Async: LiftIO]: Resource[F, Otel4s[F]] = {
Resource
.eval(Async[F].delay(GlobalOpenTelemetry.get))
.evalMap(OtelJava.forAsync[F])
}
}
Loading
Loading