Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions src/core/DescriptorEventReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

#include "log/Logger.h"

#include <exception>
#include <climits>

#endif /* DOXYGEN_SHOULD_SKIP_THIS */
Expand Down Expand Up @@ -160,19 +161,48 @@ namespace core {

void DescriptorEventReceiver::checkTimeout(const utils::Timeval& currentTime) {
if (maxInactivity > 0 && currentTime - lastTriggered >= maxInactivity) {
timeoutEvent();
try {
timeoutEvent();
} catch (const std::exception& ex) {
LOG(ERROR) << getName() << ": Unhandled exception in timeout handler: " << ex.what();
onEventError();
} catch (...) {
LOG(ERROR) << getName() << ": Unhandled unknown exception in timeout handler";
onEventError();
}
}
}

void DescriptorEventReceiver::onEvent(const utils::Timeval& currentTime) {
eventCounter++;
triggered(currentTime);

dispatchEvent();
try {
dispatchEvent();
} catch (const std::exception& ex) {
LOG(ERROR) << getName() << ": Unhandled exception in descriptor event handler: " << ex.what();
onEventError();
} catch (...) {
LOG(ERROR) << getName() << ": Unhandled unknown exception in descriptor event handler";
onEventError();
}
}

void DescriptorEventReceiver::onSignal(int signum) {
signalEvent(signum);
try {
signalEvent(signum);
} catch (const std::exception& ex) {
LOG(ERROR) << getName() << ": Unhandled exception in signal handler: " << ex.what();
onEventError();
} catch (...) {
LOG(ERROR) << getName() << ": Unhandled unknown exception in signal handler";
onEventError();
}
}

void DescriptorEventReceiver::onEventError() {
EventReceiver::onEventError();
disable();
}

void DescriptorEventReceiver::triggered(const utils::Timeval& currentTime) {
Expand Down
1 change: 1 addition & 0 deletions src/core/DescriptorEventReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ namespace core {
private:
void onEvent(const utils::Timeval& currentTime) final;
void onSignal(int signum);
void onEventError() override;

void triggered(const utils::Timeval& currentTime);
void setEnabled(const utils::Timeval& currentTime);
Expand Down
14 changes: 13 additions & 1 deletion src/core/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS

#include "log/Logger.h"

#include <exception>

#endif /* DOXYGEN_SHOULD_SKIP_THIS */

namespace core {
Expand Down Expand Up @@ -81,7 +85,15 @@ namespace core {

void Event::dispatch(const utils::Timeval& currentTime) {
published = false;
eventReceiver->onEvent(currentTime);
try {
eventReceiver->onEvent(currentTime);
} catch (const std::exception& ex) {
LOG(ERROR) << eventReceiver->getName() << ": Unhandled exception in event handler: " << ex.what();
eventReceiver->onEventError();
} catch (...) {
LOG(ERROR) << eventReceiver->getName() << ": Unhandled unknown exception in event handler";
eventReceiver->onEventError();
}
}

EventReceiver* Event::getEventReceiver() const {
Expand Down
13 changes: 12 additions & 1 deletion src/core/EventReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ namespace core {
delete this;
}

void onEventError() override {
delete this;
}

private:
std::function<void(void)> callBack;
};
Expand All @@ -77,13 +81,20 @@ namespace core {
}

void EventReceiver::span() {
event.span();
if (!failed) {
event.span();
}
}

void EventReceiver::relax() {
event.relax();
}

void EventReceiver::onEventError() {
failed = true;
relax();
}

const std::string& EventReceiver::getName() const {
return event.getName();
}
Expand Down
2 changes: 2 additions & 0 deletions src/core/EventReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ namespace core {
void relax();

virtual void onEvent(const utils::Timeval& currentTime) = 0;
virtual void onEventError();

const std::string& getName() const;

private:
bool failed = false;
Event event;
};

Expand Down
1 change: 1 addition & 0 deletions src/core/socket/stream/SocketConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ namespace core::socket::stream {

protected:
void doWriteShutdown(const std::function<void()>& onShutdown) override;
void onEventError() override;

void onWriteError(int errnum);
void onReadError(int errnum);
Expand Down
6 changes: 6 additions & 0 deletions src/core/socket/stream/SocketConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ namespace core::socket::stream {
onShutdown();
}

template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onEventError() {
LOG(ERROR) << connectionName << ": Disabling connection due to unhandled exception in event handler";
close();
}

template <typename PhysicalSocket, typename SocketReader, typename SocketWriter, typename Config>
void SocketConnectionT<PhysicalSocket, SocketReader, SocketWriter, Config>::onReceivedFromPeer(std::size_t available) {
std::size_t consumed = socketContext->readFromPeer();
Expand Down
11 changes: 9 additions & 2 deletions src/iot/mqtt/SubProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@

namespace iot::mqtt {

OnReceivedFromPeerEvent::OnReceivedFromPeerEvent(const std::function<void(const utils::Timeval&)>& onReceivedFromPeer)
OnReceivedFromPeerEvent::OnReceivedFromPeerEvent(const std::function<void(const utils::Timeval&)>& onReceivedFromPeer,
const std::function<void()>& onError)
: core::EventReceiver("WS-OnData")
, onReceivedFromPeer(onReceivedFromPeer) {
, onReceivedFromPeer(onReceivedFromPeer)
, onError(onError) {
}

void OnReceivedFromPeerEvent::onEvent(const utils::Timeval& currentTime) {
onReceivedFromPeer(currentTime);
}

void OnReceivedFromPeerEvent::onEventError() {
core::EventReceiver::onEventError();
onError();
}

} // namespace iot::mqtt
4 changes: 3 additions & 1 deletion src/iot/mqtt/SubProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ namespace iot::mqtt {

class OnReceivedFromPeerEvent : public core::EventReceiver {
public:
explicit OnReceivedFromPeerEvent(const std::function<void(const utils::Timeval&)>& onReceivedFromPeer);
OnReceivedFromPeerEvent(const std::function<void(const utils::Timeval&)>& onReceivedFromPeer, const std::function<void()>& onError);

private:
void onEvent(const utils::Timeval& currentTime) override;
void onEventError() override;

std::function<void(const utils::Timeval&)> onReceivedFromPeer;
std::function<void()> onError;
};

template <typename WSSubProtocolRoleT>
Expand Down
5 changes: 5 additions & 0 deletions src/iot/mqtt/SubProtocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ namespace iot::mqtt {
buffer.clear();
cursor = 0;
}
},
[this]() {
LOG(ERROR) << getSocketConnection()->getConnectionName()
<< " WsMqtt: closing socket due to exception in MQTT receive processing";
getSocketConnection()->close();
}) {
}

Expand Down