99#include " icinga/macroprocessor.hpp"
1010#include " icinga/checkcommand.hpp"
1111#include " base/application.hpp"
12- #include " base/defer.hpp"
13- #include " base/io-engine.hpp"
14- #include " base/tcpsocket.hpp"
1512#include " base/stream.hpp"
1613#include " base/base64.hpp"
1714#include " base/json.hpp"
1815#include " base/utility.hpp"
19- #include " base/networkstream.hpp"
2016#include " base/perfdatavalue.hpp"
2117#include " base/exception.hpp"
2218#include " base/statsfunction.hpp"
2319#include < boost/algorithm/string.hpp>
24- #include < boost/asio/ssl/context.hpp>
25- #include < boost/beast/core/flat_buffer.hpp>
26- #include < boost/beast/http/field.hpp>
27- #include < boost/beast/http/message.hpp>
28- #include < boost/beast/http/parser.hpp>
29- #include < boost/beast/http/read.hpp>
30- #include < boost/beast/http/status.hpp>
31- #include < boost/beast/http/string_body.hpp>
32- #include < boost/beast/http/verb.hpp>
33- #include < boost/beast/http/write.hpp>
34- #include < boost/scoped_array.hpp>
35- #include < memory>
3620#include < string>
3721#include < utility>
3822
@@ -82,8 +66,6 @@ void ElasticsearchWriter::Resume()
8266{
8367 ObjectImpl<ElasticsearchWriter>::Resume ();
8468
85- m_EventPrefix = " icinga2.event." ;
86-
8769 Log (LogInformation, " ElasticsearchWriter" )
8870 << " '" << GetName () << " ' resumed." ;
8971
@@ -96,6 +78,19 @@ void ElasticsearchWriter::Resume()
9678 m_FlushTimer->Start ();
9779 m_FlushTimer->Reschedule (0 );
9880
81+ Shared<boost::asio::ssl::context>::Ptr sslContext;
82+ if (GetEnableTls ()) {
83+ try {
84+ sslContext = MakeAsioSslContext (GetCertPath (), GetKeyPath (), GetCaPath ());
85+ } catch (const std::exception& ex) {
86+ Log (LogWarning, " ElasticsearchWriter" )
87+ << " Unable to create SSL context." ;
88+ throw ;
89+ }
90+ }
91+
92+ m_Connection = new PerfdataWriterConnection{GetName (), GetHost (), GetPort (), sslContext};
93+
9994 /* Register for new metrics. */
10095 m_HandleCheckResults = Checkable::OnNewCheckResult.connect ([this ](const Checkable::Ptr& checkable,
10196 const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@@ -120,12 +115,19 @@ void ElasticsearchWriter::Pause()
120115 m_HandleNotifications.disconnect ();
121116
122117 m_FlushTimer->Stop (true );
123- m_WorkQueue.Join ();
124118
125- {
126- std::unique_lock<std::mutex> lock (m_DataBufferMutex);
119+ std::promise< void > queueDonePromise;
120+ m_WorkQueue. Enqueue ([&]() {
127121 Flush ();
128- }
122+ queueDonePromise.set_value ();
123+ }, PriorityLow);
124+
125+ auto timeout = std::chrono::duration<double >{GetDisconnectTimeout ()};
126+ m_Connection->CancelAfterTimeout (queueDonePromise.get_future (), timeout);
127+
128+ m_WorkQueue.Join ();
129+
130+ m_Connection->Disconnect ();
129131
130132 Log (LogInformation, " ElasticsearchWriter" )
131133 << " '" << GetName () << " ' paused." ;
@@ -379,15 +381,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
379381{
380382 AssertOnWorkQueue ();
381383
382- /* Atomically buffer the data point. */
383- std::unique_lock<std::mutex> lock (m_DataBufferMutex);
384-
385384 /* Format the timestamps to dynamically select the date datatype inside the index. */
386385 fields->Set (" @timestamp" , FormatTimestamp (ts));
387386 fields->Set (" timestamp" , FormatTimestamp (ts));
388-
389- String eventType = m_EventPrefix + type;
390- fields->Set (" type" , eventType);
387+ fields->Set (" type" , " icinga2.event." + type);
391388
392389 /* Every payload needs a line describing the index.
393390 * We do it this way to avoid problems with a near full queue.
@@ -408,19 +405,19 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
408405 }
409406}
410407
408+ /* *
409+ * Queues a Flush on the work-queue if there isn't one queued already.
410+ */
411411void ElasticsearchWriter::FlushTimeout ()
412412{
413- /* Prevent new data points from being added to the array, there is a
414- * race condition where they could disappear.
415- */
416- std::unique_lock<std::mutex> lock (m_DataBufferMutex);
413+ if (m_FlushTimerInQueue.load (std::memory_order_relaxed)) {
414+ return ;
415+ }
417416
418- /* Flush if there are any data available. */
419- if (m_DataBuffer.size () > 0 ) {
420- Log (LogDebug, " ElasticsearchWriter" )
421- << " Timer expired writing " << m_DataBuffer.size () << " data points" ;
417+ m_WorkQueue.Enqueue ([&]() {
422418 Flush ();
423- }
419+ m_FlushTimerInQueue.store (false , std::memory_order_relaxed);
420+ });
424421}
425422
426423void ElasticsearchWriter::Flush ()
@@ -466,22 +463,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
466463
467464 url->SetPath (path);
468465
469- OptionalTlsStream stream;
470-
471- try {
472- stream = Connect ();
473- } catch (const std::exception& ex) {
474- Log (LogWarning, " ElasticsearchWriter" )
475- << " Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation (ex, false );
476- return ;
477- }
478-
479- Defer s ([&stream]() {
480- if (stream.first ) {
481- stream.first ->next_layer ().shutdown ();
482- }
483- });
484-
485466 http::request<http::string_body> request (http::verb::post , std::string (url->Format (true )), 10 );
486467
487468 request.set (http::field::user_agent, " Icinga/" + Application::GetAppVersion ());
@@ -511,37 +492,14 @@ void ElasticsearchWriter::SendRequest(const String& body)
511492 << " Sending " << request.method_string () << " request" << ((!username.IsEmpty () && !password.IsEmpty ()) ? " with basic auth" : " " )
512493 << " to '" << url->Format () << " '." ;
513494
495+ decltype (m_Connection->Send (request)) response;
514496 try {
515- if (stream.first ) {
516- http::write (*stream.first , request);
517- stream.first ->flush ();
518- } else {
519- http::write (*stream.second , request);
520- stream.second ->flush ();
521- }
522- } catch (const std::exception&) {
523- Log (LogWarning, " ElasticsearchWriter" )
524- << " Cannot write to HTTP API on host '" << GetHost () << " ' port '" << GetPort () << " '." ;
525- throw ;
526- }
527-
528- http::parser<false , http::string_body> parser;
529- beast::flat_buffer buf;
530-
531- try {
532- if (stream.first ) {
533- http::read (*stream.first , buf, parser);
534- } else {
535- http::read (*stream.second , buf, parser);
536- }
537- } catch (const std::exception& ex) {
538- Log (LogWarning, " ElasticsearchWriter" )
539- << " Failed to parse HTTP response from host '" << GetHost () << " ' port '" << GetPort () << " ': " << DiagnosticInformation (ex, false );
540- throw ;
497+ response = m_Connection->Send (request);
498+ } catch (const PerfdataWriterConnection::Stopped& ex) {
499+ Log (LogDebug, " ElasticsearchWriter" ) << ex.what ();
500+ return ;
541501 }
542502
543- auto & response (parser.get ());
544-
545503 if (response.result_int () > 299 ) {
546504 if (response.result () == http::status::unauthorized) {
547505 /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
@@ -589,66 +547,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
589547 }
590548}
591549
592- OptionalTlsStream ElasticsearchWriter::Connect ()
593- {
594- Log (LogNotice, " ElasticsearchWriter" )
595- << " Connecting to Elasticsearch on host '" << GetHost () << " ' port '" << GetPort () << " '." ;
596-
597- OptionalTlsStream stream;
598- bool tls = GetEnableTls ();
599-
600- if (tls) {
601- Shared<boost::asio::ssl::context>::Ptr sslContext;
602-
603- try {
604- sslContext = MakeAsioSslContext (GetCertPath (), GetKeyPath (), GetCaPath ());
605- } catch (const std::exception&) {
606- Log (LogWarning, " ElasticsearchWriter" )
607- << " Unable to create SSL context." ;
608- throw ;
609- }
610-
611- stream.first = Shared<AsioTlsStream>::Make (IoEngine::Get ().GetIoContext (), *sslContext, GetHost ());
612-
613- } else {
614- stream.second = Shared<AsioTcpStream>::Make (IoEngine::Get ().GetIoContext ());
615- }
616-
617- try {
618- icinga::Connect (tls ? stream.first ->lowest_layer () : stream.second ->lowest_layer (), GetHost (), GetPort ());
619- } catch (const std::exception&) {
620- Log (LogWarning, " ElasticsearchWriter" )
621- << " Can't connect to Elasticsearch on host '" << GetHost () << " ' port '" << GetPort () << " '." ;
622- throw ;
623- }
624-
625- if (tls) {
626- auto & tlsStream (stream.first ->next_layer ());
627-
628- try {
629- tlsStream.handshake (tlsStream.client );
630- } catch (const std::exception&) {
631- Log (LogWarning, " ElasticsearchWriter" )
632- << " TLS handshake with host '" << GetHost () << " ' on port " << GetPort () << " failed." ;
633- throw ;
634- }
635-
636- if (!GetInsecureNoverify ()) {
637- if (!tlsStream.GetPeerCertificate ()) {
638- BOOST_THROW_EXCEPTION (std::runtime_error (" Elasticsearch didn't present any TLS certificate." ));
639- }
640-
641- if (!tlsStream.IsVerifyOK ()) {
642- BOOST_THROW_EXCEPTION (std::runtime_error (
643- " TLS certificate validation failed: " + std::string (tlsStream.GetVerifyError ())
644- ));
645- }
646- }
647- }
648-
649- return stream;
650- }
651-
652550void ElasticsearchWriter::AssertOnWorkQueue ()
653551{
654552 ASSERT (m_WorkQueue.IsWorkerThread ());
0 commit comments