@@ -363,78 +363,103 @@ static int connRdmaHandleWrite(RdmaContext *ctx, uint32_t byte_len) {
363363 return VALKEY_OK ;
364364}
365365
366- static int connRdmaHandleCq (RdmaContext * ctx ) {
366+ static int connRdmaHandleWc (RdmaContext * ctx , struct ibv_wc * wc ) {
367367 struct rdma_cm_id * cm_id = ctx -> cm_id ;
368- struct ibv_cq * ev_cq = NULL ;
369- void * ev_ctx = NULL ;
370- struct ibv_wc wc = {0 };
371368 valkeyRdmaCmd * cmd ;
372- int ret ;
373-
374- if (ibv_get_cq_event (ctx -> comp_channel , & ev_cq , & ev_ctx ) < 0 ) {
375- if (errno != EAGAIN ) {
376- valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: get cq event failed" );
377- return VALKEY_ERR ;
378- }
379-
380- return VALKEY_OK ;
381- }
382-
383- ibv_ack_cq_events (ev_cq , 1 );
384- if (ibv_req_notify_cq (ev_cq , 0 )) {
385- valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: notify cq failed" );
386- return VALKEY_ERR ;
387- }
388-
389- pollcq :
390- ret = ibv_poll_cq (ctx -> cq , 1 , & wc );
391- if (ret < 0 ) {
392- valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: poll cq failed" );
393- return VALKEY_ERR ;
394- } else if (ret == 0 ) {
395- return VALKEY_OK ;
396- }
397369
398- if (wc . status != IBV_WC_SUCCESS ) {
370+ if (wc -> status != IBV_WC_SUCCESS ) {
399371 valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: send/recv failed" );
400372 return VALKEY_ERR ;
401373 }
402374
403- switch (wc . opcode ) {
375+ switch (wc -> opcode ) {
404376 case IBV_WC_RECV :
405- cmd = (valkeyRdmaCmd * )(uintptr_t )wc . wr_id ;
406- if (connRdmaHandleRecv (ctx , cm_id , cmd , wc . byte_len ) == VALKEY_ERR ) {
377+ cmd = (valkeyRdmaCmd * )(uintptr_t )wc -> wr_id ;
378+ if (connRdmaHandleRecv (ctx , cm_id , cmd , wc -> byte_len ) == VALKEY_ERR ) {
407379 return VALKEY_ERR ;
408380 }
409-
410381 break ;
411-
412382 case IBV_WC_RECV_RDMA_WITH_IMM :
413- cmd = (valkeyRdmaCmd * )(uintptr_t )wc . wr_id ;
414- if (connRdmaHandleRecvImm (ctx , cm_id , cmd , ntohl (wc . imm_data )) == VALKEY_ERR ) {
383+ cmd = (valkeyRdmaCmd * )(uintptr_t )wc -> wr_id ;
384+ if (connRdmaHandleRecvImm (ctx , cm_id , cmd , ntohl (wc -> imm_data )) == VALKEY_ERR ) {
415385 return VALKEY_ERR ;
416386 }
417-
418387 break ;
419388 case IBV_WC_RDMA_WRITE :
420- if (connRdmaHandleWrite (ctx , wc . byte_len ) == VALKEY_ERR ) {
389+ if (connRdmaHandleWrite (ctx , wc -> byte_len ) == VALKEY_ERR ) {
421390 return VALKEY_ERR ;
422391 }
423-
424392 break ;
425393 case IBV_WC_SEND :
426- cmd = (valkeyRdmaCmd * )(uintptr_t )wc . wr_id ;
394+ cmd = (valkeyRdmaCmd * )(uintptr_t )wc -> wr_id ;
427395 if (connRdmaHandleSend (ctx , cmd ) == VALKEY_ERR ) {
428396 return VALKEY_ERR ;
429397 }
430-
431398 break ;
432399 default :
433400 valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: unexpected opcode" );
434401 return VALKEY_ERR ;
435402 }
436403
437- goto pollcq ;
404+ return VALKEY_OK ;
405+ }
406+
407+ static int connRdmaHandleCq (RdmaContext * ctx ) {
408+ struct ibv_cq * ev_cq = NULL ;
409+ void * ev_ctx = NULL ;
410+ struct ibv_wc wc = {0 };
411+ int ret ;
412+
413+ pthread_mutex_lock (& ctx -> cq_mu );
414+ for (;;) {
415+ ret = ibv_poll_cq (ctx -> cq , 1 , & wc );
416+ if (ret < 0 ) {
417+ valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: poll cq failed" );
418+ pthread_mutex_unlock (& ctx -> cq_mu );
419+ return VALKEY_ERR ;
420+ } else if (ret == 0 ) {
421+ break ;
422+ }
423+
424+ if (connRdmaHandleWc (ctx , & wc ) == VALKEY_ERR ) {
425+ pthread_mutex_unlock (& ctx -> cq_mu );
426+ return VALKEY_ERR ;
427+ }
428+ }
429+
430+ if (ibv_get_cq_event (ctx -> comp_channel , & ev_cq , & ev_ctx ) < 0 ) {
431+ if (errno != EAGAIN ) {
432+ valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: get cq event failed" );
433+ pthread_mutex_unlock (& ctx -> cq_mu );
434+ return VALKEY_ERR ;
435+ }
436+ pthread_mutex_unlock (& ctx -> cq_mu );
437+ return VALKEY_OK ;
438+ }
439+
440+ ibv_ack_cq_events (ev_cq , 1 );
441+ if (ibv_req_notify_cq (ev_cq , 0 )) {
442+ valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: notify cq failed" );
443+ pthread_mutex_unlock (& ctx -> cq_mu );
444+ return VALKEY_ERR ;
445+ }
446+
447+ for (;;) {
448+ ret = ibv_poll_cq (ctx -> cq , 1 , & wc );
449+ if (ret < 0 ) {
450+ valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: poll cq failed" );
451+ pthread_mutex_unlock (& ctx -> cq_mu );
452+ return VALKEY_ERR ;
453+ } else if (ret == 0 ) {
454+ pthread_mutex_unlock (& ctx -> cq_mu );
455+ return VALKEY_OK ;
456+ }
457+
458+ if (connRdmaHandleWc (ctx , & wc ) == VALKEY_ERR ) {
459+ pthread_mutex_unlock (& ctx -> cq_mu );
460+ return VALKEY_ERR ;
461+ }
462+ }
438463}
439464
440465/* There are two FD(s) in use:
@@ -447,14 +472,10 @@ static int valkeyRdmaPollCqCm(RdmaContext *ctx, long timed) {
447472#define VALKEY_RDMA_POLLFD_CQ 1
448473#define VALKEY_RDMA_POLLFD_MAX 2
449474 struct pollfd pfd [VALKEY_RDMA_POLLFD_MAX ];
450- long now = vk_msec_now ();
475+ struct ibv_wc wc = {0 };
476+ long now ;
451477 int ret ;
452478
453- if (now >= timed ) {
454- valkeySetError (ctx , VALKEY_ERR_IO , "RDMA: IO timeout" );
455- return VALKEY_ERR ;
456- }
457-
458479 /* pfd[0] for CM event */
459480 pfd [VALKEY_RDMA_POLLFD_CM ].fd = ctx -> cm_channel -> fd ;
460481 pfd [VALKEY_RDMA_POLLFD_CM ].events = POLLIN ;
@@ -464,24 +485,57 @@ static int valkeyRdmaPollCqCm(RdmaContext *ctx, long timed) {
464485 pfd [VALKEY_RDMA_POLLFD_CQ ].fd = ctx -> comp_channel -> fd ;
465486 pfd [VALKEY_RDMA_POLLFD_CQ ].events = POLLIN ;
466487 pfd [VALKEY_RDMA_POLLFD_CQ ].revents = 0 ;
467- ret = poll_noeintr (pfd , VALKEY_RDMA_POLLFD_MAX , timed - now );
468- if (ret < 0 ) {
469- valkeySetError (ctx , VALKEY_ERR_IO , "RDMA: Poll CQ/CM failed" );
470- return VALKEY_ERR ;
471- } else if (ret == 0 ) {
472- valkeySetError (ctx , VALKEY_ERR_IO , "Resource temporarily unavailable" );
473- return VALKEY_ERR ;
474- }
475488
476- if ( pfd [ VALKEY_RDMA_POLLFD_CM ]. revents & POLLIN ) {
477- valkeyRdmaCM ( ctx , 0 );
478- if (!( ctx -> flags & VALKEY_CONNECTED ) ) {
479- valkeySetError (ctx , VALKEY_ERR_EOF , "Server closed the connection" );
489+ for (;; ) {
490+ now = vk_msec_now ( );
491+ if (now >= timed ) {
492+ valkeySetError (ctx , VALKEY_ERR_IO , "RDMA: IO timeout" );
480493 return VALKEY_ERR ;
481494 }
482- }
483495
484- return VALKEY_OK ;
496+ /* First, try to drain CQ without relying on events. */
497+ for (;;) {
498+ pthread_mutex_lock (& ctx -> cq_mu );
499+ ret = ibv_poll_cq (ctx -> cq , 1 , & wc );
500+ if (ret < 0 ) {
501+ valkeySetError (ctx , VALKEY_ERR_OTHER , "RDMA: poll cq failed" );
502+ pthread_mutex_unlock (& ctx -> cq_mu );
503+ return VALKEY_ERR ;
504+ } else if (ret == 0 ) {
505+ pthread_mutex_unlock (& ctx -> cq_mu );
506+ break ;
507+ }
508+ if (connRdmaHandleWc (ctx , & wc ) == VALKEY_ERR ) {
509+ pthread_mutex_unlock (& ctx -> cq_mu );
510+ return VALKEY_ERR ;
511+ }
512+ pthread_mutex_unlock (& ctx -> cq_mu );
513+ }
514+
515+ /* Poll for a short slice so we can re-check CQ even if no events. */
516+ ret = poll_noeintr (pfd , VALKEY_RDMA_POLLFD_MAX , (int )valkeyMin (10 , timed - now ));
517+ if (ret < 0 ) {
518+ valkeySetError (ctx , VALKEY_ERR_IO , "RDMA: Poll CQ/CM failed" );
519+ return VALKEY_ERR ;
520+ } else if (ret == 0 ) {
521+ continue ;
522+ }
523+
524+ if (pfd [VALKEY_RDMA_POLLFD_CM ].revents & POLLIN ) {
525+ valkeyRdmaCM (ctx , 0 );
526+ if (!(ctx -> flags & VALKEY_CONNECTED )) {
527+ valkeySetError (ctx , VALKEY_ERR_EOF , "Server closed the connection" );
528+ return VALKEY_ERR ;
529+ }
530+ }
531+
532+ if (pfd [VALKEY_RDMA_POLLFD_CQ ].revents & POLLIN ) {
533+ if (connRdmaHandleCq (ctx ) == VALKEY_ERR ) {
534+ return VALKEY_ERR ;
535+ }
536+ return VALKEY_OK ;
537+ }
538+ }
485539}
486540
487541static size_t connRdmaSend (RdmaContext * ctx , struct rdma_cm_id * cm_id , const void * data , size_t data_len ) {
@@ -815,7 +869,6 @@ ssize_t rdmaRead(RdmaContext *ctx, char *buf, size_t bufcap, long timeout_msec)
815869 struct rdma_cm_id * cm_id = ctx -> cm_id ;
816870 long end ;
817871 uint32_t toread , remained , topoll ;
818-
819872 end = vk_msec_now () + timeout_msec ;
820873
821874pollcq :
@@ -836,15 +889,23 @@ ssize_t rdmaRead(RdmaContext *ctx, char *buf, size_t bufcap, long timeout_msec)
836889 }
837890 pthread_mutex_unlock (& ctx -> rx_mu );
838891
839- pthread_mutex_lock (& ctx -> cq_mu );
840892 pthread_mutex_lock (& ctx -> rx_mu );
841893 topoll = ctx -> recv_offset - ctx -> rx_offset ;
842894 pthread_mutex_unlock (& ctx -> rx_mu );
843- if (topoll == 0 && (valkeyRdmaPollCqCm (ctx , end ) != VALKEY_OK || connRdmaHandleCq (ctx ) != VALKEY_OK )) {
844- pthread_mutex_unlock (& ctx -> cq_mu );
845- return VALKEY_ERR ;
895+ while (topoll == 0 ) {
896+ if (connRdmaHandleCq (ctx ) != VALKEY_OK ) {
897+ return VALKEY_ERR ;
898+ }
899+ pthread_mutex_lock (& ctx -> rx_mu );
900+ topoll = ctx -> recv_offset - ctx -> rx_offset ;
901+ pthread_mutex_unlock (& ctx -> rx_mu );
902+ if (topoll != 0 ) {
903+ break ;
904+ }
905+ if (valkeyRdmaPollCqCm (ctx , end ) != VALKEY_OK ) {
906+ return VALKEY_ERR ;
907+ }
846908 }
847- pthread_mutex_unlock (& ctx -> cq_mu );
848909 goto pollcq ;
849910}
850911
@@ -853,7 +914,6 @@ ssize_t rdmaWrite(RdmaContext *ctx, const char *obuf, size_t data_len, long time
853914 long end ;
854915 uint32_t towrite , topoll , wrote = 0 ;
855916 size_t ret ;
856-
857917 end = vk_msec_now () + timeout_msec ;
858918
859919pollcq :
@@ -878,15 +938,23 @@ ssize_t rdmaWrite(RdmaContext *ctx, const char *obuf, size_t data_len, long time
878938 }
879939
880940waitcq :
881- pthread_mutex_lock (& ctx -> cq_mu );
882941 pthread_mutex_lock (& ctx -> tx_mu );
883942 topoll = ctx -> tx_offset - ctx -> tx_length ;
884943 pthread_mutex_unlock (& ctx -> tx_mu );
885- if (topoll == 0 && (valkeyRdmaPollCqCm (ctx , end ) != VALKEY_OK || connRdmaHandleCq (ctx ) != VALKEY_OK )) {
886- pthread_mutex_unlock (& ctx -> cq_mu );
887- return VALKEY_ERR ;
944+ while (topoll == 0 ) {
945+ if (connRdmaHandleCq (ctx ) != VALKEY_OK ) {
946+ return VALKEY_ERR ;
947+ }
948+ pthread_mutex_lock (& ctx -> tx_mu );
949+ topoll = ctx -> tx_offset - ctx -> tx_length ;
950+ pthread_mutex_unlock (& ctx -> tx_mu );
951+ if (topoll != 0 ) {
952+ break ;
953+ }
954+ if (valkeyRdmaPollCqCm (ctx , end ) != VALKEY_OK ) {
955+ return VALKEY_ERR ;
956+ }
888957 }
889- pthread_mutex_unlock (& ctx -> cq_mu );
890958 goto pollcq ;
891959}
892960
0 commit comments