Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Generic error handling #85

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ProducerDestination provisionProducerDestination(String name,
}
}
}
return new RabbitProducerDestination(exchange, binding);
return new RabbitProducerDestination(exchange, binding,producerProperties);
}

@Override
Expand Down Expand Up @@ -165,7 +165,7 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou
autoBindDLQ(applyPrefix(properties.getExtension().getPrefix(), baseQueueName), queueName,
properties.getExtension());
}
return new RabbitConsumerDestination(queue, binding);
return new RabbitConsumerDestination(queue, binding, properties);
}

/**
Expand Down Expand Up @@ -462,23 +462,32 @@ private void removeSingleton(String name) {
}
}

private static final class RabbitProducerDestination implements ProducerDestination {
private static final class RabbitProducerDestination implements ProducerDestination<ExtendedProducerProperties<RabbitProducerProperties>> {

private final Exchange exchange;

private final Binding binding;

RabbitProducerDestination(Exchange exchange, Binding binding) {
private final ExtendedProducerProperties<RabbitProducerProperties> properties;

RabbitProducerDestination(Exchange exchange, Binding binding, ExtendedProducerProperties<RabbitProducerProperties> properties) {
Assert.notNull(exchange, "exchange must not be null");
this.exchange = exchange;
this.binding = binding;
this.properties = properties;
}

@Override
public String getName() {
return this.exchange.getName();
}

@Override
public ExtendedProducerProperties<RabbitProducerProperties> getProperties() {
return this.properties;
}


@Override
public String getNameForPartition(int partition) {
return this.exchange.getName();
Expand All @@ -493,15 +502,17 @@ public String toString() {
}
}

private static final class RabbitConsumerDestination implements ConsumerDestination {
private static final class RabbitConsumerDestination implements ConsumerDestination<ExtendedConsumerProperties<RabbitConsumerProperties>> {

private final Queue queue;
private final Binding binding;
private final ExtendedConsumerProperties<RabbitConsumerProperties> properties;

RabbitConsumerDestination(Queue queue, Binding binding) {
RabbitConsumerDestination(Queue queue, Binding binding, ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
Assert.notNull(queue, "queue must not be null");
this.queue = queue;
this.binding = binding;
this.properties = properties;
}

@Override
Expand All @@ -516,6 +527,12 @@ public String toString() {
public String getName() {
return this.queue.getName();
}

@Override
public ExtendedConsumerProperties<RabbitConsumerProperties> getProperties() {
return this.properties;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@

package org.springframework.cloud.stream.binder.rabbit;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
Expand All @@ -34,9 +29,6 @@
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
import org.springframework.amqp.rabbit.core.support.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
Expand All @@ -49,25 +41,21 @@
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
import org.springframework.cloud.stream.error.BinderErrorConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand All @@ -88,14 +76,14 @@
* @author Ilayaperumal Gopinathan
* @author David Turanski
* @author Marius Bogoevici
* @author Vinicius Carvalho
*/
public class RabbitMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<RabbitConsumerProperties>,
ExtendedProducerProperties<RabbitProducerProperties>, RabbitExchangeQueueProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RabbitConsumerProperties, RabbitProducerProperties> {

private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy =
new AmqpMessageHeaderErrorMessageStrategy();


private static final MessagePropertiesConverter inboundMessagePropertiesConverter =
new DefaultMessagePropertiesConverter() {
Expand Down Expand Up @@ -126,8 +114,9 @@ public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelo
private RabbitExtendedBindingProperties extendedBindingProperties = new RabbitExtendedBindingProperties();

public RabbitMessageChannelBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties,
RabbitExchangeQueueProvisioner provisioningProvider) {
super(true, new String[0], provisioningProvider);
RabbitExchangeQueueProvisioner provisioningProvider,
BinderErrorConfigurer errorConfigurer) {
super(true, new String[0], provisioningProvider, errorConfigurer);
Assert.notNull(connectionFactory, "connectionFactory must not be null");
Assert.notNull(rabbitProperties, "rabbitProperties must not be null");
this.connectionFactory = connectionFactory;
Expand Down Expand Up @@ -272,117 +261,12 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDes
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
mapper.setRequestHeaderNames(properties.getExtension().getHeaderPatterns());
adapter.setHeaderMapper(mapper);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, group, properties);
if (properties.getMaxAttempts() > 1) {
adapter.setRetryTemplate(buildRetryTemplate(properties));
if (properties.getExtension().isRepublishToDlq()) {
adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
}
else {
adapter.setErrorMessageStrategy(errorMessageStrategy);
adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return adapter;
}

@Override
protected ErrorMessageStrategy getErrorMessageStrategy() {
return errorMessageStrategy;
}

@Override
protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group,
final ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
if (properties.getExtension().isRepublishToDlq()) {
return new MessageHandler() {

private final RabbitTemplate template = new RabbitTemplate(
RabbitMessageChannelBinder.this.connectionFactory);

private final String exchange = deadLetterExchangeName(properties.getExtension());

private final String routingKey = properties.getExtension().getDeadLetterRoutingKey();

@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
Message amqpMessage = (Message) message.getHeaders()
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
if (!(message instanceof ErrorMessage)) {
logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: "
+ message);
}
else if (amqpMessage == null) {
logger.error("No raw message header in " + message);
}
else {
Throwable cause = (Throwable) message.getPayload();
MessageProperties messageProperties = amqpMessage.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
headers.put(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
headers.put(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE,
cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put(RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE,
messageProperties.getReceivedExchange());
headers.put(RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY,
messageProperties.getReceivedRoutingKey());
if (properties.getExtension().getRepublishDeliveyMode() != null) {
messageProperties.setDeliveryMode(properties.getExtension().getRepublishDeliveyMode());
}
template.send(this.exchange,
this.routingKey != null ? this.routingKey : messageProperties.getConsumerQueue(),
amqpMessage);
}
}

};
}
else if (properties.getMaxAttempts() > 1) {
return new MessageHandler() {

private final RejectAndDontRequeueRecoverer recoverer = new RejectAndDontRequeueRecoverer();

@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
Message amqpMessage = (Message) message.getHeaders()
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
if (!(message instanceof ErrorMessage)) {
logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: "
+ message);
throw new ListenerExecutionFailedException("Unexpected error message " + message,
new AmqpRejectAndDontRequeueException(""), null);
}
else if (amqpMessage == null) {
logger.error("No raw message header in " + message);
throw new ListenerExecutionFailedException("Unexpected error message " + message,
new AmqpRejectAndDontRequeueException(""), amqpMessage);
}
else {
this.recoverer.recover(amqpMessage, (Throwable) message.getPayload());
}
}

};
}
else {
return super.getErrorMessageHandler(destination, group, properties);
}
return adapter;
}

@Override
protected String errorsBaseName(ConsumerDestination destination, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {
return destination.getName() + ".errors";
}

private String deadLetterExchangeName(RabbitCommonProperties properties) {
if (properties.getDeadLetterExchange() == null) {
return applyPrefix(properties.getPrefix(), RabbitCommonProperties.DEAD_LETTER_EXCHANGE);
}
else {
return properties.getDeadLetterExchange();
}
}

@Override
protected void afterUnbindConsumer(ConsumerDestination consumerDestination, String group,
Expand Down Expand Up @@ -430,13 +314,11 @@ private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties)
}
rabbitTemplate.afterPropertiesSet();
return rabbitTemplate;

}

private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
RabbitMessageChannelErrorConfigurer getErrorConfigurer(){
return (RabbitMessageChannelErrorConfigurer) this.errorConfigurer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be more strict here and require definitely only RabbitMessageChannelErrorConfigurer injection - no any generic BinderErrorConfigurer support.
More over you are going to catch here ClassCast if you don't provide exactly RabbitMessageChannelErrorConfigurer.

}

}
Loading