diff --git a/cosmic-applet-audio/src/mpris_subscription.rs b/cosmic-applet-audio/src/mpris_subscription.rs index bf387705..08af4eba 100644 --- a/cosmic-applet-audio/src/mpris_subscription.rs +++ b/cosmic-applet-audio/src/mpris_subscription.rs @@ -9,7 +9,7 @@ use mpris2_zbus::{ player::{PlaybackStatus, Player}, }; use tokio::join; -use zbus::Connection; +use zbus::{fdo::DBusProxy, Connection}; #[derive(Clone, Debug)] pub struct PlayerStatus { @@ -25,8 +25,8 @@ pub struct PlayerStatus { } impl PlayerStatus { - async fn new(player: Player) -> Self { - let metadata = player.metadata().await.unwrap(); + async fn new(player: Player) -> Option { + let metadata = player.metadata().await.ok()?; let title = metadata.title().map(Cow::from); let artists = metadata .artists() @@ -49,7 +49,7 @@ impl PlayerStatus { player.can_go_previous(), player.can_go_next() ); - Self { + Some(Self { icon, title, artists, @@ -59,7 +59,7 @@ impl PlayerStatus { can_go_previous: can_go_previous.unwrap_or_default(), can_go_next: can_go_next.unwrap_or_default(), player, - } + }) } } @@ -78,7 +78,7 @@ pub fn mpris_subscription( #[derive(Debug)] pub enum State { Setup, - Player(Player), + Player(Player, DBusProxy<'static>), Finished, } @@ -102,22 +102,23 @@ async fn update(state: State, output: &mut futures::channel::mpsc::Sender { let Ok(conn) = Connection::session().await else { tracing::error!("Failed to connect to session bus."); + _ = output.send(MprisUpdate::Finished).await; return State::Finished; }; let mut players = mpris2_zbus::media_player::MediaPlayer::new_all(&conn) .await .unwrap_or_else(|_| Vec::new()); + let Ok(dbus_proxy) = zbus::fdo::DBusProxy::builder(&conn) + .path("/org/freedesktop/DBus") + .unwrap() + .build() + .await + else { + tracing::error!("Failed to create dbus proxy."); + return State::Finished; + }; if players.is_empty() { - let Ok(dbus) = zbus::fdo::DBusProxy::builder(&conn) - .path("/org/freedesktop/DBus") - .unwrap() - .build() - .await - else { - tracing::error!("Failed to create dbus proxy."); - return State::Finished; - }; - let Ok(mut stream) = dbus.receive_name_owner_changed().await else { + let Ok(mut stream) = dbus_proxy.receive_name_owner_changed().await else { tracing::error!("Failed to receive name owner changed signal."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; // restart from the beginning @@ -126,61 +127,131 @@ async fn update(state: State, output: &mut futures::channel::mpsc::Sender { - let mut paused = player.receive_playback_status_changed().await; + State::Player(player, dbus_proxy) => { + let Ok(mut name_owner_changed) = player.receive_owner_changed().await else { + tracing::error!("Failed to receive owner changed signal."); + // restart from the beginning + return State::Setup; + }; let mut metadata_changed = player.receive_metadata_changed().await; + let Ok(mut new_mpris) = dbus_proxy.receive_name_owner_changed().await else { + tracing::error!("Failed to receive name owner changed signal."); + // restart from the beginning + return State::Setup; + }; + let conn = player.connection(); + let media_players = mpris2_zbus::media_player::MediaPlayer::new_all(&conn) + .await + .unwrap_or_else(|_| Vec::new()); + + let mut players = Vec::with_capacity(media_players.len()); + for p in media_players { + if let Ok(p) = p.player().await { + players.push(p); + } + } + loop { + let mut listeners = Vec::with_capacity(players.len()); + for p in &players { + listeners.push(p.receive_playback_status_changed().await); + } + let mut player_state_changed_list = Vec::with_capacity(listeners.len()); + for l in &mut listeners { + player_state_changed_list.push(Box::pin(async move { + let changed = l.next().await; + if let Some(c) = changed { + c.get().await.ok() + } else { + tracing::error!("Failed to receive playback status changed signal."); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + None + } + })); + } + let any_player_state_changed = + futures::future::select_all(player_state_changed_list); let keep_going = tokio::select! { - p = paused.next() => { - p.is_some() - }, m = metadata_changed.next() => { m.is_some() }, + n = name_owner_changed.next() => { + n.map(|n| n.is_some()).unwrap_or_default() + }, + _ = new_mpris.next() => { + true + }, + _ = any_player_state_changed => { + true + }, }; - if keep_going { - let update = PlayerStatus::new(player.clone()).await; - let stopped = update.status == PlaybackStatus::Stopped; - _ = output.send(MprisUpdate::Player(update)).await; - if stopped { - _ = output.send(MprisUpdate::Setup).await; + if !keep_going { + break; + } + + if let Some(update) = PlayerStatus::new(player.clone()).await { + if matches!(update.status, PlaybackStatus::Stopped) { break; } + + // if paused check if any players are playing + // if they are, break + if !matches!(update.status, PlaybackStatus::Playing) { + let conn = player.connection(); + let players = mpris2_zbus::media_player::MediaPlayer::new_all(&conn) + .await + .unwrap_or_else(|_| Vec::new()); + if let Some(active) = find_active(players).await { + if active.destination() != player.destination() { + break; + } + } + } + _ = output.send(MprisUpdate::Player(update)).await; } else { break; } } + _ = output.send(MprisUpdate::Setup).await; State::Setup } State::Finished => iced::futures::future::pending().await, } } -async fn find_active(players: Vec) -> Option { +async fn find_active(mut players: Vec) -> Option { + // pre-sort by path so that the same player is always selected + players.sort_by(|a, b| { + let a = a.destination(); + let b = b.destination(); + a.cmp(&b) + }); let mut best = (0, None); let eval = |p: Player| async move { let v = { @@ -202,7 +273,7 @@ async fn find_active(players: Vec) -> Option { Err(_) => continue, }; let v = eval(p.clone()).await; - if v >= best.0 { + if v > best.0 { best = (v, Some(p)); } }