Skip to content

Commit

Permalink
optimize handle mqtt command
Browse files Browse the repository at this point in the history
  • Loading branch information
lvjing2 committed Nov 28, 2024
1 parent b968d59 commit c9fca16
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,63 +184,40 @@ private List<SimpleBizInfo> getSimpleAllBizInfo(Output<?> output) {
public void handleCommand(String cmd, MqttMessage msg) throws ArkletRuntimeException {
ArkletLoggerFactory.getDefaultLogger().info("mqtt handle command {} with content {}", cmd,
msg);
Output<?> output;
if (cmd.equals(BuiltinCommand.INSTALL_BIZ.getId())
|| cmd.equals(BuiltinCommand.UNINSTALL_BIZ.getId())) {
handleBizOperation(cmd, msg);
} else if (cmd.equals("baseline")) {
handlePlaybackBaseline(cmd, msg);
} else if (cmd.equals(BuiltinCommand.HEALTH.getId())) {
handleHealthCommand(cmd, msg);
} else if (cmd.equals(BuiltinCommand.QUERY_ALL_BIZ.getId())) {
handleQueryAllBizCommand(cmd, msg);
} else {
throw new ArkletRuntimeException(
String.format("unsupported command %s with content: %s", cmd, msg));
}
}

private void handleBizOperation(String cmd, MqttMessage msg) throws ArkletRuntimeException {
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(), HashMap.class);
Map<String, Object> bizOperationResponse = new HashMap<>();
try {
if (cmd.equals(BuiltinCommand.INSTALL_BIZ.getId())
|| cmd.equals(BuiltinCommand.UNINSTALL_BIZ.getId())) {
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(),
HashMap.class);
bizOperationResponse.put(Constants.COMMAND, cmd);
bizOperationResponse.put(BIZ_NAME, cmdContent.get(BIZ_NAME));
bizOperationResponse.put(BIZ_VERSION, cmdContent.get(BIZ_VERSION));
bizOperationResponse.put(Constants.COMMAND, cmd);
bizOperationResponse.put(BIZ_NAME, cmdContent.get(BIZ_NAME));
bizOperationResponse.put(BIZ_VERSION, cmdContent.get(BIZ_VERSION));

output = commandService.process(cmd, cmdContent);
// install or uninstall command, send result to biz operation response topic
bizOperationResponse.put(Constants.COMMAND_RESPONSE, output);
mqttClient.publish(getBizOperationResponseTopic(),
JSONObject.toJSONString(MqttResponse.withData(bizOperationResponse)).getBytes(),
1, false);
} else if (cmd.equals("baseline")) {
List<Map<String, Object>> cmdContents = JSONObject.parseObject(msg.toString(),
List.class);
// TODO: parallel process
for (Map<String, Object> cmdContent : cmdContents) {
commandService.process(BuiltinCommand.INSTALL_BIZ.getId(), cmdContent);
}
Output<?> allBizOutput = commandService
.process(BuiltinCommand.QUERY_ALL_BIZ.getId(), null);
mqttClient.publish(getBizTopic(), JSONObject
.toJSONString(MqttResponse.withData(getSimpleAllBizInfo(allBizOutput)),
SerializerFeature.SkipTransientField, SerializerFeature.WriteMapNullValue)
.getBytes(), 1, false);
} else if (cmd.equals(BuiltinCommand.HEALTH.getId())) {
// health command, send result to health topic
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(),
HashMap.class);
output = commandService.process(cmd, cmdContent);
mqttClient.publish(getHealthTopic(),
JSONObject.toJSONString(MqttResponse.withData(output)).getBytes(), 1, false);
} else if (cmd.equals(BuiltinCommand.QUERY_ALL_BIZ.getId())) {
// queryAllBiz command, send result to biz topic
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(),
HashMap.class);
output = commandService.process(cmd, cmdContent);
mqttClient.publish(getBizTopic(),
JSONObject.toJSONString(MqttResponse.withData(getSimpleAllBizInfo(output)),
SerializerFeature.SkipTransientField, SerializerFeature.WriteMapNullValue)
.getBytes(),
1, false);
} else {
throw new ArkletRuntimeException(
String.format("unsupported command %s with content: %s", cmd, msg));
}
} catch (Throwable e) {
try {
Output<?> output = commandService.process(cmd, cmdContent);
// install or uninstall command, send result to biz operation response topic
bizOperationResponse.put(Constants.COMMAND_RESPONSE, output);
mqttClient.publish(getBizOperationResponseTopic(),
JSONObject.toJSONString(MqttResponse.withData(bizOperationResponse)).getBytes(), 1,
false);
} catch (Throwable t) {
ClientResponse data = new ClientResponse();
data.setMessage(ExceptionUtils.getStackTraceAsString(e));
data.setMessage(ExceptionUtils.getStackTraceAsString(t));
data.setCode(ResponseCode.FAILED);
output = Output.ofFailed(data, e.getMessage());

Output<?> output = Output.ofFailed(data, t.getMessage());
// install or uninstall command, send result to biz operation response topic
bizOperationResponse.put(Constants.COMMAND, cmd);
bizOperationResponse.put(Constants.COMMAND_RESPONSE, output);
Expand All @@ -251,8 +228,61 @@ public void handleCommand(String cmd, MqttMessage msg) throws ArkletRuntimeExcep
} catch (MqttException ex) {
throw new ArkletRuntimeException(ex);
}
throw new ArkletRuntimeException(e);
throw new ArkletRuntimeException(t);
}
}

