-
Notifications
You must be signed in to change notification settings - Fork 16
/
database.py
256 lines (197 loc) · 10.3 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import os
import random
import secrets
import sqlite3

from dials.base_logger import logger
class DialsDB:
    """SQLite-backed store for dials, API keys and per-key dial access.

    The database file lives next to this module. All values supplied by
    callers are passed to SQLite as bound parameters; only identifiers
    (table / column names) are ever interpolated into SQL text.
    """

    connection = None
    # Baseline used by `_more_than_one_changed()`; kept for compatibility.
    database_changes = 0

    # Characters used for generated API keys (the same set the original list
    # contained: lowercase letters without 'l' and 'o', plus digits).
    _API_KEY_ALPHABET = 'abcdefghijkmnpqrstuvwxyz0123456789'
    _API_KEY_LENGTH = 16

    def __init__(self, database_file='vudials.db', init_if_missing=False):
        """Open the database located next to this module.

        Args:
            database_file: File name of the SQLite database.
            init_if_missing: When true, the file may be absent and the
                tables are created if they do not exist yet.

        Raises:
            SystemError: If the file does not exist and `init_if_missing`
                is false.
        """
        # abspath() guards against an empty dirname when __file__ is a bare
        # relative file name (os.makedirs('') would raise).
        database_path = os.path.dirname(os.path.abspath(__file__))
        os.makedirs(database_path, exist_ok=True)

        self.database_file = os.path.join(database_path, database_file)
        logger.info(f"VU1 Database file: {self.database_file}")

        if not os.path.exists(self.database_file) and not init_if_missing:
            raise SystemError("Database file does not exist!")

        self.connection = sqlite3.connect(self.database_file)
        self.connection.row_factory = sqlite3.Row

        if init_if_missing:
            self._init_database()

    # -- Dial
    def fetch_dial_info_or_create_default(self, dial_uid, dial_name='Not set'):
        """Return the `dials` row for `dial_uid`, inserting it first if absent.

        A newly inserted row gets `dial_name` and the column defaults
        declared by the table schema.
        """
        select = "SELECT * FROM dials WHERE `dial_uid`=? LIMIT 1"
        row = self._fetch_one_query(select, (dial_uid,))
        if row is None:
            self._insert(
                "INSERT INTO dials (`dial_uid`, `dial_name`) VALUES (?, ?)",
                (dial_uid, dial_name),
            )
            logger.debug(f"Added dial `{dial_uid}` to dial list with friendly name `{dial_name}`")
            row = self._fetch_one_query(select, (dial_uid,))
        return row

    def dial_update_cell(self, dial_uid, cell, value):
        """Set column `cell` of dial `dial_uid` to `value`.

        The value is bound as text (as the original quoted literal was), so
        SQLite applies the column's affinity to it.

        Returns:
            True if a row was updated, False if no such dial exists.
        """
        logger.debug(f"Updating `{dial_uid}` to `{cell}`='{value}'")
        cursor = self._insert(
            f"UPDATE dials SET {self._quote_identifier(cell)}=? WHERE `dial_uid`=?",
            (str(value), dial_uid),
        )
        return cursor.rowcount > 0

    def dial_update_cell_with_dict(self, dial_uid, values_dict):
        """Update several columns of dial `dial_uid` in one statement.

        Args:
            dial_uid: UID of the dial to update.
            values_dict: Mapping of column name to new value.

        Returns:
            True if a row was updated; False when `values_dict` is not a
            dict, is empty, or no such dial exists.
        """
        if not isinstance(values_dict, dict):
            logger.error(f"Expecting type(dictionary) but {type(values_dict)} given.")
            return False
        if not values_dict:
            # An empty SET clause would be a SQL syntax error.
            return False

        logger.debug(f"Updating `{dial_uid}` to `{values_dict}'")
        assignments = ', '.join(
            f"{self._quote_identifier(column)}=?" for column in values_dict
        )
        query = f"UPDATE `dials` SET {assignments} WHERE `dial_uid`=?"
        logger.debug(query)

        params = [str(value) for value in values_dict.values()]
        params.append(dial_uid)
        cursor = self._insert(query, params)
        return cursor.rowcount > 0

    # -- API keys
    def api_key_get_id(self, key):
        """Return the numeric `key_id` for API key string `key`, or None."""
        row = self._fetch_one(
            table='api_keys', cell='key_id', where='key_uid', where_cmp=key, limit=1
        )
        return None if row is None else row[0]

    def api_key_list(self):
        """Return all API keys as a dict keyed by `key_uid`.

        Each value holds `key_name`, `key_uid`, `priviledges` (the key level
        as int) and `dials` (list of dial UIDs the key may access).
        """
        api_keys = {}
        for row in self._fetch_all("SELECT * FROM api_keys"):
            api_keys[row['key_uid']] = {
                'key_name': row['key_name'],
                'key_uid': row['key_uid'],
                'priviledges': int(row['key_level']),
                'dials': self.api_key_get_dial_access(row['key_id']),
            }
        return api_keys

    def api_key_get_dial_access(self, key_id):
        """Return the list of dial UIDs that `key_id` has access to."""
        rows = self._fetch_all(
            "SELECT `dial_uid` FROM `dial_access` WHERE `key_id`=?", (key_id,)
        )
        return [row['dial_uid'] for row in rows]

    def api_key_add_dial_access(self, key, dials):
        """Replace the dial access list of API key `key` with `dials`.

        Existing entries for the key are wiped before the new ones are
        inserted; both steps run in a single transaction.

        Returns:
            True if access rows were written; False when the key is unknown
            or `dials` is empty.
        """
        key_id = self.api_key_get_id(key)
        if not key_id or not dials:
            return False

        # `with connection` commits on success and rolls back on error, so
        # the key never ends up with its access wiped but not re-added.
        with self.connection:
            self.connection.execute(
                "DELETE FROM `dial_access` WHERE `key_id`=?", (key_id,)
            )
            cursor = self.connection.executemany(
                "INSERT OR IGNORE INTO `dial_access` (dial_uid, key_id) VALUES (?, ?)",
                [(dial, key_id) for dial in dials],
            )
        return cursor.rowcount > 0

    # Set master key to defined value (used to drive master key from .yaml file into sqlite database)
    def api_update_master(self, new_key):
        """Store `new_key` as the master key (row `key_id` 1, level 99).

        Returns:
            True if the row was written.
        """
        cursor = self._insert(
            "INSERT OR REPLACE INTO api_keys (key_id, key_name, key_uid, key_level) "
            "VALUES (1, 'MASTER_KEY', ?, 99)",
            (new_key,),
        )
        return cursor.rowcount > 0

    def api_key_generate(self, key_name='Not set', level=1):
        """Create, store and return a new unique API key string.

        Raises:
            SystemError: If the key could not be stored.
        """
        generated_key = self.generate_api_key_str()
        # Regenerate on the (unlikely) collision with an existing key.
        while self._fetch_one(table='api_keys', cell='key_id', where='key_uid',
                              where_cmp=generated_key, limit=1):
            generated_key = self.generate_api_key_str()

        table_data = {'key_uid': generated_key, 'key_name': key_name, 'key_level': level}
        cursor = self._insert_dict('api_keys', table_data)
        if cursor.rowcount > 0:
            return generated_key

        raise SystemError("Failed to generate and store new API key to database!")

    def generate_api_key_str(self):
        """Return a random 16-character key string.

        Uses `secrets` because the result is an access credential.
        """
        return ''.join(
            secrets.choice(self._API_KEY_ALPHABET) for _ in range(self._API_KEY_LENGTH)
        )

    def api_key_update(self, key_uid, key_name=None, level=None):
        """Rename an API key and/or change its level.

        Only the arguments that are not None are written.

        Returns:
            True if the key row was updated; False when the key is unknown
            or neither `key_name` nor `level` was given.
        """
        key_id = self.api_key_get_id(key_uid)
        if key_id is None:
            return False

        assignments = []
        params = []
        if key_name is not None:
            assignments.append("`key_name`=?")
            params.append(key_name)
        if level is not None:
            assignments.append("`key_level`=?")
            params.append(level)
        if not assignments:
            return False

        params.append(key_id)
        cursor = self._insert(
            f"UPDATE `api_keys` SET {', '.join(assignments)} WHERE `key_id`=?",
            params,
        )
        return cursor.rowcount > 0

    def api_key_delete(self, key_uid):
        """Delete API key `key_uid` together with its dial access rows.

        Keys with level 99 or above (the master key) are never deleted.

        Returns:
            True if the key was deleted; False if it does not exist or is
            protected.
        """
        # Make sure we are not deleting master key!
        row = self._fetch_one_query(
            "SELECT `key_id` FROM `api_keys` WHERE `key_uid`=? AND `key_level` < 99 LIMIT 1",
            (key_uid,),
        )
        if row is None:
            return False
        key_id = row['key_id']

        logger.debug(f"Deleting API key with key_id `{key_id}`")
        with self.connection:
            cursor = self.connection.execute(
                "DELETE FROM `api_keys` WHERE `key_id`=?", (key_id,)
            )
            deleted = cursor.rowcount > 0
            if deleted:
                # Remove the access rows that referenced the deleted key.
                self.connection.execute(
                    "DELETE FROM `dial_access` WHERE `key_id`=?", (key_id,)
                )
        return deleted

    # -- Internal
    @staticmethod
    def _quote_identifier(name):
        """Return `name` as a backtick-quoted SQL identifier.

        Embedded backticks are doubled so the name cannot terminate the
        quoting and inject SQL.
        """
        return '`' + str(name).replace('`', '``') + '`'

    def _insert_dict(self, table_name, dict_data):
        """Insert one row built from `dict_data` (column -> value) and commit.

        `table_name` and the dict keys are interpolated as-is and must come
        from trusted code; the values are bound as parameters.

        Returns:
            The cursor that executed the statement.
        """
        attrib_names = ", ".join(dict_data.keys())
        attrib_values = ", ".join("?" for _ in dict_data)
        sql = f"INSERT INTO {table_name} ({attrib_names}) VALUES ({attrib_values})"
        cursor = self.connection.cursor()
        cursor.execute(sql, list(dict_data.values()))
        self._commit()
        return cursor

    def _commit(self):
        """Commit the current transaction."""
        self.connection.commit()

    def _insert(self, query, params=()):
        """Execute `query` with bound `params`, commit, and return the cursor."""
        cursor = self._query(query, params)
        self.connection.commit()
        return cursor

    def _query(self, query, params=()):
        """Execute `query` with bound `params` without committing.

        Returns:
            The cursor that executed the statement.
        """
        cursor = self.connection.cursor()
        cursor.execute(query, params)
        return cursor

    def _fetch_one(self, table, cell, where, where_cmp, limit=1):
        """Return the first row of `SELECT cell FROM table WHERE where = where_cmp`.

        `table`, `cell` and `where` are interpolated as-is and must come from
        trusted code; `where_cmp` and `limit` are bound as parameters (which
        also keeps the compared value out of the debug log).
        """
        query = f"SELECT {cell} FROM {table} WHERE {where} = ? LIMIT ?"
        logger.debug(query)
        return self._query(query, (where_cmp, int(limit))).fetchone()

    def _fetch_one_query(self, query, params=()):
        """Execute `query` with bound `params` and return the first row or None."""
        return self._query(query, params).fetchone()

    def _fetch_all(self, query, params=()):
        """Execute `query` with bound `params` and return all rows as a list."""
        return self._query(query, params).fetchall()

    def _more_than_one_changed(self):
        """Report whether any row changed since the previous call.

        Despite the name, this is true for one or more changes: it compares
        the connection's `total_changes` with the stored baseline and then
        advances the baseline. Kept for backward compatibility; the methods
        of this class use the statement's own row count instead, because the
        baseline also absorbs changes made by unrelated earlier statements.
        """
        if self.connection.total_changes > self.database_changes:
            self.database_changes = self.connection.total_changes
            return True
        return False

    def _init_database(self):
        """Create the `dials`, `api_keys` and `dial_access` tables if missing."""
        # Create DIALS table
        self._query("""
            CREATE TABLE IF NOT EXISTS dials (
                "dial_id" INTEGER PRIMARY KEY AUTOINCREMENT,
                "dial_uid" TEXT NOT NULL UNIQUE,
                "dial_name" TEXT DEFAULT 'Not Set',
                "dial_gen" TEXT DEFAULT 'VU1',
                "dial_build_hash" TEXT DEFAULT '?',
                "dial_fw_version" TEXT DEFAULT '?',
                "dial_hw_version" TEXT DEFAULT '?',
                "dial_protocol_version" TEXT DEFAULT 'V1',
                "easing_dial_step" INTEGER DEFAULT 2,
                "easing_dial_period" INTEGER DEFAULT 50,
                "easing_backlight_step" INTEGER DEFAULT 5,
                "easing_backlight_period" INTEGER DEFAULT 100
            )
        """)

        # Create API KEYS table
        self._query("""
            CREATE TABLE IF NOT EXISTS api_keys (
                key_id INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT ,
                key_name TEXT,
                key_uid TEXT NOT NULL UNIQUE,
                key_level INTEGER)
        """)

        # Create DIAL ACCESS table
        self._query("""
            CREATE TABLE IF NOT EXISTS dial_access (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                dial_uid TEXT NOT NULL,
                key_id INTEGER NOT NULL)
        """)

        self.connection.commit()