From 2f814a8bfb6eae197598a3ca22440deb4ea8c3ef Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Tue, 9 Jan 2024 19:39:46 -0800 Subject: [PATCH] http (feature): Add HttpServer interface to NettyServer (#3330) --- .../benchmark/rpc_netty/RPCNetty.scala | 6 +-- .../airframe/http/netty/NettyServer.scala | 18 +++---- .../http/netty/NettyRPCServerTest.scala | 4 +- .../airframe/http/netty/NettyServerTest.scala | 3 +- .../http/okhttp/OkHttpClientTest.scala | 12 ++--- .../airframe/http/recorder/HttpRecorder.scala | 2 +- .../wvlet/airframe/http/HttpServer.scala | 49 +++++++++++++++++++ .../airframe/test/api/HelloRPCTest.scala | 10 ++-- docs/airframe-rpc.md | 7 +-- .../examples/http/Http_01_Interface.scala | 6 +-- .../examples/http/Http_02_ObjectMapping.scala | 6 +-- .../examples/http/Http_03_Client.scala | 6 +-- 12 files changed, 88 insertions(+), 41 deletions(-) create mode 100644 airframe-http/src/main/scala/wvlet/airframe/http/HttpServer.scala diff --git a/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_netty/RPCNetty.scala b/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_netty/RPCNetty.scala index a7ca0e835e..81253dc224 100644 --- a/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_netty/RPCNetty.scala +++ b/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_netty/RPCNetty.scala @@ -20,7 +20,7 @@ import wvlet.airframe.benchmark.http.HttpBenchmark.asyncIteration import wvlet.airframe.benchmark.http.{Greeter, NewServiceAsyncClient, NewServiceSyncClient} import wvlet.airframe.http.* import wvlet.airframe.http.client.{AsyncClient, SyncClient} -import wvlet.airframe.http.netty.{Netty, NettyServer} +import wvlet.airframe.http.netty.Netty import wvlet.log.LogSupport import java.util.concurrent.atomic.AtomicInteger @@ -37,10 +37,10 @@ class AirframeRPCNetty extends LogSupport { .withRouter(Greeter.router) .noLogging .design - .bind[SyncClient].toProvider { (server: NettyServer) => + .bind[SyncClient].toProvider { (server: HttpServer) => Http.client.noLogging.noClientFilter.newSyncClient(server.localAddress) } - .bind[AsyncClient].toProvider { (server: NettyServer) => + .bind[AsyncClient].toProvider { (server: HttpServer) => Http.client.noLogging.noClientFilter.newAsyncClient(server.localAddress) } .withProductionMode diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala index cde7807ade..b2e690e4f4 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala @@ -87,13 +87,13 @@ case class NettyServerConfig( } def design: Design = { Design.newDesign - .bind[NettyServerConfig].toInstance(this) - .bind[NettyServer].toSingleton + .bind[NettyServer].toProvider { (s: Session) => newServer(s) } + .bind[HttpServer].to[NettyServer] } def designWithSyncClient: Design = { design - .bind[SyncClient].toProvider { (server: NettyServer) => + .bind[SyncClient].toProvider { (server: HttpServer) => Http.client.newSyncClient(server.localAddress) } } @@ -109,7 +109,7 @@ case class NettyServerConfig( } } -class NettyServer(config: NettyServerConfig, session: Session) extends AutoCloseable with LogSupport { +class NettyServer(config: NettyServerConfig, session: Session) extends HttpServer with LogSupport { private val httpLogger: HttpLogger = config.newHttpLogger private val loggingFilter: RxHttpFilter = config.loggingFilter(httpLogger) @@ -134,7 +134,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose private var channelFuture: Option[Channel] = None - val localAddress: String = s"localhost:${config.port}" + override def localAddress: String = s"localhost:${config.port}" private def attachContextFilter = new RxHttpFilter { override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[Response] = { @@ -248,7 +248,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose channelFuture = Some(b.bind(config.port).sync().channel()) } - def stop(): Unit = { + override def stop(): Unit = { if (stopped.compareAndSet(false, true)) { info(s"Stopping ${config.name} server at ${localAddress}") workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS) @@ -258,15 +258,11 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose } } - override def close(): Unit = { - stop() - } - /** * Await and block until the server terminates. If the server is already terminated (via close()), this method * returns immediately. */ - def awaitTermination(): Unit = { + override def awaitTermination(): Unit = { channelFuture.foreach(_.closeFuture().sync()) } } diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRPCServerTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRPCServerTest.scala index 7d2f08a184..ce1c19289d 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRPCServerTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRPCServerTest.scala @@ -55,12 +55,12 @@ object NettyRPCServerTest extends AirSpec { } } - test("await server test") { (server: NettyServer) => + test("await server test") { (server: HttpServer) => Rx.delay(100, TimeUnit.MILLISECONDS).map(_ => server.close()) .join(Rx.single(() => server.awaitTermination())) } - test("close and await") { (server: NettyServer) => + test("close and await") { (server: HttpServer) => server.close() server.awaitTermination() } diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala index 8df487c7d1..39a5999dfb 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala @@ -13,6 +13,7 @@ */ package wvlet.airframe.http.netty +import wvlet.airframe.http.HttpServer import wvlet.airspec.AirSpec class NettyServerTest extends AirSpec { @@ -34,7 +35,7 @@ class NettyServerTest extends AirSpec { } } - test("safely close multiple times") { (server: NettyServer) => + test("safely close multiple times") { (server: HttpServer) => server.close() server.close() } diff --git a/airframe-http-okhttp/src/test/scala/wvlet/airframe/http/okhttp/OkHttpClientTest.scala b/airframe-http-okhttp/src/test/scala/wvlet/airframe/http/okhttp/OkHttpClientTest.scala index 7b25a8a55a..078262512e 100644 --- a/airframe-http-okhttp/src/test/scala/wvlet/airframe/http/okhttp/OkHttpClientTest.scala +++ b/airframe-http-okhttp/src/test/scala/wvlet/airframe/http/okhttp/OkHttpClientTest.scala @@ -2,10 +2,10 @@ package wvlet.airframe.http.okhttp import wvlet.airframe.Design import wvlet.airframe.control.Control.withResource +import wvlet.airframe.http.* import wvlet.airframe.http.HttpMessage.{Request, Response} import wvlet.airframe.http.client.{AsyncClient, SyncClient} -import wvlet.airframe.http.netty.{Netty, NettyServer} -import wvlet.airframe.http.* +import wvlet.airframe.http.netty.Netty import wvlet.airspec.AirSpec import wvlet.log.LogSupport @@ -102,10 +102,10 @@ class OkHttpClientTest extends AirSpec { _.add( Netty.server .withRouter(r).design - .bind[SyncClient].toProvider { (server: NettyServer) => + .bind[SyncClient].toProvider { (server: HttpServer) => OkHttp.client.newSyncClient(server.localAddress) } - .bind[AsyncClient].toProvider { (server: NettyServer) => + .bind[AsyncClient].toProvider { (server: HttpServer) => OkHttp.client.newAsyncClient(server.localAddress) } ) @@ -227,7 +227,7 @@ class OkHttpClientTest extends AirSpec { test( "fail request", - design = _.bind[SyncClient].toProvider { (server: NettyServer) => + design = _.bind[SyncClient].toProvider { (server: HttpServer) => OkHttp.client .withRetryContext(_.withMaxRetry(3)) .noCircuitBreaker @@ -262,7 +262,7 @@ class OkHttpClientTest extends AirSpec { test( "read timeout", - design = _.bind[SyncClient].toProvider { (server: NettyServer) => + design = _.bind[SyncClient].toProvider { (server: HttpServer) => OkHttp.client .withReadTimeout(Duration(10, TimeUnit.MILLISECONDS)) .withRetryContext(_.withMaxRetry(1)) diff --git a/airframe-http-recorder/src/main/scala/wvlet/airframe/http/recorder/HttpRecorder.scala b/airframe-http-recorder/src/main/scala/wvlet/airframe/http/recorder/HttpRecorder.scala index 1817ec7d95..87b8adfab1 100644 --- a/airframe-http-recorder/src/main/scala/wvlet/airframe/http/recorder/HttpRecorder.scala +++ b/airframe-http-recorder/src/main/scala/wvlet/airframe/http/recorder/HttpRecorder.scala @@ -16,7 +16,7 @@ package wvlet.airframe.http.recorder import wvlet.airframe.http.HttpMessage.EmptyMessage import wvlet.airframe.http.{Http, HttpHeader, HttpMessage, HttpStatus, RxHttpEndpoint, ServerAddress} import wvlet.airframe.http.client.SyncClient -import wvlet.airframe.http.netty.{NettyBackend, NettyServer} +import wvlet.airframe.http.netty.NettyBackend import wvlet.airframe.rx.Rx import wvlet.log.LogSupport diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpServer.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpServer.scala new file mode 100644 index 0000000000..f3a3cd6585 --- /dev/null +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpServer.scala @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.http + +/** + * A common trait for Http server implementations + */ +trait HttpServer extends AutoCloseable { + + /** + * Await and block until the server is terminated. If the server is already terminated, it will return immediately. + */ + def awaitTermination(): Unit + + /** + * Stop the server + */ + def stop(): Unit + + /** + * Stop the server. When using Airframe DI, this method will be called automatically. + */ + override def close(): Unit = stop() + + /** + * Return the local server address in (host):(port) format. e.g., localhost:8080. This method can be used for + * creating a new Http client for the server. For example: + * {{{ + * Netty.server.withRouter(router).start { server => + * Using.resource(Http.client.newSyncClient(server.localAddress)) { client => + * val resp = client.send(Http.GET("/hello")) + * } + * } + * }}} + * @return + */ + def localAddress: String +} diff --git a/airframe-integration-test/src/test/scala/wvlet/airframe/test/api/HelloRPCTest.scala b/airframe-integration-test/src/test/scala/wvlet/airframe/test/api/HelloRPCTest.scala index eac6f59211..2f0d2cb2c3 100644 --- a/airframe-integration-test/src/test/scala/wvlet/airframe/test/api/HelloRPCTest.scala +++ b/airframe-integration-test/src/test/scala/wvlet/airframe/test/api/HelloRPCTest.scala @@ -14,8 +14,8 @@ package wvlet.airframe.test.api import wvlet.airframe.Design -import wvlet.airframe.http.{Http, RPCEncoding, RxRouter} -import wvlet.airframe.http.netty.{Netty, NettyServer} +import wvlet.airframe.http.netty.Netty +import wvlet.airframe.http.{Http, HttpServer, RPCEncoding, RxRouter} import wvlet.airframe.test.api.HelloRPC.VariousParams import wvlet.airspec.AirSpec @@ -26,15 +26,15 @@ class HelloRPCTest extends AirSpec { .withName("hello-rpc-test") .withRouter(RxRouter.of[HelloRPCImpl]) .design - .bind[ServiceRPC.RPCSyncClient].toProvider { (server: NettyServer) => + .bind[ServiceRPC.RPCSyncClient].toProvider { (server: HttpServer) => ServiceRPC.newRPCSyncClient(Http.client.newSyncClient(server.localAddress)) } - .bind[ServiceRPC.RPCAsyncClient].toProvider { (server: NettyServer) => + .bind[ServiceRPC.RPCAsyncClient].toProvider { (server: HttpServer) => ServiceRPC.newRPCAsyncClient(Http.client.newAsyncClient(server.localAddress)) } } - test("rpc") { (server: NettyServer) => + test("rpc") { (server: HttpServer) => test("sync client") { (client: ServiceRPC.RPCSyncClient) => test("String response") { client.HelloRPC.hello("RPC") shouldBe "Hello RPC!" diff --git a/docs/airframe-rpc.md b/docs/airframe-rpc.md index f01c23339f..e91b243f7c 100644 --- a/docs/airframe-rpc.md +++ b/docs/airframe-rpc.md @@ -397,7 +397,8 @@ tests. Here is an example of using Airframe DI for starting an RPC server: ```scala -import wvlet.airframe._ +import wvlet.airframe.* +import wvlet.airframe.http.* // Inject your component as constructor arguments class MyAPIImpl(myService: MyService) extends MyAPI { @@ -412,8 +413,8 @@ val design = newDesign .add(Netty.server.withRouter(router).design) // Launch a Netty Server -design.build[NettyServer] { server => - server.awaitForTerimination() +design.build[HttpServer] { server => + server.awaitTerimination() } ``` diff --git a/examples/src/main/scala/wvlet/airframe/examples/http/Http_01_Interface.scala b/examples/src/main/scala/wvlet/airframe/examples/http/Http_01_Interface.scala index d67feffc64..cd18c3010c 100644 --- a/examples/src/main/scala/wvlet/airframe/examples/http/Http_01_Interface.scala +++ b/examples/src/main/scala/wvlet/airframe/examples/http/Http_01_Interface.scala @@ -13,7 +13,7 @@ */ package wvlet.airframe.examples.http -import wvlet.airframe.http.{Endpoint, HttpMethod, RxRouter} +import wvlet.airframe.http.{Endpoint, HttpMethod, HttpServer, RxRouter} import wvlet.airframe.http.netty.{Netty, NettyServer} import wvlet.log.LogSupport @@ -22,7 +22,7 @@ import wvlet.log.LogSupport object Http_01_Interface extends App { case class User(id: String, name: String) - trait MyApp extends LogSupport { + class MyApp extends LogSupport { @Endpoint(method = HttpMethod.GET, path = "/user/:id") def getUser(id: String): User = { info(s"lookup user: ${id}") @@ -33,7 +33,7 @@ object Http_01_Interface extends App { val router = RxRouter.of[MyApp] val design = Netty.server.withName("myapp").withPort(18080).withRouter(router).design - design.build[NettyServer] { server => + design.build[HttpServer] { server => val serverAddress = server.localAddress // Wait server termination diff --git a/examples/src/main/scala/wvlet/airframe/examples/http/Http_02_ObjectMapping.scala b/examples/src/main/scala/wvlet/airframe/examples/http/Http_02_ObjectMapping.scala index be7fa06519..50101e594c 100644 --- a/examples/src/main/scala/wvlet/airframe/examples/http/Http_02_ObjectMapping.scala +++ b/examples/src/main/scala/wvlet/airframe/examples/http/Http_02_ObjectMapping.scala @@ -14,8 +14,8 @@ package wvlet.airframe.examples.http import wvlet.airframe.control.Control.withResource -import wvlet.airframe.http.{Endpoint, Http, HttpMessage, HttpMethod, HttpStatus, RxRouter} -import wvlet.airframe.http.netty.{Netty, NettyServer} +import wvlet.airframe.http.* +import wvlet.airframe.http.netty.{Netty} import wvlet.log.LogSupport /** @@ -64,7 +64,7 @@ object Http_02_ObjectMapping extends App with LogSupport { .withRouter(router) .design - design.build[NettyServer] { server => + design.build[HttpServer] { server => withResource(Http.client.newSyncClient(server.localAddress)) { client => val appInfo = client.readAs[AppInfo](Http.GET("/v1/info")) info(appInfo) // AppInfo(myapp,1.0) diff --git a/examples/src/main/scala/wvlet/airframe/examples/http/Http_03_Client.scala b/examples/src/main/scala/wvlet/airframe/examples/http/Http_03_Client.scala index f95016e041..2fffdf7251 100644 --- a/examples/src/main/scala/wvlet/airframe/examples/http/Http_03_Client.scala +++ b/examples/src/main/scala/wvlet/airframe/examples/http/Http_03_Client.scala @@ -16,7 +16,7 @@ package wvlet.airframe.examples.http import wvlet.airframe.Design import java.util.concurrent.TimeUnit -import wvlet.airframe.http.{Endpoint, Http, HttpMethod, RxRouter} +import wvlet.airframe.http.{Endpoint, Http, HttpMethod, HttpServer, RxRouter} import wvlet.airframe.http.client.SyncClient import wvlet.airframe.http.netty.{Netty, NettyServer} import wvlet.log.LogSupport @@ -27,7 +27,7 @@ import scala.concurrent.duration.Duration */ object Http_03_Client extends App with LogSupport { case class User(id: String, name: String) - trait MyApp extends LogSupport { + class MyApp extends LogSupport { @Endpoint(method = HttpMethod.GET, path = "/user/:id") def getUser(id: String): User = { User(id, "xxx") @@ -43,7 +43,7 @@ object Http_03_Client extends App with LogSupport { val clientDesign = Design.newDesign - .bind[SyncClient].toProvider { (server: NettyServer) => + .bind[SyncClient].toProvider { (server: HttpServer) => Http.client // Configure the request retry method .withRetryContext(