Skip to content

Commit

Permalink
Some of idels feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Oct 8, 2024
1 parent 4b4b60c commit 3493dc4
Showing 1 changed file with 20 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,14 @@ public Completable apply(final int count, final Throwable t) {
!(lbEvent instanceof LoadBalancerReadyEvent &&
((LoadBalancerReadyEvent) lbEvent).isReady()))
.ignoreElements();
return applyRetryCallbacksAndDraining(
return applyRetryCallbacks(
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

// Unwrap a WrappedResponseException before asking the policy for a policy.
final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData,
t instanceof WrappedResponseException ? ((WrappedResponseException) t).exception : t);
returnFailedResponses && t instanceof WrappedResponseException ?
((WrappedResponseException) t).exception : t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
Expand All @@ -209,46 +210,16 @@ public Completable apply(final int count, final Throwable t) {
retryWhen = retryWhen.concat(executor.timer(constant));
}

return applyRetryCallbacksAndDraining(retryWhen, count, t);
return applyRetryCallbacks(retryWhen, count, t);
}

return failed(t);
}

Completable applyRetryCallbacksAndDraining(final Completable completable, final int retryCount,
final Throwable t) {
if (retryCallbacks == null && !(t instanceof WrappedResponseException)) {
// No wrap necessary.
return completable;
}
return completable.liftSync(subscriber -> new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable cancellable) {
subscriber.onSubscribe(cancellable);
}

@Override
public void onComplete() {
try {
if (retryCallbacks != null) {
retryCallbacks.beforeRetry(retryCount, requestMetaData, t);
}
if (t instanceof WrappedResponseException) {
drainResponse(((WrappedResponseException) t).response).subscribe();
}
} finally {
subscriber.onComplete();
}
}

@Override
public void onError(Throwable tt) {
// If we're retrying due to a wrapped response it's because the users want the actual response,
// not an exception. Therefore, we return the wrapped response and let it get unwrapped at the
// end of the retry pipeline.
subscriber.onError(t instanceof WrappedResponseException ? t : tt);
}
});
Completable applyRetryCallbacks(final Completable completable, final int retryCount,
final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
}
}

Expand Down Expand Up @@ -295,14 +266,16 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
if (responseMapper != null) {
single = single.flatMap(resp -> {
final HttpResponseException exception = responseMapper.apply(resp);
Single<StreamingHttpResponse> respSingle;
if (exception == null) {
return Single.succeeded(resp).shareContextOnSubscribe();
}
if (returnFailedResponses) {
return Single.failed(new WrappedResponseException(resp, exception));
respSingle = Single.succeeded(resp);
} else {
return drainResponse(resp).concat(Single.<StreamingHttpResponse>failed(exception));
// Drain response payload body before packaging it
respSingle = resp.payloadBody().ignoreElements().onErrorComplete()
.concat(Single.<StreamingHttpResponse>failed(
returnFailedResponses ? new WrappedResponseException(resp, exception) : exception));
}
return respSingle.shareContextOnSubscribe();
});
}

Expand All @@ -311,13 +284,11 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
single = single.retryWhen(retryStrategy(request, executionContext(), true));
if (returnFailedResponses) {
single = single.onErrorResume(t -> {
if (t instanceof WrappedResponseException) {
return Single.succeeded(((WrappedResponseException) t).response);
} else {
return Single.failed(t);
}
});
single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded(
// The previous message was already drained but we can't just 'set' it because it then
// does a weird flow control thing. Therefore, we cheat by transforming in a way that
// simply discards the original.
t.response.transformMessageBody(ignored -> Publisher.empty())));
}
return single;
}
Expand Down Expand Up @@ -1113,10 +1084,6 @@ public RetryingHttpRequesterFilter build() {
}
}

private static Completable drainResponse(StreamingHttpResponse resp) {
return resp.payloadBody().ignoreElements().onErrorComplete();
}

private static final class WrappedResponseException extends Exception {

private static final long serialVersionUID = 3905983622734400759L;
Expand Down

0 comments on commit 3493dc4

Please sign in to comment.