From 68d7e738e1eba2b4cd76bc1fd1418018d0bdd63e Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Wed, 13 Nov 2024 17:59:55 +0100 Subject: [PATCH] allow storing and forwarding of readings in a batch --- app/jobs/mqtt_forwarding_job.rb | 10 ++++---- app/jobs/send_to_datastore_job.rb | 8 ++---- app/lib/mqtt_forwarder.rb | 2 +- app/lib/mqtt_messages_handler.rb | 4 +-- app/models/concerns/message_forwarding.rb | 4 +-- app/models/raw_storer.rb | 2 +- app/models/storer.rb | 31 ++++++++++++----------- spec/jobs/mqtt_forwarding_job_spec.rb | 18 ++++++------- spec/models/storer_spec.rb | 16 ++++++------ 9 files changed, 45 insertions(+), 50 deletions(-) diff --git a/app/jobs/mqtt_forwarding_job.rb b/app/jobs/mqtt_forwarding_job.rb index efe4aa0a..913aa39f 100644 --- a/app/jobs/mqtt_forwarding_job.rb +++ b/app/jobs/mqtt_forwarding_job.rb @@ -2,12 +2,12 @@ class MQTTForwardingJob < ApplicationJob queue_as :mqtt_forward - def perform(device_id, reading) + def perform(device_id, readings) begin device = Device.find(device_id) forwarder = MQTTForwarder.new(mqtt_client) - payload = payload_for(device, reading) - forwarder.forward_reading(device.forwarding_token, device.id, payload) + payload = payload_for(device, readings) + forwarder.forward_readings(device.forwarding_token, device.id, payload) ensure disconnect_mqtt! end @@ -15,8 +15,8 @@ def perform(device_id, reading) private - def payload_for(device, reading) - Presenters.present(device, device.owner, renderer, readings: [reading]) + def payload_for(device, readings) + Presenters.present(device, device.owner, renderer, readings: readings) end def mqtt_client diff --git a/app/jobs/send_to_datastore_job.rb b/app/jobs/send_to_datastore_job.rb index 29cb3d7c..46816ffd 100644 --- a/app/jobs/send_to_datastore_job.rb +++ b/app/jobs/send_to_datastore_job.rb @@ -3,12 +3,8 @@ class SendToDatastoreJob < ApplicationJob def perform(data_param, device_id) @device = Device.includes(:components).find(device_id) - the_data = JSON.parse(data_param) - the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| - # move to async method call - do_update = index == 0 - storer.store(@device, reading, do_update) - end + readings = JSON.parse(data_param) + storer.store(@device, readings) end def storer diff --git a/app/lib/mqtt_forwarder.rb b/app/lib/mqtt_forwarder.rb index 9a37d1c0..4522d575 100644 --- a/app/lib/mqtt_forwarder.rb +++ b/app/lib/mqtt_forwarder.rb @@ -5,7 +5,7 @@ def initialize(client) @suffix = suffix end - def forward_reading(token, device_id, reading) + def forward_readings(token, device_id, reading) topic = topic_path(token, device_id) client.publish(topic, reading) end diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 62282114..0fd8156b 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -37,9 +37,7 @@ def handle_readings(device, message) parsed = JSON.parse(message) if message data = parsed["data"] if parsed return nil if data.nil? or data&.empty? - data.each do |reading| - storer.store(device, reading) - end + storer.store(device, data) return true rescue Exception => e Sentry.capture_exception(e) diff --git a/app/models/concerns/message_forwarding.rb b/app/models/concerns/message_forwarding.rb index 5136d71e..ae1cadb8 100644 --- a/app/models/concerns/message_forwarding.rb +++ b/app/models/concerns/message_forwarding.rb @@ -2,9 +2,9 @@ module MessageForwarding extend ActiveSupport::Concern - def forward_reading(device, reading) + def forward_readings(device, readings) if device.forward_readings? - MQTTForwardingJob.perform_later(device.id, reading) + MQTTForwardingJob.perform_later(device.id, readings) end end diff --git a/app/models/raw_storer.rb b/app/models/raw_storer.rb index 955a1f48..ec903202 100644 --- a/app/models/raw_storer.rb +++ b/app/models/raw_storer.rb @@ -62,7 +62,7 @@ def store data, mac, version, ip, raise_errors=false device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') end - forward_reading(device, sql_data) + forward_readings(device, [sql_data]) rescue Exception => e success = false diff --git a/app/models/storer.rb b/app/models/storer.rb index d28eb3b0..a08f6977 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -2,23 +2,24 @@ class Storer include DataParser::Storer include MessageForwarding - def store device, reading, do_update = true - begin - parsed_reading = Storer.parse_reading(device, reading) - kairos_publish(parsed_reading[:_data]) - - if do_update - update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data]) + def store device, readings + readings_to_forward = [] + readings.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| + begin + parsed_reading = Storer.parse_reading(device, reading) + kairos_publish(parsed_reading[:_data]) + readings_to_forward << parsed_reading[:sql_data] + if index == 0 + update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data]) + end + + + rescue Exception => e + Sentry.capture_exception(e) + raise e if Rails.env.test? end - - forward_reading(device, parsed_reading[:sql_data]) - - rescue Exception => e - Sentry.capture_exception(e) - raise e if Rails.env.test? end - - raise e unless e.nil? + forward_readings(device, readings_to_forward) end def update_device(device, parsed_ts, sql_data) diff --git a/spec/jobs/mqtt_forwarding_job_spec.rb b/spec/jobs/mqtt_forwarding_job_spec.rb index 1ae052d7..57fc036d 100644 --- a/spec/jobs/mqtt_forwarding_job_spec.rb +++ b/spec/jobs/mqtt_forwarding_job_spec.rb @@ -6,7 +6,7 @@ let(:device) { create(:device) } - let(:reading) { double(:reading) } + let(:readings) { double(:readings) } let(:mqtt_client) { double(:mqtt_client).tap do |mqtt_client| @@ -24,7 +24,7 @@ let(:forwarder) { double(:forwarder).tap do |forwarder| - allow(forwarder).to receive(:forward_reading) + allow(forwarder).to receive(:forward_readings) end } @@ -37,7 +37,7 @@ end it "creates an mqtt client with a clean session and no client id" do - MQTTForwardingJob.perform_now(device.id, reading) + MQTTForwardingJob.perform_now(device.id, readings) expect(MQTTClientFactory).to have_received(:create_client).with({ clean_session: true, client_id: nil @@ -45,22 +45,22 @@ end it "creates a forwarder with the mqtt client" do - MQTTForwardingJob.perform_now(device.id, reading) + MQTTForwardingJob.perform_now(device.id, readings) expect(MQTTForwarder).to have_received(:new).with(mqtt_client) end it "renders the device json for the given device and reading, as the device owner" do - MQTTForwardingJob.perform_now(device.id, reading) - expect(Presenters).to have_received(:present).with(device, device.owner, renderer, readings: [reading]) + MQTTForwardingJob.perform_now(device.id, readings) + expect(Presenters).to have_received(:present).with(device, device.owner, renderer, readings: readings) end it "forwards using the device's id and forwarding token, with the rendered json payload" do - MQTTForwardingJob.perform_now(device.id, reading) - expect(forwarder).to have_received(:forward_reading).with(forwarding_token, device.id, device_json) + MQTTForwardingJob.perform_now(device.id, readings) + expect(forwarder).to have_received(:forward_readings).with(forwarding_token, device.id, device_json) end it "disconnects the MQTT client" do - MQTTForwardingJob.perform_now(device.id, reading) + MQTTForwardingJob.perform_now(device.id, readings) expect(mqtt_client).to have_received(:disconnect) end diff --git a/spec/models/storer_spec.rb b/spec/models/storer_spec.rb index 00d648da..4135619d 100644 --- a/spec/models/storer_spec.rb +++ b/spec/models/storer_spec.rb @@ -51,7 +51,7 @@ # model/storer.rb is not using Kairos, but Redis -> Telnet # expect(Kairos).to receive(:http_post_to).with("/datapoints", @karios_data) expect do - storer.store(device, @data) + storer.store(device, [@data]) end.not_to raise_error end @@ -60,13 +60,13 @@ Time.parse(@data['recorded_at']), [sensor.id] ) - storer.store(device, @data) + storer.store(device, [@data]) end skip 'updates device without touching updated_at' do updated_at = device.updated_at - storer.store(device, @data) + storer.store(device, [@data]) expect(device.reload.updated_at).to eq(updated_at) @@ -84,8 +84,8 @@ it "forwards the message with the forwarding token and the device's id" do allow(device).to receive(:forward_readings?).and_return(true) - expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, @sql_data) - storer.store(device, @data) + expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, [@sql_data]) + storer.store(device, [@data]) end end @@ -93,7 +93,7 @@ it "does not forward the message" do allow(device).to receive(:forward_readings?).and_return(false) expect(MQTTForwardingJob).not_to receive(:perform_later) - storer.store(device, @data) + storer.store(device, [@data]) end end end @@ -110,11 +110,11 @@ it 'does raise error' do expect(Kairos).not_to receive(:http_post_to).with("/datapoints", anything) - expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError) + expect{ storer.store(device, [@bad_data]) }.to raise_error(ArgumentError) end it 'does not update device' do - expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError) + expect{ storer.store(device, [@bad_data]) }.to raise_error(ArgumentError) expect(device.reload.last_reading_at).to eq(nil) expect(device.reload.data).to eq(nil)