private void handlePlaybackBaseline(String cmd, MqttMessage msg) throws ArkletRuntimeException {
List<Map<String, Object>> cmdContents = JSONObject.parseObject(msg.toString(), List.class);

ArkletLoggerFactory.getDefaultLogger()
.info("start to playback baseline with content: " + cmdContents);
// TODO: parallel process
List<Map<String, Object>> failedContents = new ArrayList<>();
for (Map<String, Object> cmdContent : cmdContents) {
try {
commandService.process(BuiltinCommand.INSTALL_BIZ.getId(), cmdContent);
} catch (Throwable e) {
failedContents.add(cmdContent);
}
}
if (!failedContents.isEmpty()) {
ArkletLoggerFactory.getDefaultLogger().error(
String.format("fail to handle command %s with content: %s", cmd, failedContents));
} else {
ArkletLoggerFactory.getDefaultLogger()
.info("install biz success when playback baseline");
}
}

private void handleHealthCommand(String cmd, MqttMessage msg) throws ArkletRuntimeException {
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(), HashMap.class);
try {
Output<?> output = commandService.process(cmd, cmdContent);
mqttClient.publish(getHealthTopic(),
JSONObject.toJSONString(MqttResponse.withData(output)).getBytes(), 1, false);
} catch (Throwable t) {
ArkletLoggerFactory.getDefaultLogger().error(
String.format("fail to handle command %s with content: %s", cmd, cmdContent), t);
throw new ArkletRuntimeException(t);
}
}

private void handleQueryAllBizCommand(String cmd,
MqttMessage msg) throws ArkletRuntimeException {
Map<String, Object> cmdContent = JSONObject.parseObject(msg.toString(), HashMap.class);
try {
Output<?> output = commandService.process(cmd, cmdContent);
mqttClient.publish(getBizTopic(),
JSONObject
.toJSONString(MqttResponse.withData(getSimpleAllBizInfo(output)),
SerializerFeature.SkipTransientField, SerializerFeature.WriteMapNullValue)
.getBytes(),
1, false);
} catch (Throwable t) {
ArkletLoggerFactory.getDefaultLogger().error(
String.format("fail to handle command %s with content: %s", cmd, cmdContent), t);
throw new ArkletRuntimeException(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.koupleless.arklet.tunnel.paho;

import com.alipay.sofa.koupleless.arklet.core.command.builtin.BuiltinCommand;
import com.alipay.sofa.koupleless.arklet.core.common.exception.ArkletRuntimeException;
import com.alipay.sofa.koupleless.arklet.tunnel.BaseTest;
import com.alipay.sofa.koupleless.arklet.tunnel.mqtt.paho.MqttMessageHandler;
import org.eclipse.paho.client.mqttv3.MqttMessage;
Expand All @@ -36,15 +37,15 @@ public void onConnectCompleted() {
mqttMessageHandler.onConnectCompleted();
}

@Test(expected = NullPointerException.class)
@Test(expected = ArkletRuntimeException.class)
public void handleHealthCommand() {
MqttMessageHandler mqttMessageHandler = new MqttMessageHandler(commandService,
baseMetadataHook, null, "test");
mqttMessageHandler.handleCommand(BuiltinCommand.HEALTH.getId(),
new MqttMessage("{}".getBytes()));
}

@Test(expected = NullPointerException.class)
@Test(expected = ArkletRuntimeException.class)
public void handleQueryAllBizCommand() {
MqttMessageHandler mqttMessageHandler = new MqttMessageHandler(commandService,
baseMetadataHook, null, "test");
Expand Down

0 comments on commit c9fca16

Please sign in to comment.