-
Notifications
You must be signed in to change notification settings - Fork 0
/
firebase_connector.py
executable file
·153 lines (119 loc) · 5.08 KB
/
firebase_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from google.cloud import firestore
import credentials
import fnmatch
import datetime
import secrets
# We use Cloud Firestore instead of Realtime Database since Firestore supports more query flexibility (better arrays!)
# Read more here: https://firebase.googleblog.com/2018/08/better-arrays-in-cloud-firestore.html
# We do not use asyncio: coroutines take too much time and threads require many event loops. We need numerous functions, and at our current scale plain synchronous calls should be fine.
class FirebaseConnector:
    """Thin wrapper around the Firestore collections used by this module.

    Two collections are accessed:

    * ``participants`` -- one document per user, keyed by ``str(user_id)``,
      holding the fields ``name``, ``student_id`` and ``last_hint``.
    * ``tokens`` -- one document per "color"; every field of a color
      document is a token name mapped to a dict carrying a ``claimed``
      flag, the three hint fields and, once claimed, ``claimant`` and
      ``hash``.
    """

    _PARTICIPANTS = 'participants'
    _TOKENS = 'tokens'
    _HINT_FIELDS = ('first_hint', 'second_hint', 'third_hint')

    def __init__(self):
        """Establish the Firestore client connection."""
        self.db = firestore.Client()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _participant(self, user_id):
        """Return the document reference of the participant ``user_id``."""
        return self.db.collection(self._PARTICIPANTS).document(f'{user_id}')

    def _participant_data(self, user_id):
        """Return the participant's fields as a dict, or ``None`` if absent."""
        return self._participant(user_id).get().to_dict()

    def _participant_snapshots(self):
        """Stream every document of the ``participants`` collection."""
        return self.db.collection(self._PARTICIPANTS).stream()

    def _token_entries(self):
        """Yield ``(token, value)`` pairs across every color document."""
        for color in self.db.collection(self._TOKENS).stream():
            yield from color.to_dict().items()

    # ------------------------------------------------------------------
    # Participants
    # ------------------------------------------------------------------
    def get_current_users(self) -> list:
        """Return the document ids of all participants."""
        return [user.id for user in self._participant_snapshots()]

    def get_all_users(self) -> list:
        """Return the document ids of all participants.

        Same result as :meth:`get_current_users`; kept as a separate name
        for existing callers.
        """
        return self.get_current_users()

    def add_user(self, user_id, first_name) -> None:
        """Create (or overwrite) the participant document of ``user_id``.

        The document starts with no hint time and a ``student_id`` of ``0``.
        """
        self._participant(user_id).set({
            'last_hint': None,
            'name': f'{first_name}',
            'student_id': 0,
        })

    def get_name(self, user_id):
        """Return the stored name of ``user_id``, or ``0`` if unknown.

        Raises:
            KeyError: If the document exists but has no ``name`` field.
        """
        data = self._participant_data(user_id)
        # A missing document yields None; the 0 sentinel is kept for
        # backward compatibility instead of parsing a TypeError message.
        if data is None:
            return 0
        return data['name']

    def get_all_current_student_id(self) -> list:
        """Return the ``student_id`` of every participant.

        Raises:
            KeyError: If a participant document has no ``student_id`` field.
        """
        return [
            user.to_dict()['student_id']
            for user in self._participant_snapshots()
        ]

    def get_student_id(self, user_id):
        """Return the ``student_id`` of ``user_id``, or ``0`` if unknown.

        Raises:
            KeyError: If the document exists but has no ``student_id`` field.
        """
        data = self._participant_data(user_id)
        # Same sentinel convention as get_name for a missing document.
        if data is None:
            return 0
        return data['student_id']

    def update_student_id(self, user_id, student_id) -> None:
        """Store ``student_id`` on the participant, keeping other fields."""
        self._participant(user_id).set({'student_id': student_id}, merge=True)

    def get_last_hint_time(self, user_id):
        """Return the ``last_hint`` value stored for ``user_id``.

        Raises:
            TypeError: If no participant document exists for ``user_id``
                (deliberately not masked, so an unknown user cannot be
                confused with a user who never received a hint).
            KeyError: If the document has no ``last_hint`` field.
        """
        return self._participant_data(user_id)['last_hint']

    def update_last_hint_time(self, user_id, time) -> None:
        """Store ``time`` as the participant's ``last_hint``."""
        self._participant(user_id).set({'last_hint': time}, merge=True)

    # ------------------------------------------------------------------
    # Tokens
    # ------------------------------------------------------------------
    def get_all_tokens(self) -> list:
        """Return the names of all tokens across every color document."""
        return [token for token, _ in self._token_entries()]

    def get_unclaimed_tokens(self) -> list:
        """Return the names of all tokens whose ``claimed`` flag is false."""
        return [
            token
            for token, value in self._token_entries()
            if not value['claimed']
        ]

    def get_hint(self):
        """Return a random hint of an unclaimed token, or ``''`` if none.

        Every unclaimed token contributes its three hint fields to the pool
        that the hint is drawn from.
        """
        unclaimed_hints = []
        for _, value in self._token_entries():
            if not value['claimed']:
                unclaimed_hints.extend(
                    value[field] for field in self._HINT_FIELDS
                )
        # Check for emptiness explicitly: the message of the IndexError
        # raised by secrets.choice differs between Python versions, so it
        # must not be used to detect the "no hints left" case.
        if not unclaimed_hints:
            return ''
        return secrets.choice(unclaimed_hints)

    def claim_token(self, user_id, token, verification_hash) -> None:
        """Mark ``token`` as claimed by ``user_id``.

        The color document containing the unclaimed token is looked up and
        the token's ``claimant``, ``claimed`` and ``hash`` fields are merged
        into it.

        Raises:
            IndexError: If no color document holds ``token`` as unclaimed.
        """
        tokens = self.db.collection(self._TOKENS)
        matches = tokens.where(f'`{token}`.`claimed`', '==', False).stream()
        color = next((match.id for match in matches), None)
        if color is None:
            # Same exception type as before, but with an actionable message.
            raise IndexError(
                f'token {token!r} does not exist or is already claimed'
            )
        # NOTE(review): the lookup and the write are separate requests, so
        # two concurrent claims of one token are not prevented here --
        # confirm whether a transaction is required.
        tokens.document(f'{color}').set({
            f'{token}': {
                'claimant': f'{user_id}',
                'claimed': True,
                'hash': f'{verification_hash}',
            },
        }, merge=True)

    def get_all_hash(self) -> list:
        """Return the ``hash`` of every claimed token.

        Raises:
            KeyError: If a claimed token has no ``hash`` field.
        """
        return [
            value['hash']
            for _, value in self._token_entries()
            if value['claimed']
        ]