Skip to content

Commit

Permalink
Remove the second wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Oct 8, 2024
1 parent 3493dc4 commit e80e98e
Showing 1 changed file with 16 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
Expand All @@ -31,6 +29,7 @@
import io.servicetalk.concurrent.api.RetryStrategies;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategies;
Expand All @@ -45,6 +44,7 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponses;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
import io.servicetalk.transport.api.RetryableException;
Expand All @@ -57,6 +57,7 @@
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
Expand Down Expand Up @@ -198,10 +199,7 @@ public Completable apply(final int count, final Throwable t) {
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

// Unwrap a WrappedResponseException before asking the policy for a policy.
final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData,
returnFailedResponses && t instanceof WrappedResponseException ?
((WrappedResponseException) t).exception : t);
final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
Expand All @@ -216,8 +214,7 @@ public Completable apply(final int count, final Throwable t) {
return failed(t);
}

Completable applyRetryCallbacks(final Completable completable, final int retryCount,
final Throwable t) {
Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
}
Expand Down Expand Up @@ -266,16 +263,12 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
if (responseMapper != null) {
single = single.flatMap(resp -> {
final HttpResponseException exception = responseMapper.apply(resp);
Single<StreamingHttpResponse> respSingle;
if (exception == null) {
respSingle = Single.succeeded(resp);
} else {
// Drain response payload body before packaging it
respSingle = resp.payloadBody().ignoreElements().onErrorComplete()
.concat(Single.<StreamingHttpResponse>failed(
returnFailedResponses ? new WrappedResponseException(resp, exception) : exception));
}
return respSingle.shareContextOnSubscribe();
return (exception != null ?
// Drain response payload body before discarding it:
resp.payloadBody().ignoreElements().onErrorComplete()
.concat(Single.<StreamingHttpResponse>failed(exception)) :
Single.succeeded(resp))
.shareContextOnSubscribe();
});
}

Expand All @@ -284,11 +277,11 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
single = single.retryWhen(retryStrategy(request, executionContext(), true));
if (returnFailedResponses) {
single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded(
// The previous message was already drained but we can't just 'set' it because it then
// does a weird flow control thing. Therefore, we cheat by transforming in a way that
// simply discards the original.
t.response.transformMessageBody(ignored -> Publisher.empty())));
single = single.onErrorResume(HttpResponseException.class, t -> {
HttpResponseMetaData metaData = t.metaData();
return Single.succeeded(StreamingHttpResponses.newResponse(metaData.status(), metaData.version(),
metaData.headers(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE));
});
}
return single;
}
Expand Down Expand Up @@ -1083,23 +1076,4 @@ public RetryingHttpRequesterFilter build() {
returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
}
}

private static final class WrappedResponseException extends Exception {

private static final long serialVersionUID = 3905983622734400759L;

final StreamingHttpResponse response;
final HttpResponseException exception;

WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) {
this.response = response;
this.exception = exception;
}

@Override
public synchronized Throwable fillInStackTrace() {
// just a carrier, the stack traces are not important.
return this;
}
}
}

0 comments on commit e80e98e

Please sign in to comment.