Skip to content

Commit e803b94

Browse files
committed
feat: 安全响应流处理接口
1 parent b05d32a commit e803b94

File tree

9 files changed

+191
-13
lines changed

9 files changed

+191
-13
lines changed

forest-core/src/main/java/com/dtflys/forest/backend/httpclient/response/HttpclientEntity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public Header getContentEncoding() {
6161
public InputStream getContent() throws IOException, UnsupportedOperationException {
6262
if (isStreaming()) {
6363
final InputStream in = entity.getContent();
64+
if (request.isSSE()) {
65+
return in;
66+
}
6467
final ByteArrayOutputStream out = new ByteArrayOutputStream();
6568
if (contentLength < 0) {
6669
contentLength = getContentLength();

forest-core/src/main/java/com/dtflys/forest/backend/httpclient/response/HttpclientForestResponse.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ private void setupResponseCharset() {
8888
private void setupContent() {
8989
if (content == null) {
9090
final Class<?> resultClass = ReflectUtils.toClass(request.getLifeCycleHandler().getResultType());
91-
if (request.isDownloadFile()
92-
|| InputStream.class.isAssignableFrom(request.getMethod().getReturnClass())
93-
|| (resultClass != null && InputStream.class.isAssignableFrom(resultClass))
91+
if (request.isReceiveStream()
9492
|| (contentType != null && contentType.canReadAsBinaryStream())) {
9593
final StringBuilder builder = new StringBuilder();
9694
builder.append("[stream content-type: ")
@@ -173,6 +171,7 @@ public byte[] getByteArray() throws IOException {
173171
return null;
174172
} else {
175173
try {
174+
bytesRead = true;
176175
bytes = EntityUtils.toByteArray(entity);
177176
} finally {
178177
close();

forest-core/src/main/java/com/dtflys/forest/backend/okhttp3/response/OkHttp3ForestResponse.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,7 @@ public OkHttp3ForestResponse(ForestRequest request, Response okResponse, Date re
6565
* @date 2021/12/8 23:51
6666
**/
6767
private void setupContent() {
68-
if (request.isDownloadFile()
69-
|| InputStream.class.isAssignableFrom(request.getMethod().getReturnClass())
70-
|| InputStream.class.isAssignableFrom(ReflectUtils.toClass(request.getLifeCycleHandler().getResultType()))
68+
if (request.isReceiveStream()
7169
|| (contentType != null && contentType.canReadAsBinaryStream())) {
7270
final StringBuilder builder = new StringBuilder();
7371
builder.append("[stream content-type: ")
@@ -198,6 +196,7 @@ public byte[] getByteArray() throws Exception {
198196
return null;
199197
} else {
200198
try {
199+
bytesRead = true;
201200
bytes = body.bytes();
202201
} finally {
203202
close();
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.dtflys.forest.callback;
2+
3+
import com.dtflys.forest.http.ForestRequest;
4+
import com.dtflys.forest.http.ForestResponse;
5+
6+
import java.io.InputStream;
7+
8+
/**
9+
* 回调函数: 处理数据流时调用
10+
*
11+
* @since 1.6.0
12+
*/
13+
@FunctionalInterface
14+
public interface OnStream<R> {
15+
16+
/**
17+
* 回调函数: 处理数据流时调用
18+
* <p>在该函数被回调时,HTTP 响应种的数据流已被自动打开,调用结束后也会自动关闭流</p>
19+
*
20+
* @param in 数据流: {@link InputStream}对象
21+
* @param req Forest 请求对象
22+
* @param res Forest 响应对象
23+
* @since 1.6.0
24+
*/
25+
R onStream(InputStream in, ForestRequest req, ForestResponse res);
26+
}

forest-core/src/main/java/com/dtflys/forest/http/ForestRequest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
package com.dtflys.forest.http;
2626

27+
import cn.hutool.core.lang.func.Consumer3;
2728
import com.dtflys.forest.auth.BasicAuth;
2829
import com.dtflys.forest.auth.ForestAuthenticator;
2930
import com.dtflys.forest.backend.ContentType;
@@ -37,6 +38,7 @@
3738
import com.dtflys.forest.callback.OnResponse;
3839
import com.dtflys.forest.callback.OnRetry;
3940
import com.dtflys.forest.callback.OnSaveCookie;
41+
import com.dtflys.forest.callback.OnStream;
4042
import com.dtflys.forest.callback.OnSuccess;
4143
import com.dtflys.forest.callback.RetryWhen;
4244
import com.dtflys.forest.callback.SuccessWhen;
@@ -78,6 +80,7 @@
7880
import com.dtflys.forest.ssl.SSLUtils;
7981
import com.dtflys.forest.ssl.TrustAllHostnameVerifier;
8082
import com.dtflys.forest.utils.ForestDataType;
83+
import com.dtflys.forest.utils.ReflectUtils;
8184
import com.dtflys.forest.utils.RequestNameValue;
8285
import com.dtflys.forest.utils.StringUtils;
8386
import com.dtflys.forest.utils.TimeUtils;
@@ -91,6 +94,7 @@
9194
import java.io.InputStream;
9295
import java.lang.reflect.Constructor;
9396
import java.lang.reflect.InvocationTargetException;
97+
import java.lang.reflect.ParameterizedType;
9498
import java.lang.reflect.Type;
9599
import java.net.URI;
96100
import java.nio.charset.Charset;
@@ -111,6 +115,7 @@
111115
import java.util.concurrent.ConcurrentHashMap;
112116
import java.util.concurrent.Future;
113117
import java.util.concurrent.TimeUnit;
118+
import java.util.function.BiConsumer;
114119
import java.util.function.Consumer;
115120
import java.util.function.Function;
116121
import java.util.stream.Collectors;
@@ -4081,6 +4086,29 @@ public ForestRequest<T> retryWhen(Class<? extends RetryWhen> conditionClass) {
40814086
public boolean isDownloadFile() {
40824087
return isDownloadFile;
40834088
}
4089+
4090+
public boolean isReceiveStream() {
4091+
if (isDownloadFile()) {
4092+
return true;
4093+
}
4094+
if (InputStream.class.isAssignableFrom(getMethod().getReturnClass())) {
4095+
return true;
4096+
}
4097+
Type resultType = getLifeCycleHandler().getResultType();
4098+
if (InputStream.class.isAssignableFrom(ReflectUtils.toClass(resultType))) {
4099+
return true;
4100+
}
4101+
if (ForestResponse.class.isAssignableFrom(ReflectUtils.toClass(resultType))) {
4102+
ParameterizedType parameterizedType = ReflectUtils.toParameterizedType(resultType);
4103+
if (parameterizedType != null) {
4104+
Type argType = parameterizedType.getActualTypeArguments()[0];
4105+
if (InputStream.class.isAssignableFrom(ReflectUtils.toClass(argType))) {
4106+
return true;
4107+
}
4108+
}
4109+
}
4110+
return false;
4111+
}
40844112

40854113
/**
40864114
* 设置该请求是否下载文件
@@ -5266,6 +5294,36 @@ public ForestResponse executeAsResponse() {
52665294
return execute(ForestResponse.class);
52675295
}
52685296

5297+
/**
5298+
* 执行请求发送过程,获取输入流类型结果,并流作为回调函数的参数进行调用,无返回值
5299+
* <p>该接口会安全处理响应体数据流,在回调函数被回调时,流已被自动打开,调用结束后也会自动关闭流</p>
5300+
*
5301+
* @param consumer 回调函数
5302+
* @since 1.6.0
5303+
*/
5304+
public void executeAsStream(Consumer3<InputStream, ForestRequest, ForestResponse> consumer) {
5305+
final ForestResponse<InputStream> response = execute(new TypeReference<ForestResponse<InputStream>>() {});
5306+
response.openStream((in, _res) -> {
5307+
consumer.accept(in, this, response);
5308+
});
5309+
}
5310+
5311+
/**
5312+
* 执行请求发送过程,获取输入流类型结果,并流作为回调函数的参数进行调用,最终返回回调函数中的返回值
5313+
* <p>该接口会安全处理响应体数据流,在回调函数被回调时,流已被自动打开,调用结束后也会自动关闭流</p>
5314+
*
5315+
* @param onStream 回调函数
5316+
* @return 回调函数中的返回值
5317+
* @param <R> 回调函数中的返回值类型
5318+
* @since 1.6.0
5319+
*/
5320+
public <R> R executeAsStream(OnStream<R> onStream) {
5321+
final ForestResponse<InputStream> response = execute(new TypeReference<ForestResponse<InputStream>>() {});
5322+
return response.openStream((in, _res) -> {
5323+
return onStream.onStream(in, this, response);
5324+
});
5325+
}
5326+
52695327
/**
52705328
* 执行请求发送过程
52715329
*

forest-core/src/main/java/com/dtflys/forest/http/ForestResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public abstract class ForestResponse<T> extends ResultGetter implements HasURL,
7373
*/
7474
protected volatile boolean closed = false;
7575

76+
protected volatile boolean bytesRead = false;
77+
7678
/**
7779
* 是否为Gzip压缩
7880
*/
@@ -794,6 +796,10 @@ protected String byteToString(byte[] bytes) throws IOException {
794796
public boolean isClosed() {
795797
return closed;
796798
}
799+
800+
public boolean isBytesRead() {
801+
return bytesRead;
802+
}
797803

798804
public abstract void close();
799805

forest-core/src/main/java/com/dtflys/forest/http/ForestResultGetter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface ForestResultGetter {
2828

2929
<T> T getByPath(String path, Type type);
3030

31-
ResultGetter accept(BiConsumer<InputStream, ForestResponse> consumer);
31+
ResultGetter openStream(BiConsumer<InputStream, ForestResponse> consumer);
3232

33-
<R> R accept(BiFunction<InputStream, ForestResponse, R> function);
33+
<R> R openStream(BiFunction<InputStream, ForestResponse, R> function);
3434
}

forest-core/src/main/java/com/dtflys/forest/http/ResultGetter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public <T> T getByPath(String path, Type type) {
113113
* @since 1.6.0
114114
*/
115115
@Override
116-
public ResultGetter accept(BiConsumer<InputStream, ForestResponse> consumer) {
116+
public ResultGetter openStream(BiConsumer<InputStream, ForestResponse> consumer) {
117117
final ForestResponse response = getResponse();
118118
try (final InputStream in = response.getInputStream()) {
119119
consumer.accept(in, response);
@@ -132,7 +132,7 @@ public ResultGetter accept(BiConsumer<InputStream, ForestResponse> consumer) {
132132
* @since 1.6.0
133133
*/
134134
@Override
135-
public <R> R accept(BiFunction<InputStream, ForestResponse, R> function) {
135+
public <R> R openStream(BiFunction<InputStream, ForestResponse, R> function) {
136136
final ForestResponse response = getResponse();
137137
try (final InputStream in = response.getInputStream()) {
138138
final Object ret = function.apply(in, response);

forest-core/src/test/java/com/dtflys/forest/test/http/TestDownloadClient.java

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.dtflys.forest.test.http;
22

33
import com.dtflys.forest.Forest;
4+
import com.dtflys.forest.annotation.Request;
45
import com.dtflys.forest.config.ForestConfiguration;
56
import com.dtflys.forest.http.ForestResponse;
7+
import com.dtflys.forest.interceptor.ResponseResult;
68
import com.dtflys.forest.test.http.client.DownloadClient;
79
import com.dtflys.forest.utils.ForestProgress;
10+
import com.dtflys.forest.utils.TypeReference;
811
import okhttp3.mockwebserver.MockResponse;
912
import okhttp3.mockwebserver.MockWebServer;
1013
import okio.Buffer;
@@ -29,6 +32,7 @@
2932
import java.util.concurrent.atomic.AtomicReference;
3033

3134
import static org.assertj.core.api.Assertions.assertThat;
35+
import static org.assertj.core.api.Assertions.in;
3236

3337
/**
3438
* @author gongjun[dt_flys@hotmail.com]
@@ -319,10 +323,62 @@ public void testDownloadWithQuickAcceptStream() throws IOException {
319323
buffer.readAll(Okio.sink(bytesOut));
320324
byte[] out = bytesOut.toByteArray();
321325
AtomicBoolean acceptDone = new AtomicBoolean(false);
326+
AtomicBoolean responseBytesRead = new AtomicBoolean(false);
322327

323328
Forest.get("http://localhost:{}/download/test-img.jpg", server.getPort())
324-
.executeAsResponse()
325-
.accept((in, res) -> {
329+
.onResponse((req, res) -> {
330+
if (res.isError()) {
331+
return ResponseResult.error();
332+
}
333+
res.openStream((in, _res) -> {
334+
responseBytesRead.set(res.isBytesRead());
335+
System.out.println("Accept stream");
336+
try {
337+
byte[] fileBytes = IOUtils.toByteArray(in);
338+
assertThat(fileBytes)
339+
.hasSize(out.length)
340+
.isEqualTo(out);
341+
acceptDone.set(true);
342+
} catch (IOException e) {
343+
throw new RuntimeException(e);
344+
}
345+
});
346+
return ResponseResult.proceed();
347+
})
348+
.execute(new TypeReference<ForestResponse<InputStream>>() {});
349+
assertThat(acceptDone.get()).isTrue();
350+
assertThat(responseBytesRead.get()).isFalse();
351+
}
352+
353+
354+
@Test
355+
public void testDownloadWithInputStreamResponse() throws IOException {
356+
Buffer buffer = getImageBuffer();
357+
server.enqueue(new MockResponse().setBody(buffer));
358+
359+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
360+
buffer.readAll(Okio.sink(bytesOut));
361+
362+
ForestResponse<InputStream> res = Forest.get("http://localhost:{}/download/test-img.jpg", server.getPort())
363+
.execute(new TypeReference<ForestResponse<InputStream>>() {});
364+
assertThat(res.isBytesRead()).isFalse();
365+
}
366+
367+
368+
@Test
369+
public void testDownloadWithExecuteAsStream() throws IOException {
370+
Buffer buffer = getImageBuffer();
371+
server.enqueue(new MockResponse().setBody(buffer));
372+
373+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
374+
buffer.readAll(Okio.sink(bytesOut));
375+
byte[] out = bytesOut.toByteArray();
376+
AtomicBoolean acceptDone = new AtomicBoolean(false);
377+
AtomicBoolean responseBytesRead = new AtomicBoolean(false);
378+
379+
Forest.get("http://localhost:{}/download/test-img.jpg", server.getPort())
380+
.executeAsStream((in, req, res) -> {
381+
responseBytesRead.set(res.isBytesRead());
326382
System.out.println("Accept stream");
327383
try {
328384
byte[] fileBytes = IOUtils.toByteArray(in);
@@ -335,9 +391,40 @@ public void testDownloadWithQuickAcceptStream() throws IOException {
335391
}
336392
});
337393
assertThat(acceptDone.get()).isTrue();
394+
assertThat(responseBytesRead.get()).isFalse();
395+
}
396+
397+
398+
@Test
399+
public void testDownloadWithExecuteAsStream2() throws IOException {
400+
Buffer buffer = getImageBuffer();
401+
server.enqueue(new MockResponse().setBody(buffer));
402+
403+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
404+
buffer.readAll(Okio.sink(bytesOut));
405+
byte[] out = bytesOut.toByteArray();
406+
AtomicBoolean acceptDone = new AtomicBoolean(false);
407+
AtomicBoolean responseBytesRead = new AtomicBoolean(false);
408+
409+
byte[] bytes = Forest.get("http://localhost:{}/download/test-img.jpg", server.getPort())
410+
.executeAsStream((in, req, res) -> {
411+
responseBytesRead.set(res.isBytesRead());
412+
System.out.println("Accept stream");
413+
try {
414+
byte[] fileBytes = IOUtils.toByteArray(in);
415+
acceptDone.set(true);
416+
return fileBytes;
417+
} catch (IOException e) {
418+
throw new RuntimeException(e);
419+
}
420+
});
421+
assertThat(bytes).isEqualTo(out);
422+
assertThat(acceptDone.get()).isTrue();
423+
assertThat(responseBytesRead.get()).isFalse();
338424
}
339425

340426

427+
341428
@Test
342429
public void testDownloadWithQuickAcceptStreamFunction() throws IOException {
343430
Buffer buffer = getImageBuffer();
@@ -350,7 +437,7 @@ public void testDownloadWithQuickAcceptStreamFunction() throws IOException {
350437

351438
byte[] fileBytes = Forest.get("http://localhost:{}/download/test-img.jpg", server.getPort())
352439
.executeAsResponse()
353-
.accept((in, res) -> {
440+
.openStream((in, res) -> {
354441
try {
355442
return IOUtils.toByteArray(in);
356443
} catch (IOException e) {

0 commit comments

Comments
 (0)