Skip to content

Commit f929c6c

Browse files
committed
Use dataType header for encoding/decoding data
1 parent a86d6c0 commit f929c6c

File tree

31 files changed

+885
-291
lines changed

31 files changed

+885
-291
lines changed

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.scalecube.services.routing.Router;
2222
import io.scalecube.services.routing.Routers;
2323
import io.scalecube.services.transport.api.ClientTransport;
24+
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
2425
import java.lang.reflect.Method;
2526
import java.lang.reflect.Proxy;
2627
import java.lang.reflect.Type;
@@ -44,6 +45,7 @@ public class ServiceCall implements AutoCloseable {
4445
private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
4546
private Map<String, String> credentials = Collections.emptyMap();
4647
private String contentType = ServiceMessage.DEFAULT_DATA_FORMAT;
48+
private ServiceMessageDataDecoder dataDecoder = ServiceMessageDataDecoder.INSTANCE;
4749

4850
// private Logger logger;
4951

@@ -55,6 +57,7 @@ private ServiceCall(ServiceCall other) {
5557
this.router = other.router;
5658
this.errorMapper = other.errorMapper;
5759
this.contentType = other.contentType;
60+
this.dataDecoder = other.dataDecoder;
5861
this.credentials = Collections.unmodifiableMap(new HashMap<>(other.credentials));
5962
}
6063

@@ -143,33 +146,35 @@ public ServiceCall contentType(String contentType) {
143146
}
144147

145148
/**
146-
* Invokes fire-and-forget request.
149+
* Setter for {@code dataDecoder}.
147150
*
148-
* @param request request message to send.
149-
* @return mono publisher completing normally or with error.
151+
* @param dataDecoder dataDecoder.
152+
* @return new {@link ServiceCall} instance.
150153
*/
151-
public Mono<Void> oneWay(ServiceMessage request) {
152-
return requestOne(request, Void.class).then();
154+
public ServiceCall dataDecoder(ServiceMessageDataDecoder dataDecoder) {
155+
ServiceCall target = new ServiceCall(this);
156+
target.dataDecoder = dataDecoder;
157+
return target;
153158
}
154159

155160
/**
156-
* Invokes request-and-reply request.
161+
* Invokes fire-and-forget request.
157162
*
158163
* @param request request message to send.
159-
* @return mono publisher completing with single response message or with error.
164+
* @return mono publisher completing normally or with error.
160165
*/
161-
public Mono<ServiceMessage> requestOne(ServiceMessage request) {
162-
return requestOne(request, null);
166+
public Mono<Void> oneWay(ServiceMessage request) {
167+
return requestOne(request, true).then();
163168
}
164169

165170
/**
166171
* Invokes request-and-reply request.
167172
*
168173
* @param request request message to send.
169-
* @param responseType type of response (optional).
174+
* @param decodeData should decode data.
170175
* @return mono publisher completing with single response message or with error.
171176
*/
172-
public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType) {
177+
public Mono<ServiceMessage> requestOne(ServiceMessage request, boolean decodeData) {
173178
return Mono.defer(
174179
() -> {
175180
ServiceMethodInvoker methodInvoker;
@@ -178,6 +183,7 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
178183
// local service
179184
return methodInvoker
180185
.invokeOne(request)
186+
.map(msg -> decodeData ? dataDecoder.decodeData(msg, getDataType(msg, null)) : msg)
181187
.map(this::throwIfError)
182188
.contextWrite(
183189
context -> {
@@ -198,7 +204,12 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
198204
serviceReference ->
199205
transport
200206
.create(serviceReference)
201-
.requestResponse(request, responseType)
207+
.requestResponse(request)
208+
.map(
209+
msg ->
210+
decodeData
211+
? dataDecoder.decodeData(msg, getDataType(msg, null))
212+
: msg)
202213
.map(this::throwIfError));
203214
}
204215
});
@@ -208,20 +219,10 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
208219
* Issues request to service which returns stream of service messages back.
209220
*
210221
* @param request request message to send.
222+
* @param decodeData should decode data.
211223
* @return flux publisher of service responses.
212224
*/
213-
public Flux<ServiceMessage> requestMany(ServiceMessage request) {
214-
return requestMany(request, null);
215-
}
216-
217-
/**
218-
* Issues request to service which returns stream of service messages back.
219-
*
220-
* @param request request with given headers.
221-
* @param responseType type of responses (optional).
222-
* @return flux publisher of service responses.
223-
*/
224-
public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseType) {
225+
public Flux<ServiceMessage> requestMany(ServiceMessage request, boolean decodeData) {
225226
return Flux.defer(
226227
() -> {
227228
ServiceMethodInvoker methodInvoker;
@@ -230,6 +231,7 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
230231
// local service
231232
return methodInvoker
232233
.invokeMany(request)
234+
.map(msg -> decodeData ? dataDecoder.decodeData(msg, getDataType(msg, null)) : msg)
233235
.map(this::throwIfError)
234236
.contextWrite(
235237
context -> {
@@ -250,7 +252,12 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
250252
serviceReference ->
251253
transport
252254
.create(serviceReference)
253-
.requestStream(request, responseType)
255+
.requestStream(request)
256+
.map(
257+
msg ->
258+
decodeData
259+
? dataDecoder.decodeData(msg, getDataType(msg, null))
260+
: msg)
254261
.map(this::throwIfError));
255262
}
256263
});
@@ -260,21 +267,11 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
260267
* Issues stream of service requests to service which returns stream of service messages back.
261268
*
262269
* @param publisher of service requests.
263-
* @return flux publisher of service responses.
264-
*/
265-
public Flux<ServiceMessage> requestBidirectional(Publisher<ServiceMessage> publisher) {
266-
return requestBidirectional(publisher, null);
267-
}
268-
269-
/**
270-
* Issues stream of service requests to service which returns stream of service messages back.
271-
*
272-
* @param publisher of service requests.
273-
* @param responseType type of responses (optional).
270+
* @param decodeData should decode data.
274271
* @return flux publisher of service responses.
275272
*/
276273
public Flux<ServiceMessage> requestBidirectional(
277-
Publisher<ServiceMessage> publisher, Type responseType) {
274+
Publisher<ServiceMessage> publisher, boolean decodeData) {
278275
return Flux.from(publisher)
279276
.switchOnFirst(
280277
(first, messages) -> {
@@ -286,6 +283,11 @@ public Flux<ServiceMessage> requestBidirectional(
286283
// local service
287284
return methodInvoker
288285
.invokeBidirectional(messages)
286+
.map(
287+
msg ->
288+
decodeData
289+
? dataDecoder.decodeData(msg, getDataType(msg, null))
290+
: msg)
289291
.map(this::throwIfError)
290292
.contextWrite(
291293
context -> {
@@ -306,7 +308,12 @@ public Flux<ServiceMessage> requestBidirectional(
306308
serviceReference ->
307309
transport
308310
.create(serviceReference)
309-
.requestChannel(messages, responseType)
311+
.requestChannel(messages)
312+
.map(
313+
msg ->
314+
decodeData
315+
? dataDecoder.decodeData(msg, getDataType(msg, null))
316+
: msg)
310317
.map(this::throwIfError));
311318
}
312319
}
@@ -343,23 +350,28 @@ public <T> T api(Class<T> serviceInterface) {
343350
switch (methodInfo.communicationMode()) {
344351
case REQUEST_RESPONSE:
345352
return serviceCall
346-
.requestOne(toServiceMessage(methodInfo, request), returnType)
353+
.requestOne(toServiceMessage(methodInfo, request), false)
354+
.map(msg -> dataDecoder.decodeData(msg, getDataType(msg, returnType)))
355+
.map(this::throwIfError)
347356
.transform(asMono(isReturnTypeServiceMessage));
348357

349358
case REQUEST_STREAM:
350359
return serviceCall
351-
.requestMany(toServiceMessage(methodInfo, request), returnType)
360+
.requestMany(toServiceMessage(methodInfo, request), false)
361+
.map(msg -> dataDecoder.decodeData(msg, getDataType(msg, returnType)))
362+
.map(this::throwIfError)
352363
.transform(asFlux(isReturnTypeServiceMessage));
353364

354365
case REQUEST_CHANNEL:
355366
// this is REQUEST_CHANNEL so it means params[0] must
356367
// be a publisher - its safe to cast.
357-
//noinspection rawtypes
358368
return serviceCall
359369
.requestBidirectional(
360-
Flux.from((Publisher) request)
370+
Flux.from((Publisher<?>) request)
361371
.map(data -> toServiceMessage(methodInfo, data)),
362-
returnType)
372+
false)
373+
.map(msg -> dataDecoder.decodeData(msg, getDataType(msg, returnType)))
374+
.map(message -> throwIfError(message))
363375
.transform(asFlux(isReturnTypeServiceMessage));
364376

365377
default:
@@ -461,6 +473,13 @@ private static MethodInfo getMethodInfo(Class<?> serviceInterface, Method method
461473
Collections.emptyList());
462474
}
463475

476+
private static Type getDataType(ServiceMessage message, Type defaultType) {
477+
final var dataType = message.header(ServiceMessage.HEADER_DATA_TYPE);
478+
final var type = TypeUtils.parseTypeDescriptor(dataType);
479+
final var resultType = type != null ? type : defaultType;
480+
return resultType == null && message.isError() ? ErrorData.class : resultType;
481+
}
482+
464483
@Override
465484
public void close() {
466485
if (transport != null) {

0 commit comments

Comments
 (0)