diff --git a/CHANGES.rst b/CHANGES.rst index d7866ad8..065f349b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,6 +6,8 @@ Changelog incomplete ========== - Add possibility to acquire bulk readings in JSON format +- Add possibility to acquire bulk readings in compact JSON format, + with timestamps as keys in progress @@ -94,7 +96,6 @@ Infrastructure - QA: Improve tests on HTTP API for data acquisition - CI: Add testing against Python 3.9 - CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana -- Add possibility to acquire bulk readings in JSON format .. _kotori-0.26.8: diff --git a/kotori/daq/decoder/__init__.py b/kotori/daq/decoder/__init__.py index 35b0e3fe..f9a8a874 100644 --- a/kotori/daq/decoder/__init__.py +++ b/kotori/daq/decoder/__init__.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # (c) 2019-2021 Andreas Motl from kotori.daq.decoder.airrohr import AirrohrDecoder +from kotori.daq.decoder.json import CompactTimestampedJsonDecoder from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder from kotori.daq.decoder.schema import MessageType @@ -23,6 +24,12 @@ def probe(self): if 'slot' not in self.topology: return False + # Compact JSON format, with timestamps as keys + if self.topology.slot.endswith('tc.json'): + self.info.message_type = MessageType.DATA_CONTAINER + self.info.decoder = CompactTimestampedJsonDecoder + return True + # Airrohr if self.topology.slot.endswith('airrohr.json'): self.info.message_type = MessageType.DATA_CONTAINER diff --git a/kotori/daq/decoder/json.py b/kotori/daq/decoder/json.py new file mode 100644 index 00000000..5e66de69 --- /dev/null +++ b/kotori/daq/decoder/json.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# (c) 2021 Andreas Motl +import json + + +class CompactTimestampedJsonDecoder: + """ + Decode JSON payloads in compact format, with timestamps as keys. + + Documentation + ============= + - https://getkotori.org/docs/handbook/decoders/json.html (not yet) + - https://github.com/daq-tools/kotori/issues/39 + + Example + ======= + :: + + { + "1611082554": { + "temperature": 21.42, + "humidity": 41.55 + }, + "1611082568": { + "temperature": 42.84, + "humidity": 83.1 + } + } + + """ + + @staticmethod + def decode(payload): + + # Decode from JSON. + message = json.loads(payload) + + # Create list of data dictionaries. + data = [] + for timestamp, item in message.items(): + item["time"] = timestamp + data.append(item) + + return data diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 70c328d5..e7e4b910 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -27,6 +27,7 @@ class TestSettings: mqtt_topic3_json = 'mqttkit-1/itest3/foo/bar/data.json' mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json' mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__' + mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json' mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json' # HTTP channel settings. diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index 3ca23683..eb0cb857 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb): assert record == {u'temperature': 42.84, u'humidity': 83.1} +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in compact JSON format to MQTT broker + and proof they are stored in the InfluxDB database. + + https://github.com/daq-tools/kotori/issues/39 + """ + + # Submit multiple measurements, with timestamp. + data = { + "1611082554": { + "temperature": 21.42, + "humidity": 41.55, + }, + "1611082568": { + "temperature": 42.84, + "humidity": 83.1, + }, + } + yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB. + record = influx_sensors.get_record(index=0) + assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy