diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index df57812e2..b3ad36746 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -21,7 +21,7 @@ import java.util.concurrent.{ CompletableFuture, TimeUnit } import org.apache.pekko import pekko.actor.{ ActorSystem, ClassicActorSystemProvider } -import pekko.http.scaladsl.Http +import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext } import pekko.http.scaladsl.model.HttpHeader.ParsingResult import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok import pekko.http.scaladsl.model.MediaType.Compressible @@ -39,11 +39,18 @@ import software.amazon.awssdk.http.async._ import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } import software.amazon.awssdk.utils.AttributeMap +import java.security.SecureRandom +import java.security.cert.X509Certificate +import javax.net.ssl._ import scala.collection.immutable import scala.concurrent.duration.Duration import scala.concurrent.{ Await, ExecutionContext } -class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)( +class PekkoHttpClient( + shutdownHandle: () => Unit, + private[awsspi] val connectionSettings: ConnectionPoolSettings, + private[awsspi] val connectionContext: HttpsConnectionContext +)( implicit actorSystem: ActorSystem, ec: ExecutionContext, @@ -53,9 +60,11 @@ class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connection lazy val runner = new RequestRunner() override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = { - val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) runner.run( - () => Http().singleRequest(pekkoHttpRequest, settings = connectionSettings), + () => { + val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) + Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext) + }, request.responseHandler()) } @@ -185,6 +194,12 @@ object PekkoHttpClient { connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions ) + + val connectionContext = + if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue()) + ConnectionContext.httpsClient(createInsecureSslEngine _) + else ConnectionContext.httpsClient(SSLContext.getDefault) + val shutdownhandleF = () => { if (actorSystem.isEmpty) { Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), @@ -192,7 +207,7 @@ object PekkoHttpClient { } () } - new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat) + new PekkoHttpClient(shutdownhandleF, cps, connectionContext)(as, ec, mat) } def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem)) def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder = @@ -223,4 +238,34 @@ object PekkoHttpClient { "application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded, "application/x-www-form-urlencoded" -> formUrlEncoded, "application/xml" -> applicationXml) + + private def createInsecureSslEngine(host: String, port: Int): SSLEngine = { + val engine = createTrustfulSslContext().createSSLEngine(host, port) + engine.setUseClientMode(true) + + // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures + // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to. + // When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with: + // engine.setSSLParameters({ + // val params = engine.getSSLParameters + // params.setEndpointIdentificationAlgorithm("https") + // params + // }) + + engine + } + + private def createTrustfulSslContext(): SSLContext = { + object NoCheckX509TrustManager extends X509TrustManager { + override def checkClientTrusted(chain: Array[X509Certificate], authType: String) = () + + override def checkServerTrusted(chain: Array[X509Certificate], authType: String) = () + + override def getAcceptedIssuers = Array[X509Certificate]() + } + + val context = SSLContext.getInstance("TLS") + context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom()) + context + } } diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala index fa1335bce..ac01b0edb 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala @@ -26,7 +26,6 @@ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` } import pekko.stream.Materializer import pekko.stream.scaladsl.{ Keep, Sink } import pekko.util.FutureConverters -import org.slf4j.LoggerFactory import software.amazon.awssdk.http.SdkHttpFullResponse import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler @@ -34,10 +33,9 @@ import scala.concurrent.{ ExecutionContext, Future } class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) { - val logger = LoggerFactory.getLogger(this.getClass) - def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = { - val result = runRequest().flatMap { response => + // Future.unit.flatMap(expr) is a scala 2.12 equivalent of Future.delegate(expr) + val result = Future.unit.flatMap(_ => runRequest()).flatMap { response => handler.onHeaders(toSdkHttpFullResponse(response)) val (complete, publisher) = response.entity.dataBytes diff --git a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java similarity index 98% rename from aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java rename to aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java index 528b285dd..3261e21ea 100644 --- a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java +++ b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java @@ -21,7 +21,6 @@ import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService; import org.junit.Rule; import org.junit.Test; -import org.scalatestplus.junit.JUnitSuite; import org.testcontainers.containers.GenericContainer; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.core.ResponseBytes; @@ -44,7 +43,7 @@ import static org.junit.Assert.assertEquals; -public class S3Test extends JUnitSuite { +public class S3Test { private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; private static SecureRandom rnd = new SecureRandom(); diff --git a/aws-spi-pekko-http/src/test/resources/logback-test.xml b/aws-spi-pekko-http/src/test/resources/logback-test.xml index 5d1d29de5..bf6018724 100644 --- a/aws-spi-pekko-http/src/test/resources/logback-test.xml +++ b/aws-spi-pekko-http/src/test/resources/logback-test.xml @@ -15,7 +15,7 @@ - + diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala new file mode 100644 index 000000000..06148812b --- /dev/null +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.connectors.awsspi + +import software.amazon.awssdk.http.{ SdkAsyncHttpClientH1TestSuite, SdkHttpConfigurationOption } +import software.amazon.awssdk.http.async.SdkAsyncHttpClient +import software.amazon.awssdk.utils.AttributeMap + +class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite { + + override def setupClient(): SdkAsyncHttpClient = { + PekkoHttpClient.builder().buildWithDefaults( + AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.box(true)).build()); + } + + // Failed tests + // The logic to not reuse connections on server error status is not implemented in PekkoHttpClient, and + // it seems that it is being reverted in https://github.com/aws/aws-sdk-java-v2/pull/5607 + override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = () + +} diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala index b899f3efd..4bfb42a8e 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala @@ -48,16 +48,18 @@ class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues { handler.responseHeaders.headers().get("Content-Length").get(0) shouldBe "2" } - class MyHeaderHandler() extends SdkAsyncHttpResponseHandler { + private class MyHeaderHandler() extends SdkAsyncHttpResponseHandler { private val headers = new AtomicReference[SdkHttpResponse](null) def responseHeaders = headers.get() override def onHeaders(headers: SdkHttpResponse): Unit = this.headers.set(headers) - override def onStream(stream: Publisher[ByteBuffer]): Unit = stream.subscribe(new Subscriber[ByteBuffer] { - override def onSubscribe(s: Subscription): Unit = s.request(1000) - override def onNext(t: ByteBuffer): Unit = () - override def onError(t: Throwable): Unit = () - override def onComplete(): Unit = () - }) + override def onStream(stream: Publisher[ByteBuffer]): Unit = stream.subscribe(new MySubscriber) override def onError(error: Throwable): Unit = () } + + private class MySubscriber() extends Subscriber[ByteBuffer] { + override def onSubscribe(s: Subscription): Unit = s.request(1000) + override def onNext(t: ByteBuffer): Unit = () + override def onError(t: Throwable): Unit = () + override def onComplete(): Unit = () + } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 137bb6b95..2d7ae2901 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,6 +10,7 @@ import sbt._ import Common.isScala3 import Keys._ +import com.github.sbt.junit.jupiter.sbt.Import.JupiterKeys object Dependencies { @@ -142,9 +143,11 @@ object Dependencies { ExclusionRule("software.amazon.awssdk", "netty-nio-client")), ("software.amazon.awssdk" % "s3" % AwsSdk2Version % "it,test").excludeAll( ExclusionRule("software.amazon.awssdk", "netty-nio-client")), + ("software.amazon.awssdk" % "http-client-tests" % AwsSdk2Version % "it,test").excludeAll( + ExclusionRule("software.amazon.awssdk", "netty-nio-client")), "com.dimafeng" %% "testcontainers-scala" % TestContainersScalaTestVersion % Test, + "com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test, "org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test", - "org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion % "it,test", "ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test")) val AwsLambda = Seq( diff --git a/project/plugins.sbt b/project/plugins.sbt index 37abc6f69..609ec7be5 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -31,3 +31,5 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2") addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.0-M1") // templating addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.7.0") +// Run JUnit 5 tests with sbt +addSbtPlugin("com.github.sbt.junit" % "sbt-jupiter-interface" % "0.13.0")