From a43ae9516c5b239863bddd470182f91b3bb68b43 Mon Sep 17 00:00:00 2001 From: jpiraguha Date: Thu, 17 Dec 2020 12:43:59 +0100 Subject: [PATCH] feat: ability to use retrosocket without pairing Signed-off-by: jpiraguha --- .../retrosocket/RSocketClient.java | 4 +- .../RSocketClientAutoConfiguration.java | 1 - .../retrosocket/RSocketClientFactoryBean.java | 37 +++++- .../retrosocket/RSocketClientsRegistrar.java | 25 +++- .../retrosocket/RequesterMode.kt | 6 + .../retrosocket/nopairing/GreetingClient.java | 37 ++++++ .../nopairing/GreetingResponse.java | 17 +++ .../nopairing/GreetingsController.java | 70 +++++++++++ .../nopairing/RSocketClientConfiguration.java | 20 +++ .../nopairing/RSocketClientTest.java | 119 ++++++++++++++++++ .../nopairing/RSocketServerConfiguration.java | 29 +++++ 11 files changed, 360 insertions(+), 5 deletions(-) create mode 100644 spring-retrosocket/src/main/java/org/springframework/retrosocket/RequesterMode.kt create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingClient.java create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingResponse.java create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingsController.java create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientConfiguration.java create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientTest.java create mode 100644 spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketServerConfiguration.java diff --git a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClient.java b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClient.java index 8dd9908..57a9fd7 100644 --- a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClient.java +++ b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClient.java @@ -10,5 +10,7 @@ @Documented @Inherited public @interface RSocketClient { - + String host() default ""; + String port() default ""; + RequesterMode mode() default RequesterMode.PAIRING; } diff --git a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientAutoConfiguration.java b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientAutoConfiguration.java index 7c67e6c..cf68412 100644 --- a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientAutoConfiguration.java +++ b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientAutoConfiguration.java @@ -14,7 +14,6 @@ class RSocketClientAutoConfiguration { @Bean - @ConditionalOnBean(RSocketRequester.class) RSocketClientBuilder rSocketClientBuilder() { return new RSocketClientBuilder(); } diff --git a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientFactoryBean.java b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientFactoryBean.java index cf6e954..dc53086 100644 --- a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientFactoryBean.java +++ b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientFactoryBean.java @@ -26,6 +26,28 @@ class RSocketClientFactoryBean implements BeanFactoryAware, FactoryBean private ListableBeanFactory context; + private String host; + + private String port; + + private RequesterMode mode; + + private RSocketRequester rSocketRequester() { + if (mode == RequesterMode.PAIRING) { + return forInterface(type, context); + } else { + Assert.isTrue(host.length()!=0, "In MANAGED mode host should be provided"); + Assert.isTrue(port.length()!=0, "In MANAGED mode port should be provided"); + return requesterBuilder() + .connectTcp(host, Integer.parseInt(port)) + .block(); + } + } + + private RSocketRequester.Builder requesterBuilder() { + return context.getBean(RSocketRequester.Builder.class); + } + private static RSocketRequester forInterface(Class clientInterface, ListableBeanFactory context) { Map rSocketRequestersInContext = context.getBeansOfType(RSocketRequester.class); int rSocketRequestersCount = rSocketRequestersInContext.size(); @@ -66,7 +88,7 @@ public void setType(String type) { @Override public Object getObject() { - RSocketRequester rSocketRequester = forInterface(this.type, this.context); + RSocketRequester rSocketRequester = rSocketRequester(); RSocketClientBuilder clientBuilder = this.context.getBean(RSocketClientBuilder.class); return clientBuilder.buildClientFor(this.type, rSocketRequester); } @@ -83,4 +105,17 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.context = (ListableBeanFactory) beanFactory; } + public void setHost(String host) { + this.host = host; + } + + public void setPort(String port) { + this.port = port; + } + + public void setMode(RequesterMode mode) { + this.mode = mode; + } + + } diff --git a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientsRegistrar.java b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientsRegistrar.java index e680f93..56c0040 100644 --- a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientsRegistrar.java +++ b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RSocketClientsRegistrar.java @@ -81,7 +81,9 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B .forEach(beanDefinition -> { AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); this.validateInterface(annotationMetadata); - this.registerRSocketClient(annotationMetadata, registry); + Map attributes = annotationMetadata + .getAnnotationAttributes(RSocketClient.class.getCanonicalName()); + this.registerRSocketClient(annotationMetadata, registry, attributes); })); } @@ -118,7 +120,8 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B } @SneakyThrows - private void registerRSocketClient(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) { + private void registerRSocketClient(AnnotationMetadata annotationMetadata, + BeanDefinitionRegistry registry, Map attributes) { String className = annotationMetadata.getClassName(); if (log.isDebugEnabled()) { log.debug("trying to turn the interface " + className + " into an RSocketClientFactoryBean"); @@ -126,6 +129,9 @@ private void registerRSocketClient(AnnotationMetadata annotationMetadata, BeanDe BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(RSocketClientFactoryBean.class); definition.addPropertyValue("type", className); + definition.addPropertyValue("host", getHost(attributes)); + definition.addPropertyValue("port", getAttribute(attributes, "port")); + definition.addPropertyValue("mode", getAttribute(attributes, "mode")); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); @@ -136,6 +142,21 @@ private void registerRSocketClient(AnnotationMetadata annotationMetadata, BeanDe BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); } + private T getAttribute(Map attributes, String name) { + return (T) attributes.get(name); + } + + private String getHost(Map attributes) { + return resolve((String) attributes.get("host")); + } + + private String resolve(String value) { + if (StringUtils.hasText(value)) { + return this.environment.resolvePlaceholders(value); + } + return value; + } + @Override public void setResourceLoader(ResourceLoader resourceLoader) { this.resourceLoader = resourceLoader; diff --git a/spring-retrosocket/src/main/java/org/springframework/retrosocket/RequesterMode.kt b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RequesterMode.kt new file mode 100644 index 0000000..3e2a9c9 --- /dev/null +++ b/spring-retrosocket/src/main/java/org/springframework/retrosocket/RequesterMode.kt @@ -0,0 +1,6 @@ +package org.springframework.retrosocket + +enum class RequesterMode { + PAIRING, + MANAGED +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingClient.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingClient.java new file mode 100644 index 0000000..f951341 --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingClient.java @@ -0,0 +1,37 @@ +package org.springframework.retrosocket.nopairing; + +import static org.springframework.retrosocket.RequesterMode.MANAGED; + +import org.springframework.messaging.handler.annotation.DestinationVariable; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.retrosocket.RSocketClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Josh Long + */ +@RSocketClient(host = "${greeting-client.host:localhost}", port = "${greeting-client.port:8888}", mode = MANAGED) +interface GreetingClient { + + @MessageMapping("greetings") + Mono greet(); + + @MessageMapping("greetings-with-channel") + Flux greetParams(Flux names); + + @MessageMapping("greetings-stream") + Flux greetStream(Mono name); + + @MessageMapping("greetings-with-name") + Mono greet(Mono name); + + @MessageMapping("fire-and-forget") + Mono greetFireAndForget(Mono name); + + @MessageMapping("greetings-mono-name.{name}.{age}") + Mono greetMonoNameDestinationVariable(@DestinationVariable("name") String name, + @DestinationVariable("age") int age, @Payload Mono payload); + +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingResponse.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingResponse.java new file mode 100644 index 0000000..2307a6e --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingResponse.java @@ -0,0 +1,17 @@ +package org.springframework.retrosocket.nopairing; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author Josh Long + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class GreetingResponse { + + private String message; + +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingsController.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingsController.java new file mode 100644 index 0000000..54ce423 --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/GreetingsController.java @@ -0,0 +1,70 @@ +package org.springframework.retrosocket.nopairing; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import javax.annotation.PostConstruct; +import lombok.extern.log4j.Log4j2; +import org.springframework.context.annotation.Profile; +import org.springframework.messaging.handler.annotation.DestinationVariable; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Controller; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Josh Long + */ +@Log4j2 +@Profile("service") +@Controller +class GreetingsController { + + static AtomicReference fireAndForget = new AtomicReference<>(); + + @PostConstruct + public void begin() { + log.info("begin()"); + } + + @MessageMapping("greetings-mono-name.{name}.{age}") + Mono greetMonoNameDestinationVariable(@DestinationVariable("name") String name, + @DestinationVariable("age") int age, @Payload Mono payload) { + log.info("name=" + name); + log.info("age=" + age); + return payload; + } + + @MessageMapping("fire-and-forget") + Mono fireAndForget(Mono valueIn) { + return valueIn// + .doOnNext(value -> { + log.info("received fire-and-forget " + value + '.'); + fireAndForget.set(value); + })// + .then(); + } + + @MessageMapping("greetings-with-channel") + Flux greetParams(Flux names) { + return names.map(String::toUpperCase).map(GreetingResponse::new); + } + + @MessageMapping("greetings-stream") + Flux greetFlux(Mono name) { + return name.flatMapMany( + leNom -> Flux.fromStream(Stream.generate(() -> new GreetingResponse(leNom.toUpperCase()))).take(2)); + } + + @MessageMapping("greetings-with-name") + Mono greetMono(Mono name) { + return name.map(GreetingResponse::new); + } + + @MessageMapping("greetings") + Mono greet() { + log.info("invoking greetings and returning a GreetingsResponse."); + return Mono.just(new GreetingResponse("Hello, world!")); + } + +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientConfiguration.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientConfiguration.java new file mode 100644 index 0000000..f009dad --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientConfiguration.java @@ -0,0 +1,20 @@ +package org.springframework.retrosocket.nopairing; + +import javax.annotation.PostConstruct; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Profile; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.retrosocket.EnableRSocketClients; + +/** + * @author Josh Long + */ +@Log4j2 +@Profile("client") +@EnableRSocketClients +@SpringBootApplication +class RSocketClientConfiguration { +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientTest.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientTest.java new file mode 100644 index 0000000..3b0ecf3 --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketClientTest.java @@ -0,0 +1,119 @@ +package org.springframework.retrosocket.nopairing; + +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.util.SocketUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Josh Long + */ + +@Log4j2 +public class RSocketClientTest { + + static ConfigurableApplicationContext serviceApplicationContext; + + static AtomicInteger port = new AtomicInteger(7000); + + @BeforeAll + public static void begin() { + serviceApplicationContext = new SpringApplicationBuilder( + RSocketServerConfiguration.class)// + .web(WebApplicationType.NONE) + .run("--spring.profiles.active=service", "--spring.rsocket.server.port=" + port.get()); + } + + @AfterAll + public static void destroy() { + serviceApplicationContext.stop(); + } + + @Test + public void destinationVariablesAndPayload() { + GreetingClient greetingClient = buildClient(); + Mono greetingResponseFlux = greetingClient.greetMonoNameDestinationVariable("jlong", 36, + Mono.just("Hello")); + StepVerifier// + .create(greetingResponseFlux)// + .expectNextMatches(gr -> gr.equalsIgnoreCase("Hello"))// + .verifyComplete(); + } + + @Test + public void monoInFluxOut() { + GreetingClient greetingClient = buildClient(); + Flux greetingResponseFlux = greetingClient.greetStream(Mono.just("a")); + StepVerifier// + .create(greetingResponseFlux)// + .expectNextCount(1).expectNextMatches(gr -> gr.getMessage().equalsIgnoreCase("A"))// + .verifyComplete(); + + } + + @Test + public void fluxInFluxOut() { + GreetingClient greetingClient = buildClient(); + Flux greetingResponseFlux = greetingClient.greetParams(Flux.just("a", "b")); + StepVerifier// + .create(greetingResponseFlux)// + .expectNextMatches(gr -> gr.getMessage().equalsIgnoreCase("A"))// + .expectNextMatches(gr -> gr.getMessage().equalsIgnoreCase("B"))// + .verifyComplete(); + + } + + @Test + public void monoInMonoOut() { + GreetingClient greetingClient = buildClient(); + Mono greet = greetingClient.greet(Mono.just("Hello, Mario")); + StepVerifier// + .create(greet)// + .expectNextMatches(gr -> gr.getMessage().equalsIgnoreCase("Hello, Mario"))// + .verifyComplete(); + + } + + @Test + public void noValueInMonoOut() throws Exception { + GreetingClient greetingClient = buildClient(); + Mono greet = greetingClient.greet(); + StepVerifier// + .create(greet)// + .expectNextMatches(gr -> gr.getMessage().equalsIgnoreCase("Hello, world!"))// + .verifyComplete(); + + } + + @Test + public void fireAndForget() throws Exception { + GreetingClient greetingClient = buildClient(); + String name = "Kimly"; + Mono greet = greetingClient.greetFireAndForget(Mono.just(name)); + StepVerifier// + .create(greet)// + .verifyComplete(); + + Thread.sleep(1000); + boolean kimly = GreetingsController.fireAndForget.get().equalsIgnoreCase(name); + Assertions.assertTrue(kimly, "the name perceived on the service is the same as we sent."); + } + + private GreetingClient buildClient() { + ConfigurableApplicationContext context = new SpringApplicationBuilder( + RSocketClientConfiguration.class)// + .web(WebApplicationType.NONE)// + .run("--service.port=" + port.get(), "--spring.profiles.active=client" , "--greeting-client.port=" + port.get()); + return context.getBean(GreetingClient.class); + } + +} diff --git a/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketServerConfiguration.java b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketServerConfiguration.java new file mode 100644 index 0000000..44a5db9 --- /dev/null +++ b/spring-retrosocket/src/test/java/org/springframework/retrosocket/nopairing/RSocketServerConfiguration.java @@ -0,0 +1,29 @@ +package org.springframework.retrosocket.nopairing; + +import javax.annotation.PostConstruct; +import lombok.extern.log4j.Log4j2; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; + +/** + * @author Josh Long + */ +@Log4j2 +@Profile("service") +@Configuration +@EnableAutoConfiguration +class RSocketServerConfiguration { + + @Bean + GreetingsController greetingsController() { + return new GreetingsController(); + } + + @PostConstruct + public void start() { + log.info("starting " + RSocketServerConfiguration.class.getName() + '.'); + } + +}