Skip to content

Commit ef56f06

Browse files
committed
#985 add exception for zmq connection error to identify TCP fallback
1 parent 1b41b0f commit ef56f06

6 files changed

Lines changed: 163 additions & 15 deletions

File tree

blockfinalize/client/BlockFinalizeDownloader.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "exceptions/ConnectionRefusedException.h"
2828
#include "exceptions/ExitRequestedException.h"
2929
#include "exceptions/DoNotHaveProposalYetException.h"
30+
#include "exceptions/ZMQTransportException.h"
3031

3132
#include "abstracttcpserver/ConnectionStatus.h"
3233

@@ -311,17 +312,14 @@ void BlockFinalizeDownloader::downloadFragment(
311312
return;
312313
}
313314

314-
// If node is not marked as TCP fallback, try ZMQ first, and if it fails, mark it as TCP fallback and retry using TCP.
315+
// If node is not marked as TCP fallback, try ZMQ first. Downgrade to TCP only for
316+
// explicit ZMQ transport failures, not for parsing, protocol, or local-state errors.
315317
try {
316318
downloadFragmentZMQ( _dstIndex, _fragmentIndex, header );
317319
clearTCPFallback( _dstIndex );
318320
return;
319-
} catch ( DoNotHaveProposalYetException& ) {
320-
throw;
321-
} catch ( ExitRequestedException& ) {
322-
throw;
323-
} catch ( ... ) {
324-
// peer is not usable via ZMQ, mark it as TCP fallback and retry using TCP
321+
} catch ( const ZMQTransportException& e ) {
322+
// peer is not usable via ZMQ transport right now, mark it as TCP fallback
325323
getNode()->getSockets()->getBulkDataZMQSockets()->closeDestinationSocket( _dstIndex );
326324
getSchain()->noteBlockFinalizeZmqClientFallbackToTcp();
327325
markTCPFallback( _dstIndex );

blockfinalize/client/BlockFinalizeDownloader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#pragma once
2626

27+
#include <exception>
2728

2829
class CommittedBlockList;
2930
class ClientSocket;
@@ -129,7 +130,7 @@ class BlockFinalizeDownloader : public Agent {
129130

130131
/**
131132
* Downloads block fragment from '_dstIndex' node using TCP connection.
132-
* This is the old interface, used only as a fallback when ZMQ download fails,
133+
* This is the old interface, used only as a fallback when ZMQ transport fails,
133134
* and is marked in local cache to keep using TCP for this node for some time in the future.
134135
*/
135136
void downloadFragmentTCP( schain_index _dstIndex, fragment_index _fragmentIndex,

exceptions/ZMQTransportException.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
Copyright (C) 2018-2019 SKALE Labs
3+
4+
This file is part of skale-consensus.
5+
6+
skale-consensus is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU Affero General Public License as published
8+
by the Free Software Foundation, either version 3 of the License, or
9+
(at your option) any later version.
10+
11+
skale-consensus is distributed in the hope that it will be useful,
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU Affero General Public License for more details.
15+
16+
You should have received a copy of the GNU Affero General Public License
17+
along with skale-consensus. If not, see <https://www.gnu.org/licenses/>.
18+
19+
@file ZMQTransportException.h
20+
@author Sidnei Teixeira
21+
@date 2026
22+
*/
23+
24+
#pragma once
25+
26+
#include "NetworkProtocolException.h"
27+
28+
class ZMQTransportException : public NetworkProtocolException {
29+
int zmqErrno = 0;
30+
31+
public:
32+
ZMQTransportException( const string& _message, int _zmqErrno, const string& _className )
33+
: NetworkProtocolException( _message, _className ), zmqErrno( _zmqErrno ) {}
34+
35+
[[nodiscard]] int getZmqErrno() const {
36+
return zmqErrno;
37+
}
38+
};

network/ZMQErrorClassifier.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Copyright (C) 2018-2019 SKALE Labs
3+
4+
This file is part of skale-consensus.
5+
6+
skale-consensus is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU Affero General Public License as published
8+
by the Free Software Foundation, either version 3 of the License, or
9+
(at your option) any later version.
10+
11+
skale-consensus is distributed in the hope that it will be useful,
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU Affero General Public License for more details.
15+
16+
You should have received a copy of the GNU Affero General Public License
17+
along with skale-consensus. If not, see <https://www.gnu.org/licenses/>.
18+
19+
@file ZMQErrorClassifier.h
20+
@author Sidnei Teixeira
21+
@date 2026
22+
*/
23+
24+
#pragma once
25+
26+
#include <cerrno>
27+
28+
inline bool isZMQTransportErrorForTcpFallback( int _zmqErrno ) {
29+
switch ( _zmqErrno ) {
30+
case EAGAIN:
31+
case ETIMEDOUT:
32+
case ECONNRESET:
33+
case ECONNREFUSED:
34+
case ENETDOWN:
35+
case ENETUNREACH:
36+
case EHOSTUNREACH:
37+
case EPIPE:
38+
return true;
39+
default:
40+
return false;
41+
}
42+
}

network/ZMQHeaderPayloadFrame.cpp

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
#include "exceptions/ExitRequestedException.h"
2424
#include "exceptions/NetworkProtocolException.h"
2525
#include "exceptions/ParsingException.h"
26+
#include "exceptions/ZMQTransportException.h"
2627
#include "headers/Header.h"
28+
#include "network/ZMQErrorClassifier.h"
2729
#include "node/Node.h"
2830
#include "utils/Time.h"
2931
#include "ZMQHeaderPayloadFrame.h"
@@ -38,6 +40,17 @@ void maybeSimulateDelay( Schain& _sChain ) {
3840
usleep( simulatedDelay * 1000 );
3941
}
4042
}
43+
44+
[[noreturn]] void throwZMQCommunicationException( const string& _message ) {
45+
auto zmqErrno = zmq_errno();
46+
auto errorMessage = _message + ":" + string( zmq_strerror( zmqErrno ) );
47+
48+
if ( isZMQTransportErrorForTcpFallback( zmqErrno ) ) {
49+
BOOST_THROW_EXCEPTION( ZMQTransportException( errorMessage, zmqErrno, __CLASS_NAME__ ) );
50+
}
51+
52+
BOOST_THROW_EXCEPTION( NetworkProtocolException( errorMessage, __CLASS_NAME__ ) );
53+
}
4154
}
4255

4356
ptr< vector< uint8_t > > ZMQHeaderPayloadFrame::packMessage(
@@ -110,8 +123,7 @@ void ZMQHeaderPayloadFrame::sendFrame( Schain& _sChain, void* _socket, const ptr
110123

111124
auto rc = zmq_send( _socket, _frame->data(), _frame->size(), 0 );
112125
if ( rc < 0 ) {
113-
BOOST_THROW_EXCEPTION( NetworkProtocolException(
114-
"ZMQ send failed:" + string( zmq_strerror( errno ) ), __CLASS_NAME__ ) );
126+
throwZMQCommunicationException( "ZMQ send failed" );
115127
}
116128
}
117129

@@ -134,8 +146,7 @@ void ZMQHeaderPayloadFrame::sendFrameToRoutingId( Schain& _sChain, void* _socket
134146
rc = zmq_msg_send( &msg, _socket, 0 );
135147
if ( rc < 0 ) {
136148
zmq_msg_close( &msg );
137-
BOOST_THROW_EXCEPTION( NetworkProtocolException(
138-
"ZMQ send failed:" + string( zmq_strerror( errno ) ), __CLASS_NAME__ ) );
149+
throwZMQCommunicationException( "ZMQ send failed" );
139150
}
140151

141152
rc = zmq_msg_close( &msg );
@@ -154,8 +165,9 @@ ptr< vector< uint8_t > > ZMQHeaderPayloadFrame::receiveFrame(
154165

155166
rc = zmq_msg_recv( &msg, _socket, 0 );
156167
if ( rc < 0 ) {
168+
auto zmqErrno = zmq_errno();
157169
zmq_msg_close( &msg );
158-
if ( errno == EAGAIN ) {
170+
if ( zmqErrno == EAGAIN ) {
159171
// check if timeout happened during exit
160172
_sChain.getNode()->exitCheck();
161173

@@ -164,8 +176,12 @@ ptr< vector< uint8_t > > ZMQHeaderPayloadFrame::receiveFrame(
164176
return nullptr;
165177
}
166178
}
167-
BOOST_THROW_EXCEPTION( NetworkProtocolException(
168-
string( _errorString ) + ":" + zmq_strerror( errno ), __CLASS_NAME__ ) );
179+
auto errorMessage = string( _errorString ) + ":" + string( zmq_strerror( zmqErrno ) );
180+
if ( isZMQTransportErrorForTcpFallback( zmqErrno ) ) {
181+
BOOST_THROW_EXCEPTION(
182+
ZMQTransportException( errorMessage, zmqErrno, __CLASS_NAME__ ) );
183+
}
184+
BOOST_THROW_EXCEPTION( NetworkProtocolException( errorMessage, __CLASS_NAME__ ) );
169185
}
170186

171187
auto frame = make_shared< vector< uint8_t > >( rc );
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#include "thirdparty/catch.hpp"
2+
3+
#include <cerrno>
4+
#include <exception>
5+
6+
#include "exceptions/InvalidStateException.h"
7+
#include "exceptions/NetworkProtocolException.h"
8+
#include "exceptions/ParsingException.h"
9+
#include "exceptions/ZMQTransportException.h"
10+
#include "network/ZMQErrorClassifier.h"
11+
#include "zmq.h"
12+
13+
CATCH_TEST_CASE( "ZMQ transport fallback classifier accepts transport errno values",
14+
"[network][zmq][unit]" ) {
15+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( EAGAIN ) );
16+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( ETIMEDOUT ) );
17+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( ECONNRESET ) );
18+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( ECONNREFUSED ) );
19+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( ENETDOWN ) );
20+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( ENETUNREACH ) );
21+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( EHOSTUNREACH ) );
22+
CATCH_REQUIRE( isZMQTransportErrorForTcpFallback( EPIPE ) );
23+
}
24+
25+
CATCH_TEST_CASE( "ZMQ transport fallback classifier rejects non-transport errno values",
26+
"[network][zmq][unit]" ) {
27+
CATCH_REQUIRE_FALSE( isZMQTransportErrorForTcpFallback( EINVAL ) );
28+
CATCH_REQUIRE_FALSE( isZMQTransportErrorForTcpFallback( EFSM ) );
29+
CATCH_REQUIRE_FALSE( isZMQTransportErrorForTcpFallback( ETERM ) );
30+
}
31+
32+
CATCH_TEST_CASE( "BlockFinalize TCP fallback only applies to ZMQ transport exceptions",
33+
"[blockfinalize][network][unit]" ) {
34+
ZMQTransportException transportException( "transport", EAGAIN, "Test" );
35+
ParsingException parsingException( "parsing", "Test" );
36+
NetworkProtocolException protocolException( "protocol", "Test" );
37+
InvalidStateException invalidStateException( "state", "Test" );
38+
auto catchesAsTransport = []( const auto& _exception ) {
39+
try {
40+
throw _exception;
41+
} catch ( const ZMQTransportException& ) {
42+
return true;
43+
} catch ( ... ) {
44+
return false;
45+
}
46+
};
47+
48+
CATCH_REQUIRE( transportException.getZmqErrno() == EAGAIN );
49+
CATCH_REQUIRE( catchesAsTransport( transportException ) );
50+
CATCH_REQUIRE_FALSE( catchesAsTransport( parsingException ) );
51+
CATCH_REQUIRE_FALSE( catchesAsTransport( protocolException ) );
52+
CATCH_REQUIRE_FALSE( catchesAsTransport( invalidStateException ) );
53+
}

0 commit comments

Comments
 (0)