Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit afb0b07

Browse files
authored
Merge pull request #135 from scalecube/copy-msg-hdrs-corresp-client-trnsprt
Passing client headers on connection setup
2 parents ba3507e + 8fb5ed2 commit afb0b07

File tree

13 files changed

+113
-23
lines changed

13 files changed

+113
-23
lines changed

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import io.scalecube.services.exceptions.DefaultErrorMapper;
55
import io.scalecube.services.exceptions.ServiceClientErrorMapper;
66
import java.time.Duration;
7+
import java.util.Collections;
8+
import java.util.HashMap;
9+
import java.util.Map;
710
import reactor.netty.tcp.SslProvider;
811

912
public class GatewayClientSettings {
@@ -20,6 +23,7 @@ public class GatewayClientSettings {
2023
private final ServiceClientErrorMapper errorMapper;
2124
private final Duration keepAliveInterval;
2225
private final boolean wiretap;
26+
private final Map<String, String> headers;
2327

2428
private GatewayClientSettings(Builder builder) {
2529
this.host = builder.host;
@@ -30,6 +34,7 @@ private GatewayClientSettings(Builder builder) {
3034
this.errorMapper = builder.errorMapper;
3135
this.keepAliveInterval = builder.keepAliveInterval;
3236
this.wiretap = builder.wiretap;
37+
this.headers = builder.headers;
3338
}
3439

3540
public String host() {
@@ -64,6 +69,10 @@ public boolean wiretap() {
6469
return this.wiretap;
6570
}
6671

72+
public Map<String, String> headers() {
73+
return headers;
74+
}
75+
6776
public static Builder builder() {
6877
return new Builder();
6978
}
@@ -96,9 +105,9 @@ public static class Builder {
96105
private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
97106
private Duration keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL;
98107
private boolean wiretap = false;
108+
private Map<String, String> headers = Collections.emptyMap();
99109

100-
private Builder() {
101-
}
110+
private Builder() {}
102111

103112
private Builder(GatewayClientSettings originalSettings) {
104113
this.host = originalSettings.host;
@@ -109,6 +118,7 @@ private Builder(GatewayClientSettings originalSettings) {
109118
this.errorMapper = originalSettings.errorMapper;
110119
this.keepAliveInterval = originalSettings.keepAliveInterval;
111120
this.wiretap = originalSettings.wiretap;
121+
this.headers = Collections.unmodifiableMap(new HashMap<>(originalSettings.headers));
112122
}
113123

114124
public Builder host(String host) {
@@ -191,6 +201,11 @@ public Builder errorMapper(ServiceClientErrorMapper errorMapper) {
191201
return this;
192202
}
193203

204+
public Builder headers(Map<String, String> headers) {
205+
this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
206+
return this;
207+
}
208+
194209
public GatewayClientSettings build() {
195210
return new GatewayClientSettings(this);
196211
}

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<Byte
4141

4242
httpClient =
4343
HttpClient.create(ConnectionProvider.elastic("http-gateway-client"))
44+
.headers(headers -> settings.headers().forEach(headers::add))
4445
.followRedirect(settings.followRedirect())
4546
.tcpConfiguration(
4647
tcpClient -> {

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.rsocket.core.RSocketConnector;
66
import io.rsocket.frame.decoder.PayloadDecoder;
77
import io.rsocket.transport.netty.client.WebsocketClientTransport;
8+
import io.rsocket.util.EmptyPayload;
89
import io.scalecube.services.api.ServiceMessage;
910
import io.scalecube.services.exceptions.ConnectionClosedException;
1011
import io.scalecube.services.gateway.transport.GatewayClient;
@@ -135,8 +136,14 @@ private Mono<RSocket> getOrConnect0(Mono prev) {
135136
return prev;
136137
}
137138

139+
Payload setupPayload = EmptyPayload.INSTANCE;
140+
if (!settings.headers().isEmpty()) {
141+
setupPayload = codec.encode(ServiceMessage.builder().headers(settings.headers()).build());
142+
}
143+
138144
return RSocketConnector.create()
139145
.payloadDecoder(PayloadDecoder.DEFAULT)
146+
.setupPayload(setupPayload)
140147
.metadataMimeType(settings.contentType())
141148
.connect(createRSocketTransport(settings))
142149
.doOnSuccess(

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec
5555

5656
httpClient =
5757
HttpClient.newConnection()
58+
.headers(headers -> settings.headers().forEach(headers::add))
5859
.followRedirect(settings.followRedirect())
5960
.tcpConfiguration(
6061
tcpClient -> {

services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySession.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.scalecube.services.gateway;
22

3-
import java.util.List;
43
import java.util.Map;
54

65
public interface GatewaySession {
@@ -15,7 +14,7 @@ public interface GatewaySession {
1514
/**
1615
* Returns headers associated with session.
1716
*
18-
* @return heades map
17+
* @return headers map
1918
*/
20-
Map<String, List<String>> headers();
19+
Map<String, String> headers();
2120
}

services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.scalecube.services.api.ServiceMessage;
5-
import java.util.List;
65
import java.util.Map;
76
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
@@ -84,7 +83,7 @@ default void onSessionError(GatewaySession session, Throwable throwable) {
8483
* @param headers connection/session headers
8584
* @return mono result
8685
*/
87-
default Mono<Void> onConnectionOpen(long sessionId, Map<String, List<String>> headers) {
86+
default Mono<Void> onConnectionOpen(long sessionId, Map<String, String> headers) {
8887
return Mono.fromRunnable(
8988
() ->
9089
LOGGER.debug(

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.scalecube.services.gateway.GatewaySessionHandler;
88
import io.scalecube.services.gateway.ServiceMessageCodec;
99
import io.scalecube.services.transport.api.HeadersCodec;
10+
import java.util.Map;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
1213
import reactor.core.publisher.Mono;
@@ -41,6 +42,7 @@ public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
4142
new RSocketGatewaySession(
4243
serviceCall,
4344
messageCodec,
45+
headers(messageCodec, setup),
4446
(session, req) -> sessionHandler.mapMessage(session, req, Context.empty()));
4547
sessionHandler.onSessionOpen(gatewaySession);
4648
rsocket
@@ -54,4 +56,11 @@ public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket rsocket) {
5456

5557
return Mono.just(gatewaySession);
5658
}
59+
60+
private Map<String, String> headers(
61+
ServiceMessageCodec messageCodec, ConnectionSetupPayload setup) {
62+
return messageCodec
63+
.decode(setup.sliceData().retain(), setup.sliceMetadata().retain())
64+
.headers();
65+
}
5766
}

services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.scalecube.services.gateway.ReferenceCountUtil;
1111
import io.scalecube.services.gateway.ServiceMessageCodec;
1212
import java.util.Collections;
13-
import java.util.List;
13+
import java.util.HashMap;
1414
import java.util.Map;
1515
import java.util.concurrent.atomic.AtomicLong;
1616
import java.util.function.BiFunction;
@@ -30,6 +30,7 @@ public final class RSocketGatewaySession extends AbstractRSocket implements Gate
3030
private final ServiceMessageCodec messageCodec;
3131
private final long sessionId;
3232
private final BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper;
33+
private final Map<String, String> headers;
3334

3435
/**
3536
* Constructor for gateway rsocket.
@@ -40,11 +41,13 @@ public final class RSocketGatewaySession extends AbstractRSocket implements Gate
4041
public RSocketGatewaySession(
4142
ServiceCall serviceCall,
4243
ServiceMessageCodec messageCodec,
44+
Map<String, String> headers,
4345
BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper) {
4446
this.serviceCall = serviceCall;
4547
this.messageCodec = messageCodec;
4648
this.messageMapper = messageMapper;
4749
this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
50+
this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
4851
}
4952

5053
@Override
@@ -53,8 +56,8 @@ public long sessionId() {
5356
}
5457

5558
@Override
56-
public Map<String, List<String>> headers() {
57-
return Collections.emptyMap();
59+
public Map<String, String> headers() {
60+
return headers;
5861
}
5962

6063
@Override

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import io.scalecube.services.exceptions.UnauthorizedException;
2222
import io.scalecube.services.gateway.GatewaySessionHandler;
2323
import io.scalecube.services.gateway.ReferenceCountUtil;
24-
import java.util.HashMap;
25-
import java.util.List;
2624
import java.util.Map;
25+
import java.util.Map.Entry;
2726
import java.util.Objects;
2827
import java.util.Optional;
2928
import java.util.concurrent.atomic.AtomicBoolean;
3029
import java.util.concurrent.atomic.AtomicLong;
3130
import java.util.function.BiFunction;
31+
import java.util.stream.Collectors;
3232
import org.reactivestreams.Publisher;
3333
import reactor.core.Disposable;
3434
import reactor.core.publisher.Flux;
@@ -63,7 +63,7 @@ public WebsocketGatewayAcceptor(ServiceCall serviceCall, GatewaySessionHandler g
6363

6464
@Override
6565
public Publisher<Void> apply(HttpServerRequest httpRequest, HttpServerResponse httpResponse) {
66-
final Map<String, List<String>> headers = computeHeaders(httpRequest.requestHeaders());
66+
final Map<String, String> headers = computeHeaders(httpRequest.requestHeaders());
6767
final long sessionId = SESSION_ID_GENERATOR.incrementAndGet();
6868

6969
return gatewayHandler
@@ -85,12 +85,9 @@ public Publisher<Void> apply(HttpServerRequest httpRequest, HttpServerResponse h
8585
.onErrorResume(throwable -> Mono.empty());
8686
}
8787

88-
private static Map<String, List<String>> computeHeaders(HttpHeaders httpHeaders) {
89-
Map<String, List<String>> headers = new HashMap<>();
90-
for (String name : httpHeaders.names()) {
91-
headers.put(name, httpHeaders.getAll(name));
92-
}
93-
return headers;
88+
private static Map<String, String> computeHeaders(HttpHeaders httpHeaders) {
89+
// exception will be thrown on duplicate
90+
return httpHeaders.entries().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue));
9491
}
9592

9693
private static int toStatusCode(Throwable throwable) {

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import io.scalecube.services.gateway.GatewaySessionHandler;
1010
import java.util.Collections;
1111
import java.util.HashMap;
12-
import java.util.List;
1312
import java.util.Map;
1413
import org.jctools.maps.NonBlockingHashMapLong;
1514
import org.slf4j.Logger;
@@ -33,7 +32,7 @@ public final class WebsocketGatewaySession implements GatewaySession {
3332
private final WebsocketServiceMessageCodec codec;
3433

3534
private final long sessionId;
36-
private final Map<String, List<String>> headers;
35+
private final Map<String, String> headers;
3736

3837
/**
3938
* Create a new websocket session with given handshake, inbound and outbound channels.
@@ -48,7 +47,7 @@ public final class WebsocketGatewaySession implements GatewaySession {
4847
public WebsocketGatewaySession(
4948
long sessionId,
5049
WebsocketServiceMessageCodec codec,
51-
Map<String, List<String>> headers,
50+
Map<String, String> headers,
5251
WebsocketInbound inbound,
5352
WebsocketOutbound outbound,
5453
GatewaySessionHandler gatewayHandler) {
@@ -68,7 +67,7 @@ public long sessionId() {
6867
}
6968

7069
@Override
71-
public Map<String, List<String>> headers() {
70+
public Map<String, String> headers() {
7271
return headers;
7372
}
7473

0 commit comments

Comments
 (0)