Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Source.queue #537

Merged
merged 5 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions docs/src/test/scala/docs/http/scaladsl/Http2ClientApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import pekko.http.scaladsl.model.headers.HttpEncodings
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.model.HttpResponse
import pekko.http.scaladsl.model.headers
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.QueueOfferResult

import com.typesafe.config.ConfigFactory

import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -75,14 +75,11 @@ object Http2ClientApp extends App {
.flatMap(_.toStrict(1.second))
.onComplete(res => println(s"[4] Got favicon: $res"))

// OverflowStrategy.dropNew has been deprecated in latest Pekko versions
// FIXME: replace with 2.6 queue when 2.5 support is dropped, see #3069
@nowarn("msg=Use Source.queue") //
// #response-future-association
def singleRequest(
connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
val queue =
Source.queue(bufferSize, OverflowStrategy.dropNew)
Source.queue(bufferSize)
.via(connection)
.to(Sink.foreach { response =>
// complete the response promise with the response when it arrives
Expand All @@ -94,9 +91,14 @@ object Http2ClientApp extends App {
req => {
// create a promise of the response for each request and set it as an attribute on the request
val p = Promise[HttpResponse]()
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p)))
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))) match {
// return the future response
.flatMap(_ => p.future)
case QueueOfferResult.Enqueued => p.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(
new RuntimeException("Queue was closed (pool shut down)."))
}
}
}
// #response-future-association
Expand Down
79 changes: 35 additions & 44 deletions docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,28 @@

package docs.http.scaladsl

import scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.settings.ClientConnectionSettings
import pekko.http.scaladsl.settings.ConnectionPoolSettings
import scala.annotation.nowarn

import docs.CompileOnlySpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scala.concurrent.ExecutionContext

// OverflowStrategy.dropNew has been deprecated in latest Pekko versions
// FIXME: replace with 2.6 queue when 2.5 support is dropped, see #3069
@nowarn("msg=will not be a runnable program|Use Source.queue")
class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySpec {

"manual-entity-consume-example-1" in compileOnlySpec {
// #manual-entity-consume-example-1
import java.io.File

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.{ FileIO, Framing }
import pekko.util.ByteString

import java.io.File

implicit val system: ActorSystem = ActorSystem()

val response: HttpResponse = ???
Expand All @@ -54,15 +51,14 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-consume-example-2" in compileOnlySpec {
// #manual-entity-consume-example-2
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.util.ByteString

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

Expand Down Expand Up @@ -91,16 +87,15 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-consume-example-3" in compileOnlySpec {
// #manual-entity-consume-example-3
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.util.ByteString
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.util.ByteString

import scala.concurrent.{ ExecutionContext, Future }

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher
Expand Down Expand Up @@ -140,12 +135,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-discard-example-1" in compileOnlySpec {
// #manual-entity-discard-example-1
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.HttpMessage.DiscardedEntity
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.HttpMessage.DiscardedEntity

import scala.concurrent.ExecutionContext

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher
Expand All @@ -158,14 +153,13 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
// #manual-entity-discard-example-1
}
"manual-entity-discard-example-2" in compileOnlySpec {
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import pekko.Done
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.Sink

import scala.concurrent.{ ExecutionContext, Future }

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

Expand All @@ -179,27 +173,25 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"host-level-queue-example" in compileOnlySpec {
// #host-level-queue-example
import scala.util.{ Failure, Success }
import scala.concurrent.{ Future, Promise }

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl._
import pekko.stream.QueueOfferResult

import pekko.stream.{ OverflowStrategy, QueueOfferResult }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher // to get an implicit ExecutionContext into scope
implicit val system: ActorSystem = ActorSystem() // to get an implicit ExecutionContext into scope

val QueueSize = 10

// This idea came initially from this blog post (link broken):
// http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("pekko.apache.org")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize)
.via(poolClientFlow)
.to(Sink.foreach {
case ((Success(resp), p)) => p.success(resp)
Expand All @@ -209,12 +201,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
queue.offer(request -> responsePromise) match {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(
new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
new RuntimeException("Queue was closed (pool shut down) while running the request."))
}
}

Expand All @@ -224,20 +216,18 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"host-level-streamed-example" in compileOnlySpec {
// #host-level-streamed-example
import java.nio.file.{ Path, Paths }

import scala.util.{ Failure, Success }
import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.marshalling.Marshal
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.Multipart.FormData
import pekko.stream.scaladsl._

import pekko.http.scaladsl.model.Multipart.FormData
import pekko.http.scaladsl.marshalling.Marshal
import java.nio.file.{ Path, Paths }
import scala.concurrent.Future
import scala.util.{ Failure, Success }

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher // to get an implicit ExecutionContext into scope
Expand Down Expand Up @@ -289,9 +279,10 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
}

"single-request-example" in compileOnlySpec {
import scala.concurrent.Future
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._

import scala.concurrent.Future
// #create-simple-request
HttpRequest(uri = "https://pekko.apache.org")

Expand Down Expand Up @@ -320,8 +311,8 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
val response: HttpResponse = null
// #unmarshal-response-body
import org.apache.pekko
import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pekko.http.scaladsl.unmarshalling.Unmarshal
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

Expand All @@ -343,8 +334,8 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
class Myself extends Actor
with ActorLogging {

import pekko.pattern.pipe
import context.dispatcher
import pekko.pattern.pipe

implicit val system: ActorSystem = context.system
val http = Http(system)
Expand All @@ -370,12 +361,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"https-proxy-example-single-request" in compileOnlySpec {
// #https-proxy-example-single-request
import java.net.InetSocketAddress

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.{ ClientTransport, Http }

import java.net.InetSocketAddress

implicit val system = ActorSystem()

val proxyHost = "localhost"
Expand All @@ -391,12 +382,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
}

"https-proxy-example-single-request with auth" in compileOnlySpec {
import java.net.InetSocketAddress

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.{ ClientTransport, Http }

import java.net.InetSocketAddress

implicit val system = ActorSystem()

val proxyHost = "localhost"
Expand Down
Loading