Skip to content

Commit 6fe83a0

Browse files
committed
Fix #364 Introduce for SSE cleanupClients(N) function
This function is similar to `cleanupClients(N)` for WebSocket: - It allows to cleanup an oldest SSE client if the connected client count is more than N - it resize the client list to remove empty entries following client disconnect This "book-keeping" function has to be called in the loop(). This is a non-elegant attempt to fix issue #364
1 parent 4c86b44 commit 6fe83a0

2 files changed

Lines changed: 57 additions & 17 deletions

File tree

src/AsyncEventSource.cpp

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
360360
#ifdef ESP32
361361
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
362362
#endif
363-
_clients.emplace_back(client);
363+
364+
// find first unique_ptr with nullptr and reuse it
365+
bool reused = false;
366+
for (auto &c : _clients) {
367+
if (c.get() == nullptr) {
368+
c.reset(client);
369+
reused = true;
370+
break;
371+
}
372+
}
373+
374+
if (!reused) {
375+
_clients.emplace_back(client);
376+
}
377+
364378
if (_connectcb) {
365379
_connectcb(client);
366380
}
@@ -377,7 +391,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
377391
#endif
378392
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
379393
if (i->get() == client) {
380-
_clients.erase(i);
394+
i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
381395
break;
382396
}
383397
}
@@ -392,7 +406,7 @@ void AsyncEventSource::close() {
392406
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
393407
#endif
394408
for (const auto &c : _clients) {
395-
if (c->connected()) {
409+
if (c.get() != nullptr && c->connected()) {
396410
/**
397411
* @brief: Fix self-deadlock by using recursive_mutex instead.
398412
* Due to c->close() shall call the callback function _onDisconnect()
@@ -403,24 +417,38 @@ void AsyncEventSource::close() {
403417
}
404418
}
405419

420+
void AsyncEventSource::cleanupClients(uint32_t maxClients) {
421+
#ifdef ESP32
422+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
423+
#endif
424+
425+
// first resize the list to remove unique_ptr with nullptr
426+
_clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
427+
return c.get() == nullptr;
428+
});
429+
430+
// then close one old client if we have more than maxClients
431+
// no need to close more: cleanupClients() is expected to be called periodically
432+
// and next invocation will clean up the empty unique_ptr once teh associated client disconnect callback is invoked
433+
if (count() > maxClients) {
434+
_clients.front()->close();
435+
}
436+
}
437+
406438
// pmb fix
407439
size_t AsyncEventSource::avgPacketsWaiting() const {
408440
size_t aql = 0;
409441
uint32_t nConnectedClients = 0;
410442
#ifdef ESP32
411443
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
412444
#endif
413-
if (!_clients.size()) {
414-
return 0;
415-
}
416-
417445
for (const auto &c : _clients) {
418-
if (c->connected()) {
446+
if (c.get() != nullptr && c->connected()) {
419447
aql += c->packetsWaiting();
420448
++nConnectedClients;
421449
}
422450
}
423-
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
451+
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
424452
}
425453

426454
AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
@@ -431,10 +459,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
431459
size_t hits = 0;
432460
size_t miss = 0;
433461
for (const auto &c : _clients) {
434-
if (c->write(shared_msg)) {
435-
++hits;
436-
} else {
437-
++miss;
462+
if (c.get() != nullptr && c->connected()) {
463+
if (c->write(shared_msg)) {
464+
++hits;
465+
} else {
466+
++miss;
467+
}
438468
}
439469
}
440470
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
@@ -446,7 +476,7 @@ size_t AsyncEventSource::count() const {
446476
#endif
447477
size_t n_clients{0};
448478
for (const auto &i : _clients) {
449-
if (i->connected()) {
479+
if (i.get() != nullptr && i->connected()) {
450480
++n_clients;
451481
}
452482
}
@@ -462,11 +492,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
462492
request->send(new AsyncEventSourceResponse(this));
463493
}
464494

495+
// list iteration protected by caller's lock
465496
void AsyncEventSource::_adjust_inflight_window() {
466-
if (_clients.size()) {
467-
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
497+
const size_t clientCount = count();
498+
if (clientCount) {
499+
size_t inflight = SSE_MAX_INFLIGH / clientCount;
468500
for (const auto &c : _clients) {
469-
c->set_max_inflight_bytes(inflight);
501+
if (c.get() != nullptr && c->connected()) {
502+
c->set_max_inflight_bytes(inflight);
503+
}
470504
}
471505
// Serial.printf("adjusted inflight to: %u\n", inflight);
472506
}

src/AsyncEventSource.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
#include <memory>
4848
#include <utility>
4949

50+
#ifndef DEFAULT_MAX_SSE_CLIENTS
51+
#define DEFAULT_MAX_SSE_CLIENTS UINT32_MAX
52+
#endif
53+
5054
class AsyncEventSource;
5155
class AsyncEventSourceResponse;
5256
class AsyncEventSourceClient;
@@ -272,6 +276,8 @@ class AsyncEventSource : public AsyncWebHandler {
272276
// close all connected clients
273277
void close();
274278

279+
void cleanupClients(uint32_t maxClients = DEFAULT_MAX_SSE_CLIENTS);
280+
275281
/**
276282
* @brief set on-connect callback for the client
277283
* used to deliver messages to client on first connect

0 commit comments

Comments
 (0)