diff --git a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt index ae991a1ceca..b10effb42eb 100644 --- a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt +++ b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt @@ -33,6 +33,7 @@ import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ.EXCHANGE_AGENT_LISTENER_DIRECT import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ.ROUTE_AGENT_SHUTDOWN import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ.ROUTE_AGENT_STARTUP +import com.tencent.devops.common.event.dispatcher.pipeline.mq.Tools import org.slf4j.LoggerFactory import org.springframework.amqp.core.Binding import org.springframework.amqp.core.BindingBuilder @@ -50,6 +51,7 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.Ordered +@Suppress("ALL") @Configuration @AutoConfigureOrder(Ordered.LOWEST_PRECEDENCE) class MQConfiguration @Autowired constructor() { @@ -70,36 +72,38 @@ class MQConfiguration @Autowired constructor() { } @Bean - fun pipelineBuildStartQueue(@Autowired buildListener: BuildListener) = + fun pipelineBuildDispatchStartQueue(@Autowired buildListener: BuildListener) = Queue(MQ.QUEUE_PIPELINE_BUILD_START_DISPATCHER + getStartQueue(buildListener)) @Bean - fun pipelineBuildStartQueueBind( - @Autowired pipelineBuildStartQueue: Queue, + fun pipelineBuildDispatchStartQueueBind( + @Autowired pipelineBuildDispatchStartQueue: Queue, @Autowired pipelineBuildStartFanoutExchange: FanoutExchange ): Binding { - return BindingBuilder.bind(pipelineBuildStartQueue).to(pipelineBuildStartFanoutExchange) + return BindingBuilder.bind(pipelineBuildDispatchStartQueue).to(pipelineBuildStartFanoutExchange) } @Bean - fun pipelineBuildStartListenerContainer( + fun pipelineBuildDispatchStartListenerContainer( @Autowired connectionFactory: ConnectionFactory, - @Autowired pipelineBuildStartQueue: Queue, + @Autowired pipelineBuildDispatchStartQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, @Autowired buildListener: BuildListener, @Autowired messageConverter: Jackson2JsonMessageConverter ): SimpleMessageListenerContainer { - val container = SimpleMessageListenerContainer(connectionFactory) - container.setQueueNames(pipelineBuildStartQueue.name) - container.setConcurrentConsumers(10) - container.setMaxConcurrentConsumers(10) - container.setAmqpAdmin(rabbitAdmin) - container.setPrefetchCount(1) - val adapter = MessageListenerAdapter(buildListener, buildListener::onPipelineStartup.name) adapter.setMessageConverter(messageConverter) - container.setMessageListener(adapter) - return container + return Tools.createSimpleMessageListenerContainerByAdapter( + connectionFactory = connectionFactory, + queue = pipelineBuildDispatchStartQueue, + rabbitAdmin = rabbitAdmin, + startConsumerMinInterval = 10000, + consecutiveActiveTrigger = 5, + concurrency = 10, + maxConcurrency = 10, + adapter = adapter, + prefetchCount = 1 + ) } /** @@ -113,36 +117,38 @@ class MQConfiguration @Autowired constructor() { } @Bean - fun pipelineBuildFinishQueue(@Autowired buildListener: BuildListener) = + fun pipelineBuildDispatchFinishQueue(@Autowired buildListener: BuildListener) = Queue(MQ.QUEUE_PIPELINE_BUILD_FINISH_DISPATCHER + getShutdownQueue(buildListener)) @Bean - fun pipelineBuildFinishQueueBind( - @Autowired pipelineBuildFinishQueue: Queue, + fun pipelineBuildDispatchFinishQueueBind( + @Autowired pipelineBuildDispatchFinishQueue: Queue, @Autowired pipelineBuildFinishFanoutExchange: FanoutExchange ): Binding { - return BindingBuilder.bind(pipelineBuildFinishQueue).to(pipelineBuildFinishFanoutExchange) + return BindingBuilder.bind(pipelineBuildDispatchFinishQueue).to(pipelineBuildFinishFanoutExchange) } @Bean - fun pipelineBuildFinishListenerContainer( + fun pipelineBuildDispatchFinishListenerContainer( @Autowired connectionFactory: ConnectionFactory, - @Autowired pipelineBuildFinishQueue: Queue, + @Autowired pipelineBuildDispatchFinishQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, @Autowired buildListener: BuildListener, @Autowired messageConverter: Jackson2JsonMessageConverter ): SimpleMessageListenerContainer { - val container = SimpleMessageListenerContainer(connectionFactory) - container.setQueueNames(pipelineBuildFinishQueue.name) - container.setConcurrentConsumers(10) - container.setMaxConcurrentConsumers(10) - container.setAmqpAdmin(rabbitAdmin) - container.setPrefetchCount(1) - val adapter = MessageListenerAdapter(buildListener, buildListener::onPipelineShutdown.name) adapter.setMessageConverter(messageConverter) - container.setMessageListener(adapter) - return container + return Tools.createSimpleMessageListenerContainerByAdapter( + connectionFactory = connectionFactory, + queue = pipelineBuildDispatchFinishQueue, + rabbitAdmin = rabbitAdmin, + startConsumerMinInterval = 10000, + consecutiveActiveTrigger = 5, + concurrency = 10, + maxConcurrency = 10, + adapter = adapter, + prefetchCount = 1 + ) } @Bean @@ -153,15 +159,16 @@ class MQConfiguration @Autowired constructor() { } @Bean - fun buildStartQueue( - @Autowired buildListener: BuildListener - ): Queue { + fun buildAgentStartQueue(@Autowired buildListener: BuildListener): Queue { return Queue(ROUTE_AGENT_STARTUP + getStartQueue(buildListener)) } @Bean - fun buildStartBind(@Autowired buildStartQueue: Queue, @Autowired exchange: DirectExchange): Binding { - return BindingBuilder.bind(buildStartQueue).to(exchange).with(buildStartQueue.name) + fun buildAgentStartQueueBind( + @Autowired buildAgentStartQueue: Queue, + @Autowired exchange: DirectExchange + ): Binding { + return BindingBuilder.bind(buildAgentStartQueue).to(exchange).with(buildAgentStartQueue.name) } @Bean @@ -170,55 +177,60 @@ class MQConfiguration @Autowired constructor() { @Bean fun startListener( @Autowired connectionFactory: ConnectionFactory, - @Autowired buildStartQueue: Queue, + @Autowired buildAgentStartQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, @Autowired buildListener: BuildListener, @Autowired messageConverter: Jackson2JsonMessageConverter ): SimpleMessageListenerContainer { - val container = SimpleMessageListenerContainer(connectionFactory) - container.setQueueNames(buildStartQueue.name) - container.setConcurrentConsumers(60) - container.setMaxConcurrentConsumers(100) - container.setAmqpAdmin(rabbitAdmin) - container.setMismatchedQueuesFatal(true) - container.setPrefetchCount(1) - val messageListenerAdapter = MessageListenerAdapter(buildListener, buildListener::handleStartMessage.name) - messageListenerAdapter.setMessageConverter(messageConverter) - container.setMessageListener(messageListenerAdapter) - logger.info("Start listener") - return container - } - - @Bean - fun buildShutdownQueue(@Autowired buildListener: BuildListener): Queue { + val adapter = MessageListenerAdapter(buildListener, buildListener::handleStartMessage.name) + adapter.setMessageConverter(messageConverter) + return Tools.createSimpleMessageListenerContainerByAdapter( + connectionFactory = connectionFactory, + queue = buildAgentStartQueue, + rabbitAdmin = rabbitAdmin, + startConsumerMinInterval = 10000, + consecutiveActiveTrigger = 5, + concurrency = 60, + maxConcurrency = 100, + adapter = adapter, + prefetchCount = 1 + ) + } + + @Bean + fun buildAgentShutdownQueue(@Autowired buildListener: BuildListener): Queue { return Queue(ROUTE_AGENT_SHUTDOWN + getShutdownQueue(buildListener)) } @Bean - fun buildShutdownBind(@Autowired buildShutdownQueue: Queue, @Autowired exchange: DirectExchange): Binding { - return BindingBuilder.bind(buildShutdownQueue).to(exchange).with(buildShutdownQueue.name) + fun buildAgentShutdownQueueBind( + @Autowired buildAgentShutdownQueue: Queue, + @Autowired exchange: DirectExchange + ): Binding { + return BindingBuilder.bind(buildAgentShutdownQueue).to(exchange).with(buildAgentShutdownQueue.name) } @Bean fun shutdownListener( @Autowired connectionFactory: ConnectionFactory, - @Autowired buildShutdownQueue: Queue, + @Autowired buildAgentShutdownQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, @Autowired buildListener: BuildListener, @Autowired messageConverter: Jackson2JsonMessageConverter ): SimpleMessageListenerContainer { - val container = SimpleMessageListenerContainer(connectionFactory) - container.setQueueNames(buildShutdownQueue.name) - container.setConcurrentConsumers(60) - container.setMaxConcurrentConsumers(100) - container.setAmqpAdmin(rabbitAdmin) - container.setMismatchedQueuesFatal(true) - container.setPrefetchCount(1) - val messageListenerAdapter = MessageListenerAdapter(buildListener, buildListener::handleShutdownMessage.name) - messageListenerAdapter.setMessageConverter(messageConverter) - container.setMessageListener(messageListenerAdapter) - logger.info("Start shutdown listener") - return container + val adapter = MessageListenerAdapter(buildListener, buildListener::handleShutdownMessage.name) + adapter.setMessageConverter(messageConverter) + return Tools.createSimpleMessageListenerContainerByAdapter( + connectionFactory = connectionFactory, + queue = buildAgentShutdownQueue, + rabbitAdmin = rabbitAdmin, + startConsumerMinInterval = 10000, + consecutiveActiveTrigger = 5, + concurrency = 60, + maxConcurrency = 100, + adapter = adapter, + prefetchCount = 1 + ) } companion object { diff --git a/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt b/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt index 8dcc7ac9670..7c10e11b8f8 100644 --- a/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt +++ b/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt @@ -63,6 +63,7 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.Ordered +@Suppress("ALL") @Configuration @ConditionalOnWebApplication @AutoConfigureOrder(Ordered.LOWEST_PRECEDENCE) @@ -234,20 +235,20 @@ class LogMQConfiguration @Autowired constructor() { } @Bean - fun pipelineBuildFinishQueue() = Queue(MQ.QUEUE_PIPELINE_BUILD_FINISH_LOG) + fun pipelineBuildFinishLogQueue() = Queue(MQ.QUEUE_PIPELINE_BUILD_FINISH_LOG) @Bean - fun pipelineBuildFinishQueueBind( - @Autowired pipelineBuildFinishQueue: Queue, + fun pipelineBuildFinishLogQueueBind( + @Autowired pipelineBuildFinishLogQueue: Queue, @Autowired pipelineBuildFinishFanoutExchange: FanoutExchange ): Binding { - return BindingBuilder.bind(pipelineBuildFinishQueue).to(pipelineBuildFinishFanoutExchange) + return BindingBuilder.bind(pipelineBuildFinishLogQueue).to(pipelineBuildFinishFanoutExchange) } @Bean - fun pipelineBuildFinishListenerContainer( + fun pipelineBuildFinishLogListenerContainer( @Autowired connectionFactory: ConnectionFactory, - @Autowired pipelineBuildFinishQueue: Queue, + @Autowired pipelineBuildFinishLogQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, @Autowired logService: LogService, @Autowired messageConverter: Jackson2JsonMessageConverter @@ -256,7 +257,7 @@ class LogMQConfiguration @Autowired constructor() { adapter.setMessageConverter(messageConverter) return Tools.createSimpleMessageListenerContainerByAdapter( connectionFactory = connectionFactory, - queue = pipelineBuildFinishQueue, + queue = pipelineBuildFinishLogQueue, rabbitAdmin = rabbitAdmin, adapter = adapter, startConsumerMinInterval = 5000, diff --git a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreBuildConfiguration.kt b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreBuildConfiguration.kt index cfbfbda5f1d..9789f0fb068 100644 --- a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreBuildConfiguration.kt +++ b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreBuildConfiguration.kt @@ -74,7 +74,7 @@ class BuildEngineCoreBuildConfiguration { } @Bean - fun pipelineStageBuildStartListenerContainer( + fun pipelineBuildStartListenerContainer( @Autowired connectionFactory: ConnectionFactory, @Autowired pipelineBuildStartQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin, diff --git a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreStageConfiguration.kt b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreStageConfiguration.kt index 6f0c78a1fbe..e42c0697106 100644 --- a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreStageConfiguration.kt +++ b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/init/BuildEngineCoreStageConfiguration.kt @@ -68,7 +68,7 @@ class BuildEngineCoreStageConfiguration { } @Bean - fun pipelineStageBuildStageListenerContainer( + fun pipelineBuildStageListenerContainer( @Autowired connectionFactory: ConnectionFactory, @Autowired pipelineBuildStageQueue: Queue, @Autowired rabbitAdmin: RabbitAdmin,