Skip to content

Commit

Permalink
Merge pull request #39 from flokli/mqtt-refactor
Browse files Browse the repository at this point in the history
mqtt: handle display demuxing
  • Loading branch information
flokli authored Jul 20, 2024
2 parents 61847dd + aafd13f commit d23e56c
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,10 @@ rec {
name = "eyre";
packageId = "eyre";
}
{
name = "parking_lot";
packageId = "parking_lot";
}
{
name = "rumqttc";
packageId = "rumqttc";
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ clap = { version = "4.5.9", features = ["derive"] }
color-eyre = "0.6.3"
edid-rs = "0.1.0"
eyre = "0.6.12"
parking_lot = "0.12.3"
rumqttc = "0.24.0"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
Expand Down
2 changes: 1 addition & 1 deletion src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, warn};
use wry::WebViewBuilder;

/// Commands to control the browser instance.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "kind")]
pub enum Command {
LoadUrl { url: String },
Expand Down
23 changes: 16 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ struct Cli {

#[arg(long = "default-config")]
default_config_path: Option<String>,

#[arg(long)]
mqtt_topic_prefix: Option<String>,
}

fn main() -> color_eyre::eyre::Result<()> {
Expand Down Expand Up @@ -121,14 +124,20 @@ fn main() -> color_eyre::eyre::Result<()> {

let (tx, rx) = channel();

let listener = mqtt::Listener {
id: config.id.unwrap_or(display_info.serial),
host: config.host,
port: config.port,
sender: tx,
};
let listener = mqtt::Listener::new(
config.id.unwrap_or_else(|| display_info.serial.clone()),
config.host,
config.port,
cli.mqtt_topic_prefix
.unwrap_or_else(|| "screens".to_string()),
)?;

// register our display
// FUTURWORK: multiple display support
listener
.add_display(&display_info, tx)
.context("adding display")?;

listener.start().context("starting the mqtt listener")?;
fossbeamer::spawn_browser(cli.url, rx)?;

Ok(())
Expand Down
164 changes: 121 additions & 43 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,139 @@
use bstr::BStr;
use fossbeamer::Command;
use rumqttc::{Client, ClientError, MqttOptions, Packet, Publish};
use std::{sync::mpsc::Sender, thread, time::Duration};
use eyre::Context;
use fossbeamer::{Command, Info};
use parking_lot::RwLock;
use rumqttc::{Client, MqttOptions, Packet, Publish};
use std::{
collections::HashMap,
sync::{mpsc::Sender, Arc},
thread,
time::Duration,
};
use tracing::{debug, info, warn, Span};

/// Maintains a connection to an MQTT broker.
pub(crate) struct Listener {
pub id: String,
pub host: String,
pub port: u16,
pub sender: Sender<fossbeamer::Command>,
/// The MQTT client
client: rumqttc::Client,

/// The topic that's prepended before IDs in the topic
topic_prefix: String,

/// Senders expecting commands to be sent to, keyed by their topic.
senders: Arc<RwLock<HashMap<String, Sender<fossbeamer::Command>>>>,
}

impl Listener {
pub(crate) fn start(self) -> Result<(), ClientError> {
let (client, mut connection) =
Client::new(MqttOptions::new(&self.id, self.host, self.port), 64);

client.subscribe("screens", rumqttc::QoS::AtLeastOnce)?;
client.subscribe(format!("screens/{}", self.id), rumqttc::QoS::AtLeastOnce)?;

thread::spawn(move || {
for event in connection.iter() {
match event {
Ok(event) => match event {
rumqttc::Event::Incoming(Packet::Publish(Publish {
topic,
payload,
..
})) => {
Span::current().record("topic", &topic);
match serde_json::from_slice::<Command>(&payload) {
Ok(command) => {
info!(?command, "received command");

self.sender.send(command).unwrap();
}
Err(e) => {
warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed");
/// Prepares a connection to the broker, and spawns off a thread dealing
/// with received messages.
/// It spawns off a thread relaying messages to the Senders added in a
/// [add_display] call.
pub fn new(
id: impl Into<String>,
host: impl Into<String>,
port: u16,
topic_prefix: impl Into<String> + Clone,
) -> eyre::Result<Self> {
let (client, mut connection) = Client::new(MqttOptions::new(id, host, port), 64);

let senders = Arc::new(RwLock::new(
HashMap::<String, Sender<fossbeamer::Command>>::new(),
));

let topic_prefix: String = topic_prefix.into();
let catchall_topic = topic_prefix.clone();

thread::spawn({
let senders = senders.clone();
let catchall_topic = catchall_topic.clone();
move || {
for event in connection.iter() {
match event {
Ok(event) => match event {
rumqttc::Event::Incoming(Packet::Publish(Publish {
topic,
payload,
..
})) => {
Span::current().record("topic", &topic);

// parse the command
let command = match serde_json::from_slice::<Command>(&payload) {
Ok(command) => {
info!(?command, "received command");
command
}
Err(e) => {
warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed");
continue;
}
};

if topic == catchall_topic {
for (_topic, sender) in senders.read().iter() {
if let Err(e) = sender.send(command.clone()) {
warn!(err=%e, "unable to send command to tx");
}
}
} else {
match senders.read().get(&topic) {
None => {
warn!("couldn't find topic");
continue;
}
Some(tx) => {
if let Err(e) = tx.send(command) {
warn!(err=%e, "unable to send command to tx");
}
}
}
}
}
rumqttc::Event::Incoming(incoming) => {
debug!(?incoming, "other incoming event");
}
rumqttc::Event::Outgoing(out) => {
debug!(?out, "outgoing event");
}
},
Err(e) => {
warn!(err=%e, "connection error");
// sleep a bit
std::thread::sleep(Duration::from_secs(5));
}
rumqttc::Event::Incoming(incoming) => {
debug!(?incoming, "other incoming event");
}
rumqttc::Event::Outgoing(out) => {
debug!(?out, "outgoing event");
}
},
Err(e) => {
warn!(err=%e, "connection error");
// sleep a bit
std::thread::sleep(Duration::from_secs(5));
}
}
}
});

// subscribe to the catchall
client
.subscribe(catchall_topic, rumqttc::QoS::AtLeastOnce)
.context("subscribing to catchall topic")?;

Ok(Self {
client,
senders,
topic_prefix,
})
}

/// Register a new display, using the passed display_info.
/// `set` requests received are sent to the passed channel.
pub fn add_display(
&self,
display_info: &Info,
tx: Sender<fossbeamer::Command>,
) -> eyre::Result<()> {
let k = &display_info.serial;
let topic_str = format!("{}/{}", self.topic_prefix, k);

self.client
.subscribe(&topic_str, rumqttc::QoS::AtLeastOnce)
.context("subscribing to topic")?;

self.senders.write().insert(topic_str, tx);

Ok(())
}
}

0 comments on commit d23e56c

Please sign in to comment.