diff --git a/pom.xml b/pom.xml index b0409959d..31c6c33c9 100644 --- a/pom.xml +++ b/pom.xml @@ -194,6 +194,12 @@ 3.4.3 provided + + com.google.guava + guava + 18.0 + provided + diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java index b472682b4..3eb37619e 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java @@ -15,23 +15,46 @@ */ package com.corundumstudio.socketio.store; +import java.util.Map; +import java.util.concurrent.Executors; + import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.store.pubsub.PubSubType; -public class MemoryPubSubStore implements PubSubStore { +import com.google.common.collect.Maps; +import com.google.common.eventbus.AsyncEventBus; +public class MemoryPubSubStore implements PubSubStore { + + private final AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newFixedThreadPool(3)); + + private final Map> subtypeListener = Maps.newHashMap(); + + private final Long nodeId; + + public MemoryPubSubStore(Long nodeId){ + this.nodeId = nodeId; + } + @Override public void publish(PubSubType type, PubSubMessage msg) { + msg.setNodeId(nodeId); + asyncEventBus.post(msg); } @Override public void subscribe(PubSubType type, PubSubListener listener, Class clazz) { + subtypeListener.put(type, listener); + asyncEventBus.register(listener); } @Override public void unsubscribe(PubSubType type) { + PubSubListener ls = subtypeListener.get(type); + asyncEventBus.unregister(ls); + subtypeListener.remove(type); } @Override diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java index 4be21dddc..0027c7888 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java @@ -25,7 +25,7 @@ public class MemoryStoreFactory extends BaseStoreFactory { - private final MemoryPubSubStore pubSubMemoryStore = new MemoryPubSubStore(); + private final MemoryPubSubStore pubSubMemoryStore = new MemoryPubSubStore(getNodeId()); @Override public Store createStore(UUID sessionId) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubListener.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubListener.java index 9b4881045..7c3cf82ff 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubListener.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubListener.java @@ -15,9 +15,11 @@ */ package com.corundumstudio.socketio.store.pubsub; +import com.google.common.eventbus.Subscribe; public interface PubSubListener { + @Subscribe void onMessage(T data); }