@@ -76,20 +76,78 @@ void throw_if_error(int result, const char *context)
7676{
7777 if (result < 0 )
7878 {
79- throw std::runtime_error (std::string (context) + " : " + aeron_errmsg ());
79+ throw std::runtime_error (std::string (context) + " : " + epoch::detail::aeron_hooks (). errmsg ());
8080 }
8181}
8282
8383void throw_if_null (void *ptr, const char *context)
8484{
8585 if (ptr == nullptr )
8686 {
87- throw std::runtime_error (std::string (context) + " : " + aeron_errmsg ());
87+ throw std::runtime_error (std::string (context) + " : " + epoch::detail::aeron_hooks (). errmsg ());
8888 }
8989}
9090
9191} // namespace
9292
93+ namespace detail {
94+
95+ AeronHooks &aeron_hooks ()
96+ {
97+ static AeronHooks hooks{
98+ aeron_context_init,
99+ aeron_context_set_dir,
100+ aeron_init,
101+ aeron_start,
102+ aeron_async_add_publication,
103+ aeron_async_add_publication_poll,
104+ aeron_async_add_subscription,
105+ aeron_async_add_subscription_poll,
106+ aeron_publication_offer,
107+ aeron_subscription_poll,
108+ aeron_publication_close,
109+ aeron_subscription_close,
110+ aeron_close,
111+ aeron_context_close,
112+ aeron_errmsg,
113+ };
114+ return hooks;
115+ }
116+
117+ } // namespace detail
118+
119+ #ifdef EPOCH_TESTING
120+ namespace test {
121+
122+ detail::AeronHooks &aeron_hooks ()
123+ {
124+ return detail::aeron_hooks ();
125+ }
126+
127+ void reset_aeron_hooks ()
128+ {
129+ detail::aeron_hooks () = detail::AeronHooks{
130+ aeron_context_init,
131+ aeron_context_set_dir,
132+ aeron_init,
133+ aeron_start,
134+ aeron_async_add_publication,
135+ aeron_async_add_publication_poll,
136+ aeron_async_add_subscription,
137+ aeron_async_add_subscription_poll,
138+ aeron_publication_offer,
139+ aeron_subscription_poll,
140+ aeron_publication_close,
141+ aeron_subscription_close,
142+ aeron_close,
143+ aeron_context_close,
144+ aeron_errmsg,
145+ };
146+ }
147+
148+ } // namespace test
149+ #endif
150+
93151AeronTransport::AeronTransport (AeronConfig config) : config_(std::move(config))
94152{
95153 if (config_.fragment_limit <= 0 )
@@ -104,25 +162,27 @@ AeronTransport::AeronTransport(AeronConfig config) : config_(std::move(config))
104162 try
105163 {
106164 aeron_context_t *context = nullptr ;
107- throw_if_error (aeron_context_init (&context), " aeron_context_init failed" );
165+ throw_if_error (detail::aeron_hooks (). context_init (&context), " aeron_context_init failed" );
108166 if (!config_.aeron_directory .empty ())
109167 {
110- throw_if_error (aeron_context_set_dir (context, config_.aeron_directory .c_str ()),
168+ throw_if_error (detail::aeron_hooks (). context_set_dir (context, config_.aeron_directory .c_str ()),
111169 " aeron_context_set_dir failed" );
112170 }
113171 context_ = context;
114172
115173 aeron_t *client = nullptr ;
116- throw_if_error (aeron_init (&client, context_), " aeron_init failed" );
117- throw_if_error (aeron_start (client), " aeron_start failed" );
174+ throw_if_error (detail::aeron_hooks (). init (&client, context_), " aeron_init failed" );
175+ throw_if_error (detail::aeron_hooks (). start (client), " aeron_start failed" );
118176 client_ = client;
119177
120178 aeron_async_add_publication_t *pub_async = nullptr ;
121- throw_if_error (aeron_async_add_publication (&pub_async, client_, config_.channel .c_str (), config_.stream_id ),
179+ throw_if_error (
180+ detail::aeron_hooks ().async_add_publication (
181+ &pub_async, client_, config_.channel .c_str (), config_.stream_id ),
122182 " aeron_async_add_publication failed" );
123183 while (true )
124184 {
125- int poll_result = aeron_async_add_publication_poll (&publication_, pub_async);
185+ int poll_result = detail::aeron_hooks (). async_add_publication_poll (&publication_, pub_async);
126186 if (poll_result == 1 )
127187 {
128188 break ;
@@ -134,12 +194,19 @@ AeronTransport::AeronTransport(AeronConfig config) : config_(std::move(config))
134194
135195 aeron_async_add_subscription_t *sub_async = nullptr ;
136196 throw_if_error (
137- aeron_async_add_subscription (
138- &sub_async, client_, config_.channel .c_str (), config_.stream_id , nullptr , nullptr , nullptr , nullptr ),
197+ detail::aeron_hooks ().async_add_subscription (
198+ &sub_async,
199+ client_,
200+ config_.channel .c_str (),
201+ config_.stream_id ,
202+ nullptr ,
203+ nullptr ,
204+ nullptr ,
205+ nullptr ),
139206 " aeron_async_add_subscription failed" );
140207 while (true )
141208 {
142- int poll_result = aeron_async_add_subscription_poll (&subscription_, sub_async);
209+ int poll_result = detail::aeron_hooks (). async_add_subscription_poll (&subscription_, sub_async);
143210 if (poll_result == 1 )
144211 {
145212 break ;
@@ -174,7 +241,8 @@ void AeronTransport::send(const Message &message)
174241 std::int64_t result = 0 ;
175242 do
176243 {
177- result = aeron_publication_offer (publication_, buffer.data (), buffer.size (), nullptr , nullptr );
244+ result = detail::aeron_hooks ().publication_offer (
245+ publication_, buffer.data (), buffer.size (), nullptr , nullptr );
178246 if (result >= 0 )
179247 {
180248 stats_.sent_count ++;
@@ -238,7 +306,7 @@ std::vector<Message> AeronTransport::poll(std::size_t max)
238306 ctx->stats ->received_count ++;
239307 };
240308
241- int fragments = aeron_subscription_poll (subscription_, handler, &context, limit);
309+ int fragments = detail::aeron_hooks (). subscription_poll (subscription_, handler, &context, limit);
242310 throw_if_error (fragments, " aeron_subscription_poll failed" );
243311 return out;
244312}
@@ -253,22 +321,22 @@ void AeronTransport::close()
253321
254322 if (publication_ != nullptr )
255323 {
256- aeron_publication_close (publication_, nullptr , nullptr );
324+ detail::aeron_hooks (). publication_close (publication_, nullptr , nullptr );
257325 publication_ = nullptr ;
258326 }
259327 if (subscription_ != nullptr )
260328 {
261- aeron_subscription_close (subscription_, nullptr , nullptr );
329+ detail::aeron_hooks (). subscription_close (subscription_, nullptr , nullptr );
262330 subscription_ = nullptr ;
263331 }
264332 if (client_ != nullptr )
265333 {
266- aeron_close (client_);
334+ detail::aeron_hooks (). close (client_);
267335 client_ = nullptr ;
268336 }
269337 if (context_ != nullptr )
270338 {
271- aeron_context_close (context_);
339+ detail::aeron_hooks (). context_close (context_);
272340 context_ = nullptr ;
273341 }
274342}
0 commit comments