Skip to content

Potential lambda enhancement #166

@regbo

Description

@regbo

I wanted to find a way to use lambdas with this library, so that listeners can be created like this:

MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
var listener1 = bus.subscribe(d -> {
	System.out.println("event consumer 1: " + d);
});

I was able to achieve this by adding 2 methods and a couple of hacks that depend on this library:
https://github.com/jhalterman/typetools/

The methods are:

public Consumer<T> subscribe(Consumer<T> listener);

public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions);

This allows me to simplify code as such:

Code:

MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
var listener1 = bus.subscribe(d -> {
	System.out.println("event consumer 1: " + d);
});
Consumer<Date> listener2 = d -> {
	System.out.println("event consumer 2: " + d);
};
bus.subscribe(listener2);
System.out.println("should print 1 & 2");
bus.post(new Date()).now();
bus.unsubscribe(listener1);
System.out.println("should print 2");
bus.post(new Date()).now();
bus.unsubscribe(listener2);
System.out.println("should print none");
bus.post(new Date()).now();
System.err.println("done");

Output:

should print 1 & 2
event consumer 2: Thu Feb 25 18:30:13 EST 2021
event consumer 1: Thu Feb 25 18:30:13 EST 2021
should print 2
event consumer 2: Thu Feb 25 18:30:13 EST 2021
should print none

The HandlerOptions class represents a builder class for the handler annotation. It is accessed in a hacky way by tracking threads.
This works for me, but it'd be great if there was a way to have something similar implimented without the current thread hack. Below are the two class modifications

MBassadorLFP.java

@SuppressWarnings("rawtypes")
public class MBassadorLFP<T> extends MBassador<T> {

	private static final Object CONSUMER_ACCEPT_METHOD_NAME = "accept";

	private static final Map<Thread, HandlerOptions> HANDLER_OPTIONS_TRACKER = new ConcurrentHashMap<>();

	/**
	 * Default constructor using default setup. super() will also add a default
	 * publication error logger
	 */
	public MBassadorLFP() {
		this(getDefaultConfiguration());
	}

	/**
	 * Construct with default settings and specified publication error handler
	 *
	 * @param errorHandler
	 */
	public MBassadorLFP(IPublicationErrorHandler errorHandler) {
		this(getDefaultConfiguration().addPublicationErrorHandler(errorHandler));
	}

	/**
	 * Construct with fully specified configuration
	 *
	 * @param configuration
	 */
	public MBassadorLFP(IBusConfiguration configuration) {
		super(modifyConfiguration(configuration));
	}

	public Consumer<T> subscribe(Consumer<T> listener) {
		return subscribe(listener, null);
	}

	public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions) {
		if (handlerOptions == null)
			handlerOptions = new HandlerOptions();
		var thread = Thread.currentThread();
		HANDLER_OPTIONS_TRACKER.put(thread, handlerOptions);
		try {
			subscribe((Object) listener);
		} finally {
			HANDLER_OPTIONS_TRACKER.remove(thread);
		}
		return listener;
	}

	private static IBusConfiguration modifyConfiguration(IBusConfiguration configuration) {
		if (configuration == null)
			configuration = getDefaultConfiguration();
		var syncPubSub = configuration.getFeature(Feature.SyncPubSub.class);
		if (syncPubSub == null) {
			syncPubSub = Feature.SyncPubSub.Default();
			configuration.addFeature(syncPubSub);
		}
		var delegate = syncPubSub.getMetadataReader();
		syncPubSub.setMetadataReader(new MetadataReader() {

			@Override
			public MessageListener getMessageListener(Class target) {
				MessageListener messageListener;
				if (delegate != null)
					messageListener = delegate.getMessageListener(target);
				else
					messageListener = super.getMessageListener(target);
				return modifyMessageListener(target, messageListener);
			}
		});
		return configuration;
	}

	private static MessageListener modifyMessageListener(Class target, MessageListener messageListener) {
		if (!Consumer.class.isAssignableFrom(target))
			return messageListener;
		var handlers = messageListener.getHandlers();
		if (handlers != null && handlers.length > 0)
			return messageListener;
		MessageHandler messageHandler = getConsumerMessageHandler(target, messageListener);
		if (messageHandler != null)
			messageListener.addHandler(messageHandler);
		return messageListener;
	}

	@SuppressWarnings("unchecked")
	private static MessageHandler getConsumerMessageHandler(Class target, MessageListener messageListener) {
		var handlerOptions = HANDLER_OPTIONS_TRACKER.get(Thread.currentThread());
		if (handlerOptions == null)
			return null;
		var applyMethods = getConsumerAcceptMethods(target);
		if (applyMethods.length != 1)
			return null;
//*** WHERE THINGS GET SPICY ***
		var rawArgument = TypeResolver.resolveRawArgument(Consumer.class, target);
		if (rawArgument == null || TypeResolver.Unknown.class.equals(rawArgument))
			return null;
		var handler = applyMethods[0];
		Handler handlerConfig = handlerOptions.toHandler();
		Enveloped enveloped = new Enveloped() {

			@Override
			public Class<? extends Annotation> annotationType() {
				return Enveloped.class;
			}

			@Override
			public Class[] messages() {
				return new Class[] { rawArgument };
			}
		};
		if (!handlerConfig.enabled())
			return null;
		var handlerProperties = MessageHandler.Properties.Create(handler, handlerConfig, enveloped,
				new IMessageFilter[0], messageListener);
		handlerProperties.put(MessageHandler.Properties.Enveloped, false);
		MessageHandler handlerMetadata = new MessageHandler(handlerProperties);
		return handlerMetadata;
	}

	private static Method[] getConsumerAcceptMethods(Class target) {
		var methods = ReflectionUtils.getMethods(m -> {
			if (Modifier.isAbstract(m.getModifiers()))
				return false;
			if (!CONSUMER_ACCEPT_METHOD_NAME.equals(m.getName()))
				return false;
			if (m.getParameterCount() != 1)
				return false;
			if (!Object.class.isAssignableFrom(m.getParameterTypes()[0]))
				return false;
			return true;
		}, target);
		return methods;
	}

	private static IBusConfiguration getDefaultConfiguration() {
		return new BusConfiguration().addFeature(Feature.SyncPubSub.Default())
				.addFeature(Feature.AsynchronousHandlerInvocation.Default())
				.addFeature(Feature.AsynchronousMessageDispatch.Default());
	}

}

