Skip to content

Commit

Permalink
naming refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jan 20, 2025
1 parent d741313 commit caf721b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@ package sttp.client4.httpclient

import sttp.attributes.AttributeKey

import java.nio.ByteBuffer

/** Defines a callback to be invoked when subsequent parts of the request body to be sent are created, just before they
* are sent over the network.
/** Defines a callback to be invoked when subsequent parts of the request body are about to be sent over the network.
*
* When a request is sent, `onInit` is invoked exactly once with the content length (if it is known). This is followed
* by arbitrary number of `onNext` calls. Finally, either `onComplete` or `onError` are called exactly once.
* by arbitrary number of `onNext` calls, with the number of bytes that will be sent. Finally, either `onComplete` or
* `onError` are called exactly once.
*
* All of the methods should be non-blocking and complete as fast as possible, so as not to obstruct sending data over
* the network.
*
* To register a callback, set the [[RequestBodyCallback.Attribute]] on a request, using the
* To register a callback, set the [[RequestBodyProgressCallback.Attribute]] on a request, using the
* [[sttp.client4.Request.attribute]] method.
*/
trait RequestBodyCallback {
trait RequestBodyProgressCallback {
def onInit(contentLength: Option[Long]): Unit

def onNext(b: ByteBuffer): Unit
def onNext(bytesCount: Long): Unit

def onComplete(): Unit
def onError(t: Throwable): Unit
}

object RequestBodyCallback {
object RequestBodyProgressCallback {

/** The key of the attribute that should be set on a request, to receive callbacks when the request body is sent. */
val Attribute = AttributeKey[RequestBodyCallback]
/** The key of the attribute that should be set on a request, to receive callbacks on the progress of sending the
* request body.
*/
val Attribute = AttributeKey[RequestBodyProgressCallback]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.concurrent.Flow
import java.util.function.Supplier
import scala.collection.JavaConverters._
import java.util.concurrent.Flow.Subscription
import sttp.client4.httpclient.RequestBodyCallback
import sttp.client4.httpclient.RequestBodyProgressCallback

private[client4] trait BodyToHttpClient[F[_], S, R] {
val streams: Streams[S]
Expand Down Expand Up @@ -51,7 +51,7 @@ private[client4] trait BodyToHttpClient[F[_], S, R] {
case Some(cl) => body.map(b => withKnownContentLength(b, cl))
}

request.attribute(RequestBodyCallback.Attribute) match {
request.attribute(RequestBodyProgressCallback.Attribute) match {
case None => bodyWithContentLength
case Some(callback) => bodyWithContentLength.map(withCallback(_, callback))
}
Expand Down Expand Up @@ -99,7 +99,7 @@ private[client4] trait BodyToHttpClient[F[_], S, R] {

private def withCallback(
delegate: HttpRequest.BodyPublisher,
callback: RequestBodyCallback
callback: RequestBodyProgressCallback
): HttpRequest.BodyPublisher =
new HttpRequest.BodyPublisher {
override def contentLength(): Long = delegate.contentLength()
Expand All @@ -114,7 +114,7 @@ private[client4] trait BodyToHttpClient[F[_], S, R] {
}

override def onNext(item: ByteBuffer): Unit = {
runCallbackSafe(callback.onNext(item))
runCallbackSafe(callback.onNext(item.remaining()))
subscriber.onNext(item)
}

Expand Down
11 changes: 5 additions & 6 deletions core/src/test/scalajvm/sttp/client4/HttpClientSyncHttpTest.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package sttp.client4

import sttp.client4.httpclient.HttpClientSyncBackend
import sttp.client4.httpclient.RequestBodyCallback
import sttp.client4.httpclient.RequestBodyProgressCallback
import sttp.client4.testing.ConvertToFuture
import sttp.client4.testing.HttpTest
import sttp.model.StatusCode
import sttp.shared.Identity

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

Expand All @@ -24,14 +23,14 @@ class HttpClientSyncHttpTest extends HttpTest[Identity] {
"callback" - {
"should be invoked as described in the callback protocol" in {
val trail = new ConcurrentLinkedQueue[String]()
val callback = new RequestBodyCallback {
val callback = new RequestBodyProgressCallback {

override def onInit(contentLength: Option[Long]): Unit = {
val _ = trail.add(s"init ${contentLength.getOrElse(-1)}")
}

override def onNext(b: ByteBuffer): Unit = {
val _ = trail.add(s"next ${b.remaining()}")
override def onNext(bytesCount: Long): Unit = {
val _ = trail.add(s"next $bytesCount")
}

override def onComplete(): Unit = {
Expand All @@ -44,7 +43,7 @@ class HttpClientSyncHttpTest extends HttpTest[Identity] {
}

val contentLength = 2048 * 100
val req = postEcho.body("x" * contentLength).attribute(RequestBodyCallback.Attribute, callback)
val req = postEcho.body("x" * contentLength).attribute(RequestBodyProgressCallback.Attribute, callback)

(req.send(backend): Identity[Response[Either[String, String]]]).toFuture().map { response =>
val t = trail.asScala
Expand Down
29 changes: 15 additions & 14 deletions docs/other/body_callbacks.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,44 @@
# Body callbacks
# Request body progress callback

When using the `HttpClient`-based backends (which includes the `DefaultSyncBackend` and `DefaultFutureBackend` on the
JVM), it is possible to register body-related callbacks.
JVM), it is possible to register a callback that keeps track of the progress of sending the request body.

This feature is not available in other backends, and setting the attribute described below will have no effect.

## Request body callbacks
The callback is defined through an instance of the `RequestBodyProgressCallback` trait.

Defines a callback to be invoked when subsequent parts of the request body to be sent are created, just before they
are sent over the network. The callback is defined through an instance of the `RequestBodyCallback` trait.

When a request is sent, the `RequestBodyCallback.onInit` method is invoked exactly once with the content length (if it
When a request is sent, the `RequestBodyProgressCallback.onInit` method is invoked exactly once with the content length (if it
is known). This is followed by arbitrary number of `onNext` calls. Finally, either `onComplete` or `onError` are called
exactly once.

All of the methods in the `RequestBodyCallback` implementation should be non-blocking and complete as fast as possible,
```{note}
`onNext` is called when a part of the request body is ready to be sent over the network, that is, before it is actually
being transferred.
```

All of the methods in the `RequestBodyProgressCallback` implementation should be non-blocking and complete as fast as possible,
so as not to obstruct sending data over the network.

To register a callback, set the `RequestBodyCallback.Attribute` on a request. For example:
To register a callback, set the `RequestBodyProgressCallback.Attribute` on a request. For example:

```scala mdoc:compile-only
import sttp.client4.*
import sttp.client4.httpclient.{HttpClientSyncBackend, RequestBodyCallback}
import java.nio.ByteBuffer
import sttp.client4.httpclient.{HttpClientSyncBackend, RequestBodyProgressCallback}
import java.io.File

val backend = HttpClientSyncBackend()

val fileToSend: File = ???
val callback = new RequestBodyCallback {
val callback = new RequestBodyProgressCallback {
override def onInit(contentLength: Option[Long]): Unit = println(s"expected content length: $contentLength")
override def onNext(b: ByteBuffer): Unit = println(s"next, bytes: ${b.remaining()}")
override def onNext(bytesCount: Long): Unit = println(s"next, bytes: $bytesCount")
override def onComplete(): Unit = println(s"complete")
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
}

val response = basicRequest
.get(uri"http://example.com")
.body(fileToSend)
.attribute(RequestBodyCallback.Attribute, callback)
.attribute(RequestBodyProgressCallback.Attribute, callback)
.send(backend)
```

0 comments on commit caf721b

Please sign in to comment.