Skip to content

Commit 7e4a9be

Browse files
gregwlorbansbordet
authored
Jetty 12.1.x simplified epc no pending (#13372)
Simplified EPC ExecutionStrategy Only have a single attempt at EPC, never loop. Set IDLE before the attempt so a reserved thread can always become the producer added dispatch=true to h2/h3 demand mechanisms to avoid recursion in offerTask --------- Signed-off-by: Ludovic Orban <[email protected]> Signed-off-by: Simone Bordet <[email protected]> Co-authored-by: Ludovic Orban <[email protected]> Co-authored-by: Simone Bordet <[email protected]>
1 parent d39bcd5 commit 7e4a9be

File tree

29 files changed

+560
-418
lines changed

29 files changed

+560
-418
lines changed

jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,11 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
211211
}
212212

213213
@Override
214-
public void onDataAvailable(Stream stream)
214+
public void onDataAvailable(Stream stream, boolean dispatch)
215215
{
216216
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
217217
Runnable task = channel.onDataAvailable();
218-
connection.offerTask(invoker.offer(task), false);
218+
connection.offerTask(invoker.offer(task), dispatch);
219219
}
220220

221221
@Override

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ public boolean onIdleExpired(TimeoutException timeoutException)
207207
return false;
208208
}
209209

210+
/**
211+
* @param task The task to offer to the connection.
212+
* @param dispatch {@code true} to dispatch the task, {@code false} to produce in the calling thread.
213+
* Callers from application threads should use {@code true}, otherwise they may be arbitrarily
214+
* delayed. Callers from I/O threads should use {@code false} to avoid thread hops.
215+
*/
210216
public void offerTask(Runnable task, boolean dispatch)
211217
{
212218
offerTask(task);

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ private void onHeaders(HeadersFrame frame, Callback callback)
404404
{
405405
// Offer EOF in case the application calls readData() or demand().
406406
if (offer(Data.eof(getId())))
407-
processData();
407+
processData(true);
408408
if (closed)
409409
getSession().removeStream(this);
410410
}, callback));
@@ -439,7 +439,7 @@ private void onHeaders(HeadersFrame frame, Callback callback)
439439
notifyHeaders(frame, Callback.from(() ->
440440
{
441441
if (eof)
442-
processData();
442+
processData(true);
443443
if (closed)
444444
getSession().removeStream(this);
445445
}, callback));
@@ -475,7 +475,11 @@ private void onData(Data data)
475475
}
476476

477477
if (offer(data))
478-
processData();
478+
{
479+
// Data was not immediately available, it has just
480+
// now been notified to this method from the network.
481+
processData(false);
482+
}
479483
}
480484

481485
private boolean offer(Data data)
@@ -550,10 +554,13 @@ public void demand()
550554
if (LOG.isDebugEnabled())
551555
LOG.debug("Demand, {} data processing for {}", process ? "proceeding" : "stalling", this);
552556
if (process)
553-
processData();
557+
{
558+
// Data is immediately available.
559+
processData(true);
560+
}
554561
}
555562

556-
public void processData()
563+
public void processData(boolean immediate)
557564
{
558565
while (true)
559566
{
@@ -569,7 +576,7 @@ public void processData()
569576
dataDemand = false;
570577
dataStalled = false;
571578
}
572-
notifyDataAvailable();
579+
notifyDataAvailable(immediate);
573580
}
574581
}
575582

@@ -886,12 +893,12 @@ private void notifyHeaders(HeadersFrame frame, Callback callback)
886893
}
887894
}
888895

