@@ -74,6 +74,8 @@ namespace {
7474const double k_RECONNECT_INTERVAL_MS = 500 ;
7575const int k_RECONNECT_COUNT = bsl::numeric_limits<int >::max();
7676const bsls::Types::Int64 k_CHANNEL_LOW_WATERMARK = 512 * 1024 ;
// Initial value for the maximum number of missed heartbeats; used by
// `Application::createMonitor` before the channel property is loaded.
77+ const int k_DEFAULT_MAX_MISSED_HEARTBEATS = 10 ;
// Initial value, in milliseconds, for the heartbeat check period; used by
// `Application::startHeartbeat` before the channel property is loaded.
78+ const int k_DEFAULT_HEARTBEAT_INTERVAL_MS = 1000 ;
7779
7880// / Create the StatContextConfiguration to use, from the specified
7981// / `options`, and using the specified `allocator` for memory allocations.
@@ -144,6 +146,8 @@ void Application::onChannelDown(const bsl::string& peerUri,
144146{
145147 // executed by the *IO* thread
146148
149+ stopHeartbeat ();
150+
147151 BALL_LOG_INFO << " Session with '" << peerUri << " ' is now DOWN"
148152 << " [status: " << status << " ]" ;
149153
@@ -159,10 +163,12 @@ void Application::onChannelWatermark(const bsl::string& peerUri,
159163 d_brokerSession.handleChannelWatermark (type);
160164}
161165
162- void Application::readCb (const bmqio::Status& status,
163- int * numNeeded,
164- bdlbb::Blob* blob,
165- const bsl::shared_ptr<bmqio::Channel>& channel)
166+ void Application::readCb (
167+ const bmqio::Status& status,
168+ int * numNeeded,
169+ bdlbb::Blob* blob,
170+ const bsl::shared_ptr<bmqio::Channel>& channel,
171+ const bsl::shared_ptr<bmqp::HeartbeatMonitor>& monitor)
166172{
167173 // executed by the *IO* thread
168174
@@ -201,10 +207,15 @@ void Application::readCb(const bmqio::Status& status,
201207 return ; // RETURN
202208 }
203209
204- BALL_LOG_TRACE << channel->peerUri () << " : ReadCallback got a blob\n "
205- << bmqu::BlobStartHexDumper (&readBlob);
210+ // Create a raw event with a cloned blob
211+ bmqp::Event event (&readBlob, &d_allocator, true );
212+
213+ if (monitor->checkData (channel.get (), event)) {
214+ BALL_LOG_TRACE << channel->peerUri () << " : ReadCallback got a blob\n "
215+ << bmqu::BlobStartHexDumper (&readBlob);
206216
207- d_brokerSession.processPacket (readBlob);
217+ d_brokerSession.processPacket (event);
218+ }
208219}
209220
210221void Application::channelStateCallback (
@@ -238,6 +249,9 @@ void Application::channelStateCallback(
238249
239250 d_brokerSession.setChannel (channel);
240251
252+ bsl::shared_ptr<bmqp::HeartbeatMonitor> monitor = createMonitor (
253+ channel);
254+
241255 // Initiate read flow
242256 bmqio::Status st;
243257 channel->read (
@@ -248,7 +262,8 @@ void Application::channelStateCallback(
248262 bdlf::PlaceHolders::_1, // status
249263 bdlf::PlaceHolders::_2, // numNeeded
250264 bdlf::PlaceHolders::_3, // blob
251- channel));
265+ channel,
266+ monitor));
252267 if (!st) {
253268 BALL_LOG_ERROR << " Could not read from channel:"
254269 << " [peer: " << channel->peerUri ()
@@ -260,6 +275,8 @@ void Application::channelStateCallback(
260275 // Cancel the timeout event (if the handle is invalid, this will just
261276 // do nothing)
262277 d_scheduler.cancelEvent (&d_startTimeoutHandle);
278+
279+ startHeartbeat (channel, monitor);
263280 } break ; // BREAK
264281 case bmqio::ChannelFactoryEvent::e_CONNECT_ATTEMPT_FAILED: {
265282 BALL_LOG_DEBUG << " Failed an attempt to establish a session with '"
@@ -599,6 +616,7 @@ Application::Application(
599616, d_statSnaphotTimerHandle()
600617, d_nextStatDump(-1 )
601618, d_lastAllocatorSnapshot(0 )
619+ , d_heartbeatSchedulerHandle()
602620{
603621 // NOTE:
604622 // o The persistent session pool must live longer than the brokerSession
@@ -721,6 +739,8 @@ void Application::stop()
721739 BALL_LOG_INFO << " ::: STOP (SYNC) [state: " << d_brokerSession.state ()
722740 << " ] :::" ;
723741
742+ stopHeartbeat ();
743+
724744 // Stop the brokerSession
725745 d_brokerSession.stop ();
726746}
@@ -730,9 +750,79 @@ void Application::stopAsync()
730750 BALL_LOG_INFO << " ::: STOP (ASYNC) [state: " << d_brokerSession.state ()
731751 << " ] :::" ;
732752
753+ stopHeartbeat ();
754+
733755 // Stop the brokerSession
734756 d_brokerSession.stopAsync ();
735757}
736758
/// Create and return a new `bmqp::HeartbeatMonitor` for the specified
/// `channel`.  The maximum number of missed heartbeats given to the monitor
/// starts at `k_DEFAULT_MAX_MISSED_HEARTBEATS` and is then loaded from the
/// channel's `k_CHANNEL_PROPERTY_MAX_MISSED_HEARTBEATS` property.
759+ bsl::shared_ptr<bmqp::HeartbeatMonitor>
760+ Application::createMonitor (const bsl::shared_ptr<bmqio::Channel>& channel)
761+ {
762+ int maxMissedHeartbeats = k_DEFAULT_MAX_MISSED_HEARTBEATS;
763+
// The result of `load` is ignored here; presumably the default above is
// left untouched when the property is absent -- TODO confirm against the
// channel properties API.
764+ channel->properties ().load (
765+ &maxMissedHeartbeats,
766+ NegotiatedChannelFactory::k_CHANNEL_PROPERTY_MAX_MISSED_HEARTBEATS);
767+
// The monitor is allocated from `d_allocator`, and the same allocator is
// handed to the shared pointer (presumably so the object is released back
// to it -- confirm against `bsl::shared_ptr`).
768+ bsl::shared_ptr<bmqp::HeartbeatMonitor> monitor (
769+ new (d_allocator) bmqp::HeartbeatMonitor (maxMissedHeartbeats),
770+ &d_allocator);
771+
772+ return monitor;
773+ }
774+
/// Schedule a recurring event on `d_scheduler` that invokes
/// `onHeartbeatSchedulerEvent` with the specified `channel` and `monitor`.
/// Do nothing if `monitor->isHearbeatEnabled()` returns false.  The period
/// starts at `k_DEFAULT_HEARTBEAT_INTERVAL_MS` and is then loaded from the
/// channel's `k_CHANNEL_PROPERTY_HEARTBEAT_INTERVAL_MS` property.  The
/// `monitor` is asserted to be non-null.
775+ void Application::startHeartbeat (
776+ const bsl::shared_ptr<bmqio::Channel>& channel,
777+ const bsl::shared_ptr<bmqp::HeartbeatMonitor>& monitor)
778+ {
779+ BSLS_ASSERT_SAFE (monitor);
780+
// Heartbeating disabled for this monitor: nothing gets scheduled.
781+ if (!monitor->isHearbeatEnabled ()) {
782+ return ; // RETURN
783+ }
784+
785+ int heartbeatIntervalMs = k_DEFAULT_HEARTBEAT_INTERVAL_MS;
786+
// The result of `load` is ignored; presumably the default is kept when the
// property is absent -- TODO confirm.
787+ channel->properties ().load (
788+ &heartbeatIntervalMs,
789+ NegotiatedChannelFactory::k_CHANNEL_PROPERTY_HEARTBEAT_INTERVAL_MS);
790+
791+ bsls::TimeInterval interval;
792+ interval.addMilliseconds (heartbeatIntervalMs);
793+
// The handle is stored in `d_heartbeatSchedulerHandle` so that
// `stopHeartbeat` can cancel the event.  `this` is bound as a raw pointer,
// so this object is assumed to outlive the scheduled event.
// NOTE(review): the bound functor presumably holds copies of the `channel`
// and `monitor` shared pointers until the event is cancelled -- confirm.
794+ d_scheduler.scheduleRecurringEvent (
795+ &d_heartbeatSchedulerHandle,
796+ interval,
797+ bdlf::BindUtil::bind (&Application::onHeartbeatSchedulerEvent,
798+ this ,
799+ channel,
800+ monitor));
801+ }
/// Cancel the recurring heartbeat event identified by
/// `d_heartbeatSchedulerHandle`.  Called from `onChannelDown`, `stop` and
/// `stopAsync`.
802+ void Application::stopHeartbeat ()
803+ {
// NOTE(review): `onHeartbeatSchedulerEvent` runs on the scheduler thread
// and calls `channel->close()`.  If `close()` can invoke `onChannelDown`
// synchronously on the calling thread, this cancel-and-wait would be
// issued from the scheduler's own dispatcher thread -- confirm against the
// `cancelEventAndWait` contract that this cannot happen.
804+ d_scheduler.cancelEventAndWait (&d_heartbeatSchedulerHandle);
805+ }
806+
/// Recurring callback scheduled by `startHeartbeat`.  Ask the specified
/// `monitor` to check the heartbeat of the specified `channel`; if the
/// check fails, log a warning and close the channel.  The `monitor` is
/// asserted to be non-null and to have a non-zero `maxMissedHeartbeats()`.
807+ void Application::onHeartbeatSchedulerEvent (
808+ const bsl::shared_ptr<bmqio::Channel>& channel,
809+ const bsl::shared_ptr<bmqp::HeartbeatMonitor>& monitor)
810+ {
811+ // executed by the *SCHEDULER* thread
812+
813+ BSLS_ASSERT_SAFE (monitor);
814+ BSLS_ASSERT_SAFE (monitor->maxMissedHeartbeats ());
815+
// A false result is treated as "channel unresponsive"; what
// `checkHeartbeat` does with the channel is not visible here (presumably it
// tracks missed heartbeats -- confirm in `bmqp::HeartbeatMonitor`).
816+ if (!monitor->checkHeartbeat (channel.get ())) {
817+ BALL_LOG_WARN << " #TCP_DEAD_CHANNEL "
818+ << " Closing unresponsive channel after "
819+ << monitor->maxMissedHeartbeats ()
820+ << " missed heartbeats [channel: '" << channel->peerUri ()
821+ << " ']" ;
822+
823+ channel->close ();
824+ }
825+ }
826+
737827} // close package namespace
738828} // close enterprise namespace
0 commit comments