-
Notifications
You must be signed in to change notification settings - Fork 35
/
psmqtt.py
154 lines (135 loc) · 4.19 KB
/
psmqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
#
# 1. Read configuration pointed to by PSMQTTCONFIG env var, or use `psmqtt.conf`
# by default.
# 2. Extract from config file settings, e.g. mqtt broker and schedule.
# 2. Execute schedule...
# 3. Perform tasks from the schedule, which involves reading sensors and sending
# values to the broker
#
from datetime import datetime
from dateutil.rrule import rrulestr # pip install python-dateutil
import os
#import sys
import sched
import socket
import logging
import sys
from threading import Thread
import time
from typing import Any, Dict, List, Union
from src.config import load_config
def run_tasks(tasks: Union[str, Dict[str, str],
List[Union[Dict[str, str], str]]]) -> None:
'''
tasks come from conf file, e.g.:
["cpu_percent", "virtual_memory/percent"]
or
"disk_usage/percent/|"
or
{"boot_time/{{x|uptime}}": "uptime" }
'''
# delayed import to enable PYTHONPATH adjustment
from src.task import run_task
logging.debug("run_tasks(%s)", tasks)
if isinstance(tasks, dict):
for k in tasks:
run_task(k, tasks[k])
elif isinstance(tasks, list):
for task in tasks:
if isinstance(task, dict):
for k,t in task.items():
run_task(k, t)
else:
run_task(task, task)
else:
run_task(tasks, tasks)
return
class TimerThread(Thread):
def __init__(self, scheduler:sched.scheduler):
super().__init__(daemon=True)
self.scheduler = scheduler
return
def run(self) -> None:
self.scheduler.run()
return
def on_timer(s:sched.scheduler, dt: str, tasks: Any) -> None:
logging.debug("on_timer(%s, %s, %s)", s, dt, tasks)
run_tasks(tasks)
# add next timer task
now = datetime.now()
# need reparse rule (see #10)
delay = (rrulestr(dt).after(now) - now).total_seconds()
s.enter(delay, 1, on_timer, (s, dt, tasks))
return
def run() -> None:
'''
Main loop
'''
#
# read initial config files - this may exit(2)
#
cf = load_config()
pythonpath = cf.get('pythonpath', '')
if pythonpath:
logging.debug("Adding to PYTHONPATH: '%s'", pythonpath)
sys.path.append(pythonpath)
topic_prefix = cf.get(
'mqtt_topic_prefix', f'psmqtt/{socket.gethostname()}/')
request_topic = cf.get('mqtt_request_topic', 'request')
if request_topic != '':
request_topic = topic_prefix + request_topic + '/'
# delayed import to enable PYTHONPATH adjustment
from recurrent import RecurringEvent # pip install recurrent
from src.task import MqttClient
#
# create MqttClient
#
mqttc = MqttClient(
cf.get('mqtt_clientid', 'psmqtt-%s' % os.getpid()),
cf.get('mqtt_clean_session', False),
topic_prefix,
request_topic,
cf.get('mqtt_qos', 0),
cf.get('mqtt_retain', False))
#
# connect MqttClient to broker
#
mqttc.connect(
cf.get('mqtt_broker', 'localhost'),
int(cf.get('mqtt_port', '1883')),
cf.get('mqtt_username', ''),
cf.get('mqtt_password', None))
#
# parse schedule
#
schedule = cf.get('schedule', {})
assert isinstance(schedule, dict)
if not schedule:
logging.error("No schedule to execute, exiting")
return
s = sched.scheduler(time.time, time.sleep)
now = datetime.now()
for t, tasks in schedule.items():
logging.debug("Periodicity: '%s'", t)
logging.debug("Tasks: '%s'", tasks)
r = RecurringEvent()
dt = r.parse(t)
if not r.is_recurring:
logging.error(t + " is not recurring time. Skipping")
continue
delay = (rrulestr(dt).after(now) - now).total_seconds()
s.enter(delay, 1, on_timer, (s, dt, tasks))
TimerThread(s).start()
while True:
try:
mqttc.mqttc.loop_forever()
except socket.error:
logging.debug("socket.error caught, sleeping for 5 sec...")
time.sleep(5)
except KeyboardInterrupt:
logging.debug("KeyboardInterrupt caught, exiting")
break
return
if __name__ == '__main__':
run()