Skip to content

Commit

Permalink
add SdkAsyncHttpClientH1TestSuite (#829)
Browse files Browse the repository at this point in the history
* SdkAsyncHttpClientH1TestSuite

* fix naughtyHeaderCharactersDoNotGetToServer test

exceptions in toPekkoRequest should be wrapped in a future failed

* add comment about connectionReceiveServerErrorStatusShouldNotReuseConnection

* scala 2.12 strikes again

* silence netty logs

SdkAsyncHttpClientH1TestSuite uses netty to simulate a server...

* fix org.junit.platform.commons.JUnitException when loading test in scala 3

Error was:
org.junit.platform.commons.JUnitException: ClassSelector [className = 'org.apache.pekko.stream.connectors.awsspi.RequestRunnerSpec$$anon$1', classLoader = java.net.URLClassLoader@79baaf07] resolution failed : ClassSelector [className = 'org.apache.pekko.stream.connectors.awsspi.RequestRunnerSpec$$anon$1', classLoader = java.net.URLClassLoader@79baaf07] resolution failed
  • Loading branch information
jtjeferreira authored Sep 27, 2024
1 parent 3cb1ed8 commit 56a2ee8
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{ CompletableFuture, TimeUnit }

import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext }
import pekko.http.scaladsl.model.HttpHeader.ParsingResult
import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import pekko.http.scaladsl.model.MediaType.Compressible
Expand All @@ -39,11 +39,18 @@ import software.amazon.awssdk.http.async._
import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest }
import software.amazon.awssdk.utils.AttributeMap

import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl._
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext }

class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)(
class PekkoHttpClient(
shutdownHandle: () => Unit,
private[awsspi] val connectionSettings: ConnectionPoolSettings,
private[awsspi] val connectionContext: HttpsConnectionContext
)(
implicit
actorSystem: ActorSystem,
ec: ExecutionContext,
Expand All @@ -53,9 +60,11 @@ class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connection
lazy val runner = new RequestRunner()

override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
runner.run(
() => Http().singleRequest(pekkoHttpRequest, settings = connectionSettings),
() => {
val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext)
},
request.responseHandler())
}

Expand Down Expand Up @@ -185,14 +194,20 @@ object PekkoHttpClient {
connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)),
resolvedOptions
)

val connectionContext =
if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
ConnectionContext.httpsClient(createInsecureSslEngine _)
else ConnectionContext.httpsClient(SSLContext.getDefault)

val shutdownhandleF = () => {
if (actorSystem.isEmpty) {
Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()),
Duration.apply(10, TimeUnit.SECONDS))
}
()
}
new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat)
new PekkoHttpClient(shutdownhandleF, cps, connectionContext)(as, ec, mat)
}
def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem))
def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder =
Expand Down Expand Up @@ -223,4 +238,34 @@ object PekkoHttpClient {
"application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded,
"application/x-www-form-urlencoded" -> formUrlEncoded,
"application/xml" -> applicationXml)

private def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
val engine = createTrustfulSslContext().createSSLEngine(host, port)
engine.setUseClientMode(true)

// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
// When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with:
// engine.setSSLParameters({
// val params = engine.getSSLParameters
// params.setEndpointIdentificationAlgorithm("https")
// params
// })

engine
}

private def createTrustfulSslContext(): SSLContext = {
object NoCheckX509TrustManager extends X509TrustManager {
override def checkClientTrusted(chain: Array[X509Certificate], authType: String) = ()

override def checkServerTrusted(chain: Array[X509Certificate], authType: String) = ()

override def getAcceptedIssuers = Array[X509Certificate]()
}

val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom())
context
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Keep, Sink }
import pekko.util.FutureConverters
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.SdkHttpFullResponse
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler

import scala.concurrent.{ ExecutionContext, Future }

class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) {

val logger = LoggerFactory.getLogger(this.getClass)

def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = {
val result = runRequest().flatMap { response =>
// Future.unit.flatMap(expr) is a scala 2.12 equivalent of Future.delegate(expr)
val result = Future.unit.flatMap(_ => runRequest()).flatMap { response =>
handler.onHeaders(toSdkHttpFullResponse(response))

val (complete, publisher) = response.entity.dataBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import org.testcontainers.containers.GenericContainer;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
Expand All @@ -44,7 +43,7 @@

import static org.junit.Assert.assertEquals;

public class S3Test extends JUnitSuite {
public class S3Test {

private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static SecureRandom rnd = new SecureRandom();
Expand Down
2 changes: 1 addition & 1 deletion aws-spi-pekko-http/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="io.netty" level="trace" additivity="false">
<logger name="io.netty" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.typesafe" level="error" additivity="false">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.awsspi

import software.amazon.awssdk.http.{ SdkAsyncHttpClientH1TestSuite, SdkHttpConfigurationOption }
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.utils.AttributeMap

class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite {

override def setupClient(): SdkAsyncHttpClient = {
PekkoHttpClient.builder().buildWithDefaults(
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.box(true)).build());
}

// Failed tests
// The logic to not reuse connections on server error status is not implemented in PekkoHttpClient, and
// it seems that it is being reverted in https://github.com/aws/aws-sdk-java-v2/pull/5607
override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = ()

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues {
handler.responseHeaders.headers().get("Content-Length").get(0) shouldBe "2"
}

class MyHeaderHandler() extends SdkAsyncHttpResponseHandler {
private class MyHeaderHandler() extends SdkAsyncHttpResponseHandler {
private val headers = new AtomicReference[SdkHttpResponse](null)
def responseHeaders = headers.get()
override def onHeaders(headers: SdkHttpResponse): Unit = this.headers.set(headers)
override def onStream(stream: Publisher[ByteBuffer]): Unit = stream.subscribe(new Subscriber[ByteBuffer] {
override def onSubscribe(s: Subscription): Unit = s.request(1000)
override def onNext(t: ByteBuffer): Unit = ()
override def onError(t: Throwable): Unit = ()
override def onComplete(): Unit = ()
})
override def onStream(stream: Publisher[ByteBuffer]): Unit = stream.subscribe(new MySubscriber)
override def onError(error: Throwable): Unit = ()
}

private class MySubscriber() extends Subscriber[ByteBuffer] {
override def onSubscribe(s: Subscription): Unit = s.request(1000)
override def onNext(t: ByteBuffer): Unit = ()
override def onError(t: Throwable): Unit = ()
override def onComplete(): Unit = ()
}
}
5 changes: 4 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sbt._
import Common.isScala3
import Keys._
import com.github.sbt.junit.jupiter.sbt.Import.JupiterKeys

object Dependencies {

Expand Down Expand Up @@ -142,9 +143,11 @@ object Dependencies {
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
("software.amazon.awssdk" % "s3" % AwsSdk2Version % "it,test").excludeAll(
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
("software.amazon.awssdk" % "http-client-tests" % AwsSdk2Version % "it,test").excludeAll(
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
"com.dimafeng" %% "testcontainers-scala" % TestContainersScalaTestVersion % Test,
"com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test",
"org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion % "it,test",
"ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test"))

val AwsLambda = Seq(
Expand Down
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.0-M1")
// templating
addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.7.0")
// Run JUnit 5 tests with sbt
addSbtPlugin("com.github.sbt.junit" % "sbt-jupiter-interface" % "0.13.0")

0 comments on commit 56a2ee8

Please sign in to comment.