Skip to content

Commit

Permalink
http (feature): Add HttpServer interface to NettyServer (#3330)
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial authored Jan 10, 2024
1 parent e285183 commit 2f814a8
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import wvlet.airframe.benchmark.http.HttpBenchmark.asyncIteration
import wvlet.airframe.benchmark.http.{Greeter, NewServiceAsyncClient, NewServiceSyncClient}
import wvlet.airframe.http.*
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.airframe.http.netty.Netty
import wvlet.log.LogSupport

import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -37,10 +37,10 @@ class AirframeRPCNetty extends LogSupport {
.withRouter(Greeter.router)
.noLogging
.design
.bind[SyncClient].toProvider { (server: NettyServer) =>
.bind[SyncClient].toProvider { (server: HttpServer) =>
Http.client.noLogging.noClientFilter.newSyncClient(server.localAddress)
}
.bind[AsyncClient].toProvider { (server: NettyServer) =>
.bind[AsyncClient].toProvider { (server: HttpServer) =>
Http.client.noLogging.noClientFilter.newAsyncClient(server.localAddress)
}
.withProductionMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ case class NettyServerConfig(
}
def design: Design = {
Design.newDesign
.bind[NettyServerConfig].toInstance(this)
.bind[NettyServer].toSingleton
.bind[NettyServer].toProvider { (s: Session) => newServer(s) }
.bind[HttpServer].to[NettyServer]
}

def designWithSyncClient: Design = {
design
.bind[SyncClient].toProvider { (server: NettyServer) =>
.bind[SyncClient].toProvider { (server: HttpServer) =>
Http.client.newSyncClient(server.localAddress)
}
}
Expand All @@ -109,7 +109,7 @@ case class NettyServerConfig(
}
}

class NettyServer(config: NettyServerConfig, session: Session) extends AutoCloseable with LogSupport {
class NettyServer(config: NettyServerConfig, session: Session) extends HttpServer with LogSupport {

private val httpLogger: HttpLogger = config.newHttpLogger
private val loggingFilter: RxHttpFilter = config.loggingFilter(httpLogger)
Expand All @@ -134,7 +134,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose

private var channelFuture: Option[Channel] = None

val localAddress: String = s"localhost:${config.port}"
override def localAddress: String = s"localhost:${config.port}"

private def attachContextFilter = new RxHttpFilter {
override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[Response] = {
Expand Down Expand Up @@ -248,7 +248,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose
channelFuture = Some(b.bind(config.port).sync().channel())
}

def stop(): Unit = {
override def stop(): Unit = {
if (stopped.compareAndSet(false, true)) {
info(s"Stopping ${config.name} server at ${localAddress}")
workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
Expand All @@ -258,15 +258,11 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose
}
}

override def close(): Unit = {
stop()
}

/**
* Await and block until the server terminates. If the server is already terminated (via close()), this method
* returns immediately.
*/
def awaitTermination(): Unit = {
override def awaitTermination(): Unit = {
channelFuture.foreach(_.closeFuture().sync())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ object NettyRPCServerTest extends AirSpec {
}
}

test("await server test") { (server: NettyServer) =>
test("await server test") { (server: HttpServer) =>
Rx.delay(100, TimeUnit.MILLISECONDS).map(_ => server.close())
.join(Rx.single(() => server.awaitTermination()))
}

test("close and await") { (server: NettyServer) =>
test("close and await") { (server: HttpServer) =>
server.close()
server.awaitTermination()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package wvlet.airframe.http.netty

import wvlet.airframe.http.HttpServer
import wvlet.airspec.AirSpec

class NettyServerTest extends AirSpec {
Expand All @@ -34,7 +35,7 @@ class NettyServerTest extends AirSpec {
}
}

test("safely close multiple times") { (server: NettyServer) =>
test("safely close multiple times") { (server: HttpServer) =>
server.close()
server.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package wvlet.airframe.http.okhttp

import wvlet.airframe.Design
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.http.*
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.airframe.http.*
import wvlet.airframe.http.netty.Netty
import wvlet.airspec.AirSpec
import wvlet.log.LogSupport

Expand Down Expand Up @@ -102,10 +102,10 @@ class OkHttpClientTest extends AirSpec {
_.add(
Netty.server
.withRouter(r).design
.bind[SyncClient].toProvider { (server: NettyServer) =>
.bind[SyncClient].toProvider { (server: HttpServer) =>
OkHttp.client.newSyncClient(server.localAddress)
}
.bind[AsyncClient].toProvider { (server: NettyServer) =>
.bind[AsyncClient].toProvider { (server: HttpServer) =>
OkHttp.client.newAsyncClient(server.localAddress)
}
)
Expand Down Expand Up @@ -227,7 +227,7 @@ class OkHttpClientTest extends AirSpec {

test(
"fail request",
design = _.bind[SyncClient].toProvider { (server: NettyServer) =>
design = _.bind[SyncClient].toProvider { (server: HttpServer) =>
OkHttp.client
.withRetryContext(_.withMaxRetry(3))
.noCircuitBreaker
Expand Down Expand Up @@ -262,7 +262,7 @@ class OkHttpClientTest extends AirSpec {

test(
"read timeout",
design = _.bind[SyncClient].toProvider { (server: NettyServer) =>
design = _.bind[SyncClient].toProvider { (server: HttpServer) =>
OkHttp.client
.withReadTimeout(Duration(10, TimeUnit.MILLISECONDS))
.withRetryContext(_.withMaxRetry(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package wvlet.airframe.http.recorder
import wvlet.airframe.http.HttpMessage.EmptyMessage
import wvlet.airframe.http.{Http, HttpHeader, HttpMessage, HttpStatus, RxHttpEndpoint, ServerAddress}
import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.netty.{NettyBackend, NettyServer}
import wvlet.airframe.http.netty.NettyBackend
import wvlet.airframe.rx.Rx
import wvlet.log.LogSupport

Expand Down
49 changes: 49 additions & 0 deletions airframe-http/src/main/scala/wvlet/airframe/http/HttpServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.http

/**
* A common trait for Http server implementations
*/
trait HttpServer extends AutoCloseable {

/**
* Await and block until the server is terminated. If the server is already terminated, it will return immediately.
*/
def awaitTermination(): Unit

/**
* Stop the server
*/
def stop(): Unit

/**
* Stop the server. When using Airframe DI, this method will be called automatically.
*/
override def close(): Unit = stop()

/**
* Return the local server address in (host):(port) format. e.g., localhost:8080. This method can be used for
* creating a new Http client for the server. For example:
* {{{
* Netty.server.withRouter(router).start { server =>
* Using.resource(Http.client.newSyncClient(server.localAddress)) { client =>
* val resp = client.send(Http.GET("/hello"))
* }
* }
* }}}
* @return
*/
def localAddress: String
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package wvlet.airframe.test.api

import wvlet.airframe.Design
import wvlet.airframe.http.{Http, RPCEncoding, RxRouter}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.airframe.http.netty.Netty
import wvlet.airframe.http.{Http, HttpServer, RPCEncoding, RxRouter}
import wvlet.airframe.test.api.HelloRPC.VariousParams
import wvlet.airspec.AirSpec

Expand All @@ -26,15 +26,15 @@ class HelloRPCTest extends AirSpec {
.withName("hello-rpc-test")
.withRouter(RxRouter.of[HelloRPCImpl])
.design
.bind[ServiceRPC.RPCSyncClient].toProvider { (server: NettyServer) =>
.bind[ServiceRPC.RPCSyncClient].toProvider { (server: HttpServer) =>
ServiceRPC.newRPCSyncClient(Http.client.newSyncClient(server.localAddress))
}
.bind[ServiceRPC.RPCAsyncClient].toProvider { (server: NettyServer) =>
.bind[ServiceRPC.RPCAsyncClient].toProvider { (server: HttpServer) =>
ServiceRPC.newRPCAsyncClient(Http.client.newAsyncClient(server.localAddress))
}
}

test("rpc") { (server: NettyServer) =>
test("rpc") { (server: HttpServer) =>
test("sync client") { (client: ServiceRPC.RPCSyncClient) =>
test("String response") {
client.HelloRPC.hello("RPC") shouldBe "Hello RPC!"
Expand Down
7 changes: 4 additions & 3 deletions docs/airframe-rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ tests.
Here is an example of using Airframe DI for starting an RPC server:

```scala
import wvlet.airframe._
import wvlet.airframe.*
import wvlet.airframe.http.*

// Inject your component as constructor arguments
class MyAPIImpl(myService: MyService) extends MyAPI {
Expand All @@ -412,8 +413,8 @@ val design = newDesign
.add(Netty.server.withRouter(router).design)

// Launch a Netty Server
design.build[NettyServer] { server =>
server.awaitForTerimination()
design.build[HttpServer] { server =>
server.awaitTerimination()
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package wvlet.airframe.examples.http

import wvlet.airframe.http.{Endpoint, HttpMethod, RxRouter}
import wvlet.airframe.http.{Endpoint, HttpMethod, HttpServer, RxRouter}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.log.LogSupport

Expand All @@ -22,7 +22,7 @@ import wvlet.log.LogSupport
object Http_01_Interface extends App {
case class User(id: String, name: String)

trait MyApp extends LogSupport {
class MyApp extends LogSupport {
@Endpoint(method = HttpMethod.GET, path = "/user/:id")
def getUser(id: String): User = {
info(s"lookup user: ${id}")
Expand All @@ -33,7 +33,7 @@ object Http_01_Interface extends App {
val router = RxRouter.of[MyApp]
val design = Netty.server.withName("myapp").withPort(18080).withRouter(router).design

design.build[NettyServer] { server =>
design.build[HttpServer] { server =>
val serverAddress = server.localAddress

// Wait server termination
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package wvlet.airframe.examples.http

import wvlet.airframe.control.Control.withResource
import wvlet.airframe.http.{Endpoint, Http, HttpMessage, HttpMethod, HttpStatus, RxRouter}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.airframe.http.*
import wvlet.airframe.http.netty.{Netty}
import wvlet.log.LogSupport

/**
Expand Down Expand Up @@ -64,7 +64,7 @@ object Http_02_ObjectMapping extends App with LogSupport {
.withRouter(router)
.design

design.build[NettyServer] { server =>
design.build[HttpServer] { server =>
withResource(Http.client.newSyncClient(server.localAddress)) { client =>
val appInfo = client.readAs[AppInfo](Http.GET("/v1/info"))
info(appInfo) // AppInfo(myapp,1.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package wvlet.airframe.examples.http
import wvlet.airframe.Design

import java.util.concurrent.TimeUnit
import wvlet.airframe.http.{Endpoint, Http, HttpMethod, RxRouter}
import wvlet.airframe.http.{Endpoint, Http, HttpMethod, HttpServer, RxRouter}
import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.log.LogSupport
Expand All @@ -27,7 +27,7 @@ import scala.concurrent.duration.Duration
*/
object Http_03_Client extends App with LogSupport {
case class User(id: String, name: String)
trait MyApp extends LogSupport {
class MyApp extends LogSupport {
@Endpoint(method = HttpMethod.GET, path = "/user/:id")
def getUser(id: String): User = {
User(id, "xxx")
Expand All @@ -43,7 +43,7 @@ object Http_03_Client extends App with LogSupport {

val clientDesign =
Design.newDesign
.bind[SyncClient].toProvider { (server: NettyServer) =>
.bind[SyncClient].toProvider { (server: HttpServer) =>
Http.client
// Configure the request retry method
.withRetryContext(
Expand Down

0 comments on commit 2f814a8

Please sign in to comment.