diff --git a/lib/remote/CMakeLists.txt b/lib/remote/CMakeLists.txt index 2271abff6c..2da4ae7679 100644 --- a/lib/remote/CMakeLists.txt +++ b/lib/remote/CMakeLists.txt @@ -37,6 +37,7 @@ set(remote_SOURCES modifyobjecthandler.cpp modifyobjecthandler.hpp objectqueryhandler.cpp objectqueryhandler.hpp pkiutility.cpp pkiutility.hpp + statsreporter.cpp statsreporter.hpp statushandler.cpp statushandler.hpp templatequeryhandler.cpp templatequeryhandler.hpp typequeryhandler.cpp typequeryhandler.hpp diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp index e534fc1784..7b6135d97e 100644 --- a/lib/remote/endpoint.cpp +++ b/lib/remote/endpoint.cpp @@ -5,10 +5,12 @@ #include "remote/apilistener.hpp" #include "remote/jsonrpcconnection.hpp" #include "remote/zone.hpp" +#include "base/perfdatavalue.hpp" #include "base/configtype.hpp" #include "base/utility.hpp" #include "base/exception.hpp" #include "base/convert.hpp" +#include "base/statsfunction.hpp" using namespace icinga; @@ -17,6 +19,8 @@ REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; boost::signals2::signal Endpoint::OnDisconnected; +REGISTER_STATSFUNCTION(Endpoint, &Endpoint::StatsFunc); + void Endpoint::OnAllConfigLoaded() { ObjectImpl::OnAllConfigLoaded(); @@ -101,6 +105,70 @@ Endpoint::Ptr Endpoint::GetLocalEndpoint() return listener->GetLocalEndpoint(); } +void Endpoint::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + auto localZone (Zone::GetLocalZone()); + auto parentZone (localZone->GetParent()); + auto unorderedZones (ConfigType::GetObjectsByType()); + std::set zones (unorderedZones.begin(), unorderedZones.end()); + std::set endpoints; + Dictionary::Ptr ourStatus = new Dictionary; + + unorderedZones.clear(); + + for (auto zone (zones.begin()); zone != zones.end();) { + if ((*zone)->GetParent() == localZone) { + ++zone; + } else { + zones.erase(zone++); + } + } + + zones.emplace(localZone); + + if (parentZone) + zones.emplace(parentZone); + + for (auto& zone : zones) { + auto zoneEndpoints (zone->GetEndpoints()); + endpoints.insert(zoneEndpoints.begin(), zoneEndpoints.end()); + } + + endpoints.erase(GetLocalEndpoint()); + + for (auto& endpoint : endpoints) { + ourStatus->Set(endpoint->GetName(), new Dictionary({ + {"local_log_position", endpoint->GetLocalLogPosition()}, + {"remote_log_position", endpoint->GetRemoteLogPosition()}, + {"connecting", endpoint->GetConnecting()}, + {"syncing", endpoint->GetSyncing()}, + {"connected", endpoint->GetConnected()}, + {"last_message_sent", endpoint->GetLastMessageSent()}, + {"last_message_received", endpoint->GetLastMessageReceived()}, + {"messages_sent_per_second", endpoint->GetMessagesSentPerSecond()}, + {"messages_received_per_second", endpoint->GetMessagesReceivedPerSecond()}, + {"bytes_sent_per_second", endpoint->GetBytesSentPerSecond()}, + {"bytes_received_per_second", endpoint->GetBytesReceivedPerSecond()} + })); + } + + { + ObjectLock ourStatusLock (ourStatus); + + for (auto& nameEndpointStatus : ourStatus) { + Dictionary::Ptr endpointStatus = nameEndpointStatus.second; + ObjectLock endpointStatusLock (endpointStatus); + auto labelPrefix ("endpoint_" + nameEndpointStatus.first + "_"); + + for (auto& labelValue : endpointStatus) { + perfdata->Add(new PerfdataValue(labelPrefix + labelValue.first, labelValue.second)); + } + } + } + + status->Set("endpoint", ourStatus); +} + void Endpoint::AddMessageSent(int bytes) { double time = Utility::GetTime(); diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp index d641c2c6b8..fe7f3bf63a 100644 --- a/lib/remote/endpoint.hpp +++ b/lib/remote/endpoint.hpp @@ -5,6 +5,8 @@ #include "remote/i2-remote.hpp" #include "remote/endpoint-ti.hpp" +#include "base/array.hpp" +#include "base/dictionary.hpp" #include "base/ringbuffer.hpp" #include @@ -37,6 +39,7 @@ class Endpoint final : public ObjectImpl bool GetConnected() const override; static Endpoint::Ptr GetLocalEndpoint(); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); void SetCachedZone(const intrusive_ptr& zone); diff --git a/lib/remote/statsreporter.cpp b/lib/remote/statsreporter.cpp new file mode 100644 index 0000000000..77225bd407 --- /dev/null +++ b/lib/remote/statsreporter.cpp @@ -0,0 +1,195 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/application.hpp" +#include "base/array.hpp" +#include "base/configtype.hpp" +#include "base/dictionary.hpp" +#include "base/function.hpp" +#include "base/objectlock.hpp" +#include "base/scriptglobal.hpp" +#include "base/statsfunction.hpp" +#include "base/utility.hpp" +#include "base/value.hpp" +#include "remote/apifunction.hpp" +#include "remote/endpoint.hpp" +#include "remote/messageorigin.hpp" +#include "remote/statsreporter.hpp" +#include "remote/zone.hpp" +#include +#include + +using namespace icinga; + +REGISTER_APIFUNCTION(ClusterStats, event, &StatsReporter::ClusterStatsAPIHandler); + +REGISTER_STATSFUNCTION(ClusterStats, &StatsReporter::StatsFunc); + +StatsReporter StatsReporter::m_Instance; + +StatsReporter::StatsReporter() +{ + Endpoint::OnConnected.connect([this](const Endpoint::Ptr& endpoint, const intrusive_ptr&) { + OnConnected(endpoint); + }); +} + +void StatsReporter::OnConnected(const Endpoint::Ptr& endpoint) +{ + if (!m_HasBeenInitialized.test_and_set()) { + timer = Timer::Create(); + timer->OnTimerExpired.connect([this](const Timer * const&) { ReportStats(); }); + timer->SetInterval(10); + timer->Start(); + timer->Reschedule(1); + } +} + +void StatsReporter::ReportStats() +{ + auto parent (Zone::GetLocalZone()->GetParent()); + + if (parent) { + Dictionary::Ptr message; + + for (auto& endpoint : parent->GetEndpoints()) { + for (auto& client : endpoint->GetClients()) { + if (!message) { + message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "event::ClusterStats" }, + { "params", new Dictionary({ + { "stats", GenerateStats() } + }) } + }); + } + + client->SendMessage(message); + + break; + } + } + } +} + +Dictionary::Ptr StatsReporter::GenerateStats() +{ + auto allStats (new Dictionary); + + { + boost::mutex::scoped_lock lock (m_Mutex); + + for (auto& endpointStats : m_SecondaryStats) { + allStats->Set(endpointStats.first, endpointStats.second); + } + } + + auto localZone (Zone::GetLocalZone()); + auto parentZone (localZone->GetParent()); + auto unorderedZones (ConfigType::GetObjectsByType()); + std::set zones (unorderedZones.begin(), unorderedZones.end()); + std::set endpoints; + auto ourStatus (new Dictionary); + auto now (Utility::GetTime()); + + unorderedZones.clear(); + + for (auto zone (zones.begin()); zone != zones.end();) { + if ((*zone)->GetParent() == localZone) { + ++zone; + } else { + zones.erase(zone++); + } + } + + zones.emplace(localZone); + + if (parentZone) + zones.emplace(parentZone); + + for (auto& zone : zones) { + auto zoneEndpoints (zone->GetEndpoints()); + endpoints.insert(zoneEndpoints.begin(), zoneEndpoints.end()); + } + + endpoints.erase(Endpoint::GetLocalEndpoint()); + + for (auto& endpoint : endpoints) { + ourStatus->Set(endpoint->GetName(), new Dictionary({ + {"connected", endpoint->GetConnected()}, + {"last_message_received", endpoint->GetLastMessageReceived()} + })); + } + + allStats->Set(Endpoint::GetLocalEndpoint()->GetName(), new Dictionary({ + {"mtime", now}, + {"version", Application::GetAppVersion()}, + {"uptime", now - Application::GetStartTime()}, + {"endpoints", ourStatus} + })); + + return allStats; +} + +Value StatsReporter::ClusterStatsAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + auto endpoint (origin->FromClient->GetEndpoint()); + + if (endpoint && endpoint->GetZone()->IsChildOf(Zone::GetLocalZone())) { + auto rawStats (params->Get("stats")); + + if (rawStats.IsObject()) { + Dictionary::Ptr allStats (rawStats); + + if (allStats) { + // Don't permit any child to speak in our zone's name + std::set neighborhood; + + for (auto& endpoint : Zone::GetLocalZone()->GetEndpoints()) { + neighborhood.emplace(endpoint->GetName()); + } + + ObjectLock lock (allStats); + + for (auto& endpointStats : allStats) { + if (endpointStats.second.IsObject()) { + Dictionary::Ptr stats (endpointStats.second); + + if (stats && neighborhood.find(endpointStats.first) == neighborhood.end()) { + m_Instance.ClusterStatsHandler(endpointStats.first, endpointStats.second); + } + } + } + } + } + } + + return Empty; +} + +void StatsReporter::ClusterStatsHandler(const String& endpoint, const Dictionary::Ptr& stats) +{ + boost::mutex::scoped_lock lock (m_Mutex); + m_SecondaryStats[endpoint] = stats; +} + +void StatsReporter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + status->Set("cluster", m_Instance.GenerateStats()); +} diff --git a/lib/remote/statsreporter.hpp b/lib/remote/statsreporter.hpp new file mode 100644 index 0000000000..42d77370ab --- /dev/null +++ b/lib/remote/statsreporter.hpp @@ -0,0 +1,65 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef STATSREPORTER_H +#define STATSREPORTER_H + +#include "base/array.hpp" +#include "base/dictionary.hpp" +#include "base/string.hpp" +#include "base/timer.hpp" +#include "base/value.hpp" +#include "remote/endpoint.hpp" +#include "remote/messageorigin.hpp" +#include +#include +#include + +namespace icinga +{ + +/** +* @ingroup remote +*/ +class StatsReporter +{ +public: + static Value ClusterStatsAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + +private: + StatsReporter(); + + void OnConnected(const Endpoint::Ptr& endpoint); + void ReportStats(); + Dictionary::Ptr GenerateStats(); + void ClusterStatsHandler(const String& endpoint, const Dictionary::Ptr& stats); + + static StatsReporter m_Instance; + + std::atomic_flag m_HasBeenInitialized = ATOMIC_FLAG_INIT; + Timer::Ptr timer; + + boost::mutex m_Mutex; + std::map m_SecondaryStats; +}; + +} + +#endif /* STATSREPORTER_H */ diff --git a/lib/remote/zone.cpp b/lib/remote/zone.cpp index 5ae1468c16..3c974054ee 100644 --- a/lib/remote/zone.cpp +++ b/lib/remote/zone.cpp @@ -4,13 +4,18 @@ #include "remote/zone-ti.cpp" #include "remote/jsonrpcconnection.hpp" #include "base/array.hpp" +#include "base/perfdatavalue.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" +#include "base/statsfunction.hpp" +#include using namespace icinga; REGISTER_TYPE(Zone); +REGISTER_STATSFUNCTION(Zone, &Zone::StatsFunc); + void Zone::OnAllConfigLoaded() { ObjectImpl::OnAllConfigLoaded(); @@ -141,6 +146,163 @@ Zone::Ptr Zone::GetLocalZone() return local->GetZone(); } +static std::set l_StatsFuncAggregateSum ({ + "messages_sent_per_second", "messages_received_per_second", "bytes_sent_per_second", "bytes_received_per_second" +}); + +static std::set l_StatsFuncAggregateCount ({ + "connecting", "syncing", "connected" +}); + +static std::set l_StatsFuncAggregateMin ({ + "last_message_sent", "last_message_received" +}); + +void Zone::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + auto localZone (Zone::GetLocalZone()); + auto parentZone (localZone->GetParent()); + auto unorderedZones (ConfigType::GetObjectsByType()); + std::set zones (unorderedZones.begin(), unorderedZones.end()); + Dictionary::Ptr ourStatus = new Dictionary; + auto localEndpoint (Endpoint::GetLocalEndpoint()); + + unorderedZones.clear(); + + for (auto zone (zones.begin()); zone != zones.end();) { + if ((*zone)->GetParent() == localZone) { + ++zone; + } else { + zones.erase(zone++); + } + } + + zones.emplace(localZone); + + if (parentZone) + zones.emplace(parentZone); + + for (auto& zone : zones) { + Dictionary::Ptr endpointStats = new Dictionary({ + {"local_log_position", new Array}, + {"remote_log_position", new Array}, + {"connecting", new Array}, + {"syncing", new Array}, + {"connected", new Array}, + {"last_message_sent", new Array}, + {"last_message_received", new Array}, + {"messages_sent_per_second", new Array}, + {"messages_received_per_second", new Array}, + {"bytes_sent_per_second", new Array}, + {"bytes_received_per_second", new Array} + }); + + auto endpoints (zone->GetEndpoints()); + + endpoints.erase(localEndpoint); + + if (endpoints.empty()) + continue; + + for (auto& endpoint : endpoints) { + ((Array::Ptr)endpointStats->Get("local_log_position"))->Add(endpoint->GetLocalLogPosition()); + ((Array::Ptr)endpointStats->Get("remote_log_position"))->Add(endpoint->GetRemoteLogPosition()); + ((Array::Ptr)endpointStats->Get("connecting"))->Add(endpoint->GetConnecting()); + ((Array::Ptr)endpointStats->Get("syncing"))->Add(endpoint->GetSyncing()); + ((Array::Ptr)endpointStats->Get("connected"))->Add(endpoint->GetConnected()); + ((Array::Ptr)endpointStats->Get("last_message_sent"))->Add(endpoint->GetLastMessageSent()); + ((Array::Ptr)endpointStats->Get("last_message_received"))->Add(endpoint->GetLastMessageReceived()); + ((Array::Ptr)endpointStats->Get("messages_sent_per_second"))->Add(endpoint->GetMessagesSentPerSecond()); + ((Array::Ptr)endpointStats->Get("messages_received_per_second"))->Add(endpoint->GetMessagesReceivedPerSecond()); + ((Array::Ptr)endpointStats->Get("bytes_sent_per_second"))->Add(endpoint->GetBytesSentPerSecond()); + ((Array::Ptr)endpointStats->Get("bytes_received_per_second"))->Add(endpoint->GetBytesReceivedPerSecond()); + } + + for (auto& label : l_StatsFuncAggregateSum) { + auto sum (0.0); + Array::Ptr values = endpointStats->Get(label); + ObjectLock valuesLock (values); + + for (auto& value : values) { + sum += value.Get(); + } + + endpointStats->Set(label, sum); + } + + for (auto& label : l_StatsFuncAggregateCount) { + uintmax_t count = 0; + Array::Ptr values = endpointStats->Get(label); + ObjectLock valuesLock (values); + + for (auto& value : values) { + if (value.Get()) { + ++count; + } + } + + endpointStats->Set(label, count); + } + + for (auto& label : l_StatsFuncAggregateMin) { + auto min (std::numeric_limits::infinity()); + Array::Ptr values = endpointStats->Get(label); + ObjectLock valuesLock (values); + + for (auto& value : values) { + auto number (value.Get()); + + if (number < min) { + min = number; + } + } + + endpointStats->Set(label, min); + } + + { + auto maxDiff (-std::numeric_limits::infinity()); + Array::Ptr remoteLogPositions = endpointStats->Get("remote_log_position"); + ObjectLock remoteLogPositionLock (remoteLogPositions); + auto remoteLogPosition (begin(remoteLogPositions)); + Array::Ptr localLogPositions = endpointStats->Get("local_log_position"); + ObjectLock localLogPositionLock (localLogPositions); + + for (auto& localLogPosition : localLogPositions) { + auto diff (localLogPosition - *remoteLogPosition); + + if (diff > maxDiff) { + maxDiff = diff; + } + + ++remoteLogPosition; + } + + endpointStats->Set("client_log_lag", maxDiff); + endpointStats->Remove("local_log_position"); + endpointStats->Remove("remote_log_position"); + } + + ourStatus->Set(zone->GetName(), endpointStats); + } + + { + ObjectLock ourStatusLock (ourStatus); + + for (auto& nameZoneStatus : ourStatus) { + Dictionary::Ptr zoneStatus = nameZoneStatus.second; + ObjectLock zoneStatusLock (zoneStatus); + auto labelPrefix ("zone_" + nameZoneStatus.first + "_"); + + for (auto& labelValue : zoneStatus) { + perfdata->Add(new PerfdataValue(labelPrefix + labelValue.first, labelValue.second)); + } + } + } + + status->Set("zone", ourStatus); +} + void Zone::ValidateEndpointsRaw(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateEndpointsRaw(lvalue, utils); diff --git a/lib/remote/zone.hpp b/lib/remote/zone.hpp index 897b18e96c..1f14d1773b 100644 --- a/lib/remote/zone.hpp +++ b/lib/remote/zone.hpp @@ -3,6 +3,8 @@ #ifndef ZONE_H #define ZONE_H +#include "base/array.hpp" +#include "base/dictionary.hpp" #include "remote/i2-remote.hpp" #include "remote/zone-ti.hpp" #include "remote/endpoint.hpp" @@ -32,6 +34,7 @@ class Zone final : public ObjectImpl bool IsSingleInstance() const; static Zone::Ptr GetLocalZone(); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); protected: void ValidateEndpointsRaw(const Lazy& lvalue, const ValidationUtils& utils) override;