Skip to content

feat: ability to use retrosocket without pairing #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@
@Documented
@Inherited
public @interface RSocketClient {

String host() default "";
String port() default "";
RequesterMode mode() default RequesterMode.PAIRING;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
class RSocketClientAutoConfiguration {

@Bean
@ConditionalOnBean(RSocketRequester.class)
RSocketClientBuilder rSocketClientBuilder() {
return new RSocketClientBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ class RSocketClientFactoryBean implements BeanFactoryAware, FactoryBean<Object>

private ListableBeanFactory context;

private String host;

private String port;

private RequesterMode mode;

private RSocketRequester rSocketRequester() {
if (mode == RequesterMode.PAIRING) {
return forInterface(type, context);
} else {
Assert.isTrue(host.length()!=0, "In MANAGED mode host should be provided");
Assert.isTrue(port.length()!=0, "In MANAGED mode port should be provided");
return requesterBuilder()
.connectTcp(host, Integer.parseInt(port))
.block();
}
}

private RSocketRequester.Builder requesterBuilder() {
return context.getBean(RSocketRequester.Builder.class);
}

private static RSocketRequester forInterface(Class<?> clientInterface, ListableBeanFactory context) {
Map<String, RSocketRequester> rSocketRequestersInContext = context.getBeansOfType(RSocketRequester.class);
int rSocketRequestersCount = rSocketRequestersInContext.size();
Expand Down Expand Up @@ -66,7 +88,7 @@ public void setType(String type) {

@Override
public Object getObject() {
RSocketRequester rSocketRequester = forInterface(this.type, this.context);
RSocketRequester rSocketRequester = rSocketRequester();
RSocketClientBuilder clientBuilder = this.context.getBean(RSocketClientBuilder.class);
return clientBuilder.buildClientFor(this.type, rSocketRequester);
}
Expand All @@ -83,4 +105,17 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.context = (ListableBeanFactory) beanFactory;
}

public void setHost(String host) {
this.host = host;
}

public void setPort(String port) {
this.port = port;
}

public void setMode(RequesterMode mode) {
this.mode = mode;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
.forEach(beanDefinition -> {
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
this.validateInterface(annotationMetadata);
this.registerRSocketClient(annotationMetadata, registry);
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(RSocketClient.class.getCanonicalName());
this.registerRSocketClient(annotationMetadata, registry, attributes);
}));
}

Expand Down Expand Up @@ -118,14 +120,18 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
}

@SneakyThrows
private void registerRSocketClient(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {
private void registerRSocketClient(AnnotationMetadata annotationMetadata,
BeanDefinitionRegistry registry, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
if (log.isDebugEnabled()) {
log.debug("trying to turn the interface " + className + " into an RSocketClientFactoryBean");
}

BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(RSocketClientFactoryBean.class);
definition.addPropertyValue("type", className);
definition.addPropertyValue("host", getHost(attributes));
definition.addPropertyValue("port", getAttribute(attributes, "port"));
definition.addPropertyValue("mode", getAttribute(attributes, "mode"));
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
Expand All @@ -136,6 +142,21 @@ private void registerRSocketClient(AnnotationMetadata annotationMetadata, BeanDe
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

private <T>T getAttribute(Map<String, Object> attributes, String name) {
return (T) attributes.get(name);
}

private String getHost(Map<String, Object> attributes) {
return resolve((String) attributes.get("host"));
}

private String resolve(String value) {
if (StringUtils.hasText(value)) {
return this.environment.resolvePlaceholders(value);
}
return value;
}

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.springframework.retrosocket

enum class RequesterMode {
PAIRING,
MANAGED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.springframework.retrosocket.nopairing;

import static org.springframework.retrosocket.RequesterMode.MANAGED;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retrosocket.RSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* @author <a href="mailto:[email protected]">Josh Long</a>
*/
@RSocketClient(host = "${greeting-client.host:localhost}", port = "${greeting-client.port:8888}", mode = MANAGED)
interface GreetingClient {

@MessageMapping("greetings")
Mono<GreetingResponse> greet();

@MessageMapping("greetings-with-channel")
Flux<GreetingResponse> greetParams(Flux<String> names);

@MessageMapping("greetings-stream")
Flux<GreetingResponse> greetStream(Mono<String> name);

@MessageMapping("greetings-with-name")
Mono<GreetingResponse> greet(Mono<String> name);

@MessageMapping("fire-and-forget")
Mono<Void> greetFireAndForget(Mono<String> name);

@MessageMapping("greetings-mono-name.{name}.{age}")
Mono<String> greetMonoNameDestinationVariable(@DestinationVariable("name") String name,
@DestinationVariable("age") int age, @Payload Mono<String> payload);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.springframework.retrosocket.nopairing;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author <a href="mailto:[email protected]">Josh Long</a>
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class GreetingResponse {

private String message;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.springframework.retrosocket.nopairing;

import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* @author <a href="mailto:[email protected]">Josh Long</a>
*/
@Log4j2
@Profile("service")
@Controller
class GreetingsController {

static AtomicReference<String> fireAndForget = new AtomicReference<>();

@PostConstruct
public void begin() {
log.info("begin()");
}

@MessageMapping("greetings-mono-name.{name}.{age}")
Mono<String> greetMonoNameDestinationVariable(@DestinationVariable("name") String name,
@DestinationVariable("age") int age, @Payload Mono<String> payload) {
log.info("name=" + name);
log.info("age=" + age);
return payload;
}

@MessageMapping("fire-and-forget")
Mono<Void> fireAndForget(Mono<String> valueIn) {
return valueIn//
.doOnNext(value -> {
log.info("received fire-and-forget " + value + '.');
fireAndForget.set(value);
})//
.then();
}

@MessageMapping("greetings-with-channel")
Flux<GreetingResponse> greetParams(Flux<String> names) {
return names.map(String::toUpperCase).map(GreetingResponse::new);
}

@MessageMapping("greetings-stream")
Flux<GreetingResponse> greetFlux(Mono<String> name) {
return name.flatMapMany(
leNom -> Flux.fromStream(Stream.generate(() -> new GreetingResponse(leNom.toUpperCase()))).take(2));
}

@MessageMapping("greetings-with-name")
Mono<GreetingResponse> greetMono(Mono<String> name) {
return name.map(GreetingResponse::new);
}

@MessageMapping("greetings")
Mono<GreetingResponse> greet() {
log.info("invoking greetings and returning a GreetingsResponse.");
return Mono.just(new GreetingResponse("Hello, world!"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.springframework.retrosocket.nopairing;

import javax.annotation.PostConstruct;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.retrosocket.EnableRSocketClients;

/**
* @author <a href="mailto:[email protected]">Josh Long</a>
*/
@Log4j2
@Profile("client")
@EnableRSocketClients
@SpringBootApplication
class RSocketClientConfiguration {
}
Loading