diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index dfc4de4d05..955cbccf16 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -71,8 +71,16 @@ public class PulsarMQConstants { */ public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking"; + /** + * Pulsar 发送的时候异步发送 + */ + public static final String PULSARMQ_ENABLE_ASYNC = ROOT + "." + "enableAsync"; + + /** * Pulsar 压缩算法 */ public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType"; + + } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index 1fd6d42cb0..572ef8f0ce 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -47,11 +47,18 @@ public class PulsarMQProducerConfig extends MQProperties { */ private boolean enableChunking; + /** + * enableAsync + */ + private boolean enableAsync; + /** * compressionType */ private String compressionType; + + public String getServerUrl() { return serverUrl; } @@ -100,6 +107,14 @@ public boolean getEnableChunking() { return this.enableChunking; } + public void setEnableAsync(boolean enableAsync) { + this.enableAsync = enableAsync; + } + + public boolean getEnableAsync() { + return this.enableAsync; + } + public void setCompressionType(String compressionType) { this.compressionType = compressionType; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 4f4d581d6c..fc16c0100b 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -2,6 +2,7 @@ import java.util.*; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -56,6 +57,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ */ protected PulsarAdmin pulsarAdmin; + private boolean asyncSend; + @Override public void init(Properties properties) { // 加载配置 @@ -92,6 +95,8 @@ public void init(Properties properties) { } } + asyncSend = pulsarMQProducerConfig.getEnableAsync(); + // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize(); sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize, @@ -142,6 +147,11 @@ private void loadPulsarMQProperties(Properties properties) { tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr)); } + String enableAsyncStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_ASYNC); + if (!StringUtils.isEmpty(enableAsyncStr)) { + tmpProperties.setEnableAsync(Boolean.parseBoolean(enableAsyncStr)); + } + String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE); if (!StringUtils.isEmpty(compressionType)) { tmpProperties.setCompressionType(compressionType); @@ -322,13 +332,23 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal Producer producer = getProducer(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { - MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) - .value(msgBytes) - .send(); - // todo 判断发送结果 - if (logger.isDebugEnabled()) { - logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId); + if(asyncSend) { + producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .value(msgBytes) + .sendAsync(); + if (logger.isDebugEnabled()) { + logger.debug("Async Send Message to topic:{}", topic); + } + } else { + MessageId msgResultId = producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .value(msgBytes) + .send(); + // todo 判断发送结果 + if (logger.isDebugEnabled()) { + logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId); + } } } catch (Throwable e) { throw new RuntimeException(e); @@ -349,14 +369,26 @@ private void sendMessage(String topic, int partition, List flatMess Producer producer = getProducer(topic); for (FlatMessage f : flatMessages) { try { - MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) - .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) - .send() - // - ; - if (logger.isDebugEnabled()) { - logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId); + if(asyncSend) { + producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) + .sendAsync() + // + ; + if (logger.isDebugEnabled()) { + logger.debug("Send Messages to topic:{}", topic); + } + } else { + MessageId msgResultId = producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) + .send() + // + ; + if (logger.isDebugEnabled()) { + logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId); + } } } catch (Throwable e) { throw new RuntimeException(e); @@ -441,6 +473,7 @@ private Producer getProducer(String topic) { } } + producer = producerBuilder.topic(fullTopic) // 指定路由器 .messageRouter(new MessageRouterImpl(topic)) diff --git a/parse/pom.xml b/parse/pom.xml index 1cab43c202..ce09343628 100644 --- a/parse/pom.xml +++ b/parse/pom.xml @@ -48,6 +48,16 @@ com.alibaba.polardbx polardbx-parser + + + com.alibaba + fastjson + + + + + com.alibaba + fastjson mysql diff --git a/pom.xml b/pom.xml index 6fb7bf26ac..4524bb51b8 100644 --- a/pom.xml +++ b/pom.xml @@ -394,7 +394,11 @@ polardbx-parser 5.4.19 - + + com.alibaba + fastjson + 1.2.83 + junit