Skip to content

Commit

Permalink
Add tests for publishing data-stream
Browse files Browse the repository at this point in the history
Add tests for publishing individual parametric
and nonparametric datastreams

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Nov 8, 2024
1 parent 80aa547 commit 2715530
Showing 1 changed file with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-FileCopyrightText: 2024 SECO Mind Srl
#
# SPDX-License-Identifier: Apache-2.0


import subprocess

from resources import (
list_of_params_endpoints,
list_of_nonparams_endpoints,
map_of_params_data,
map_of_nonparams_data,
list_of_nonparams_endpoints,
)


def test_app_engine_server_publish_datastream_individual_parametric_datastream(
astarte_env_vars,
):
device_id = astarte_env_vars["device_test_1"]
astarte_url = astarte_env_vars["astarte_url"]
realm = astarte_env_vars["realm"]
jwt = astarte_env_vars["jwt"]

interface_name = "test.astarte-platform.server.individual.parametric.Datastream"

for path in list_of_params_endpoints:
value = map_of_params_data[path]
if type(value) is type([]):
value = " ".join(value).replace(" ", ",")

arg_list = [
"astartectl",
"appengine",
"devices",
"publish-datastream",
device_id,
interface_name,
path,
value,
"-t",
jwt,
"-u",
astarte_url,
"-r",
realm,
]
sample_data_result = subprocess.run(arg_list, capture_output=True, text=True)
print(f"Info: {sample_data_result.stdout}")
assert sample_data_result.stdout.replace("\n", "") == "ok"


def test_app_engine_server_publish_datastream_individual_nonparametric_datastream(
astarte_env_vars,
):
device_id = astarte_env_vars["device_test_1"]
astarte_url = astarte_env_vars["astarte_url"]
realm = astarte_env_vars["realm"]
jwt = astarte_env_vars["jwt"]

interface_name = "test.astarte-platform.server.individual.nonparametric.Datastream"

for path in list_of_nonparams_endpoints:
value = map_of_nonparams_data[path]
if type(value) is type([]):
value = " ".join(value).replace(" ", ",")

arg_list = [
"astartectl",
"appengine",
"devices",
"publish-datastream",
device_id,
interface_name,
path,
value,
"-t",
jwt,
"-u",
astarte_url,
"-r",
realm,
]
sample_data_result = subprocess.run(arg_list, capture_output=True, text=True)
print(f"Info: {sample_data_result.stdout}")
assert sample_data_result.stdout.replace("\n", "") == "ok"

0 comments on commit 2715530

Please sign in to comment.