-
Notifications
You must be signed in to change notification settings - Fork 1
/
print_fleet.py
150 lines (125 loc) · 5.49 KB
/
print_fleet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import octorest
from requests.exceptions import ConnectionError
import time
class PrintFleet:
    """Track a group of OctoPrint printers and drive the currently selected one.

    Each printer is described by an entry dict stored in ``self.printers`` under
    the printer's name, with the keys ``name``, ``client``, ``print_job``,
    ``status``, ``printing`` and ``details``. ``client`` is ``None`` while no
    connection could be established.

    Args:
        printer_access: Mapping of printer name to a dict holding the ``ip``,
            ``port`` and ``apikey`` strings used to reach that printer.
    """

    def __init__(self, printer_access):
        self.printer_access = printer_access
        self.selected_printer = {}
        self.printers = {}
        # Minimum number of seconds between two reconnection attempts.
        self.offline_timeout = 300
        self.connect_clients()
        # Stamped after the initial connection round so the first reconnect is
        # a full ``offline_timeout`` away from it.
        self.prev_reconnect_time = time.time()

    def __enter__(self):
        """Return the fleet itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drop the printer table on leaving the ``with`` block.

        The instance must not be used afterwards. Exceptions raised inside the
        block are not suppressed.
        """
        del self.printers

    def connect_clients(self):
        """Reset every configured printer entry and try to connect to it.

        Existing entries are reset in place rather than replaced, so references
        held elsewhere (for example ``self.selected_printer``) keep pointing at
        the live entry. A printer that cannot be reached keeps ``client`` set
        to ``None``.
        """
        for name, access in self.printer_access.items():
            entry = self.printers.setdefault(name, {})
            entry.update({"name": name, "client": None, "print_job": None,
                          "status": "unreachable", "printing": False,
                          "details": {}})
            entry["client"] = self._open_client(name, access)

    def _open_client(self, name, access):
        """Create the client for one printer; return it, or ``None`` on failure."""
        print(f"Connecting to {name.capitalize()}... ", end='')
        try:
            # The URL is built by string concatenation, so a non-string port
            # surfaces as TypeError and is reported as a failed connection.
            client = octorest.OctoRest(
                url="http://" + access["ip"] + ":" + access["port"],
                apikey=access["apikey"])
        except (ConnectionError, RuntimeError, TypeError):
            print("Failed")
            return None
        print("Success")
        return client

    def _reconnect_offline_if_due(self):
        """Retry clientless printers once ``offline_timeout`` has elapsed.

        Only entries without a client are touched, and they are updated in
        place. This is safe to call while iterating ``self.printers`` and does
        not disturb printers whose status was already refreshed.
        """
        now = time.time()
        if now - self.prev_reconnect_time <= self.offline_timeout:
            return
        print("Refreshing available printers, please wait")
        for name, entry in self.printers.items():
            if not entry['client'] and name in self.printer_access:
                entry['client'] = self._open_client(name, self.printer_access[name])
        self.prev_reconnect_time = now

    def _refresh_printer(self, printer):
        """Query one printer and store its raw details and basic status."""
        # Pessimistic defaults; only overwritten after a successful query.
        printer['status'] = "offline"
        printer['printing'] = False
        printer['details'] = {}
        if not printer['client']:
            self._reconnect_offline_if_due()
        client = printer['client']
        if not client:
            return
        try:
            status = client.printer()
            job_info = client.job_info()
        except (ConnectionError, RuntimeError):
            # TODO: logging
            return
        printer['details'] = {'status': status, 'job_info': job_info}
        printer['printing'] = status['state']['flags']['printing']
        printer['status'] = "printing" if printer['printing'] else "available"

    def update_status(self, queue_running):
        """Refresh the status of every printer.

        After the refresh each entry's ``status`` is one of:

        * ``"offline"``   - no client, or the query failed;
        * ``"printing"``  - printing a job listed in ``queue_running``;
        * ``"invalid"``   - printing although not listed in ``queue_running``;
        * ``"finished"``  - listed in ``queue_running`` but no longer printing;
        * ``"available"`` - reachable, idle and not listed in ``queue_running``.

        Args:
            queue_running: Names of the printers the caller expects to be
                printing. Names that are not part of the fleet are ignored.
        """
        for printer in self.printers.values():
            self._refresh_printer(printer)

        for name in queue_running:
            entry = self.printers.get(name)
            if entry is None:
                # not in printer list
                continue
            if not entry['printing'] and entry['status'] != "offline":
                entry['status'] = "finished"

        for printer in self.printers.values():
            if printer['printing'] and printer['name'] not in queue_running:
                printer['status'] = "invalid"

    def get_status(self):
        """Group printer names by their current status.

        Returns:
            A dict with the keys ``available``, ``printing``, ``finished``,
            ``invalid`` and ``offline``, each mapping to a list of printer
            names. Any other status (such as the initial ``unreachable`` set by
            ``connect_clients``) gets its own key only when at least one
            printer is in that state.
        """
        status_dict = {"available": [], "printing": [], "finished": [],
                       "invalid": [], "offline": []}
        for printer in self.printers.values():
            status_dict.setdefault(printer["status"], []).append(printer["name"])
        return status_dict

    def add_print(self, filename, path=""):
        """Upload ``filename`` to the selected printer under ``path``."""
        print(f"Uploading {filename}... ", end="")
        self.selected_printer["client"].upload(filename, path=path)
        print("Complete")

    def select_print(self, filename):
        """Select ``filename`` on the selected printer without starting it."""
        self.selected_printer["client"].select(filename, print=False)

    def run_print(self):
        """Start the job on the selected printer."""
        self.selected_printer["client"].start()

    def cancel_print(self):
        """Cancel the selected printer's job and wait until it stops cancelling.

        Polls the printer state every 0.5 s while its ``cancelling`` flag is
        set, then reports how long the cancel took.
        """
        client = self.selected_printer["client"]
        client.cancel()
        t0 = time.time()
        while client.printer()['state']['flags']['cancelling']:
            time.sleep(0.5)
        print(f"Cancel of {self.selected_printer['name']} complete, "
              f"time taken: {time.time() - t0:.1f}s")
        time.sleep(0.5)

    def clear_files(self):
        """Delete every file listed on the selected printer.

        A ``RuntimeError`` from a single delete is reported and does not stop
        the remaining deletions.
        """
        client = self.selected_printer["client"]
        for entry in client.files()["files"]:
            # Files are addressed by their storage path; ``display`` is only
            # the human-readable label and may differ from it. Fall back to
            # ``display`` when an entry carries no path.
            location = entry.get("path") or entry["display"]
            try:
                client.delete(location)
            except RuntimeError as e:
                print(f"Error, usually from non-existent path? - {e}")

    def select_printer(self, name):
        """Make the printer called ``name`` the target of the print commands.

        Raises:
            KeyError: If ``name`` is not part of the fleet.
        """
        self.selected_printer = self.printers[name]
# # TODO: Ad-hoc handling
# def file_names(self):
# """Retrieves the G-code file names from the
# OctoPrint server and returns a string message listing the
# file names.
#
# Args:
# client - the OctoRest client
# """
# message = "\nCurrent GCODE Files:\n"
# for i, k in enumerate(self.client.files()['files']):
# message += f"{i}.\t{k['name']}\n"
# print(message)