HandlerOptions.java

public class HandlerOptions {

	private static final Method HandlerOptions_toHandler_METHOD;
	static {
		try {
			HandlerOptions_toHandler_METHOD = HandlerOptions.class.getMethod("toHandler");
		} catch (NoSuchMethodException e) {
			throw new RuntimeException(e);
		}
	}

	private Filter[] filters;
	private String condition;
	private Invoke delivery;
	private Integer priority;
	private Boolean rejectSubtypes;
	private Boolean enabled;
	private Class<? extends HandlerInvocation> invocation;

	@Handler
	public Handler toHandler() {
		var handler = HandlerOptions_toHandler_METHOD.getAnnotation(Handler.class);
		return new Handler() {

			@Override
			public Class<? extends Annotation> annotationType() {
				return handler.annotationType();
			}

			@Override
			public Filter[] filters() {
				return Optional.ofNullable(filters).orElse(handler.filters());
			}

			@Override
			public String condition() {
				return Optional.ofNullable(condition).orElse(handler.condition());
			}

			@Override
			public Invoke delivery() {
				return Optional.ofNullable(delivery).orElse(handler.delivery());
			}

			@Override
			public int priority() {
				return Optional.ofNullable(priority).orElse(handler.priority());
			}

			@Override
			public boolean rejectSubtypes() {
				return Optional.ofNullable(rejectSubtypes).orElse(handler.rejectSubtypes());
			}

			@Override
			public boolean enabled() {
				return Optional.ofNullable(enabled).orElse(handler.enabled());
			}

			@Override
			public Class<? extends HandlerInvocation> invocation() {
				return Optional.ofNullable(invocation).orElse((Class) handler.invocation());
			}
		};
	}

	public HandlerOptions() {

	}

	public HandlerOptions(Filter[] filters, String condition, Invoke delivery, Integer priority, Boolean rejectSubtypes,
			Boolean enabled, Class<? extends HandlerInvocation> invocation) {
		this.filters = filters;
		this.condition = condition;
		this.delivery = delivery;
		this.priority = priority;
		this.rejectSubtypes = rejectSubtypes;
		this.enabled = enabled;
		this.invocation = invocation;
	}

	public HandlerOptions setFilters(Filter[] filters) {
		this.filters = filters;
		return HandlerOptions.this;
	}

	public Filter[] getFilters() {
		return this.filters;
	}

	public HandlerOptions setCondition(String condition) {
		this.condition = condition;
		return HandlerOptions.this;
	}

	public String getCondition() {
		return this.condition;
	}

	public HandlerOptions setDelivery(Invoke delivery) {
		this.delivery = delivery;
		return HandlerOptions.this;
	}

	public Invoke getDelivery() {
		return this.delivery;
	}

	public HandlerOptions setPriority(Integer priority) {
		this.priority = priority;
		return HandlerOptions.this;
	}

	public Integer getPriority() {
		return this.priority;
	}

	public HandlerOptions setRejectSubtypes(Boolean rejectSubtypes) {
		this.rejectSubtypes = rejectSubtypes;
		return HandlerOptions.this;
	}

	public Boolean getRejectSubtypes() {
		return this.rejectSubtypes;
	}

	public HandlerOptions setEnabled(Boolean enabled) {
		this.enabled = enabled;
		return HandlerOptions.this;
	}

	public Boolean getEnabled() {
		return this.enabled;
	}

	public HandlerOptions setInvocation(Class<? extends HandlerInvocation> invocation) {
		this.invocation = invocation;
		return HandlerOptions.this;
	}

	public Class<? extends HandlerInvocation> getInvocation() {
		return this.invocation;
	}

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions