Skip to content

Commit f52854b

Browse files
authored
Fixes #12323 - AsyncMiddleManServlet response flushing. (#12542)
* Fixed `ProxyWriter` concurrency, now using IteratingCallback to avoid races. * Made `writeProxyResponseContent()` protected so it can be overridden to flush the response content. * Added test case. Signed-off-by: Simone Bordet <[email protected]>
1 parent b834b58 commit f52854b

File tree

4 files changed

+252
-120
lines changed

4 files changed

+252
-120
lines changed

jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java

Lines changed: 72 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.List;
24-
import java.util.Objects;
2524
import java.util.Queue;
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.zip.GZIPOutputStream;
@@ -53,6 +52,7 @@
5352
import org.eclipse.jetty.util.CountingCallback;
5453
import org.eclipse.jetty.util.IteratingCallback;
5554
import org.eclipse.jetty.util.component.Destroyable;
55+
import org.eclipse.jetty.util.thread.AutoLock;
5656
import org.slf4j.Logger;
5757
import org.slf4j.LoggerFactory;
5858

@@ -196,7 +196,7 @@ int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOE
196196
return input.read(buffer);
197197
}
198198

199-
void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
199+
protected void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
200200
{
201201
write(output, content);
202202
}
@@ -495,7 +495,7 @@ public void onContent(Response serverResponse, Content.Chunk chunk, Runnable dem
495495

496496
if (committed)
497497
{
498-
proxyWriter.onWritePossible();
498+
proxyWriter.iterate();
499499
}
500500
else
501501
{
@@ -554,7 +554,7 @@ public void onSuccess(final Response serverResponse)
554554
if (_log.isDebugEnabled())
555555
_log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
556556

557-
proxyWriter.onWritePossible();
557+
proxyWriter.iterate();
558558
}
559559
}
560560
else
@@ -592,12 +592,13 @@ public void failed(Throwable failure)
592592
}
593593
}
594594

595-
protected class ProxyWriter implements WriteListener
595+
protected class ProxyWriter extends IteratingCallback implements WriteListener
596596
{
597-
private final Queue<Chunk> chunks = new ArrayDeque<>();
597+
private final AutoLock lock = new AutoLock();
598+
private final Queue<BufferWithCallback> chunks = new ArrayDeque<>();
598599
private final HttpServletRequest clientRequest;
599600
private final Response serverResponse;
600-
private Chunk chunk;
601+
private BufferWithCallback chunk;
601602
private boolean writePending;
602603

603604
protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse)
@@ -610,75 +611,90 @@ public boolean offer(ByteBuffer content, Callback callback)
610611
{
611612
if (_log.isDebugEnabled())
612613
_log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
613-
return chunks.offer(new Chunk(content, callback));
614+
try (AutoLock ignored = lock.lock())
615+
{
616+
return chunks.offer(new BufferWithCallback(content, callback));
617+
}
614618
}
615619

616620
@Override
617-
public void onWritePossible() throws IOException
621+
protected Action process() throws Throwable
618622
{
619623
ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream();
620624

621-
// If we had a pending write, let's succeed it.
622-
if (writePending)
625+
BufferWithCallback chunk;
626+
try (AutoLock ignored = lock.lock())
623627
{
624-
if (_log.isDebugEnabled())
625-
_log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output);
626-
writePending = false;
627-
if (succeed(chunk.callback))
628-
return;
628+
chunk = this.chunk = chunks.poll();
629629
}
630+
if (chunk == null)
631+
return Action.IDLE;
630632

