Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 19dd20c

Browse files
authored
Merge pull request #122 from scalecube/upgrade-scalecube-services
Upgrade scalecube services
2 parents 4dd951d + fa7e3f8 commit 19dd20c

File tree

25 files changed

+249
-344
lines changed

25 files changed

+249
-344
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<properties>
2525
<scalecube-commons.version>1.0.2</scalecube-commons.version>
26-
<scalecube-services.version>2.9.1-RC1</scalecube-services.version>
26+
<scalecube-services.version>2.9.1-RC4</scalecube-services.version>
2727
<scalecube-benchmarks.version>1.2.2</scalecube-benchmarks.version>
2828
<scalecube-config.version>0.4.3</scalecube-config.version>
2929
<reactor.version>Dysprosium-SR7</reactor.version>

services-gateway-benchmarks/src/main/java/io/scalecube/services/benchmarks/gateway/distributed/DistributedBenchmarkState.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ protected void beforeAll() throws Exception {
4040
.gateway(opts -> new HttpGateway(opts.id("http")))
4141
.discovery(ScalecubeServiceDiscovery::new)
4242
.transport(RSocketServiceTransport::new)
43-
.metrics(registry())
4443
.startAwait();
4544

4645
Address seedAddress = gateway.discovery().address();

services-gateway-benchmarks/src/main/java/io/scalecube/services/benchmarks/gateway/standalone/StandaloneBenchmarkState.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ protected void beforeAll() throws Exception {
3939
.gateway(opts -> new HttpGateway(opts.id("http")))
4040
.discovery(ScalecubeServiceDiscovery::new)
4141
.transport(RSocketServiceTransport::new)
42-
.metrics(registry())
4342
.startAwait();
4443
}
4544

services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ public abstract class GatewayTemplate implements Gateway {
2626
}
2727

2828
protected final GatewayOptions options;
29-
protected final GatewayMetrics gatewayMetrics;
3029

3130
protected GatewayTemplate(GatewayOptions options) {
3231
this.options = new GatewayOptions(options);
33-
this.gatewayMetrics = new GatewayMetrics(this.options.id(), this.options.metrics());
3432
}
3533

3634
@Override
@@ -43,26 +41,16 @@ public final String id() {
4341
*
4442
* @param loopResources loop resources
4543
* @param port listen port
46-
* @param metrics gateway metrics
4744
* @return http server
4845
*/
49-
protected HttpServer prepareHttpServer(
50-
LoopResources loopResources, int port, GatewayMetrics metrics) {
46+
protected HttpServer prepareHttpServer(LoopResources loopResources, int port) {
5147
return HttpServer.create()
5248
.tcpConfiguration(
5349
tcpServer -> {
5450
if (loopResources != null) {
5551
tcpServer = tcpServer.runOn(loopResources);
5652
}
57-
if (metrics != null) {
58-
tcpServer =
59-
tcpServer.doOnConnection(
60-
connection -> {
61-
metrics.incConnection();
62-
connection.onDispose(metrics::decConnection);
63-
});
64-
}
65-
return tcpServer.addressSupplier(() -> new InetSocketAddress(port));
53+
return tcpServer.bindAddress(() -> new InetSocketAddress(port));
6654
});
6755
}
6856

services-gateway-netty/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
66
import io.netty.handler.codec.http.cors.CorsHandler;
77
import io.scalecube.net.Address;
8-
import io.scalecube.services.ServiceCall;
98
import io.scalecube.services.gateway.Gateway;
10-
import io.scalecube.services.gateway.GatewayMetrics;
119
import io.scalecube.services.gateway.GatewayOptions;
1210
import io.scalecube.services.gateway.GatewayTemplate;
13-
import io.scalecube.services.gateway.ReferenceCountUtil;
1411
import java.net.InetSocketAddress;
1512
import java.util.Map.Entry;
1613
import java.util.StringJoiner;
@@ -111,13 +108,11 @@ private CorsConfigBuilder copy(CorsConfigBuilder other) {
111108
public Mono<Gateway> start() {
112109
return Mono.defer(
113110
() -> {
114-
ServiceCall serviceCall =
115-
options.call().requestReleaser(ReferenceCountUtil::safestRelease);
116-
HttpGatewayAcceptor acceptor = new HttpGatewayAcceptor(serviceCall, gatewayMetrics);
111+
HttpGatewayAcceptor acceptor = new HttpGatewayAcceptor(options.call());
117112

118113
loopResources = LoopResources.create("http-gateway");
119114

120-
return prepareHttpServer(loopResources, options.port(), null /*metrics*/)
115+
return prepareHttpServer(loopResources, options.port())
121116
.handle(acceptor)
122117
.bind()
123118
.doOnSuccess(server -> this.server = server)
@@ -137,24 +132,15 @@ public Mono<Void> stop() {
137132
.then();
138133
}
139134

140-
protected HttpServer prepareHttpServer(
141-
LoopResources loopResources, int port, GatewayMetrics metrics) {
135+
protected HttpServer prepareHttpServer(LoopResources loopResources, int port) {
142136
return HttpServer.create()
143137
.tcpConfiguration(
144138
tcpServer -> {
145139
if (loopResources != null) {
146140
tcpServer = tcpServer.runOn(loopResources);
147141
}
148-
if (metrics != null) {
149-
tcpServer =
150-
tcpServer.doOnConnection(
151-
connection -> {
152-
metrics.incConnection();
153-
connection.onDispose(metrics::decConnection);
154-
});
155-
}
156142
return tcpServer
157-
.addressSupplier(() -> new InetSocketAddress(port))
143+
.bindAddress(() -> new InetSocketAddress(port))
158144
.doOnConnection(
159145
connection -> {
160146
if (corsEnabled) {

services-gateway-netty/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java

Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,13 @@
1010
import io.netty.buffer.ByteBufAllocator;
1111
import io.netty.buffer.ByteBufOutputStream;
1212
import io.netty.buffer.Unpooled;
13-
import io.netty.handler.codec.http.HttpHeaders;
1413
import io.netty.handler.codec.http.HttpResponseStatus;
1514
import io.scalecube.services.ServiceCall;
1615
import io.scalecube.services.api.ErrorData;
1716
import io.scalecube.services.api.ServiceMessage;
18-
import io.scalecube.services.api.ServiceMessage.Builder;
1917
import io.scalecube.services.exceptions.DefaultErrorMapper;
20-
import io.scalecube.services.gateway.GatewayMetrics;
2118
import io.scalecube.services.gateway.ReferenceCountUtil;
2219
import io.scalecube.services.transport.api.DataCodec;
23-
import java.util.Optional;
2420
import java.util.function.BiFunction;
2521
import org.reactivestreams.Publisher;
2622
import org.slf4j.Logger;
@@ -35,17 +31,10 @@ public class HttpGatewayAcceptor
3531

3632
private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayAcceptor.class);
3733

38-
private static final String SERVICE_RECV_TIME = "service-recv-time";
39-
private static final String SERVICE_SEND_TIME = "service-send-time";
40-
private static final String CLIENT_RECV_TIME = "client-recv-time";
41-
private static final String CLIENT_SEND_TIME = "client-send-time";
42-
4334
private final ServiceCall serviceCall;
44-
private final GatewayMetrics metrics;
4535

46-
HttpGatewayAcceptor(ServiceCall serviceCall, GatewayMetrics metrics) {
36+
HttpGatewayAcceptor(ServiceCall serviceCall) {
4737
this.serviceCall = serviceCall;
48-
this.metrics = metrics;
4938
}
5039

5140
@Override
@@ -66,33 +55,31 @@ public Publisher<Void> apply(HttpServerRequest httpRequest, HttpServerResponse h
6655
.aggregate()
6756
.switchIfEmpty(Mono.defer(() -> ByteBufMono.just(Unpooled.EMPTY_BUFFER)))
6857
.map(ByteBuf::retain)
69-
.doOnNext(content -> metrics.markRequest())
7058
.flatMap(content -> handleRequest(content, httpRequest, httpResponse))
71-
.doOnSuccess(avoid -> metrics.markResponse())
7259
.onErrorResume(t -> error(httpResponse, DefaultErrorMapper.INSTANCE.toMessage(t)));
7360
}
7461

7562
private Mono<Void> handleRequest(
7663
ByteBuf content, HttpServerRequest httpRequest, HttpServerResponse httpResponse) {
7764

78-
String qualifier = httpRequest.uri();
79-
Builder builder = ServiceMessage.builder().qualifier(qualifier).data(content);
80-
enrichRequest(httpRequest.requestHeaders(), builder);
65+
ServiceMessage request =
66+
ServiceMessage.builder().qualifier(httpRequest.uri()).data(content).build();
8167

8268
return serviceCall
83-
.requestOne(builder.build())
84-
.doOnNext(message -> metrics.markServiceResponse())
85-
.switchIfEmpty(
86-
Mono.defer(() -> Mono.just(ServiceMessage.builder().qualifier(qualifier).build())))
69+
.requestOne(request)
70+
.switchIfEmpty(Mono.defer(() -> emptyMessage(httpRequest)))
71+
.doOnError(th -> releaseRequestOnError(request))
8772
.flatMap(
88-
response -> {
89-
enrichResponse(httpResponse, response);
90-
return response.isError() // check error
91-
? error(httpResponse, response)
92-
: response.hasData() // check data
93-
? ok(httpResponse, response)
94-
: noContent(httpResponse);
95-
});
73+
response ->
74+
response.isError() // check error
75+
? error(httpResponse, response)
76+
: response.hasData() // check data
77+
? ok(httpResponse, response)
78+
: noContent(httpResponse));
79+
}
80+
81+
private Mono<ServiceMessage> emptyMessage(HttpServerRequest httpRequest) {
82+
return Mono.just(ServiceMessage.builder().qualifier(httpRequest.uri()).build());
9683
}
9784

9885
private Publisher<Void> methodNotAllowed(HttpServerResponse httpResponse) {
@@ -140,31 +127,7 @@ private ByteBuf encodeData(Object data, String dataFormat) {
140127
return byteBuf;
141128
}
142129

143-
private void enrichRequest(HttpHeaders requestHeaders, Builder builder) {
144-
Optional.ofNullable(requestHeaders.get(CLIENT_SEND_TIME))
145-
.ifPresent(value -> builder.header(CLIENT_SEND_TIME, value));
146-
147-
Optional.ofNullable(requestHeaders.get(CLIENT_RECV_TIME))
148-
.ifPresent(value -> builder.header(CLIENT_RECV_TIME, value));
149-
150-
Optional.ofNullable(requestHeaders.get(SERVICE_RECV_TIME))
151-
.ifPresent(value -> builder.header(SERVICE_RECV_TIME, value));
152-
153-
Optional.ofNullable(requestHeaders.get(SERVICE_SEND_TIME))
154-
.ifPresent(value -> builder.header(SERVICE_SEND_TIME, value));
155-
}
156-
157-
private void enrichResponse(HttpServerResponse httpResponse, ServiceMessage response) {
158-
Optional.ofNullable(response.header(CLIENT_SEND_TIME))
159-
.ifPresent(value -> httpResponse.header(CLIENT_SEND_TIME, value));
160-
161-
Optional.ofNullable(response.header(CLIENT_RECV_TIME))
162-
.ifPresent(value -> httpResponse.header(CLIENT_RECV_TIME, value));
163-
164-
Optional.ofNullable(response.header(SERVICE_RECV_TIME))
165-
.ifPresent(value -> httpResponse.header(SERVICE_RECV_TIME, value));
166-
167-
Optional.ofNullable(response.header(SERVICE_SEND_TIME))
168-
.ifPresent(value -> httpResponse.header(SERVICE_SEND_TIME, value));
130+
private void releaseRequestOnError(ServiceMessage request) {
131+
ReferenceCountUtil.safestRelease(request.data());
169132
}
170133
}

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGateway.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
import io.rsocket.transport.netty.server.CloseableChannel;
66
import io.rsocket.transport.netty.server.WebsocketServerTransport;
77
import io.scalecube.net.Address;
8-
import io.scalecube.services.ServiceCall;
98
import io.scalecube.services.api.ServiceMessage;
109
import io.scalecube.services.gateway.Gateway;
1110
import io.scalecube.services.gateway.GatewayOptions;
1211
import io.scalecube.services.gateway.GatewaySessionHandler;
1312
import io.scalecube.services.gateway.GatewayTemplate;
14-
import io.scalecube.services.gateway.ReferenceCountUtil;
1513
import java.net.InetSocketAddress;
1614
import java.util.StringJoiner;
1715
import org.slf4j.Logger;
@@ -24,40 +22,36 @@ public class RSocketGateway extends GatewayTemplate {
2422

2523
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGateway.class);
2624

27-
private final GatewaySessionHandler<ServiceMessage> gatewaySessionHandler;
25+
private final GatewaySessionHandler<ServiceMessage> sessionHandler;
2826
private CloseableChannel server;
2927
private LoopResources loopResources;
3028

3129
public RSocketGateway(GatewayOptions options) {
3230
super(options);
33-
this.gatewaySessionHandler = GatewaySessionHandler.DEFAULT_RS_INSTANCE;
31+
this.sessionHandler = GatewaySessionHandler.DEFAULT_RS_INSTANCE;
3432
}
3533

3634
public RSocketGateway(
37-
GatewayOptions options, GatewaySessionHandler<ServiceMessage> gatewaySessionHandler) {
35+
GatewayOptions options, GatewaySessionHandler<ServiceMessage> sessionHandler) {
3836
super(options);
39-
this.gatewaySessionHandler = gatewaySessionHandler;
37+
this.sessionHandler = sessionHandler;
4038
}
4139

4240
@Override
4341
public Mono<Gateway> start() {
4442
return Mono.defer(
4543
() -> {
46-
ServiceCall serviceCall =
47-
options.call().requestReleaser(ReferenceCountUtil::safestRelease);
4844
RSocketGatewayAcceptor acceptor =
49-
new RSocketGatewayAcceptor(serviceCall, gatewayMetrics, gatewaySessionHandler);
45+
new RSocketGatewayAcceptor(options.call(), sessionHandler);
5046

5147
loopResources = LoopResources.create("rsocket-gateway");
5248

5349
WebsocketServerTransport rsocketTransport =
54-
WebsocketServerTransport.create(
55-
prepareHttpServer(loopResources, options.port(), gatewayMetrics));
50+
WebsocketServerTransport.create(prepareHttpServer(loopResources, options.port()));
5651

5752
return RSocketServer.create()
5853
.acceptor(acceptor)
5954
.payloadDecoder(PayloadDecoder.DEFAULT)
60-
.errorConsumer(th -> LOGGER.warn("Exception occurred at rsocket gateway: " + th))
6155
.bind(rsocketTransport)
6256
.doOnSuccess(server -> this.server = server)
6357
.thenReturn(this);

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.rsocket.SocketAcceptor;
66
import io.scalecube.services.ServiceCall;
77
import io.scalecube.services.api.ServiceMessage;
8-
import io.scalecube.services.gateway.GatewayMetrics;
98
import io.scalecube.services.gateway.GatewaySessionHandler;
109
import io.scalecube.services.gateway.ServiceMessageCodec;
1110
import io.scalecube.services.transport.api.HeadersCodec;
@@ -19,23 +18,18 @@ public class RSocketGatewayAcceptor implements SocketAcceptor {
1918
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGatewayAcceptor.class);
2019

2120
private final ServiceCall serviceCall;
22-
private final GatewayMetrics metrics;
23-
private final GatewaySessionHandler<ServiceMessage> gatewaySessionHandler;
21+
private final GatewaySessionHandler<ServiceMessage> sessionHandler;
2422

2523
/**
2624
* Creates new acceptor for RS gateway.
2725
*
2826
* @param serviceCall to call remote service
29-
* @param metrics to report events
30-
* @param gatewaySessionHandler handler for session events
27+
* @param sessionHandler handler for session events
3128
*/
3229
public RSocketGatewayAcceptor(
33-
ServiceCall serviceCall,
34-
GatewayMetrics metrics,
35-
GatewaySessionHandler<ServiceMessage> gatewaySessionHandler) {
30+
ServiceCall serviceCall, GatewaySessionHandler<ServiceMessage> sessionHandler) {
3631
this.serviceCall = serviceCall;
37-
this.metrics = metrics;
38-
this.gatewaySessionHandler = gatewaySessionHandler;
32+
this.sessionHandler = sessionHandler;
3933
}
4034

4135
@Override
@@ -48,16 +42,15 @@ public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
4842
final RSocketGatewaySession gatewaySession =
4943
new RSocketGatewaySession(
5044
serviceCall,
51-
metrics,
5245
messageCodec,
53-
(session, req) -> gatewaySessionHandler.mapMessage(session, req, Context.empty()));
54-
gatewaySessionHandler.onSessionOpen(gatewaySession);
46+
(session, req) -> sessionHandler.mapMessage(session, req, Context.empty()));
47+
sessionHandler.onSessionOpen(gatewaySession);
5548
rsocket
5649
.onClose()
5750
.doOnTerminate(
5851
() -> {
5952
LOGGER.info("Client disconnected: {}", rsocket);
60-
gatewaySessionHandler.onSessionClose(gatewaySession);
53+
sessionHandler.onSessionClose(gatewaySession);
6154
})
6255
.subscribe(null, th -> LOGGER.error("Exception on closing rsocket: {}", th.toString()));
6356

0 commit comments

Comments
 (0)