Skip to content

Commit

Permalink
internal: Add default http client performance test in the benchmark (#…
Browse files Browse the repository at this point in the history
…2945)

- Use async client
- Add default client test
  • Loading branch information
xerial authored May 8, 2023
1 parent 5d86f11 commit 1871039
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import wvlet.airframe.Session
import wvlet.airframe.benchmark.http.protojava.ProtoJavaGreeter
import wvlet.airframe.http.Http
import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.finagle.{Finagle, FinagleClient, FinagleServer, FinagleSyncClient}
import wvlet.airframe.http.grpc.gRPC
import wvlet.log.LogSupport
Expand Down Expand Up @@ -51,9 +53,13 @@ class AirframeFinagle extends LogSupport {
.bind[FinagleClient].toProvider { (server: FinagleServer) =>
Finagle.client.newClient(server.localAddress)
}
.bind[SyncClient].toProvider { (server: FinagleServer) =>
Http.client.noLogging.newSyncClient(server.localAddress)
}
.withProductionMode
private var session: Option[Session] = None
private var client: ServiceSyncClient[Request, Response] = null
private var syncClient: NewServiceSyncClient = null
private var asyncClient: ServiceClient[Future, Request, Response] = null

@Setup
Expand All @@ -62,13 +68,15 @@ class AirframeFinagle extends LogSupport {
s.start
session = Some(s)
client = new ServiceSyncClient(s.build[FinagleSyncClient])
syncClient = new NewServiceSyncClient(s.build[SyncClient])
asyncClient = new ServiceClient(s.build[FinagleClient])
}

@TearDown
def teardown: Unit = {
session.foreach(_.shutdown)
client.close()
syncClient.close()
asyncClient.close()
}

Expand All @@ -77,6 +85,11 @@ class AirframeFinagle extends LogSupport {
blackhole.consume(client.Greeter.hello("RPC"))
}

@Benchmark
def rpcSyncDefault(blackhole: Blackhole): Unit = {
blackhole.consume(syncClient.Greeter.hello("RPC"))
}

@Benchmark
@OperationsPerInvocation(asyncIteration)
def rpcAsync(blackhole: Blackhole): Unit = {
Expand Down Expand Up @@ -132,17 +145,19 @@ class AirframeGrpc extends LogSupport {
def rpcAsync(blackhole: Blackhole): Unit = {
val counter = new AtomicInteger(0)
for (i <- 0 until asyncIteration) {
asyncClient.Greeter.hello(
"RPC",
new StreamObserver[String] {
override def onNext(v: String): Unit = {
blackhole.consume(v)
blackhole.consume(
asyncClient.Greeter.hello(
"RPC",
new StreamObserver[String] {
override def onNext(v: String): Unit = {
blackhole.consume(v)
}
override def onError(t: Throwable): Unit = {}
override def onCompleted(): Unit = {
counter.incrementAndGet()
}
}
override def onError(t: Throwable): Unit = {}
override def onCompleted(): Unit = {
counter.incrementAndGet()
}
}
)
)
}
while (counter.get() != asyncIteration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import wvlet.airframe.Session
import wvlet.airframe.benchmark.http.HttpBenchmark.asyncIteration
import wvlet.airframe.benchmark.http.{Greeter, NewServiceSyncClient, ServiceClient}
import wvlet.airframe.benchmark.http.{Greeter, NewServiceAsyncClient, NewServiceSyncClient, ServiceClient}
import wvlet.airframe.http._
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.airframe.http.finagle.{Finagle, FinagleClient}
import wvlet.airframe.http.netty.{Netty, NettyServer}
import wvlet.airframe.rx.Rx
import wvlet.log.LogSupport

import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -37,25 +38,20 @@ class AirframeRPCNetty extends LogSupport {
private val design =
Netty.server
.withRouter(Greeter.router)
// .noLoggingFilter
.noLogging
.design
.bind[SyncClient].toProvider { (server: NettyServer) =>
Http.client.newSyncClient(server.localAddress)
Http.client.noLogging.newSyncClient(server.localAddress)
}
.bind[AsyncClient].toProvider { (server: NettyServer) =>
Http.client.newAsyncClient(server.localAddress)
}
.bind[FinagleClient].toProvider { (server: NettyServer) =>
Finagle.client.newClient(server.localAddress)
Http.client.noLogging.newAsyncClient(server.localAddress)
}
.withProductionMode

private var session: Option[Session] = None

private var client: NewServiceSyncClient = null
// private var asyncClient: NewServiceAsyncClient = null
private var asyncClient: ServiceClient[Future, com.twitter.finagle.http.Request, com.twitter.finagle.http.Response] =
null
private var client: NewServiceSyncClient = null
private var asyncClient: NewServiceAsyncClient = null

private val ec = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

Expand All @@ -65,8 +61,7 @@ class AirframeRPCNetty extends LogSupport {
s.start
session = Some(s)
client = new NewServiceSyncClient(s.build[SyncClient])
// asyncClient = new NewServiceAsyncClient(s.build[AsyncClient])
asyncClient = new ServiceClient(s.build[FinagleClient])
asyncClient = new NewServiceAsyncClient(s.build[AsyncClient])
}

@TearDown
Expand All @@ -82,31 +77,22 @@ class AirframeRPCNetty extends LogSupport {
blackhole.consume(client.Greeter.hello("RPC"))
}

// TODO: AsyncClient with JavaClientChannel can be faster
// @Benchmark
// @OperationsPerInvocation(asyncIteration)
// def rpcAsync(blackhole: Blackhole): Unit = {
// val counter = new AtomicInteger(0)
// val futures = for (i <- 0 until asyncIteration) yield {
// asyncClient.Greeter
// .hello("RPC").onComplete { x =>
// counter.incrementAndGet()
// }(ec)
// }
// while (counter.get() != asyncIteration) {
// Thread.sleep(0)
// }
// }

@Benchmark
@OperationsPerInvocation(asyncIteration)
def rpcAsync(blackhole: Blackhole): Unit = {
val counter = new AtomicInteger(0)
val futures = for (i <- 0 until asyncIteration) yield {
asyncClient.Greeter
.hello("RPC").foreach { x =>
for (i <- 0 until asyncIteration) {
val rx = asyncClient.Greeter
.hello("RPC").map { x =>
counter.incrementAndGet()
}
blackhole.consume(
ec.submit {
new Runnable {
override def run(): Unit = rx.run(_ => ())
}
}
)
}
while (counter.get() != asyncIteration) {
Thread.sleep(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,27 @@ import io.netty.handler.codec.http.{
HttpVersion
}
import io.netty.handler.stream.ChunkedWriteHandler
import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, Setup, State, TearDown}
import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OperationsPerInvocation,
OutputTimeUnit,
Scope,
Setup,
State,
TearDown
}
import org.openjdk.jmh.infra.Blackhole
import wvlet.airframe.benchmark.http.HttpBenchmark
import wvlet.airframe.benchmark.http.HttpBenchmark.asyncIteration
import wvlet.airframe.http.Http
import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.log.io.IOUtil

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext

object NettyHttp {

Expand Down Expand Up @@ -133,27 +147,61 @@ object NettyHttp {
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class NettyHttp {

import NettyHttp._

private val port = IOUtil.unusedPort
private val server = new Server(port)

private var client: SyncClient = _
private var client: SyncClient = _
private var asyncClient: AsyncClient = _

private val ec = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

@Setup
def setup: Unit = {
server.start
client = Http.client.newSyncClient(s"localhost:${port}")
client = Http.client.noLogging.newSyncClient(s"localhost:${port}")
asyncClient = Http.client.noLogging.newAsyncClient(s"localhost:${port}")
}

@TearDown
def teardown: Unit = {
server.close
client.close()
asyncClient.close()
ec.shutdownNow()
}

@Benchmark
def rpcSync(blackhole: Blackhole): Unit = {
blackhole.consume(client.send(Http.POST("/").withJson("""{"name":"Netty"}""")))
}

@Benchmark
@OperationsPerInvocation(asyncIteration)
def rpcAsync(blackhole: Blackhole): Unit = {
val counter = new AtomicInteger(0)

blackhole.consume {
for (i <- 0 until HttpBenchmark.asyncIteration) {
val rx = asyncClient
.send(Http.POST("/").withJson("""{"name":"Netty"}"""))
.map { _ =>
counter.incrementAndGet()
}

ec.submit {
new Runnable {
override def run(): Unit = {
rx.run(_ => ())
}
}
}
}
while (counter.get() != HttpBenchmark.asyncIteration) {
Thread.sleep(0)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ case class NettyServerConfig(
def withHttpLogger(loggerProvider: HttpLoggerConfig => HttpLogger): NettyServerConfig = {
this.copy(httpLoggerProvider = loggerProvider)
}
def noHttpLogger: NettyServerConfig = {
this.copy(httpLoggerProvider = HttpLogger.emptyLogger(_))
def noLogging: NettyServerConfig = {
this.copy(
loggingFilter = { _ => RxHttpFilter.identity },
httpLoggerProvider = HttpLogger.emptyLogger(_)
)
}

def newServer(session: Session): NettyServer = {
Expand Down

0 comments on commit 1871039

Please sign in to comment.