889-
private void notifyDataAvailable()
896+
private void notifyDataAvailable(boolean immediate)
890897
{
891898
Listener listener = Objects.requireNonNullElse(getListener(), Listener.AUTO_DISCARD);
892899
try
893900
{
894-
listener.onDataAvailable(this);
901+
listener.onDataAvailable(this, immediate);
895902
}
896903
catch (Throwable x)
897904
{

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,13 @@ public default Map<Integer, Integer> onPreface(Session session)
228228
* <p>Applications can detect whether request DATA frames will be arriving
229229
* by testing {@link HeadersFrame#isEndStream()}. If the application is
230230
* interested in processing the DATA frames, it must demand for DATA
231-
* frames using {@link Stream#demand()} and then return either a
232-
* {@link Stream.Listener} implementation that overrides
233-
* {@link Stream.Listener#onDataAvailable(Stream)} where applications can
234-
* read from the {@link Stream} via {@link Stream#readData()}, or
235-
* {@link Stream.Listener#AUTO_DISCARD} that automatically reads and
236-
* discards DATA frames.
231+
* frames using {@link Stream#demand()} and then return a
232+
* {@link Stream.Listener} implementation that overrides either
233+
* {@link Stream.Listener#onDataAvailable(Stream)} or
234+
* {@link Stream.Listener#onDataAvailable(Stream, boolean)},
235+
* where applications can read from the {@link Stream} via
236+
* {@link Stream#readData()}, or return {@link Stream.Listener#AUTO_DISCARD}
237+
* that automatically reads and discards DATA frames.
237238
* Returning {@code null} is possible but discouraged, and has the
238239
* same effect of demanding and discarding the DATA frames.</p>
239240
*

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public default CompletableFuture<Stream> push(PushPromiseFrame frame, Listener l
129129
* @return a {@link Stream.Data} object containing the DATA frame,
130130
* or null if no DATA frame is available
131131
* @see #demand()
132-
* @see Listener#onDataAvailable(Stream)
132+
* @see Listener#onDataAvailable(Stream, boolean)
133133
*/
134134
public Data readData();
135135

@@ -230,22 +230,22 @@ public default CompletableFuture<Void> reset(ResetFrame frame)
230230

231231
/**
232232
* <p>Demands more {@code DATA} frames for this stream.</p>
233-
* <p>Calling this method causes {@link Listener#onDataAvailable(Stream)}
233+
* <p>Calling this method causes {@link Listener#onDataAvailable(Stream, boolean)}
234234
* to be invoked, possibly at a later time, when the stream has data
235235
* to be read, but also when the stream has reached EOF.</p>
236236
* <p>This method is idempotent: calling it when there already is an
237-
* outstanding demand to invoke {@link Listener#onDataAvailable(Stream)}
237+
* outstanding demand to invoke {@link Listener#onDataAvailable(Stream, boolean)}
238238
* is a no-operation.</p>
239239
* <p>The thread invoking this method may invoke directly
240-
* {@link Listener#onDataAvailable(Stream)}, unless another thread
241-
* that must invoke {@link Listener#onDataAvailable(Stream)}
240+
* {@link Listener#onDataAvailable(Stream, boolean)}, unless another thread
241+
* that must invoke {@link Listener#onDataAvailable(Stream, boolean)}
242242
* notices the outstanding demand first.</p>
243243
* <p>It is always guaranteed that invoking this method from within
244-
* {@code onDataAvailable(Stream)} will not cause a
244+
* {@code onDataAvailable(Stream, boolean)} will not cause a
245245
* {@link StackOverflowError}.</p>
246246
*
247247
* @see #readData()
248-
* @see Listener#onDataAvailable(Stream)
248+
* @see Listener#onDataAvailable(Stream, boolean)
249249
*/
250250
public void demand();
251251

@@ -314,12 +314,12 @@ public default void onHeaders(Stream stream, HeadersFrame frame, Callback callba
314314
* <p>Callback method invoked when a PUSH_PROMISE frame has been received.</p>
315315
* <p>Applications that override this method are typically interested in
316316
* processing the pushed stream DATA frames, and must demand for pushed
317-
* DATA frames via {@link Stream#demand()} and then return either a
318-
* {@link Listener} implementation that overrides
319-
* {@link #onDataAvailable(Stream)} where applications can
320-
* read from the {@link Stream} via {@link Stream#readData()}, or
321-
* {@link #AUTO_DISCARD} that automatically reads and
322-
* discards DATA frames.
317+
* DATA frames via {@link Stream#demand()} and then return a
318+
* {@link Listener} implementation that overrides either
319+
* {@link #onDataAvailable(Stream)} or {@link #onDataAvailable(Stream, boolean)},
320+
* where applications can read from the {@link Stream} via
321+
* {@link Stream#readData()}, or return {@link #AUTO_DISCARD} that automatically
322+
* reads and discards DATA frames.
323323
* Returning {@code null} is possible but discouraged, and has the
324324
* same effect of demanding and discarding the pushed DATA frames.</p>
325325
*
@@ -333,10 +333,33 @@ public default Listener onPush(Stream stream, PushPromiseFrame frame)
333333
return AUTO_DISCARD;
334334
}
335335

336+
/**
337+
* <p>A simplified version of {@link #onDataAvailable(Stream, boolean)}.</p>
338+
* <p>The default implementation of this method reads and discards data.</p>
339+
*
340+
* @param stream the stream
341+
* @see Stream#demand()
342+
*/
343+
default void onDataAvailable(Stream stream)
344+
{
345+
while (true)
346+
{
347+
Data data = stream.readData();
348+
if (data == null)
349+
{
350+
stream.demand();
351+
return;
352+
}
353+
data.release();
354+
if (data.frame().isEndStream())
355+
return;
356+
}
357+
}
358+
336359
/**
337360
* <p>Callback method invoked if the application has expressed
338361
* {@link Stream#demand() demand} for DATA frames, and if there
339-
* may be content available.</p>
362+
* is content available.</p>
340363
* <p>Applications that wish to handle DATA frames should call
341364
* {@link Stream#demand()} for this method to be invoked when
342365
* the data is available.</p>
@@ -358,7 +381,7 @@ public default Listener onPush(Stream stream, PushPromiseFrame frame)
358381
* class MyStreamListener implements Stream.Listener
359382
* {
360383
* @Override
361-
* public void onDataAvailable(Stream stream)
384+
* public void onDataAvailable(Stream stream, boolean immediate)
362385
* {
363386
* // Read a chunk of the content.
364387
* Stream.Data data = stream.readData();
@@ -382,24 +405,20 @@ public default Listener onPush(Stream stream, PushPromiseFrame frame)
382405
* }
383406
* }
384407
* }</pre>
408+
* <p>The default implementation of this method calls
409+
* {@link #onDataAvailable(Stream)}.</p>
385410
*
386411
* @param stream the stream
412+
* @param immediate {@code true} when data is immediately available at the time
413+
* {@link #demand()} is invoked (this method is directly invoked from {@link #demand()};
414+
* {@code false} when data was not immediately available at the time {@link #demand()}
415+
* was called, but is now available (this method is invoked from the network layer,
416+
* not directly from {@link #demand()}
387417
* @see Stream#demand()
388418
*/
389-
public default void onDataAvailable(Stream stream)
419+
default void onDataAvailable(Stream stream, boolean immediate)
390420
{
391-
while (true)
392-
{
393-
Data data = stream.readData();
394-
if (data == null)
395-
{
396-
stream.demand();
397-
return;
398-
}
399-
data.release();
400-
if (data.frame().isEndStream())
401-
return;
402-
}
421+
onDataAvailable(stream);
403422
}
404423

405424
/**
@@ -481,7 +500,7 @@ public String toString()
481500

482501
private static class EOF extends Data
483502
{
484-
public EOF(int streamId)
503+
private EOF(int streamId)
485504
{
486505
super(new DataFrame(streamId, BufferUtil.EMPTY_BUFFER, true));
487506
}

jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
155155
}
156156

157157
@Override
158-
public void onDataAvailable(Stream stream)
158+
public void onDataAvailable(Stream stream, boolean immediate)
159159
{
160-
getConnection().onDataAvailable(stream);
160+
getConnection().onDataAvailable(stream, immediate);
161161
}
162162

163163
@Override

jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void onNewStream(HTTP2Stream stream, HeadersFrame frame)
149149
offerTask(task, false);
150150
}
151151

152-
public void onDataAvailable(Stream stream)
152+
public void onDataAvailable(Stream stream, boolean dispatch)
153153
{
154154
if (LOG.isDebugEnabled())
155155
LOG.debug("Processing data available on {}", stream);
@@ -159,7 +159,7 @@ public void onDataAvailable(Stream stream)
159159
{
160160
Runnable task = channel.onDataAvailable();
161161
if (task != null)
162-
offerTask(task, false);
162+
offerTask(task, dispatch);
163163
}
164164
}
165165

jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,11 @@ else if (!_demand)
222222
{
223223
Runnable task = _httpChannel.onContentAvailable();
224224
if (task != null)
225-
_connection.offerTask(task, false);
225+
{
226+
// We must dispatch, so an application thread does not
227+
// become a producer and then consume another request.
228+
_connection.offerTask(task, true);
229+
}
226230
}
227231
else if (demand)
228232
{

jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpChannelOverHTTP3.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,21 @@ public void onNewStream(Stream.Client stream)
142142
public void onResponse(Stream.Client stream, HeadersFrame frame)
143143
{
144144
Runnable task = receiver.onResponse(stream, frame);
145-
getHttpConnection().offerTask(invoker.offer(task));
145+
getHttpConnection().offerTask(invoker.offer(task), false);
146146
}
147147

148148
@Override
149-
public void onDataAvailable(Stream.Client stream)
149+
public void onDataAvailable(Stream.Client stream, boolean immediate)
150150
{
151151
Runnable task = receiver.onDataAvailable();
152-
getHttpConnection().offerTask(invoker.offer(task));
152+
getHttpConnection().offerTask(invoker.offer(task), immediate);
153153
}
154154

155155
@Override
156156
public void onTrailer(Stream.Client stream, HeadersFrame frame)
157157
{
158158
Runnable task = receiver.onTrailer(frame);
159-
getHttpConnection().offerTask(invoker.offer(task));
159+
getHttpConnection().offerTask(invoker.offer(task), false);
160160
}
161161

162162
@Override

jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,10 @@ public boolean onIdleTimeout(long idleTimeout, Throwable failure)
184184
return false;
185185
}
186186

187-
void offerTask(Runnable task)
187+
void offerTask(Runnable task, boolean dispatch)
188188
{
189189
if (task != null)
190-
getSession().getProtocolSession().offerTask(task);
190+
getSession().getProtocolSession().offerTask(task, dispatch);
191191
}
192192

193193
@Override

0 commit comments

Comments
 (0)