Skip to content

Commit

Permalink
Merge pull request #1006 from osmanhadzic/fix-astarte-data-access
Browse files Browse the repository at this point in the history
Refactor Astarte Import
  • Loading branch information
Annopaolo authored Oct 23, 2024
2 parents fe6af4f + 340c2f1 commit 82e0f14
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 46 deletions.
2 changes: 2 additions & 0 deletions tools/astarte_import/config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Config
config :cqerl,
cassandra_nodes: [{System.get_env("CASSANDRA_DB_HOST"), System.get_env("CASSANDRA_DB_PORT")}]

config :astarte_import, :cluster_name, :xandra

config :logger, :console,
format: {Astarte.Import.LogFmtFormatter, :format},
metadata: [:module, :function, :device_id, :realm, :db_action, :reason]
Expand Down
12 changes: 12 additions & 0 deletions tools/astarte_import/lib/astarte/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Astarte.Config do
alias Astarte.DataAccess.Config, as: DataAccessConfig

def xandra_options! do
cluster = Application.get_env(:astarte_import, :cluster_name)
# Dropping :autodiscovery since the option has been deprecated in xandra v0.15.0
# and is now always enabled.
DataAccessConfig.xandra_options!()
|> Keyword.drop([:autodiscovery])
|> Keyword.put(:name, cluster)
end
end
29 changes: 5 additions & 24 deletions tools/astarte_import/lib/astarte/import/cli.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ defmodule Astarte.Import.CLI do

@chunk_size 4096

def main(args) do
def main([realm, file_name]) do
with {:started, {:ok, _}} <- {:started, Application.ensure_all_started(:astarte_import)},
[realm, file_name] <- args,
true <- String.valid?(realm),
true <- String.valid?(file_name),
{:ok, file} <- File.open(file_name, [:read]),
Expand All @@ -34,33 +33,15 @@ defmodule Astarte.Import.CLI do
{data, state}
else
:eof ->
{"", state}

{:error, reason} ->
Logger.error("Cannot read #{file_name}: #{inspect(reason)}.", realm: realm)
throw({:error, :cannot_read})

any ->
Logger.error("Cannot read #{file_name}. unexpected: #{inspect(any)}.", realm: realm)
throw({:error, :cannot_read})
{nil, state}
end
end

# Call the populate function with the necessary arguments
PopulateDB.populate(realm, data, more_data)
else
{:started, {:error, reason}} ->
Logger.error("Cannot ensure all applications startup: #{inspect(reason)}")

{:error, :enoent} ->
[realm, file_name] = args
Logger.error("File not found: #{file_name}.", realm: realm)

{:error, :eacces} ->
[realm, file_name] = args
Logger.error("Cannot access: #{file_name}.", realm: realm)

any ->
Logger.error("Invalid args: #{inspect(any)}. exiting.")
error ->
Logger.error("Failed to start import: #{inspect(error)}")
end
end
end
10 changes: 3 additions & 7 deletions tools/astarte_import/lib/astarte/import/populatedb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ defmodule Astarte.Import.PopulateDB do
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping
alias Astarte.Core.Mapping.EndpointsAutomaton
alias Astarte.DataAccess.Database
alias Astarte.DataAccess.Interface
alias Astarte.DataAccess.Mappings
alias Astarte.Import
Expand All @@ -48,20 +47,17 @@ defmodule Astarte.Import.PopulateDB do
nodes = Application.get_env(:cqerl, :cassandra_nodes)
{host, port} = Enum.random(nodes)
Logger.info("Connecting to #{host}:#{port} cassandra database.", realm: realm)
opts = [cassandra_nodes: nodes, realm: realm]
{:ok, conn} = Database.connect(opts)
{:ok, xandra_conn} = Xandra.start_link(nodes: ["#{host}:#{port}"])

Logger.info("Connected to database.", realm: realm)
{:ok, xandra_conn} = Xandra.start_link(nodes: ["#{host}:#{port}"])

got_interface_fun = fn %Import.State{data: data} = state, interface_name, major, minor ->
Logger.info("Importing data for #{interface_name} v#{major}.#{minor}.",
realm: realm,
device_id: state.device_id
)

{:ok, interface_desc} = Interface.fetch_interface_descriptor(conn, interface_name, major)
{:ok, mappings} = Mappings.fetch_interface_mappings(conn, interface_desc.interface_id)
{:ok, interface_desc} = Interface.fetch_interface_descriptor(realm, interface_name, major)
{:ok, mappings} = Mappings.fetch_interface_mappings(realm, interface_desc.interface_id)