631-
int length = 0;
632-
Chunk chunk = null;
633-
while (output.isReady())
634-
{
635-
if (chunk != null)
636-
{
637-
if (_log.isDebugEnabled())
638-
_log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
639-
if (succeed(chunk.callback))
640-
return;
641-
}
642-
643-
this.chunk = chunk = chunks.poll();
644-
if (chunk == null)
645-
return;
633+
int length = chunk.buffer.remaining();
634+
if (_log.isDebugEnabled())
635+
_log.debug("{} async write of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, this);
636+
if (length > 0)
637+
writeProxyResponseContent(output, chunk.buffer);
646638

647-
length = chunk.buffer.remaining();
648-
if (length > 0)
649-
writeProxyResponseContent(output, chunk.buffer);
639+
boolean complete = output.isReady();
640+
try (AutoLock ignored = lock.lock())
641+
{
642+
writePending = !complete;
650643
}
644+
if (complete)
645+
succeeded();
651646

652-
if (_log.isDebugEnabled())
653-
_log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
654-
writePending = true;
647+
return Action.SCHEDULED;
655648
}
656649

657-
private boolean succeed(Callback callback)
650+
@Override
651+
protected void onSuccess()
658652
{
659-
// Succeeding the callback may cause to reenter in onWritePossible()
660-
// because typically the callback is the one that controls whether the
661-
// content received from the server has been consumed, so succeeding
662-
// the callback causes more content to be received from the server,
663-
// and hence more to be written to the client by onWritePossible().
664-
// A reentrant call to onWritePossible() performs another write,
665-
// which may remain pending, which means that the reentrant call
666-
// to onWritePossible() returns all the way back to just after the
667-
// succeed of the callback. There, we cannot just loop attempting
668-
// write, but we need to check whether we are write pending.
669-
callback.succeeded();
670-
return writePending;
653+
BufferWithCallback chunk;
654+
try (AutoLock ignored = lock.lock())
655+
{
656+
chunk = this.chunk;
657+
this.chunk = null;
658+
}
659+
if (_log.isDebugEnabled())
660+
_log.debug("{} async write complete of {} on {}", getRequestId(clientRequest), chunk, this);
661+
chunk.callback.succeeded();
671662
}
672663

673664
@Override
674-
public void onError(Throwable failure)
665+
protected void onCompleteFailure(Throwable failure)
675666
{
676-
Chunk chunk = this.chunk;
667+
BufferWithCallback chunk;
668+
try (AutoLock ignored = lock.lock())
669+
{
670+
chunk = this.chunk;
671+
this.chunk = null;
672+
}
677673
if (chunk != null)
678674
chunk.callback.failed(failure);
679675
else
680676
serverResponse.abort(failure);
681677
}
678+
679+
@Override
680+
public void onWritePossible()
681+
{
682+
boolean pending;
683+
try (AutoLock ignored = lock.lock())
684+
{
685+
pending = writePending;
686+
}
687+
if (pending)
688+
succeeded();
689+
else
690+
iterate();
691+
}
692+
693+
@Override
694+
public void onError(Throwable failure)
695+
{
696+
failed(failure);
697+
}
682698
}
683699

684700
/**
@@ -858,15 +874,12 @@ public void write(ByteBuffer buffer, Callback callback)
858874
}
859875
}
860876

861-
private static class Chunk
877+
private record BufferWithCallback(ByteBuffer buffer, Callback callback)
862878
{
863-
private final ByteBuffer buffer;
864-
private final Callback callback;
865-
866-
private Chunk(ByteBuffer buffer, Callback callback)
879+
@Override
880+
public String toString()
867881
{
868-
this.buffer = Objects.requireNonNull(buffer);
869-
this.callback = Objects.requireNonNull(callback);
882+
return "%s@%x[buffer=%s,callback=%s]".formatted(getClass().getSimpleName(), hashCode(), buffer, callback);
870883
}
871884
}
872885
}

jetty-ee10/jetty-ee10-proxy/src/test/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServletTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.io.InputStreamReader;
21+
import java.io.InterruptedIOException;
2122
import java.io.OutputStream;
2223
import java.lang.management.ManagementFactory;
2324
import java.lang.management.ThreadInfo;
@@ -1638,6 +1639,58 @@ public boolean transform(Source source, Sink sink) throws IOException
16381639
assertNull(zipIn.getNextEntry());
16391640
}
16401641

1642+
@Test
1643+
public void testProxyResponseContentFlush() throws Exception
1644+
{
1645+
CountDownLatch serverLatch = new CountDownLatch(1);
1646+
startServer(new HttpServlet()
1647+
{
1648+
@Override
1649+
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
1650+
{
1651+
try
1652+
{
1653+
ServletOutputStream output = response.getOutputStream();
1654+
output.write(new byte[16]);
1655+
output.flush();
1656+
serverLatch.await(5, TimeUnit.SECONDS);
1657+
output.write(new byte[32]);
1658+
}
1659+
catch (InterruptedException x)
1660+
{
1661+
throw new InterruptedIOException();
1662+
}
1663+
}
1664+
});
1665+
startProxy(new AsyncMiddleManServlet()
1666+
{
1667+
@Override
1668+
protected void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
1669+
{
1670+
super.writeProxyResponseContent(output, content);
1671+
// Force a flush for every write, to avoid
1672+
// buffering it in the ServletOutputStream.
1673+
if (output.isReady())
1674+
output.flush();
1675+
}
1676+
});
1677+
startClient();
1678+
1679+
CountDownLatch clientLatch = new CountDownLatch(1);
1680+
client.newRequest("localhost", serverConnector.getLocalPort())
1681+
// Tell the server to continue to send data only if we receive the first small chunk.
1682+
.onResponseContent((response, content) -> serverLatch.countDown())
1683+
.timeout(10, TimeUnit.SECONDS)
1684+
.send(result ->
1685+
{
1686+
assertTrue(result.isSucceeded());
1687+
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
1688+
clientLatch.countDown();
1689+
});
1690+
1691+
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
1692+
}
1693+
16411694
private void sleep(long delay)
16421695
{
16431696
try

0 commit comments

Comments
 (0)