Skip to content

Commit bd1a01e

Browse files
committed
feat(app): handle graceful shutdown w/ ctrl+c, unsubscribe from mqtt topics on shutdown
Signed-off-by: Deep Panchal <[email protected]>
1 parent 1074087 commit bd1a01e

File tree

3 files changed

+68
-7
lines changed

3 files changed

+68
-7
lines changed

Cargo.lock

Lines changed: 49 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ colored = "3.0.0"
1919
regex = "1.11.1"
2020
serde_json = "1.0.140"
2121
term_size = "0.3.2"
22-
tokio = "1.44.0"
22+
tokio = { version = "1.44.0", features = ["full"] }
2323
env_logger = "0.11.6"
2424
log = "0.4.26"

src/main.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use serde_json::Value;
1010
use std::error::Error;
1111
use std::hash::{Hash, Hasher};
1212
use std::sync::Arc;
13+
use tokio::signal;
1314
use tokio::sync::Mutex;
1415
use tokio::task;
1516
use tokio::time::{sleep, Duration};
@@ -163,6 +164,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
163164
let raw_client = iot_core_client.get_client().await;
164165
let client = Arc::new(Mutex::new(raw_client));
165166

167+
let shutdown_signal = async {
168+
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
169+
debug!("Received shutdown signal, cleaning up...");
170+
};
171+
166172
match args.command {
167173
Some(CliCommand::Sub {
168174
topics,
@@ -185,7 +191,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
185191
let include_regex = include.map(|s| Regex::new(&s).unwrap());
186192
let exclude_regex = exclude.map(|s| Regex::new(&s).unwrap());
187193

188-
for topic in topic_list {
194+
for topic in topic_list.clone() {
189195
client
190196
.lock()
191197
.await
@@ -230,11 +236,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
230236
.unwrap();
231237
});
232238

233-
let (recv_result, listen_result) = tokio::join!(recv_thread, listen_thread);
234-
235-
// Propagate errors if any
236-
recv_result?;
237-
listen_result?;
239+
// Wait for either the threads to complete or the shutdown signal
240+
tokio::select! {
241+
_ = recv_thread => {}
242+
_ = listen_thread => {}
243+
_ = shutdown_signal => {
244+
for topic in topic_list.clone() {
245+
client.lock().await.unsubscribe(topic).await?;
246+
println!("{}", format!("Unsubscribed from topic: {}", topic).blue());
247+
}
248+
}
249+
}
238250
}
239251

240252
Some(CliCommand::Pub { topics, message }) => {

0 commit comments

Comments
 (0)