Skip to content

Commit

Permalink
http4s backends
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jan 21, 2025
1 parent 4a81935 commit 2dda137
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,19 @@ class Http4sBackend[F[_]: Async](
val statusText = response.status.reason
val responseMetadata = ResponseMetadata(code, statusText, headers)

val limitedResponse =
r.options.maxResponseBodyLength.fold(response)(limit =>
response.copy(body = Fs2Streams.limitBytes(response.body, limit))
)

val signalBodyComplete = responseBodyCompleteVar.complete(()).map(_ => ())
val body =
bodyFromResponseAs(signalBodyComplete)(
r.response,
responseMetadata,
Left(
onFinalizeSignal(
decompressResponseBodyIfNotHead(r.method, response, r.autoDecompressionEnabled),
decompressResponseBodyIfNotHead(r.method, limitedResponse, r.autoDecompressionEnabled),
signalBodyComplete
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.concurrent.MVar
import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync}
import cats.implicits._
import cats.effect.implicits._
import fs2.{Chunk, Stream}
import fs2.{Chunk, Stream, Pull}
import org.http4s.{EntityBody, Request => Http4sRequest, Status}
import org.http4s
import org.http4s.blaze.client.BlazeClientBuilder
Expand All @@ -27,6 +27,7 @@ import sttp.client4.compression.CompressionHandlers
import sttp.client4.impl.fs2.GZipFs2Decompressor
import sttp.client4.impl.fs2.DeflateFs2Decompressor
import sttp.client4.compression.Decompressor
import sttp.capabilities.StreamMaxLengthExceededException

import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -84,14 +85,18 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
val statusText = response.status.reason
val responseMetadata = ResponseMetadata(code, statusText, headers)

val limitedResponse =
r.options.maxResponseBodyLength
.fold(response)(limit => response.copy(body = limitBytes(response.body, limit)))

val signalBodyComplete = responseBodyCompleteVar.tryPut(()).map(_ => ())
val body =
bodyFromResponseAs(signalBodyComplete)(
r.response,
responseMetadata,
Left(
onFinalizeSignal(
decompressResponseBodyIfNotHead(r.method, response, r.autoDecompressionEnabled),
decompressResponseBodyIfNotHead(r.method, limitedResponse, r.autoDecompressionEnabled),
signalBodyComplete
)
)
Expand Down Expand Up @@ -260,6 +265,24 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
case e: Exception => SttpClientException.defaultExceptionToSttpClientException(request, e)
}

// based on Fs2Streams.limitBytes (for ce3)
private def limitBytes[F[_]](stream: Stream[F, Byte], maxBytes: Long): Stream[F, Byte] = {
def go(s: Stream[F, Byte], remaining: Long): Pull[F, Byte, Unit] = {
if (remaining < 0) throw new StreamMaxLengthExceededException(maxBytes)
else
s.pull.uncons.flatMap {
case Some((chunk, tail)) =>
val chunkSize = chunk.size.toLong
if (chunkSize <= remaining)
Pull.output(chunk) >> go(tail, remaining - chunkSize)
else
throw new StreamMaxLengthExceededException(maxBytes)
case None => Pull.done
}
}
go(stream, maxBytes).stream
}

override implicit val monad: MonadError[F] = new CatsMonadAsyncError

// no-op. Client lifecycle is managed by Resource
Expand Down

0 comments on commit 2dda137

Please sign in to comment.