From 83b4d636d99419dfb9a18b906cf63c750a493d47 Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 29 Aug 2023 17:40:58 +0200 Subject: [PATCH 1/3] test(dup): remove a few warnings Remove some unused aliases and variables Signed-off-by: Francesco Noacco --- .../triggers_handler_test.exs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs index 0e01cc9f0..d2d85b083 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,8 +32,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do PathRemovedEvent, SimpleEvent, ValueChangeAppliedEvent, - ValueChangeEvent, - ValueStoredEvent + ValueChangeEvent } alias AMQP.Channel @@ -734,7 +733,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do end) end - test "HTTP trigger with no policy defaults to default one", %{chan: chan} do + test "HTTP trigger with no policy defaults to default one" do simple_trigger_id = :uuid.get_v4() parent_trigger_id = :uuid.get_v4() static_header_key = "important_metadata_value_change_applied" @@ -797,7 +796,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert Map.get(headers_map, static_header_key) == static_header_value end - test "HTTP trigger with explicit trigger policy is correctly routed", %{chan: chan} do + test "HTTP trigger with explicit trigger policy is correctly routed" do simple_trigger_id = :uuid.get_v4() parent_trigger_id = :uuid.get_v4() static_header_key = "important_metadata_value_change_applied" @@ -860,7 +859,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert Map.get(headers_map, static_header_key) == static_header_value end - test "AMQP trigger has no trigger policy", %{chan: chan} do + test "AMQP trigger has no trigger policy" do simple_trigger_id = :uuid.get_v4() parent_trigger_id = :uuid.get_v4() static_header_key = "important_metadata_value_change_applied" From fbad22c5c4ec1a74b3a9ed47a72707ddd87883a1 Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 29 Aug 2023 15:29:31 +0200 Subject: [PATCH 2/3] chore(dup): update xandra 0.13 -> 0.17 Signed-off-by: Francesco Noacco --- .../astarte_data_updater_plant/application.ex | 7 ++----- .../lib/astarte_data_updater_plant/config.ex | 9 +++++++-- apps/astarte_data_updater_plant/mix.exs | 4 ++-- apps/astarte_data_updater_plant/mix.lock | 7 ++++--- .../data_updater_test.exs | 3 ++- .../test/support/database_test_helper.exs | 17 ++++++++++++++++- 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/application.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/application.ex index da466df65..a33fd7dfc 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/application.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/application.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -42,14 +42,11 @@ defmodule Astarte.DataUpdaterPlant.Application do DataAccessConfig.validate!() xandra_options = Config.xandra_options!() - data_access_opts = [xandra_options: xandra_options] - dup_xandra_opts = Keyword.put(xandra_options, :name, :xandra) - children = [ Astarte.DataUpdaterPlantWeb.Telemetry, - {Xandra.Cluster, dup_xandra_opts}, + {Xandra.Cluster, xandra_options}, {Astarte.DataAccess, data_access_opts}, Astarte.DataUpdaterPlant.DataPipelineSupervisor ] diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex index 2bbf8c759..a93c97018 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -389,6 +389,11 @@ defmodule Astarte.DataUpdaterPlant.Config do defdelegate cqex_nodes, to: DataAccessConfig defdelegate cqex_nodes!, to: DataAccessConfig - defdelegate xandra_options!, to: DataAccessConfig + def xandra_options! do + DataAccessConfig.xandra_options!() + |> Keyword.put(:name, :xandra) + |> Keyword.drop([:autodiscovery]) + end + defdelegate cqex_options!, to: DataAccessConfig end diff --git a/apps/astarte_data_updater_plant/mix.exs b/apps/astarte_data_updater_plant/mix.exs index b31c70508..48280793c 100644 --- a/apps/astarte_data_updater_plant/mix.exs +++ b/apps/astarte_data_updater_plant/mix.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2021 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -80,7 +80,7 @@ defmodule Astarte.DataUpdaterPlant.Mixfile do {:telemetry_metrics_prometheus_core, "~> 0.4"}, {:telemetry_metrics, "~> 0.4"}, {:telemetry_poller, "~> 0.4"}, - {:xandra, "~> 0.13"}, + {:xandra, "~> 0.17"}, {:skogsra, "~> 2.2"}, {:telemetry, "~> 0.4"}, {:observer_cli, "~> 1.5"}, diff --git a/apps/astarte_data_updater_plant/mix.lock b/apps/astarte_data_updater_plant/mix.lock index 8a57293e3..1e9c1002f 100644 --- a/apps/astarte_data_updater_plant/mix.lock +++ b/apps/astarte_data_updater_plant/mix.lock @@ -14,8 +14,8 @@ "cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"}, "cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"}, - "db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"}, - "decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"}, + "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, "dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []}, "ecto": {:hex, :ecto, "3.5.8", "8ebf12be6016cb99313348ba7bb4612f4114b9a506d6da79a2adc7ef449340bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea0be182ea8922eb7742e3ae8e71b67ee00ae177de1bf76210299a5f16ba4c77"}, @@ -35,6 +35,7 @@ "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"}, + "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, "observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"}, @@ -57,5 +58,5 @@ "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "uuid": {:hex, :uuid_erl, "2.0.1", "1fd9079c544d521063897887a1c5b3302dca98f9bb06aadcdc6fb0663f256797", [:rebar3], [{:quickrand, "~> 2.0.1", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "ab57caccd51f170011e5f444ce865f84b41605e483a9efcc468c1afaec87553b"}, - "xandra": {:hex, :xandra, "0.13.1", "f82866e6c47527f74f35dd3007b5311121852dd861a29ed1613e27ccfaba0102", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "a2efdb8921e3b694bf3505e40c5ec9d353d8fa3755cec946be7c18b8236d7230"}, + "xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"}, } diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index dc2a92c00..da59b6df5 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017,2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do alias CQEx.Result, as: DatabaseResult setup_all do + DatabaseTestHelper.await_cluster_connected!() {:ok, _client} = Astarte.DataUpdaterPlant.DatabaseTestHelper.create_test_keyspace() {:ok, _pid} = AMQPTestHelper.start_link() diff --git a/apps/astarte_data_updater_plant/test/support/database_test_helper.exs b/apps/astarte_data_updater_plant/test/support/database_test_helper.exs index 5effd76b8..d3a63d0a5 100644 --- a/apps/astarte_data_updater_plant/test/support/database_test_helper.exs +++ b/apps/astarte_data_updater_plant/test/support/database_test_helper.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -861,4 +861,19 @@ defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do def equal_to_group_incoming_trigger_id() do <<140, 143, 242, 83, 113, 178, 249, 23, 213, 224, 46, 58, 138, 34, 20, 45>> end + + # TODO: include in astarte_data_access + def await_cluster_connected!(cluster \\ nil, tries \\ 10) do + cluster = cluster || Config.xandra_options!()[:name] + fun = &Xandra.execute!(&1, "SELECT * FROM system.local") + + with {:error, %Xandra.ConnectionError{}} <- Xandra.Cluster.run(cluster, _options = [], fun) do + if tries > 0 do + Process.sleep(100) + await_cluster_connected!(cluster, tries - 1) + else + raise("Connection to the cluster failed") + end + end + end end From 33493adb7849afd4fd323f3d510f3c724be16fc9 Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 29 Aug 2023 17:43:17 +0200 Subject: [PATCH 3/3] refactor(dup): port queries to xandra Ports all the existing queries to xandra, with a little refactor where deemed necessary (no, no `"simple flow"` rework _yet_). Removes all references to CQEx. Signed-off-by: Francesco Noacco --- .../lib/astarte_data_updater_plant/config.ex | 8 - .../data_updater.ex | 24 +- .../data_updater/impl.ex | 296 +++--- .../data_updater/queries.ex | 920 +++++++++--------- .../data_updater_test.exs | 470 ++++----- .../test/support/database_test_helper.exs | 799 +++++++-------- 6 files changed, 1168 insertions(+), 1349 deletions(-) diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex index a93c97018..1226a2fe0 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/config.ex @@ -383,17 +383,9 @@ defmodule Astarte.DataUpdaterPlant.Config do defdelegate xandra_nodes, to: DataAccessConfig defdelegate xandra_nodes!, to: DataAccessConfig - @doc """ - Returns Cassandra nodes formatted in the CQEx format. - """ - defdelegate cqex_nodes, to: DataAccessConfig - defdelegate cqex_nodes!, to: DataAccessConfig - def xandra_options! do DataAccessConfig.xandra_options!() |> Keyword.put(:name, :xandra) |> Keyword.drop([:autodiscovery]) end - - defdelegate cqex_options!, to: DataAccessConfig end diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater.ex index cf3970a80..2b87e8509 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do alias Astarte.DataUpdaterPlant.AMQPDataConsumer alias Astarte.DataUpdaterPlant.DataUpdater.Server alias Astarte.DataUpdaterPlant.DataUpdater.Queries - alias Astarte.DataAccess.Database alias Astarte.DataUpdaterPlant.MessageTracker require Logger @@ -226,19 +225,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do defp verify_device_exists(realm_name, encoded_device_id) do with {:ok, decoded_device_id} <- Device.decode_device_id(encoded_device_id), - {:ok, client} <- Database.connect(realm: realm_name), - {:ok, exists?} <- Queries.check_device_exists(client, decoded_device_id) do - if exists? do - :ok - else - _ = - Logger.warn( - "Device #{encoded_device_id} in realm #{realm_name} does not exist.", - tag: "device_does_not_exist" - ) - - {:error, :device_does_not_exist} - end + {:error, :device_does_not_exist} <- + Queries.check_device_exists(realm_name, decoded_device_id) do + Logger.warn( + "Device #{encoded_device_id} in realm #{realm_name} does not exist.", + tag: "device_does_not_exist" + ) + + {:error, :device_does_not_exist} end end end diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex index 10165fd5d..27b0102a8 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do alias Astarte.Core.Triggers.SimpleTriggersProtobuf.Utils, as: SimpleTriggersProtobufUtils alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget alias Astarte.DataAccess.Data - alias Astarte.DataAccess.Database alias Astarte.DataAccess.Device, as: DeviceQueries alias Astarte.DataAccess.Interface, as: InterfaceQueries alias Astarte.DataAccess.Mappings @@ -81,12 +80,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do Logger.metadata(realm: realm, device_id: encoded_device_id) Logger.info("Created device process.", tag: "device_process_created") - {:ok, db_client} = Database.connect(realm: new_state.realm) - stats_and_introspection = - Queries.retrieve_device_stats_and_introspection!(db_client, device_id) + Queries.retrieve_device_stats_and_introspection!(new_state.realm, device_id) - {:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(db_client) + {:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(new_state.realm) Map.merge(new_state, stats_and_introspection) |> Map.put(:datastream_maximum_storage_retention, ttl) @@ -99,9 +96,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def handle_connection(state, ip_address_string, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + new_state = execute_time_based_actions(state, timestamp) timestamp_ms = div(timestamp, 10_000) @@ -121,7 +116,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end Queries.set_device_connected!( - db_client, + state.realm, new_state.device_id, timestamp_ms, ip_address @@ -155,11 +150,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do # TODO make this private when all heartbeats will be moved to internal def handle_heartbeat(state, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) + new_state = execute_time_based_actions(state, timestamp) - new_state = execute_time_based_actions(state, timestamp, db_client) - - Queries.maybe_refresh_device_connected!(db_client, new_state.device_id) + Queries.maybe_refresh_device_connected!(new_state.realm, new_state.device_id) MessageTracker.ack_delivery(new_state.message_tracker, message_id) Logger.info("Device heartbeat.", tag: "device_heartbeat") @@ -204,12 +197,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def handle_disconnection(state, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - new_state = state - |> execute_time_based_actions(timestamp, db_client) - |> set_device_disconnected(db_client, timestamp) + |> execute_time_based_actions(timestamp) + |> set_device_disconnected(timestamp) MessageTracker.ack_delivery(new_state.message_tracker, message_id) Logger.info("Device disconnected.", tag: "device_disconnected") @@ -457,15 +448,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def handle_data(state, interface, path, payload, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + new_state = execute_time_based_actions(state, timestamp) with :ok <- validate_interface(interface), :ok <- validate_path(path), maybe_descriptor <- Map.get(new_state.interfaces, interface), {:ok, interface_descriptor, new_state} <- - maybe_handle_cache_miss(maybe_descriptor, interface, new_state, db_client), + maybe_handle_cache_miss(maybe_descriptor, interface, new_state), :ok <- can_write_on_interface?(interface_descriptor), interface_id <- interface_descriptor.interface_id, {:ok, endpoint} <- resolve_path(path, interface_descriptor, new_state.mappings), @@ -561,7 +550,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do is_still_valid?( Queries.fetch_path_expiry( - db_client, + state.realm, new_state.device_id, interface_descriptor, endpoint, @@ -573,7 +562,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do true -> Queries.insert_path_into_db( - db_client, + state.realm, new_state.device_id, interface_descriptor, endpoint, @@ -618,7 +607,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do # TODO: handle insert failures here insert_result = Queries.insert_value_into_db( - db_client, + state.realm, new_state.device_id, interface_descriptor, endpoint, @@ -1183,9 +1172,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def process_introspection(state, new_introspection_list, payload, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + new_state = execute_time_based_actions(state, timestamp) timestamp_ms = div(timestamp, 10_000) @@ -1202,7 +1189,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do any_interface_id = SimpleTriggersProtobufUtils.any_interface_object_id() %{device_triggers: device_triggers} = - populate_triggers_for_object!(new_state, db_client, any_interface_id, :any_interface) + populate_triggers_for_object!(new_state, any_interface_id, :any_interface) realm = new_state.realm device_id_string = Device.encode_device_id(new_state.device_id) @@ -1244,7 +1231,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do :ok = if interface_major == 0 do Queries.register_device_with_interface( - db_client, + realm, state.device_id, interface_name, 0 @@ -1284,7 +1271,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do :ok = if interface_major == 0 do Queries.unregister_device_with_interface( - db_client, + realm, state.device_id, interface_name, 0 @@ -1319,35 +1306,26 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end end) - {added_interfaces, removed_interfaces} = - Enum.reduce(diff, {%{}, %{}}, fn {change_type, changed_interfaces}, {add_acc, rm_acc} -> - case change_type do - :ins -> - changed_map = Enum.into(changed_interfaces, %{}) - {Map.merge(add_acc, changed_map), rm_acc} - - :del -> - changed_map = Enum.into(changed_interfaces, %{}) - {add_acc, Map.merge(rm_acc, changed_map)} - - :eq -> - {add_acc, rm_acc} - end - end) - - {:ok, old_minors} = Queries.fetch_device_introspection_minors(db_client, state.device_id) + {:ok, old_minors} = Queries.fetch_device_introspection_minors(realm, state.device_id) - readded_introspection = Enum.to_list(added_interfaces) + added_interfaces = + diff + |> Enum.filter(fn {type, _interfaces} -> type == :ins end) + |> Enum.flat_map(fn {_ins, interfaces} -> interfaces end) + |> MapSet.new() old_introspection = - Enum.reduce(removed_interfaces, %{}, fn {iface, _major}, acc -> - prev_major = Map.fetch!(state.introspection, iface) - prev_minor = Map.get(old_minors, iface, 0) - Map.put(acc, {iface, prev_major}, prev_minor) + diff + |> Enum.filter(fn {type, _interfaces} -> type == :del end) + |> Enum.flat_map(fn {_del, interfaces} -> interfaces end) + |> Map.new(fn {interface, _major} -> + prev_major = Map.fetch!(state.introspection, interface) + prev_minor = Map.get(old_minors, interface, 0) + {{interface, prev_major}, prev_minor} end) - :ok = Queries.add_old_interfaces(db_client, new_state.device_id, old_introspection) - :ok = Queries.remove_old_interfaces(db_client, new_state.device_id, readded_introspection) + :ok = Queries.add_old_interfaces(realm, new_state.device_id, old_introspection) + :ok = Queries.remove_old_interfaces(realm, new_state.device_id, added_interfaces) # Deliver interface_minor_updated triggers if needed for {interface_name, old_minor} <- old_minors, @@ -1377,7 +1355,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do # Removed/updated interfaces must be purged away, otherwise data will be written using old # interface_id. - remove_interfaces_list = Map.keys(removed_interfaces) + remove_interfaces_list = + Enum.map(old_introspection, fn {{interface, _major}, _minor} -> interface end) {interfaces_to_drop_map, _} = Map.split(new_state.interfaces, remove_interfaces_list) interfaces_to_drop_list = Map.keys(interfaces_to_drop_map) @@ -1386,7 +1365,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do new_state = forget_interfaces(new_state, interfaces_to_drop_list) Queries.update_device_introspection!( - db_client, + realm, new_state.device_id, db_introspection_map, db_introspection_minor_map @@ -1410,9 +1389,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + new_state = execute_time_based_actions(state, timestamp) timestamp_ms = div(timestamp, 10_000) @@ -1434,9 +1411,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end def handle_control(state, "/producer/properties", payload, message_id, timestamp) do - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + new_state = execute_time_based_actions(state, timestamp) timestamp_ms = div(timestamp, 10_000) @@ -1467,13 +1442,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do def handle_control(state, "/emptyCache", _payload, message_id, timestamp) do Logger.debug("Received /emptyCache") - {:ok, db_client} = Database.connect(realm: state.realm) - - new_state = execute_time_based_actions(state, timestamp, db_client) + realm = state.realm + new_state = execute_time_based_actions(state, timestamp) - with :ok <- send_control_consumer_properties(state, db_client), - {:ok, new_state} <- resend_all_properties(state, db_client), - :ok <- Queries.set_pending_empty_cache(db_client, new_state.device_id, false) do + with :ok <- send_control_consumer_properties(state), + {:ok, new_state} <- resend_all_properties(state), + :ok <- Queries.set_pending_empty_cache(realm, new_state.device_id, false) do MessageTracker.ack_delivery(state.message_tracker, message_id) :telemetry.execute( @@ -1629,8 +1603,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do interface_major: major, match_path: "/*" }} -> - with {:ok, db_client} <- Database.connect(realm: state.realm), - :ok <- + with :ok <- InterfaceQueries.check_if_interface_exists(state.realm, interface_name, major) do {:ok, new_state} else @@ -1645,8 +1618,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do interface_major: major, match_path: match_path }} -> - with {:ok, db_client} <- Database.connect(realm: state.realm), - {:ok, %InterfaceDescriptor{automaton: automaton}} <- + with {:ok, %InterfaceDescriptor{automaton: automaton}} <- InterfaceQueries.fetch_interface_descriptor(state.realm, interface_name, major), {:ok, _endpoint_id} <- EndpointsAutomaton.resolve_path(match_path, automaton) do {:ok, new_state} @@ -1775,9 +1747,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do {:ok, %{state | device_triggers: updated_device_triggers}} end - defp reload_groups_on_expiry(state, timestamp, db_client) do + defp reload_groups_on_expiry(state, timestamp) do if state.last_groups_refresh + @groups_lifespan_decimicroseconds <= timestamp do - {:ok, groups} = Queries.get_device_groups(db_client, state.device_id) + {:ok, groups} = Queries.get_device_groups(state.realm, state.device_id) %{state | last_groups_refresh: timestamp, groups: groups} else @@ -1785,7 +1757,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end end - defp reload_device_triggers_on_expiry(state, timestamp, db_client) do + defp reload_device_triggers_on_expiry(state, timestamp) do if state.last_device_triggers_refresh + @device_triggers_lifespan_decimicroseconds <= timestamp do any_device_id = SimpleTriggersProtobufUtils.any_device_object_id() @@ -1802,40 +1774,39 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do |> Map.put(:last_device_triggers_refresh, timestamp) |> Map.put(:device_triggers, %{}) |> forget_any_interface_data_triggers() - |> populate_triggers_for_object!(db_client, any_device_id, :any_device) - |> populate_triggers_for_object!(db_client, state.device_id, :device) - |> populate_triggers_for_object!(db_client, any_interface_id, :any_interface) + |> populate_triggers_for_object!(any_device_id, :any_device) + |> populate_triggers_for_object!(state.device_id, :device) + |> populate_triggers_for_object!(any_interface_id, :any_interface) |> populate_triggers_for_object!( - db_client, device_and_any_interface_object_id, :device_and_any_interface ) - |> populate_group_device_triggers!(db_client) - |> populate_group_and_any_interface_triggers!(db_client) + |> populate_group_device_triggers!() + |> populate_group_and_any_interface_triggers!() else state end end - defp populate_group_device_triggers!(state, db_client) do + defp populate_group_device_triggers!(state) do Enum.map(state.groups, &SimpleTriggersProtobufUtils.get_group_object_id/1) - |> Enum.reduce(state, &populate_triggers_for_object!(&2, db_client, &1, :group)) + |> Enum.reduce(state, &populate_triggers_for_object!(&2, &1, :group)) end - defp populate_group_and_any_interface_triggers!(state, db_client) do + defp populate_group_and_any_interface_triggers!(state) do Enum.map(state.groups, &SimpleTriggersProtobufUtils.get_group_and_any_interface_object_id/1) |> Enum.reduce( state, - &populate_triggers_for_object!(&2, db_client, &1, :group_and_any_interface) + &populate_triggers_for_object!(&2, &1, :group_and_any_interface) ) end - defp execute_time_based_actions(state, timestamp, db_client) do + defp execute_time_based_actions(state, timestamp) do state |> Map.put(:last_seen_message, timestamp) - |> reload_groups_on_expiry(timestamp, db_client) + |> reload_groups_on_expiry(timestamp) |> purge_expired_interfaces(timestamp) - |> reload_device_triggers_on_expiry(timestamp, db_client) + |> reload_device_triggers_on_expiry(timestamp) end defp purge_expired_interfaces(state, timestamp) do @@ -1913,7 +1884,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do } end - defp maybe_handle_cache_miss(nil, interface_name, state, db_client) do + defp maybe_handle_cache_miss(nil, interface_name, state) do with {:ok, major_version} <- DeviceQueries.interface_version(state.realm, state.device_id, interface_name), {:ok, interface_row} <- @@ -1940,7 +1911,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do new_state <- populate_triggers_for_object!( new_state, - db_client, interface_descriptor.interface_id, :interface ), @@ -1952,14 +1922,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do new_state = populate_triggers_for_object!( new_state, - db_client, device_and_interface_object_id, :device_and_interface ), new_state = populate_triggers_for_group_and_interface!( new_state, - db_client, interface_id ) do # TODO: make everything with-friendly @@ -1984,18 +1952,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end end - defp maybe_handle_cache_miss(interface_descriptor, _interface_name, state, _db_client) do + defp maybe_handle_cache_miss(interface_descriptor, _interface_name, state) do {:ok, interface_descriptor, state} end - defp populate_triggers_for_group_and_interface!(state, db_client, interface_id) do + defp populate_triggers_for_group_and_interface!(state, interface_id) do Enum.map( state.groups, &SimpleTriggersProtobufUtils.get_group_and_interface_object_id(&1, interface_id) ) |> Enum.reduce( state, - &populate_triggers_for_object!(&2, db_client, &1, :group_and_interface) + &populate_triggers_for_object!(&2, &1, :group_and_interface) ) end @@ -2003,23 +1971,20 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do {:ok, paths_set} = PayloadsDecoder.parse_device_properties_payload(decoded_payload, state.introspection) - {:ok, db_client} = Database.connect(realm: state.realm) - Enum.each(state.introspection, fn {interface, _} -> # TODO: check result here - prune_interface(state, db_client, interface, paths_set, timestamp) + prune_interface(state, interface, paths_set, timestamp) end) :ok end - defp prune_interface(state, db_client, interface, all_paths_set, timestamp) do + defp prune_interface(state, interface, all_paths_set, timestamp) do with {:ok, interface_descriptor, new_state} <- maybe_handle_cache_miss( Map.get(state.interfaces, interface), interface, - state, - db_client + state ) do cond do interface_descriptor.type != :properties -> @@ -2031,62 +1996,64 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do {:error, :maybe_outdated_introspection} true -> - do_prune(new_state, db_client, interface_descriptor, all_paths_set, timestamp) + do_prune(new_state, interface_descriptor, all_paths_set, timestamp) # TODO: nobody uses new_state {:ok, new_state} end end end - defp do_prune(state, db, interface_descriptor, all_paths_set, timestamp) do + defp do_prune(state, interface_descriptor, all_paths_set, timestamp) do each_interface_mapping(state.mappings, interface_descriptor, fn mapping -> endpoint_id = mapping.endpoint_id - Queries.query_all_endpoint_paths!(db, state.device_id, interface_descriptor, endpoint_id) - |> Enum.each(fn path_row -> - path = path_row[:path] - - if not MapSet.member?(all_paths_set, {interface_descriptor.name, path}) do - device_id_string = Device.encode_device_id(state.device_id) + Queries.query_all_endpoint_paths!( + state.realm, + state.device_id, + interface_descriptor, + endpoint_id + ) + |> Enum.map(&Map.fetch!(&1, "path")) + |> Enum.reject(&MapSet.member?(all_paths_set, {interface_descriptor.name, &1})) + |> Enum.each(fn path -> + device_id_string = Device.encode_device_id(state.device_id) - {:ok, endpoint_id} = - EndpointsAutomaton.resolve_path(path, interface_descriptor.automaton) + {:ok, endpoint_id} = EndpointsAutomaton.resolve_path(path, interface_descriptor.automaton) - Queries.delete_property_from_db(state, db, interface_descriptor, endpoint_id, path) + Queries.delete_property_from_db(state, interface_descriptor, endpoint_id, path) - interface_id = interface_descriptor.interface_id + interface_id = interface_descriptor.interface_id - path_removed_triggers = - get_on_data_triggers(state, :on_path_removed, interface_id, endpoint_id, path) + path_removed_triggers = + get_on_data_triggers(state, :on_path_removed, interface_id, endpoint_id, path) - i_name = interface_descriptor.name + i_name = interface_descriptor.name - Enum.each(path_removed_triggers, fn trigger -> - target_with_policy_list = - trigger.trigger_targets - |> Enum.map(fn target -> - {target, Map.get(state.trigger_id_to_policy_name, target.parent_trigger_id)} - end) + Enum.each(path_removed_triggers, fn trigger -> + target_with_policy_list = + trigger.trigger_targets + |> Enum.map(fn target -> + {target, Map.get(state.trigger_id_to_policy_name, target.parent_trigger_id)} + end) - TriggersHandler.path_removed( - target_with_policy_list, - state.realm, - device_id_string, - i_name, - path, - timestamp - ) - end) - end + TriggersHandler.path_removed( + target_with_policy_list, + state.realm, + device_id_string, + i_name, + path, + timestamp + ) + end) end) end) end - defp set_device_disconnected(state, db_client, timestamp) do + defp set_device_disconnected(state, timestamp) do timestamp_ms = div(timestamp, 10_000) Queries.set_device_disconnected!( - db_client, + state.realm, state.device_id, timestamp_ms, state.total_received_msgs, @@ -2127,11 +2094,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do encoded_device_id = Device.encode_device_id(device_id) - {:ok, db_client} = Database.connect(realm: state.realm) - - with :ok <- Queries.set_pending_empty_cache(db_client, device_id, true), + with :ok <- Queries.set_pending_empty_cache(realm, device_id, true), :ok <- force_disconnection(realm, encoded_device_id) do - new_state = set_device_disconnected(state, db_client, timestamp) + new_state = set_device_disconnected(state, timestamp) Logger.info("Successfully forced device disconnection.", tag: "forced_device_disconnection") @@ -2206,35 +2171,36 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end end - defp populate_triggers_for_object!(state, client, object_id, object_type) do + defp populate_triggers_for_object!(state, object_id, object_type) do object_type_int = SimpleTriggersProtobufUtils.object_type_to_int!(object_type) - simple_triggers_rows = Queries.query_simple_triggers!(client, object_id, object_type_int) + simple_triggers_rows = Queries.query_simple_triggers!(state.realm, object_id, object_type_int) new_state = - Enum.reduce(simple_triggers_rows, state, fn row, state_acc -> - trigger_id = row[:simple_trigger_id] - parent_trigger_id = row[:parent_trigger_id] + Enum.reduce(simple_triggers_rows, state, fn trigger_data, state_acc -> + %{ + "simple_trigger_id" => trigger_id, + "parent_trigger_id" => parent_trigger_id, + "trigger_data" => trigger_data, + "trigger_target" => trigger_target + } = trigger_data - simple_trigger = - SimpleTriggersProtobufUtils.deserialize_simple_trigger(row[:trigger_data]) + simple_trigger = SimpleTriggersProtobufUtils.deserialize_simple_trigger(trigger_data) trigger_target = - SimpleTriggersProtobufUtils.deserialize_trigger_target(row[:trigger_target]) + SimpleTriggersProtobufUtils.deserialize_trigger_target(trigger_target) |> Map.put(:simple_trigger_id, trigger_id) |> Map.put(:parent_trigger_id, parent_trigger_id) load_trigger(state_acc, simple_trigger, trigger_target) end) - Enum.reduce(new_state.volatile_triggers, new_state, fn {{obj_id, obj_type}, - {simple_trigger, trigger_target}}, - state_acc -> - if obj_id == object_id and obj_type == object_type_int do + Enum.reduce(new_state.volatile_triggers, new_state, fn + {{^object_id, ^object_type_int}, {simple_trigger, trigger_target}}, state_acc -> load_trigger(state_acc, simple_trigger, trigger_target) - else + + _, state_acc -> state_acc - end end) end @@ -2505,16 +2471,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end) end - defp send_control_consumer_properties(state, db_client) do + defp send_control_consumer_properties(state) do Logger.debug("Device introspection: #{inspect(state.introspection)}.") abs_paths_list = Enum.flat_map(state.introspection, fn {interface, _} -> descriptor = Map.get(state.interfaces, interface) - case maybe_handle_cache_miss(descriptor, interface, state, db_client) do + case maybe_handle_cache_miss(descriptor, interface, state) do {:ok, interface_descriptor, new_state} -> - gather_interface_properties(new_state, db_client, interface_descriptor) + gather_interface_properties(new_state, interface_descriptor) {:error, :interface_loading_failed} -> Logger.warn("Failed #{interface} interface loading.") @@ -2530,31 +2496,30 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp gather_interface_properties( - %State{device_id: device_id, mappings: mappings} = _state, - db_client, + %State{device_id: device_id, mappings: mappings, realm: realm} = _state, %InterfaceDescriptor{type: :properties, ownership: :server} = interface_descriptor ) do reduce_interface_mapping(mappings, interface_descriptor, [], fn mapping, i_acc -> - Queries.retrieve_endpoint_values(db_client, device_id, interface_descriptor, mapping) + Queries.retrieve_endpoint_values(realm, device_id, interface_descriptor, mapping) |> Enum.reduce(i_acc, fn [{:path, path}, {_, _value}], acc -> ["#{interface_descriptor.name}#{path}" | acc] end) end) end - defp gather_interface_properties(_state, _db, %InterfaceDescriptor{} = _descriptor) do + defp gather_interface_properties(_state, %InterfaceDescriptor{} = _descriptor) do [] end - defp resend_all_properties(state, db_client) do + defp resend_all_properties(state) do Logger.debug("Device introspection: #{inspect(state.introspection)}") Enum.reduce_while(state.introspection, {:ok, state}, fn {interface, _}, {:ok, state_acc} -> maybe_descriptor = Map.get(state_acc.interfaces, interface) with {:ok, interface_descriptor, new_state} <- - maybe_handle_cache_miss(maybe_descriptor, interface, state_acc, db_client), - :ok <- resend_all_interface_properties(new_state, db_client, interface_descriptor) do + maybe_handle_cache_miss(maybe_descriptor, interface, state_acc), + :ok <- resend_all_interface_properties(new_state, interface_descriptor) do {:cont, {:ok, new_state}} else {:error, :interface_loading_failed} -> @@ -2569,13 +2534,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do defp resend_all_interface_properties( %State{realm: realm, device_id: device_id, mappings: mappings} = _state, - db_client, %InterfaceDescriptor{type: :properties, ownership: :server} = interface_descriptor ) do encoded_device_id = Device.encode_device_id(device_id) each_interface_mapping(mappings, interface_descriptor, fn mapping -> - Queries.retrieve_endpoint_values(db_client, device_id, interface_descriptor, mapping) + Queries.retrieve_endpoint_values(realm, device_id, interface_descriptor, mapping) |> Enum.reduce_while(:ok, fn [{:path, path}, {_, value}], _acc -> case send_value(realm, encoded_device_id, interface_descriptor.name, path, value) do {:ok, _bytes} -> @@ -2589,7 +2553,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end) end - defp resend_all_interface_properties(_state, _db, %InterfaceDescriptor{} = _descriptor) do + defp resend_all_interface_properties(_state, %InterfaceDescriptor{} = _descriptor) do :ok end diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex index 6b052573d..259816ba7 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2018 Ispirata Srl +# Copyright 2018-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,71 +22,60 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do alias Astarte.Core.InterfaceDescriptor alias Astarte.Core.Mapping alias Astarte.DataUpdaterPlant.Config - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Result, as: DatabaseResult require Logger - def query_simple_triggers!(db_client, object_id, object_type_int) do + @default_query_opts [uuid_format: :binary, timestamp_format: :integer] + @default_custom_query_opts [result: :default, context: nil] + + def query_simple_triggers!(realm_name, object_id, object_type_int) do simple_triggers_statement = """ SELECT simple_trigger_id, parent_trigger_id, trigger_data, trigger_target FROM simple_triggers WHERE object_id=:object_id AND object_type=:object_type_int """ - simple_triggers_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(simple_triggers_statement) - |> DatabaseQuery.put(:object_id, object_id) - |> DatabaseQuery.put(:object_type_int, object_type_int) + params = %{ + "object_id" => object_id, + "object_type_int" => object_type_int + } - DatabaseQuery.call!(db_client, simple_triggers_query) + custom_query!(simple_triggers_statement, realm_name, params) end - def query_all_endpoint_paths!(db_client, device_id, interface_descriptor, endpoint_id) do + def query_all_endpoint_paths!(realm_name, device_id, interface_descriptor, endpoint_id) do all_paths_statement = """ SELECT path FROM #{interface_descriptor.storage} WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id """ - all_paths_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(all_paths_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint_id + } - DatabaseQuery.call!(db_client, all_paths_query) + custom_query!(all_paths_statement, realm_name, params) end - def set_pending_empty_cache(db_client, device_id, pending_empty_cache) do + def set_pending_empty_cache(realm_name, device_id, pending_empty_cache) do pending_empty_cache_statement = """ UPDATE devices SET pending_empty_cache = :pending_empty_cache WHERE device_id = :device_id """ - update_pending = - DatabaseQuery.new() - |> DatabaseQuery.statement(pending_empty_cache_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:pending_empty_cache, pending_empty_cache) + params = %{ + "device_id" => device_id, + "pending_empty_cache" => pending_empty_cache + } - with {:ok, _result} <- DatabaseQuery.call(db_client, update_pending) do + with {:ok, _result} <- custom_query(pending_empty_cache_statement, realm_name, params) do :ok - else - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.") - {:error, :database_error} - - {:error, reason} -> - # DB Error - Logger.warn("Failed with reason #{inspect(reason)}.") - {:error, :database_error} end end def insert_value_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} = interface_descriptor, @@ -102,25 +91,28 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do # TODO: should we handle this situation? end + statement = """ + DELETE FROM #{interface_descriptor.storage} + WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path + """ + # TODO: :reception_timestamp_submillis is just a place holder right now - unset_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "DELETE FROM #{interface_descriptor.storage} WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint)) + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint.endpoint_id, + "path" => path + } + + consistency = insert_consistency(interface_descriptor, endpoint) - DatabaseQuery.call!(db_client, unset_query) + custom_query!(statement, realm_name, params, consistency: consistency) :ok end def insert_value_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} = interface_descriptor, @@ -132,29 +124,31 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do _opts ) do # TODO: :reception_timestamp_submillis is just a place holder right now - insert_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "INSERT INTO #{interface_descriptor.storage} " <> - "(device_id, interface_id, endpoint_id, path, reception_timestamp, #{CQLUtils.type_to_db_column_name(endpoint.value_type)}) " <> - "VALUES (:device_id, :interface_id, :endpoint_id, :path, :reception_timestamp, :value);" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.put(:reception_timestamp, div(reception_timestamp, 10000)) - |> DatabaseQuery.put(:reception_timestamp_submillis, rem(reception_timestamp, 10000)) - |> DatabaseQuery.put(:value, to_db_friendly_type(value)) - |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint)) + statement = """ + INSERT INTO #{interface_descriptor.storage} + (device_id, interface_id, endpoint_id, path, reception_timestamp, #{CQLUtils.type_to_db_column_name(endpoint.value_type)}) + VALUES (:device_id, :interface_id, :endpoint_id, :path, :reception_timestamp, :value); + """ + + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint.endpoint_id, + "path" => path, + "reception_timestamp" => div(reception_timestamp, 10000), + "reception_timestamp_submillis" => rem(reception_timestamp, 10000), + "value" => to_db_friendly_type(value) + } + + consistency = insert_consistency(interface_descriptor, endpoint) - DatabaseQuery.call!(db_client, insert_query) + custom_query!(statement, realm_name, params, consistency: consistency) :ok end def insert_value_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :multi_interface_individual_datastream_dbtable} = interface_descriptor, @@ -167,32 +161,34 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do ) do ttl_string = get_ttl_string(opts) + statement = """ + INSERT INTO #{interface_descriptor.storage} + (device_id, interface_id, endpoint_id, path, value_timestamp, reception_timestamp, reception_timestamp_submillis, #{CQLUtils.type_to_db_column_name(endpoint.value_type)}) + VALUES (:device_id, :interface_id, :endpoint_id, :path, :value_timestamp, :reception_timestamp, :reception_timestamp_submillis, :value) #{ttl_string}; + """ + # TODO: use received value_timestamp when needed # TODO: :reception_timestamp_submillis is just a place holder right now - insert_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "INSERT INTO #{interface_descriptor.storage} " <> - "(device_id, interface_id, endpoint_id, path, value_timestamp, reception_timestamp, reception_timestamp_submillis, #{CQLUtils.type_to_db_column_name(endpoint.value_type)}) " <> - "VALUES (:device_id, :interface_id, :endpoint_id, :path, :value_timestamp, :reception_timestamp, :reception_timestamp_submillis, :value) #{ttl_string};" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.put(:value_timestamp, value_timestamp) - |> DatabaseQuery.put(:reception_timestamp, div(reception_timestamp, 10000)) - |> DatabaseQuery.put(:reception_timestamp_submillis, rem(reception_timestamp, 10000)) - |> DatabaseQuery.put(:value, to_db_friendly_type(value)) - |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint)) - - DatabaseQuery.call!(db_client, insert_query) + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint.endpoint_id, + "path" => path, + "value_timestamp" => value_timestamp, + "reception_timestamp" => div(reception_timestamp, 10000), + "reception_timestamp_submillis" => rem(reception_timestamp, 10000), + "value" => to_db_friendly_type(value) + } + + consistency = insert_consistency(interface_descriptor, endpoint) + + custom_query!(statement, realm_name, params, consistency: consistency) :ok end def insert_value_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :one_object_datastream_dbtable} = interface_descriptor, _endpoint, @@ -205,60 +201,55 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do ttl_string = get_ttl_string(opts) # TODO: we should cache endpoints by interface_id - endpoint_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT endpoint, value_type FROM endpoints WHERE interface_id=:interface_id;" - ) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) + endpoint_statement = + "SELECT endpoint, value_type FROM endpoints WHERE interface_id=:interface_id;" - endpoint_rows = DatabaseQuery.call!(db_client, endpoint_query) + endpoint_params = %{ + "interface_id" => interface_descriptor.interface_id + } - # TODO: we should also cache explicit_timestamp - explicit_timestamp_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT explicit_timestamp FROM endpoints WHERE interface_id=:interface_id LIMIT 1;" - ) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) + endpoint_rows = custom_query!(endpoint_statement, realm_name, endpoint_params) - [explicit_timestamp: explicit_timestamp] = - DatabaseQuery.call!(db_client, explicit_timestamp_query) - |> CQEx.Result.head() + # TODO: we should also cache explicit_timestamp + explicit_timestamp_statement = + "SELECT explicit_timestamp FROM endpoints WHERE interface_id=:interface_id LIMIT 1;" - # FIXME: new atoms are created here, we should avoid this. We need to replace CQEx. - column_atoms = - Enum.reduce(endpoint_rows, %{}, fn endpoint, column_atoms_acc -> - endpoint_name = - endpoint[:endpoint] - |> String.split("/") - |> List.last() + explicit_timestamp_params = %{"interface_id" => interface_descriptor.interface_id} - column_name = CQLUtils.endpoint_to_db_column_name(endpoint_name) + %{"explicit_timestamp" => explicit_timestamp} = + custom_query!(explicit_timestamp_statement, realm_name, explicit_timestamp_params, + result: :first! + ) - Map.put(column_atoms_acc, endpoint_name, String.to_atom(column_name)) + endpoints = + Enum.map(endpoint_rows, fn %{"endpoint" => endpoint_name} -> + endpoint_name + |> String.split("/") + |> List.last() end) - {query_values, placeholders, query_columns} = - Enum.reduce(value, {%{}, "", ""}, fn {obj_key, obj_value}, - {query_values_acc, placeholders_acc, query_acc} -> - if column_atoms[obj_key] != nil do - column_name = CQLUtils.endpoint_to_db_column_name(obj_key) + {endpoints, invalid} = Enum.split_with(value, fn {key, _} -> key in endpoints end) + + for {key, val} <- invalid do + Logger.warn("Unexpected object key #{inspect(key)} with value #{inspect(val)}.") + end - db_value = to_db_friendly_type(obj_value) - next_query_values_acc = Map.put(query_values_acc, column_atoms[obj_key], db_value) - next_placeholders_acc = "#{placeholders_acc} :#{to_string(column_atoms[obj_key])}," - next_query_acc = "#{query_acc} #{column_name}, " + query_values = + for {key, value} <- endpoints, into: %{} do + column = CQLUtils.endpoint_to_db_column_name(key) + value = to_db_friendly_type(value) + {column, value} + end - {next_query_values_acc, next_placeholders_acc, next_query_acc} - else - Logger.warn( - "Unexpected object key #{inspect(obj_key)} with value #{inspect(obj_value)}." - ) + db_columns = Map.keys(query_values) - query_values_acc - end - end) + placeholders = + db_columns + |> Enum.map_join(" ", &":#{&1},") + + query_columns = + db_columns + |> Enum.map_join(" ", &"#{&1},") {query_columns, placeholders} = if explicit_timestamp do @@ -267,24 +258,26 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do {query_columns, placeholders} end + statement = """ + INSERT INTO #{interface_descriptor.storage} (device_id, path, #{query_columns} reception_timestamp, reception_timestamp_submillis) + VALUES (:device_id, :path, #{placeholders} :reception_timestamp, :reception_timestamp_submillis) #{ttl_string}; + """ + # TODO: use received value_timestamp when needed # TODO: :reception_timestamp_submillis is just a place holder right now - insert_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "INSERT INTO #{interface_descriptor.storage} (device_id, path, #{query_columns} reception_timestamp, reception_timestamp_submillis) " <> - "VALUES (:device_id, :path, #{placeholders} :reception_timestamp, :reception_timestamp_submillis) #{ttl_string};" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.put(:value_timestamp, value_timestamp) - |> DatabaseQuery.put(:reception_timestamp, div(reception_timestamp, 10000)) - |> DatabaseQuery.put(:reception_timestamp_submillis, rem(reception_timestamp, 10000)) - |> DatabaseQuery.merge(query_values) + params = + %{ + "device_id" => device_id, + "path" => path, + "value_timestamp" => value_timestamp, + "reception_timestamp" => div(reception_timestamp, 10000), + "reception_timestamp_submillis" => rem(reception_timestamp, 10000) + } + |> Map.merge(query_values) - # TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint)) + # TODO: consistency = insert_consistency(interface_descriptor, endpoint) - DatabaseQuery.call!(db_client, insert_query) + custom_query!(statement, realm_name, params) :ok end @@ -299,7 +292,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do end def insert_path_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :multi_interface_individual_datastream_dbtable} = interface_descriptor, @@ -310,7 +303,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do opts ) do insert_path( - db_client, + realm_name, device_id, interface_descriptor, mapping, @@ -322,7 +315,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do end def insert_path_into_db( - db_client, + realm_name, device_id, %InterfaceDescriptor{storage_type: :one_object_datastream_dbtable} = interface_descriptor, mapping, @@ -332,7 +325,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do opts ) do insert_path( - db_client, + realm_name, device_id, interface_descriptor, mapping, @@ -344,7 +337,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do end defp insert_path( - db_client, + realm_name, device_id, interface_descriptor, endpoint, @@ -364,45 +357,45 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do :reception_timestamp, :reception_timestamp_submillis, :datetime_value) #{ttl_string} """ - insert_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(insert_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.put(:reception_timestamp, div(reception_timestamp, 10000)) - |> DatabaseQuery.put(:reception_timestamp_submillis, rem(reception_timestamp, 10000)) - |> DatabaseQuery.put(:datetime_value, value_timestamp) - |> DatabaseQuery.consistency(path_consistency(interface_descriptor, endpoint)) - - with {:ok, %CQEx.Result.Empty{}} <- DatabaseQuery.call(db_client, insert_query) do + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint.endpoint_id, + "path" => path, + "reception_timestamp" => div(reception_timestamp, 10000), + "reception_timestamp_submillis" => rem(reception_timestamp, 10000), + "datetime_value" => value_timestamp + } + + consistency = path_consistency(interface_descriptor, endpoint) + + with {:ok, _result} <- + custom_query(insert_statement, realm_name, params, + consistency: consistency, + context: "upserting path: #{path}" + ) do :ok - else - {:error, reason} -> - Logger.warn("Error while upserting path: #{path} (reason: #{inspect(reason)}).") - {:error, :database_error} end end - def delete_property_from_db(state, db_client, interface_descriptor, endpoint_id, path) do - delete_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "DELETE FROM #{interface_descriptor.storage} WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path;" - ) - |> DatabaseQuery.put(:device_id, state.device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, path) + def delete_property_from_db(state, interface_descriptor, endpoint_id, path) do + statement = + "DELETE FROM #{interface_descriptor.storage} WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path;" + + params = %{ + "device_id" => state.device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => endpoint_id, + "path" => path + } - # TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint)) + # TODO: consistency = insert_consistency(interface_descriptor, endpoint) - DatabaseQuery.call!(db_client, delete_query) + custom_query!(statement, state.realm, params) :ok end - def retrieve_device_stats_and_introspection!(db_client, device_id) do + def retrieve_device_stats_and_introspection!(realm_name, device_id) do stats_and_introspection_statement = """ SELECT total_received_msgs, total_received_bytes, introspection, exchanged_bytes_by_interface, exchanged_msgs_by_interface @@ -410,59 +403,36 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do WHERE device_id=:device_id """ - device_row_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(stats_and_introspection_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:local_quorum) + params = %{"device_id" => device_id} device_row = - DatabaseQuery.call!(db_client, device_row_query) - |> DatabaseResult.head() - - introspection_map = convert_map_result(device_row[:introspection]) - - initial_interface_exchanged_bytes = - convert_map_result(device_row[:exchanged_bytes_by_interface]) - |> convert_tuple_keys() - - initial_interface_exchanged_msgs = - convert_map_result(device_row[:exchanged_msgs_by_interface]) - |> convert_tuple_keys() + custom_query!(stats_and_introspection_statement, realm_name, params, + result: :first!, + consistency: :local_quorum + ) %{ - introspection: introspection_map, - total_received_msgs: device_row[:total_received_msgs], - total_received_bytes: device_row[:total_received_bytes], - initial_interface_exchanged_bytes: initial_interface_exchanged_bytes, - initial_interface_exchanged_msgs: initial_interface_exchanged_msgs + introspection: device_row["introspection"] || %{}, + total_received_msgs: device_row["total_received_msgs"], + total_received_bytes: device_row["total_received_bytes"], + initial_interface_exchanged_bytes: device_row["exchanged_bytes_by_interface"] || %{}, + initial_interface_exchanged_msgs: device_row["exchanged_msgs_by_interface"] || %{} } end - defp convert_map_result(nil), do: %{} - defp convert_map_result(result) when is_list(result), do: Enum.into(result, %{}) - defp convert_map_result(result) when is_map(result), do: result - - # CQEx returns tuple keys as lists, convert them to tuples - defp convert_tuple_keys(map) when is_map(map) do - for {key, value} <- map, into: %{} do - {List.to_tuple(key), value} - end - end - - def set_device_connected!(db_client, device_id, timestamp_ms, ip_address) do - set_connection_info!(db_client, device_id, timestamp_ms, ip_address) + def set_device_connected!(realm_name, device_id, timestamp_ms, ip_address) do + set_connection_info!(realm_name, device_id, timestamp_ms, ip_address) ttl = heartbeat_interval_seconds() * 8 - refresh_device_connected!(db_client, device_id, ttl) + refresh_device_connected!(realm_name, device_id, ttl) end - def maybe_refresh_device_connected!(db_client, device_id) do - with {:ok, remaining_ttl} <- get_connected_remaining_ttl(db_client, device_id) do + def maybe_refresh_device_connected!(realm_name, device_id) do + with {:ok, remaining_ttl} <- get_connected_remaining_ttl(realm_name, device_id) do if remaining_ttl < heartbeat_interval_seconds() * 2 do Logger.debug("Refreshing connected status", tag: "refresh_device_connected") write_ttl = heartbeat_interval_seconds() * 8 - refresh_device_connected!(db_client, device_id, write_ttl) + refresh_device_connected!(realm_name, device_id, write_ttl) else :ok end @@ -473,25 +443,23 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do Config.device_heartbeat_interval_ms!() |> div(1000) end - defp set_connection_info!(db_client, device_id, timestamp_ms, ip_address) do + defp set_connection_info!(realm_name, device_id, timestamp_ms, ip_address) do device_update_statement = """ UPDATE devices SET last_connection=:last_connection, last_seen_ip=:last_seen_ip WHERE device_id=:device_id """ - device_update_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(device_update_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:last_connection, timestamp_ms) - |> DatabaseQuery.put(:last_seen_ip, ip_address) - |> DatabaseQuery.consistency(:local_quorum) + params = %{ + "device_id" => device_id, + "last_connection" => timestamp_ms, + "last_seen_ip" => ip_address + } - DatabaseQuery.call!(db_client, device_update_query) + custom_query!(device_update_statement, realm_name, params, consistency: :local_quorum) end - defp refresh_device_connected!(db_client, device_id, ttl) do + defp refresh_device_connected!(realm_name, device_id, ttl) do refresh_connected_statement = """ UPDATE devices USING TTL #{ttl} @@ -499,50 +467,34 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do WHERE device_id=:device_id """ - refresh_connected_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(refresh_connected_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:local_quorum) + params = %{"device_id" => device_id} - DatabaseQuery.call!(db_client, refresh_connected_query) + custom_query!(refresh_connected_statement, realm_name, params, consistency: :local_quorum) end - defp get_connected_remaining_ttl(db_client, device_id) do + defp get_connected_remaining_ttl(realm_name, device_id) do fetch_connected_ttl_statement = """ SELECT TTL(connected) FROM devices WHERE device_id=:device_id """ - fetch_connected_ttl_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(fetch_connected_ttl_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) + params = %{"device_id" => device_id} - with {:ok, result} <- DatabaseQuery.call(db_client, fetch_connected_ttl_query), - ["ttl(connected)": ttl] when is_integer(ttl) <- DatabaseResult.head(result) do + with {:ok, result} <- + custom_query(fetch_connected_ttl_statement, realm_name, params, + consistency: :quorum, + result: {:first!, :device_not_found}, + context: "retrieving property" + ) do + %{"ttl(connected)" => ttl} = result + ttl = ttl || 0 {:ok, ttl} - else - :empty_dataset -> - {:error, :device_not_found} - - ["ttl(connected)": nil] -> - {:ok, 0} - - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.") - {:error, :database_error} - - {:error, reason} -> - Logger.warn("Database error while retrieving property: #{inspect(reason)}.") - {:error, :database_error} end end def set_device_disconnected!( - db_client, + realm_name, device_id, timestamp_ms, total_received_msgs, @@ -561,138 +513,109 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do WHERE device_id=:device_id """ - device_update_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(device_update_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:last_disconnection, timestamp_ms) - |> DatabaseQuery.put(:total_received_msgs, total_received_msgs) - |> DatabaseQuery.put(:total_received_bytes, total_received_bytes) - |> DatabaseQuery.put(:exchanged_bytes_by_interface, interface_exchanged_bytes) - |> DatabaseQuery.put(:exchanged_msgs_by_interface, interface_exchanged_msgs) - |> DatabaseQuery.consistency(:local_quorum) + params = %{ + "device_id" => device_id, + "last_disconnection" => timestamp_ms, + "total_received_msgs" => total_received_msgs, + "total_received_bytes" => total_received_bytes, + "exchanged_bytes_by_interface" => interface_exchanged_bytes, + "exchanged_msgs_by_interface" => interface_exchanged_msgs + } - DatabaseQuery.call!(db_client, device_update_query) + custom_query!(device_update_statement, realm_name, params, consistency: :local_quorum) end - def fetch_device_introspection_minors(db_client, device_id) do + def fetch_device_introspection_minors(realm_name, device_id) do introspection_minor_statement = """ SELECT introspection_minor FROM devices WHERE device_id=:device_id """ - introspection_minor_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(introspection_minor_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) - - with {:ok, result} <- DatabaseQuery.call(db_client, introspection_minor_query), - [introspection_minor: introspection_minors] when is_list(introspection_minors) <- - DatabaseResult.head(result) do - {:ok, Enum.into(introspection_minors, %{})} - else - [introspection_minor: nil] -> - {:ok, %{}} + params = %{"device_id" => device_id} - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.") - {:error, :database_error} - - {:error, reason} -> - Logger.warn("Failed with reason #{inspect(reason)}.") - {:error, :database_error} + with {:ok, result} <- + custom_query(introspection_minor_statement, realm_name, params, + consistency: :quorum, + result: :first! + ) do + %{"introspection_minor" => introspection_minors} = result + introspection_minors = introspection_minors || %{} + {:ok, introspection_minors} end end - def get_device_groups(db_client, device_id) do + def get_device_groups(realm_name, device_id) do groups_statement = """ SELECT groups FROM devices WHERE device_id=:device_id """ - groups_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(groups_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) + params = %{"device_id" => device_id} - with {:ok, result} <- DatabaseQuery.call(db_client, groups_query), - [groups: groups] when is_list(groups) <- DatabaseResult.head(result) do - {:ok, :proplists.get_keys(groups)} - else - [groups: nil] -> - {:ok, []} - - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - Logger.warn("Failed with reason #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} + with {:ok, result} <- + custom_query(groups_statement, realm_name, params, + consistency: :quorum, + result: :first! + ) do + %{"groups" => groups} = result + groups = groups || %{} + {:ok, Map.keys(groups)} end end - def update_device_introspection!(db_client, device_id, introspection, introspection_minor) do + def update_device_introspection!(realm_name, device_id, introspection, introspection_minor) do introspection_update_statement = """ UPDATE devices SET introspection=:introspection, introspection_minor=:introspection_minor WHERE device_id=:device_id """ - introspection_update_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(introspection_update_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:introspection, introspection) - |> DatabaseQuery.put(:introspection_minor, introspection_minor) - |> DatabaseQuery.consistency(:quorum) + params = %{ + "device_id" => device_id, + "introspection" => introspection, + "introspection_minor" => introspection_minor + } - DatabaseQuery.call!(db_client, introspection_update_query) + custom_query!(introspection_update_statement, realm_name, params, consistency: :quorum) end - def add_old_interfaces(db_client, device_id, old_interfaces) do + def add_old_interfaces(realm_name, device_id, old_interfaces) do old_introspection_update_statement = """ UPDATE devices SET old_introspection = old_introspection + :introspection WHERE device_id=:device_id """ - old_introspection_update_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(old_introspection_update_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:introspection, old_interfaces) - |> DatabaseQuery.consistency(:quorum) + params = %{"device_id" => device_id, "introspection" => old_interfaces} - with {:ok, _result} <- DatabaseQuery.call(db_client, old_introspection_update_query) do + with {:ok, _result} <- + custom_query(old_introspection_update_statement, realm_name, params, + consistency: :quorum + ) do :ok end end - def remove_old_interfaces(db_client, device_id, old_interfaces) do + def remove_old_interfaces(realm_name, device_id, old_interfaces) do old_introspection_remove_statement = """ UPDATE devices SET old_introspection = old_introspection - :old_interfaces WHERE device_id=:device_id """ - old_introspection_remove_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(old_introspection_remove_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:old_interfaces, old_interfaces) - |> DatabaseQuery.consistency(:quorum) + params = %{"device_id" => device_id, "old_interfaces" => old_interfaces} - with {:ok, _result} <- DatabaseQuery.call(db_client, old_introspection_remove_query) do + with {:ok, _result} <- + custom_query(old_introspection_remove_statement, realm_name, params, + consistency: :quorum + ) do :ok end end - def register_device_with_interface(db_client, device_id, interface_name, interface_major) do + def register_device_with_interface(realm_name, device_id, interface_name, interface_major) do key_insert_statement = """ INSERT INTO kv_store (group, key) VALUES (:group, :key) @@ -701,37 +624,31 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do major_str = "v#{Integer.to_string(interface_major)}" encoded_device_id = Device.encode_device_id(device_id) - insert_device_by_interface_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(key_insert_statement) - |> DatabaseQuery.put(:group, "devices-by-interface-#{interface_name}-#{major_str}") - |> DatabaseQuery.put(:key, encoded_device_id) - |> DatabaseQuery.consistency(:each_quorum) - - insert_to_with_data_on_interface = - DatabaseQuery.new() - |> DatabaseQuery.statement(key_insert_statement) - |> DatabaseQuery.put( - :group, - "devices-with-data-on-interface-#{interface_name}-#{major_str}" - ) - |> DatabaseQuery.put(:key, encoded_device_id) - |> DatabaseQuery.consistency(:each_quorum) + devices_by_interface_params = %{ + "group" => "devices-by-interface-#{interface_name}-#{major_str}", + "key" => encoded_device_id + } - with {:ok, _result} <- DatabaseQuery.call(db_client, insert_device_by_interface_query), - {:ok, _result} <- DatabaseQuery.call(db_client, insert_to_with_data_on_interface) do - :ok - else - {:error, reason} -> - Logger.warn( - "Database error: cannot register device-interface pair, reason: #{inspect(reason)}." - ) + data_on_interface_params = %{ + "group" => "devices-with-data-on-interface-#{interface_name}-#{major_str}", + "key" => encoded_device_id + } - {:error, reason} + with {:ok, _result} <- + custom_query(key_insert_statement, realm_name, devices_by_interface_params, + consistency: :each_quorum, + context: "registering device-interface pair" + ), + {:ok, _result} <- + custom_query(key_insert_statement, realm_name, data_on_interface_params, + consistency: :each_quorum, + context: "registering device-interface pair" + ) do + :ok end end - def unregister_device_with_interface(db_client, device_id, interface_name, interface_major) do + def unregister_device_with_interface(realm_name, device_id, interface_name, interface_major) do key_delete_statement = """ DELETE FROM kv_store WHERE group=:group AND key=:key @@ -740,51 +657,34 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do major_str = "v#{Integer.to_string(interface_major)}" encoded_device_id = Device.encode_device_id(device_id) - delete_device_by_interface_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(key_delete_statement) - |> DatabaseQuery.put(:group, "devices-by-interface-#{interface_name}-#{major_str}") - |> DatabaseQuery.put(:key, encoded_device_id) - |> DatabaseQuery.consistency(:each_quorum) + params = %{ + "group" => "devices-by-interface-#{interface_name}-#{major_str}", + "key" => encoded_device_id + } - with {:ok, _result} <- DatabaseQuery.call(db_client, delete_device_by_interface_query) do + with {:ok, _result} <- + custom_query(key_delete_statement, realm_name, params, + consistency: :each_quorum, + context: "unregistering device-interface pair" + ) do :ok - else - {:error, reason} -> - Logger.warn( - "Database error: cannot unregister device-interface pair: #{inspect(reason)}." - ) - - {:error, reason} end end - def check_device_exists(client, device_id) do + def check_device_exists(realm_name, device_id) do device_statement = """ SELECT device_id FROM devices WHERE device_id=:device_id """ - device_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(device_statement) - |> DatabaseQuery.put(:device_id, device_id) + params = %{"device_id" => device_id} - with {:ok, result} <- DatabaseQuery.call(client, device_query), - device_row when is_list(device_row) <- DatabaseResult.head(result) do - {:ok, true} - else - :empty_dataset -> - {:ok, false} - - %{acc: _, msg: error_message} -> - _ = Logger.warn("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warn("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} + with {:ok, _result} <- + custom_query(device_statement, realm_name, params, + result: {:first!, :device_does_not_exist} + ) do + :ok end end @@ -805,7 +705,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do value end - def retrieve_endpoint_values(client, device_id, interface_descriptor, mapping) do + def retrieve_endpoint_values(realm_name, device_id, interface_descriptor, mapping) do query_statement = prepare_get_property_statement( mapping.value_type, @@ -814,14 +714,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do interface_descriptor.storage_type ) - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(query_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, mapping.endpoint_id) + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => mapping.endpoint_id + } - DatabaseQuery.call!(client, query) + {:ok, result} = custom_query(query_statement, realm_name, params) + result end defp prepare_get_property_statement( @@ -870,36 +770,24 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do :one end - def fetch_datastream_maximum_storage_retention(client) do + def fetch_datastream_maximum_storage_retention(realm_name) do maximum_storage_retention_statement = """ SELECT blobAsInt(value) FROM kv_store WHERE group='realm_config' AND key='datastream_maximum_storage_retention' """ - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(maximum_storage_retention_statement) - |> DatabaseQuery.consistency(:quorum) - - with {:ok, res} <- DatabaseQuery.call(client, query), - ["system.blobasint(value)": maximum_storage_retention] <- DatabaseResult.head(res) do + with {:ok, result} <- + custom_query(maximum_storage_retention_statement, realm_name, %{}, + result: :first, + consistency: :quorum + ) do + maximum_storage_retention = result["system.blobasint(value)"] {:ok, maximum_storage_retention} - else - :empty_dataset -> - {:ok, nil} - - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.") - {:error, :database_error} - - {:error, reason} -> - Logger.warn("Failed with reason: #{inspect(reason)}.") - {:error, :database_error} end end - def fetch_path_expiry(db_client, device_id, interface_descriptor, %Mapping{} = mapping, path) + def fetch_path_expiry(realm_name, device_id, interface_descriptor, %Mapping{} = mapping, path) when is_binary(device_id) and is_binary(path) do # TODO: do not hardcode individual_properties here fetch_property_value_statement = """ @@ -909,38 +797,178 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do AND endpoint_id=:endpoint_id AND path=:path """ - fetch_property_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(fetch_property_value_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id) - |> DatabaseQuery.put(:endpoint_id, mapping.endpoint_id) - |> DatabaseQuery.put(:path, path) - |> DatabaseQuery.consistency(:quorum) - - with {:ok, result} <- DatabaseQuery.call(db_client, fetch_property_query), - ["ttl(datetime_value)": ttl] when is_integer(ttl) <- DatabaseResult.head(result) do - expiry_datetime = - DateTime.utc_now() - |> DateTime.to_unix() - |> :erlang.+(ttl) - |> DateTime.from_unix!() - - {:ok, expiry_datetime} - else - :empty_dataset -> - {:error, :property_not_set} + params = %{ + "device_id" => device_id, + "interface_id" => interface_descriptor.interface_id, + "endpoint_id" => mapping.endpoint_id, + "path" => path + } - ["ttl(datetime_value)": nil] -> + with {:ok, result} <- + custom_query(fetch_property_value_statement, realm_name, params, + consistency: :quorum, + result: {:first!, :property_not_set} + ) do + %{"ttl(datetime_value)" => ttl} = result + + if ttl == nil do {:ok, :no_expiry} + else + expiry_datetime = + DateTime.utc_now() + |> DateTime.to_unix() + |> :erlang.+(ttl) + |> DateTime.from_unix!() - %{acc: _, msg: error_message} -> - Logger.warn("Database error: #{error_message}.") - {:error, :database_error} + {:ok, expiry_datetime} + end + end + end + + # TODO: add to astarte_data_access + def custom_query(statement, realm \\ nil, params \\ %{}, opts \\ []) do + do_custom_query(&execute_query/6, statement, realm, params, opts) + end + + def custom_query!(statement, realm \\ nil, params \\ %{}, opts \\ []) do + do_custom_query(&execute_query!/6, statement, realm, params, opts) + end - {:error, reason} -> - Logger.warn("Database error while retrieving property: #{inspect(reason)}.") - {:error, :database_error} + defp do_custom_query(execute_query, statement, realm, params, opts) do + {custom_opts, query_opts} = parse_opts(opts) + cluster = Config.xandra_options!()[:name] + + Xandra.Cluster.run( + cluster, + &execute_query.(&1, statement, realm, params, query_opts, custom_opts) + ) + end + + defp execute_query(conn, statement, realm, params, query_opts, custom_opts) do + with {:ok, prepared} <- prepare_query(conn, statement, realm) do + case Xandra.execute(conn, prepared, params, query_opts) do + {:ok, result} -> + cast_query_result(result, custom_opts) + + {:error, error} -> + %{message: message, tag: tag} = database_error_message(error, custom_opts[:context]) + + _ = Logger.warn(message, tag: tag) + + {:error, :database_error} + end + end + end + + defp execute_query!(conn, statement, realm, params, query_opts, custom_opts) do + prepared = prepare_query!(conn, statement, realm) + + Xandra.execute!(conn, prepared, params, query_opts) + |> cast_query_result!(custom_opts) + end + + defp use_realm(_conn, nil = _realm), do: :ok + + defp use_realm(conn, realm) when is_binary(realm) do + with true <- Astarte.Core.Realm.valid_name?(realm), + {:ok, %Xandra.SetKeyspace{}} <- Xandra.execute(conn, "USE #{realm}") do + :ok + else + _ -> {:error, :realm_not_found} end end + + defp prepare_query(conn, statement, realm) do + with :ok <- use_realm(conn, realm) do + case Xandra.prepare(conn, statement) do + {:ok, page} -> + {:ok, page} + + {:error, reason} -> + _ = Logger.warn("Cannot prepare query: #{inspect(reason)}.", tag: "db_error") + {:error, :database_error} + end + end + end + + defp prepare_query!(conn, statement, realm) do + :ok = use_realm(conn, realm) + Xandra.prepare!(conn, statement) + end + + defp parse_opts(opts) do + {custom_opts, query_opts} = Keyword.split(opts, Keyword.keys(@default_custom_query_opts)) + query_opts = Keyword.merge(@default_query_opts, query_opts) + custom_opts = Keyword.validate!(custom_opts, @default_custom_query_opts) + + {custom_opts, query_opts} + end + + defp cast_query_result(result, opts) do + result_with_defaults = + case opts[:result] do + :first -> {:first, nil} + :first! -> {:first!, :not_found} + x -> x + end + + case result_with_defaults do + :default -> + {:ok, result} + + :list -> + {:ok, Enum.to_list(result)} + + {:first, default} -> + {:ok, Enum.at(result, 0, default)} + + {:first!, error} -> + Enum.fetch(result, 0) + |> case do + :error -> {:error, error} + {:ok, first} -> {:ok, first} + end + end + end + + defp cast_query_result!(result, opts) do + case opts[:result] do + :default -> + result + + :list -> + Enum.to_list(result) + + :first -> + Enum.at(result, 0) + + {:first, default} -> + Enum.at(result, 0, default) + + :first! -> + Enum.fetch!(result, 0) + end + end + + defp database_error_message(%Xandra.Error{message: message, reason: reason}, nil = _context) do + %{message: "Database error #{reason}: #{message}", tag: "db_error"} + end + + defp database_error_message(%Xandra.Error{message: message, reason: reason}, context) do + %{message: "Database error #{reason} during #{context}: #{message}", tag: "db_error"} + end + + defp database_error_message( + %Xandra.ConnectionError{action: action, reason: reason}, + nil = _context + ) do + %{message: "Database connection error: #{reason} (#{action})", tag: "db_connection_error"} + end + + defp database_error_message(%Xandra.ConnectionError{action: action, reason: reason}, context) do + %{ + message: "Database connection error during #{context}: #{reason} (#{action})", + tag: "db_connection_error" + } + end end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index da59b6df5..405233a6a 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -33,21 +33,19 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DeviceTrigger alias Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer - alias Astarte.DataAccess.Database + alias Astarte.DataUpdaterPlant.DataUpdater.Queries alias Astarte.DataUpdaterPlant.AMQPTestHelper alias Astarte.DataUpdaterPlant.DatabaseTestHelper alias Astarte.DataUpdaterPlant.DataUpdater alias Astarte.Core.CQLUtils - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Result, as: DatabaseResult setup_all do DatabaseTestHelper.await_cluster_connected!() - {:ok, _client} = Astarte.DataUpdaterPlant.DatabaseTestHelper.create_test_keyspace() + DatabaseTestHelper.create_test_keyspace!() {:ok, _pid} = AMQPTestHelper.start_link() on_exit(fn -> - Astarte.DataUpdaterPlant.DatabaseTestHelper.destroy_local_test_keyspace() + DatabaseTestHelper.destroy_local_test_keyspace() end) end @@ -72,8 +70,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id, insert_opts) - {:ok, db_client} = Database.connect(realm: realm) - # Install a volatile device test trigger simple_trigger_data = %SimpleTriggerContainer{ @@ -179,38 +175,34 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do simple_trigger_id: DatabaseTestHelper.device_connected_trigger_id() } - device_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(""" - SELECT connected, total_received_msgs, total_received_bytes, - exchanged_msgs_by_interface, exchanged_bytes_by_interface - FROM devices WHERE device_id=:device_id; - """) - |> DatabaseQuery.put(:device_id, device_id) + device_id_param = %{"device_id" => device_id} + + device_info_statement = """ + SELECT connected, total_received_msgs, total_received_bytes, + exchanged_msgs_by_interface, exchanged_bytes_by_interface + FROM devices WHERE device_id=:device_id; + """ device_row = - DatabaseQuery.call!(db_client, device_query) - |> DatabaseResult.head() - - assert device_row == [ - connected: true, - total_received_msgs: 45000, - total_received_bytes: 4_500_000, - exchanged_msgs_by_interface: nil, - exchanged_bytes_by_interface: nil - ] + Queries.custom_query!(device_info_statement, realm, device_id_param, result: :first!) + + assert device_row == %{ + "connected" => true, + "total_received_msgs" => 45000, + "total_received_bytes" => 4_500_000, + "exchanged_msgs_by_interface" => nil, + "exchanged_bytes_by_interface" => nil + } # Introspection sub-test - device_introspection_query = - DatabaseQuery.new() - |> DatabaseQuery.statement("SELECT introspection FROM devices WHERE device_id=:device_id;") - |> DatabaseQuery.put(:device_id, device_id) + device_introspection_statement = + "SELECT introspection FROM devices WHERE device_id=:device_id;" ^existing_introspection_map = - DatabaseQuery.call!(db_client, device_introspection_query) - |> DatabaseResult.head() - |> Keyword.get(:introspection) - |> Enum.into(%{}) + Queries.custom_query!(device_introspection_statement, realm, device_id_param, + result: :first + ) + |> Map.fetch!("introspection") # Install a volatile incoming introspection test trigger incoming_introspection_trigger_data = @@ -541,10 +533,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) device_introspection = - DatabaseQuery.call!(db_client, device_introspection_query) - |> DatabaseResult.head() - |> Keyword.get(:introspection) - |> Enum.into(%{}) + Queries.custom_query!(device_introspection_statement, realm, device_id_param, + result: :first! + ) + |> Map.fetch!("introspection") assert existing_introspection_map == device_introspection @@ -945,7 +937,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do # We check that all 3 on_incoming_data triggers were correctly installed interface_id = CQLUtils.interface_id("com.test.SimpleStreamTest", 1) - endpoint_id = retrieve_endpoint_id(db_client, "com.test.SimpleStreamTest", 1, "/0/value") + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") trigger_key = {:on_incoming_data, interface_id, endpoint_id} incoming_data_0_value_triggers = Map.get(state.data_triggers, trigger_key) @@ -960,42 +952,44 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do # It should have 2 targets assert length(gt_trigger.trigger_targets) == 2 - endpoint_id = retrieve_endpoint_id(db_client, "com.test.LCDMonitor", 1, "/time/from") + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.LCDMonitor", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/time/from") + value_property_statement = + "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" + + value_property_params = %{ + "device_id" => device_id, + "interface_id" => CQLUtils.interface_id("com.test.LCDMonitor", 1), + "endpoint_id" => endpoint_id, + "path" => "/time/from" + } value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() + Queries.custom_query!(value_property_statement, realm, value_property_params, + result: :first! + ) - assert value == [longinteger_value: 9000] + assert value == %{"longinteger_value" => 9000} - endpoint_id = retrieve_endpoint_id(db_client, "com.test.SimpleStreamTest", 1, "/0/value") + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT integer_value FROM individual_datastreams WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path AND value_timestamp>=:value_timestamp" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.SimpleStreamTest", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/0/value") - |> DatabaseQuery.put(:value_timestamp, 1_507_557_632_000) + value_datastream_statement = + "SELECT integer_value FROM individual_datastreams WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path AND value_timestamp>=:value_timestamp" + + value_datastream_params = %{ + "device_id" => device_id, + "interface_id" => CQLUtils.interface_id("com.test.SimpleStreamTest", 1), + "endpoint_id" => endpoint_id, + "path" => "/0/value", + "value_timestamp" => 1_507_557_632_000 + } value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() + Queries.custom_query!(value_datastream_statement, realm, value_datastream_params, + result: :first! + ) - assert value == [integer_value: 5] + assert value == %{"integer_value" => 5} assert DataUpdater.handle_delete_volatile_trigger( realm, @@ -1004,7 +998,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) == :ok timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + _timestamp_ms = div(timestamp_us_x_10, 10_000) # Introspection change subtest DataUpdater.handle_introspection( @@ -1040,7 +1034,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do make_timestamp("2017-10-26T08:48:50+00:00") ) - payload2 = Cyanide.encode!(%{"v" => %{"value" => 0}}) + payload2 = Cyanide.encode!(%{"v" => %{"value" => 0.0}}) DataUpdater.handle_data( realm, @@ -1079,74 +1073,68 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) - objects_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT * FROM com_example_testobject_v1 WHERE device_id=:device_id AND path='/'" - ) - |> DatabaseQuery.put(:device_id, device_id) + objects_statement = + "SELECT * FROM com_example_testobject_v1 WHERE device_id=:device_id AND path='/'" - objects = - DatabaseQuery.call!(db_client, objects_query) - |> Enum.to_list() + objects = Queries.custom_query!(objects_statement, realm, device_id_param, result: :list) assert objects == [ - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_400_000, - reception_timestamp_submillis: 0, - v_string: "aaa", - v_value: 1.1 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_520_000, - reception_timestamp_submillis: 0, - v_string: "bbb", - v_value: 2.2 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_580_000, - reception_timestamp_submillis: 0, - v_string: "zzz", - v_value: 3.3 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_729_000, - reception_timestamp_submillis: 0, - v_string: "Astarteです", - v_value: 1.9 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_730_000, - reception_timestamp_submillis: 0, - v_string: "Hello World');", - v_value: nil - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_731_000, - reception_timestamp_submillis: 0, - v_string: nil, - v_value: 0.0 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_347_580_000, - reception_timestamp_submillis: 0, - v_string: nil, - v_value: nil - ] + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_506_755_400_000, + "reception_timestamp_submillis" => 0, + "v_string" => "aaa", + "v_value" => 1.1 + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_506_755_520_000, + "reception_timestamp_submillis" => 0, + "v_string" => "bbb", + "v_value" => 2.2 + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_506_755_580_000, + "reception_timestamp_submillis" => 0, + "v_string" => "zzz", + "v_value" => 3.3 + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_509_007_729_000, + "reception_timestamp_submillis" => 0, + "v_string" => "Astarteです", + "v_value" => 1.9 + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_509_007_730_000, + "reception_timestamp_submillis" => 0, + "v_string" => "Hello World');", + "v_value" => nil + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_509_007_731_000, + "reception_timestamp_submillis" => 0, + "v_string" => nil, + "v_value" => 0.0 + }, + %{ + "device_id" => device_id, + "path" => "/", + "reception_timestamp" => 1_509_347_580_000, + "reception_timestamp_submillis" => 0, + "v_string" => nil, + "v_value" => nil + } ] # Test /producer/properties control message @@ -1189,80 +1177,47 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do simple_trigger_id: DatabaseTestHelper.path_removed_trigger_id() } - endpoint_id = retrieve_endpoint_id(db_client, "com.test.LCDMonitor", 1, "/time/from") - - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" + value = + Queries.custom_query(value_property_statement, realm, value_property_params, + result: :first! ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.LCDMonitor", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/time/from") - value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() + assert value == {:error, :not_found} - assert value == :empty_dataset + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/9/start") - endpoint_id = - retrieve_endpoint_id(db_client, "com.test.LCDMonitor", 1, "/weekSchedule/9/start") + week9_value_property_params = + value_property_params + |> Map.put("endpoint_id", endpoint_id) + |> Map.put("path", "/weekSchedule/9/start") - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" + week9_value = + Queries.custom_query(value_property_statement, realm, week9_value_property_params, + result: :first! ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.LCDMonitor", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/weekSchedule/9/start") - value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() + assert week9_value == {:error, :not_found} - assert value == :empty_dataset + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") - endpoint_id = - retrieve_endpoint_id(db_client, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") + week10_value_property_params = + value_property_params + |> Map.put("endpoint_id", endpoint_id) + |> Map.put("path", "/weekSchedule/10/start") - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" + week10_value = + Queries.custom_query!(value_property_statement, realm, week10_value_property_params, + result: :first! ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.LCDMonitor", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/weekSchedule/10/start") - value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() - - assert value == [longinteger_value: 10] - - endpoint_id = retrieve_endpoint_id(db_client, "com.test.SimpleStreamTest", 1, "/0/value") + assert week10_value == %{"longinteger_value" => 10} - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT integer_value FROM individual_datastreams WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path AND value_timestamp>=:value_timestamp" + datastream_value = + Queries.custom_query!(value_datastream_statement, realm, value_datastream_params, + result: :first! ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.SimpleStreamTest", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/0/value") - |> DatabaseQuery.put(:value_timestamp, 1_507_557_632_000) - value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() - - assert value == [integer_value: 5] + assert datastream_value == %{"integer_value" => 5} # Unset subtest @@ -1285,24 +1240,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) - endpoint_id = - retrieve_endpoint_id(db_client, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") - - value_query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT longinteger_value FROM individual_properties WHERE device_id=:device_id AND interface_id=:interface_id AND endpoint_id=:endpoint_id AND path=:path" - ) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:interface_id, CQLUtils.interface_id("com.test.LCDMonitor", 1)) - |> DatabaseQuery.put(:endpoint_id, endpoint_id) - |> DatabaseQuery.put(:path, "/weekSchedule/10/start") - value = - DatabaseQuery.call!(db_client, value_query) - |> DatabaseResult.head() + Queries.custom_query(value_property_statement, realm, week10_value_property_params, + result: :first! + ) - assert value == :empty_dataset + assert value == {:error, :not_found} # Device disconnection sub-test DataUpdater.handle_disconnection( @@ -1315,24 +1258,23 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) device_row = - DatabaseQuery.call!(db_client, device_query) - |> DatabaseResult.head() - - assert device_row == [ - connected: false, - total_received_msgs: 45018, - total_received_bytes: 4_501_003, - exchanged_msgs_by_interface: [ - {["com.example.TestObject", 1], 5}, - {["com.test.LCDMonitor", 1], 6}, - {["com.test.SimpleStreamTest", 1], 1} - ], - exchanged_bytes_by_interface: [ - {["com.example.TestObject", 1], 243}, - {["com.test.LCDMonitor", 1], 291}, - {["com.test.SimpleStreamTest", 1], 45} - ] - ] + Queries.custom_query!(device_info_statement, realm, device_id_param, result: :first!) + + assert device_row == %{ + "connected" => false, + "total_received_msgs" => 45018, + "total_received_bytes" => 4_501_007, + "exchanged_msgs_by_interface" => %{ + {"com.example.TestObject", 1} => 5, + {"com.test.LCDMonitor", 1} => 6, + {"com.test.SimpleStreamTest", 1} => 1 + }, + "exchanged_bytes_by_interface" => %{ + {"com.example.TestObject", 1} => 247, + {"com.test.LCDMonitor", 1} => 291, + {"com.test.SimpleStreamTest", 1} => 45 + } + } assert AMQPTestHelper.awaiting_messages_count() == 0 end @@ -1352,8 +1294,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id, groups: ["group2"]) - {:ok, db_client} = Database.connect(realm: realm) - timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") timestamp_ms = div(timestamp_us_x_10, 10_000) @@ -1392,20 +1332,20 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do simple_trigger_id: DatabaseTestHelper.group2_device_connected_trigger_id() } - device_introspection_query = - DatabaseQuery.new() - |> DatabaseQuery.statement("SELECT introspection FROM devices WHERE device_id=:device_id;") - |> DatabaseQuery.put(:device_id, device_id) + device_introspection_statement = + "SELECT introspection FROM devices WHERE device_id=:device_id;" + + device_id_param = %{"device_id" => device_id} old_device_introspection = - DatabaseQuery.call!(db_client, device_introspection_query) - |> DatabaseResult.head() - |> Keyword.get(:introspection) + device_introspection_statement + |> Queries.custom_query!(realm, device_id_param, result: :first!) + |> Map.fetch!("introspection") assert old_device_introspection == nil timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + _timestamp_ms = div(timestamp_us_x_10, 10_000) DataUpdater.handle_introspection( realm, @@ -1418,10 +1358,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) new_device_introspection = - DatabaseQuery.call!(db_client, device_introspection_query) - |> DatabaseResult.head() - |> Keyword.get(:introspection) - |> Enum.into(%{}) + device_introspection_statement + |> Queries.custom_query!(realm, device_id_param, result: :first!) + |> Map.fetch!("introspection") assert new_device_introspection == new_introspection_map @@ -1440,7 +1379,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, device_id} = Device.decode_device_id(encoded_device_id) DatabaseTestHelper.insert_device(device_id) - {:ok, db_client} = Database.connect(realm: realm) DataUpdater.handle_connection( realm, @@ -1461,7 +1399,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) DataUpdater.dump_state(realm, encoded_device_id) - assert DatabaseTestHelper.fetch_old_introspection(db_client, device_id) == {:ok, %{}} + assert DatabaseTestHelper.fetch_old_introspection(realm, device_id) == {:ok, %{}} new_introspection_string = "com.test.LCDMonitor:2:0;com.test.SimpleStreamTest:1:0" @@ -1474,12 +1412,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) DataUpdater.dump_state(realm, encoded_device_id) - DatabaseTestHelper.fetch_old_introspection(db_client, device_id) + DatabaseTestHelper.fetch_old_introspection(realm, device_id) - assert DatabaseTestHelper.fetch_old_introspection(db_client, device_id) == + assert DatabaseTestHelper.fetch_old_introspection(realm, device_id) == {:ok, %{ - ["com.test.LCDMonitor", 1] => 0 + {"com.test.LCDMonitor", 1} => 0 }} new_introspection_string = "com.test.LCDMonitor:2:0" @@ -1494,11 +1432,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) - assert DatabaseTestHelper.fetch_old_introspection(db_client, device_id) == + assert DatabaseTestHelper.fetch_old_introspection(realm, device_id) == {:ok, %{ - ["com.test.LCDMonitor", 1] => 0, - ["com.test.SimpleStreamTest", 1] => 0 + {"com.test.LCDMonitor", 1} => 0, + {"com.test.SimpleStreamTest", 1} => 0 }} end @@ -1507,8 +1445,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do realm = "autotestrealm" - {:ok, db_client} = Database.connect(realm: realm) - # Install a volatile device test trigger simple_trigger_data = %SimpleTriggerContainer{ @@ -1553,11 +1489,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do test "fails to delete volatile trigger on missing device" do AMQPTestHelper.clean_queue() realm = "autotestrealm" - {:ok, db_client} = Database.connect(realm: realm) volatile_trigger_id = :crypto.strong_rand_bytes(16) fail_encoded_device_id = "f0VMRgIBAQBBBBBBBBBBBB" - {:ok, fail_device_id} = Device.decode_device_id(fail_encoded_device_id) + {:ok, _fail_device_id} = Device.decode_device_id(fail_encoded_device_id) assert DataUpdater.handle_delete_volatile_trigger( realm, @@ -1581,10 +1516,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id) - {:ok, db_client} = Database.connect(realm: realm) - timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + _timestamp_ms = div(timestamp_us_x_10, 10_000) # Make sure a process for the device exists DataUpdater.handle_connection( @@ -1626,10 +1559,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id) - {:ok, db_client} = Database.connect(realm: realm) - timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + _timestamp_ms = div(timestamp_us_x_10, 10_000) # Make sure a process for the device exists DataUpdater.handle_connection( @@ -1653,23 +1584,20 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) end - defp retrieve_endpoint_id(client, interface_name, interface_major, path) do - query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT * FROM interfaces WHERE name = :name AND major_version = :major_version;" - ) - |> DatabaseQuery.put(:name, interface_name) - |> DatabaseQuery.put(:major_version, interface_major) + defp retrieve_endpoint_id(realm, interface_name, interface_major, path) do + interface_statement = + "SELECT * FROM interfaces WHERE name = :name AND major_version = :major_version;" + + params = %{ + "name" => interface_name, + "major_version" => interface_major + } - interface_row = - DatabaseQuery.call!(client, query) - |> Enum.take(1) - |> hd + interface_row = Queries.custom_query!(interface_statement, realm, params, result: :first!) automaton = - {:erlang.binary_to_term(interface_row[:automaton_transitions]), - :erlang.binary_to_term(interface_row[:automaton_accepting_states])} + {:erlang.binary_to_term(interface_row["automaton_transitions"]), + :erlang.binary_to_term(interface_row["automaton_accepting_states"])} {:ok, endpoint_id} = Astarte.Core.Mapping.EndpointsAutomaton.resolve_path(path, automaton) diff --git a/apps/astarte_data_updater_plant/test/support/database_test_helper.exs b/apps/astarte_data_updater_plant/test/support/database_test_helper.exs index d3a63d0a5..042d8530e 100644 --- a/apps/astarte_data_updater_plant/test/support/database_test_helper.exs +++ b/apps/astarte_data_updater_plant/test/support/database_test_helper.exs @@ -17,20 +17,18 @@ # defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do + alias Astarte.DataUpdaterPlant.TriggerPolicy.Queries alias Astarte.Core.CQLUtils alias Astarte.Core.Device alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DeviceTrigger - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.IntrospectionTrigger alias Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer alias Astarte.Core.Triggers.SimpleTriggersProtobuf.Utils, as: SimpleTriggersProtobufUtils alias Astarte.DataUpdaterPlant.AMQPTestHelper alias Astarte.DataUpdaterPlant.Config - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Client, as: DatabaseClient - alias CQEx.Result, as: DatabaseResult + alias Astarte.DataUpdaterPlant.DataUpdater.Queries @create_autotestrealm """ CREATE KEYSPACE autotestrealm @@ -354,475 +352,390 @@ defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do VALUES (:object_id, :object_type, :parent_trigger_id, :simple_trigger_id, :trigger_data, :trigger_target); """ - def create_test_keyspace do - {:ok, client} = DatabaseClient.new(List.first(Config.cqex_nodes!())) - - case DatabaseQuery.call(client, @create_autotestrealm) do - {:ok, _} -> - DatabaseQuery.call!(client, @create_devices_table) - DatabaseQuery.call!(client, @create_endpoints_table) - - Enum.each(@insert_endpoints, fn query -> - DatabaseQuery.call!(client, query) - end) - - DatabaseQuery.call!(client, @create_simple_triggers_table) - DatabaseQuery.call!(client, @create_individual_properties_table) - DatabaseQuery.call!(client, @create_individual_datastreams_table) - DatabaseQuery.call!(client, @create_test_object_table) - - Enum.each(@insert_values, fn query -> - DatabaseQuery.call!(client, query) - end) - - DatabaseQuery.call!(client, @create_interfaces_table) - DatabaseQuery.call!(client, @create_kv_store_table) - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_interface_0) - |> DatabaseQuery.put( - :automaton_accepting_states, - Base.decode64!( - "g3QAAAAFYQNtAAAAEIAeEDVf33Bpjm4/0nkmmathBG0AAAAQjrtis2DBS6JBcp3e3YCcn2EFbQAAABBP5QNKPZuZ7H7DsjcWMD0zYQdtAAAAEOb3NjHv/B1+rVLT86O65QthCG0AAAAQKyxj3bvZVzVtSo5W9QTt2g==" - ) - ) - |> DatabaseQuery.put( - :automaton_transitions, - Base.decode64!( - "g3QAAAAIaAJhAG0AAAAKbGNkQ29tbWFuZGEFaAJhAG0AAAAEdGltZWEGaAJhAG0AAAAMd2Vla1NjaGVkdWxlYQFoAmEBbQAAAABhAmgCYQJtAAAABXN0YXJ0YQNoAmECbQAAAARzdG9wYQRoAmEGbQAAAARmcm9tYQdoAmEGbQAAAAJ0b2EI" - ) - ) - - DatabaseQuery.call!(client, query) - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_interface_1) - |> DatabaseQuery.put( - :automaton_accepting_states, - Base.decode64!( - "g3QAAAAFYQJtAAAAEHUBDhsZnu783TXSVLDiCSRhBW0AAAAQOQfUHVvKMp2eUUzqKlSpmmEGbQAAABB6pEwRInNH2eYkSuAp3t6qYQdtAAAAEO/5V88D397tl4SocI49jLlhCG0AAAAQNGyA5MqZYnSB9nscG+WVIQ==" - ) - ) - |> DatabaseQuery.put( - :automaton_transitions, - Base.decode64!( - "g3QAAAAIaAJhAG0AAAAAYQFoAmEAbQAAAANmb29hA2gCYQFtAAAABXZhbHVlYQJoAmEDbQAAAABhBGgCYQRtAAAACWJsb2JWYWx1ZWEGaAJhBG0AAAAJbG9uZ1ZhbHVlYQdoAmEEbQAAAAtzdHJpbmdWYWx1ZWEFaAJhBG0AAAAOdGltZXN0YW1wVmFsdWVhCA==" - ) - ) - - DatabaseQuery.call!(client, query) - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_interface_2) - |> DatabaseQuery.put( - :automaton_accepting_states, - Base.decode64!( - "g3QAAAACYQFtAAAAEHyfFOhPL5d/wSbV4buYdudhAm0AAAAQOzn9OuJhJv/lI0wt0VC4ZA==" - ) - ) - |> DatabaseQuery.put( - :automaton_transitions, - Base.decode64!("g3QAAAACaAJhAG0AAAAGc3RyaW5nYQFoAmEAbQAAAAV2YWx1ZWEC") - ) - - DatabaseQuery.call!(client, query) - - simple_trigger_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :INCOMING_DATA, - match_path: "/weekSchedule/%{weekDay}/start", - value_match_operator: :GREATER_THAN, - known_value: Cyanide.encode!(%{v: 9}) - } - } + def create_test_keyspace! do + Queries.custom_query!(@create_autotestrealm) + Queries.custom_query!(@create_devices_table) + Queries.custom_query!(@create_endpoints_table) + + @insert_endpoints + |> Enum.each(&Queries.custom_query!/1) + + Queries.custom_query!(@create_simple_triggers_table) + Queries.custom_query!(@create_individual_properties_table) + Queries.custom_query!(@create_individual_datastreams_table) + Queries.custom_query!(@create_test_object_table) + + @insert_values + |> Enum.each(&Queries.custom_query!/1) + + Queries.custom_query!(@create_interfaces_table) + Queries.custom_query!(@create_kv_store_table) + + params_interface_0 = %{ + "automaton_accepting_states" => + Base.decode64!( + "g3QAAAAFYQNtAAAAEIAeEDVf33Bpjm4/0nkmmathBG0AAAAQjrtis2DBS6JBcp3e3YCcn2EFbQAAABBP5QNKPZuZ7H7DsjcWMD0zYQdtAAAAEOb3NjHv/B1+rVLT86O65QthCG0AAAAQKyxj3bvZVzVtSo5W9QTt2g==" + ), + "automaton_transitions" => + Base.decode64!( + "g3QAAAAIaAJhAG0AAAAKbGNkQ29tbWFuZGEFaAJhAG0AAAAEdGltZWEGaAJhAG0AAAAMd2Vla1NjaGVkdWxlYQFoAmEBbQAAAABhAmgCYQJtAAAABXN0YXJ0YQNoAmECbQAAAARzdG9wYQRoAmEGbQAAAARmcm9tYQdoAmEGbQAAAAJ0b2EI" + ) + } + + params_interface_1 = %{ + "automaton_accepting_states" => + Base.decode64!( + "g3QAAAAFYQJtAAAAEHUBDhsZnu783TXSVLDiCSRhBW0AAAAQOQfUHVvKMp2eUUzqKlSpmmEGbQAAABB6pEwRInNH2eYkSuAp3t6qYQdtAAAAEO/5V88D397tl4SocI49jLlhCG0AAAAQNGyA5MqZYnSB9nscG+WVIQ==" + ), + "automaton_transitions" => + Base.decode64!( + "g3QAAAAIaAJhAG0AAAAAYQFoAmEAbQAAAANmb29hA2gCYQFtAAAABXZhbHVlYQJoAmEDbQAAAABhBGgCYQRtAAAACWJsb2JWYWx1ZWEGaAJhBG0AAAAJbG9uZ1ZhbHVlYQdoAmEEbQAAAAtzdHJpbmdWYWx1ZWEFaAJhBG0AAAAOdGltZXN0YW1wVmFsdWVhCA==" + ) + } + + params_interface_2 = %{ + "automaton_accepting_states" => + Base.decode64!("g3QAAAACYQFtAAAAEHyfFOhPL5d/wSbV4buYdudhAm0AAAAQOzn9OuJhJv/lI0wt0VC4ZA=="), + "automaton_transitions" => + Base.decode64!("g3QAAAACaAJhAG0AAAAGc3RyaW5nYQFoAmEAbQAAAAV2YWx1ZWEC") + } + + Queries.custom_query(@insert_into_interface_0, nil, params_interface_0) + + Queries.custom_query(@insert_into_interface_1, nil, params_interface_1) + + Queries.custom_query(@insert_into_interface_2, nil, params_interface_2) + + simple_trigger_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :INCOMING_DATA, + match_path: "/weekSchedule/%{weekDay}/start", + value_match_operator: :GREATER_THAN, + known_value: Cyanide.encode!(%{v: 9}) } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() - - trigger_target_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() + + trigger_target_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put( - :object_id, - :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051") - ) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:interface) - ) - |> DatabaseQuery.put(:simple_trigger_id, greater_than_incoming_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_event_type: :DEVICE_CONNECTED - } - } + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() + + params = %{ + "object_id" => :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:interface), + "simple_trigger_id" => greater_than_incoming_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, params) + + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_event_type: :DEVICE_CONNECTED } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> TriggerTargetContainer.encode() - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put( - :object_id, - :uuid.string_to_uuid("7f454c46-0201-0100-0000-000000000000") - ) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:device) - ) - |> DatabaseQuery.put(:simple_trigger_id, device_connected_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :PATH_REMOVED, - match_path: "/time/from" - } - } + } + } + |> TriggerTargetContainer.encode() + + params = %{ + "object_id" => :uuid.string_to_uuid("7f454c46-0201-0100-0000-000000000000"), + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:device), + "simple_trigger_id" => device_connected_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, params) + + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :PATH_REMOVED, + match_path: "/time/from" } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> TriggerTargetContainer.encode() - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put( - :object_id, - :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051") - ) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:interface) - ) - |> DatabaseQuery.put(:simple_trigger_id, path_removed_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - # group 1 device trigger - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_event_type: :DEVICE_CONNECTED, - group_name: "group1" - } - } + } + } + |> TriggerTargetContainer.encode() + + params = %{ + "object_id" => :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:interface), + "simple_trigger_id" => path_removed_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, params) + + # group 1 device trigger + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_event_type: :DEVICE_CONNECTED, + group_name: "group1" } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> TriggerTargetContainer.encode() - - object_id = SimpleTriggersProtobufUtils.get_group_object_id("group1") - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put(:object_id, object_id) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:group) - ) - |> DatabaseQuery.put(:simple_trigger_id, group1_device_connected_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - # group 2 device trigger - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_event_type: :DEVICE_CONNECTED, - group_name: "group2" - } - } + } + } + |> TriggerTargetContainer.encode() + + object_id = SimpleTriggersProtobufUtils.get_group_object_id("group1") + + group_1_params = %{ + "object_id" => object_id, + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:group), + "simple_trigger_id" => group1_device_connected_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, group_1_params) + + # group 2 device trigger + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_event_type: :DEVICE_CONNECTED, + group_name: "group2" } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> TriggerTargetContainer.encode() - - object_id = SimpleTriggersProtobufUtils.get_group_object_id("group2") - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put(:object_id, object_id) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:group) - ) - |> DatabaseQuery.put(:simple_trigger_id, group2_device_connected_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - # Device-specific data trigger - - target_device_id = "f0VMRgIBAQAAAAAAAAAAAA" - - simple_trigger_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ - device_id: target_device_id, - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :INCOMING_DATA, - match_path: "/weekSchedule/%{weekDay}/start", - value_match_operator: :LESS_THAN, - known_value: Cyanide.encode!(%{v: 2}) - } - } + } + } + |> TriggerTargetContainer.encode() + + object_id = SimpleTriggersProtobufUtils.get_group_object_id("group2") + + group_2_params = %{ + "object_id" => object_id, + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:group), + "simple_trigger_id" => group2_device_connected_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, group_2_params) + + # Device-specific data trigger + + target_device_id = "f0VMRgIBAQAAAAAAAAAAAA" + + simple_trigger_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ + device_id: target_device_id, + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :INCOMING_DATA, + match_path: "/weekSchedule/%{weekDay}/start", + value_match_operator: :LESS_THAN, + known_value: Cyanide.encode!(%{v: 2}) } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() - - trigger_target_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() + + trigger_target_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() - - {:ok, target_decoded_device_id} = target_device_id |> Device.decode_device_id() - - interface_id = CQLUtils.interface_id("com.test.LCDMonitor", 1) - - object_id = - SimpleTriggersProtobufUtils.get_device_and_interface_object_id( - target_decoded_device_id, - interface_id - ) - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put( - :object_id, - object_id - ) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:device_and_interface) - ) - |> DatabaseQuery.put(:simple_trigger_id, less_than_device_incoming_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - # Group-specific data trigger - target_group = "group1" - - simple_trigger_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ - group_name: target_group, - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :INCOMING_DATA, - match_path: "/weekSchedule/%{weekDay}/start", - value_match_operator: :EQUAL_TO, - known_value: Cyanide.encode!(%{v: 3}) - } - } + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() + + {:ok, target_decoded_device_id} = target_device_id |> Device.decode_device_id() + + interface_id = CQLUtils.interface_id("com.test.LCDMonitor", 1) + + object_id = + SimpleTriggersProtobufUtils.get_device_and_interface_object_id( + target_decoded_device_id, + interface_id + ) + + device_params = %{ + "object_id" => object_id, + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:device_and_interface), + "simple_trigger_id" => less_than_device_incoming_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, device_params) + + # Group-specific data trigger + target_group = "group1" + + simple_trigger_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger{ + group_name: target_group, + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :INCOMING_DATA, + match_path: "/weekSchedule/%{weekDay}/start", + value_match_operator: :EQUAL_TO, + known_value: Cyanide.encode!(%{v: 3}) } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() - - trigger_target_data = - %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() - } - } + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer.encode() + + trigger_target_data = + %Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key() } - |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() - - interface_id = CQLUtils.interface_id("com.test.LCDMonitor", 1) - - object_id = - SimpleTriggersProtobufUtils.get_group_and_interface_object_id( - target_group, - interface_id - ) - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_into_simple_triggers) - |> DatabaseQuery.put( - :object_id, - object_id - ) - |> DatabaseQuery.put( - :object_type, - SimpleTriggersProtobufUtils.object_type_to_int!(:group_and_interface) - ) - |> DatabaseQuery.put(:simple_trigger_id, equal_to_group_incoming_trigger_id()) - |> DatabaseQuery.put(:parent_trigger_id, fake_parent_trigger_id()) - |> DatabaseQuery.put(:trigger_data, simple_trigger_data) - |> DatabaseQuery.put(:trigger_target, trigger_target_data) - - DatabaseQuery.call!(client, query) - - {:ok, client} - - %{msg: msg} -> - {:error, msg} - end + } + } + |> Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer.encode() + + interface_id = CQLUtils.interface_id("com.test.LCDMonitor", 1) + + object_id = + SimpleTriggersProtobufUtils.get_group_and_interface_object_id( + target_group, + interface_id + ) + + group_specific_params = %{ + "object_id" => object_id, + "object_type" => SimpleTriggersProtobufUtils.object_type_to_int!(:group_and_interface), + "simple_trigger_id" => equal_to_group_incoming_trigger_id(), + "parent_trigger_id" => fake_parent_trigger_id(), + "trigger_data" => simple_trigger_data, + "trigger_target" => trigger_target_data + } + + Queries.custom_query!(@insert_into_simple_triggers, nil, group_specific_params) end def destroy_local_test_keyspace do - {:ok, client} = DatabaseClient.new(List.first(Config.cqex_nodes!())) - DatabaseQuery.call(client, "DROP KEYSPACE autotestrealm;") + Queries.custom_query("DROP KEYSPACE autotestrealm;") :ok end def insert_device(device_id, opts \\ []) do - client = DatabaseClient.new!(List.first(Config.cqex_nodes!())) - last_connection = Keyword.get(opts, :last_connection) - last_disconnection = Keyword.get(opts, :last_disconnection) - - first_pairing = - Keyword.get(opts, :first_pairing, DateTime.utc_now() |> DateTime.to_unix(:millisecond)) - - last_seen_ip = Keyword.get(opts, :last_seen_ip) - last_pairing_ip = Keyword.get(opts, :last_pairing_ip) - total_received_msgs = Keyword.get(opts, :total_received_msgs, 0) - total_received_bytes = Keyword.get(opts, :total_received_bytes, 0) - introspection = Keyword.get(opts, :introspection, %{}) - groups = Keyword.get(opts, :groups, []) - groups_map = for group <- groups, do: {group, UUID.uuid1()} - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(@insert_device) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.put(:last_connection, last_connection) - |> DatabaseQuery.put(:last_disconnection, last_disconnection) - |> DatabaseQuery.put(:first_pairing, first_pairing) - |> DatabaseQuery.put(:last_seen_ip, last_seen_ip) - |> DatabaseQuery.put(:last_pairing_ip, last_pairing_ip) - |> DatabaseQuery.put(:total_received_msgs, total_received_msgs) - |> DatabaseQuery.put(:total_received_bytes, total_received_bytes) - |> DatabaseQuery.put(:introspection, introspection) - |> DatabaseQuery.put(:groups, groups_map) - - DatabaseQuery.call(client, query) + params = + opts + |> Keyword.validate!( + last_connection: nil, + last_disconnection: nil, + first_pairing: DateTime.utc_now() |> DateTime.to_unix(:millisecond), + last_seen_ip: nil, + last_pairing_ip: nil, + total_received_msgs: 0, + total_received_bytes: 0, + introspection: %{}, + groups: [] + ) + |> Keyword.update!(:groups, &Map.new(&1, fn group -> {group, UUID.uuid1()} end)) + |> Keyword.put(:device_id, device_id) + |> Map.new(fn {key, val} -> {to_string(key), val} end) + + Queries.custom_query(@insert_device, nil, params) end - def fetch_old_introspection(db_client, device_id) do + def fetch_old_introspection(realm, device_id) do old_introspection_statement = """ SELECT old_introspection FROM devices WHERE device_id=:device_id """ - old_introspection_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(old_introspection_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) + params = %{ + "device_id" => device_id + } - with {:ok, result} <- DatabaseQuery.call(db_client, old_introspection_query), - [old_introspection: introspection_minors] when is_list(introspection_minors) <- - DatabaseResult.head(result) do - {:ok, Enum.into(introspection_minors, %{})} - else - [old_introspection: nil] -> - {:ok, %{}} + with {:ok, result} <- + Queries.custom_query(old_introspection_statement, realm, params, + consistency: :quorum, + result: :first! + ) do + %{"old_introspection" => introspection_minors} = result + introspection_minors = introspection_minors || %{} + {:ok, introspection_minors} end end