|
20 | 20 | #include "remote/zone.hpp"
|
21 | 21 | #include "remote/zone-ti.cpp"
|
22 | 22 | #include "remote/jsonrpcconnection.hpp"
|
| 23 | +#include "base/perfdatavalue.hpp" |
23 | 24 | #include "base/objectlock.hpp"
|
24 | 25 | #include "base/logger.hpp"
|
| 26 | +#include "base/statsfunction.hpp" |
| 27 | +#include <limits> |
25 | 28 |
|
26 | 29 | using namespace icinga;
|
27 | 30 |
|
28 | 31 | REGISTER_TYPE(Zone);
|
29 | 32 |
|
| 33 | +REGISTER_STATSFUNCTION(Zone, &Zone::StatsFunc); |
| 34 | + |
30 | 35 | void Zone::OnAllConfigLoaded()
|
31 | 36 | {
|
32 | 37 | ObjectImpl<Zone>::OnAllConfigLoaded();
|
@@ -147,6 +152,163 @@ Zone::Ptr Zone::GetLocalZone()
|
147 | 152 | return local->GetZone();
|
148 | 153 | }
|
149 | 154 |
|
| 155 | +static std::set<String> l_StatsFuncAggregateSum ({ |
| 156 | + "messages_sent_per_second", "messages_received_per_second", "bytes_sent_per_second", "bytes_received_per_second" |
| 157 | +}); |
| 158 | + |
| 159 | +static std::set<String> l_StatsFuncAggregateCount ({ |
| 160 | + "connecting", "syncing", "connected" |
| 161 | +}); |
| 162 | + |
| 163 | +static std::set<String> l_StatsFuncAggregateMin ({ |
| 164 | + "last_message_sent", "last_message_received" |
| 165 | +}); |
| 166 | + |
| 167 | +void Zone::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) |
| 168 | +{ |
| 169 | + auto localZone (Zone::GetLocalZone()); |
| 170 | + auto parentZone (localZone->GetParent()); |
| 171 | + auto unorderedZones (ConfigType::GetObjectsByType<Zone>()); |
| 172 | + std::set<Zone::Ptr> zones (unorderedZones.begin(), unorderedZones.end()); |
| 173 | + Dictionary::Ptr ourStatus = new Dictionary; |
| 174 | + auto localEndpoint (Endpoint::GetLocalEndpoint()); |
| 175 | + |
| 176 | + unorderedZones.clear(); |
| 177 | + |
| 178 | + for (auto zone (zones.begin()); zone != zones.end();) { |
| 179 | + if ((*zone)->GetParent() == localZone) { |
| 180 | + ++zone; |
| 181 | + } else { |
| 182 | + zones.erase(zone++); |
| 183 | + } |
| 184 | + } |
| 185 | + |
| 186 | + zones.emplace(localZone); |
| 187 | + |
| 188 | + if (parentZone) |
| 189 | + zones.emplace(parentZone); |
| 190 | + |
| 191 | + for (auto& zone : zones) { |
| 192 | + Dictionary::Ptr endpointStats = new Dictionary({ |
| 193 | + {"local_log_position", (Array::Ptr)new Array}, |
| 194 | + {"remote_log_position", (Array::Ptr)new Array}, |
| 195 | + {"connecting", (Array::Ptr)new Array}, |
| 196 | + {"syncing", (Array::Ptr)new Array}, |
| 197 | + {"connected", (Array::Ptr)new Array}, |
| 198 | + {"last_message_sent", (Array::Ptr)new Array}, |
| 199 | + {"last_message_received", (Array::Ptr)new Array}, |
| 200 | + {"messages_sent_per_second", (Array::Ptr)new Array}, |
| 201 | + {"messages_received_per_second", (Array::Ptr)new Array}, |
| 202 | + {"bytes_sent_per_second", (Array::Ptr)new Array}, |
| 203 | + {"bytes_received_per_second", (Array::Ptr)new Array} |
| 204 | + }); |
| 205 | + |
| 206 | + auto endpoints (zone->GetEndpoints()); |
| 207 | + |
| 208 | + endpoints.erase(localEndpoint); |
| 209 | + |
| 210 | + if (endpoints.empty()) |
| 211 | + continue; |
| 212 | + |
| 213 | + for (auto& endpoint : endpoints) { |
| 214 | + ((Array::Ptr)endpointStats->Get("local_log_position"))->Add(endpoint->GetLocalLogPosition()); |
| 215 | + ((Array::Ptr)endpointStats->Get("remote_log_position"))->Add(endpoint->GetRemoteLogPosition()); |
| 216 | + ((Array::Ptr)endpointStats->Get("connecting"))->Add(endpoint->GetConnecting()); |
| 217 | + ((Array::Ptr)endpointStats->Get("syncing"))->Add(endpoint->GetSyncing()); |
| 218 | + ((Array::Ptr)endpointStats->Get("connected"))->Add(endpoint->GetConnected()); |
| 219 | + ((Array::Ptr)endpointStats->Get("last_message_sent"))->Add(endpoint->GetLastMessageSent()); |
| 220 | + ((Array::Ptr)endpointStats->Get("last_message_received"))->Add(endpoint->GetLastMessageReceived()); |
| 221 | + ((Array::Ptr)endpointStats->Get("messages_sent_per_second"))->Add(endpoint->GetMessagesSentPerSecond()); |
| 222 | + ((Array::Ptr)endpointStats->Get("messages_received_per_second"))->Add(endpoint->GetMessagesReceivedPerSecond()); |
| 223 | + ((Array::Ptr)endpointStats->Get("bytes_sent_per_second"))->Add(endpoint->GetBytesSentPerSecond()); |
| 224 | + ((Array::Ptr)endpointStats->Get("bytes_received_per_second"))->Add(endpoint->GetBytesReceivedPerSecond()); |
| 225 | + } |
| 226 | + |
| 227 | + for (auto& label : l_StatsFuncAggregateSum) { |
| 228 | + auto sum (0.0); |
| 229 | + Array::Ptr values = endpointStats->Get(label); |
| 230 | + ObjectLock valuesLock (values); |
| 231 | + |
| 232 | + for (auto& value : values) { |
| 233 | + sum += value.Get<double>(); |
| 234 | + } |
| 235 | + |
| 236 | + endpointStats->Set(label, sum); |
| 237 | + } |
| 238 | + |
| 239 | + for (auto& label : l_StatsFuncAggregateCount) { |
| 240 | + uintmax_t count = 0; |
| 241 | + Array::Ptr values = endpointStats->Get(label); |
| 242 | + ObjectLock valuesLock (values); |
| 243 | + |
| 244 | + for (auto& value : values) { |
| 245 | + if (value.Get<bool>()) { |
| 246 | + ++count; |
| 247 | + } |
| 248 | + } |
| 249 | + |
| 250 | + endpointStats->Set(label, count); |
| 251 | + } |
| 252 | + |
| 253 | + for (auto& label : l_StatsFuncAggregateMin) { |
| 254 | + auto min (std::numeric_limits<double>::infinity()); |
| 255 | + Array::Ptr values = endpointStats->Get(label); |
| 256 | + ObjectLock valuesLock (values); |
| 257 | + |
| 258 | + for (auto& value : values) { |
| 259 | + auto number (value.Get<double>()); |
| 260 | + |
| 261 | + if (number < min) { |
| 262 | + min = number; |
| 263 | + } |
| 264 | + } |
| 265 | + |
| 266 | + endpointStats->Set(label, min); |
| 267 | + } |
| 268 | + |
| 269 | + { |
| 270 | + auto maxDiff (-std::numeric_limits<double>::infinity()); |
| 271 | + Array::Ptr remoteLogPositions = endpointStats->Get("remote_log_position"); |
| 272 | + ObjectLock remoteLogPositionLock (remoteLogPositions); |
| 273 | + auto remoteLogPosition (begin(remoteLogPositions)); |
| 274 | + Array::Ptr localLogPositions = endpointStats->Get("local_log_position"); |
| 275 | + ObjectLock localLogPositionLock (localLogPositions); |
| 276 | + |
| 277 | + for (auto& localLogPosition : localLogPositions) { |
| 278 | + auto diff (localLogPosition - *remoteLogPosition); |
| 279 | + |
| 280 | + if (diff > maxDiff) { |
| 281 | + maxDiff = diff; |
| 282 | + } |
| 283 | + |
| 284 | + ++remoteLogPosition; |
| 285 | + } |
| 286 | + |
| 287 | + endpointStats->Set("client_log_lag", maxDiff); |
| 288 | + endpointStats->Remove("local_log_position"); |
| 289 | + endpointStats->Remove("remote_log_position"); |
| 290 | + } |
| 291 | + |
| 292 | + ourStatus->Set(zone->GetName(), endpointStats); |
| 293 | + } |
| 294 | + |
| 295 | + { |
| 296 | + ObjectLock ourStatusLock (ourStatus); |
| 297 | + |
| 298 | + for (auto& nameZoneStatus : ourStatus) { |
| 299 | + Dictionary::Ptr zoneStatus = nameZoneStatus.second; |
| 300 | + ObjectLock zoneStatusLock (zoneStatus); |
| 301 | + auto labelPrefix ("zone_" + nameZoneStatus.first + "_"); |
| 302 | + |
| 303 | + for (auto& labelValue : zoneStatus) { |
| 304 | + perfdata->Add(new PerfdataValue(labelPrefix + labelValue.first, labelValue.second)); |
| 305 | + } |
| 306 | + } |
| 307 | + } |
| 308 | + |
| 309 | + status->Set("zone", ourStatus); |
| 310 | +} |
| 311 | + |
150 | 312 | void Zone::ValidateEndpointsRaw(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
|
151 | 313 | {
|
152 | 314 | ObjectImpl<Zone>::ValidateEndpointsRaw(lvalue, utils);
|
|
0 commit comments