Skip to content

Commit d4351b1

Browse files
committed
feat(app): add mqtt port flag, use rumqttc client directly for sub/pub, enable clean session
Signed-off-by: Deep Panchal <[email protected]>
1 parent d1d22e0 commit d4351b1

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ license = "MIT"
1212

1313
[dependencies]
1414
aws-iot-device-sdk-rust = "0.6.0"
15+
rumqttc = "0.24.0"
1516
chrono = "0.4.40"
1617
clap = { version = "4.5.31", features = ["derive", "env"] }
1718
colored = "3.0.0"

src/main.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use aws_iot_device_sdk_rust::settings::MQTTOptionsOverrides;
12
use aws_iot_device_sdk_rust::{
23
async_event_loop_listener, AWSIoTAsyncClient, AWSIoTSettings, Packet, QoS,
34
};
@@ -39,6 +40,10 @@ struct Args {
3940
#[arg(long, env = "AWS_IOT_ENDPOINT")]
4041
endpoint: String,
4142

43+
/// AWS IoT endpoint URL
44+
#[arg(long, env = "AWS_IOT_PORT", default_value = "8883")]
45+
port: u16,
46+
4247
/// Client ID for MQTT connection
4348
#[arg(long, env = "AWS_IOT_CLIENT_ID")]
4449
client_id: String,
@@ -119,20 +124,33 @@ async fn main() -> Result<(), Box<dyn Error>> {
119124

120125
debug!("Parsed CLI arguments: {:?}", args);
121126

127+
let mqtt_option_overrides = MQTTOptionsOverrides {
128+
port: Some(args.port),
129+
clean_session: Some(true),
130+
keep_alive: None,
131+
max_packet_size: None,
132+
request_channel_capacity: None,
133+
pending_throttle: None,
134+
inflight: None,
135+
last_will: None,
136+
conn_timeout: None,
137+
transport: None,
138+
};
122139
let aws_settings = AWSIoTSettings::new(
123140
args.client_id.clone(),
124141
args.root_ca.to_str().unwrap().to_string(),
125142
args.device_cert.to_str().unwrap().to_string(),
126143
args.private_key.to_str().unwrap().to_string(),
127144
args.endpoint.clone(),
128-
None,
145+
Some(mqtt_option_overrides),
129146
);
130147

131148
debug!("Connecting with client_id: {}", args.client_id.blue());
132149
debug!("Using endpoint: {}", args.endpoint);
133150

134-
let (iot_core_client, event_loop) = AWSIoTAsyncClient::new(aws_settings).await?;
135-
let iot_core_client = Arc::new(Mutex::new(iot_core_client));
151+
let (iot_core_client, (event_loop, sender)) = AWSIoTAsyncClient::new(aws_settings).await?;
152+
let raw_client = iot_core_client.get_client().await;
153+
let client = Arc::new(Mutex::new(raw_client));
136154

137155
match args.command {
138156
Some(CliCommand::Sub {
@@ -157,7 +175,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
157175
let exclude_regex = exclude.map(|s| Regex::new(&s).unwrap());
158176

159177
for topic in topic_list {
160-
iot_core_client
178+
client
161179
.lock()
162180
.await
163181
.subscribe(topic.to_string(), QoS::AtMostOnce)
@@ -166,7 +184,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
166184
}
167185

168186
// For subscriptions, keep listening to messages
169-
let receiver = iot_core_client.lock().await.get_receiver().await;
187+
let receiver = sender.subscribe();
170188
let receiver = Arc::new(Mutex::new(receiver));
171189

172190
let recv_thread = task::spawn(async move {
@@ -196,7 +214,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
196214
});
197215

198216
let listen_thread = task::spawn(async move {
199-
async_event_loop_listener(event_loop).await.unwrap();
217+
async_event_loop_listener((event_loop, sender))
218+
.await
219+
.unwrap();
200220
});
201221

202222
let (recv_result, listen_result) = tokio::join!(recv_thread, listen_thread);
@@ -210,7 +230,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
210230
let topic_list: Vec<&str> = topics.split(',').collect();
211231

212232
// Create a receiver to drain incoming events
213-
let receiver = iot_core_client.lock().await.get_receiver().await;
233+
let receiver = sender.subscribe();
214234
let receiver = Arc::new(Mutex::new(receiver));
215235
let drain_task = task::spawn(async move {
216236
while (receiver.lock().await.recv().await).is_ok() {
@@ -222,10 +242,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
222242
});
223243

224244
for topic in topic_list {
225-
iot_core_client
245+
client
226246
.lock()
227247
.await
228-
.publish(topic.to_string(), QoS::AtMostOnce, message.to_string())
248+
.publish(topic, QoS::AtMostOnce, false, message.clone())
229249
.await?;
230250
println!("{}", format!("Published to topic: {}", topic).blue());
231251
match serde_json::from_str::<Value>(&message) {
@@ -236,7 +256,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
236256

237257
// Run the event loop briefly to ensure the message is sent
238258
let event_loop_task = task::spawn(async move {
239-
async_event_loop_listener(event_loop).await.unwrap();
259+
async_event_loop_listener((event_loop, sender))
260+
.await
261+
.unwrap();
240262
});
241263

242264
// Allow time for the event loop to process the message

0 commit comments

Comments
 (0)