Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

参照example配置mq动态线程池后,消费者收到消息后不会立刻消费,求解答,是否哪里配置有误? #445

Open
shiuekai opened this issue Jul 19, 2024 · 0 comments
Labels
question Further information is requested

Comments

@shiuekai
Copy link

首先感谢您使用 DynamicTp,如果对项目有任何疑问需要解答,请按照下述模板提问,建议使用 Markdown 语法

环境

Springboot 3.2.7
rabbitmq 3.1.6

使用方面

配置类
`@Slf4j
@configuration
public class MqMDCConfig {

/**
 * 往abstractConnectionFactory里面设置线程池
 * 这里需要注意 配置文件      rabbitmqTp对应的threadPoolName 要与  RabbitMqDtpAdapter的genTpName方法生成的名字对上
 */
@Bean
public AbstractRabbitListenerContainerFactory<?> rabbitListenerContainerFactory(AbstractConnectionFactory abstractConnectionFactory) {
    DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
    factory.setConnectionFactory(abstractConnectionFactory);
    factory.setAfterReceivePostProcessors(mdcMessagePostProcessor());
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(4);
    executor.setCorePoolSize(2);
    executor.setQueueCapacity(10);
    executor.setThreadNamePrefix("dynamic-mqConsumer-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    factory.setTaskExecutor(executor);
    abstractConnectionFactory.setExecutor(executor.getThreadPoolExecutor());
    log.info(">>>>>>>>>>> directRabbitListenerContainerFactory init.");
    return factory;
}


@Bean
public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, AbstractConnectionFactory abstractConnectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    template.setMessageConverter(jackson2JsonMessageConverter());
    configurer.configure(template, abstractConnectionFactory);
    template.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            return message;
        }

        @Override
        public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
            String traceId = MDC.get(TRACE_ID);
            if (StrUtil.isNotBlank(traceId)) {
                // 内部调用自己设置链路
                message.getMessageProperties().setHeader(TRACE_ID, traceId);
            }
            return message;
        }
    });
    template.setConfirmCallback((correlationData, ack, cause) -> {
        // correlationData 可能为空
        if (ack) {
            log.info("消息发送到exchange成功,id: {}", correlationData);
        } else {
            log.error("消息发送到exchange失败,原因: {}", cause);
        }
    });
    template.setReturnsCallback((returnedMessage -> {
        log.info(returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
                returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
                returnedMessage.getMessage());
    }));
    log.info(">>>>>>>>>>> rabbitTemplate init.");
    return template;
}

@Bean
public MDCMessagePostProcessor mdcMessagePostProcessor() {
    return new MDCMessagePostProcessor();
}

}`

生产者
rabbitTemplate.convertAndSend("testQueue", "test");

消费者
@RabbitListener(queues = "testQueue", containerFactory = "rabbitListenerContainerFactory") @RabbitHandler public void testQueue(String text) { Thread thread = Thread.currentThread(); String name = thread.getName(); long id = thread.getId(); log.info("receive message: {}, thread name: {}, thread id: {}", text, name, id); }

zk配置
spring.dynamic.tp.monitorInterval=5 spring.dynamic.tp.rabbitmqTp[0].threadPoolName=rabbitMqTp#rabbitConnectionFactory spring.dynamic.tp.rabbitmqTp[0].threadPoolAliasName=rabbitmq线程池 spring.dynamic.tp.rabbitmqTp[0].threadNamePrefix=dynamicRabbitmqExecutor spring.dynamic.tp.rabbitmqTp[0].corePoolSize=8 spring.dynamic.tp.rabbitmqTp[0].maximumPoolSize=16 spring.dynamic.tp.rabbitmqTp[0].keepAliveTime=60 spring.dynamic.tp.rabbitmqTp[0].notifyEnabled=false

原理方面

其他

@shiuekai shiuekai added the question Further information is requested label Jul 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

1 participant