Skip to content

Commit

Permalink
Adjust metrics backends
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Feb 3, 2025
1 parent 6972c34 commit 6bed377
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ abstract class ListenerBackend[F[_], P, L](
listener.beforeRequest(request).flatMap { case (requestToSend, tag) =>
monad
.handleError(delegate.send(requestToSend)) { case e: Exception =>
listener.requestException(requestToSend, tag, e).flatMap(_ => monad.error(e))
monad.flatMap {
ResponseException.find(e) match {
case Some(re) => listener.requestSuccessful(requestToSend, re.response, tag, Some(re))
case None => listener.requestException(requestToSend, tag, e)
}
} { _ => monad.error(e) }
}
.flatMap(response => listener.requestSuccessful(requestToSend, response, tag).map(_ => response))
.flatMap(response => listener.requestSuccessful(requestToSend, response, tag, None).map(_ => response))
}
}

Expand Down
29 changes: 23 additions & 6 deletions core/src/main/scala/sttp/client4/listener/RequestListener.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package sttp.client4.listener

import sttp.monad.MonadError
import sttp.client4.{GenericRequest, Response}
import sttp.client4.GenericRequest
import sttp.shared.Identity
import sttp.model.ResponseMetadata
import sttp.client4.ResponseException

/** A listener to be used by the [[ListenerBackend]] to get notified on request lifecycle events. The request can be
* enriched before being sent, e.g. to capture additional metrics.
Expand All @@ -13,18 +15,33 @@ import sttp.shared.Identity
*/
trait RequestListener[F[_], L] {
def beforeRequest[T, R](request: GenericRequest[T, R]): F[(GenericRequest[T, R], L)]
def requestException(request: GenericRequest[_, _], tag: L, e: Exception): F[Unit]
def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: L): F[Unit]
def requestException(request: GenericRequest[_, _], tag: L, e: Throwable): F[Unit]

/** @param e
* A [[ResponseException]] that might occur when handling the response: when the raw response is successfully
* received via the network, but e.g. a parsing or decompression exception occurs.
*/
def requestSuccessful(
request: GenericRequest[_, _],
response: ResponseMetadata,
tag: L,
e: Option[ResponseException[_]]
): F[Unit]
}

object RequestListener {
def lift[F[_], L](delegate: RequestListener[Identity, L], monadError: MonadError[F]): RequestListener[F, L] =
new RequestListener[F, L] {
override def beforeRequest[T, R](request: GenericRequest[T, R]): F[(GenericRequest[T, R], L)] =
monadError.eval(delegate.beforeRequest(request))
override def requestException(request: GenericRequest[_, _], tag: L, e: Exception): F[Unit] =
override def requestException(request: GenericRequest[_, _], tag: L, e: Throwable): F[Unit] =
monadError.eval(delegate.requestException(request, tag, e))
override def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: L): F[Unit] =
monadError.eval(delegate.requestSuccessful(request, response, tag))
override def requestSuccessful(
request: GenericRequest[_, _],
response: ResponseMetadata,
tag: L,
e: Option[ResponseException[_]]
): F[Unit] =
monadError.eval(delegate.requestSuccessful(request, response, tag, e))
}
}
7 changes: 4 additions & 3 deletions core/src/main/scala/sttp/client4/logging/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ import sttp.client4.Response
import sttp.model.ResponseMetadata

import scala.concurrent.duration.Duration
import sttp.client4.ResponseException

