diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index e05c614c6d2..371a4a0dbdb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -649,6 +649,10 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + if (UtilAll.isBlank(brokerAddr)) { + throw new MQClientException("Broker[" + destBrokerName + "] master node does not exist", null); + } + if (UtilAll.isBlank(consumerGroup)) { consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 4eccba8e8d4..46715cea950 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -767,6 +767,9 @@ private void sendMessageBack(MessageExt msg, int delayLevel, final String broker } else { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + if (UtilAll.isBlank(brokerAddr)) { + throw new MQClientException("Broker[" + brokerName + "] master node does not exist", null); + } this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); }