Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加 Pulsar 异步发送的配置,增加吞吐率 #5256

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,16 @@ public class PulsarMQConstants {
*/
public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking";

/**
* Pulsar 发送的时候异步发送
*/
public static final String PULSARMQ_ENABLE_ASYNC = ROOT + "." + "enableAsync";


/**
* Pulsar 压缩算法
*/
public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType";


}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,18 @@ public class PulsarMQProducerConfig extends MQProperties {
*/
private boolean enableChunking;

/**
* enableAsync
*/
private boolean enableAsync;

/**
* compressionType
*/
private String compressionType;



public String getServerUrl() {
return serverUrl;
}
Expand Down Expand Up @@ -100,6 +107,14 @@ public boolean getEnableChunking() {
return this.enableChunking;
}

public void setEnableAsync(boolean enableAsync) {
this.enableAsync = enableAsync;
}

public boolean getEnableAsync() {
return this.enableAsync;
}

public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -56,6 +57,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
*/
protected PulsarAdmin pulsarAdmin;

private boolean asyncSend;

@Override
public void init(Properties properties) {
// 加载配置
Expand Down Expand Up @@ -92,6 +95,8 @@ public void init(Properties properties) {
}
}

asyncSend = pulsarMQProducerConfig.getEnableAsync();

// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
Expand Down Expand Up @@ -142,6 +147,11 @@ private void loadPulsarMQProperties(Properties properties) {
tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr));
}

String enableAsyncStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_ASYNC);
if (!StringUtils.isEmpty(enableAsyncStr)) {
tmpProperties.setEnableAsync(Boolean.parseBoolean(enableAsyncStr));
}

String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE);
if (!StringUtils.isEmpty(compressionType)) {
tmpProperties.setCompressionType(compressionType);
Expand Down Expand Up @@ -322,13 +332,23 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal
Producer<byte[]> producer = getProducer(topic);
byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry());
try {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
.value(msgBytes)
.send();
// todo 判断发送结果
if (logger.isDebugEnabled()) {
logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId);
if(asyncSend) {
producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
.value(msgBytes)
.sendAsync();
if (logger.isDebugEnabled()) {
logger.debug("Async Send Message to topic:{}", topic);
}
} else {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
.value(msgBytes)
.send();
// todo 判断发送结果
if (logger.isDebugEnabled()) {
logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId);
}
}
} catch (Throwable e) {
throw new RuntimeException(e);
Expand All @@ -349,14 +369,26 @@ private void sendMessage(String topic, int partition, List<FlatMessage> flatMess
Producer<byte[]> producer = getProducer(topic);
for (FlatMessage f : flatMessages) {
try {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
.value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject))
.send()
//
;
if (logger.isDebugEnabled()) {
logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId);
if(asyncSend) {
producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
.value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject))
.sendAsync()
//
;
if (logger.isDebugEnabled()) {
logger.debug("Send Messages to topic:{}", topic);
}
} else {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
.value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject))
.send()
//
;
if (logger.isDebugEnabled()) {
logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId);
}
}
} catch (Throwable e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -441,6 +473,7 @@ private Producer<byte[]> getProducer(String topic) {
}
}


producer = producerBuilder.topic(fullTopic)
// 指定路由器
.messageRouter(new MessageRouterImpl(topic))
Expand Down
10 changes: 10 additions & 0 deletions parse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
<dependency>
<groupId>com.alibaba.polardbx</groupId>
<artifactId>polardbx-parser</artifactId>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
Expand Down
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,11 @@
<artifactId>polardbx-parser</artifactId>
<version>5.4.19</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- test dependency -->
<dependency>
<groupId>junit</groupId>
Expand Down