@@ -242,58 +242,74 @@ void async_tcp_client::service()
242
242
243
243
244
244
// event loop
245
- fd_set fdss[3 ];
245
+ // fd_set fdss[3];
246
246
timeval timeout;
247
247
248
248
for (; !app_exiting_;)
249
249
{
250
- memcpy (&fdss, fdss_, sizeof (fdss_));
251
-
250
+ set_descriptors (); // memcpy(&fdss, fdss_, sizeof(fdss_));
252
251
253
252
// @pitfall: If still have data to read, only wait 1 millseconds.
254
253
get_wait_duration (timeout, this ->offset_ > 0 ? MAX_BUSY_DELAY : MAX_WAIT_DURATION);
255
254
256
- int nfds = ::select (maxfdp_, &(fdss[read_op]), &(fdss[write_op]), &(fdss[except_op]), &timeout);
255
+ INET_LOG (" socket.select waiting... %ld milliseconds" , timeout.tv_sec * 1000 + timeout.tv_usec / 1000 );
256
+
257
+ int nfds = ::select (10000 , &(fdss_[read_op]), nullptr , nullptr , &timeout);
258
+
259
+ INET_LOG (" socket.select waked up, retval=%d" , nfds);
260
+
257
261
if (nfds == -1 )
258
262
{
259
263
int ec = xxsocket::get_last_errno ();
260
264
INET_LOG (" socket.select failed, error code: %d, error msg:%s\n " , ec, xxsocket::get_error_msg (ec));
261
265
if (ec == EBADF || !this ->impl_ .is_open ()) {
262
266
goto _L_error;
263
267
}
268
+
269
+ clear_descriptors ();
264
270
continue ; // try select again.
265
271
}
272
+
273
+ if (nfds == 0 ) {
274
+ INET_LOG (" socket.select is timeout, do perform_timeout_timers()" );
275
+ perform_timeout_timers ();
276
+ }
277
+ // Reset the interrupter.
278
+ else if (nfds > 0 && FD_ISSET (this ->interrupter_ .read_descriptor (), &(fdss_[read_op])))
279
+ {
280
+ bool was_interrupt = interrupter_.reset ();
281
+ INET_LOG (" socket.select waked up by interrupt, interrupter fd:%d, was_interrupt:%s" , this ->interrupter_ .read_descriptor (), was_interrupt ? " true" : " false" );
282
+ --nfds;
283
+ }
266
284
285
+ #if 0
267
286
// we should check whether the connection have exception before any operations.
268
287
if(FD_ISSET(this->impl_.native_handle(), &(fdss[except_op])))
269
288
{
270
289
int ec = xxsocket::get_last_errno();
271
290
INET_LOG("socket.select exception triggered, error code: %d, error msg:%s\n", ec, xxsocket::get_error_msg(ec));
272
291
goto _L_error;
273
292
}
274
-
275
- if (FD_ISSET (this ->interrupter_ .read_descriptor (), &(fdss[read_op]))) {
276
- // reset only
277
- interrupter_.reset ();
278
- }
279
-
280
- // perform read operations
281
- if (this ->offset_ > 0 || FD_ISSET (this ->impl_ .native_handle (), &(fdss[read_op])))
282
- { // can read socket data
283
- if (!do_read (this ))
293
+ #endif
294
+ if (nfds > 0 || this ->offset_ > 0 ) {
295
+ INET_LOG (" perform read operation..." );
296
+ if (!do_read (this )) {
297
+ INET_LOG (" do read failed..." );
284
298
goto _L_error;
299
+ }
285
300
}
286
301
287
302
// perform write operations
288
303
if (!this ->send_queue_ .empty ()){
304
+ INET_LOG (" perform write operation..." );
289
305
if (!do_write (this ))
290
306
{ // TODO: check would block? for client, may be unnecessory.
307
+ INET_LOG (" do write failed..." );
291
308
goto _L_error;
292
309
}
293
310
}
294
-
295
- if (nfds == 0 )
296
- perform_timeout_timers ();
311
+
312
+ clear_descriptors ();
297
313
298
314
/* if (this->p2p_channel1_.connected_) {
299
315
if (!do_write(&p2p_channel1_))
@@ -370,6 +386,18 @@ void async_tcp_client::handle_error(void)
370
386
interrupter_.reset ();
371
387
}
372
388
389
+ void async_tcp_client::set_descriptors ()
390
+ {
391
+ FD_SET (this ->impl_ , &fdss_[read_op]);
392
+ FD_SET (this ->interrupter_ .read_descriptor (), &fdss_[read_op]);
393
+ }
394
+
395
+ void async_tcp_client::clear_descriptors ()
396
+ {
397
+ FD_CLR (this ->impl_ , &fdss_[read_op]);
398
+ FD_CLR (this ->interrupter_ .read_descriptor (), &fdss_[read_op]);
399
+ }
400
+
373
401
void async_tcp_client::register_descriptor (const socket_native_type fd, int flags)
374
402
{
375
403
if ((flags & socket_event_read) != 0 )
@@ -414,10 +442,13 @@ void async_tcp_client::async_send(std::vector<char>&& data, const appl_pdu_send_
414
442
{
415
443
auto pdu = new appl_pdu (std::move (data), callback, std::chrono::seconds (this ->send_timeout_ ));
416
444
417
- std::unique_lock<std::recursive_mutex> autolock ( send_queue_mtx_);
445
+ send_queue_mtx_. lock ( );
418
446
send_queue_.push_back (pdu);
447
+ send_queue_mtx_.unlock ();
419
448
420
449
interrupter_.interrupt ();
450
+
451
+ INET_LOG (" async_tcp_client::async_send --> push a message to send_queue_ ok." );
421
452
}
422
453
else {
423
454
INET_LOG (" async_tcp_client::send failed, The connection not ok!!!" );
@@ -462,8 +493,10 @@ bool async_tcp_client::connect(void)
462
493
FD_ZERO (&fdss_[write_op]);
463
494
FD_ZERO (&fdss_[except_op]);
464
495
465
- register_descriptor (interrupter_.read_descriptor (), socket_event_read);
466
- register_descriptor (impl_.native_handle (), socket_event_read | socket_event_except);
496
+ INET_LOG (" interrupter readfd:%d" , interrupter_.read_descriptor ());
497
+
498
+ // register_descriptor(interrupter_.read_descriptor(), socket_event_read);
499
+ // register_descriptor(impl_.native_handle(), socket_event_read | socket_event_except);
467
500
468
501
impl_.set_nonblocking (true );
469
502
@@ -689,7 +722,8 @@ void async_tcp_client::schedule_timer(deadline_timer* timer)
689
722
return lhs->wait_duration () > rhs->wait_duration ();
690
723
});
691
724
692
- interrupter_.interrupt ();
725
+ if (timer == *this ->timer_queue_ .begin ())
726
+ interrupter_.interrupt ();
693
727
}
694
728
695
729
void async_tcp_client::cancel_timer (deadline_timer* timer)
0 commit comments