diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala index a4b6aa18e6..a764171cdd 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala @@ -25,18 +25,18 @@ import io.netty.handler.stream.ChunkedWriteHandler import wvlet.airframe.codec.{MessageCodec, MessageCodecFactory} import wvlet.airframe.control.ThreadUtil import wvlet.airframe.http.HttpMessage.Response -import wvlet.airframe.http.{HttpMessage, *} import wvlet.airframe.http.client.{AsyncClient, SyncClient} -import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCLoggingFilter, RPCResponseFilter} +import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCResponseFilter} import wvlet.airframe.http.router.{ControllerProvider, HttpRequestDispatcher} +import wvlet.airframe.http.{HttpMessage, *} import wvlet.airframe.rx.Rx import wvlet.airframe.surface.Surface import wvlet.airframe.{Design, Session} import wvlet.log.LogSupport import wvlet.log.io.IOUtil -import java.util.concurrent.{Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{Executors, TimeUnit} import javax.annotation.PostConstruct import scala.collection.immutable.ListMap import scala.concurrent.ExecutionContext @@ -52,7 +52,6 @@ case class NettyServerConfig( httpLoggerProvider: HttpLoggerConfig => HttpLogger = { (config: HttpLoggerConfig) => new LogRotationHttpLogger(config) }, - loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) }, customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty, // Thread manager for handling Future[_] responses executionContext: ExecutionContext = { @@ -86,7 +85,6 @@ case class NettyServerConfig( } def noLogging: NettyServerConfig = { this.copy( - loggingFilter = { _ => RxHttpFilter.identity }, httpLoggerProvider = HttpLogger.emptyLogger(_) ) } @@ -138,8 +136,8 @@ case class NettyServerConfig( class NettyServer(config: NettyServerConfig, session: Session) extends HttpServer with LogSupport { - private val httpLogger: HttpLogger = config.newHttpLogger - private val loggingFilter: RxHttpFilter = config.loggingFilter(httpLogger) + private val httpLogger: HttpLogger = config.newHttpLogger + private val rpcFilter: RxHttpFilter = new RPCResponseFilter(httpLogger) private val bossGroup = { val tf = ThreadUtil.newDaemonThreadFactory("airframe-netty-boss") @@ -235,8 +233,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe NettyBackend .rxFilterAdapter( attachContextFilter - .andThen(loggingFilter) - .andThen(RPCResponseFilter) + .andThen(rpcFilter) ) .andThen( HttpRequestDispatcher.newDispatcher( diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpLogger.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpLogger.scala index 033c656a10..3716243bf6 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/HttpLogger.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpLogger.scala @@ -21,6 +21,9 @@ import wvlet.log.LogSupport * Interface for writing HTTP request/response logs */ trait HttpLogger extends AutoCloseable { + // Headers to exclude from the logs + val excludeHeaders: HttpMultiMap = HttpMultiMap.fromHeaderNames(config.excludeHeaders) + def config: HttpLoggerConfig final def write(log: Map[String, Any]): Unit = { diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/RPCStatus.scala b/airframe-http/src/main/scala/wvlet/airframe/http/RPCStatus.scala index 77e8ccd4b9..7dcb0536de 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/RPCStatus.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/RPCStatus.scala @@ -439,6 +439,7 @@ sealed abstract class RPCStatus( def shouldReportStackTrace: Boolean = { this match { + // Do not report stack traces for non-authorized user requests by default case UNAUTHENTICATED_U13 | PERMISSION_DENIED_U14 => false case _ => true } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientLoggingFilter.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientLoggingFilter.scala index a481b7c6a1..5a8feb1ae3 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientLoggingFilter.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientLoggingFilter.scala @@ -18,20 +18,27 @@ import wvlet.airframe.http.{HttpLogger, HttpLoggerConfig, HttpMultiMap, RxHttpEn import wvlet.airframe.rx.Rx import wvlet.log.LogSupport +import scala.util.{Failure, Success} + /** * A client-side filter for logging HTTP requests and responses */ class HttpClientLoggingFilter(httpLogger: HttpLogger) extends HttpClientFilter with AutoCloseable with LogSupport { - - private val excludeHeaders = HttpMultiMap.fromHeaderNames(httpLogger.config.excludeHeaders) - override def close(): Unit = { httpLogger.close() } def apply(context: HttpClientContext): RxHttpFilter = new RxHttpFilter { override def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = { - HttpLogs.reportLog(httpLogger, excludeHeaders, request, next, Some(context)) + val logContext = new HttpLogs.LogContext(request, httpLogger, Some(context), None) + next(request) + .tap { resp => + // TODO Record exceptions returned from the server + logContext.logResponse(resp, None) + } + .tapOnFailure { ex => + logContext.logError(ex) + } } } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala index bab8bd0997..2310b74594 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala @@ -19,69 +19,68 @@ import wvlet.airframe.http.client.HttpClientContext import wvlet.airframe.rx.Rx import wvlet.airframe.surface.{Parameter, Surface, TypeName} import wvlet.airframe.ulid.ULID -import wvlet.log.LogTimestampFormatter +import wvlet.log.{LogSupport, LogTimestampFormatter} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.annotation.tailrec import scala.collection.immutable.ListMap import scala.concurrent.ExecutionException -import scala.util.Try +import scala.util.{Failure, Success, Try} /** * Internal utilities for HTTP request/response logging */ -object HttpLogs { +object HttpLogs extends LogSupport { - def reportLog( - httpLogger: HttpLogger, - excludeHeaders: HttpMultiMap, + class LogContext( request: Request, - next: RxHttpEndpoint, - clientContext: Option[HttpClientContext] = None, - rpcContext: Option[RPCContext] = None - ): Rx[Response] = { - val baseTime = System.currentTimeMillis() - val start = System.nanoTime() - val m = ListMap.newBuilder[String, Any] - m ++= unixTimeLogs(baseTime) - m ++= commonRequestLogs(request) - m ++= requestHeaderLogs(request, excludeHeaders) - - def reportLogs: Unit = { - // Finally, write the log - httpLogger.write(httpLogger.config.logFilter(m.result())) - } + httpLogger: HttpLogger, + clientContext: Option[HttpClientContext], + rpcContext: Option[RPCContext] + ) { + private val baseTime = System.currentTimeMillis() + private val start = System.nanoTime() + private val m = ListMap.newBuilder[String, Any] - def rpcCallLogs(): Unit = { - // RPC call context will be set to a TLS after dispatching an event - // TODO Pass RPC call context without using TLS + init() + + private def init(): Unit = { + m ++= unixTimeLogs(baseTime) + m ++= commonRequestLogs(request) + m ++= requestHeaderLogs(request, httpLogger.excludeHeaders) + + // Log RPC context in the client side + clientContext.foreach { + _.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) } + } + // Log RPC context in the server side rpcContext.flatMap(_.rpcCallContext).foreach { rcc => m ++= rpcLogs(rcc) } } - clientContext.foreach { - _.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) } + private def logResponse(): Unit = { + m ++= durationLogs(baseTime, start) } - next - .apply(request) - .toRx - .map { resp => - m ++= durationLogs(baseTime, start) - rpcCallLogs() - m ++= commonResponseLogs(resp) - m ++= responseHeaderLogs(resp, excludeHeaders) - reportLogs - resp - } - .recoverWith { case e: Throwable => - m ++= durationLogs(baseTime, start) - rpcCallLogs() - m ++= errorLogs(e) - reportLogs - Rx.exception(e) - } + def logResponse(response: Response, exception: Option[Throwable]): Response = { + logResponse() + m ++= commonResponseLogs(response) + m ++= responseHeaderLogs(response, httpLogger.excludeHeaders) + exception.foreach(ex => m ++= errorLogs(ex)) + + // Write the log + httpLogger.write(httpLogger.config.logFilter(m.result())) + response + } + + def logError(e: Throwable): Unit = { + logResponse() + m ++= errorLogs(e) + + // Write the log + httpLogger.write(httpLogger.config.logFilter(m.result())) + } } def durationLogs(baseTime: Long, sinceNano: Long): ListMap[String, Any] = { diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCCallContext.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCCallContext.scala index 0f212c94c4..13f0cab32d 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCCallContext.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCCallContext.scala @@ -24,6 +24,6 @@ case class RPCCallContext( // The full class name of the RPC implementation class def rpcClassName: String = TypeName.sanitizeTypeName(rpcMethodSurface.owner.fullName) def rpcInterfaceName: String = rpcMethod.rpcInterfaceName - def rpcMethodName: String = rpcMethodSurface.name + def rpcMethodName: String = rpcMethod.methodName def withRPCArgs(rpcArgs: Seq[Any]): RPCCallContext = this.copy(rpcArgs = rpcArgs) } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala deleted file mode 100644 index e7bcfe1660..0000000000 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package wvlet.airframe.http.internal - -import wvlet.airframe.http.{HttpLogger, HttpMessage, HttpMultiMap, RPCContext, RxHttpEndpoint, RxHttpFilter} -import wvlet.airframe.rx.Rx -import wvlet.log.LogSupport - -/** - * Report HTTP/RPC request/response logs to the given logger - */ -class RPCLoggingFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport { - private val excludeHeaders = HttpMultiMap.fromHeaderNames(httpLogger.config.excludeHeaders) - - override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = { - val rpcContext = RPCContext.current - HttpLogs.reportLog(httpLogger, excludeHeaders, request, next, None, Some(rpcContext)) - } -} diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala index c66f26d3c4..f94e3b3776 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala @@ -16,9 +16,12 @@ package wvlet.airframe.http.internal import wvlet.airframe.http.{ Http, HttpHeader, + HttpLogger, HttpMessage, + HttpMultiMap, HttpServerException, HttpStatus, + RPCContext, RPCException, RPCStatus, RxHttpEndpoint, @@ -30,23 +33,28 @@ import wvlet.log.LogSupport import scala.util.{Failure, Success} /** - * Add RPCStatus to the response header and embed the error message to the request body + * A filter for managing RPC status header, logs, and errors. Exception messages will be embedded to the response body. */ -object RPCResponseFilter extends RxHttpFilter with LogSupport { +class RPCResponseFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport { override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = { + + val logContext = new HttpLogs.LogContext(request, httpLogger, None, Some(RPCContext.current)) + next(request) .transform { case Success(resp) => - setRPCStatus(resp) + logContext.logResponse(setRPCStatus(resp), None) case Failure(e) => e match { case ex: HttpServerException => val re = RPCStatus.fromHttpStatus(ex.status).newException(ex.getMessage, ex.getCause) - re.toResponse + logContext.logResponse(re.toResponse, Some(re)) case ex: RPCException => - ex.toResponse + logContext.logResponse(ex.toResponse, Some(ex)) case other => - RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other).toResponse + val ex = RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other) + // Report the original error to the log + logContext.logResponse(ex.toResponse, Some(other)) } } }