1717import java .nio .ByteBuffer ;
1818import java .util .Objects ;
1919import java .util .concurrent .TimeoutException ;
20- import java .util .concurrent .atomic .AtomicBoolean ;
20+ import java .util .concurrent .atomic .AtomicReference ;
2121import java .util .function .BiConsumer ;
2222import java .util .function .Supplier ;
2323
@@ -67,7 +67,7 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
6767 private final AutoLock lock = new AutoLock ();
6868 private final HTTP2ServerConnection _connection ;
6969 private final HttpChannel _httpChannel ;
70- private final AtomicBoolean _channelInUse = new AtomicBoolean ( );
70+ private final AtomicReference < RecycleState > _recycle = new AtomicReference <>( RecycleState . CAN_RECYCLE );
7171 private final HTTP2Stream _stream ;
7272 private MetaData .Request _requestMetaData ;
7373 private MetaData .Response _responseMetaData ;
@@ -77,6 +77,48 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
7777 private boolean committed ;
7878 private boolean _demand ;
7979
80+ /**
81+ * This state machine tracks if the {@link #_httpChannel} can be recycled or not.
82+ * The on*() methods race against succeeded() and they need to figure out if they
83+ * can use the _httpChannel or if it has been recycled, while succeeded() needs to
84+ * know if it can safely recycle the _httpChannel because no on*() method was
85+ * called, or if it cannot.
86+ */
87+ private enum RecycleState
88+ {
89+ /**
90+ * start state
91+ */
92+ CAN_RECYCLE ,
93+
94+ /**
95+ * terminal state
96+ */
97+ RECYCLED ,
98+
99+ /**
100+ * terminal state
101+ */
102+ CANNOT_RECYCLE ;
103+
104+ /**
105+ * Change the current state if it isn't a terminal state.
106+ * @param reference the {@code AtomicReference} containing the state.
107+ * @param newStateIfCanRecycle the state to change to if the current state is {@link #CAN_RECYCLE}.
108+ * @return the previous state.
109+ */
110+ private static RecycleState changeState (AtomicReference <RecycleState > reference , RecycleState newStateIfCanRecycle )
111+ {
112+ assert newStateIfCanRecycle != CAN_RECYCLE ;
113+ return reference .getAndUpdate (currentState -> switch (currentState )
114+ {
115+ case CAN_RECYCLE -> newStateIfCanRecycle ;
116+ case RECYCLED -> RECYCLED ;
117+ case CANNOT_RECYCLE -> CANNOT_RECYCLE ;
118+ });
119+ }
120+ }
121+
80122 public HttpStreamOverHTTP2 (HTTP2ServerConnection connection , HttpChannel httpChannel , HTTP2Stream stream )
81123 {
82124 _connection = connection ;
@@ -162,6 +204,9 @@ private Runnable onBadMessage(HttpException x)
162204 if (LOG .isDebugEnabled ())
163205 LOG .debug ("badMessage {} {}" , this , x );
164206
207+ RecycleState previousState = RecycleState .changeState (_recycle , RecycleState .CANNOT_RECYCLE );
208+ if (previousState == RecycleState .RECYCLED )
209+ return null ;
165210 Throwable failure = (Throwable )x ;
166211 return _httpChannel .onFailure (failure );
167212 }
@@ -617,25 +662,21 @@ public Throwable consumeAvailable()
617662 @ Override
618663 public void onTimeout (TimeoutException timeout , BiConsumer <Runnable , Boolean > consumer )
619664 {
665+ RecycleState previousState = RecycleState .changeState (_recycle , RecycleState .CANNOT_RECYCLE );
666+ if (previousState == RecycleState .RECYCLED )
667+ return ;
620668 HttpChannel .IdleTimeoutTask task = _httpChannel .onIdleTimeout (timeout );
621669 consumer .accept (task .action (), !task .handlingRequest ());
622670 }
623671
624672 @ Override
625673 public Runnable onFailure (Throwable failure , Callback callback )
626674 {
627- Runnable task ;
628- if (_channelInUse .compareAndSet (false , true ))
629- {
630- // This branch prevents the recycling of channels going through on[Remote]Failure().
631- boolean remote = failure instanceof EOFException ;
632- task = remote ? _httpChannel .onRemoteFailure (new EofException (failure )) : _httpChannel .onFailure (failure );
633- }
634- else
635- {
636- // This branch prevents going through on[Remote]Failure() for recycled channels.
637- task = null ;
638- }
675+ RecycleState previousState = RecycleState .changeState (_recycle , RecycleState .CANNOT_RECYCLE );
676+ if (previousState == RecycleState .RECYCLED )
677+ return new FailureTask (null , callback );
678+ boolean remote = failure instanceof EOFException ;
679+ Runnable task = remote ? _httpChannel .onRemoteFailure (new EofException (failure )) : _httpChannel .onFailure (failure );
639680 return new FailureTask (task , callback );
640681 }
641682
@@ -686,8 +727,8 @@ public void succeeded()
686727 }
687728 }
688729
689- // Do not recycle when we are racing against the execution of onFailure()'s Runnable.
690- if (_channelInUse . compareAndSet ( false , true ) )
730+ RecycleState previousState = RecycleState . changeState ( _recycle , RecycleState . RECYCLED );
731+ if (previousState == RecycleState . CAN_RECYCLE )
691732 {
692733 if (_connection .isRecycleHttpChannels ())
693734 {
@@ -720,6 +761,8 @@ public void failed(Throwable x)
720761 LOG .atDebug ().setCause (x ).log ("HTTP2 response #{}/{}: failed {}" , _stream .getId (), Integer .toHexString (_stream .getSession ().hashCode ()), errorCode );
721762 _stream .reset (new ResetFrame (_stream .getId (), errorCode .code ), Callback .NOOP );
722763 }
764+ // Change the state just to make the content of _recycle less surprising when debugging.
765+ _recycle .set (RecycleState .CANNOT_RECYCLE );
723766 }
724767
725768 private class SendTrailers extends Callback .Nested
0 commit comments