/** Performs logging before requests are sent and after requests complete successfully or with an exception. */
trait Log[F[_]] {
def beforeRequestSend(request: GenericRequest[_, _]): F[Unit]

/** @param exception
* An exception that might occur when processing the response (e.g. parsing).
* A [[ResponseException]] that might occur when handling the response (e.g. parsing).
*/
def response(
request: GenericRequest[_, _],
response: ResponseMetadata,
responseBody: Option[String],
timings: Option[ResponseTimings],
exception: Option[Throwable]
exception: Option[ResponseException[_]]
): F[Unit]

def requestException(request: GenericRequest[_, _], timing: Option[Duration], exception: Throwable): F[Unit]
Expand Down Expand Up @@ -55,7 +56,7 @@ class DefaultLog[F[_]](logger: Logger[F], config: LogConfig, logContext: LogCont
response: ResponseMetadata,
responseBody: Option[String],
timings: Option[ResponseTimings],
exception: Option[Throwable]
exception: Option[ResponseException[_]]
): F[Unit] = {
val responseWithBody = Response(
responseBody.getOrElse(""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LoggingBackend[F[_], P](
} { case e: Exception =>
monad.flatMap {
ResponseException.find(e) match {
case Some(re) => log.response(request, re.response, None, tag.map(toResponseTimings), Some(e))
case Some(re) => log.response(request, re.response, None, tag.map(toResponseTimings), Some(re))
case None => log.requestException(request, tag.map(toResponseTimings).map(_.bodyHandled), e)
}
} { _ => monad.error(e) }
Expand Down
2 changes: 1 addition & 1 deletion docs/backends/wrappers/custom.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Backends, or backend wrappers can use attributes e.g. for logging, passing a met

## Listener backend

The `sttp.client4.listener.ListenerBackend` can make it easier to create backend wrappers which need to be notified about request lifecycle events: when a request is started, and when it completes either successfully or with an exception. This is possible by implementing a `sttp.client4.listener.RequestListener`. This is how e.g. the [slf4j backend](logging.md) is implemented.
The `sttp.client4.listener.ListenerBackend` can make it easier to create backend wrappers which need to be notified about request lifecycle events: when a request is started, and when it completes either successfully or with an exception. This is possible by implementing a `sttp.client4.listener.RequestListener`.

A request listener can associate a value with a request, which will then be passed to the request completion notification methods.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.opentelemetry.semconv.UrlAttributes
import io.opentelemetry.semconv.ErrorAttributes
import io.opentelemetry.semconv.ServerAttributes
import io.opentelemetry.api.common.AttributesBuilder
import sttp.model.ResponseMetadata

object OpenTelemetryDefaults {

Expand All @@ -32,7 +33,7 @@ object OpenTelemetryDefaults {
.put(ServerAttributes.SERVER_PORT, request.uri.port.getOrElse(80))

/** @see https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client */
def responseAttributes(request: GenericRequest[_, _], response: Response[_]): Attributes =
def responseAttributes(request: GenericRequest[_, _], response: ResponseMetadata): Attributes =
Attributes.builder
.put(HttpAttributes.HTTP_RESPONSE_STATUS_CODE, response.code.code.toLong: java.lang.Long)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import sttp.client4._
import sttp.client4.listener.ListenerBackend
import sttp.client4.listener.RequestListener
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.model.ResponseMetadata
import sttp.shared.Identity

import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -92,7 +93,12 @@ private class OpenTelemetryMetricsListener(config: OpenTelemetryMetricsConfig)
(request, config.requestToLatencyHistogramMapper(request).map { _ => config.clock.millis() })
}

override def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: Option[Long]): Unit = {
override def requestSuccessful(
request: GenericRequest[_, _],
response: ResponseMetadata,
tag: Option[Long],
e: Option[ResponseException[_]]
): Unit = {
val requestAttributes = config.requestAttributes(request)
val responseAttributes = config.responseAttributes(request, response)

Expand All @@ -113,22 +119,17 @@ private class OpenTelemetryMetricsListener(config: OpenTelemetryMetricsConfig)
updateInProgressCounter(request, -1, requestAttributes)
}

override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Exception): Unit = {
override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Throwable): Unit = {
val requestAttributes = config.requestAttributes(request)
val errorAttributes = config.errorAttributes(e)

ResponseException.find(e) match {
case Some(re) =>
requestSuccessful(request, Response((), re.response.code, request.onlyMetadata), tag)
case _ =>
incrementCounter(config.requestToFailureCounterMapper(request, e), errorAttributes)
recordHistogram(
config.requestToLatencyHistogramMapper(request),
tag.map(config.clock.millis() - _),
errorAttributes
)
updateInProgressCounter(request, -1, requestAttributes)
}
incrementCounter(config.requestToFailureCounterMapper(request, e), errorAttributes)
recordHistogram(
config.requestToLatencyHistogramMapper(request),
tag.map(config.clock.millis() - _),
errorAttributes
)
updateInProgressCounter(request, -1, requestAttributes)
}

private def updateInProgressCounter[R, T](request: GenericRequest[T, R], delta: Long, attributes: Attributes): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ import sttp.client4._
import sttp.client4.opentelemetry.OpenTelemetryMetricsBackend._

import java.time.Clock
import sttp.model.ResponseMetadata

final case class OpenTelemetryMetricsConfig(
meter: Meter,
clock: Clock,
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSuccessCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig],
requestToErrorCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig],
responseToSuccessCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig],
requestToErrorCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig],
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig],
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
responseToSizeHistogramMapper: (GenericRequest[_, _], Response[_]) => Option[HistogramCollectorConfig],
responseToSizeHistogramMapper: (GenericRequest[_, _], ResponseMetadata) => Option[HistogramCollectorConfig],
requestAttributes: GenericRequest[_, _] => Attributes,
responseAttributes: (GenericRequest[_, _], Response[_]) => Attributes,
responseAttributes: (GenericRequest[_, _], ResponseMetadata) => Attributes,
errorAttributes: Throwable => Attributes
)

