diff --git a/client/src/main/java/me/retrodaredevil/solarthing/mqtt/MqttPacketSaver.java b/client/src/main/java/me/retrodaredevil/solarthing/mqtt/MqttPacketSaver.java index 96c85706..c93fa85d 100644 --- a/client/src/main/java/me/retrodaredevil/solarthing/mqtt/MqttPacketSaver.java +++ b/client/src/main/java/me/retrodaredevil/solarthing/mqtt/MqttPacketSaver.java @@ -17,26 +17,29 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Map; public class MqttPacketSaver implements PacketHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MqttPacketSaver.class); private static final ObjectMapper OBJECT_MAPPER = JacksonUtil.defaultMapper(); private static final Charset CHARSET = StandardCharsets.UTF_8; /** The default topic format. Note that the ending "/" is optional. */ - public static final String DEFAULT_TOPIC_FORMAT = "solarthing/%source/%fragment/%identifier/"; + public static final String DEFAULT_TOPIC_FORMAT = "solarthing/%source/%fragment/%identifier"; private final MemoryPersistence persistence = new MemoryPersistence(); private final MqttClient client; private final MqttConnectOptions options; - /** The topic format similar to "solarthing/%source/%fragment/%identifier/" */ + /** The topic format similar to "solarthing/%source/%fragment/%identifier" (NO slash at end) */ private final String topicFormat; private final boolean retain; public MqttPacketSaver(String broker, String clientId, String username, char[] password, String topicFormat, boolean retain) { - this.topicFormat = topicFormat.endsWith("/") ? topicFormat : topicFormat + "/"; + this.topicFormat = topicFormat; this.retain = retain; // org.eclipse.paho.client.mqttv3.logging.LoggerFactory.setLogger(); @@ -56,7 +59,7 @@ public MqttPacketSaver(String broker, String clientId, String username, char[] p public void handle(PacketCollection packetCollection, InstantType instantType) throws PacketHandleException { InstancePacketGroup instancePacketGroup = PacketGroups.parseToInstancePacketGroup(packetCollection, DefaultInstanceOptions.REQUIRE_NO_DEFAULTS); try { - handle(instancePacketGroup); + handle(instancePacketGroup, packetCollection); } catch (MqttException e) { throw new PacketHandleException("MQTT Exception", e); } finally { @@ -64,13 +67,12 @@ public void handle(PacketCollection packetCollection, InstantType instantType) t try { client.disconnect(); } catch (MqttException e) { - //noinspection ThrowFromFinallyBlock - throw new RuntimeException("We shouldn't get an exception while disconnecting...", e); + LOGGER.error("We shouldn't get an exception while disconnecting... (Can't throw in finally)", e); } } } } - private void handle(InstancePacketGroup instancePacketGroup) throws MqttException { + private void handle(InstancePacketGroup instancePacketGroup, PacketCollection entirePacketCollection) throws MqttException { DefaultInstanceOptions.requireNoDefaults(instancePacketGroup); long dateMillis = instancePacketGroup.getDateMillis(); final String partiallyFormattedTopic = topicFormat @@ -78,10 +80,24 @@ private void handle(InstancePacketGroup instancePacketGroup) throws MqttExceptio .replace("%fragment", "" + instancePacketGroup.getFragmentId()); client.connect(options); + { // block for entire packet collection + final String collectionJson; + try { + collectionJson = OBJECT_MAPPER.writeValueAsString(entirePacketCollection); + } catch (JsonProcessingException e) { + throw new RuntimeException("We should be able to serial this to JSON!", e); + } + client.publish( + partiallyFormattedTopic.replace("%identifier", "packetCollection"), + collectionJson.getBytes(CHARSET), + 2, + retain + ); + } for (Packet packet : instancePacketGroup.getPackets()) { if (packet instanceof Identifiable) { Identifiable identifiable = (Identifiable) packet; - String topic = partiallyFormattedTopic.replace("%identifier", identifiable.getIdentifier().getRepresentation()); + String topic = partiallyFormattedTopic.replace("%identifier", identifiable.getIdentifier().getRepresentation()) + "/"; final String rawJson; @@ -96,7 +112,8 @@ private void handle(InstancePacketGroup instancePacketGroup) throws MqttExceptio for (Map.Entry entry : PointUtil.flattenJsonObject(json, "/")) { String key = entry.getKey(); ValueNode prim = entry.getValue(); - client.publish(topic + "packet/" + key, prim.asText().getBytes(CHARSET), 2, retain); + // Make individual fields use QoS of 1 because we want them all to go through, but no one should be using them as "event"-like packets + client.publish(topic + "packet/" + key, prim.asText().getBytes(CHARSET), 1, retain); } } } diff --git a/config_templates/databases/mqtt_template.json b/config_templates/databases/mqtt_template.json index 3329c762..6769f199 100644 --- a/config_templates/databases/mqtt_template.json +++ b/config_templates/databases/mqtt_template.json @@ -2,7 +2,7 @@ "type": "mqtt", "config": { "broker": "tcp://localhost:1883", - "username": "solarthing_user", - "password": "my_cool_password" + "username": "admin", + "password": "hivemq" } } diff --git a/config_templates/databases/mqtt_template_advanced.json b/config_templates/databases/mqtt_template_advanced.json new file mode 100644 index 00000000..61e10e11 --- /dev/null +++ b/config_templates/databases/mqtt_template_advanced.json @@ -0,0 +1,11 @@ +{ + "type": "mqtt", + "config": { + "broker": "tcp://localhost:1883", + "client_id": "solarthing-client", + "username": "solarthing_user", + "password": "my_cool_password", + "topic": "solarthing/%source/%fragment/%identifier", + "retain": true + } +} diff --git a/other/docs/mqtt_setup.md b/other/docs/mqtt_setup.md new file mode 100644 index 00000000..1f44d968 --- /dev/null +++ b/other/docs/mqtt_setup.md @@ -0,0 +1,7 @@ +## MQTT Setup +Note: MQTT support is very new for SolarThing, so this page is pretty informal right now. + + +### Setting up a broker +HiveMQ is a good broker: +* https://www.hivemq.com/downloads/docker/