Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 135 additions & 39 deletions main/ZgatewayBT.ino
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ FreeRTOS::Semaphore semaphoreCreateOrUpdateDevice = FreeRTOS::Semaphore("createO
# include <esp_bt.h>
# include <esp_bt_main.h>
# include <esp_wifi.h>
# include <stdatomic.h>

# include "soc/timer_group_reg.h"
# include "soc/timer_group_struct.h"
Expand Down Expand Up @@ -90,9 +91,96 @@ int minRssi = abs(MinimumRSSI); //minimum rssi value

unsigned int scanCount = 0;

void pubBTMainCore(JsonObject& data) {
if (abs((int)data["rssi"] | 0) < minRssi) {
String mac_address = data["id"].as<const char*>();
mac_address.replace(":", "");
String mactopic = subjectBTtoMQTT + String("/") + mac_address;
pub((char*)mactopic.c_str(), data);
}
if (data.containsKey("distance")) {
data.remove("servicedatauuid");
data.remove("servicedata");
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
pub_custom_topic((char*)topic.c_str(), data, false);
}
}

class JsonBundle {
public:
StaticJsonBuffer<JSON_MSG_BUFFER> buffer;
JsonObject* object;

JsonObject& createObject() {
buffer.clear();
object = &buffer.createObject();
return *object;
}
};

# ifdef ESP32
static TaskHandle_t xCoreTaskHandle;

atomic_int forceBTScan;

JsonBundle jsonBTBufferQueue[BTQueueSize];
atomic_int jsonBTBufferQueueNext, jsonBTBufferQueueLast;
int btQueueBlocked = 0;
int btQueueLengthSum = 0;
int btQueueLengthCount = 0;

JsonObject& getBTJsonObject() {
int next, last;
for (bool blocked = false;;) {
next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
if ((2 * BTQueueSize + last - next) % (2 * BTQueueSize) != BTQueueSize) break;
if (!blocked) {
blocked = true;
btQueueBlocked++;
}
delay(1);
}
return jsonBTBufferQueue[last % BTQueueSize].createObject();
}

// should run from the BT core
void pubBT(JsonObject& data) {
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst);
atomic_store_explicit(&jsonBTBufferQueueLast, (last + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
}

// should run from the main core
void emptyBTQueue() {
for (bool first = true;;) {
int next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
if (last == next) break;
if (first) {
int diff = (2 * BTQueueSize + last - next) % (2 * BTQueueSize);
btQueueLengthSum += diff;
btQueueLengthCount++;
first = false;
}
pubBTMainCore(*jsonBTBufferQueue[next % BTQueueSize].object);
atomic_store_explicit(&jsonBTBufferQueueNext, (next + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
}
}

# else

JsonBundle jsonBTBuffer;

JsonObject& getBTJsonObject() {
return jsonBTBuffer.createObject();
}

void pubBT(JsonObject& data) {
pubBTMainCore(data);
}

# endif

bool ProcessLock = false; // Process lock when we want to use a critical function like OTA for example

BLEdevice* getDeviceByMac(const char* mac);
Expand Down Expand Up @@ -411,8 +499,7 @@ static int taskCore = 0;
class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
void onResult(BLEAdvertisedDevice* advertisedDevice) {
Log.trace(F("Creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();
String mac_adress = advertisedDevice->getAddress().toString().c_str();
mac_adress.toUpperCase();
BLEdata.set("id", (char*)mac_adress.c_str());
Expand Down Expand Up @@ -459,11 +546,9 @@ class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
std::string serviceDatauuid = advertisedDevice->getServiceDataUUID(j).toString();
Log.trace(F("Service data UUID: %s" CR), (char*)serviceDatauuid.c_str());
BLEdata.set("servicedatauuid", (char*)serviceDatauuid.c_str());
PublishDeviceData(BLEdata);
}
} else {
PublishDeviceData(BLEdata); // publish device even if there is no service data
}
PublishDeviceData(BLEdata); // PublishDeviceData has its own logic whether it needs to publish the json or not
} else {
Log.trace(F("Filtered mac device" CR));
}
Expand Down Expand Up @@ -502,8 +587,7 @@ void notifyCB(

if (length == 5) {
Log.trace(F("Device identified creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();
String mac_adress = pBLERemoteCharacteristic->getRemoteService()->getClient()->getPeerAddress().toString().c_str();
mac_adress.toUpperCase();
for (vector<BLEdevice>::iterator p = devices.begin(); p != devices.end(); ++p) {
Expand All @@ -522,9 +606,7 @@ void notifyCB(
BLEdata.set("volt", (float)(((pData[4] * 256) + pData[3]) / 1000.0));
BLEdata.set("batt", (float)(((((pData[4] * 256) + pData[3]) / 1000.0) - 2.1) * 100));

mac_adress.replace(":", "");
String mactopic = subjectBTtoMQTT + String("/") + mac_adress;
pub((char*)mactopic.c_str(), BLEdata);
pubBT(BLEdata);
} else {
Log.notice(F("Device not identified" CR));
}
Expand Down Expand Up @@ -587,6 +669,7 @@ void BLEconnect() {
void stopProcessing() {
Log.notice(F("Stop BLE processing" CR));
ProcessLock = true;
delay(Scan_duration < 2000 ? Scan_duration : 2000);
}

void startProcessing() {
Expand All @@ -600,14 +683,14 @@ void coreTask(void* pvParameters) {
Log.trace(F("BT Task running on core: %d" CR), xPortGetCoreID());
if (!ProcessLock) {
Copy link
Owner

@1technophile 1technophile Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this if not redundant with the condition added into the while loop and of the else if?

Copy link
Contributor Author

@csiki2 csiki2 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there was a connection phase before that we may already waited seconds and the validity of ProcessLock long gone.
Also, we don't want to take too much time in anything that has delay() in it.
I even added a delay(Scan_duration < 2000 ? Scan_duration : 2000); to the stopprocessing: this gives more time to leave the BT loop gracefully. Unfortunately waiting to much will result in a timeout at platformio.
I needed these as I am more aggressive in the BT parameterwise and I am able to kill the OTA easily (ofc with the lockless queue)...

int n = 0;
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout) {
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout && !ProcessLock) {
n++;
Log.trace(F("Wait for MQTT on core: %d attempt: %d" CR), xPortGetCoreID(), n);
delay(1000);
}
if (client.state() != 0) {
Log.warning(F("MQTT client disconnected no BLE scan" CR));
} else {
} else if (!ProcessLock) {
BLEscan();
// Launching a connect every BLEscanBeforeConnect
if (!(scanCount % BLEscanBeforeConnect) || scanCount == 1)
Expand All @@ -618,8 +701,14 @@ void coreTask(void* pvParameters) {
}
if (lowpowermode) {
lowPowerESP32();
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst); // is this enough, it will wait the full deepsleep...
if (scan == 1) BTforceScan();
} else {
delay(BLEinterval);
for (int interval = BLEinterval, waitms; interval > 0; interval -= waitms) {
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst);
if (scan == 1) BTforceScan(); // should we break after this?
delay(waitms = interval > 100 ? 100 : interval); // 100ms
}
}
} else {
Log.trace(F("BLE core task canceled by processLock" CR));
Expand Down Expand Up @@ -680,6 +769,10 @@ void setupBT() {
Log.notice(F("minrssi: %d" CR), minRssi);
Log.notice(F("Low Power Mode: %d" CR), lowpowermode);

atomic_init(&forceBTScan, 0); // in theory, we don't need this
atomic_init(&jsonBTBufferQueueNext, 0); // in theory, we don't need this
atomic_init(&jsonBTBufferQueueLast, 0); // in theory, we don't need this

// we setup a task with priority one to avoid conflict with other gateways
xTaskCreatePinnedToCore(
coreTask, /* Function to implement the task */
Expand Down Expand Up @@ -789,8 +882,7 @@ bool BTtoMQTT() {
restData = token.substring(d[5].start, (d[5].start + restDataLength));

Log.trace(F("Creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();

Log.trace(F("Id %s" CR), (char*)mac.c_str());
BLEdata.set("id", (char*)mac.c_str());
Expand Down Expand Up @@ -863,19 +955,18 @@ void launchDiscovery() {

void PublishDeviceData(JsonObject& BLEdata) {
if (abs((int)BLEdata["rssi"] | 0) < minRssi) { // process only the devices close enough
JsonObject& BLEdataOut = process_bledata(BLEdata);
if (!publishOnlySensors || BLEdataOut.containsKey("model")) {
process_bledata(BLEdata);
if (!publishOnlySensors || BLEdata.containsKey("model") || BLEdata.containsKey("distance")) {
# if !pubBLEServiceUUID
RemoveJsonPropertyIf(BLEdataOut, "servicedatauuid", BLEdataOut.containsKey("servicedatauuid"));
RemoveJsonPropertyIf(BLEdata, "servicedatauuid", BLEdata.containsKey("servicedatauuid"));
# endif
# if !pubKnownBLEServiceData
RemoveJsonPropertyIf(BLEdataOut, "servicedata", BLEdataOut.containsKey("model") && BLEdataOut.containsKey("servicedata"));
RemoveJsonPropertyIf(BLEdata, "servicedata", BLEdata.containsKey("model") && BLEdata.containsKey("servicedata"));
# endif
String mactopic = BLEdataOut["id"].as<const char*>();
mactopic.replace(":", "");
mactopic = subjectBTtoMQTT + String("/") + mactopic;
pub((char*)mactopic.c_str(), BLEdataOut);
pubBT(BLEdata);
}
} else if (BLEdata.containsKey("distance")) {
pubBT(BLEdata);
} else {
Log.trace(F("Low rssi, device filtered" CR));
}
Expand Down Expand Up @@ -1254,11 +1345,21 @@ void haRoomPresence(JsonObject& HomePresence) {
}
HomePresence["distance"] = distance;
Log.trace(F("Ble distance %D" CR), distance);
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
pub_custom_topic((char*)topic.c_str(), HomePresence, false);
}
# endif

void BTforceScan() {
if (!ProcessLock) {
BTtoMQTT();
Log.trace(F("Scan done" CR));
# ifdef ESP32
BLEconnect();
# endif
} else {
Log.trace(F("Cannot launch scan due to other process running" CR));
}
}

void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
if (cmpToMainTopic(topicOri, subjectMQTTtoBTset)) {
Log.trace(F("MQTTtoBT json set" CR));
Expand All @@ -1274,22 +1375,17 @@ void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
// Scan interval set
if (BTdata.containsKey("interval")) {
Log.trace(F("BLE interval setup" CR));
// storing BLE interval for further use if needed
unsigned int prevBLEinterval = BLEinterval;
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
BLEinterval = (unsigned int)BTdata["interval"];
Log.notice(F("New interval: %d ms" CR), BLEinterval);
if (BLEinterval == 0) {
if (!ProcessLock) {
BTtoMQTT();
Log.trace(F("Scan done" CR));
unsigned int interval = BTdata["interval"];
if (interval == 0) {
# ifdef ESP32
BLEconnect();
atomic_store_explicit(&forceBTScan, 1, ::memory_order_seq_cst); // ask the other core to do the scan for us
# else
BTforceScan();
# endif
BLEinterval = prevBLEinterval; // as 0 was just used as a command we recover previous scan duration
} else {
Log.trace(F("Cannot launch scan due to other process running" CR));
}
} else {
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
BLEinterval = interval;
Log.notice(F("New interval: %d ms" CR), BLEinterval);
}
}
// Number of scan before a connect set
Expand Down
10 changes: 10 additions & 0 deletions main/config_BT.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ extern void setupBT();
extern bool BTtoMQTT();
extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);

#ifdef ESP32
extern int btQueueBlocked;
extern int btQueueLengthSum;
extern int btQueueLengthCount;
#endif

/*----------------------BT topics & parameters-------------------------*/
#define subjectBTtoMQTT "/BTtoMQTT"
#define subjectMQTTtoBTset "/commands/MQTTtoBT/config"
Expand All @@ -48,6 +54,10 @@ extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);
# define PublishOnlySensors false //false if we publish all BLE devices discovered or true only the identified sensors (like temperature sensors)
#endif

#ifndef BTQueueSize
# define BTQueueSize 4 // lockless queue size for multi core cases (ESP32 currently)
#endif

#define HMSerialSpeed 9600 // Communication speed with the HM module, softwareserial doesn't support 115200
//#define HM_BLUE_LED_STOP true //uncomment to stop the blue led light of HM1X

Expand Down
12 changes: 6 additions & 6 deletions main/main.ino
Original file line number Diff line number Diff line change
Expand Up @@ -1250,13 +1250,7 @@ void loop() {
#ifdef ZmqttDiscovery
if (disc) {
if (!connectedOnce) {
# if defined(ZgatewayBT) && defined(ESP32)
stopProcessing(); // Avoid publication concurrency issues on ESP32 BLE
# endif
pubMqttDiscovery(); // at first connection we publish the discovery payloads
# if defined(ZgatewayBT) && defined(ESP32)
startProcessing();
# endif
}
}
#endif
Expand Down Expand Up @@ -1328,6 +1322,8 @@ void loop() {
# ifndef ESP32
if (BTtoMQTT())
Log.trace(F("BTtoMQTT OK" CR));
# else
emptyBTQueue();
# endif
#endif
#ifdef ZgatewaySRFB
Expand Down Expand Up @@ -1407,6 +1403,10 @@ void stateMeasures() {
# ifdef ZgatewayBT
# ifdef ESP32
SYSdata["lowpowermode"] = (int)lowpowermode;
SYSdata["btqblck"] = btQueueBlocked;
SYSdata["btqsum"] = btQueueLengthSum;
SYSdata["btqsnd"] = btQueueLengthCount;
SYSdata["btqavg"] = (btQueueLengthCount > 0 ? btQueueLengthSum / (float)btQueueLengthCount : 0);
# endif
SYSdata["interval"] = BLEinterval;
SYSdata["scanbcnct"] = BLEscanBeforeConnect;
Expand Down