@@ -93,13 +93,14 @@ def __init__(self, gateway, config, connector_type):
9393 )
9494 for obj in self .config ['paths' ]
9595 ]
96- self .__log .info (' FTP Connector started.' )
96+ self .__log .info (" FTP Connector started with %s and %d" , self . host , self . port )
9797
9898 def open (self ):
9999 self .__stopped = False
100100 self .start ()
101101
102102 def run (self ):
103+ self .__log .debug ('Starting connector loop' )
103104 try :
104105 while not self .__stopped :
105106 with self .__ftp () as ftp :
@@ -115,16 +116,20 @@ def run(self):
115116 if self .__stopped :
116117 break
117118 except Exception as e :
118- self .__log .exception (e )
119+ self .__log .error ("Unexpected exception in loop for %s %d with error %r" , self .host , self .port , str (e ))
120+ self .__log .debug ("Error:" , exc_info = e )
119121 try :
120122 self .close ()
121123 except Exception as e :
122- self .__log .exception (e )
124+ self .__log .error (
125+ "Can not close the connection for %s %d with error %r" , self .host , self .port , str (e ))
126+ self .__log .debug ("Error:" , exc_info = e )
123127 while True :
124128 if self .__stopped :
125129 break
126130
127131 def __connect (self , ftp ):
132+ self .__log .debug ("Connecting to ftp server on %s:%d" , self .host , self .port )
128133 try :
129134 ftp .connect (self .host , self .port )
130135
@@ -135,13 +140,16 @@ def __connect(self, ftp):
135140 self .__log .info ('Data protection level set to "private"' )
136141 else :
137142 ftp .login (self .security ['username' ], self .security ['password' ])
143+ self .__log .info ("Logged in as %s" , str (self .security ['username' ]))
144+
138145
139146 except Exception as e :
140- self .__log .error (e )
147+ self .__log .error ("Connection failed to %s:%d: due to %r" , self .host , self .port , str (e ))
148+ self .__log .debug ("Error:" , exc_info = e )
141149 sleep (10 )
142150 else :
143151 self ._connected = True
144- self .__log .info (' Connected to FTP server' )
152+ self .__log .info (" Connected to FTP server to %s:%d" , self . host , self . port )
145153
146154 def __process_paths (self , ftp ):
147155 for path in self .paths :
@@ -153,6 +161,7 @@ def __process_paths(self, ftp):
153161
154162 if '*' in path .path :
155163 path .find_files (ftp )
164+ self .__log .trace ("Found %d for pattern %s" , len (path .files ), path .path )
156165
157166 for file in path .files :
158167 current_hash = file .get_current_hash (ftp )
@@ -161,6 +170,7 @@ def __process_paths(self, ftp):
161170 file .set_new_hash (current_hash )
162171
163172 handle_stream = io .BytesIO ()
173+ self .__log .trace ("Retrieving file %s..." , file .path_to_file )
164174
165175 ftp .retrbinary ('RETR ' + file .path_to_file , handle_stream .write )
166176
@@ -174,6 +184,8 @@ def __process_paths(self, ftp):
174184
175185 convert_conf = {'file_ext' : file .path_to_file .split ('.' )[- 1 ]}
176186
187+ self .__log .trace ("Processing data from %s file" , file .path_to_file )
188+
177189 if convert_conf ['file_ext' ] == 'json' :
178190 json_data = simplejson .loads (handled_str )
179191 if isinstance (json_data , list ):
@@ -233,10 +245,13 @@ def __send_data(self, converted_data: ConvertedData):
233245 converted_data .attributes_datapoints_count > 0 )):
234246 self .__gateway .send_to_storage (self .name , self .get_id (), converted_data )
235247 self .statistics ['MessagesSent' ] = self .statistics ['MessagesSent' ] + 1
236- self .__log .debug ("Data to ThingsBoard: %s" , converted_data )
248+ self .__log .debug ("Data being sent to ThingsBoard: %s" , converted_data )
237249
238250 def close (self ):
239251 self .__stopped = True
252+ for path in self .paths :
253+ for file in path .files :
254+ file ._hash = None
240255 self .__log .info ('FTP Connector stopped.' )
241256 self .__log .stop ()
242257
@@ -269,19 +284,21 @@ def __is_json(data):
269284
270285 @CollectAllReceivedBytesStatistics (start_stat_type = 'allReceivedBytesFromTB' )
271286 def on_attributes_update (self , content ):
287+ self .__log .debug ("Processing an attribute update from %r" , content )
272288 try :
273289 for attribute_request in self .__attribute_updates :
274290 if fullmatch (attribute_request ["deviceNameFilter" ], content ["device" ]):
275291 attribute_key , attribute_value = content ['data' ].popitem ()
292+ self .__log .info ("Found an attribute key for %s and value %s" , attribute_key , attribute_value )
276293
277294 path_str = attribute_request ['path' ].replace ('${attributeKey}' , attribute_key ).replace (
278295 '${attributeValue}' , attribute_value )
279296 path = Path (path = path_str , device_name = content ['device' ], attributes = [], telemetry = [],
280297 delimiter = ',' , txt_file_data_view = '' )
281298
282- data_expression = attribute_request ['valueExpression' ]. replace ( '${attributeKey}' ,
283- attribute_key ) .replace (
284- '${attributeValue}' , attribute_value )
299+ data_expression = ( attribute_request ['valueExpression' ]
300+ .replace ('${attributeKey}' , attribute_key )
301+ . replace ( '${attributeValue}' , attribute_value ) )
285302
286303 with self .__ftp () as ftp :
287304 self .__connect (ftp )
@@ -294,12 +311,14 @@ def on_attributes_update(self, content):
294311 ftp .storbinary ('STOR ' + file .path_to_file , io_stream )
295312 io_stream .close ()
296313 else :
297- self .__log .error ('Invalid json data' )
314+ self .__log .error ("Invalid json data in attribute update %s" , json_data )
315+ self .__log .debug ("Error:" , exc_info = True )
298316 else :
299317 if attribute_request ['writingMode' ] == 'OVERRIDE' :
300318 io_stream = self ._get_io_stream (data_expression )
301319 ftp .storbinary ('STOR ' + file .path_to_file , io_stream )
302320 io_stream .close ()
321+ self .__log .info ("Successfully process attribute update %s on override request" , attribute_key )
303322 else :
304323 handle_stream = io .BytesIO ()
305324 ftp .retrbinary ('RETR ' + file .path_to_file , handle_stream .write )
@@ -309,9 +328,11 @@ def on_attributes_update(self, content):
309328 io_stream = io .BytesIO (str .encode (str (converted_data + '\n ' + data_expression )))
310329 ftp .storbinary ('STOR ' + file .path_to_file , io_stream )
311330 io_stream .close ()
331+ self .__log .info ("Successfully process attribute update for %s" , attribute_key )
312332
313333 except Exception as e :
314- self .__log .exception (e )
334+ self .__log .error ("Failed to process attribute update with error %r" , str (e ))
335+ self .__log .debug ("Error:" , exc_info = e )
315336
316337 @CollectAllReceivedBytesStatistics ('allBytesSentToDevices' )
317338 def _get_io_stream (self , data_expression ):
@@ -324,7 +345,7 @@ def __fill_rpc_requests(self):
324345 @CollectAllReceivedBytesStatistics (start_stat_type = 'allReceivedBytesFromTB' )
325346 def server_side_rpc_handler (self , content ):
326347 try :
327- self .__log .info ( "Incoming server-side RPC: %s" , content )
348+ self .__log .debug ( "Handling incoming server-side RPC with %s" , content )
328349
329350 if content .get ('data' ) is None :
330351 content ['data' ] = {'params' : content ['params' ], 'method' : content ['method' ], 'id' : content ['id' ]}
@@ -363,6 +384,7 @@ def server_side_rpc_handler(self, content):
363384
364385 converted_data , success_sent = self .__process_rpc (rpc_method , value_expression )
365386 self .__send_rpc_reply ({}, content , converted_data , success_sent )
387+ self .__log .info ("Successfully sent RPC request to FTP for %s rpc method" , rpc_method )
366388 return
367389
368390 for rpc_request in self .__rpc_requests :
@@ -371,10 +393,15 @@ def server_side_rpc_handler(self, content):
371393 converted_data , success_sent = self .__process_rpc (rpc_method , value_expression )
372394
373395 self .__send_rpc_reply (rpc_request , content , converted_data , success_sent )
396+ self .__log .info ("Successfully sent RPC request to FTP for %s rpc method" , rpc_method )
397+
374398 except Exception as e :
375- self .__log .exception (e )
399+ self .__log .error (
400+ "Failed to perform incoming server side RPC for content %s and rpc method due to %r" , str (e ))
401+ self .__log .debug ("Error:" , exc_info = e )
376402
377403 def __process_rpc (self , method , value_expression ):
404+ self .__log .info ("Called __process_rpc, %r %r" , method , value_expression )
378405 with self .__ftp () as ftp :
379406 if not self ._connected or not ftp .sock :
380407 self .__connect (ftp )
@@ -389,7 +416,8 @@ def __process_rpc(self, method, value_expression):
389416 io_stream .close ()
390417 success_sent = True
391418 except Exception as e :
392- self .__log .error (e )
419+ self .__log .error ("Can not process for method write due to %r" , str (e ))
420+ self .__log .debug ("Error:" , exc_info = e )
393421 converted_data = '{"error": "' + str (e ) + '"}'
394422 else :
395423 handle_stream = io .BytesIO ()
@@ -411,4 +439,3 @@ def __send_rpc_reply(self, rpc_request, content, converted_data, success_sent):
411439
412440 def get_config (self ):
413441 return self .config
414-
0 commit comments