2222import org .hyperledger .besu .ethereum .api .jsonrpc .internal .response .RpcErrorType ;
2323
2424import java .io .IOException ;
25- import java .util .Map ;
2625import java .util .Optional ;
27- import java .util .Set ;
28- import java .util .concurrent .CancellationException ;
29- import java .util .concurrent .CompletableFuture ;
30- import java .util .concurrent .CompletionException ;
31- import java .util .concurrent .ConcurrentHashMap ;
32- import java .util .concurrent .ExecutionException ;
33- import java .util .concurrent .TimeUnit ;
34- import java .util .concurrent .TimeoutException ;
3526
3627import io .opentelemetry .api .trace .Tracer ;
3728import io .vertx .core .Handler ;
38- import io .vertx .core .http .HttpConnection ;
3929import io .vertx .ext .web .RoutingContext ;
4030import org .slf4j .Logger ;
4131import org .slf4j .LoggerFactory ;
@@ -48,164 +38,69 @@ private JsonRpcExecutorHandler() {}
4838 public static Handler <RoutingContext > handler (
4939 final JsonRpcExecutor jsonRpcExecutor ,
5040 final Tracer tracer ,
51- final JsonRpcConfiguration jsonRpcConfiguration ,
52- final Map <HttpConnection , Set <InterruptibleCompletableFuture <Void >>>
53- activeRequestsByConnection ) {
54- return ctx ->
55- handleRequest (
56- jsonRpcExecutor , tracer , jsonRpcConfiguration , activeRequestsByConnection , ctx );
57- }
58-
59- private static void handleRequest (
60- final JsonRpcExecutor jsonRpcExecutor ,
61- final Tracer tracer ,
62- final JsonRpcConfiguration jsonRpcConfiguration ,
63- final Map <HttpConnection , Set <InterruptibleCompletableFuture <Void >>>
64- activeRequestsByConnection ,
65- final RoutingContext ctx ) {
66-
67- final long timeoutMillis = jsonRpcConfiguration .getHttpTimeoutSec () * 1000 ;
68-
69- Optional <AbstractJsonRpcExecutor > executorOpt =
70- createExecutor (jsonRpcExecutor , tracer , ctx , jsonRpcConfiguration );
71-
72- if (executorOpt .isEmpty ()) {
73- handleErrorAndEndResponse (ctx , null , RpcErrorType .PARSE_ERROR );
74- return ;
75- }
76-
77- final AbstractJsonRpcExecutor executor = executorOpt .get ();
78- InterruptibleCompletableFuture <Void > executionFuture =
79- executeAsync (executor , ctx , activeRequestsByConnection , timeoutMillis );
80-
81- executionFuture .handle ((result , throwable ) -> handleCompletion (ctx , throwable , timeoutMillis ));
82- }
83-
84- private static InterruptibleCompletableFuture <Void > executeAsync (
85- final AbstractJsonRpcExecutor executor ,
86- final RoutingContext ctx ,
87- final Map <HttpConnection , Set <InterruptibleCompletableFuture <Void >>>
88- activeRequestsByConnection ,
89- final long timeoutMillis ) {
90-
91- final InterruptibleCompletableFuture <Void > executionFuture =
92- new InterruptibleCompletableFuture <>();
93-
94- // Register this request with the connection's active requests
95- if (ctx .request () != null && ctx .request ().connection () != null ) {
96- Set <InterruptibleCompletableFuture <Void >> activeFutures =
97- activeRequestsByConnection .computeIfAbsent (
98- ctx .request ().connection (), k -> ConcurrentHashMap .newKeySet ());
99- activeFutures .add (executionFuture );
100- // Remove when complete
101- executionFuture .whenComplete ((r , t ) -> activeFutures .remove (executionFuture ));
102- }
103-
104- CompletableFuture .runAsync (
105- () -> {
106- executionFuture .setWorkerThread (Thread .currentThread ());
107- try {
108- executor .execute ();
109- executionFuture .complete (null );
110- } catch (IOException e ) {
111- logExecutionError (ctx , executor , e );
112- executionFuture .completeExceptionally (
113- new RuntimeException ("Error executing RPC method" , e ));
114- } catch (Exception e ) {
115- executionFuture .completeExceptionally (e );
116- } finally {
117- executionFuture .clearWorkerThread ();
118- }
119- })
120- .orTimeout (timeoutMillis , TimeUnit .MILLISECONDS )
121- .whenComplete (
122- (result , throwable ) -> {
123- if (throwable != null && !executionFuture .isDone ()) {
124- // For timeout exceptions, cancel with interruption to stop the worker thread
125- Throwable cause = unwrapException (throwable );
126- if (cause instanceof TimeoutException ) {
127- executionFuture .cancel (true );
128- } else {
129- executionFuture .completeExceptionally (throwable );
130- }
131- }
132- });
133-
134- return executionFuture ;
135- }
136-
137- private static Void handleCompletion (
138- final RoutingContext ctx , final Throwable throwable , final long timeoutMillis ) {
139-
140- if (throwable == null ) {
141- return null ; // Successful completion, response already sent
142- }
143-
144- Throwable actualCause = unwrapException (throwable );
145-
146- // Treat both TimeoutException and CancellationException as timeouts
147- // (CancellationException occurs when we cancel the future due to timeout)
148- if (actualCause instanceof TimeoutException || actualCause instanceof CancellationException ) {
149- handleTimeout (ctx , timeoutMillis );
150- } else {
151- handleExecutionError (ctx , actualCause );
152- }
153-
154- return null ;
155- }
156-
157- private static Throwable unwrapException (final Throwable throwable ) {
158- if (throwable instanceof CompletionException && throwable .getCause () != null ) {
159- return throwable .getCause ();
160- }
161- return throwable ;
162- }
163-
164- private static void handleTimeout (final RoutingContext ctx , final long timeoutMillis ) {
165-
166- final String requestBodyAsJson =
167- ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
168-
169- LOG .error (
170- "Timeout ({} ms) occurred in JSON-RPC executor for method {}" ,
171- timeoutMillis ,
172- getShortLogString (requestBodyAsJson ));
173- LOG .atTrace ()
174- .setMessage ("Timeout ({} ms) occurred in JSON-RPC executor for method {}" )
175- .addArgument (timeoutMillis )
176- .addArgument (requestBodyAsJson )
177- .log ();
178-
179- // Thread interruption is automatically handled by executeAsync's whenComplete handler
180- handleErrorAndEndResponse (ctx , null , RpcErrorType .TIMEOUT_ERROR );
181- }
182-
183- private static void handleExecutionError (final RoutingContext ctx , final Throwable actualCause ) {
184- Throwable cause =
185- actualCause instanceof ExecutionException ? actualCause .getCause () : actualCause ;
186-
187- final String requestBodyAsJson =
188- ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
189-
190- LOG .error (
191- "Exception during RPC execution for method {}" ,
192- getShortLogString (requestBodyAsJson ),
193- cause );
194-
195- handleErrorAndEndResponse (ctx , null , RpcErrorType .INTERNAL_ERROR );
196- }
197-
198- private static void logExecutionError (
199- final RoutingContext ctx , final AbstractJsonRpcExecutor executor , final IOException e ) {
200- final String method = executor .getRpcMethodName (ctx );
201- LOG .error ("{} - Error streaming JSON-RPC response" , method , e );
202-
203- final String requestBodyAsJson =
204- ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
205- LOG .atTrace ()
206- .setMessage ("{} - Error streaming JSON-RPC response" )
207- .addArgument (requestBodyAsJson )
208- .log ();
41+ final JsonRpcConfiguration jsonRpcConfiguration ) {
42+ return ctx -> {
43+ long timeoutMillis = jsonRpcConfiguration .getHttpTimeoutSec () * 1000 ;
44+ final long timerId =
45+ ctx .vertx ()
46+ .setTimer (
47+ timeoutMillis ,
48+ id -> {
49+ final String requestBodyAsJson =
50+ ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
51+ LOG .error (
52+ "Timeout ({} ms) occurred in JSON-RPC executor for method {}" ,
53+ timeoutMillis ,
54+ getShortLogString (requestBodyAsJson ));
55+ LOG .atTrace ()
56+ .setMessage ("Timeout ({} ms) occurred in JSON-RPC executor for method {}" )
57+ .addArgument (timeoutMillis )
58+ .addArgument (requestBodyAsJson )
59+ .log ();
60+ handleErrorAndEndResponse (ctx , null , RpcErrorType .TIMEOUT_ERROR );
61+ });
62+
63+ ctx .put ("timerId" , timerId );
64+
65+ try {
66+ createExecutor (jsonRpcExecutor , tracer , ctx , jsonRpcConfiguration )
67+ .ifPresentOrElse (
68+ executor -> {
69+ try {
70+ executor .execute ();
71+ } catch (IOException e ) {
72+ final String method = executor .getRpcMethodName (ctx );
73+ final String requestBodyAsJson =
74+ ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
75+ LOG .error ("{} - Error streaming JSON-RPC response" , method , e );
76+ LOG .atTrace ()
77+ .setMessage ("{} - Error streaming JSON-RPC response" )
78+ .addArgument (requestBodyAsJson )
79+ .log ();
80+ handleErrorAndEndResponse (ctx , null , RpcErrorType .INTERNAL_ERROR );
81+ } finally {
82+ cancelTimer (ctx );
83+ }
84+ },
85+ () -> {
86+ handleErrorAndEndResponse (ctx , null , RpcErrorType .PARSE_ERROR );
87+ cancelTimer (ctx );
88+ });
89+ } catch (final RuntimeException e ) {
90+ final String requestBodyAsJson =
91+ ctx .get (ContextKey .REQUEST_BODY_AS_JSON_OBJECT .name ()).toString ();
92+ LOG .error (
93+ "Unhandled exception in JSON-RPC executor for method {}" ,
94+ getShortLogString (requestBodyAsJson ),
95+ e );
96+ LOG .atTrace ()
97+ .setMessage ("Unhandled exception in JSON-RPC executor for method {}" )
98+ .addArgument (requestBodyAsJson )
99+ .log ();
100+ handleErrorAndEndResponse (ctx , null , RpcErrorType .INTERNAL_ERROR );
101+ cancelTimer (ctx );
102+ }
103+ };
209104 }
210105
211106 private static Object getShortLogString (final String requestBodyAsJson ) {
@@ -215,6 +110,13 @@ private static Object getShortLogString(final String requestBodyAsJson) {
215110 : requestBodyAsJson .substring (0 , maxLogLength ).concat ("..." );
216111 }
217112
113+ private static void cancelTimer (final RoutingContext ctx ) {
114+ Long timerId = ctx .get ("timerId" );
115+ if (timerId != null ) {
116+ ctx .vertx ().cancelTimer (timerId );
117+ }
118+ }
119+
218120 private static void handleErrorAndEndResponse (
219121 final RoutingContext ctx , final Object id , final RpcErrorType errorType ) {
220122 if (!ctx .response ().ended ()) {
0 commit comments