1717import java .nio .ByteBuffer ;
1818import java .util .Objects ;
1919import java .util .concurrent .TimeoutException ;
20+ import java .util .concurrent .atomic .AtomicBoolean ;
2021import java .util .function .BiConsumer ;
2122import java .util .function .Supplier ;
2223
@@ -63,16 +64,17 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
6364{
6465 private static final Logger LOG = LoggerFactory .getLogger (HttpStreamOverHTTP2 .class );
6566
66- private final AutoLock lock = new AutoLock ();
67+ private final AutoLock _lock = new AutoLock ();
6768 private final HTTP2ServerConnection _connection ;
6869 private final HttpChannel _httpChannel ;
70+ private final AtomicBoolean _recycle = new AtomicBoolean (); // Set to true when _httpChannel has been recycled or cannot be recycled anymore.
6971 private final HTTP2Stream _stream ;
7072 private MetaData .Request _requestMetaData ;
7173 private MetaData .Response _responseMetaData ;
72- private TunnelSupport tunnelSupport ;
74+ private TunnelSupport _tunnelSupport ;
7375 private Content .Chunk _chunk ;
7476 private Content .Chunk _trailer ;
75- private boolean committed ;
77+ private boolean _committed ;
7678 private boolean _demand ;
7779
7880 public HttpStreamOverHTTP2 (HTTP2ServerConnection connection , HttpChannel httpChannel , HTTP2Stream stream )
@@ -105,7 +107,7 @@ public Runnable onRequest(HeadersFrame frame)
105107
106108 if (frame .isEndStream ())
107109 {
108- try (AutoLock ignored = lock .lock ())
110+ try (AutoLock ignored = _lock .lock ())
109111 {
110112 _chunk = Content .Chunk .EOF ;
111113 }
@@ -114,7 +116,7 @@ public Runnable onRequest(HeadersFrame frame)
114116 HttpFields fields = _requestMetaData .getHttpFields ();
115117
116118 if (_requestMetaData instanceof MetaData .ConnectRequest )
117- tunnelSupport = new TunnelSupportOverHTTP2 (_requestMetaData .getProtocol ());
119+ _tunnelSupport = new TunnelSupportOverHTTP2 (_requestMetaData .getProtocol ());
118120
119121 if (LOG .isDebugEnabled ())
120122 {
@@ -161,20 +163,20 @@ private Runnable onBadMessage(HttpException x)
161163 LOG .debug ("badMessage {} {}" , this , x );
162164
163165 Throwable failure = (Throwable )x ;
164- return _httpChannel . onFailure (failure );
166+ return onFailure (failure , Callback . NOOP );
165167 }
166168
167169 @ Override
168170 public Content .Chunk read ()
169171 {
170172 // Tunnel requests do not have HTTP content, avoid
171173 // returning chunks meant for a different protocol.
172- if (tunnelSupport != null )
174+ if (_tunnelSupport != null )
173175 return null ;
174176
175177 // Check if there already is a chunk, e.g. EOF.
176178 Content .Chunk chunk ;
177- try (AutoLock ignored = lock .lock ())
179+ try (AutoLock ignored = _lock .lock ())
178180 {
179181 chunk = _chunk ;
180182 _chunk = Content .Chunk .next (chunk );
@@ -190,7 +192,7 @@ public Content.Chunk read()
190192 if (data .frame ().isEndStream ())
191193 {
192194 Content .Chunk trailer ;
193- try (AutoLock ignored = lock .lock ())
195+ try (AutoLock ignored = _lock .lock ())
194196 {
195197 trailer = _trailer ;
196198 if (trailer != null )
@@ -207,7 +209,7 @@ public Content.Chunk read()
207209 // the two actions cancel each other, no need to further retain or release.
208210 chunk = createChunk (data );
209211
210- try (AutoLock ignored = lock .lock ())
212+ try (AutoLock ignored = _lock .lock ())
211213 {
212214 _chunk = Content .Chunk .next (chunk );
213215 }
@@ -219,7 +221,7 @@ public void demand()
219221 {
220222 boolean notify = false ;
221223 boolean demand = false ;
222- try (AutoLock ignored = lock .lock ())
224+ try (AutoLock ignored = _lock .lock ())
223225 {
224226 if (_chunk != null || _trailer != null )
225227 notify = true ;
@@ -245,7 +247,7 @@ else if (demand)
245247 @ Override
246248 public Runnable onDataAvailable ()
247249 {
248- try (AutoLock ignored = lock .lock ())
250+ try (AutoLock ignored = _lock .lock ())
249251 {
250252 _demand = false ;
251253 }
@@ -264,7 +266,7 @@ public Runnable onDataAvailable()
264266 public Runnable onTrailer (HeadersFrame frame )
265267 {
266268 HttpFields trailers = frame .getMetaData ().getHttpFields ().asImmutable ();
267- try (AutoLock ignored = lock .lock ())
269+ try (AutoLock ignored = _lock .lock ())
268270 {
269271 _trailer = new Trailers (trailers );
270272 }
@@ -331,7 +333,7 @@ private void sendHeaders(MetaData.Request request, MetaData.Response response, B
331333 }
332334 else
333335 {
334- committed = true ;
336+ _committed = true ;
335337 if (last )
336338 {
337339 long realContentLength = BufferUtil .length (content );
@@ -580,7 +582,7 @@ private void sendTrailersFrame(MetaData metaData, Callback callback)
580582 @ Override
581583 public boolean isCommitted ()
582584 {
583- return committed ;
585+ return _committed ;
584586 }
585587
586588 @ Override
@@ -593,13 +595,13 @@ public boolean isIdle()
593595 @ Override
594596 public TunnelSupport getTunnelSupport ()
595597 {
596- return tunnelSupport ;
598+ return _tunnelSupport ;
597599 }
598600
599601 @ Override
600602 public Throwable consumeAvailable ()
601603 {
602- if (tunnelSupport != null )
604+ if (_tunnelSupport != null )
603605 return null ;
604606 Throwable result = HttpStream .consumeAvailable (this , _httpChannel .getConnectionMetaData ().getHttpConfiguration ());
605607 if (result != null )
@@ -615,13 +617,22 @@ public Throwable consumeAvailable()
615617 @ Override
616618 public void onTimeout (TimeoutException timeout , BiConsumer <Runnable , Boolean > consumer )
617619 {
620+ boolean wasRecycled = !_recycle .compareAndSet (false , true );
621+ if (wasRecycled )
622+ {
623+ consumer .accept (null , true );
624+ return ;
625+ }
618626 HttpChannel .IdleTimeoutTask task = _httpChannel .onIdleTimeout (timeout );
619627 consumer .accept (task .action (), !task .handlingRequest ());
620628 }
621629
622630 @ Override
623631 public Runnable onFailure (Throwable failure , Callback callback )
624632 {
633+ boolean wasRecycled = !_recycle .compareAndSet (false , true );
634+ if (wasRecycled )
635+ return new FailureTask (null , callback );
625636 boolean remote = failure instanceof EOFException ;
626637 Runnable task = remote ? _httpChannel .onRemoteFailure (new EofException (failure )) : _httpChannel .onFailure (failure );
627638 return new FailureTask (task , callback );
@@ -643,7 +654,7 @@ public void succeeded()
643654 }
644655 else
645656 {
646- EndPoint endPoint = tunnelSupport .getEndPoint ();
657+ EndPoint endPoint = _tunnelSupport .getEndPoint ();
647658 _stream .setAttachment (endPoint );
648659 endPoint .upgrade (connection );
649660 }
@@ -673,8 +684,13 @@ public void succeeded()
673684 }
674685 }
675686 }
676- _httpChannel .recycle ();
677- _connection .offerHttpChannel (_httpChannel );
687+
688+ boolean canRecycle = _recycle .compareAndSet (false , true );
689+ if (canRecycle )
690+ {
691+ _httpChannel .recycle ();
692+ _connection .offerHttpChannel (_httpChannel );
693+ }
678694 }
679695
680696 @ Override
@@ -700,6 +716,7 @@ public void failed(Throwable x)
700716 LOG .atDebug ().setCause (x ).log ("HTTP2 response #{}/{}: failed {}" , _stream .getId (), Integer .toHexString (_stream .getSession ().hashCode ()), errorCode );
701717 _stream .reset (new ResetFrame (_stream .getId (), errorCode .code ), Callback .NOOP );
702718 }
719+ _recycle .set (true );
703720 }
704721
705722 private class SendTrailers extends Callback .Nested
0 commit comments