From 840fab05615b003e91a6c6d4383191053b2e0896 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 8 Oct 2024 14:28:49 -0600 Subject: [PATCH] Remove the second wrapper --- .../netty/RetryingHttpRequesterFilter.java | 57 ++++++------------- 1 file changed, 16 insertions(+), 41 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 71b640490b..053adb3d73 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -21,8 +21,6 @@ import io.servicetalk.client.api.LoadBalancerReadyEvent; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscoverer; -import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; @@ -31,6 +29,7 @@ import io.servicetalk.concurrent.api.RetryStrategies; import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; +import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.HttpExecutionStrategies; @@ -45,6 +44,7 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponses; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategyInfluencer; import io.servicetalk.transport.api.RetryableException; @@ -57,6 +57,7 @@ import java.util.function.UnaryOperator; import javax.annotation.Nullable; +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Completable.failed; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; @@ -199,9 +200,7 @@ public Completable apply(final int count, final Throwable t) { } // Unwrap a WrappedResponseException before asking the policy for a policy. - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, - returnFailedResponses && t instanceof WrappedResponseException ? - ((WrappedResponseException) t).exception : t); + final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -216,8 +215,7 @@ public Completable apply(final int count, final Throwable t) { return failed(t); } - Completable applyRetryCallbacks(final Completable completable, final int retryCount, - final Throwable t) { + Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { return retryCallbacks == null ? completable : completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); } @@ -266,16 +264,12 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { final HttpResponseException exception = responseMapper.apply(resp); - Single respSingle; - if (exception == null) { - respSingle = Single.succeeded(resp); - } else { - // Drain response payload body before packaging it - respSingle = resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed( - returnFailedResponses ? new WrappedResponseException(resp, exception) : exception)); - } - return respSingle.shareContextOnSubscribe(); + return (exception != null ? + // Drain response payload body before discarding it: + resp.payloadBody().ignoreElements().onErrorComplete() + .concat(Single.failed(exception)) : + Single.succeeded(resp)) + .shareContextOnSubscribe(); }); } @@ -284,11 +278,11 @@ protected Single request(final StreamingHttpRequester del // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). single = single.retryWhen(retryStrategy(request, executionContext(), true)); if (returnFailedResponses) { - single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded( - // The previous message was already drained but we can't just 'set' it because it then - // does a weird flow control thing. Therefore, we cheat by transforming in a way that - // simply discards the original. - t.response.transformMessageBody(ignored -> Publisher.empty()))); + single = single.onErrorResume(HttpResponseException.class, t -> { + HttpResponseMetaData metaData = t.metaData(); + return Single.succeeded(StreamingHttpResponses.newResponse(metaData.status(), metaData.version(), + metaData.headers(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE)); + }); } return single; } @@ -1083,23 +1077,4 @@ public RetryingHttpRequesterFilter build() { returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); } } - - private static final class WrappedResponseException extends Exception { - - private static final long serialVersionUID = 3905983622734400759L; - - final StreamingHttpResponse response; - final HttpResponseException exception; - - WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) { - this.response = response; - this.exception = exception; - } - - @Override - public synchronized Throwable fillInStackTrace() { - // just a carrier, the stack traces are not important. - return this; - } - } }