-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cliente MQTT + Simulador mejorado (#7)
* changed protocol from AMQP to MQTT in rabbitmq client * add unused device_id * change int to float in temperature & light. light sent with lux unit. add id_device. add time_stamp with arg tz * add executable to send simple package * add main as cli to run executables more easily * fix lint * fix requeriment & dockerfile * remove unused requeriment * fix lint * fix unique id_device in simulator * add light with LUX unity to send ! * add executable to send deviated package * rename command of simulator * add README * update .env.dist * add -pn lower case
- Loading branch information
Showing
12 changed files
with
679 additions
and
144 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,10 @@ | ||
RABBITMQ_HOST= | ||
QUEUE_NAME= | ||
LOGGING_LEVEL= | ||
WEATHER_API_KEY= | ||
WEATHER_API_URL= | ||
WEATHER_API_URL= | ||
|
||
# RabbitMQ - MQTT broker details | ||
MQTT_HOST= | ||
MQTT_PORT= | ||
MQTT_USERNAME= | ||
MQTT_PASSWORD= | ||
MQTT_TOPIC= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,7 @@ | ||
pika | ||
requests | ||
python-dotenv | ||
python-dotenv | ||
paho-mqtt | ||
cachetools==3.1. | ||
typer[all] | ||
pandas | ||
typing-extensions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,102 @@ | ||
import pika | ||
import os | ||
import time | ||
import logging | ||
import paho.mqtt.client as mqtt | ||
|
||
FIRST_RECONNECT_DELAY = 1 | ||
RECONNECT_RATE = 2 | ||
MAX_RECONNECT_COUNT = 12 | ||
MAX_RECONNECT_DELAY = 60 | ||
logger = logging.getLogger("simulator") | ||
|
||
class Middleware: | ||
|
||
class Middleware: | ||
def __init__(self): | ||
rabbitmq_host = os.environ.get("RABBITMQ_HOST", "localhost") | ||
self._connection = pika.BlockingConnection( | ||
pika.ConnectionParameters(host=rabbitmq_host) | ||
) | ||
self._channel = self._connection.channel() | ||
self._exit = False | ||
self._remake = False | ||
|
||
def create_queue(self, queue_name): | ||
self._channel.queue_declare(queue=queue_name) | ||
|
||
def _setup_message_consumption(self, queue_name, user_function): | ||
self._channel.basic_consume(queue=queue_name, | ||
on_message_callback=lambda channel, | ||
method, properties, body: | ||
(user_function(body), | ||
channel.basic_ack | ||
(delivery_tag=method.delivery_tag), | ||
self._verify_connection_end())) | ||
self._channel.start_consuming() | ||
|
||
def _verify_connection_end(self): | ||
if self._exit: | ||
self._channel.close() | ||
if self._remake: | ||
self._exit = False | ||
self._channel = self._connection.channel() | ||
|
||
def finish(self, open_new_channel=False): | ||
self._exit = True | ||
self._remake = open_new_channel | ||
|
||
# Work queue methods | ||
def listen_on(self, queue_name, user_function): | ||
self.create_queue(queue_name) | ||
self._channel.basic_qos(prefetch_count=30) | ||
self._setup_message_consumption(queue_name, user_function) | ||
|
||
def send_message(self, queue_name, message): | ||
self._channel.basic_publish(exchange='', | ||
routing_key=queue_name, | ||
body=message) | ||
self._client = mqtt.Client() | ||
self._client.username_pw_set(os.environ.get("MQTT_USERNAME"), | ||
os.environ.get("MQTT_PASSWORD")) | ||
self._topic = None | ||
|
||
def _on_connect(self, client, userdata, flags, rc): | ||
if rc == 0 and self._client.is_connected(): | ||
logger.info(f"Connected successfully with code {rc}: " + | ||
f"{mqtt.connack_string(rc)}") | ||
if self._topic is not None: | ||
self._client.subscribe(self._topic) | ||
logger.info(f"Subscribed to '{self._topic}' topic") | ||
else: | ||
logger.info(f"Connect failed with code {rc}: " + | ||
f"{mqtt.connack_string(rc)}") | ||
|
||
def _on_disconnect(self, client, userdata, rc): | ||
logger.info(f"Disconnected with result code {rc}: " + | ||
f"{mqtt.error_string(rc)}") | ||
if rc == 0: | ||
logger.info(f"Disconnect successfully with code {rc}: " + | ||
f"{mqtt.error_string(rc)}") | ||
return | ||
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY | ||
while reconnect_count < MAX_RECONNECT_COUNT: | ||
logger.info("Reconnecting in %d seconds...", reconnect_delay) | ||
time.sleep(reconnect_delay) | ||
|
||
try: | ||
client.reconnect() | ||
logger.info("Reconnected successfully!") | ||
return | ||
except Exception as err: | ||
logger.error("%s. Reconnect failed. Retrying...", err) | ||
|
||
reconnect_delay *= RECONNECT_RATE | ||
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) | ||
reconnect_count += 1 | ||
logger.info("Reconnect failed after %s attempts. Exiting...", | ||
reconnect_count) | ||
|
||
def _on_publish(self, client, userdata, mid): | ||
logger.info(f"Message publishd! Message id: {mid}") | ||
|
||
def listen_on(self, user_function, topic): | ||
self._topic = topic | ||
self._client.on_message = user_function | ||
|
||
def send_message(self, topic, msg, qos=2): | ||
result = self._client.publish(topic, msg, qos=qos) | ||
result.wait_for_publish() | ||
status = result[0] | ||
if status == 0: | ||
logger.info(f"Succesfully sent message '{msg}' to topic {topic} " + | ||
f"with qos {qos} and message id {result[1]}. " + | ||
"Result code: " + | ||
f"{status}: {mqtt.error_string(status)}") | ||
else: | ||
logger.info(f"Failed to send message '{msg}' to topic {topic}. " + | ||
f"with qos {qos} and message id {result[1]}. " + | ||
"Result code: " + | ||
f"{status}: {mqtt.error_string(status)}") | ||
|
||
def __del__(self): | ||
self._connection.close() | ||
self.finish() | ||
|
||
def connect(self): | ||
self._client.on_connect = self._on_connect | ||
self._client.on_disconnect = self._on_disconnect | ||
self._client.on_publish = self._on_publish | ||
self._client.connect(os.environ.get("MQTT_HOST"), | ||
int(os.environ.get("MQTT_PORT"))) | ||
logger.info("Connecting to " + | ||
f"{os.environ.get('MQTT_HOST')}:" + | ||
f"{os.environ.get('MQTT_PORT')}") | ||
|
||
def run(self): | ||
self._client.loop_start() | ||
while True: | ||
self._client.loop() | ||
if self._client.is_connected(): | ||
logger.info("Connected to broker. Listening for messages or " + | ||
"ready to send.") | ||
break | ||
|
||
def finish(self): | ||
self._client.loop_stop() | ||
self._client.disconnect() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.