Skip to content

Commit 1b7f83c

Browse files
authored
BT lockless queue (#867)
* Lockless json queue for ESP32 BT to avoid SYStoMQTT concurrency issue * BT haRoomPresence handle with lockless queue
1 parent 4431af4 commit 1b7f83c

File tree

3 files changed

+201
-60
lines changed

3 files changed

+201
-60
lines changed

main/ZgatewayBT.ino

Lines changed: 185 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ FreeRTOS::Semaphore semaphoreCreateOrUpdateDevice = FreeRTOS::Semaphore("createO
4444
# include <esp_bt.h>
4545
# include <esp_bt_main.h>
4646
# include <esp_wifi.h>
47+
# include <stdatomic.h>
4748

4849
# include "soc/timer_group_reg.h"
4950
# include "soc/timer_group_struct.h"
@@ -90,9 +91,101 @@ int minRssi = abs(MinimumRSSI); //minimum rssi value
9091

9192
unsigned int scanCount = 0;
9293

94+
void pubBTMainCore(JsonObject& data, bool haPresenceEnabled = true) {
95+
if (abs((int)data["rssi"] | 0) < minRssi) {
96+
String mac_address = data["id"].as<const char*>();
97+
mac_address.replace(":", "");
98+
String mactopic = subjectBTtoMQTT + String("/") + mac_address;
99+
pub((char*)mactopic.c_str(), data);
100+
}
101+
if (haPresenceEnabled && data.containsKey("distance")) {
102+
data.remove("servicedatauuid");
103+
data.remove("servicedata");
104+
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
105+
pub_custom_topic((char*)topic.c_str(), data, false);
106+
}
107+
}
108+
109+
class JsonBundle {
110+
public:
111+
StaticJsonBuffer<JSON_MSG_BUFFER> buffer;
112+
JsonObject* object;
113+
bool haPresence;
114+
115+
JsonObject& createObject(const char* json = NULL, bool haPresenceEnabled = true) {
116+
buffer.clear();
117+
haPresence = haPresenceEnabled;
118+
object = &(json == NULL ? buffer.createObject() : buffer.parseObject(json));
119+
return *object;
120+
}
121+
};
122+
123+
void PublishDeviceData(JsonObject& BLEdata, bool processBLEData = true);
124+
93125
# ifdef ESP32
94126
static TaskHandle_t xCoreTaskHandle;
127+
128+
atomic_int forceBTScan;
129+
130+
JsonBundle jsonBTBufferQueue[BTQueueSize];
131+
atomic_int jsonBTBufferQueueNext, jsonBTBufferQueueLast;
132+
int btQueueBlocked = 0;
133+
int btQueueLengthSum = 0;
134+
int btQueueLengthCount = 0;
135+
136+
JsonObject& getBTJsonObject(const char* json = NULL, bool haPresenceEnabled = true) {
137+
int next, last;
138+
for (bool blocked = false;;) {
139+
next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
140+
last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
141+
if ((2 * BTQueueSize + last - next) % (2 * BTQueueSize) != BTQueueSize) break;
142+
if (!blocked) {
143+
blocked = true;
144+
btQueueBlocked++;
145+
}
146+
delay(1);
147+
}
148+
return jsonBTBufferQueue[last % BTQueueSize].createObject(json, haPresenceEnabled);
149+
}
150+
151+
// should run from the BT core
152+
void pubBT(JsonObject& data) {
153+
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst);
154+
atomic_store_explicit(&jsonBTBufferQueueLast, (last + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
155+
}
156+
157+
// should run from the main core
158+
void emptyBTQueue() {
159+
for (bool first = true;;) {
160+
int next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
161+
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
162+
if (last == next) break;
163+
if (first) {
164+
int diff = (2 * BTQueueSize + last - next) % (2 * BTQueueSize);
165+
btQueueLengthSum += diff;
166+
btQueueLengthCount++;
167+
first = false;
168+
}
169+
JsonBundle& bundle = jsonBTBufferQueue[next % BTQueueSize];
170+
pubBTMainCore(*bundle.object, bundle.haPresence);
171+
atomic_store_explicit(&jsonBTBufferQueueNext, (next + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
172+
}
173+
}
174+
175+
# else
176+
177+
JsonBundle jsonBTBuffer;
178+
179+
JsonObject& getBTJsonObject(const char* json = NULL, bool haPresenceEnabled = true) {
180+
return jsonBTBuffer.createObject();
181+
}
182+
183+
void pubBT(JsonObject& data) {
184+
pubBTMainCore(data);
185+
}
186+
95187
# endif
188+
96189
bool ProcessLock = false; // Process lock when we want to use a critical function like OTA for example
97190

98191
BLEdevice* getDeviceByMac(const char* mac);
@@ -409,10 +502,18 @@ void INodeEMDiscovery(char* mac, char* sensorModel) {}
409502
static int taskCore = 0;
410503

411504
class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
505+
std::string convertServiceData(std::string deviceServiceData) {
506+
int serviceDataLength = (int)deviceServiceData.length();
507+
char spr[2 * serviceDataLength + 1];
508+
for (int i = 0; i < serviceDataLength; i++) sprintf(spr + 2 * i, "%.2x", (unsigned char)deviceServiceData[i]);
509+
spr[2 * serviceDataLength] = 0;
510+
Log.trace("Converted service data (%d) to %s" CR, serviceDataLength, spr);
511+
return spr;
512+
}
513+
412514
void onResult(BLEAdvertisedDevice* advertisedDevice) {
413515
Log.trace(F("Creating BLE buffer" CR));
414-
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
415-
JsonObject& BLEdata = jsonBuffer.createObject();
516+
JsonObject& BLEdata = getBTJsonObject();
416517
String mac_adress = advertisedDevice->getAddress().toString().c_str();
417518
mac_adress.toUpperCase();
418519
BLEdata.set("id", (char*)mac_adress.c_str());
@@ -441,28 +542,47 @@ class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
441542
int serviceDataCount = advertisedDevice->getServiceDataCount();
442543
Log.trace(F("Get services data number: %d" CR), serviceDataCount);
443544
for (int j = 0; j < serviceDataCount; j++) {
444-
std::string serviceData = advertisedDevice->getServiceData(j);
445-
int serviceDataLength = serviceData.length();
446-
String returnedString = "";
447-
for (int i = 0; i < serviceDataLength; i++) {
448-
int a = serviceData[i];
449-
if (a < 16) {
450-
returnedString += F("0");
451-
}
452-
returnedString += String(a, HEX);
453-
}
454-
char service_data[returnedString.length() + 1];
455-
returnedString.toCharArray(service_data, returnedString.length() + 1);
456-
service_data[returnedString.length()] = '\0';
457-
Log.trace(F("Service data: %s" CR), service_data);
458-
BLEdata.set("servicedata", service_data);
545+
std::string service_data = convertServiceData(advertisedDevice->getServiceData(j));
546+
Log.trace(F("Service data: %s" CR), service_data.c_str());
547+
BLEdata.set("servicedata", (char*)service_data.c_str());
459548
std::string serviceDatauuid = advertisedDevice->getServiceDataUUID(j).toString();
460549
Log.trace(F("Service data UUID: %s" CR), (char*)serviceDatauuid.c_str());
461550
BLEdata.set("servicedatauuid", (char*)serviceDatauuid.c_str());
462-
PublishDeviceData(BLEdata);
551+
process_bledata(BLEdata); // this will force to resolve all the service data
552+
}
553+
554+
if (serviceDataCount > 1) {
555+
BLEdata.remove("servicedata");
556+
BLEdata.remove("servicedatauuid");
557+
558+
int msglen = BLEdata.measureLength() + 1;
559+
char jsonmsg[msglen];
560+
char jsonmsgb[msglen];
561+
BLEdata.printTo(jsonmsgb, sizeof(jsonmsgb));
562+
for (int j = 0; j < serviceDataCount; j++) {
563+
strcpy(jsonmsg, jsonmsgb); // the parse _destroys_ the message buffer
564+
JsonObject& BLEdataLocal = getBTJsonObject(jsonmsg, j == 0); // note, that first time we will get here the BLEdata itself; haPresence for the first msg
565+
if (!BLEdataLocal.containsKey("id")) { // would crash without id
566+
Log.trace("Json parsing error for %s" CR, jsonmsgb);
567+
break;
568+
}
569+
std::string service_data = convertServiceData(advertisedDevice->getServiceData(j));
570+
std::string serviceDatauuid = advertisedDevice->getServiceDataUUID(j).toString();
571+
572+
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst) % BTQueueSize;
573+
int size1 = jsonBTBufferQueue[last].buffer.size();
574+
BLEdataLocal.set("servicedata", (char*)service_data.c_str());
575+
int size2 = jsonBTBufferQueue[last].buffer.size();
576+
BLEdataLocal.set("servicedatauuid", (char*)serviceDatauuid.c_str());
577+
int size3 = jsonBTBufferQueue[last].buffer.size();
578+
Log.trace("Buffersize for %d : %d -> %d -> %d" CR, j, size1, size2, size3);
579+
PublishDeviceData(BLEdataLocal);
580+
}
581+
} else {
582+
PublishDeviceData(BLEdata, false); // easy case
463583
}
464584
} else {
465-
PublishDeviceData(BLEdata); // publish device even if there is no service data
585+
PublishDeviceData(BLEdata); // PublishDeviceData has its own logic whether it needs to publish the json or not
466586
}
467587
} else {
468588
Log.trace(F("Filtered mac device" CR));
@@ -502,8 +622,7 @@ void notifyCB(
502622

503623
if (length == 5) {
504624
Log.trace(F("Device identified creating BLE buffer" CR));
505-
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
506-
JsonObject& BLEdata = jsonBuffer.createObject();
625+
JsonObject& BLEdata = getBTJsonObject();
507626
String mac_adress = pBLERemoteCharacteristic->getRemoteService()->getClient()->getPeerAddress().toString().c_str();
508627
mac_adress.toUpperCase();
509628
for (vector<BLEdevice>::iterator p = devices.begin(); p != devices.end(); ++p) {
@@ -522,9 +641,7 @@ void notifyCB(
522641
BLEdata.set("volt", (float)(((pData[4] * 256) + pData[3]) / 1000.0));
523642
BLEdata.set("batt", (float)(((((pData[4] * 256) + pData[3]) / 1000.0) - 2.1) * 100));
524643

525-
mac_adress.replace(":", "");
526-
String mactopic = subjectBTtoMQTT + String("/") + mac_adress;
527-
pub((char*)mactopic.c_str(), BLEdata);
644+
pubBT(BLEdata);
528645
} else {
529646
Log.notice(F("Device not identified" CR));
530647
}
@@ -587,6 +704,7 @@ void BLEconnect() {
587704
void stopProcessing() {
588705
Log.notice(F("Stop BLE processing" CR));
589706
ProcessLock = true;
707+
delay(Scan_duration < 2000 ? Scan_duration : 2000);
590708
}
591709

592710
void startProcessing() {
@@ -600,14 +718,14 @@ void coreTask(void* pvParameters) {
600718
Log.trace(F("BT Task running on core: %d" CR), xPortGetCoreID());
601719
if (!ProcessLock) {
602720
int n = 0;
603-
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout) {
721+
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout && !ProcessLock) {
604722
n++;
605723
Log.trace(F("Wait for MQTT on core: %d attempt: %d" CR), xPortGetCoreID(), n);
606724
delay(1000);
607725
}
608726
if (client.state() != 0) {
609727
Log.warning(F("MQTT client disconnected no BLE scan" CR));
610-
} else {
728+
} else if (!ProcessLock) {
611729
BLEscan();
612730
// Launching a connect every BLEscanBeforeConnect
613731
if (!(scanCount % BLEscanBeforeConnect) || scanCount == 1)
@@ -618,8 +736,14 @@ void coreTask(void* pvParameters) {
618736
}
619737
if (lowpowermode) {
620738
lowPowerESP32();
739+
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst); // is this enough, it will wait the full deepsleep...
740+
if (scan == 1) BTforceScan();
621741
} else {
622-
delay(BLEinterval);
742+
for (int interval = BLEinterval, waitms; interval > 0; interval -= waitms) {
743+
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst);
744+
if (scan == 1) BTforceScan(); // should we break after this?
745+
delay(waitms = interval > 100 ? 100 : interval); // 100ms
746+
}
623747
}
624748
} else {
625749
Log.trace(F("BLE core task canceled by processLock" CR));
@@ -680,6 +804,10 @@ void setupBT() {
680804
Log.notice(F("minrssi: %d" CR), minRssi);
681805
Log.notice(F("Low Power Mode: %d" CR), lowpowermode);
682806

807+
atomic_init(&forceBTScan, 0); // in theory, we don't need this
808+
atomic_init(&jsonBTBufferQueueNext, 0); // in theory, we don't need this
809+
atomic_init(&jsonBTBufferQueueLast, 0); // in theory, we don't need this
810+
683811
// we setup a task with priority one to avoid conflict with other gateways
684812
xTaskCreatePinnedToCore(
685813
coreTask, /* Function to implement the task */
@@ -789,8 +917,7 @@ bool BTtoMQTT() {
789917
restData = token.substring(d[5].start, (d[5].start + restDataLength));
790918

791919
Log.trace(F("Creating BLE buffer" CR));
792-
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
793-
JsonObject& BLEdata = jsonBuffer.createObject();
920+
JsonObject& BLEdata = getBTJsonObject();
794921

795922
Log.trace(F("Id %s" CR), (char*)mac.c_str());
796923
BLEdata.set("id", (char*)mac.c_str());
@@ -861,21 +988,20 @@ void launchDiscovery() {
861988
}
862989
}
863990

864-
void PublishDeviceData(JsonObject& BLEdata) {
991+
void PublishDeviceData(JsonObject& BLEdata, bool processBLEData) {
865992
if (abs((int)BLEdata["rssi"] | 0) < minRssi) { // process only the devices close enough
866-
JsonObject& BLEdataOut = process_bledata(BLEdata);
867-
if (!publishOnlySensors || BLEdataOut.containsKey("model")) {
993+
if (processBLEData) process_bledata(BLEdata);
994+
if (!publishOnlySensors || BLEdata.containsKey("model") || BLEdata.containsKey("distance")) {
868995
# if !pubBLEServiceUUID
869-
RemoveJsonPropertyIf(BLEdataOut, "servicedatauuid", BLEdataOut.containsKey("servicedatauuid"));
996+
RemoveJsonPropertyIf(BLEdata, "servicedatauuid", BLEdata.containsKey("servicedatauuid"));
870997
# endif
871998
# if !pubKnownBLEServiceData
872-
RemoveJsonPropertyIf(BLEdataOut, "servicedata", BLEdataOut.containsKey("model") && BLEdataOut.containsKey("servicedata"));
999+
RemoveJsonPropertyIf(BLEdata, "servicedata", BLEdata.containsKey("model") && BLEdata.containsKey("servicedata"));
8731000
# endif
874-
String mactopic = BLEdataOut["id"].as<const char*>();
875-
mactopic.replace(":", "");
876-
mactopic = subjectBTtoMQTT + String("/") + mactopic;
877-
pub((char*)mactopic.c_str(), BLEdataOut);
1001+
pubBT(BLEdata);
8781002
}
1003+
} else if (BLEdata.containsKey("distance")) {
1004+
pubBT(BLEdata);
8791005
} else {
8801006
Log.trace(F("Low rssi, device filtered" CR));
8811007
}
@@ -1254,11 +1380,21 @@ void haRoomPresence(JsonObject& HomePresence) {
12541380
}
12551381
HomePresence["distance"] = distance;
12561382
Log.trace(F("Ble distance %D" CR), distance);
1257-
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
1258-
pub_custom_topic((char*)topic.c_str(), HomePresence, false);
12591383
}
12601384
# endif
12611385

1386+
void BTforceScan() {
1387+
if (!ProcessLock) {
1388+
BTtoMQTT();
1389+
Log.trace(F("Scan done" CR));
1390+
# ifdef ESP32
1391+
BLEconnect();
1392+
# endif
1393+
} else {
1394+
Log.trace(F("Cannot launch scan due to other process running" CR));
1395+
}
1396+
}
1397+
12621398
void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
12631399
if (cmpToMainTopic(topicOri, subjectMQTTtoBTset)) {
12641400
Log.trace(F("MQTTtoBT json set" CR));
@@ -1274,22 +1410,17 @@ void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
12741410
// Scan interval set
12751411
if (BTdata.containsKey("interval")) {
12761412
Log.trace(F("BLE interval setup" CR));
1277-
// storing BLE interval for further use if needed
1278-
unsigned int prevBLEinterval = BLEinterval;
1279-
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
1280-
BLEinterval = (unsigned int)BTdata["interval"];
1281-
Log.notice(F("New interval: %d ms" CR), BLEinterval);
1282-
if (BLEinterval == 0) {
1283-
if (!ProcessLock) {
1284-
BTtoMQTT();
1285-
Log.trace(F("Scan done" CR));
1413+
unsigned int interval = BTdata["interval"];
1414+
if (interval == 0) {
12861415
# ifdef ESP32
1287-
BLEconnect();
1416+
atomic_store_explicit(&forceBTScan, 1, ::memory_order_seq_cst); // ask the other core to do the scan for us
1417+
# else
1418+
BTforceScan();
12881419
# endif
1289-
BLEinterval = prevBLEinterval; // as 0 was just used as a command we recover previous scan duration
1290-
} else {
1291-
Log.trace(F("Cannot launch scan due to other process running" CR));
1292-
}
1420+
} else {
1421+
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
1422+
BLEinterval = interval;
1423+
Log.notice(F("New interval: %d ms" CR), BLEinterval);
12931424
}
12941425
}
12951426
// Number of scan before a connect set

main/config_BT.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ extern void setupBT();
3030
extern bool BTtoMQTT();
3131
extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);
3232

33+
#ifdef ESP32
34+
extern int btQueueBlocked;
35+
extern int btQueueLengthSum;
36+
extern int btQueueLengthCount;
37+
#endif
38+
3339
/*----------------------BT topics & parameters-------------------------*/
3440
#define subjectBTtoMQTT "/BTtoMQTT"
3541
#define subjectMQTTtoBTset "/commands/MQTTtoBT/config"
@@ -48,6 +54,10 @@ extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);
4854
# define PublishOnlySensors false //false if we publish all BLE devices discovered or true only the identified sensors (like temperature sensors)
4955
#endif
5056

57+
#ifndef BTQueueSize
58+
# define BTQueueSize 4 // lockless queue size for multi core cases (ESP32 currently)
59+
#endif
60+
5161
#define HMSerialSpeed 9600 // Communication speed with the HM module, softwareserial doesn't support 115200
5262
//#define HM_BLUE_LED_STOP true //uncomment to stop the blue led light of HM1X
5363

0 commit comments

Comments
 (0)