-
Notifications
You must be signed in to change notification settings - Fork 0
/
InfluxUploader.py
executable file
·99 lines (79 loc) · 2.75 KB
/
InfluxUploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3.10
"""
InfluxDB Uploader.
upload data into InfluxDB
read credentials from .ini file
"""
# install via
# pip3 install influxdb-client
from configparser import ConfigParser
from influxdb import InfluxDBClient
class InfluxUploader:
"""InfluxUploader Class."""
def __init__(self, verbose: bool = False) -> None: # noqa: D107
self.verbose = verbose
if self.verbose:
print("InfluxUploader: verbose = True")
self.config = ConfigParser(interpolation=None)
# interpolation=None -> treats % in values as char % instead of interpreting it
self.config.read("InfluxUploader.ini", encoding="utf-8")
self.con = self.connect()
def connect(self) -> InfluxDBClient:
"""Connect to DB."""
client = InfluxDBClient(
host=self.config.get("Connection", "host"),
port=self.config.getint("Connection", "port"),
username=self.config.get("Connection", "username"),
password=self.config.get("Connection", "password"),
)
client.switch_database(self.config.get("Connection", "database"))
return client
def upload(
self,
measurement: str,
fields: dict[str, int | float],
tags: dict[str, str],
datetime: str = "",
) -> None:
"""
Upload measurement data.
datetime string, created via datetime module
dt.strftime("%Y-%m-%dT%H:%M:%SZ")
"""
json = [
{
"measurement": measurement,
"tags": tags,
"fields": fields,
},
]
if datetime != "":
# dt.strftime("%Y-%m-%dT%H:%M:%SZ")
json[0]["time"] = datetime
if self.verbose:
print("adding timestamp")
if self.verbose:
print(f"uploading:\n {json}")
# time_precision is important for performance
if self.con.write_points(json, time_precision="s") is True:
if self.verbose:
print("data inserted into InfluxDB")
else:
print("ERROR: Write to InfluxDB not successful")
# def query(self, query):
# # TODO: not needed for a pure upload class
# print("Querying data: " + query)
# result = self.con.query(query)
# print("Result: {0}".format(result))
def test(self) -> None:
"""Test access by reading list of databases."""
print("list of databases:")
print(self.con.get_list_database())
def test() -> None:
"""Test."""
influx_uploader = InfluxUploader(verbose=True)
influx_uploader.test()
# requires grant read on mydb to myuser
# influx_uploader.query('SHOW MEASUREMENTS')
if __name__ == "__main__":
test()