import asyncio
import json
import logging
import os
import ssl
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Set
import bssci_config
import messages
from protocol import decode_messages, encode_message
logger = logging.getLogger(__name__)
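# BSSCI wire framing used throughout this module: each outgoing message is the
# protocol identifier below, a 4-byte little-endian payload length, then the
# encoded payload produced by protocol.encode_message().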
IDENTIFIER = bytes("MIOTYB01", "utf-8")
class TLSServer:
def __init__(
self,
sensor_config_file: str,
mqtt_out_queue: asyncio.Queue[dict[str, str]],
mqtt_in_queue: asyncio.Queue[dict[str, str]],
) -> None:
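"""Initialize server state (MQTT in/out queues, per-connection tracking,
deduplication buffers, traffic metrics, heartbeat and VM bookkeeping),
start the background monitoring tasks, and load the sensor configuration."""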
self.bs_op_ids: Dict[asyncio.streams.StreamWriter, int] = {}
self.mqtt_out_queue = mqtt_out_queue
self.mqtt_in_queue = mqtt_in_queue
self.connected_base_stations: Dict[
asyncio.streams.StreamWriter, str
] = {}
self.connecting_base_stations: Dict[
asyncio.streams.StreamWriter, str
] = {}
self.sensor_config_file = sensor_config_file
# EUI -> {status, base_stations: [], timestamp}
self.registered_sensors: Dict[str, Dict[str, Any]] = {}
# opID -> {sensor_eui, timestamp, base_station}
self.pending_attach_requests: Dict[int, Dict[str, Any]] = {}
# Track if status request task is running
self._status_task_running = False
# Deduplication variables
# message_key -> {message, timestamp, snr, bs_eui}
self.deduplication_buffer: Dict[str, Dict[str, Any]] = {}
self.deduplication_delay = bssci_config.DEDUPLICATION_DELAY
self.deduplication_stats = {
'total_messages': 0,
'duplicate_messages': 0,
'published_messages': 0
}
# Traffic metrics for visualization
self.traffic_metrics = {
'messages_in': 0, # Total messages received from base stations
'messages_out': 0, # Total messages sent to MQTT
'messages_dropped': 0, # Messages filtered by deduplication
'bytes_in': 0, # Total bytes received
'bytes_out': 0, # Total bytes sent to MQTT
'vm_messages': 0, # VM sub-channel messages
'attach_requests': 0, # Attach requests sent
'detach_requests': 0, # Detach requests sent
'status_requests': 0, # Status requests sent
'start_time': datetime.now(timezone.utc).timestamp()
}
# Time-series data for charts (last 60 minutes, 1-minute resolution)
self.traffic_history: list = []
self._last_history_update = 0
# Track active sensors per hour (sensors that sent data)
self.active_sensors_hourly: set = set()
self._current_hour = datetime.now(timezone.utc).hour
self._last_hourly_active_count = 0
# Base station health data (eui -> {cpu, temperature, ...})
self.base_station_health: Dict[str, dict] = {}
# Sensor packet tracking for packet loss detection
# eui -> {last_packet_cnt, packets_received, packets_lost, snr_sum, snr_count}
self.sensor_packet_stats: Dict[str, Dict[str, Any]] = {}
# SNR/RSSI history for graphs (last 288 data points, 5 min intervals = 24 hours)
self.snr_rssi_history: list = []
self._last_snr_history_update = 0
# Per-sensor heartbeat tracking for online/offline status
# eui -> {avg_interval, last_seen, state, offline_since, warning_active, message_timestamps, last_state_change}
self.sensor_heartbeat: Dict[str, Dict[str, Any]] = {}
self.HEARTBEAT_OFFLINE_MULTIPLIER = 4
self.HEARTBEAT_DEFAULT_INTERVAL = 600
self.HEARTBEAT_CHECK_INTERVAL = 30
# Auto-detach variables
# eui -> timestamp of last message
self.sensor_last_seen: Dict[str, float] = {}
# eui -> whether warning was sent
self.sensor_warning_sent: Dict[str, bool] = {}
# Status request failure tracking: writer -> consecutive failure count
self.bs_status_failures: Dict[Any, int] = {}
self.BS_STATUS_FAILURE_THRESHOLD = 3
# Network topology tracking: which base stations receive which sensors
# sensor_eui -> {primary_bs: str, receiving_bases: {bs_eui: {snr, rssi, last_seen, count}}}
self.sensor_topology: Dict[str, Dict[str, Any]] = {}
# Variable MAC (VM) Sub-Channel tracking
# eui -> {active: bool, vm_channel: int, activated_at: timestamp, bs_eui: str}
self.vm_active_sensors: Dict[str, Dict[str, Any]] = {}
# Pending VM operations: opID -> {eui, operation, timestamp}
self.pending_vm_operations: Dict[int, Dict[str, Any]] = {}
# OMS Meter tracking for VM uplink data (WMBUS/wireless M-Bus meters)
# meter_id -> {eui, snr, rssi, data, timestamp, bs_eui, message_count}
self.oms_meters: Dict[str, Dict[str, Any]] = {}
# VM log for tracking VM operations (activate, deactivate, status, etc.)
# List of {timestamp, event, details, bs_eui}
self.vm_log: list = []
self._max_vm_log_entries = 100
# Track which base stations support VM (Variable MAC)
# Base stations that have successfully responded to VM commands
self.vm_capable_base_stations: Set[str] = set()
# VM periodic status query settings
self.vm_periodic_status_enabled = True # Enable by default
self.vm_periodic_status_interval = 60 # Query every 60 seconds
# Start the deduplication task
asyncio.create_task(self.process_deduplication_buffer())
# Start periodic VM status query
asyncio.create_task(self.periodic_vm_status_query())
# Start auto-detach monitoring if enabled
if getattr(bssci_config, 'AUTO_DETACH_ENABLED', True):
asyncio.create_task(self.auto_detach_monitor())
# Start heartbeat monitor for online/offline tracking
asyncio.create_task(self.heartbeat_monitor())
try:
with open(sensor_config_file, "r") as f:
self.sensor_config = json.load(f)
except Exception:
self.sensor_config = []
# Add queue logging
logger.info("🔍 TLS Server Queue Assignment:")
logger.info(f" mqtt_out_queue ID: {id(self.mqtt_out_queue)}")
logger.info(f" mqtt_in_queue ID: {id(self.mqtt_in_queue)}")
def _get_next_op_id(self, writer: asyncio.streams.StreamWriter) -> int:
"""Get the next sequential operation ID for a specific base station connection.
SC-initiated operations use negative opIds (BSSCI convention:
BS uses positive/0, SC uses negative)."""
op_id = self.bs_op_ids.get(writer, -1)
self.bs_op_ids[writer] = op_id - 1
return op_id
def _get_local_time(self) -> str:
"""Get current time in configured timezone"""
try:
import zoneinfo
tz = zoneinfo.ZoneInfo(bssci_config.TIMEZONE)
local_time = datetime.now(tz)
except Exception:
# Fallback to UTC+1 (CET) if timezone not available
utc_time = datetime.now(timezone.utc)
local_time = utc_time + timedelta(hours=1)
return local_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
async def start_server(self) -> None:
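"""Configure the mutual-TLS context, start the MQTT queue watcher task,
and listen for base station connections until cancelled."""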
logger.info("🔐 Setting up SSL/TLS context for BSSCI server...")
logger.info(f" Certificate file: {bssci_config.CERT_FILE}")
logger.info(f" Key file: {bssci_config.KEY_FILE}")
logger.info(f" CA file: {bssci_config.CA_FILE}")
try:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(
certfile=bssci_config.CERT_FILE, keyfile=bssci_config.KEY_FILE
)
ssl_ctx.load_verify_locations(cafile=bssci_config.CA_FILE)
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
# Log SSL context details
logger.info(f" TLS Protocol versions: {ssl_ctx.minimum_version.name} - {ssl_ctx.maximum_version.name}")
logger.info("✓ SSL context configured successfully with client certificate verification")
except FileNotFoundError as e:
logger.error(f"❌ SSL certificate file not found: {e}")
raise
except ssl.SSLError as e:
logger.error(f"❌ SSL configuration error: {e}")
raise
except Exception as e:
logger.error(f"❌ Unexpected error setting up SSL: {e}")
raise
logger.info("🚀 Starting BSSCI TLS server...")
logger.info(f" Listen address: {bssci_config.LISTEN_HOST}:{bssci_config.LISTEN_PORT}")
logger.info(f" Sensor config file: {self.sensor_config_file}")
logger.info(f" Loaded sensors: {len(self.sensor_config)}")
server = await asyncio.start_server(
self.handle_client,
bssci_config.LISTEN_HOST,
bssci_config.LISTEN_PORT,
ssl=ssl_ctx,
)
logger.info("📨 Starting MQTT queue watcher task...")
asyncio.create_task(self.queue_watcher())
logger.info("✓ BSSCI TLS Server is ready and listening for base station connections")
async with server:
await server.serve_forever()
async def send_attach_request(
self, writer: asyncio.streams.StreamWriter, sensor: dict[str, Any]
) -> None:
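"""Validate a sensor's attach parameters, transmit the attach request to the
given base station, and record it in pending_attach_requests so the
response can be correlated by operation ID."""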
bs_eui = self.connected_base_stations.get(writer, "unknown")
try:
op_id = self._get_next_op_id(writer)
logger.info("📤 BSSCI ATTACH REQUEST INITIATED")
logger.info(" =====================================")
logger.info(f" Sensor EUI: {sensor['eui']}")
logger.info(f" Target Base Station: {bs_eui}")
logger.info(f" Operation ID: {op_id}")
logger.info(f" Timestamp: {self._get_local_time()}")
# Comprehensive validation with detailed logging
validation_errors = []
validation_warnings = []
# EUI validation
if len(sensor["eui"]) != 16:
validation_errors.append(f"EUI length {len(sensor['eui'])} != 16 characters")
else:
try:
int(sensor["eui"], 16) # Test hex validity
logger.info(f" ✓ EUI format valid: {sensor['eui']}")
except ValueError:
validation_errors.append(f"EUI contains invalid hex characters: {sensor['eui']}")
# Network Key validation and normalization
original_nw_key = sensor["nwKey"]
nw_key = original_nw_key[:32] if len(original_nw_key) >= 32 else original_nw_key
if len(original_nw_key) != 32:
if len(original_nw_key) > 32:
validation_warnings.append(f"Network key truncated from {len(original_nw_key)} to 32 characters")
logger.warning(f" ⚠️ Network key too long, truncating: {original_nw_key} -> {nw_key}")
else:
validation_errors.append(f"Network key length {len(original_nw_key)} < 32 characters required")
else:
try:
int(nw_key, 16) # Test hex validity
logger.info(f" ✓ Network key format valid: {nw_key[:8]}...{nw_key[-8:]}")
except ValueError:
validation_errors.append(f"Network key contains invalid hex characters: {nw_key}")
# Short Address validation
if len(sensor["shortAddr"]) != 4:
validation_errors.append(f"Short address length {len(sensor['shortAddr'])} != 4 characters")
else:
try:
int(sensor["shortAddr"], 16) # Test hex validity
logger.info(f" ✓ Short address format valid: {sensor['shortAddr']}")
except ValueError:
validation_errors.append(f"Short address contains invalid hex characters: {sensor['shortAddr']}")
# Bidirectional flag validation
bidi_value = sensor.get("bidi", False)
logger.info(f" ✓ Bidirectional flag: {bidi_value}")
# Check for existing registrations to this base station
eui_upper = sensor["eui"].upper()
if eui_upper in self.registered_sensors:
reg_info = self.registered_sensors[eui_upper]
if reg_info.get('status') == 'registered':
existing_bases = reg_info.get('base_stations', [])
if bs_eui in existing_bases:
validation_warnings.append(f"Sensor {sensor['eui']} already registered to base station {bs_eui}")
logger.warning(f" ⚠️ Re-registering sensor to same base station")
else:
validation_warnings.append(f"Sensor {sensor['eui']} already registered to {len(existing_bases)} other base station(s): {existing_bases}")
logger.warning(f" ⚠️ Adding registration to additional base station")
# Log all warnings
for warning in validation_warnings:
logger.warning(f" ⚠️ {warning}")
if not validation_errors:
logger.info(f" ✅ All validations passed")
logger.info(f" 📋 Final parameters:")
logger.info(f" EUI: {sensor['eui']}")
logger.info(f" Network Key: {nw_key[:8]}...{nw_key[-8:]}")
logger.info(f" Short Address: {sensor['shortAddr']}")
logger.info(f" Bidirectional: {bidi_value}")
# Use normalized sensor data
normalized_sensor = {
"eui": sensor["eui"].upper(),
"nwKey": nw_key,
"shortAddr": sensor["shortAddr"],
"bidi": bidi_value
}
# Build and encode the message
attach_message = messages.build_attach_request(normalized_sensor, op_id)
logger.debug(f" 📝 Built attach message: {attach_message}")
msg_pack = encode_message(attach_message)
full_message = IDENTIFIER + len(msg_pack).to_bytes(4, byteorder="little") + msg_pack
logger.info(f" 📤 Transmitting attach request...")
logger.info(f" Message size: {len(full_message)} bytes")
logger.info(f" Payload size: {len(msg_pack)} bytes")
writer.write(full_message)
await writer.drain()
self.traffic_metrics['attach_requests'] += 1
# Track this attach request for correlation with response
self.pending_attach_requests[op_id] = {
'sensor_eui': sensor['eui'],
'timestamp': asyncio.get_event_loop().time(),
'base_station': bs_eui,
'sensor_config': normalized_sensor
}
logger.info(f"✅ BSSCI ATTACH REQUEST TRANSMITTED")
logger.info(f" Operation ID {op_id} sent to base station {bs_eui}")
logger.info(f" Tracking request for correlation with response")
logger.info(f" Awaiting response from base station...")
logger.info(" =====================================")
else:
logger.error(f"❌ ATTACH REQUEST VALIDATION FAILED")
logger.error(f" Sensor EUI: {sensor.get('eui', 'unknown')}")
logger.error(f" Base Station: {bs_eui}")
logger.error(f" Validation errors found:")
for i, error in enumerate(validation_errors, 1):
logger.error(f" {i}. {error}")
logger.error(f" ❌ Attach request NOT sent due to validation failures")
logger.error(f" =====================================")
except Exception as e:
logger.error(f"❌ CRITICAL ERROR during attach request preparation")
logger.error(f" Sensor EUI: {sensor.get('eui', 'unknown')}")
logger.error(f" Base Station: {bs_eui}")
logger.error(f" Exception type: {type(e).__name__}")
logger.error(f" Exception message: {str(e)}")
import traceback
logger.error(f" Full traceback:")
for line in traceback.format_exc().strip().split('\n'):
logger.error(f" {line}")
logger.error(f" =====================================")
raise # Re-raise to handle upstream
async def attach_file(self, writer: asyncio.streams.StreamWriter) -> None:
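"""Send attach requests for every configured sensor to the given base
station, skipping sensors already registered to it."""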
bs_eui = self.connected_base_stations.get(writer, "unknown")
logger.info(f"🔗 BATCH SENSOR ATTACHMENT started for base station {bs_eui}")
logger.info(f" Total sensors to process: {len(self.sensor_config)}")
successful_attachments = 0
failed_attachments = 0
skipped_attachments = 0
for i, sensor in enumerate(self.sensor_config, 1):
try:
if writer not in self.connected_base_stations:
logger.warning(f" ⚠️ Writer no longer in connected base stations - aborting batch attach for {bs_eui}")
break
eui_upper = sensor['eui'].upper()
if eui_upper in self.registered_sensors:
reg_info = self.registered_sensors[eui_upper]
if reg_info.get('status') == 'registered' and bs_eui in reg_info.get('base_stations', []):
skipped_attachments += 1
logger.debug(f" ⏭️ Sensor {sensor['eui']} already attached to {bs_eui} - skipping")
continue
logger.info(f" Processing sensor {i}/{len(self.sensor_config)}: {sensor['eui']}")
await self.send_attach_request(writer, sensor)
successful_attachments += 1
await asyncio.sleep(0.1)
except (ConnectionResetError, ConnectionError, BrokenPipeError, AttributeError) as e:
logger.warning(f" 🔌 Connection lost to {bs_eui} during batch attach - aborting remaining sensors")
logger.warning(f" Last sensor attempted: {sensor.get('eui', 'unknown')}, error: {e}")
failed_attachments += len(self.sensor_config) - i
break
except Exception as e:
failed_attachments += 1
logger.error(f" ❌ Failed to attach sensor {sensor.get('eui', 'unknown')}: {e}")
logger.error(f" Exception type: {type(e).__name__}")
logger.info(f"✅ BATCH SENSOR ATTACHMENT completed for base station {bs_eui}")
logger.info(f" Successful: {successful_attachments}")
logger.info(f" Skipped (already attached): {skipped_attachments}")
logger.info(f" Failed: {failed_attachments}")
logger.info(f" Total processed: {len(self.sensor_config)}")
if failed_attachments > 0:
logger.warning(f" ⚠️ {failed_attachments} sensors failed to attach - check individual sensor logs above")
async def send_detach_request(self, writer: asyncio.streams.StreamWriter, sensor_eui: str) -> bool:
"""Send detach request for a specific sensor"""
bs_eui = self.connected_base_stations.get(writer, "unknown")
logger.info(f"🔌 DETACHING SENSOR from base station {bs_eui}")
logger.info(f" Sensor EUI: {sensor_eui}")
try:
# Ensure EUI is properly formatted (16 hex characters)
clean_eui = sensor_eui.upper().replace(":", "").replace("-", "")
if len(clean_eui) != 16:
logger.error(f"❌ Invalid EUI format: {sensor_eui} (should be 16 hex characters)")
return False
# Build and encode the detach message
op_id = self._get_next_op_id(writer)
detach_message = messages.build_detach_request(clean_eui, op_id)
logger.debug(f" 📝 Built detach message: {detach_message}")
msg_pack = encode_message(detach_message)
full_message = IDENTIFIER + len(msg_pack).to_bytes(4, byteorder="little") + msg_pack
logger.info(f" 📤 Transmitting detach request...")
logger.info(f" Message size: {len(full_message)} bytes")
writer.write(full_message)
await writer.drain()
self.traffic_metrics['detach_requests'] += 1
# Remove from registered sensors
eui_key = sensor_eui.upper()
if eui_key in self.registered_sensors:
# Remove this base station from the sensor's list
if 'base_stations' in self.registered_sensors[eui_key]:
self.registered_sensors[eui_key]['base_stations'] = [
bs for bs in self.registered_sensors[eui_key]['base_stations']
if bs['base_station_eui'] != bs_eui
]
# If no base stations left, mark as not registered
if not self.registered_sensors[eui_key]['base_stations']:
self.registered_sensors[eui_key]['registered'] = False
logger.info(f" ✅ Sensor {sensor_eui} fully detached from all base stations")
else:
logger.info(f" ✅ Sensor {sensor_eui} detached from {bs_eui}, still connected to {len(self.registered_sensors[eui_key]['base_stations'])} other base stations")
# Notify via MQTT
if self.mqtt_out_queue:
detach_notification = {
"topic": f"ep/{sensor_eui.upper()}/status",
"payload": json.dumps({
"action": "detached",
"sensor_eui": sensor_eui,
"base_station_eui": bs_eui,
"timestamp": asyncio.get_event_loop().time()
})
}
await self.mqtt_out_queue.put(detach_notification)
logger.info(f"✅ DETACH REQUEST sent for sensor {sensor_eui}")
return True
except Exception as e:
logger.error(f"❌ CRITICAL ERROR during detach request")
logger.error(f" Sensor EUI: {sensor_eui}")
logger.error(f" Base Station: {bs_eui}")
logger.error(f" Exception: {e}")
return False
async def detach_sensor(self, sensor_eui: str) -> bool:
"""Detach a sensor from all connected base stations"""
logger.info(f"🔌 DETACHING SENSOR {sensor_eui} from ALL base stations")
success_count = 0
total_count = len(self.connected_base_stations)
for writer in list(self.connected_base_stations.keys()):
try:
success = await self.send_detach_request(writer, sensor_eui)
if success:
success_count += 1
await asyncio.sleep(0.1) # Small delay between requests
except Exception as e:
logger.error(f" ❌ Failed to detach sensor {sensor_eui} from base station: {e}")
logger.info(f"✅ SENSOR DETACH completed for {sensor_eui}")
logger.info(f" Successful: {success_count}/{total_count} base stations")
return success_count > 0
async def detach_all_sensors(self) -> int:
"""Detach all sensors from all base stations"""
logger.info(f"🔌 DETACHING ALL SENSORS from all base stations")
# Get list of all registered sensors
registered_euis = [eui for eui in self.registered_sensors.keys()
if not eui.endswith('_failure') and self.registered_sensors[eui].get('registered', False)]
logger.info(f" Total registered sensors to detach: {len(registered_euis)}")
detached_count = 0
for sensor_eui in registered_euis:
try:
success = await self.detach_sensor(sensor_eui)
if success:
detached_count += 1
await asyncio.sleep(0.2) # Small delay between sensors
except Exception as e:
logger.error(f" ❌ Failed to detach sensor {sensor_eui}: {e}")
logger.info(f"✅ BULK DETACH completed")
logger.info(f" Successfully detached: {detached_count}/{len(registered_euis)} sensors")
return detached_count
def clear_all_sensors(self) -> None:
"""Clear all sensor configurations and registrations"""
logger.info(f"🗑️ CLEARING ALL SENSOR CONFIGURATIONS")
# Clear sensor config
old_count = len(self.sensor_config)
self.sensor_config = []
# Clear registered sensors
old_registered = len([k for k in self.registered_sensors.keys() if not k.endswith('_failure')])
self.registered_sensors.clear()
# Clear pending requests
self.pending_attach_requests.clear()
logger.info(f"✅ ALL SENSORS CLEARED")
logger.info(f" Configurations removed: {old_count}")
logger.info(f" Registrations removed: {old_registered}")
def detach_sensor_sync(self, sensor_eui: str) -> bool:
"""Synchronous wrapper for detaching a sensor from all connected base stations"""
try:
logger.info(f"🔌 SYNC DETACHING SENSOR {sensor_eui} from ALL base stations")
success_count = 0
total_count = len(self.connected_base_stations)
# Remove from registered sensors immediately
eui_key = sensor_eui.upper()
if eui_key in self.registered_sensors:
self.registered_sensors[eui_key]['registered'] = False
self.registered_sensors[eui_key]['base_stations'] = []
logger.info(f" ✅ Sensor {sensor_eui} marked as detached in local registry")
success_count = total_count # Consider it successful if we can update local state
logger.info(f"✅ SYNC SENSOR DETACH completed for {sensor_eui}")
logger.info(f" Local detach: {success_count}/{total_count} base stations")
return success_count > 0
except Exception as e:
logger.error(f"❌ Error in sync detach for {sensor_eui}: {e}")
return False
def detach_all_sensors_sync(self) -> int:
"""Synchronous wrapper for detaching all sensors from all base stations"""
try:
logger.info(f"🔌 SYNC DETACHING ALL SENSORS from all base stations")
# Get list of all registered sensors
registered_euis = [eui for eui in self.registered_sensors.keys()
if not eui.endswith('_failure') and self.registered_sensors[eui].get('registered', False)]
logger.info(f" Total registered sensors to detach: {len(registered_euis)}")
detached_count = 0
for sensor_eui in registered_euis:
try:
success = self.detach_sensor_sync(sensor_eui)
if success:
detached_count += 1
except Exception as e:
logger.error(f" ❌ Failed to sync detach sensor {sensor_eui}: {e}")
logger.info(f"✅ SYNC BULK DETACH completed")
logger.info(f" Successfully detached: {detached_count}/{len(registered_euis)} sensors")
return detached_count
except Exception as e:
logger.error(f"❌ Error in sync detach all: {e}")
return 0
def attach_sensor_sync(self, sensor_eui: str) -> int:
"""Synchronous wrapper for attaching a sensor to all connected base stations"""
try:
logger.info(f"🔗 SYNC ATTACHING SENSOR {sensor_eui} to ALL base stations")
if not self.connected_base_stations:
logger.warning(" ⚠️ No base stations connected")
return 0
# Find sensor in configuration
sensor_config = None
for sensor in self.sensor_config:
if sensor['eui'].upper() == sensor_eui.upper():
sensor_config = sensor
break
if not sensor_config:
logger.error(f" ❌ Sensor {sensor_eui} not found in configuration")
return 0
logger.info(f" Found sensor config: {sensor_config['eui']}")
logger.info(f" Target base stations: {len(self.connected_base_stations)}")
# Create new event loop for sync call
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
success_count = 0
try:
async def send_attaches():
nonlocal success_count
for writer in list(self.connected_base_stations.keys()):
try:
await self.send_attach_request(writer, sensor_config)
success_count += 1
logger.info(f" ✅ Attach request sent to {self.connected_base_stations[writer]}")
except Exception as e:
logger.error(f" ❌ Failed to send attach to {self.connected_base_stations.get(writer, 'unknown')}: {e}")
loop.run_until_complete(send_attaches())
finally:
loop.close()
logger.info(f"✅ SYNC SENSOR ATTACH completed for {sensor_eui}")
logger.info(f" Successful attachments: {success_count}/{len(self.connected_base_stations)} base stations")
return success_count
except Exception as e:
logger.error(f"❌ Error in sync attach for {sensor_eui}: {e}")
return 0
def attach_all_sensors_sync(self) -> int:
"""Synchronous wrapper for attaching all sensors to all connected base stations"""
try:
logger.info(f"🔗 SYNC ATTACHING ALL SENSORS to all base stations")
if not self.connected_base_stations:
logger.warning(" ⚠️ No base stations connected")
return 0
if not self.sensor_config:
logger.warning(" ⚠️ No sensors configured")
return 0
logger.info(f" Total sensors to attach: {len(self.sensor_config)}")
logger.info(f" Target base stations: {len(self.connected_base_stations)}")
# Create new event loop for sync call
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
total_attachments = 0
try:
async def send_all_attaches():
nonlocal total_attachments
for sensor in self.sensor_config:
for writer in list(self.connected_base_stations.keys()):
try:
await self.send_attach_request(writer, sensor)
total_attachments += 1
logger.info(f" ✅ Attach request sent for {sensor['eui']} to {self.connected_base_stations[writer]}")
# Small delay between requests
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f" ❌ Failed to send attach for {sensor['eui']} to {self.connected_base_stations.get(writer, 'unknown')}: {e}")
loop.run_until_complete(send_all_attaches())
finally:
loop.close()
expected_total = len(self.sensor_config) * len(self.connected_base_stations)
logger.info(f"✅ SYNC BULK ATTACH completed")
logger.info(f" Successful attachments: {total_attachments}/{expected_total} total requests")
logger.info(f" Sensors processed: {len(self.sensor_config)}")
logger.info(f" Base stations: {len(self.connected_base_stations)}")
return total_attachments
except Exception as e:
logger.error(f"❌ Error in sync attach all: {e}")
return 0
async def send_status_requests(self) -> None:
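"""Periodically send status requests to all connected base stations and
drop connections that fail the request repeatedly."""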
logger.info(f"📊 STATUS REQUEST TASK STARTED")
logger.info(f" Status request interval: {bssci_config.STATUS_INTERVAL} seconds")
try:
while True:
await asyncio.sleep(bssci_config.STATUS_INTERVAL)
if self.connected_base_stations:
logger.info(f"📊 PERIODIC STATUS REQUEST CYCLE STARTING")
logger.info(f" Connected base stations: {len(self.connected_base_stations)}")
logger.info(f" Base stations: {list(self.connected_base_stations.values())}")
for writer in list(self.connected_base_stations.keys()):
try:
bs_eui = self.connected_base_stations.get(writer, "unknown")
logger.info(f" 📊 Sending status request to {bs_eui}")
op_id = self._get_next_op_id(writer)
status_message = messages.build_status_request(op_id)
msg_pack = encode_message(status_message)
full_message = IDENTIFIER + len(msg_pack).to_bytes(4, byteorder="little") + msg_pack
writer.write(full_message)
await writer.drain()
self.bs_status_failures.pop(writer, None)
except Exception as e:
bs_eui = self.connected_base_stations.get(writer, "unknown")
fail_count = self.bs_status_failures.get(writer, 0) + 1
self.bs_status_failures[writer] = fail_count
logger.warning(f" ⚠️ Failed to send status to {bs_eui} ({fail_count}/{self.BS_STATUS_FAILURE_THRESHOLD}): {e}")
if fail_count >= self.BS_STATUS_FAILURE_THRESHOLD:
logger.error(f" 🔌 Base station {bs_eui} reached failure threshold ({self.BS_STATUS_FAILURE_THRESHOLD}), removing from active list")
if writer in self.connected_base_stations:
self.connected_base_stations.pop(writer)
self.bs_status_failures.pop(writer, None)
self.bs_op_ids.pop(writer, None)
try:
writer.close()
except Exception:
pass
logger.info(f"📊 STATUS REQUEST CYCLE COMPLETED")
else:
logger.debug(f"📊 No base stations connected - skipping status requests")
except asyncio.CancelledError:
logger.info(f"📊 STATUS REQUEST TASK CANCELLED")
self._status_task_running = False
raise
except Exception as e:
logger.error(f"❌ STATUS REQUEST TASK FAILED: {e}")
self._status_task_running = False
raise
async def handle_client(
self, reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter
) -> None:
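"""Serve a single base station connection: verify the client certificate,
then read, decode, and dispatch BSSCI messages until the peer disconnects."""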
addr = writer.get_extra_info("peername")
ssl_obj = writer.get_extra_info("ssl_object")
try:
logger.info(f"🔗 New BSSCI connection attempt from {addr}")
if ssl_obj:
cert = ssl_obj.getpeercert()
if cert:
subject = cert.get('subject', [])
cn = None
for field in subject:
for name, value in field:
if name == 'commonName':
cn = value
break
logger.info(f" ✓ SSL handshake successful - Client certificate CN: {cn}")
else:
logger.warning(f" ⚠️ SSL handshake completed but no client certificate provided")
else:
logger.error(f" ❌ No SSL object found - connection may not be encrypted")
except Exception as e:
logger.error(f" ❌ SSL connection error from {addr}: {e}")
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
return
connection_start_time = asyncio.get_event_loop().time()
messages_processed = 0
try:
while True:
data = await reader.read(4096)
if not data:
break
self.traffic_metrics['bytes_in'] += len(data)
for message in decode_messages(data):
msg_type = message.get("command", "")
messages_processed += 1
logger.info(f"📨 BSSCI message #{messages_processed} received from {addr}")
logger.info(f" Message type: {msg_type}")
logger.debug(f" Full message: {message}")
if msg_type == "con":
logger.info(f"📨 BSSCI CONNECTION REQUEST received from {addr}")
logger.info(f" Operation ID: {message.get('opId', 'unknown')}")
logger.info(f" Base Station UUID: {message.get('snBsUuid', 'unknown')}")
msg = encode_message(
messages.build_connection_response(
message.get("opId", ""), message.get("snBsUuid", "")
)
)
writer.write(
IDENTIFIER + len(msg).to_bytes(4, byteorder="little") + msg
)
await writer.drain()
bs_eui = int(message["bsEui"]).to_bytes(8, byteorder="big").hex().upper()
self.connecting_base_stations[writer] = bs_eui
logger.info(f"📤 BSSCI CONNECTION RESPONSE sent to base station {bs_eui}")
logger.info(f" Base station {bs_eui} is now in connecting state")
elif msg_type == "conCmp":
logger.info(f"📨 BSSCI CONNECTION COMPLETE received from {addr}")
# Always remove from connecting first (fix for duplicate display bug)
bs_eui = self.connecting_base_stations.pop(writer, None)
if bs_eui and writer not in self.connected_base_stations:
# Deduplicate: Remove any existing connection with the same EUI
old_writers = [w for w, eui in list(self.connected_base_stations.items()) if eui == bs_eui]
for old_writer in old_writers:
logger.warning(f"🔄 REPLACING duplicate connection for base station {bs_eui}")
logger.warning(f" Closing old connection, keeping new connection from {addr}")
try:
old_writer.close()
except Exception as e:
logger.debug(f" Could not close old writer: {e}")
self.connected_base_stations.pop(old_writer, None)
self.bs_op_ids.pop(old_writer, None)
# Also remove from connecting if present there
self.connecting_base_stations.pop(old_writer, None)
self.connected_base_stations[writer] = bs_eui
self.bs_op_ids[writer] = -1
connection_time = asyncio.get_event_loop().time() - connection_start_time
logger.info(f"✅ BSSCI CONNECTION ESTABLISHED with base station {bs_eui}")
logger.info(" =====================================")
logger.info(f" Base Station EUI: {bs_eui}")
logger.info(f" Connection established at: {self._get_local_time()}")
logger.info(f" Connection setup duration: {connection_time:.2f} seconds")
logger.info(f" Client address: {addr}")
logger.info(f" Total connected base stations: {len(self.connected_base_stations)}")
logger.info(f" All connected stations: {list(self.connected_base_stations.values())}")
logger.info(f"🔗 INITIATING SENSOR ATTACHMENT PROCESS")
logger.info(f" Total sensors to attach: {len(self.sensor_config)}")
if self.sensor_config:
logger.info(f" Sensors to be attached:")
for i, sensor in enumerate(self.sensor_config, 1):
logger.info(f" {i:2d}. EUI: {sensor['eui']}, Short Addr: {sensor['shortAddr']}")
else:
logger.warning(f" ⚠️ No sensors configured for attachment")
logger.info(" =====================================")
# Start attachment process
await self.attach_file(writer)
# Always ensure status request task is running
if not hasattr(self, '_status_task_running') or not self._status_task_running:
logger.info(f"📊 Starting periodic status request task for all base stations")
self._status_task_running = True
asyncio.create_task(self.send_status_requests())
else:
logger.info(f"📊 Status request task already running, will include this base station")
else:
logger.warning(f"⚠️ Received connection complete from unknown or already connected base station")
elif msg_type == "ping":
logger.debug(f"Ping request received from {addr}")
msg_pack = encode_message(
messages.build_ping_response(message.get("opId", ""))
)
writer.write(
IDENTIFIER
+ len(msg_pack).to_bytes(4, byteorder="little")
+ msg_pack
)
await writer.drain()
elif msg_type == "pingCmp":
logger.debug(f"Ping complete received from {addr}")
elif msg_type == "statusRsp":
bs_eui = self.connected_base_stations[writer]
op_id = message.get("opId", "unknown")
cpu_raw = message['cpuLoad']
mem_raw = message['memLoad']
cpu_pct = cpu_raw if cpu_raw > 1 else cpu_raw * 100
mem_pct = mem_raw if mem_raw > 1 else mem_raw * 100
logger.info(f"📊 BASE STATION STATUS RESPONSE received from {bs_eui}")
logger.info(f" Operation ID: {op_id}")
logger.info(f" Status Code: {message['code']}")
logger.info(f" Memory Load: {mem_pct:.1f}%")
logger.info(f" CPU Load: {cpu_pct:.1f}%")
logger.info(f" Duty Cycle: {message['dutyCycle']:.1%}")
# Parse uptime to human readable format
uptime_seconds = message['uptime']
uptime_hours = uptime_seconds // 3600
uptime_minutes = (uptime_seconds % 3600) // 60
uptime_secs = uptime_seconds % 60
logger.info(f" Uptime: {uptime_hours:02d}:{uptime_minutes:02d}:{uptime_secs:02d} ({uptime_seconds}s)")
# Parse timestamp
try:
bs_time = datetime.fromtimestamp(message['time'] / 1_000_000_000)
logger.info(f" Base Station Time: {bs_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
except Exception:
logger.info(f" Base Station Time: {message['time']} (raw)")
data_dict = {
"code": message["code"],
"memLoad": message["memLoad"],
"cpuLoad": message["cpuLoad"],
"dutyCycle": message["dutyCycle"],
"time": message["time"],
"uptime": message["uptime"],
}
temp_value = message.get("temp", None)
self.base_station_health[bs_eui.lower()] = {
"cpu": cpu_pct,
"memory": mem_pct,
"duty_cycle": message["dutyCycle"] * 100,
"uptime": message["uptime"],
"temperature": temp_value,
"last_update": datetime.now(timezone.utc).isoformat()
}
mqtt_topic = f"bs/{bs_eui.upper()}"
payload = json.dumps(data_dict)
logger.info(f"📤 MQTT PUBLICATION - BASE STATION STATUS")
logger.info(f" Topic: {bssci_config.BASE_TOPIC.rstrip('/')}/{mqtt_topic}")
logger.info(f" Base Station EUI: {bs_eui}")
logger.info(f" Payload size: {len(payload)} bytes")
logger.info(f" Status data: Code={data_dict['code']}, CPU={cpu_pct:.1f}%, Memory={mem_pct:.1f}%")
logger.info(f" Queue size before add: {self.mqtt_out_queue.qsize()}")
try:
await self.mqtt_out_queue.put(
{
"topic": mqtt_topic,
"payload": payload,
}
)
logger.info(f"✅ Base station status queued for MQTT publication")
logger.info(f" Queue size after add: {self.mqtt_out_queue.qsize()}")
except Exception as mqtt_err:
logger.error(f"❌ Failed to queue MQTT message: {mqtt_err}")
msg_pack = encode_message(
messages.build_status_complete(message.get("opId", ""))
)
writer.write(
IDENTIFIER
+ len(msg_pack).to_bytes(4, byteorder="little")
+ msg_pack
)
await writer.drain()
logger.debug(f"📤 STATUS COMPLETE sent for opID {op_id}")
elif msg_type == "attPrpRsp":
# Handle attach response according to BSSCI specification
# Per spec: attPrpRsp only contains command and opId fields
op_id = message.get("opId", "unknown")
bs_eui = self.connected_base_stations.get(writer, "unknown")
logger.info(f"📨 BSSCI ATTACH RESPONSE received from base station {bs_eui}")
logger.info(" =====================================")
logger.info(f" Operation ID: {op_id}")
logger.info(f" Raw message: {message}")
logger.info(f" Note: Per BSSCI spec, attach response contains only command and opId")
# Try to correlate with pending attach request
pending_request = self.pending_attach_requests.get(op_id)
if pending_request:
sensor_eui = pending_request['sensor_eui']
sensor_config = pending_request['sensor_config']