I want to listen for Linux signals manually in Rust so that I can handle the disconnection myself, but I have run into a bug.
My guess is that the C layer has its own signal handling, and because my upper (Rust) layer intercepts the signal, the C layer never gets to process it. Is that right?
Full code:
use std::{env, process, thread, time::Duration};
use tokio::signal::unix::{signal, SignalKind};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER: &str = "tcp://localhost:1883";
const DFLT_CLIENT: &str = "rust_subscribe";
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS: &[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
async fn try_reconnect(cli: &mqtt::AsyncClient) -> bool {
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().await.is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
async fn subscribe_topics(cli: &mqtt::AsyncClient) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS).await {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

#[tokio::main]
async fn main() {
    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| DFLT_BROKER.to_string());

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .persistence(None)
        .finalize();

    // Create a client.
    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    let cloned_cli = cli.clone();
    tokio::spawn(async move {
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();

        let lwt_props = mqtt::properties!(mqtt::PropertyCode::WillDelayInterval => 10);

        // Define the set of options for the connection.
        let lwt = mqtt::MessageBuilder::new()
            .topic("lost_connection_topic")
            .payload("{}")
            .properties(lwt_props)
            .qos(1)
            .finalize();

        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(false)
            .will_message(lwt)
            .finalize();

        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts).await {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }

        // Subscribe topics.
        subscribe_topics(&cli).await;

        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            } else if !cli.is_connected() {
                if try_reconnect(&cli).await {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli).await;
                } else {
                    break;
                }
            }
        }
    });

    let mut stream = signal(SignalKind::terminate()).unwrap();
    stream.recv().await;
    println!("gruceful shutdown")
}
Start it:
cargo run
Stop it:
Ctrl+C
At this point the process hangs.
My dependencies:
[dependencies]
paho-mqtt = "0.11.1"
EMQX server version: 5.0.13
I don't know what is causing this, and I'm not sure what you're trying to do here.
I think you should just block on the spawned tokio task to await its completion. Alternatively, since you are already running in an asynchronous tokio main function, you may not need a separate async task to run the MQTT connection at all.
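The general shape of "ask the worker to stop, then wait for it to finish before exiting" can be sketched without any runtime. The example below uses only `std` threads and a channel as a stand-in. It is an analogy rather than the tokio code itself: with tokio, the `JoinHandle` returned by `tokio::spawn` would be awaited instead of joined. The names `run_worker` and `run_until_stopped` are hypothetical, and the 10 ms tick merely simulates the consumer loop.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the MQTT consumer loop: performs one unit of
/// "work" every 10 ms until told to stop, then returns the number of units.
fn run_worker(shutdown: Receiver<()>) -> u32 {
    let mut ticks = 0;
    loop {
        // Waiting with a timeout lets the loop notice a shutdown request
        // instead of blocking forever on its input.
        match shutdown.recv_timeout(Duration::from_millis(10)) {
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
            Err(RecvTimeoutError::Timeout) => ticks += 1,
        }
    }
}

/// Starts the worker, lets it run for `run_for` (standing in for "until a
/// signal arrives"), then requests shutdown and blocks until it has exited.
fn run_until_stopped(run_for: Duration) -> u32 {
    let (stop_tx, stop_rx) = mpsc::channel();
    let worker = thread::spawn(move || run_worker(stop_rx));

    thread::sleep(run_for);

    stop_tx
        .send(())
        .expect("worker exited before shutdown was requested");
    // Block until the worker has actually finished.
    worker.join().expect("worker panicked")
}
```

The point of the design is that the worker has a way to observe the stop request. A loop that sits in a blocking call with no timeout cannot do that, so whatever waits for it at shutdown would wait indefinitely.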