Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 185 additions & 54 deletions main/ZgatewayBT.ino
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ FreeRTOS::Semaphore semaphoreCreateOrUpdateDevice = FreeRTOS::Semaphore("createO
# include <esp_bt.h>
# include <esp_bt_main.h>
# include <esp_wifi.h>
# include <stdatomic.h>

# include "soc/timer_group_reg.h"
# include "soc/timer_group_struct.h"
Expand Down Expand Up @@ -90,9 +91,101 @@ int minRssi = abs(MinimumRSSI); //minimum rssi value

unsigned int scanCount = 0;

void pubBTMainCore(JsonObject& data, bool haPresenceEnabled = true) {
if (abs((int)data["rssi"] | 0) < minRssi) {
String mac_address = data["id"].as<const char*>();
mac_address.replace(":", "");
String mactopic = subjectBTtoMQTT + String("/") + mac_address;
pub((char*)mactopic.c_str(), data);
}
if (haPresenceEnabled && data.containsKey("distance")) {
data.remove("servicedatauuid");
data.remove("servicedata");
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
pub_custom_topic((char*)topic.c_str(), data, false);
}
}

class JsonBundle {
public:
StaticJsonBuffer<JSON_MSG_BUFFER> buffer;
JsonObject* object;
bool haPresence;

JsonObject& createObject(const char* json = NULL, bool haPresenceEnabled = true) {
buffer.clear();
haPresence = haPresenceEnabled;
object = &(json == NULL ? buffer.createObject() : buffer.parseObject(json));
return *object;
}
};

void PublishDeviceData(JsonObject& BLEdata, bool processBLEData = true);

# ifdef ESP32
static TaskHandle_t xCoreTaskHandle;

atomic_int forceBTScan;

JsonBundle jsonBTBufferQueue[BTQueueSize];
atomic_int jsonBTBufferQueueNext, jsonBTBufferQueueLast;
int btQueueBlocked = 0;
int btQueueLengthSum = 0;
int btQueueLengthCount = 0;

JsonObject& getBTJsonObject(const char* json = NULL, bool haPresenceEnabled = true) {
int next, last;
for (bool blocked = false;;) {
next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
if ((2 * BTQueueSize + last - next) % (2 * BTQueueSize) != BTQueueSize) break;
if (!blocked) {
blocked = true;
btQueueBlocked++;
}
delay(1);
}
return jsonBTBufferQueue[last % BTQueueSize].createObject(json, haPresenceEnabled);
}

// should run from the BT core
void pubBT(JsonObject& data) {
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst);
atomic_store_explicit(&jsonBTBufferQueueLast, (last + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
}

// should run from the main core
void emptyBTQueue() {
for (bool first = true;;) {
int next = atomic_load_explicit(&jsonBTBufferQueueNext, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst); // use namespace std -> ambiguous error...
if (last == next) break;
if (first) {
int diff = (2 * BTQueueSize + last - next) % (2 * BTQueueSize);
btQueueLengthSum += diff;
btQueueLengthCount++;
first = false;
}
JsonBundle& bundle = jsonBTBufferQueue[next % BTQueueSize];
pubBTMainCore(*bundle.object, bundle.haPresence);
atomic_store_explicit(&jsonBTBufferQueueNext, (next + 1) % (2 * BTQueueSize), ::memory_order_seq_cst); // use namespace std -> ambiguous error...
}
}

# else

JsonBundle jsonBTBuffer;

JsonObject& getBTJsonObject(const char* json = NULL, bool haPresenceEnabled = true) {
return jsonBTBuffer.createObject();
}

void pubBT(JsonObject& data) {
pubBTMainCore(data);
}

# endif

bool ProcessLock = false; // Process lock when we want to use a critical function like OTA for example

BLEdevice* getDeviceByMac(const char* mac);
Expand Down Expand Up @@ -409,10 +502,18 @@ void INodeEMDiscovery(char* mac, char* sensorModel) {}
static int taskCore = 0;

class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
std::string convertServiceData(std::string deviceServiceData) {
int serviceDataLength = (int)deviceServiceData.length();
char spr[2 * serviceDataLength + 1];
for (int i = 0; i < serviceDataLength; i++) sprintf(spr + 2 * i, "%.2x", (unsigned char)deviceServiceData[i]);
spr[2 * serviceDataLength] = 0;
Log.trace("Converted service data (%d) to %s" CR, serviceDataLength, spr);
return spr;
}

void onResult(BLEAdvertisedDevice* advertisedDevice) {
Log.trace(F("Creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();
String mac_adress = advertisedDevice->getAddress().toString().c_str();
mac_adress.toUpperCase();
BLEdata.set("id", (char*)mac_adress.c_str());
Expand Down Expand Up @@ -441,28 +542,47 @@ class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
int serviceDataCount = advertisedDevice->getServiceDataCount();
Log.trace(F("Get services data number: %d" CR), serviceDataCount);
for (int j = 0; j < serviceDataCount; j++) {
std::string serviceData = advertisedDevice->getServiceData(j);
int serviceDataLength = serviceData.length();
String returnedString = "";
for (int i = 0; i < serviceDataLength; i++) {
int a = serviceData[i];
if (a < 16) {
returnedString += F("0");
}
returnedString += String(a, HEX);
}
char service_data[returnedString.length() + 1];
returnedString.toCharArray(service_data, returnedString.length() + 1);
service_data[returnedString.length()] = '\0';
Log.trace(F("Service data: %s" CR), service_data);
BLEdata.set("servicedata", service_data);
std::string service_data = convertServiceData(advertisedDevice->getServiceData(j));
Log.trace(F("Service data: %s" CR), service_data.c_str());
BLEdata.set("servicedata", (char*)service_data.c_str());
std::string serviceDatauuid = advertisedDevice->getServiceDataUUID(j).toString();
Log.trace(F("Service data UUID: %s" CR), (char*)serviceDatauuid.c_str());
BLEdata.set("servicedatauuid", (char*)serviceDatauuid.c_str());
PublishDeviceData(BLEdata);
process_bledata(BLEdata); // this will force to resolve all the service data
}

if (serviceDataCount > 1) {
BLEdata.remove("servicedata");
BLEdata.remove("servicedatauuid");

int msglen = BLEdata.measureLength() + 1;
char jsonmsg[msglen];
char jsonmsgb[msglen];
BLEdata.printTo(jsonmsgb, sizeof(jsonmsgb));
for (int j = 0; j < serviceDataCount; j++) {
strcpy(jsonmsg, jsonmsgb); // the parse _destroys_ the message buffer
JsonObject& BLEdataLocal = getBTJsonObject(jsonmsg, j == 0); // note, that first time we will get here the BLEdata itself; haPresence for the first msg
if (!BLEdataLocal.containsKey("id")) { // would crash without id
Log.trace("Json parsing error for %s" CR, jsonmsgb);
break;
}
std::string service_data = convertServiceData(advertisedDevice->getServiceData(j));
std::string serviceDatauuid = advertisedDevice->getServiceDataUUID(j).toString();

int last = atomic_load_explicit(&jsonBTBufferQueueLast, ::memory_order_seq_cst) % BTQueueSize;
int size1 = jsonBTBufferQueue[last].buffer.size();
BLEdataLocal.set("servicedata", (char*)service_data.c_str());
int size2 = jsonBTBufferQueue[last].buffer.size();
BLEdataLocal.set("servicedatauuid", (char*)serviceDatauuid.c_str());
int size3 = jsonBTBufferQueue[last].buffer.size();
Log.trace("Buffersize for %d : %d -> %d -> %d" CR, j, size1, size2, size3);
PublishDeviceData(BLEdataLocal);
}
} else {
PublishDeviceData(BLEdata, false); // easy case
}
} else {
PublishDeviceData(BLEdata); // publish device even if there is no service data
PublishDeviceData(BLEdata); // PublishDeviceData has its own logic whether it needs to publish the json or not
}
} else {
Log.trace(F("Filtered mac device" CR));
Expand Down Expand Up @@ -502,8 +622,7 @@ void notifyCB(

if (length == 5) {
Log.trace(F("Device identified creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();
String mac_adress = pBLERemoteCharacteristic->getRemoteService()->getClient()->getPeerAddress().toString().c_str();
mac_adress.toUpperCase();
for (vector<BLEdevice>::iterator p = devices.begin(); p != devices.end(); ++p) {
Expand All @@ -522,9 +641,7 @@ void notifyCB(
BLEdata.set("volt", (float)(((pData[4] * 256) + pData[3]) / 1000.0));
BLEdata.set("batt", (float)(((((pData[4] * 256) + pData[3]) / 1000.0) - 2.1) * 100));

mac_adress.replace(":", "");
String mactopic = subjectBTtoMQTT + String("/") + mac_adress;
pub((char*)mactopic.c_str(), BLEdata);
pubBT(BLEdata);
} else {
Log.notice(F("Device not identified" CR));
}
Expand Down Expand Up @@ -587,6 +704,7 @@ void BLEconnect() {
void stopProcessing() {
Log.notice(F("Stop BLE processing" CR));
ProcessLock = true;
delay(Scan_duration < 2000 ? Scan_duration : 2000);
}

void startProcessing() {
Expand All @@ -600,14 +718,14 @@ void coreTask(void* pvParameters) {
Log.trace(F("BT Task running on core: %d" CR), xPortGetCoreID());
if (!ProcessLock) {
Copy link
Owner

@1technophile 1technophile Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this if not redundant with the condition added into the while loop and of the else if?

Copy link
Contributor Author

@csiki2 csiki2 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there was a connection phase before that we may already waited seconds and the validity of ProcessLock long gone.
Also, we don't want to take too much time in anything that has delay() in it.
I even added a delay(Scan_duration < 2000 ? Scan_duration : 2000); to the stopprocessing: this gives more time to leave the BT loop gracefully. Unfortunately waiting to much will result in a timeout at platformio.
I needed these as I am more aggressive in the BT parameterwise and I am able to kill the OTA easily (ofc with the lockless queue)...

int n = 0;
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout) {
while (client.state() != 0 && n <= InitialMQTTConnectionTimeout && !ProcessLock) {
n++;
Log.trace(F("Wait for MQTT on core: %d attempt: %d" CR), xPortGetCoreID(), n);
delay(1000);
}
if (client.state() != 0) {
Log.warning(F("MQTT client disconnected no BLE scan" CR));
} else {
} else if (!ProcessLock) {
BLEscan();
// Launching a connect every BLEscanBeforeConnect
if (!(scanCount % BLEscanBeforeConnect) || scanCount == 1)
Expand All @@ -618,8 +736,14 @@ void coreTask(void* pvParameters) {
}
if (lowpowermode) {
lowPowerESP32();
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst); // is this enough, it will wait the full deepsleep...
if (scan == 1) BTforceScan();
} else {
delay(BLEinterval);
for (int interval = BLEinterval, waitms; interval > 0; interval -= waitms) {
int scan = atomic_exchange_explicit(&forceBTScan, 0, ::memory_order_seq_cst);
if (scan == 1) BTforceScan(); // should we break after this?
delay(waitms = interval > 100 ? 100 : interval); // 100ms
}
}
} else {
Log.trace(F("BLE core task canceled by processLock" CR));
Expand Down Expand Up @@ -680,6 +804,10 @@ void setupBT() {
Log.notice(F("minrssi: %d" CR), minRssi);
Log.notice(F("Low Power Mode: %d" CR), lowpowermode);

atomic_init(&forceBTScan, 0); // in theory, we don't need this
atomic_init(&jsonBTBufferQueueNext, 0); // in theory, we don't need this
atomic_init(&jsonBTBufferQueueLast, 0); // in theory, we don't need this

// we setup a task with priority one to avoid conflict with other gateways
xTaskCreatePinnedToCore(
coreTask, /* Function to implement the task */
Expand Down Expand Up @@ -789,8 +917,7 @@ bool BTtoMQTT() {
restData = token.substring(d[5].start, (d[5].start + restDataLength));

Log.trace(F("Creating BLE buffer" CR));
StaticJsonBuffer<JSON_MSG_BUFFER> jsonBuffer;
JsonObject& BLEdata = jsonBuffer.createObject();
JsonObject& BLEdata = getBTJsonObject();

Log.trace(F("Id %s" CR), (char*)mac.c_str());
BLEdata.set("id", (char*)mac.c_str());
Expand Down Expand Up @@ -861,21 +988,20 @@ void launchDiscovery() {
}
}

void PublishDeviceData(JsonObject& BLEdata) {
void PublishDeviceData(JsonObject& BLEdata, bool processBLEData) {
if (abs((int)BLEdata["rssi"] | 0) < minRssi) { // process only the devices close enough
JsonObject& BLEdataOut = process_bledata(BLEdata);
if (!publishOnlySensors || BLEdataOut.containsKey("model")) {
if (processBLEData) process_bledata(BLEdata);
if (!publishOnlySensors || BLEdata.containsKey("model") || BLEdata.containsKey("distance")) {
# if !pubBLEServiceUUID
RemoveJsonPropertyIf(BLEdataOut, "servicedatauuid", BLEdataOut.containsKey("servicedatauuid"));
RemoveJsonPropertyIf(BLEdata, "servicedatauuid", BLEdata.containsKey("servicedatauuid"));
# endif
# if !pubKnownBLEServiceData
RemoveJsonPropertyIf(BLEdataOut, "servicedata", BLEdataOut.containsKey("model") && BLEdataOut.containsKey("servicedata"));
RemoveJsonPropertyIf(BLEdata, "servicedata", BLEdata.containsKey("model") && BLEdata.containsKey("servicedata"));
# endif
String mactopic = BLEdataOut["id"].as<const char*>();
mactopic.replace(":", "");
mactopic = subjectBTtoMQTT + String("/") + mactopic;
pub((char*)mactopic.c_str(), BLEdataOut);
pubBT(BLEdata);
}
} else if (BLEdata.containsKey("distance")) {
pubBT(BLEdata);
} else {
Log.trace(F("Low rssi, device filtered" CR));
}
Expand Down Expand Up @@ -1254,11 +1380,21 @@ void haRoomPresence(JsonObject& HomePresence) {
}
HomePresence["distance"] = distance;
Log.trace(F("Ble distance %D" CR), distance);
String topic = String(Base_Topic) + "home_presence/" + String(gateway_name);
pub_custom_topic((char*)topic.c_str(), HomePresence, false);
}
# endif

void BTforceScan() {
if (!ProcessLock) {
BTtoMQTT();
Log.trace(F("Scan done" CR));
# ifdef ESP32
BLEconnect();
# endif
} else {
Log.trace(F("Cannot launch scan due to other process running" CR));
}
}

void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
if (cmpToMainTopic(topicOri, subjectMQTTtoBTset)) {
Log.trace(F("MQTTtoBT json set" CR));
Expand All @@ -1274,22 +1410,17 @@ void MQTTtoBT(char* topicOri, JsonObject& BTdata) { // json object decoding
// Scan interval set
if (BTdata.containsKey("interval")) {
Log.trace(F("BLE interval setup" CR));
// storing BLE interval for further use if needed
unsigned int prevBLEinterval = BLEinterval;
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
BLEinterval = (unsigned int)BTdata["interval"];
Log.notice(F("New interval: %d ms" CR), BLEinterval);
if (BLEinterval == 0) {
if (!ProcessLock) {
BTtoMQTT();
Log.trace(F("Scan done" CR));
unsigned int interval = BTdata["interval"];
if (interval == 0) {
# ifdef ESP32
BLEconnect();
atomic_store_explicit(&forceBTScan, 1, ::memory_order_seq_cst); // ask the other core to do the scan for us
# else
BTforceScan();
# endif
BLEinterval = prevBLEinterval; // as 0 was just used as a command we recover previous scan duration
} else {
Log.trace(F("Cannot launch scan due to other process running" CR));
}
} else {
Log.trace(F("Previous interval: %d ms" CR), BLEinterval);
BLEinterval = interval;
Log.notice(F("New interval: %d ms" CR), BLEinterval);
}
}
// Number of scan before a connect set
Expand Down
10 changes: 10 additions & 0 deletions main/config_BT.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ extern void setupBT();
extern bool BTtoMQTT();
extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);

#ifdef ESP32
extern int btQueueBlocked;
extern int btQueueLengthSum;
extern int btQueueLengthCount;
#endif

/*----------------------BT topics & parameters-------------------------*/
#define subjectBTtoMQTT "/BTtoMQTT"
#define subjectMQTTtoBTset "/commands/MQTTtoBT/config"
Expand All @@ -48,6 +54,10 @@ extern void MQTTtoBT(char* topicOri, JsonObject& RFdata);
# define PublishOnlySensors false //false if we publish all BLE devices discovered or true only the identified sensors (like temperature sensors)
#endif

#ifndef BTQueueSize
# define BTQueueSize 4 // lockless queue size for multi core cases (ESP32 currently)
#endif

#define HMSerialSpeed 9600 // Communication speed with the HM module, softwareserial doesn't support 115200
//#define HM_BLUE_LED_STOP true //uncomment to stop the blue led light of HM1X

Expand Down
Loading