Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 2d91fc7

Browse files
authored
Merge pull request #104 from scalecube/fix-sending-complete-signal
Fix sending complete signal
2 parents 392b633 + 73263d9 commit 2d91fc7

File tree

6 files changed

+14
-7
lines changed

6 files changed

+14
-7
lines changed

services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface GatewaySessionHandler<M> {
2323
* @param req request message (not null)
2424
* @return message
2525
*/
26-
default M mapMessage(GatewaySession session, M req) {
26+
default M mapMessage(GatewaySession session, M req, Context context) {
2727
return req;
2828
}
2929

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414
import reactor.core.publisher.Mono;
15+
import reactor.util.context.Context;
1516

1617
public class RSocketGatewayAcceptor implements SocketAcceptor {
1718

@@ -46,7 +47,10 @@ public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
4647
ServiceMessageCodec messageCodec = new ServiceMessageCodec(headersCodec);
4748
final RSocketGatewaySession gatewaySession =
4849
new RSocketGatewaySession(
49-
serviceCall, metrics, messageCodec, gatewaySessionHandler::mapMessage);
50+
serviceCall,
51+
metrics,
52+
messageCodec,
53+
(session, req) -> gatewaySessionHandler.mapMessage(session, req, Context.empty()));
5054
gatewaySessionHandler.onSessionOpen(gatewaySession);
5155
rsocket
5256
.onClose()

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private Mono<GatewayMessage> onRequest(
7878
.flatMap(msg -> onCancel(session, msg))
7979
.map(msg -> validateSid(session, (GatewayMessage) msg))
8080
.map(this::validateQualifier)
81-
.map(msg -> gatewayHandler.mapMessage(session, msg))
81+
.map(msg -> gatewayHandler.mapMessage(session, msg, context))
8282
.doOnNext(request -> onMessage(session, request, context))
8383
.doOnError(
8484
th -> {
@@ -108,10 +108,10 @@ private void onMessage(WebsocketGatewaySession session, GatewayMessage request,
108108
.orElse(serviceStream)
109109
.map(response -> prepareResponse(sid, response, receivedError))
110110
.doOnNext(response -> metrics.markServiceResponse())
111-
.doFinally(signalType -> session.dispose(sid))
112111
.flatMap(session::send)
113112
.doOnError(th -> onError(session, request, th, context))
114113
.doOnComplete(() -> onComplete(session, request, receivedError, context))
114+
.doFinally(signalType -> session.dispose(sid))
115115
.subscriberContext(context)
116116
.subscribe();
117117

services-gateway-tests/src/main/java/io/scalecube/services/testservice/SecuredRsGwGatewaySessionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.scalecube.services.gateway.GatewaySessionHandler;
66
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
8+
import reactor.util.context.Context;
89

910
public class SecuredRsGwGatewaySessionHandler implements GatewaySessionHandler<ServiceMessage> {
1011
private static final Logger LOGGER =
@@ -16,7 +17,7 @@ public SecuredRsGwGatewaySessionHandler(AuthRegistry authRegistry) {
1617
}
1718

1819
@Override
19-
public ServiceMessage mapMessage(GatewaySession session, ServiceMessage req) {
20+
public ServiceMessage mapMessage(GatewaySession session, ServiceMessage req, Context context) {
2021
return ServiceMessage.from(req).header(AuthRegistry.SESSION_ID, session).build();
2122
}
2223

services-gateway-tests/src/main/java/io/scalecube/services/testservice/SecuredWsGwGatewaySessionManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.scalecube.services.gateway.ws.GatewayMessage;
66
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
8+
import reactor.util.context.Context;
89

910
public class SecuredWsGwGatewaySessionManager implements GatewaySessionHandler<GatewayMessage> {
1011
private static final Logger LOGGER =
@@ -16,7 +17,7 @@ public SecuredWsGwGatewaySessionManager(AuthRegistry authRegistry) {
1617
}
1718

1819
@Override
19-
public GatewayMessage mapMessage(GatewaySession session, GatewayMessage req) {
20+
public GatewayMessage mapMessage(GatewaySession session, GatewayMessage req, Context context) {
2021
return GatewayMessage.from(req).header(AuthRegistry.SESSION_ID, session).build();
2122
}
2223

services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.scalecube.services.gateway;
22

33
import java.util.concurrent.CountDownLatch;
4+
import reactor.util.context.Context;
45

56
public class TestGatewaySessionHandler implements GatewaySessionHandler {
67

@@ -9,7 +10,7 @@ public class TestGatewaySessionHandler implements GatewaySessionHandler {
910
public final CountDownLatch disconnLatch = new CountDownLatch(1);
1011

1112
@Override
12-
public Object mapMessage(GatewaySession s, Object req) {
13+
public Object mapMessage(GatewaySession s, Object req, Context context) {
1314
msgLatch.countDown();
1415
return req;
1516
}

0 commit comments

Comments
 (0)