55#include " SubscriptionHandle.h"
66#include " TopicId.h"
77#include " TopicMessage.h"
8+ #include " exceptions/IllegalStateException.h"
89#include " impl/MirrorNetwork.h"
910#include " impl/MirrorNode.h"
1011#include " impl/TimestampConverter.h"
@@ -31,12 +32,22 @@ enum class CallStatus
3132// Helper function used to get a connected mirror node.
3233std::shared_ptr<internal::MirrorNode> getConnectedMirrorNode (const std::shared_ptr<internal::MirrorNetwork>& network)
3334{
35+ if (!network)
36+ {
37+ throw IllegalStateException (" Mirror network is not configured" );
38+ }
39+
3440 std::shared_ptr<internal::MirrorNode> node = network->getNextMirrorNode ();
35- while (node->channelFailedToConnect ())
41+ while (node && node ->channelFailedToConnect ())
3642 {
3743 node = network->getNextMirrorNode ();
3844 }
3945
46+ if (!node)
47+ {
48+ throw IllegalStateException (" No mirror node is available for topic message subscription" );
49+ }
50+
4051 return node;
4152}
4253
@@ -218,8 +229,16 @@ void startSubscription(
218229
219230 // Reset the call status and send the query.
220231 *callStatus = CallStatus::STATUS_CREATE;
221- reader = getConnectedMirrorNode (network)->getConsensusServiceStub ()->AsyncsubscribeTopic (
222- contexts.back ().get (), query, queues.back ().get (), callStatus.get ());
232+ try
233+ {
234+ reader = getConnectedMirrorNode (network)->getConsensusServiceStub ()->AsyncsubscribeTopic (
235+ contexts.back ().get (), query, queues.back ().get (), callStatus.get ());
236+ }
237+ catch (const IllegalStateException& e)
238+ {
239+ errorHandler (grpc::Status (grpc::StatusCode::FAILED_PRECONDITION, e.what ()));
240+ complete = true ;
241+ }
223242
224243 break ;
225244 }
@@ -330,23 +349,33 @@ std::shared_ptr<SubscriptionHandle> TopicMessageQuery::subscribe(const Client& c
330349 auto queue = std::make_unique<grpc::CompletionQueue>();
331350 auto callStatus = std::make_unique<CallStatus>();
332351
352+ std::shared_ptr<internal::MirrorNode> node;
353+ try
354+ {
355+ node = getConnectedMirrorNode (client.getClientMirrorNetwork ());
356+ }
357+ catch (const IllegalStateException& e)
358+ {
359+ mImpl ->mErrorHandler (grpc::Status (grpc::StatusCode::FAILED_PRECONDITION, e.what ()));
360+ return handle;
361+ }
362+
333363 // Send the query and initiate the subscription.
334- std::thread (&startSubscription,
335- client.getClientMirrorNetwork (),
336- getConnectedMirrorNode (client.getClientMirrorNetwork ())
337- ->getConsensusServiceStub ()
338- ->AsyncsubscribeTopic (context.get (), mImpl ->mQuery , queue.get (), callStatus.get ()),
339- std::move (context),
340- std::move (queue),
341- std::move (callStatus),
342- mImpl ->mQuery ,
343- mImpl ->mErrorHandler ,
344- mImpl ->mRetryHandler ,
345- mImpl ->mCompletionHandler ,
346- onNext,
347- mImpl ->mMaxAttempts ,
348- mImpl ->mMaxBackoff ,
349- handle)
364+ std::thread (
365+ &startSubscription,
366+ client.getClientMirrorNetwork (),
367+ node->getConsensusServiceStub ()->AsyncsubscribeTopic (context.get (), mImpl ->mQuery , queue.get (), callStatus.get ()),
368+ std::move (context),
369+ std::move (queue),
370+ std::move (callStatus),
371+ mImpl ->mQuery ,
372+ mImpl ->mErrorHandler ,
373+ mImpl ->mRetryHandler ,
374+ mImpl ->mCompletionHandler ,
375+ onNext,
376+ mImpl ->mMaxAttempts ,
377+ mImpl ->mMaxBackoff ,
378+ handle)
350379 .detach ();
351380
352381 return handle;
0 commit comments