Expand All @@ -38,10 +39,10 @@ object OpenTelemetryMetricsConfig {
),
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestsActiveCounterName)),
responseToSuccessCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) => Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) => Some(CollectorConfig(DefaultErrorCounterName)),
responseToSuccessCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) => Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) => Some(CollectorConfig(DefaultErrorCounterName)),
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Throwable) => Some(CollectorConfig(DefaultFailureCounterName)),
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] =
Expand All @@ -53,8 +54,8 @@ object OpenTelemetryMetricsConfig {
unit = HistogramCollectorConfig.Bytes
)
),
responseToSizeHistogramMapper: (GenericRequest[_, _], Response[_]) => Option[HistogramCollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) =>
responseToSizeHistogramMapper: (GenericRequest[_, _], ResponseMetadata) => Option[HistogramCollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) =>
Some(
HistogramCollectorConfig(
DefaultResponseSizeHistogramName,
Expand All @@ -64,7 +65,7 @@ object OpenTelemetryMetricsConfig {
),
spanName: GenericRequest[_, _] => String = OpenTelemetryDefaults.spanName _,
requestAttributes: GenericRequest[_, _] => Attributes = OpenTelemetryDefaults.requestAttributes _,
responseAttributes: (GenericRequest[_, _], Response[_]) => Attributes =
responseAttributes: (GenericRequest[_, _], ResponseMetadata) => Attributes =
OpenTelemetryDefaults.responseAttributes _,
errorAttributes: Throwable => Attributes = OpenTelemetryDefaults.errorAttributes _
): OpenTelemetryMetricsConfig = usingMeter(
Expand Down Expand Up @@ -99,10 +100,10 @@ object OpenTelemetryMetricsConfig {
),
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestsActiveCounterName)),
responseToSuccessCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) => Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) => Some(CollectorConfig(DefaultErrorCounterName)),
responseToSuccessCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) => Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: (GenericRequest[_, _], ResponseMetadata) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) => Some(CollectorConfig(DefaultErrorCounterName)),
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Throwable) => Some(CollectorConfig(DefaultFailureCounterName)),
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] =
Expand All @@ -114,8 +115,8 @@ object OpenTelemetryMetricsConfig {
unit = HistogramCollectorConfig.Bytes
)
),
responseToSizeHistogramMapper: (GenericRequest[_, _], Response[_]) => Option[HistogramCollectorConfig] =
(_: GenericRequest[_, _], _: Response[_]) =>
responseToSizeHistogramMapper: (GenericRequest[_, _], ResponseMetadata) => Option[HistogramCollectorConfig] =
(_: GenericRequest[_, _], _: ResponseMetadata) =>
Some(
HistogramCollectorConfig(
DefaultResponseSizeHistogramName,
Expand All @@ -124,7 +125,7 @@ object OpenTelemetryMetricsConfig {
)
),
requestAttributes: GenericRequest[_, _] => Attributes = OpenTelemetryDefaults.requestAttributes _,
responseAttributes: (GenericRequest[_, _], Response[_]) => Attributes =
responseAttributes: (GenericRequest[_, _], ResponseMetadata) => Attributes =
OpenTelemetryDefaults.responseAttributes _,
errorAttributes: Throwable => Attributes = OpenTelemetryDefaults.errorAttributes _
): OpenTelemetryMetricsConfig =
Expand Down
Loading

0 comments on commit 6bed377

Please sign in to comment.