Description
Connector name (if the bug is in a specific connector):
Not connector-specific. The issue is in the SQLite event storage layer and affects all connectors. Observed with BACnet Connector.
Describe the bug
When the gateway is configured with SQLite event storage ("type": "sqlite"), duplicate telemetry entries are sent to ThingsBoard after the gateway has been running for some time. Every telemetry reading appears twice on the platform with timestamps 1–10 ms apart. Switching to "type": "memory" eliminates the issue completely.
After reviewing the source code (tag 3.8.2), we believe the root cause is a race condition between the Database thread's pre-fetch mechanism and the record deletion cycle in the SQLite storage.
Steps to Reproduce
- Configure the gateway with SQLite event storage (`"type": "sqlite"`)
- Add a BACnet connector (or any connector) with multiple devices
- Let the gateway run for an extended period (hours)
- Check telemetry on the ThingsBoard side: duplicate entries appear with timestamps 1–10 ms apart and identical values
- Switch storage to `"type": "memory"`: duplicates stop immediately
Error traceback (If available):
No crash or exception. The gateway keeps running normally while silently producing duplicates. The following log pattern repeats indefinitely when the issue is active:
```
INFO - __add_device - Device SCIBRA_F-2_CWP_EM1 connected to platform
...
DEBUG - indication_callback - Device SCIBRA_F-2_CWP_EM1 already added
DEBUG - indication_callback - Device SCIBRA_F-2_CWP_EM1 already added
DEBUG - indication_callback - Device SCIBRA_F-2_CWP_EM1 already added
```
We also occasionally see:
```
DEBUG - Connector with name bacnet-1 not found! probably disabled, device ... removed from saved devices
```
Versions (please complete the following information):
- OS: Debian (Docker container, official image)
- ThingsBoard IoT Gateway version: 3.8.2
- Python version: 3.13
- ThingsBoard: CE
Additional context
After reading the source code, we suspect the following areas may be involved:
1. Pre-fetch cache may return stale data
The Database thread pre-fetches the next batch into __next_batch as soon as can_prepare_new_batch() is called (database.py, lines 116–119):
```python
if self.__should_read:
    if self.__can_prepare_new_batch and not self.__next_batch:
        self.__next_batch = self.read_data()  # pre-fetches from DB
        self.__can_prepare_new_batch = False
```

And `read_data()` returns the cached batch without querying the database (database.py, lines 230–231):
```python
if self.__next_batch:
    return self.__next_batch  # returns cached data
```

But `can_prepare_new_batch()` is called inside `get_event_pack()`, before `event_pack_processing_done()` deletes the records (sqlite_event_storage.py, lines 266–287):
```python
def get_event_pack(self):
    data_from_storage = self.read_data()  # reads batch
    # ...
    self.__read_database.can_prepare_new_batch()  # unlocks pre-fetch HERE
    return event_pack_messages
```

while the deletion only happens later, in `event_pack_processing_done()` (sqlite_event_storage.py, lines 252–253):
```python
def event_pack_processing_done(self):
    if not self.stopped.is_set():
        self.delete_data(self.delete_time_point)  # DELETE happens HERE
```

This creates a window where the DB thread pre-fetches the same records that haven't been deleted yet. On the next `get_event_pack()`, the stale cache is served and the same data is sent again.
With memory storage this doesn't happen because `Queue.get_nowait()` is a destructive read; the data is gone once consumed (memory_event_storage.py, lines 47–49):

```python
self.__event_pack = [self.__events_queue.get_nowait() for _ in
                     range(min(self.__events_per_time, self.__events_queue.qsize()))]
```

2. Several code paths skip event_pack_processing_done()
In tb_gateway_service.py (lines 1536–1559), data is sent via MQTT before the confirmation step. If confirmation fails, records persist in SQLite and get re-sent:
```python
self.__send_data(devices_data_in_event_pack)  # data SENT via MQTT
if self.tb_client.is_connected() and (...):
    success = self.__handle_published_events()
if success and self.tb_client.is_connected():
    self._event_storage.event_pack_processing_done()  # only called here
else:
    continue  # SKIPS deletion
```

Additionally, `__handle_published_events()` returns `False` when `_published_events` is empty (tb_gateway_service.py, lines 1571–1572), which can happen due to timing between the send and confirmation threads:
```python
if not events:
    return False  # prevents event_pack_processing_done from being called
```

3. Duplicate detector is disabled
We noticed the old duplicate_detector has been deprecated and commented out (tb_gateway_service.py, line 1156):
```python
# Duplicate detector is deprecated!
# if isinstance(data, dict):
#     filtered_data = self.__duplicate_detector.filter_data(connector_name, data)
```

There's no active deduplication mechanism to catch these cases.
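One possible direction for a fix, sketched with hypothetical names (this is not the gateway's API): gate the pre-fetch on the previous batch having been confirmed deleted, so the cache can never hold rows that are still pending deletion.

```python
import threading

class PrefetchGuard:
    """Sketch of one possible fix (names are ours, not the gateway's):
    a new pre-fetch is allowed only after the previous batch has been
    confirmed deleted from storage."""

    def __init__(self):
        self._lock = threading.Lock()
        self._previous_batch_deleted = True

    def can_prefetch(self) -> bool:
        with self._lock:
            return self._previous_batch_deleted

    def mark_read(self):
        # would be called when get_event_pack() hands a batch out
        with self._lock:
            self._previous_batch_deleted = False

    def mark_deleted(self):
        # would be called from event_pack_processing_done() after the DELETE
        with self._lock:
            self._previous_batch_deleted = True

guard = PrefetchGuard()
guard.mark_read()
print(guard.can_prefetch())  # False: pre-fetch blocked while the batch is in flight
guard.mark_deleted()
print(guard.can_prefetch())  # True: now safe to read the next batch
```

This also degrades safely for the paths in point 2: if confirmation fails and `event_pack_processing_done()` is skipped, the pre-fetch simply stays blocked instead of caching rows that will be re-read.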