Skip to content

Commit 22e150a

Browse files
Add PerfdataWriterConnection class
1 parent dc08aee commit 22e150a

File tree

3 files changed

+361
-0
lines changed

3 files changed

+361
-0
lines changed

lib/perfdata/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ set(perfdata_SOURCES
1919
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
2020
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
2121
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
22+
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
2223
)
2324

2425
if(ICINGA2_UNITY_BUILD)
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
2+
// SPDX-License-Identifier: GPL-3.0-or-later
3+
4+
#include "perfdata/perfdatawriterconnection.hpp"
5+
#include "base/io-engine.hpp"
6+
#include "base/tcpsocket.hpp"
7+
#include <boost/asio/read.hpp>
8+
#include <boost/asio/use_future.hpp>
9+
#include <boost/beast/http/read.hpp>
10+
#include <boost/beast/http/write.hpp>
11+
#include <utility>
12+
13+
using namespace icinga;
14+
using HttpResponse = PerfdataWriterConnection::HttpResponse;
15+
16+
PerfdataWriterConnection::PerfdataWriterConnection(
17+
String name,
18+
String host,
19+
String port,
20+
Shared<boost::asio::ssl::context>::Ptr sslContext,
21+
bool verifySecure
22+
)
23+
: m_VerifySecure(verifySecure),
24+
m_SslContext(std::move(sslContext)),
25+
m_Name(std::move(name)),
26+
m_Host(std::move(host)),
27+
m_Port(std::move(port)),
28+
m_DisconnectTimer(IoEngine::Get().GetIoContext()),
29+
m_ReconnectTimer(IoEngine::Get().GetIoContext()),
30+
m_Strand(IoEngine::Get().GetIoContext()),
31+
m_Stream(ResetStream())
32+
{
33+
}
34+
35+
void PerfdataWriterConnection::Send(boost::asio::const_buffer data)
36+
{
37+
if (m_Stopped) {
38+
BOOST_THROW_EXCEPTION(Stopped{});
39+
}
40+
41+
std::promise<void> promise;
42+
43+
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
44+
while (true) {
45+
try {
46+
EnsureConnected(yc);
47+
48+
std::visit(
49+
[&](auto& stream) {
50+
boost::asio::async_write(*stream, data, yc);
51+
stream->async_flush(yc);
52+
},
53+
m_Stream
54+
);
55+
56+
promise.set_value();
57+
return;
58+
} catch (const std::exception& ex) {
59+
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
60+
se && se->code() == boost::asio::error::operation_aborted) {
61+
promise.set_exception(std::make_exception_ptr(Stopped{}));
62+
return;
63+
}
64+
65+
Log(LogCritical, "PerfdataWriterConnection")
66+
<< "Error while sending to '" << m_Host << ":" << m_Port << "' for '" << m_Name
67+
<< "': " << ex.what();
68+
69+
m_Stream = ResetStream();
70+
m_Connected = false;
71+
}
72+
}
73+
});
74+
75+
promise.get_future().get();
76+
}
77+
78+
HttpResponse PerfdataWriterConnection::Send(HttpRequest& request)
79+
{
80+
if (m_Stopped) {
81+
BOOST_THROW_EXCEPTION(Stopped{});
82+
}
83+
84+
boost::beast::http::response<boost::beast::http::string_body> response;
85+
std::promise<void> promise;
86+
87+
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
88+
while (true) {
89+
try {
90+
EnsureConnected(yc);
91+
92+
std::visit(
93+
[&](auto& stream) {
94+
boost::beast::http::async_write(*stream, request, yc);
95+
stream->async_flush(yc);
96+
97+
boost::beast::flat_buffer buf;
98+
boost::beast::http::async_read(*stream, buf, response, yc);
99+
},
100+
m_Stream
101+
);
102+
103+
if (!response.keep_alive()) {
104+
Disconnect(yc);
105+
}
106+
107+
promise.set_value();
108+
return;
109+
} catch (const Stopped&) {
110+
promise.set_exception(std::current_exception());
111+
return;
112+
} catch (const std::exception& ex) {
113+
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
114+
se && se->code() == boost::asio::error::operation_aborted) {
115+
promise.set_exception(std::make_exception_ptr(Stopped{}));
116+
return;
117+
}
118+
119+
Log(LogCritical, "PerfdataWriterConnection")
120+
<< "Error while sending to '" << m_Host << ":" << m_Port << "' for '" << m_Name
121+
<< "': " << ex.what();
122+
123+
m_Stream = ResetStream();
124+
m_Connected = false;
125+
}
126+
}
127+
});
128+
129+
promise.get_future().get();
130+
return response;
131+
}
132+
133+
/**
134+
* Get the current state of the connection.
135+
*
136+
* This wraps retrieving the state in boost::asio::post() on the strand instead of making it
137+
* atomic, because the only defined states are the suspension points where the coroutine yields.
138+
*/
139+
bool PerfdataWriterConnection::IsConnected()
140+
{
141+
return boost::asio::post(m_Strand, boost::asio::use_future([&]() { return m_Connected; })).get();
142+
}
143+
144+
void PerfdataWriterConnection::Disconnect()
145+
{
146+
std::promise<void> promise;
147+
148+
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
149+
try {
150+
Disconnect(std::move(yc));
151+
promise.set_value();
152+
} catch (const std::exception& ex) {
153+
promise.set_exception(std::current_exception());
154+
}
155+
});
156+
157+
promise.get_future().get();
158+
}
159+
160+
/**
161+
* Cancel all outstanding operations.
162+
*/
163+
void PerfdataWriterConnection::Cancel()
164+
{
165+
boost::asio::post(m_Strand, [&]() {
166+
try {
167+
m_Stopped = true;
168+
169+
/* Cancel any outstanding operations of the other coroutine.
170+
* Since we're on the same strand we're hopefully guaranteed that all cancellations
171+
* result in exceptions thrown by the yield_context, even if its already queued for
172+
* completion.
173+
*/
174+
std::visit(
175+
[](const auto& stream) {
176+
if (stream->lowest_layer().is_open()) {
177+
stream->lowest_layer().cancel();
178+
}
179+
},
180+
m_Stream
181+
);
182+
m_ReconnectTimer.cancel();
183+
} catch (const std::exception& ex) {
184+
Log(LogCritical, "PerfdataWriterConnection") << "Exception during Cancel: " << ex.what();
185+
}
186+
});
187+
}
188+
189+
AsioTlsOrTcpStream PerfdataWriterConnection::ResetStream()
190+
{
191+
AsioTlsOrTcpStream ret;
192+
if (m_SslContext) {
193+
ret = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *m_SslContext);
194+
} else {
195+
ret = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
196+
}
197+
198+
return ret;
199+
}
200+
201+
void PerfdataWriterConnection::EnsureConnected(boost::asio::yield_context yc)
202+
{
203+
while (!m_Connected) {
204+
if (m_Stopped) {
205+
BOOST_THROW_EXCEPTION(Stopped{});
206+
}
207+
208+
try {
209+
std::visit(
210+
[&](auto& stream) {
211+
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);
212+
213+
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
214+
using type = boost::asio::ssl::stream_base::handshake_type;
215+
216+
stream->next_layer().async_handshake(type::client, yc);
217+
218+
if (m_VerifySecure && !stream->next_layer().IsVerifyOK()) {
219+
BOOST_THROW_EXCEPTION(std::runtime_error{"TLS certificate validation failed"});
220+
}
221+
}
222+
},
223+
m_Stream
224+
);
225+
226+
m_Connected = true;
227+
m_RetryTimeout = 1s;
228+
} catch (const std::exception& ex) {
229+
m_Connected = false;
230+
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
231+
se->code() == boost::asio::error::operation_aborted) {
232+
throw;
233+
}
234+
235+
Log(LogWarning, "PerfdataWriterConnection")
236+
<< "Failed to connect to host '" << m_Host << "' port '" << m_Port << "' for '" << m_Name
237+
<< "': " << ex.what();
238+
239+
m_Stream = ResetStream();
240+
241+
/* Timeout before making another attempt at connecting.
242+
*/
243+
m_ReconnectTimer.expires_after(m_RetryTimeout);
244+
if (m_RetryTimeout < 30s) {
245+
m_RetryTimeout *= 2;
246+
}
247+
m_ReconnectTimer.async_wait(yc);
248+
}
249+
}
250+
}
251+
252+
void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
253+
{
254+
if (!m_Connected) {
255+
return;
256+
}
257+
m_Connected = false;
258+
259+
std::visit(
260+
[&](auto& stream) {
261+
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
262+
stream->GracefulDisconnect(m_Strand, yc);
263+
} else {
264+
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both);
265+
stream->lowest_layer().close();
266+
}
267+
},
268+
m_Stream
269+
);
270+
271+
m_Stream = ResetStream();
272+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
2+
// SPDX-License-Identifier: GPL-3.0-or-later
3+
4+
#pragma once
5+
6+
#include "base/tlsstream.hpp"
7+
#include <boost/asio/steady_timer.hpp>
8+
#include <boost/asio/streambuf.hpp>
9+
#include <boost/beast/http/message.hpp>
10+
#include <boost/beast/http/string_body.hpp>
11+
#include <future>
12+
13+
namespace icinga {
14+
15+
/**
16+
* Class handling the connection to the various Perfdata backends.
17+
*/
18+
class PerfdataWriterConnection final : public Object
19+
{
20+
public:
21+
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);
22+
23+
struct Stopped : std::exception
24+
{
25+
[[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; }
26+
};
27+
28+
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;
29+
using HttpResponse = boost::beast::http::response<boost::beast::http::string_body>;
30+
31+
explicit PerfdataWriterConnection(
32+
String name,
33+
String host,
34+
String port,
35+
Shared<boost::asio::ssl::context>::Ptr sslContext,
36+
bool verifySecure = true
37+
);
38+
39+
void Send(boost::asio::const_buffer data);
40+
HttpResponse Send(HttpRequest& request);
41+
42+
void Disconnect();
43+
44+
/**
45+
* Cancels ongoing operations if the future does not receive a value within the timeout.
46+
*
47+
* This will put the connection into a stopped state where no further Send() requests are
48+
* accepted.
49+
*
50+
* @param future The future to wait for
51+
* @param timeout The timeout after which ongoing operations are canceled
52+
*/
53+
template<class Rep, class Period>
54+
void CancelAfterTimeout(const std::future<void>& future, const std::chrono::duration<Rep, Period>& timeout)
55+
{
56+
auto status = future.wait_for(timeout);
57+
if (status != std::future_status::ready) {
58+
Cancel();
59+
}
60+
}
61+
62+
void Cancel();
63+
64+
bool IsConnected();
65+
66+
private:
67+
AsioTlsOrTcpStream ResetStream();
68+
void EnsureConnected(boost::asio::yield_context yc);
69+
void Disconnect(boost::asio::yield_context yc);
70+
71+
bool m_Stopped{};
72+
bool m_Connected{};
73+
74+
bool m_VerifySecure;
75+
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
76+
77+
String m_Name;
78+
String m_Host;
79+
String m_Port;
80+
81+
std::chrono::milliseconds m_RetryTimeout{1000ms};
82+
boost::asio::steady_timer m_DisconnectTimer;
83+
boost::asio::steady_timer m_ReconnectTimer;
84+
boost::asio::io_context::strand m_Strand;
85+
AsioTlsOrTcpStream m_Stream;
86+
};
87+
88+
} // namespace icinga

0 commit comments

Comments
 (0)