Skip to content

Commit

Permalink
Add possibility to acquire bulk readings in compact JSON format
Browse files Browse the repository at this point in the history
The "compact JSON" format is currently defined with timestamps as keys.

Example:
{
  "1611082554": {
    "temperature": 21.42,
    "humidity": 41.55
  },
  "1611082568": {
    "temperature": 42.84,
    "humidity": 83.1
  }
}
  • Loading branch information
amotl committed Nov 26, 2022
1 parent 249cec7 commit 7085ddb
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Changelog
incomplete
==========
- Add possibility to acquire bulk readings in JSON format
- Add possibility to acquire bulk readings in compact JSON format,
with timestamps as keys


in progress
Expand Down Expand Up @@ -94,7 +96,6 @@ Infrastructure
- QA: Improve tests on HTTP API for data acquisition
- CI: Add testing against Python 3.9
- CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana
- Add possibility to acquire bulk readings in JSON format


.. _kotori-0.26.8:
Expand Down
7 changes: 7 additions & 0 deletions kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# (c) 2019-2021 Andreas Motl <andreas@getkotori.org>
from kotori.daq.decoder.airrohr import AirrohrDecoder
from kotori.daq.decoder.json import CompactTimestampedJsonDecoder
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
from kotori.daq.decoder.schema import MessageType

Expand All @@ -23,6 +24,12 @@ def probe(self):
if 'slot' not in self.topology:
return False

# Compact JSON format, with timestamps as keys
if self.topology.slot.endswith('tc.json'):
self.info.message_type = MessageType.DATA_CONTAINER
self.info.decoder = CompactTimestampedJsonDecoder
return True

# Airrohr
if self.topology.slot.endswith('airrohr.json'):
self.info.message_type = MessageType.DATA_CONTAINER
Expand Down
44 changes: 44 additions & 0 deletions kotori/daq/decoder/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# (c) 2021 Andreas Motl <andreas@getkotori.org>
import json


class CompactTimestampedJsonDecoder:
"""
Decode JSON payloads in compact format, with timestamps as keys.
Documentation
=============
- https://getkotori.org/docs/handbook/decoders/json.html (not yet)
- https://github.com/daq-tools/kotori/issues/39
Example
=======
::
{
"1611082554": {
"temperature": 21.42,
"humidity": 41.55
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1
}
}
"""

@staticmethod
def decode(payload):

# Decode from JSON.
message = json.loads(payload)

# Create list of data dictionaries.
data = []
for timestamp, item in message.items():
item["time"] = timestamp
data.append(item)

return data
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TestSettings:
mqtt_topic3_json = 'mqttkit-1/itest3/foo/bar/data.json'
mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json'
mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__'
mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json'
mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json'

# HTTP channel settings.
Expand Down
34 changes: 34 additions & 0 deletions test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
assert record == {u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb):
"""
Publish multiple readings in compact JSON format to MQTT broker
and proof they are stored in the InfluxDB database.
https://github.com/daq-tools/kotori/issues/39
"""

# Submit multiple measurements, with timestamp.
data = {
"1611082554": {
"temperature": 21.42,
"humidity": 41.55,
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1,
},
}
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_record(index=0)
assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55}

record = influx_sensors.get_record(index=1)
assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
Expand Down

0 comments on commit 7085ddb

Please sign in to comment.