1- use std:: str:: FromStr ;
2- use std:: string:: FromUtf8Error ;
1+ use crate :: event:: config_cache;
32use crate :: load:: { load_config, MqttConfig } ;
3+ use crate :: man:: data:: DownloadData ;
4+ use crate :: man:: gw:: { DataWrapper , GwCmd , GwCmdResponse , ShellCmd } ;
5+ use crate :: man:: lora:: { LoRaNode , LoRaNodeManager } ;
6+ use crate :: man:: Id ;
7+ use crate :: protocol:: mqtt:: { LinkRx , LinkTx , Notification } ;
48use crate :: { DeviceError , DeviceResult , GLOBAL_DOWNLOAD_RESPONSE , GLOBAL_STATE } ;
5- use derive_new:: new;
6- use rumqttc:: { Event , Incoming , MqttOptions , QoS } ;
7- use std:: sync:: Mutex ;
8- use std:: time:: Duration ;
99use base64:: Engine ;
1010use bytes:: Bytes ;
11+ use common_define:: db:: { DbErr , DeviceLoraGateColumn , DeviceLoraGateEntity , Eui } ;
12+ use common_define:: event:: DownloadMessage ;
13+ use derive_new:: new;
14+ use rumqttc:: { Event , Incoming , MqttOptions , QoS } ;
1115use sea_orm:: { ColumnTrait , EntityTrait , QueryFilter } ;
1216use serde:: { Deserialize , Serialize } ;
17+ use std:: str:: FromStr ;
18+ use std:: string:: FromUtf8Error ;
19+ use std:: sync:: Mutex ;
20+ use std:: time:: Duration ;
1321use tokio:: sync:: mpsc;
1422use tracing:: { debug, error, info, trace, warn} ;
15- use common_define:: db:: { DbErr , DeviceLoraGateColumn , DeviceLoraGateEntity , Eui } ;
16- use common_define:: event:: DownloadMessage ;
17- use crate :: event:: config_cache;
18- use crate :: man:: data:: DownloadData ;
19- use crate :: man:: gw:: { DataWrapper , GwCmd , GwCmdResponse , ShellCmd } ;
20- use crate :: man:: Id ;
21- use crate :: man:: lora:: { LoRaNode , LoRaNodeManager } ;
22- use crate :: protocol:: mqtt:: { LinkRx , LinkTx , Notification } ;
23-
2423
2524#[ derive( Debug , Deserialize , Serialize ) ]
2625pub struct ForwardResult {
@@ -74,7 +73,6 @@ pub enum MqttError {
7473}
7574
7675impl MqPublisher {
77-
7876 pub fn new ( client : LinkTx ) -> MqPublisher {
7977 MqPublisher { client : Mutex :: new ( client) }
8078 }
@@ -88,21 +86,32 @@ impl MqPublisher {
8886pub struct MessageProcessor {
8987 rx : LinkRx ,
9088 sender : mpsc:: Sender < MqttMessage > ,
91- down : mpsc:: Sender < DownloadMessage >
89+ down : mpsc:: Sender < DownloadMessage > ,
9290}
9391
9492impl MessageProcessor {
95- pub fn new_with_sender ( rx : LinkRx , sender : mpsc:: Sender < MqttMessage > , down : mpsc:: Sender < DownloadMessage > ) -> Self {
93+ pub fn new_with_sender (
94+ rx : LinkRx ,
95+ sender : mpsc:: Sender < MqttMessage > ,
96+ down : mpsc:: Sender < DownloadMessage > ,
97+ ) -> Self {
9698 Self { sender, rx, down }
9799 }
98100
99-
100101 pub async fn start ( mut self ) {
101102 while let Ok ( message) = self . rx . next ( ) . await {
102103 if let Some ( Notification :: Forward ( publish) ) = message {
104+ let down = self . down . clone ( ) ;
103105 match String :: from_utf8 ( publish. publish . topic . to_vec ( ) ) {
104106 Ok ( topic) => {
105- tokio:: spawn ( process_mqtt ( MqttMessage :: new ( topic, publish. publish . payload ) , self . down . clone ( ) ) ) ;
107+ tokio:: spawn ( async move {
108+ if let Err ( e) =
109+ process_mqtt ( MqttMessage :: new ( topic, publish. publish . payload ) , down)
110+ . await
111+ {
112+ warn ! ( "mqtt payload: {}" , e)
113+ }
114+ } ) ;
106115 }
107116 Err ( _) => {
108117 warn ! ( "Snap Mqtt message contained invalid UTF-8" ) ;
@@ -113,7 +122,10 @@ impl MessageProcessor {
113122 }
114123}
115124
116- async fn process_mqtt ( message : MqttMessage , down : mpsc:: Sender < DownloadMessage > ) -> Result < ( ) , MqttError > {
125+ async fn process_mqtt (
126+ message : MqttMessage ,
127+ down : mpsc:: Sender < DownloadMessage > ,
128+ ) -> Result < ( ) , MqttError > {
117129 let mut topic = message. topic . splitn ( 3 , '/' ) ;
118130 topic. next ( ) ;
119131 if let Some ( user_id) = topic. next ( ) {
@@ -130,8 +142,12 @@ async fn process_mqtt(message: MqttMessage, down: mpsc::Sender<DownloadMessage>)
130142 }
131143 Ok ( ( ) )
132144}
133- async fn process_downlink ( user_id : Id , message : MqttMessage , down : mpsc:: Sender < DownloadMessage > ) -> Result < ( ) , MqttError > {
134- let mut topic = message. topic . split ( '/' ) ;
145+ async fn process_downlink (
146+ user_id : Id ,
147+ message : MqttMessage ,
148+ down : mpsc:: Sender < DownloadMessage > ,
149+ ) -> Result < ( ) , MqttError > {
150+ let mut topic = message. topic . split ( '/' ) ;
135151 topic. next ( ) ;
136152 topic. next ( ) ;
137153 topic. next ( ) ;
@@ -142,15 +158,11 @@ async fn process_downlink(user_id: Id, message: MqttMessage, down: mpsc::Sender<
142158 if let Some ( node) = LoRaNodeManager :: get_node_by_eui ( eui) . await ? {
143159 if node. info . user_id == Some ( user_id) {
144160 let data: ForwardPayload = serde_json:: from_slice ( & message. payload ) ?;
145- down. send ( DownloadMessage {
146- eui,
147- port : data. port ,
148- data : data. payload ,
149- } ) . await ;
161+ down. send ( DownloadMessage { eui, port : data. port , data : data. payload } ) . await ;
150162 let rx = GLOBAL_DOWNLOAD_RESPONSE . add ( eui) ;
151163 let topic = format ! ( "user/{}/device/{}/forward_result" , user_id, eui) ;
152164 if data. timeout > 60 {
153- return Ok ( ( ) )
165+ return Ok ( ( ) ) ;
154166 }
155167 match rx {
156168 Some ( rx) => {
@@ -159,10 +171,16 @@ async fn process_downlink(user_id: Id, message: MqttMessage, down: mpsc::Sender<
159171 Ok ( Ok ( response) ) => {
160172 let payload = serde_json:: to_string ( & ForwardResult {
161173 port : response. 1 ,
162- payload : base64:: engine:: general_purpose:: STANDARD . encode ( response. 0 . as_slice ( ) ) ,
163- status : 0
174+ payload : base64:: engine:: general_purpose:: STANDARD
175+ . encode ( response. 0 . as_slice ( ) ) ,
176+ status : 0 ,
164177 } ) ?;
165- GLOBAL_STATE . mq . publish ( crate :: integration:: mqtt:: MqttMessage :: new ( payload, topic) ) . await ?;
178+ GLOBAL_STATE
179+ . mq
180+ . publish ( crate :: integration:: mqtt:: MqttMessage :: new (
181+ payload, topic,
182+ ) )
183+ . await ?;
166184 }
167185 _ => {
168186 GLOBAL_DOWNLOAD_RESPONSE . get ( eui) ;
@@ -173,9 +191,12 @@ async fn process_downlink(user_id: Id, message: MqttMessage, down: mpsc::Sender<
173191 let payload = serde_json:: to_string ( & ForwardResult {
174192 port : 0 ,
175193 payload : "" . to_string ( ) ,
176- status : -1
194+ status : -1 ,
177195 } ) ?;
178- GLOBAL_STATE . mq . publish ( crate :: integration:: mqtt:: MqttMessage :: new ( payload, topic) ) . await ?;
196+ GLOBAL_STATE
197+ . mq
198+ . publish ( crate :: integration:: mqtt:: MqttMessage :: new ( payload, topic) )
199+ . await ?;
179200 }
180201 }
181202 }
@@ -184,16 +205,20 @@ async fn process_downlink(user_id: Id, message: MqttMessage, down: mpsc::Sender<
184205 Ok ( ( ) )
185206}
186207async fn process_gateway ( user_id : Id , message : MqttMessage ) -> Result < ( ) , MqttError > {
187- let mut topic = message. topic . split ( '/' ) ;
208+ let mut topic = message. topic . split ( '/' ) ;
188209 topic. next ( ) ;
189210 topic. next ( ) ;
190211 topic. next ( ) ;
191212 let eui_s = topic. next ( ) . ok_or ( MqttError :: EuiNotFound ) ?;
192213 let action = topic. next ( ) . ok_or ( MqttError :: ActionNotFound ) ?;
193214 if "up" == action {
194- let cmd: GwCmdResponse = serde_json:: from_slice ( message. payload . as_ref ( ) ) . map_err ( |e| MqttError :: SerdeError ( e) ) ?;
215+ let cmd: GwCmdResponse = serde_json:: from_slice ( message. payload . as_ref ( ) )
216+ . map_err ( |e| MqttError :: SerdeError ( e) ) ?;
195217 let eui = Eui :: from_str ( eui_s) ?;
196- let gate = DeviceLoraGateEntity :: find ( ) . filter ( DeviceLoraGateColumn :: Eui . eq ( eui) ) . one ( & GLOBAL_STATE . db ) . await ?;
218+ let gate = DeviceLoraGateEntity :: find ( )
219+ . filter ( DeviceLoraGateColumn :: Eui . eq ( eui) )
220+ . one ( & GLOBAL_STATE . db )
221+ . await ?;
197222 if let Some ( gate) = gate {
198223 match cmd {
199224 GwCmdResponse :: ShellCmd ( DataWrapper { id, data } ) => { }
0 commit comments