Skip to content

Commit

Permalink
allow storing and forwarding of readings in a batch
Browse files Browse the repository at this point in the history
  • Loading branch information
timcowlishaw committed Nov 13, 2024
1 parent 4f7553f commit 68d7e73
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 50 deletions.
10 changes: 5 additions & 5 deletions app/jobs/mqtt_forwarding_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ class MQTTForwardingJob < ApplicationJob

queue_as :mqtt_forward

def perform(device_id, reading)
def perform(device_id, readings)
begin
device = Device.find(device_id)
forwarder = MQTTForwarder.new(mqtt_client)
payload = payload_for(device, reading)
forwarder.forward_reading(device.forwarding_token, device.id, payload)
payload = payload_for(device, readings)
forwarder.forward_readings(device.forwarding_token, device.id, payload)
ensure
disconnect_mqtt!
end
end

private

def payload_for(device, reading)
Presenters.present(device, device.owner, renderer, readings: [reading])
def payload_for(device, readings)
Presenters.present(device, device.owner, renderer, readings: readings)
end

def mqtt_client
Expand Down
8 changes: 2 additions & 6 deletions app/jobs/send_to_datastore_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ class SendToDatastoreJob < ApplicationJob

def perform(data_param, device_id)
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
readings = JSON.parse(data_param)
storer.store(@device, readings)
end

def storer
Expand Down
2 changes: 1 addition & 1 deletion app/lib/mqtt_forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def initialize(client)
@suffix = suffix
end

def forward_reading(token, device_id, reading)
def forward_readings(token, device_id, reading)
topic = topic_path(token, device_id)
client.publish(topic, reading)
end
Expand Down
4 changes: 1 addition & 3 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ def handle_readings(device, message)
parsed = JSON.parse(message) if message
data = parsed["data"] if parsed
return nil if data.nil? or data&.empty?
data.each do |reading|
storer.store(device, reading)
end
storer.store(device, data)
return true
rescue Exception => e
Sentry.capture_exception(e)
Expand Down
4 changes: 2 additions & 2 deletions app/models/concerns/message_forwarding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ module MessageForwarding

extend ActiveSupport::Concern

def forward_reading(device, reading)
def forward_readings(device, readings)
if device.forward_readings?
MQTTForwardingJob.perform_later(device.id, reading)
MQTTForwardingJob.perform_later(device.id, readings)
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def store data, mac, version, ip, raise_errors=false
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
end

forward_reading(device, sql_data)
forward_readings(device, [sql_data])
rescue Exception => e

success = false
Expand Down
31 changes: 16 additions & 15 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ class Storer
include DataParser::Storer
include MessageForwarding

def store device, reading, do_update = true
begin
parsed_reading = Storer.parse_reading(device, reading)
kairos_publish(parsed_reading[:_data])

if do_update
update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
def store device, readings
readings_to_forward = []
readings.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
begin
parsed_reading = Storer.parse_reading(device, reading)
kairos_publish(parsed_reading[:_data])
readings_to_forward << parsed_reading[:sql_data]
if index == 0
update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
end


rescue Exception => e
Sentry.capture_exception(e)
raise e if Rails.env.test?
end

forward_reading(device, parsed_reading[:sql_data])

rescue Exception => e
Sentry.capture_exception(e)
raise e if Rails.env.test?
end

raise e unless e.nil?
forward_readings(device, readings_to_forward)
end

def update_device(device, parsed_ts, sql_data)
Expand Down
18 changes: 9 additions & 9 deletions spec/jobs/mqtt_forwarding_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

let(:device) { create(:device) }

let(:reading) { double(:reading) }
let(:readings) { double(:readings) }

let(:mqtt_client) {
double(:mqtt_client).tap do |mqtt_client|
Expand All @@ -24,7 +24,7 @@

let(:forwarder) {
double(:forwarder).tap do |forwarder|
allow(forwarder).to receive(:forward_reading)
allow(forwarder).to receive(:forward_readings)
end
}

Expand All @@ -37,30 +37,30 @@
end

it "creates an mqtt client with a clean session and no client id" do
MQTTForwardingJob.perform_now(device.id, reading)
MQTTForwardingJob.perform_now(device.id, readings)
expect(MQTTClientFactory).to have_received(:create_client).with({
clean_session: true,
client_id: nil
})
end

it "creates a forwarder with the mqtt client" do
MQTTForwardingJob.perform_now(device.id, reading)
MQTTForwardingJob.perform_now(device.id, readings)
expect(MQTTForwarder).to have_received(:new).with(mqtt_client)
end

it "renders the device json for the given device and reading, as the device owner" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(Presenters).to have_received(:present).with(device, device.owner, renderer, readings: [reading])
MQTTForwardingJob.perform_now(device.id, readings)
expect(Presenters).to have_received(:present).with(device, device.owner, renderer, readings: readings)
end

it "forwards using the device's id and forwarding token, with the rendered json payload" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(forwarder).to have_received(:forward_reading).with(forwarding_token, device.id, device_json)
MQTTForwardingJob.perform_now(device.id, readings)
expect(forwarder).to have_received(:forward_readings).with(forwarding_token, device.id, device_json)
end

it "disconnects the MQTT client" do
MQTTForwardingJob.perform_now(device.id, reading)
MQTTForwardingJob.perform_now(device.id, readings)
expect(mqtt_client).to have_received(:disconnect)
end

Expand Down
16 changes: 8 additions & 8 deletions spec/models/storer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
# model/storer.rb is not using Kairos, but Redis -> Telnet
# expect(Kairos).to receive(:http_post_to).with("/datapoints", @karios_data)
expect do
storer.store(device, @data)
storer.store(device, [@data])
end.not_to raise_error
end

Expand All @@ -60,13 +60,13 @@
Time.parse(@data['recorded_at']),
[sensor.id]
)
storer.store(device, @data)
storer.store(device, [@data])
end

skip 'updates device without touching updated_at' do
updated_at = device.updated_at

storer.store(device, @data)
storer.store(device, [@data])

expect(device.reload.updated_at).to eq(updated_at)

Expand All @@ -84,16 +84,16 @@

it "forwards the message with the forwarding token and the device's id" do
allow(device).to receive(:forward_readings?).and_return(true)
expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, @sql_data)
storer.store(device, @data)
expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, [@sql_data])
storer.store(device, [@data])
end
end

context "when the device does not allow forwarding" do
it "does not forward the message" do
allow(device).to receive(:forward_readings?).and_return(false)
expect(MQTTForwardingJob).not_to receive(:perform_later)
storer.store(device, @data)
storer.store(device, [@data])
end
end
end
Expand All @@ -110,11 +110,11 @@

it 'does raise error' do
expect(Kairos).not_to receive(:http_post_to).with("/datapoints", anything)
expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError)
expect{ storer.store(device, [@bad_data]) }.to raise_error(ArgumentError)
end

it 'does not update device' do
expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError)
expect{ storer.store(device, [@bad_data]) }.to raise_error(ArgumentError)

expect(device.reload.last_reading_at).to eq(nil)
expect(device.reload.data).to eq(nil)
Expand Down

0 comments on commit 68d7e73

Please sign in to comment.