Skip to content

Commit

Permalink
Fix backends
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Feb 3, 2025
1 parent 6bed377 commit 8165c2e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ class FinagleBackend(client: Option[Client] = None) extends Backend[TFuture] {
val statusText = fResponse.status.reason
val responseMetadata = ResponseMetadata(code, statusText, headers)
val body =
bodyFromResponseAs(request.options.onBodyReceived)(request.response, responseMetadata, Left(fResponse))
bodyFromResponseAs(() => request.options.onBodyReceived(responseMetadata))(
request.response,
responseMetadata,
Left(fResponse)
)
service
.close()
.flatMap(_ => body.map(sttp.client4.Response(_, code, statusText, headers, Nil, request.onlyMetadata)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
val statusText = response.status.reason
val responseMetadata = ResponseMetadata(code, statusText, headers)

val callbackResponse =
response.copy(body = response.body.onFinalize(ConcurrentEffect[F].delay(r.options.onBodyReceived())))
val limitedResponse: org.http4s.Response[F] =
r.options.maxResponseBodyLength
.fold(callbackResponse)(limit => response.copy(body = limitBytes(callbackResponse.body, limit)))
.fold(response)(limit => response.copy(body = limitBytes(response.body, limit)))

val signalBodyComplete = responseBodyCompleteVar.tryPut(()).map(_ => ())
val body =
Expand All @@ -98,7 +96,10 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
responseMetadata,
Left(
onFinalizeSignal(
decompressResponseBodyIfNotHead(r.method, limitedResponse, r.autoDecompressionEnabled),
addOnBodyReceivedCallback(
decompressResponseBodyIfNotHead(r.method, limitedResponse, r.autoDecompressionEnabled),
() => r.options.onBodyReceived(responseMetadata)
),
signalBodyComplete
)
)
Expand Down Expand Up @@ -195,6 +196,9 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
private def onFinalizeSignal(hr: http4s.Response[F], signal: F[Unit]): http4s.Response[F] =
hr.copy(body = hr.body.onFinalize(signal))

private def addOnBodyReceivedCallback[T](hr: http4s.Response[F], callback: () => Unit): http4s.Response[F] =
hr.copy(body = hr.body.onFinalize(ConcurrentEffect[F].delay(callback())))

private def decompressResponseBodyIfNotHead[T](
m: Method,
hr: http4s.Response[F],
Expand Down

0 comments on commit 8165c2e

Please sign in to comment.