Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit f9dbf5a

Browse files
authored
Merge pull request #161 from scalecube/error-mapper-tests
Add ability use custom error mapper
2 parents 30ebf5b + 9ff527a commit f9dbf5a

30 files changed

+546
-70
lines changed

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
55
import io.scalecube.services.api.ErrorData;
66
import io.scalecube.services.api.ServiceMessage;
7-
import io.scalecube.services.exceptions.DefaultErrorMapper;
87
import io.scalecube.services.gateway.transport.GatewayClientCodec;
98
import io.scalecube.services.transport.api.ReferenceCountUtil;
109
import java.nio.channels.ClosedChannelException;
@@ -168,10 +167,7 @@ private void handleResponse(
168167
if (signal == Signal.ERROR) {
169168
// decode error data to retrieve real error cause
170169
ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class);
171-
Throwable error = DefaultErrorMapper.INSTANCE.toError(errorMessage);
172-
String sid = response.header(STREAM_ID);
173-
LOGGER.error("Received error response: sid={}, error={}", sid, error);
174-
onError.accept(error);
170+
onNext.accept(errorMessage);
175171
}
176172
} else {
177173
// handle normal response

services-gateway-netty/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
66
import io.netty.handler.codec.http.cors.CorsHandler;
77
import io.scalecube.net.Address;
8+
import io.scalecube.services.exceptions.DefaultErrorMapper;
9+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
810
import io.scalecube.services.gateway.Gateway;
911
import io.scalecube.services.gateway.GatewayOptions;
1012
import io.scalecube.services.gateway.GatewayTemplate;
@@ -20,6 +22,8 @@
2022

2123
public class HttpGateway extends GatewayTemplate {
2224

25+
private final ServiceProviderErrorMapper errorMapper;
26+
2327
private DisposableServer server;
2428
private LoopResources loopResources;
2529

@@ -31,7 +35,12 @@ public class HttpGateway extends GatewayTemplate {
3135
.allowedRequestMethods(HttpMethod.POST);
3236

3337
public HttpGateway(GatewayOptions options) {
38+
this(options, DefaultErrorMapper.INSTANCE);
39+
}
40+
41+
public HttpGateway(GatewayOptions options, ServiceProviderErrorMapper errorMapper) {
3442
super(options);
43+
this.errorMapper = errorMapper;
3544
}
3645

3746
private HttpGateway(HttpGateway other) {
@@ -40,6 +49,7 @@ private HttpGateway(HttpGateway other) {
4049
this.loopResources = other.loopResources;
4150
this.corsEnabled = other.corsEnabled;
4251
this.corsConfigBuilder = copy(other.corsConfigBuilder);
52+
this.errorMapper = other.errorMapper;
4353
}
4454

4555
/**
@@ -108,7 +118,7 @@ private CorsConfigBuilder copy(CorsConfigBuilder other) {
108118
public Mono<Gateway> start() {
109119
return Mono.defer(
110120
() -> {
111-
HttpGatewayAcceptor acceptor = new HttpGatewayAcceptor(options.call());
121+
HttpGatewayAcceptor acceptor = new HttpGatewayAcceptor(options.call(), errorMapper);
112122

113123
loopResources = LoopResources.create("http-gateway");
114124

services-gateway-netty/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.scalecube.services.api.ErrorData;
1616
import io.scalecube.services.api.ServiceMessage;
1717
import io.scalecube.services.exceptions.DefaultErrorMapper;
18+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
1819
import io.scalecube.services.gateway.ReferenceCountUtil;
1920
import io.scalecube.services.transport.api.DataCodec;
2021
import java.util.function.BiFunction;
@@ -34,9 +35,15 @@ public class HttpGatewayAcceptor
3435
private static final String ERROR_NAMESPACE = "io.scalecube.services.error";
3536

3637
private final ServiceCall serviceCall;
38+
private final ServiceProviderErrorMapper errorMapper;
3739

3840
HttpGatewayAcceptor(ServiceCall serviceCall) {
41+
this(serviceCall, DefaultErrorMapper.INSTANCE);
42+
}
43+
44+
HttpGatewayAcceptor(ServiceCall serviceCall, ServiceProviderErrorMapper errorMapper) {
3945
this.serviceCall = serviceCall;
46+
this.errorMapper = errorMapper;
4047
}
4148

4249
@Override
@@ -58,8 +65,7 @@ public Publisher<Void> apply(HttpServerRequest httpRequest, HttpServerResponse h
5865
.switchIfEmpty(Mono.defer(() -> ByteBufMono.just(Unpooled.EMPTY_BUFFER)))
5966
.map(ByteBuf::retain)
6067
.flatMap(content -> handleRequest(content, httpRequest, httpResponse))
61-
.onErrorResume(
62-
t -> error(httpResponse, DefaultErrorMapper.INSTANCE.toMessage(ERROR_NAMESPACE, t)));
68+
.onErrorResume(t -> error(httpResponse, errorMapper.toMessage(ERROR_NAMESPACE, t)));
6369
}
6470

6571
private Mono<Void> handleRequest(

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGateway.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import io.rsocket.transport.netty.server.CloseableChannel;
66
import io.rsocket.transport.netty.server.WebsocketServerTransport;
77
import io.scalecube.net.Address;
8+
import io.scalecube.services.exceptions.DefaultErrorMapper;
9+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
810
import io.scalecube.services.gateway.Gateway;
911
import io.scalecube.services.gateway.GatewayOptions;
1012
import io.scalecube.services.gateway.GatewaySessionHandler;
@@ -22,25 +24,45 @@ public class RSocketGateway extends GatewayTemplate {
2224
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGateway.class);
2325

2426
private final GatewaySessionHandler sessionHandler;
27+
private final ServiceProviderErrorMapper errorMapper;
28+
2529
private CloseableChannel server;
2630
private LoopResources loopResources;
2731

2832
public RSocketGateway(GatewayOptions options) {
29-
super(options);
30-
this.sessionHandler = GatewaySessionHandler.DEFAULT_INSTANCE;
33+
this(options, GatewaySessionHandler.DEFAULT_INSTANCE, DefaultErrorMapper.INSTANCE);
3134
}
3235

3336
public RSocketGateway(GatewayOptions options, GatewaySessionHandler sessionHandler) {
37+
this(options, sessionHandler, DefaultErrorMapper.INSTANCE);
38+
}
39+
40+
public RSocketGateway(GatewayOptions options, ServiceProviderErrorMapper errorMapper) {
41+
this(options, GatewaySessionHandler.DEFAULT_INSTANCE, errorMapper);
42+
}
43+
44+
/**
45+
* Constructor.
46+
*
47+
* @param options gateway options
48+
* @param sessionHandler session handler
49+
* @param errorMapper error mapper
50+
*/
51+
public RSocketGateway(
52+
GatewayOptions options,
53+
GatewaySessionHandler sessionHandler,
54+
ServiceProviderErrorMapper errorMapper) {
3455
super(options);
3556
this.sessionHandler = sessionHandler;
57+
this.errorMapper = errorMapper;
3658
}
3759

3860
@Override
3961
public Mono<Gateway> start() {
4062
return Mono.defer(
4163
() -> {
4264
RSocketGatewayAcceptor acceptor =
43-
new RSocketGatewayAcceptor(options.call(), sessionHandler);
65+
new RSocketGatewayAcceptor(options.call(), sessionHandler, errorMapper);
4466

4567
loopResources = LoopResources.create("rsocket-gateway");
4668

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.rsocket.RSocket;
55
import io.rsocket.SocketAcceptor;
66
import io.scalecube.services.ServiceCall;
7+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
78
import io.scalecube.services.gateway.GatewaySessionHandler;
89
import io.scalecube.services.gateway.ServiceMessageCodec;
910
import io.scalecube.services.transport.api.HeadersCodec;
@@ -19,16 +20,22 @@ public class RSocketGatewayAcceptor implements SocketAcceptor {
1920

2021
private final ServiceCall serviceCall;
2122
private final GatewaySessionHandler sessionHandler;
23+
private final ServiceProviderErrorMapper errorMapper;
2224

2325
/**
2426
* Creates new acceptor for RS gateway.
2527
*
2628
* @param serviceCall to call remote service
2729
* @param sessionHandler handler for session events
30+
* @param errorMapper error mapper
2831
*/
29-
public RSocketGatewayAcceptor(ServiceCall serviceCall, GatewaySessionHandler sessionHandler) {
32+
public RSocketGatewayAcceptor(
33+
ServiceCall serviceCall,
34+
GatewaySessionHandler sessionHandler,
35+
ServiceProviderErrorMapper errorMapper) {
3036
this.serviceCall = serviceCall;
3137
this.sessionHandler = sessionHandler;
38+
this.errorMapper = errorMapper;
3239
}
3340

3441
@Override
@@ -43,7 +50,8 @@ public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
4350
serviceCall,
4451
messageCodec,
4552
headers(messageCodec, setup),
46-
(session, req) -> sessionHandler.mapMessage(session, req, Context.empty()));
53+
(session, req) -> sessionHandler.mapMessage(session, req, Context.empty()),
54+
errorMapper);
4755
sessionHandler.onSessionOpen(gatewaySession);
4856
rsocket
4957
.onClose()

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import io.rsocket.util.ByteBufPayload;
66
import io.scalecube.services.ServiceCall;
77
import io.scalecube.services.api.ServiceMessage;
8-
import io.scalecube.services.exceptions.DefaultErrorMapper;
8+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
99
import io.scalecube.services.gateway.GatewaySession;
1010
import io.scalecube.services.gateway.ReferenceCountUtil;
1111
import io.scalecube.services.gateway.ServiceMessageCodec;
@@ -31,23 +31,27 @@ public final class RSocketGatewaySession extends AbstractRSocket implements Gate
3131
private final long sessionId;
3232
private final BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper;
3333
private final Map<String, String> headers;
34+
private final ServiceProviderErrorMapper errorMapper;
3435

3536
/**
3637
* Constructor for gateway rsocket.
3738
*
3839
* @param serviceCall service call coming from microservices.
3940
* @param messageCodec message messageCodec.
41+
* @param errorMapper error mapper
4042
*/
4143
public RSocketGatewaySession(
4244
ServiceCall serviceCall,
4345
ServiceMessageCodec messageCodec,
4446
Map<String, String> headers,
45-
BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper) {
47+
BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper,
48+
ServiceProviderErrorMapper errorMapper) {
4649
this.serviceCall = serviceCall;
4750
this.messageCodec = messageCodec;
4851
this.messageMapper = messageMapper;
4952
this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
5053
this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
54+
this.errorMapper = errorMapper;
5155
}
5256

5357
@Override
@@ -73,8 +77,7 @@ public Mono<Payload> requestResponse(Payload payload) {
7377
return serviceCall
7478
.requestOne(request)
7579
.doOnError(th -> releaseRequestOnError(request))
76-
.onErrorResume(
77-
th -> Mono.just(DefaultErrorMapper.INSTANCE.toMessage(request.qualifier(), th)))
80+
.onErrorResume(th -> Mono.just(errorMapper.toMessage(request.qualifier(), th)))
7881
.map(this::toPayload);
7982
});
8083
}
@@ -87,8 +90,7 @@ public Flux<Payload> requestStream(Payload payload) {
8790
return serviceCall
8891
.requestMany(request)
8992
.doOnError(th -> releaseRequestOnError(request))
90-
.onErrorResume(
91-
th -> Mono.just(DefaultErrorMapper.INSTANCE.toMessage(request.qualifier(), th)))
93+
.onErrorResume(th -> Mono.just(errorMapper.toMessage(request.qualifier(), th)))
9294
.map(this::toPayload);
9395
});
9496
}

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/GatewayMessages.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.scalecube.services.gateway.ws;
22

33
import io.scalecube.services.api.ServiceMessage;
4-
import io.scalecube.services.exceptions.DefaultErrorMapper;
4+
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
55

66
public final class GatewayMessages {
77

@@ -34,19 +34,22 @@ public static ServiceMessage newCancelMessage(long sid, String qualifier) {
3434
/**
3535
* Returns error message by given arguments.
3636
*
37-
* @param message request
37+
* @param errorMapper error mapper
38+
* @param request request
3839
* @param th cause
3940
* @return {@link ServiceMessage} instance as the error signal
4041
*/
41-
public static ServiceMessage newErrorMessage(ServiceMessage message, Throwable th) {
42-
ServiceMessage.Builder builder =
43-
ServiceMessage.from(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th))
44-
.header(SIGNAL_FIELD, Signal.ERROR.code());
45-
String sid = message.header(STREAM_ID_FIELD);
46-
if (sid != null) {
47-
builder.header(STREAM_ID_FIELD, sid);
42+
public static ServiceMessage toErrorResponse(
43+
ServiceProviderErrorMapper errorMapper, ServiceMessage request, Throwable th) {
44+
ServiceMessage errorMessage = errorMapper.toMessage(request.qualifier(), th);
45+
String sid = request.header(STREAM_ID_FIELD);
46+
if (sid == null) {
47+
return ServiceMessage.from(errorMessage).header(SIGNAL_FIELD, Signal.ERROR.code()).build();
4848
}
49-
return builder.build();
49+
return ServiceMessage.from(errorMessage)
50+
.header(SIGNAL_FIELD, Signal.ERROR.code())
51+
.header(STREAM_ID_FIELD, sid)
52+
.build();
5053
}
5154

5255
/**

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketContextException.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ public static WebsocketContextException badRequest(String errorMessage, ServiceM
2121
new io.scalecube.services.exceptions.BadRequestException(errorMessage), request, null);
2222
}
2323

24-
public static WebsocketContextException wrap(
25-
Throwable th, ServiceMessage request, ServiceMessage response) {
26-
return new WebsocketContextException(th, request, response);
27-
}
28-
2924
public ServiceMessage request() {
3025
return request;
3126
}

0 commit comments

Comments
 (0)