diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NetthRequestHandler.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NetthRequestHandler.scala index c62f66d259..48ee9c305a 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NetthRequestHandler.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NetthRequestHandler.scala @@ -17,6 +17,7 @@ import io.netty.buffer.Unpooled import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.handler.codec.http._ import wvlet.airframe.http.HttpMessage.{Request, Response} +import wvlet.airframe.http.internal.RPCResponseFilter import wvlet.airframe.http.{ Http, HttpHeader, @@ -88,35 +89,13 @@ class NetthRequestHandler(config: NettyServerConfig, dispatcher: NettyBackend.Fi val nettyResponse = toNettyResponse(v.asInstanceOf[Response]) writeResponse(msg, ctx, nettyResponse) case OnError(ex) => - val resp = ex match { - case ex: HttpServerException => - toNettyResponse(ex.toResponse) - case e: RPCException => - toNettyResponse(rpcExceptionResponse(e)) - case other => - val ex = RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other) - toNettyResponse(rpcExceptionResponse(ex)) - } - writeResponse(msg, ctx, resp) + val resp = RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse + val nettyResponse = toNettyResponse(resp) + writeResponse(msg, ctx, nettyResponse) case OnCompletion => } } - private def rpcExceptionResponse(e: RPCException): Response = { - var resp = Http - .response(e.status.httpStatus) - .addHeader(HttpHeader.xAirframeRPCStatus, e.status.code.toString) - try { - // Embed RPCError into the response body - resp = resp.withJson(e.toJson) - } catch { - case ex: Throwable => - // Show warning - logger.warn(s"Failed to serialize RPCException: ${e}", ex) - } - resp - } - private def writeResponse(req: HttpRequest, ctx: ChannelHandlerContext, resp: DefaultHttpResponse): Unit = { val keepAlive = HttpStatus.ofCode(resp.status().code()).isSuccessful && HttpUtil.isKeepAlive(req) if (keepAlive) { diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala index 2b42777812..a3c04f1be9 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala @@ -27,7 +27,7 @@ import wvlet.airframe.control.ThreadUtil import wvlet.airframe.http.HttpMessage.Response import wvlet.airframe.http.{HttpMessage, _} import wvlet.airframe.http.client.{AsyncClient, SyncClient} -import wvlet.airframe.http.internal.{HttpServerLoggingFilter, LogRotationHttpLogger} +import wvlet.airframe.http.internal.{RPCLoggingFilter, LogRotationHttpLogger, RPCResponseFilter} import wvlet.airframe.http.router.{ControllerProvider, HttpRequestDispatcher} import wvlet.airframe.rx.Rx import wvlet.airframe.{Design, Session} @@ -50,7 +50,7 @@ case class NettyServerConfig( httpLoggerProvider: HttpLoggerConfig => HttpLogger = { (config: HttpLoggerConfig) => new LogRotationHttpLogger(config) }, - loggingFilter: HttpLogger => RxHttpFilter = { new HttpServerLoggingFilter(_) } + loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) } ) { lazy val port = serverPort.getOrElse(IOUtil.unusedPort) @@ -204,7 +204,11 @@ class NettyServer(config: NettyServerConfig, session: Session) extends AutoClose private val dispatcher = { NettyBackend - .rxFilterAdapter(attachContextFilter.andThen(loggingFilter)) + .rxFilterAdapter( + attachContextFilter + .andThen(loggingFilter) + .andThen(RPCResponseFilter) + ) .andThen( HttpRequestDispatcher.newDispatcher( session = session, diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxFilterTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxFilterTest.scala index e5326f3f26..7eca05d3c1 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxFilterTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxFilterTest.scala @@ -14,7 +14,17 @@ package wvlet.airframe.http.netty import wvlet.airframe.http.client.SyncClient -import wvlet.airframe.http.{Http, HttpMessage, RPC, RPCException, RPCStatus, RxHttpEndpoint, RxHttpFilter, RxRouter} +import wvlet.airframe.http.{ + Http, + HttpClientException, + HttpMessage, + RPC, + RPCException, + RPCStatus, + RxHttpEndpoint, + RxHttpFilter, + RxRouter +} import wvlet.airframe.rx.Rx import wvlet.airspec.AirSpec @@ -50,13 +60,15 @@ object NettyRxFilterTest extends AirSpec { test("Run server with auth filter", design = _.add(Netty.server.withRouter(router1).designWithSyncClient)) { (client: SyncClient) => test("when no auth header") { - val ex = intercept[RPCException] { + val e = intercept[HttpClientException] { client.send( Http.POST("/wvlet.airframe.http.netty.NettyRxFilterTest.MyRPC/hello").withJson("""{"msg":"Netty"}""") ) } - ex.status shouldBe RPCStatus.UNAUTHENTICATED_U13 - ex.message shouldBe "authentication failed" + e.getCause shouldMatch { case ex: RPCException => + ex.status shouldBe RPCStatus.UNAUTHENTICATED_U13 + ex.message shouldBe "authentication failed" + } } test("with auth header") { @@ -72,12 +84,14 @@ object NettyRxFilterTest extends AirSpec { test("throw RPCException in a filter", design = _.add(Netty.server.withRouter(router2).designWithSyncClient)) { (client: SyncClient) => - val ex = intercept[RPCException] { + val e = intercept[HttpClientException] { client.send( Http.POST("/wvlet.airframe.http.netty.NettyRxFilterTest.MyRPC/hello").withJson("""{"msg":"Netty"}""") ) } - ex.status shouldBe RPCStatus.UNAUTHENTICATED_U13 - ex.message shouldBe "authentication failed" + e.getCause shouldMatch { case ex: RPCException => + ex.status shouldBe RPCStatus.UNAUTHENTICATED_U13 + ex.message shouldBe "authentication failed" + } } } diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxRPCServerTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxRPCServerTest.scala index b4fcb1cb40..02eede5fce 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxRPCServerTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRxRPCServerTest.scala @@ -14,7 +14,7 @@ package wvlet.airframe.http.netty import wvlet.airframe.Design -import wvlet.airframe.http.{Http, RPC, RxRouter} +import wvlet.airframe.http.{Http, HttpHeader, RPC, RPCStatus, RxRouter} import wvlet.airframe.http.client.SyncClient import wvlet.airspec.AirSpec @@ -39,10 +39,12 @@ class NettyRxRPCServerTest extends AirSpec { Http.POST("/wvlet.airframe.http.netty.NettyRxRPCServerTest.MyRPC/helloNetty").withJson("""{"msg":"Netty"}""") ) resp.message.toContentString shouldBe "Hello Netty!" + resp.getHeader(HttpHeader.xAirframeRPCStatus) shouldBe Some(RPCStatus.SUCCESS_S0.code.toString) val resp2 = client.send( Http.POST("/wvlet.airframe.http.netty.NettyRxRPCServerTest.MyRPC/helloNetty2").withJson("""{"msg":"Netty"}""") ) resp2.message.toContentString shouldBe "Hello Netty2!" + resp2.getHeader(HttpHeader.xAirframeRPCStatus) shouldBe Some(RPCStatus.SUCCESS_S0.code.toString) } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpClientException.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpClientException.scala index 83c2ea5a74..2a607cc08b 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/HttpClientException.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpClientException.scala @@ -67,7 +67,7 @@ object HttpClientException extends LogSupport { val status = adapter.statusOf(response) val isRPCException: Boolean = adapter.headerOf(response).get(HttpHeader.xAirframeRPCStatus).isDefined if (isRPCException) { - val cause = HttpClients.parseRPCException(adapter.httpResponseOf(response)) + val cause = RPCException.fromResponse(adapter.httpResponseOf(response)) new HttpClientException(adapter.wrap(response), status, cause) } else { val content = adapter.contentStringOf(response) diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala index 72ef225567..9cf15a679c 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala @@ -164,11 +164,13 @@ object HttpMessage { } case class StringMessage(content: String) extends Message { + override def isEmpty: Boolean = content.isEmpty override def toString: String = content override def toContentString: String = content override def toContentBytes: Array[Byte] = content.getBytes(StandardCharsets.UTF_8) } case class ByteArrayMessage(content: Array[Byte]) extends Message { + override def isEmpty: Boolean = content.isEmpty override def toString: String = toContentString override def toContentString: String = { new String(content, StandardCharsets.UTF_8) @@ -179,6 +181,7 @@ object HttpMessage { class LazyByteArrayMessage(contentReader: => Array[Byte]) extends Message { // Use lazy evaluation of content body to avoid unnecessary data copy private lazy val content: Array[Byte] = contentReader + override def isEmpty: Boolean = content.isEmpty override def toString: String = toContentString override def toContentString: String = { new String(content, StandardCharsets.UTF_8) diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/RPCException.scala b/airframe-http/src/main/scala/wvlet/airframe/http/RPCException.scala index 848107a0d3..728df4c84f 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/RPCException.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/RPCException.scala @@ -14,9 +14,14 @@ package wvlet.airframe.http import wvlet.airframe.codec.{GenericException, GenericStackTraceElement, MessageCodec} +import wvlet.airframe.http.HttpMessage.Response import wvlet.airframe.http.RPCException.rpcErrorMessageCodec +import wvlet.airframe.http.internal.HttpResponseBodyCodec import wvlet.airframe.json.Json import wvlet.airframe.msgpack.spi.MsgPack +import wvlet.log.LogSupport + +import scala.util.Try /** * RPCException provides a backend-independent (e.g., Finagle or gRPC) RPC error reporting mechanism. Create this @@ -36,7 +41,8 @@ case class RPCException( appErrorCode: Option[Int] = None, // [optional] Application-specific metadata metadata: Map[String, Any] = Map.empty -) extends Exception(s"[${status}] ${message}", cause.getOrElse(null)) { +) extends Exception(s"[${status}] ${message}", cause.getOrElse(null)) + with LogSupport { private var _includeStackTrace: Option[Boolean] = None @@ -75,6 +81,25 @@ case class RPCException( def toMsgPack: MsgPack = { rpcErrorMessageCodec.toMsgPack(toMessage) } + + /** + * Convert this exception to an HTTP response + */ + def toResponse: HttpMessage.Response = { + var resp = Http + .response(status.httpStatus) + .addHeader(HttpHeader.xAirframeRPCStatus, status.code.toString) + + try { + // Embed RPCError into the response body + resp = resp.withJson(toJson) + } catch { + case ex: Throwable => + // Show warning + warn(s"Failed to serialize RPCException: ${this}", ex) + } + resp + } } /** @@ -121,4 +146,28 @@ object RPCException { val m = rpcErrorMessageCodec.fromMsgPack(msgpack) fromRPCErrorMessage(m) } + + def fromResponse(response: HttpMessage.Response): RPCException = { + val responseBodyCodec = new HttpResponseBodyCodec[Response] + + response + .getHeader(HttpHeader.xAirframeRPCStatus) + .flatMap(x => Try(x.toInt).toOption) match { + case Some(rpcStatus) => + try { + if (response.message.isEmpty) { + val status = RPCStatus.ofCode(rpcStatus) + status.newException(status.name) + } else { + val msgpack = responseBodyCodec.toMsgPack(response) + RPCException.fromMsgPack(msgpack) + } + } catch { + case e: Throwable => + RPCStatus.ofCode(rpcStatus).newException(s"Failed to parse the RPC error details: ${e.getMessage}", e) + } + case None => + RPCStatus.DATA_LOSS_I8.newException(s"Invalid RPC response: ${response}") + } + } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala index 62bf32e179..7b6495dacd 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala @@ -135,6 +135,9 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit * @param request * @tparam Req * @return + * + * @throws RPCException + * when RPC request fails */ def rpc[Req, Resp](method: RPCMethod, requestContent: Req): Resp = { val request: Request = @@ -154,7 +157,7 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit ret.asInstanceOf[Resp] } else { // Parse the RPC error message - throw HttpClients.parseRPCException(response) + throw RPCException.fromResponse(response) } } } @@ -240,6 +243,16 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient] } } + /** + * @param method + * @param requestContent + * @tparam Req + * @tparam Resp + * @return + * + * @throws RPCException + * when RPC request fails + */ def rpc[Req, Resp]( method: RPCMethod, requestContent: Req @@ -258,7 +271,7 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient] val ret = HttpClients.parseRPCResponse(config, response, method.responseSurface) ret.asInstanceOf[Resp] } else { - throw HttpClients.parseRPCException(response) + throw RPCException.fromResponse(response) } } } @@ -276,7 +289,8 @@ object HttpClients extends LogSupport { resp.getHeader(HttpHeader.xAirframeRPCStatus) match { case Some(status) => // Throw RPCException if RPCStatus code is given - throw parseRPCException(e.response.toHttpResponse) + val ex = RPCException.fromResponse(e.response.toHttpResponse) + throw new HttpClientException(resp, ex.status.httpStatus, ex.message, ex) case None => // Throw as is for known client exception throw e @@ -417,21 +431,4 @@ object HttpClients extends LogSupport { } } - private[http] def parseRPCException(response: Response): RPCException = { - response - .getHeader(HttpHeader.xAirframeRPCStatus) - .flatMap(x => Try(x.toInt).toOption) match { - case Some(rpcStatus) => - try { - val msgpack = responseBodyCodec.toMsgPack(response) - RPCException.fromMsgPack(msgpack) - } catch { - case e: Throwable => - RPCStatus.ofCode(rpcStatus).newException(s"Failed to parse the RPC error details: ${e.getMessage}", e) - } - case None => - RPCStatus.DATA_LOSS_I8.newException(s"Invalid RPC response: ${response}") - } - } - } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpServerLoggingFilter.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala similarity index 87% rename from airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpServerLoggingFilter.scala rename to airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala index 35338a6fd5..e7bcfe1660 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpServerLoggingFilter.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala @@ -17,7 +17,10 @@ import wvlet.airframe.http.{HttpLogger, HttpMessage, HttpMultiMap, RPCContext, R import wvlet.airframe.rx.Rx import wvlet.log.LogSupport -class HttpServerLoggingFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport { +/** + * Report HTTP/RPC request/response logs to the given logger + */ +class RPCLoggingFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport { private val excludeHeaders = HttpMultiMap.fromHeaderNames(httpLogger.config.excludeHeaders) override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = { diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala new file mode 100644 index 0000000000..c66f26d3c4 --- /dev/null +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.http.internal + +import wvlet.airframe.http.{ + Http, + HttpHeader, + HttpMessage, + HttpServerException, + HttpStatus, + RPCException, + RPCStatus, + RxHttpEndpoint, + RxHttpFilter +} +import wvlet.airframe.rx.Rx +import wvlet.log.LogSupport + +import scala.util.{Failure, Success} + +/** + * Add RPCStatus to the response header and embed the error message to the request body + */ +object RPCResponseFilter extends RxHttpFilter with LogSupport { + override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = { + next(request) + .transform { + case Success(resp) => + setRPCStatus(resp) + case Failure(e) => + e match { + case ex: HttpServerException => + val re = RPCStatus.fromHttpStatus(ex.status).newException(ex.getMessage, ex.getCause) + re.toResponse + case ex: RPCException => + ex.toResponse + case other => + RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other).toResponse + } + } + } + + private def setRPCStatus(resp: HttpMessage.Response): HttpMessage.Response = { + resp.getHeader(HttpHeader.xAirframeRPCStatus) match { + case Some(status) => + resp + case None => + val status = RPCStatus.fromHttpStatus(resp.status) + resp.addHeader(HttpHeader.xAirframeRPCStatus, status.code.toString) + } + } +}