%Import.State{
state
Expand Down
29 changes: 28 additions & 1 deletion tools/astarte_import/lib/mix/tasks/astarte_import.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,37 @@
defmodule Mix.Tasks.Astarte.Import do
use Mix.Task
require Logger
alias Astarte.Import.CLI
alias Astarte.DataAccess.Config

@impl Mix.Task
@shortdoc "import data into an existing Astarte realm"
def run(args) do
CLI.main(args)
Logger.info("Running astarte import task with arguments: #{inspect(args)}")

# Process the arguments
case args do
[realm, file_name] ->
Config.validate!()

xandra_options = Config.xandra_options!()
data_access_opts = [xandra_options: xandra_options]

ae_xandra_opts = Keyword.put(xandra_options, :name, :xandra)

children = [
# Ensure the :astarte_data_access process is started
{Xandra.Cluster, ae_xandra_opts},
{Astarte.DataAccess, data_access_opts}
]

opts = [strategy: :one_for_one, name: AstarteImport.Supervisor]
Supervisor.start_link(children, opts)

CLI.main([realm, file_name])

_ ->
Logger.info("Usage: mix astarte.import <realm> <file_name>")
end
end
end
29 changes: 15 additions & 14 deletions tools/astarte_import/mix.lock
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
%{
"artificery": {:hex, :artificery, "0.4.3", "0bc4260f988dcb9dda4b23f9fc3c6c8b99a6220a331534fdf5bf2fd0d4333b02", [:mix], [], "hexpm", "12e95333a30e20884e937abdbefa3e7f5e05609c2ba8cf37b33f000b9ffc0504"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "3574f38f759e7b012060c66a46b594ac2cdb53c3", [tag: "v1.2.0-rc.0"]},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "ea86a9683817b9384de418339dd3f15ab76f67db", [tag: "v1.2.0-rc.0"]},
"castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
"astarte_core": {:hex, :astarte_core, "1.2.0", "5c989629b676f27cd3f48b655e640c59c25dff9293189bb254d04f23cdc4976b", [:mix], [{:cyanide, "~> 2.0", [hex: :cyanide, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_morph, "~> 0.1.23", [hex: :ecto_morph, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "8a17f1dab5682a29384ce1dd21dab4b014cda86d31ae0c37774edacf93730306"},
"astarte_data_access": {:hex, :astarte_data_access, "1.2.0", "97843ec1251a54c94e96027845cfe86dc667d0d2cfd5febb391324dac0cc89d6", [:mix], [{:astarte_core, "~> 1.2", [hex: :astarte_core, repo: "hexpm", optional: false]}, {:castore, "~> 1.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:cqex, "~> 1.0", [hex: :cqex, repo: "hexpm", optional: false]}, {:skogsra, "~> 2.2", [hex: :skogsra, repo: "hexpm", optional: false]}, {:xandra, "~> 0.11", [hex: :xandra, repo: "hexpm", optional: false]}], "hexpm", "20300ff2963d6fb328ce8a5ca30b3aaa1ee05b3977fc0d4cf151548068abcae5"},
"castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"cqerl": {:hex, :cqerl, "2.0.1", "c92929e885adc50cda1f11b73eb0121363e8418533312f8a44defb5f14853445", [:rebar3], [{:lz4, "~>0.2.4", [hex: :lz4_erl, repo: "hexpm", optional: false]}, {:re2, "1.9.5", [hex: :re2, repo: "hexpm", optional: false]}, {:semver, "~>0.0.1", [hex: :semver_erl, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}, {:uuid, "~>2.0.0", [hex: :uuid_erl, repo: "hexpm", optional: false]}], "hexpm", "96e9ee407830508187a5edff9fc49983a7122b5c4127c640320a226b59ae12fe"},
"cqerl": {:hex, :cqerl, "2.0.1", "c92929e885adc50cda1f11b73eb0121363e8418533312f8a44defb5f14853445", [:rebar3], [{:lz4, "~> 0.2.4", [hex: :lz4_erl, repo: "hexpm", optional: false]}, {:re2, "1.9.5", [hex: :re2, repo: "hexpm", optional: false]}, {:semver, "~> 0.0.1", [hex: :semver_erl, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}, {:uuid, "~> 2.0.0", [hex: :uuid_erl, repo: "hexpm", optional: false]}], "hexpm", "96e9ee407830508187a5edff9fc49983a7122b5c4127c640320a226b59ae12fe"},
"cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"},
"cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"},
"db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"},
"decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"distillery": {:hex, :distillery, "2.1.1", "f9332afc2eec8a1a2b86f22429e068ef35f84a93ea1718265e740d90dd367814", [:mix], [{:artificery, "~> 0.2", [hex: :artificery, repo: "hexpm", optional: false]}], "hexpm", "bbc7008b0161a6f130d8d903b5b3232351fccc9c31a991f8fcbf2a12ace22995"},
"ecto": {:hex, :ecto, "3.5.8", "8ebf12be6016cb99313348ba7bb4612f4114b9a506d6da79a2adc7ef449340bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea0be182ea8922eb7742e3ae8e71b67ee00ae177de1bf76210299a5f16ba4c77"},
"ecto": {:hex, :ecto, "3.12.4", "267c94d9f2969e6acc4dd5e3e3af5b05cdae89a4d549925f3008b2b7eb0b93c3", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ef04e4101688a67d061e1b10d7bc1fbf00d1d13c17eef08b71d070ff9188f747"},
"ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"gpb": {:hex, :gpb, "4.12.0", "4a5224c046b1cd91fb23b81a8a450cb4435016c1f2c96af71f0a90476a11dcaf", [:make, :rebar], [], "hexpm", "de6c947a2308c52941bab0b168e0aadca146c26678a330698865d222a3621125"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"logfmt": {:hex, :logfmt, "3.3.1", "d9eaf344f81f4b4a6f2a1c73d79cb9cb4c4ff1d7c878d7a1bf3cb92761f653db", [:mix], [], "hexpm", "5030c9dafe1ef85e416cc867cd8db836de512c54047735c7a3e7be0ba67a532e"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"logfmt": {:hex, :logfmt, "3.3.3", "6521ee4a5c532088e15d487fab9f736c07bdd161d643560c73cd4b10685deb65", [:mix], [], "hexpm", "dbd51cd3fe37c3429b9bd687bad1f531a533505f4a641592129e7a47e24104d1"},
"lz4": {:hex, :lz4_erl, "0.2.4", "fafc1fa39ed1d034893316852daebedd82f37df478446224ac096490be3b4a4d", [:rebar3], [], "hexpm", "e3eb9e2b5c1e4dab39db8fe0421e6fa10f9bf5843f20dab43518f8ab8e812517"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"quickrand": {:hex, :quickrand, "2.0.1", "6d861fa11e6eb51bb2343a2616eff704c2681a9997f41abc78e58fa76da33981", [:rebar3], [], "hexpm", "14db67d4aef6b8815810ec9f3ccef5e324b73b56cae3687f99d752b85bdd4c96"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
"quickrand": {:hex, :quickrand, "2.0.7", "d2bd76676a446e6a058d678444b7fda1387b813710d1af6d6e29bb92186c8820", [:rebar3], [], "hexpm", "b8acbf89a224bc217c3070ca8bebc6eb236dbe7f9767993b274084ea044d35f0"},
"re2": {:hex, :re2, "1.9.5", "3c419527fb2cc75eda1657f04dc7e8cea9864899b43ff6cc3250fa7525431e83", [:rebar3], [], "hexpm", "4861336271ac565224e79e133cbaf90af77739cda3fff25f6965b24a1146a4d6"},
"semver": {:hex, :semver_erl, "0.0.1", "e1dc99fb20ff071b240a0280611faba93eeb9c2adfb02a15e20a06a9f13dfff4", [:rebar3], [], "hexpm", "adf1cb935eeb2472b4b7bb8116c678c1077ae4cd5bdfe90010b765aecce5753b"},
"skogsra": {:hex, :skogsra, "2.3.3", "90ea76d98ad749241b31e724ca17ed8aca0202001972aeca3cb834f44027f3ea", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.8", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "e36880922431d41ac56d6cb4529b0526039a108fb44f8ecc90b517d494b86c28"},
"skogsra": {:hex, :skogsra, "2.5.0", "57d57c15bb8356662177779cb10adf1272069eeb4f3c032bf7d71d522e726f06", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "b7dfe23ef3f9999a96fa330b73363b3f48d68a7ca3eb98ab1f32cd888ef207ee"},
"snappyer": {:hex, :snappyer, "1.2.6", "34181e3233f68a92044e176fe96e54fee7957acc2be554f0460d799c495166c2", [:rebar3], [], "hexpm", "d538d1e8892af09dc8b2771b2652c9d70f009cd1556246b3e22706df643f47b4"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"uuid": {:hex, :uuid_erl, "2.0.1", "1fd9079c544d521063897887a1c5b3302dca98f9bb06aadcdc6fb0663f256797", [:rebar3], [{:quickrand, "~> 2.0.1", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "ab57caccd51f170011e5f444ce865f84b41605e483a9efcc468c1afaec87553b"},
"xandra": {:hex, :xandra, "0.13.1", "f82866e6c47527f74f35dd3007b5311121852dd861a29ed1613e27ccfaba0102", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "a2efdb8921e3b694bf3505e40c5ec9d353d8fa3755cec946be7c18b8236d7230"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"uuid": {:hex, :uuid_erl, "2.0.7", "b2078d2cc814f53afa52d36c91e08962c7e7373585c623f4c0ea6dfb04b2af94", [:rebar3], [{:quickrand, ">= 2.0.7", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "4e4c5ca3461dc47c5e157ed42aa3981a053b7a186792af972a27b14a9489324e"},
"xandra": {:hex, :xandra, "0.19.1", "3041768e92874d850f65905669fb39a9482e4b68059351efc09ddc881f1e0baa", [:mix], [{:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f47cef0597ff04baf1416dc3c4906c6edb04f19f7081a523f8eee1d3b69a4323"},
}

0 comments on commit 82e0f14

Please sign in to comment.