Skip to content

Commit

Permalink
netty (fix): Fix a hanging inssue when returning async responses (#3833)
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial authored Feb 13, 2025
1 parent fb13df8 commit d5a6cb3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import wvlet.airframe.http.{
ServerAddress,
ServerSentEvent
}
import wvlet.airframe.rx.{OnCompletion, OnError, OnNext, Rx, RxRunner}
import wvlet.airframe.rx.{Cancelable, OnCompletion, OnError, OnNext, Rx, RxRunner}
import wvlet.log.LogSupport

import java.net.InetSocketAddress
import scala.jdk.CollectionConverters.*
import NettyRequestHandler.*
import NettyRequestHandler.toNettyResponse

import java.io.ByteArrayOutputStream

Expand Down Expand Up @@ -152,15 +152,15 @@ class NettyRequestHandler(config: NettyServerConfig, dispatcher: NettyBackend.Fi
} else {
resp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
}
val f = ctx.write(resp)
val f = ctx.writeAndFlush(resp)
if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE)
}
}

}

object NettyRequestHandler {
object NettyRequestHandler extends LogSupport {
def toNettyResponse(response: Response): DefaultHttpResponse = {
val r = if (response.isContentTypeEventStream && response.message.isEmpty) {
val res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(response.statusCode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
*/
package wvlet.airframe.http.netty

import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.airframe.http.{Http, RPC, RxRouter}
import wvlet.airframe.rx.Rx
import wvlet.airspec.AirSpec

import java.util.concurrent.TimeUnit

object NettyRxResponseTest extends AirSpec {

@RPC
class RxApi {
def helloRx(message: String): Rx[String] = {
Rx.single(s"Hello ${message}!")
}

def helloAsyncRx(message: String): Rx[String] = {
Rx.delay(100, TimeUnit.MILLISECONDS).map(_ => s"Hello ${message}!")
}
}

initDesign {
Expand All @@ -33,6 +39,9 @@ object NettyRxResponseTest extends AirSpec {
.withRouter(RxRouter.of[RxApi])
.designWithSyncClient
)
.bind[AsyncClient].toProvider { (server: NettyServer) =>
Http.client.newAsyncClient(server.localAddress)
}
}

test("hello rx") { (client: SyncClient) =>
Expand All @@ -46,4 +55,18 @@ object NettyRxResponseTest extends AirSpec {
resp.contentString shouldBe "Hello Rx!"
}

test("hello rx async") { (client: AsyncClient) =>
client
.withRetryContext(_.noRetry)
.send(
Http
.POST("/wvlet.airframe.http.netty.NettyRxResponseTest.RxApi/helloAsyncRx")
.withJson("""{"message":"Rx"}""")
).map { resp =>
debug(resp)
resp.statusCode shouldBe 200
resp.contentString shouldBe "Hello Rx!"
}
}

}

0 comments on commit d5a6cb3

Please sign in to comment.