From 7c6a6890384522f671aa5bc32e3ca0a5abe55b75 Mon Sep 17 00:00:00 2001 From: Jz Date: Wed, 24 Apr 2024 21:59:56 +0800 Subject: [PATCH 1/8] add support for listener name which is described in https://pulsar.apache.org/docs/next/concepts-multiple-advertised-listeners/ --- .../pulsarmq/PulsarMQCanalConnector.java | 30 +++++++++++++++++-- .../pulsarmq/config/PulsarMQConstants.java | 5 ++++ .../config/PulsarMQProducerConfig.java | 13 ++++++++ .../consumer/CanalPulsarMQConsumer.java | 10 +++++++ .../producer/CanalPulsarMQProducer.java | 10 +++++++ 5 files changed, 66 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java b/client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java index 89358bdee4..2fbc0aa781 100644 --- a/client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java +++ b/client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java @@ -59,6 +59,12 @@ public class PulsarMQCanalConnector implements CanalMQConnector { * 角色认证token */ private String roleToken; + + /** + * listener name + */ + private String listenerName; + /** * 订阅客户端名称 */ @@ -129,6 +135,11 @@ public class PulsarMQCanalConnector implements CanalMQConnector { */ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic , String subscriptName) { + this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,null); + } + + public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic + , String subscriptName, String listenerName) { this.isFlatMessage = isFlatMessage; this.serviceUrl = serviceUrl; this.roleToken = roleToken; @@ -137,6 +148,7 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r if (StringUtils.isEmpty(this.subscriptName)) { throw new RuntimeException("Pulsar Consumer subscriptName required"); } + this.listenerName = listenerName; } /** @@ -150,6 +162,15 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase , int maxRedeliveryCount) { + this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, batchSize, getBatchTimeoutSeconds + , batchProcessTimeoutSeconds, redeliveryDelaySeconds, ackTimeoutSeconds, isRetry, isRetryDLQUpperCase + , maxRedeliveryCount, null); + } + + public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic + , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds + , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase + , int maxRedeliveryCount, String listenerName) { this.isFlatMessage = isFlatMessage; this.serviceUrl = serviceUrl; this.roleToken = roleToken; @@ -166,15 +187,20 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r this.isRetry = isRetry; this.isRetryDLQUpperCase = isRetryDLQUpperCase; this.maxRedeliveryCount = maxRedeliveryCount; + this.listenerName = listenerName; } @Override public void connect() throws CanalClientException { // 连接创建客户端 try { - pulsarClient = PulsarClient.builder() + ClientBuilder builder = PulsarClient.builder() .serviceUrl(serviceUrl) - .authentication(AuthenticationFactory.token(roleToken)) + .authentication(AuthenticationFactory.token(roleToken)); + if (StringUtils.isNotEmpty(listenerName)) { + builder.listenerName(listenerName); + } + pulsarClient = builder .build(); } catch (PulsarClientException e) { throw new RuntimeException(e); diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index 1c9c1a8e66..e04afee45d 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -61,4 +61,9 @@ public class PulsarMQConstants { */ public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl"; + /** + * Pulsar admin服务器地址 + */ + public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName"; + } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index 1ace4aa0d2..40d29f3db9 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -37,6 +37,11 @@ public class PulsarMQProducerConfig extends MQProperties { */ private String adminServerUrl; + /** + * listener name + */ + private String listenerName; + public String getServerUrl() { return serverUrl; } @@ -68,4 +73,12 @@ public String getAdminServerUrl() { public void setAdminServerUrl(String adminServerUrl) { this.adminServerUrl = adminServerUrl; } + + public String getListenerName() { + return listenerName; + } + + public void setListenerName(String listenerName) { + this.listenerName = listenerName; + } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java index 0a535468cb..1f3612043d 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java @@ -54,6 +54,12 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer { * 角色认证token */ private String roleToken; + + /** + * listener name + */ + private String listenerName; + /** * 订阅客户端名称 */ @@ -110,6 +116,7 @@ public void init(Properties properties, String topic, String groupId) { } this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL); this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN); + this.listenerName = properties.getProperty(PulsarMQConstants.PULSARMQ_LISTENER_NAME); this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME); // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称 if (StringUtils.isEmpty(this.subscriptName)) { @@ -165,6 +172,9 @@ public void connect() { if (StringUtils.isNotEmpty(roleToken)) { builder.authentication(AuthenticationFactory.token(roleToken)); } + if (StringUtils.isNotEmpty(listenerName)) { + builder.authentication(AuthenticationFactory.token(listenerName)); + } pulsarClient = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index b13003dfd8..0a50f285ba 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -73,6 +73,11 @@ public void init(Properties properties) { // 角色权限认证的token builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken())); } + if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getListenerName())) { + //listener name + builder.listenerName(pulsarMQProducerConfig.getListenerName()); + } + client = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); @@ -127,6 +132,11 @@ private void loadPulsarMQProperties(Properties properties) { if (!StringUtils.isEmpty(adminServerUrl)) { tmpProperties.setAdminServerUrl(adminServerUrl); } + String listenerName = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_LISTENER_NAME); + if (!StringUtils.isEmpty(listenerName)) { + tmpProperties.setListenerName(listenerName); + } + if (logger.isDebugEnabled()) { logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties)); } From 23c0c0548c3f29ad79a2f2adad2de76c7432730e Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 21 May 2024 18:38:07 +0800 Subject: [PATCH 2/8] upgrade for CVE-2023-20883 CVE-2022-1471 CVE-2023-20860 --- admin/pom.xml | 9 +++++++-- client-adapter/launcher/pom.xml | 2 +- client-adapter/pom.xml | 4 ++-- pom.xml | 4 ++-- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/admin/pom.xml b/admin/pom.xml index 94469ab43e..4ae240d415 100644 --- a/admin/pom.xml +++ b/admin/pom.xml @@ -36,7 +36,7 @@ org.springframework.boot spring-boot-dependencies - 2.5.4 + 2.5.15 pom import @@ -48,7 +48,12 @@ io.ebean ebean - 11.41.1 + 11.45.1 + + + org.yaml + snakeyaml + 2.0 diff --git a/client-adapter/launcher/pom.xml b/client-adapter/launcher/pom.xml index 633269b764..4efdedfdbd 100644 --- a/client-adapter/launcher/pom.xml +++ b/client-adapter/launcher/pom.xml @@ -17,7 +17,7 @@ org.springframework.boot spring-boot-dependencies - 2.5.4 + 2.5.15 pom import diff --git a/client-adapter/pom.xml b/client-adapter/pom.xml index cfd7c4aab8..21d023c5e1 100644 --- a/client-adapter/pom.xml +++ b/client-adapter/pom.xml @@ -117,7 +117,7 @@ org.springframework.boot spring-boot - 2.5.4 + 2.5.15 com.h2database @@ -238,7 +238,7 @@ org.yaml snakeyaml - 1.29 + 2.0 diff --git a/pom.xml b/pom.xml index 825246fae9..51f51ebfa6 100644 --- a/pom.xml +++ b/pom.xml @@ -99,9 +99,9 @@ 1.8 UTF-8 true - 5.3.9 + 5.3.26 2.17.0 - 4.8.0 + 4.9.8 5.18.0 1.0.3 2.4.0 From e090ed903ece5765b325425f4b460ce24c0da740 Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 2 Jul 2024 12:49:43 +0800 Subject: [PATCH 3/8] add enableChunking --- .../pulsarmq/config/PulsarMQConstants.java | 5 +++++ .../pulsarmq/config/PulsarMQProducerConfig.java | 12 ++++++++++++ .../pulsarmq/producer/CanalPulsarMQProducer.java | 14 ++++++++++++-- pom.xml | 2 +- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index 5a6d1774cf..933859b94e 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -66,4 +66,9 @@ public class PulsarMQConstants { */ public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName"; + + /** + * Pulsar admin服务器地址 + */ + public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking"; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index 40d29f3db9..1db7f65138 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -42,6 +42,11 @@ public class PulsarMQProducerConfig extends MQProperties { */ private String listenerName; + /** + * enableChunking + */ + private boolean enableChunking; + public String getServerUrl() { return serverUrl; } @@ -81,4 +86,11 @@ public String getListenerName() { public void setListenerName(String listenerName) { this.listenerName = listenerName; } + + public void setEnableChunking(boolean enableChunking) { + this.enableChunking = enableChunking; + } + public boolean getEnableChunking() { + return this.enableChunking; + } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 8347710a51..dc32a63ceb 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -137,6 +137,11 @@ private void loadPulsarMQProperties(Properties properties) { tmpProperties.setListenerName(listenerName); } + String enableChunkingStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_CHUNKING); + if (!StringUtils.isEmpty(enableChunkingStr)) { + tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr)); + } + if (logger.isDebugEnabled()) { logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties)); } @@ -408,11 +413,16 @@ private Producer getProducer(String topic) { } // 创建指定topic的生产者 - producer = client.newProducer() - .topic(fullTopic) + ProducerBuilder producerBuilder = client.newProducer(); + if(pulsarMQProperties.getEnableChunking()){ + producerBuilder.enableChunking(true); + producerBuilder.enableBatching(false); + } + producerBuilder.topic(fullTopic) // 指定路由器 .messageRouter(new MessageRouterImpl(topic)) .create(); + // 放入缓存 PRODUCERS.put(topic, producer); } diff --git a/pom.xml b/pom.xml index 4398b112d6..6fb7bf26ac 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ 5.18.0 1.0.3 2.4.0 - 2.8.1 + 2.11.4 5.1.48 0.8.3 2.22.1 From 52129dc83ea48a108a45908655c72dfb59eea75e Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 2 Jul 2024 13:26:31 +0800 Subject: [PATCH 4/8] downgrade pulsar client version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6fb7bf26ac..0c86d16eaf 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ 5.18.0 1.0.3 2.4.0 - 2.11.4 + 2.9.5 5.1.48 0.8.3 2.22.1 From fd39cedfadaceece1fd3d6234b47f5fca027993c Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 2 Jul 2024 19:57:57 +0800 Subject: [PATCH 5/8] fix --- .../connector/pulsarmq/producer/CanalPulsarMQProducer.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index dc32a63ceb..b90fc3ae69 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -418,7 +418,7 @@ private Producer getProducer(String topic) { producerBuilder.enableChunking(true); producerBuilder.enableBatching(false); } - producerBuilder.topic(fullTopic) + producer = producerBuilder.topic(fullTopic) // 指定路由器 .messageRouter(new MessageRouterImpl(topic)) .create(); diff --git a/pom.xml b/pom.xml index 0c86d16eaf..6fb7bf26ac 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ 5.18.0 1.0.3 2.4.0 - 2.9.5 + 2.11.4 5.1.48 0.8.3 2.22.1 From 195a9b43d1fb419920592540690263a3d4d376fe Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 2 Jul 2024 21:56:57 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pulsarmq/config/PulsarMQConstants.java | 9 +++++-- .../config/PulsarMQProducerConfig.java | 12 ++++++++++ .../producer/CanalPulsarMQProducer.java | 24 +++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index 933859b94e..bea661f598 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -62,13 +62,18 @@ public class PulsarMQConstants { public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl"; /** - * Pulsar admin服务器地址 + * Pulsar 监听器名字 */ public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName"; /** - * Pulsar admin服务器地址 + * Pulsar 开启chunking */ public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking"; + + /** + * Pulsar 压缩算法 + */ + public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType"; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index 1db7f65138..af51c13fa8 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -47,6 +47,11 @@ public class PulsarMQProducerConfig extends MQProperties { */ private boolean enableChunking; + /** + * compressionType + */ + private String compressionType; + public String getServerUrl() { return serverUrl; } @@ -93,4 +98,11 @@ public void setEnableChunking(boolean enableChunking) { public boolean getEnableChunking() { return this.enableChunking; } + + public void setCompressionType(String compressionType) { + this.compressionType = compressionType; + } + public String getCompressionType() { + return this.compressionType; + } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index b90fc3ae69..38ca7078a7 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -142,6 +142,12 @@ private void loadPulsarMQProperties(Properties properties) { tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr)); } + String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE); + if (!StringUtils.isEmpty(compressionType)) { + tmpProperties.setCompressionType(compressionType); + } + + if (logger.isDebugEnabled()) { logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties)); } @@ -418,6 +424,24 @@ private Producer getProducer(String topic) { producerBuilder.enableChunking(true); producerBuilder.enableBatching(false); } + + if(!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) { + switch(pulsarMQProperties.getCompressionType().toLowerCase()) { + case "lz4": + producerBuilder.compressionType(CompressionType.LZ4); + break; + case "zlib": + producerBuilder.compressionType(CompressionType.ZLIB); + break; + case "zstd": + producerBuilder.compressionType(CompressionType.ZSTD); + break; + case "snappy": + producerBuilder.compressionType(CompressionType.SNAPPY); + break; + } + } + producer = producerBuilder.topic(fullTopic) // 指定路由器 .messageRouter(new MessageRouterImpl(topic)) From 254002d6c01bd7c4d177d493c06ddfc407f4c01e Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 16 Jul 2024 15:37:53 +0800 Subject: [PATCH 7/8] fix CVE-2022-25845 --- parse/pom.xml | 10 ++++++++++ pom.xml | 6 +++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/parse/pom.xml b/parse/pom.xml index 1cab43c202..ce09343628 100644 --- a/parse/pom.xml +++ b/parse/pom.xml @@ -48,6 +48,16 @@ com.alibaba.polardbx polardbx-parser + + + com.alibaba + fastjson + + + + + com.alibaba + fastjson mysql diff --git a/pom.xml b/pom.xml index 6fb7bf26ac..4524bb51b8 100644 --- a/pom.xml +++ b/pom.xml @@ -394,7 +394,11 @@ polardbx-parser 5.4.19 - + + com.alibaba + fastjson + 1.2.83 + junit From 134d2c92f6a6698ea2d8ca2b32ac0b58eba1d724 Mon Sep 17 00:00:00 2001 From: Jz Date: Tue, 27 Aug 2024 16:48:17 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20=E4=B8=80=E6=AD=A5?= =?UTF-8?q?=E5=8F=91=E9=80=81=20=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=90=9E=E5=90=90=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pulsarmq/config/PulsarMQConstants.java | 8 +++ .../config/PulsarMQProducerConfig.java | 15 +++++ .../producer/CanalPulsarMQProducer.java | 63 ++++++++++++++----- 3 files changed, 71 insertions(+), 15 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index dfc4de4d05..955cbccf16 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -71,8 +71,16 @@ public class PulsarMQConstants { */ public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking"; + /** + * Pulsar 发送的时候异步发送 + */ + public static final String PULSARMQ_ENABLE_ASYNC = ROOT + "." + "enableAsync"; + + /** * Pulsar 压缩算法 */ public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType"; + + } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index 1fd6d42cb0..572ef8f0ce 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -47,11 +47,18 @@ public class PulsarMQProducerConfig extends MQProperties { */ private boolean enableChunking; + /** + * enableAsync + */ + private boolean enableAsync; + /** * compressionType */ private String compressionType; + + public String getServerUrl() { return serverUrl; } @@ -100,6 +107,14 @@ public boolean getEnableChunking() { return this.enableChunking; } + public void setEnableAsync(boolean enableAsync) { + this.enableAsync = enableAsync; + } + + public boolean getEnableAsync() { + return this.enableAsync; + } + public void setCompressionType(String compressionType) { this.compressionType = compressionType; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 4f4d581d6c..fc16c0100b 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -2,6 +2,7 @@ import java.util.*; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -56,6 +57,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ */ protected PulsarAdmin pulsarAdmin; + private boolean asyncSend; + @Override public void init(Properties properties) { // 加载配置 @@ -92,6 +95,8 @@ public void init(Properties properties) { } } + asyncSend = pulsarMQProducerConfig.getEnableAsync(); + // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize(); sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize, @@ -142,6 +147,11 @@ private void loadPulsarMQProperties(Properties properties) { tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr)); } + String enableAsyncStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_ASYNC); + if (!StringUtils.isEmpty(enableAsyncStr)) { + tmpProperties.setEnableAsync(Boolean.parseBoolean(enableAsyncStr)); + } + String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE); if (!StringUtils.isEmpty(compressionType)) { tmpProperties.setCompressionType(compressionType); @@ -322,13 +332,23 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal Producer producer = getProducer(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { - MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) - .value(msgBytes) - .send(); - // todo 判断发送结果 - if (logger.isDebugEnabled()) { - logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId); + if(asyncSend) { + producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .value(msgBytes) + .sendAsync(); + if (logger.isDebugEnabled()) { + logger.debug("Async Send Message to topic:{}", topic); + } + } else { + MessageId msgResultId = producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .value(msgBytes) + .send(); + // todo 判断发送结果 + if (logger.isDebugEnabled()) { + logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId); + } } } catch (Throwable e) { throw new RuntimeException(e); @@ -349,14 +369,26 @@ private void sendMessage(String topic, int partition, List flatMess Producer producer = getProducer(topic); for (FlatMessage f : flatMessages) { try { - MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) - .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) - .send() - // - ; - if (logger.isDebugEnabled()) { - logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId); + if(asyncSend) { + producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) + .sendAsync() + // + ; + if (logger.isDebugEnabled()) { + logger.debug("Send Messages to topic:{}", topic); + } + } else { + MessageId msgResultId = producer.newMessage() + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) + .send() + // + ; + if (logger.isDebugEnabled()) { + logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId); + } } } catch (Throwable e) { throw new RuntimeException(e); @@ -441,6 +473,7 @@ private Producer getProducer(String topic) { } } + producer = producerBuilder.topic(fullTopic) // 指定路由器 .messageRouter(new MessageRouterImpl(topic))