Skip to content

Commit

Permalink
Tested mqtt and small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
retrodaredevil committed Mar 25, 2021
1 parent fb277c1 commit ad6da04
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class MqttPacketSaver implements PacketHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttPacketSaver.class);
private static final ObjectMapper OBJECT_MAPPER = JacksonUtil.defaultMapper();
private static final Charset CHARSET = StandardCharsets.UTF_8;
/** The default topic format. Note that the ending "/" is optional. */
public static final String DEFAULT_TOPIC_FORMAT = "solarthing/%source/%fragment/%identifier/";
public static final String DEFAULT_TOPIC_FORMAT = "solarthing/%source/%fragment/%identifier";

private final MemoryPersistence persistence = new MemoryPersistence();
private final MqttClient client;
private final MqttConnectOptions options;
/** The topic format similar to "solarthing/%source/%fragment/%identifier/" */
/** The topic format similar to "solarthing/%source/%fragment/%identifier" (NO slash at end) */
private final String topicFormat;
private final boolean retain;

public MqttPacketSaver(String broker, String clientId, String username, char[] password, String topicFormat, boolean retain) {
this.topicFormat = topicFormat.endsWith("/") ? topicFormat : topicFormat + "/";
this.topicFormat = topicFormat;
this.retain = retain;

// org.eclipse.paho.client.mqttv3.logging.LoggerFactory.setLogger();
Expand All @@ -56,32 +59,45 @@ public MqttPacketSaver(String broker, String clientId, String username, char[] p
public void handle(PacketCollection packetCollection, InstantType instantType) throws PacketHandleException {
InstancePacketGroup instancePacketGroup = PacketGroups.parseToInstancePacketGroup(packetCollection, DefaultInstanceOptions.REQUIRE_NO_DEFAULTS);
try {
handle(instancePacketGroup);
handle(instancePacketGroup, packetCollection);
} catch (MqttException e) {
throw new PacketHandleException("MQTT Exception", e);
} finally {
if (client.isConnected()) {
try {
client.disconnect();
} catch (MqttException e) {
//noinspection ThrowFromFinallyBlock
throw new RuntimeException("We shouldn't get an exception while disconnecting...", e);
LOGGER.error("We shouldn't get an exception while disconnecting... (Can't throw in finally)", e);
}
}
}
}
private void handle(InstancePacketGroup instancePacketGroup) throws MqttException {
private void handle(InstancePacketGroup instancePacketGroup, PacketCollection entirePacketCollection) throws MqttException {
DefaultInstanceOptions.requireNoDefaults(instancePacketGroup);
long dateMillis = instancePacketGroup.getDateMillis();
final String partiallyFormattedTopic = topicFormat
.replace("%source", instancePacketGroup.getSourceId())
.replace("%fragment", "" + instancePacketGroup.getFragmentId());

client.connect(options);
{ // block for entire packet collection
final String collectionJson;
try {
collectionJson = OBJECT_MAPPER.writeValueAsString(entirePacketCollection);
} catch (JsonProcessingException e) {
throw new RuntimeException("We should be able to serial this to JSON!", e);
}
client.publish(
partiallyFormattedTopic.replace("%identifier", "packetCollection"),
collectionJson.getBytes(CHARSET),
2,
retain
);
}
for (Packet packet : instancePacketGroup.getPackets()) {
if (packet instanceof Identifiable) {
Identifiable identifiable = (Identifiable) packet;
String topic = partiallyFormattedTopic.replace("%identifier", identifiable.getIdentifier().getRepresentation());
String topic = partiallyFormattedTopic.replace("%identifier", identifiable.getIdentifier().getRepresentation()) + "/";


final String rawJson;
Expand All @@ -96,7 +112,8 @@ private void handle(InstancePacketGroup instancePacketGroup) throws MqttExceptio
for (Map.Entry<String, ValueNode> entry : PointUtil.flattenJsonObject(json, "/")) {
String key = entry.getKey();
ValueNode prim = entry.getValue();
client.publish(topic + "packet/" + key, prim.asText().getBytes(CHARSET), 2, retain);
// Make individual fields use QoS of 1 because we want them all to go through, but no one should be using them as "event"-like packets
client.publish(topic + "packet/" + key, prim.asText().getBytes(CHARSET), 1, retain);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions config_templates/databases/mqtt_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"type": "mqtt",
"config": {
"broker": "tcp://localhost:1883",
"username": "solarthing_user",
"password": "my_cool_password"
"username": "admin",
"password": "hivemq"
}
}
11 changes: 11 additions & 0 deletions config_templates/databases/mqtt_template_advanced.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"type": "mqtt",
"config": {
"broker": "tcp://localhost:1883",
"client_id": "solarthing-client",
"username": "solarthing_user",
"password": "my_cool_password",
"topic": "solarthing/%source/%fragment/%identifier",
"retain": true
}
}
7 changes: 7 additions & 0 deletions other/docs/mqtt_setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## MQTT Setup
Note: MQTT support is very new for SolarThing, so this page is pretty informal right now.


### Setting up a broker
HiveMQ is a good broker:
* https://www.hivemq.com/downloads/docker/

0 comments on commit ad6da04

Please sign in to comment.