diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/FluxRewriteFunction.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/FluxRewriteFunction.java new file mode 100644 index 0000000000..edd3a13f3a --- /dev/null +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/FluxRewriteFunction.java @@ -0,0 +1,33 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.gateway.filter.factory.rewrite; + +import java.util.function.BiFunction; + +import reactor.core.publisher.Flux; + +import org.springframework.web.server.ServerWebExchange; + +/** + * This interface is BETA and may be subject to change in a future release. + * + * @param the type of the first argument to the function + * @param the type of element signaled by the {@link Flux} + */ +public interface FluxRewriteFunction extends BiFunction, Flux> { + +} diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/ModifyResponseBodyGatewayFilterFactory.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/ModifyResponseBodyGatewayFilterFactory.java index 4e376ca2aa..63faaddfcc 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/ModifyResponseBodyGatewayFilterFactory.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/ModifyResponseBodyGatewayFilterFactory.java @@ -91,8 +91,19 @@ public static class Config { private String newContentType; + /** + * Deprecated in favour of {@link FluxRewriteFunction} & + * {@link MonoRewriteFunction} Use {@link MonoRewriteFunction} for modifying + * non-streaming response body Use {@link FluxRewriteFunction} for modifying + * streaming response body + */ + @Deprecated private RewriteFunction rewriteFunction; + private FluxRewriteFunction fluxRewriteFunction; + + private MonoRewriteFunction monoRewriteFunction; + public Class getInClass() { return inClass; } @@ -138,15 +149,42 @@ public Config setNewContentType(String newContentType) { return this; } + @Deprecated public RewriteFunction getRewriteFunction() { return rewriteFunction; } + public MonoRewriteFunction, Mono> getMonoRewriteFunction() { + return monoRewriteFunction; + } + + public FluxRewriteFunction, Flux> getFluxRewriteFunction() { + return fluxRewriteFunction; + } + + /** + * Deprecated in favour of {@link Config#setMonoRewriteFunction} & + * {@link Config#setFluxRewriteFunction} Use {@link Config#setMonoRewriteFunction} + * for modifying non-streaming response body Use + * {@link Config#setFluxRewriteFunction} for modifying streaming response body + */ + @Deprecated public Config setRewriteFunction(RewriteFunction rewriteFunction) { this.rewriteFunction = rewriteFunction; return this; } + public Config setMonoRewriteFunction(MonoRewriteFunction monoRewriteFunction) { + this.monoRewriteFunction = monoRewriteFunction; + return this; + } + + public Config setFluxRewriteFunction(FluxRewriteFunction fluxRewriteFunction) { + this.fluxRewriteFunction = fluxRewriteFunction; + return this; + } + + @Deprecated public Config setRewriteFunction(Class inClass, Class outClass, RewriteFunction rewriteFunction) { setInClass(inClass); @@ -155,6 +193,21 @@ public Config setRewriteFunction(Class inClass, Class outClass, return this; } + public Config setFluxRewriteFunction(Class inClass, Class outClass, + FluxRewriteFunction fluxRewriteFunction) { + setInClass(inClass); + setOutClass(outClass); + setFluxRewriteFunction(fluxRewriteFunction); + return this; + } + + public Config setMonoRewriteFunction(Class inClass, Class outClass, + MonoRewriteFunction monoRewriteFunction) { + setInClass(inClass); + setOutClass(outClass); + setMonoRewriteFunction(monoRewriteFunction); + return this; + } } public class ModifyResponseGatewayFilter implements GatewayFilter, Ordered { @@ -204,51 +257,99 @@ public ModifiedServerHttpResponse(ServerWebExchange exchange, Config config) { this.config = config; } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) public Mono writeWith(Publisher body) { Class inClass = config.getInClass(); Class outClass = config.getOutClass(); - String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); - HttpHeaders httpHeaders = new HttpHeaders(); - // explicitly add it in this way instead of - // 'httpHeaders.setContentType(originalResponseContentType)' - // this will prevent exception in case of using non-standard media - // types like "Content-Type: image" - httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType); - + HttpHeaders httpHeaders = prepareHttpHeaders(); ClientResponse clientResponse = prepareClientResponse(body, httpHeaders); - // TODO: flux or mono - Mono modifiedBody = extractBody(exchange, clientResponse, inClass) - .flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody)) - .switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null))); + var modifiedBody = extractBody(exchange, clientResponse, inClass); + if (config.getRewriteFunction() != null) { + // TODO: to be removed with removal of rewriteFunction + modifiedBody = modifiedBody + .flatMap(originalBody -> config.getRewriteFunction() + .apply(exchange, originalBody)) + .switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction() + .apply(exchange, null))); + } + if (config.getMonoRewriteFunction() != null) { + modifiedBody = config.getMonoRewriteFunction().apply(exchange, + modifiedBody); + } - BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass); + BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, + outClass); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders()); - return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { - Mono messageBody = writeBody(getDelegate(), outputMessage, outClass); - HttpHeaders headers = getDelegate().getHeaders(); - if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING) - || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) { - messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount())); - } + return bodyInserter.insert(outputMessage, new BodyInserterContext()) + .then(Mono.defer(() -> { + Mono messageBody = writeBody(getDelegate(), outputMessage, outClass); + HttpHeaders headers = getDelegate().getHeaders(); + if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING) + || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) { + messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount())); + } + + if (StringUtils.hasText(config.newContentType)) { + headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType); + } + + // TODO: fail if isStreamingMediaType? + return getDelegate().writeWith(messageBody); + })); + } - if (StringUtils.hasText(config.newContentType)) { - headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType); - } + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Mono writeAndFlushWith( + Publisher> body) { + final var httpHeaders = prepareHttpHeaders(); + final var fluxRewriteConfig = config.getFluxRewriteFunction(); + final var publisher = Flux.from(body).flatMapSequential(r -> r); + final var clientResponse = prepareClientResponse(publisher, httpHeaders); + var modifiedBody = clientResponse.bodyToFlux(config.inClass); + if (config.getRewriteFunction() != null) { + // TODO: to be removed with removal of rewriteFunction + modifiedBody = modifiedBody + .flatMap(originalBody -> config.getRewriteFunction() + .apply(exchange, originalBody)) + .switchIfEmpty(Flux.defer(() -> (Flux) config.getRewriteFunction() + .apply(exchange, null))); + } + if (config.getFluxRewriteFunction() != null) { + modifiedBody = fluxRewriteConfig.apply(exchange, modifiedBody); + } + final var bodyInserter = BodyInserters.fromPublisher(modifiedBody, + config.outClass); + final var outputMessage = new CachedBodyOutputMessage(exchange, + exchange.getResponse().getHeaders()); - // TODO: fail if isStreamingMediaType? - return getDelegate().writeWith(messageBody); - })); + return bodyInserter.insert(outputMessage, new BodyInserterContext()) + .then(Mono.defer(() -> { + final var messageBody = outputMessage.getBody(); + HttpHeaders headers = getDelegate().getHeaders(); + if (StringUtils.hasText(config.newContentType)) { + headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType); + } + return getDelegate() + .writeAndFlushWith(messageBody.map(Flux::just)); + })); } - @Override - public Mono writeAndFlushWith(Publisher> body) { - return writeWith(Flux.from(body).flatMapSequential(p -> p)); + private HttpHeaders prepareHttpHeaders() { + String originalResponseContentType = exchange + .getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); + HttpHeaders httpHeaders = new HttpHeaders(); + // explicitly add it in this way instead of + // 'httpHeaders.setContentType(originalResponseContentType)' + // this will prevent exception in case of using non-standard media + // types like "Content-Type: image" + httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType); + return httpHeaders; } private ClientResponse prepareClientResponse(Publisher body, HttpHeaders httpHeaders) { diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/MonoRewriteFunction.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/MonoRewriteFunction.java new file mode 100644 index 0000000000..2c9511eac6 --- /dev/null +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/rewrite/MonoRewriteFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.gateway.filter.factory.rewrite; + +import java.util.function.BiFunction; + +import reactor.core.publisher.Mono; + +import org.springframework.web.server.ServerWebExchange; + + +/** + * This interface is BETA and may be subject to change in a future release. + * + * @param the type of the first argument to the function + * @param the type of element signaled by the {@link Mono} + */ +public interface MonoRewriteFunction extends BiFunction, Mono> { + +}