8
8
import lombok .Getter ;
9
9
import lombok .Setter ;
10
10
import net .swofty .redisapi .exceptions .ChannelAlreadyRegisteredException ;
11
+ import net .swofty .redisapi .exceptions .MessageFailureException ;
11
12
import redis .clients .jedis .Jedis ;
12
13
import redis .clients .jedis .JedisPool ;
13
14
import redis .clients .jedis .JedisPoolConfig ;
@@ -53,22 +54,30 @@ public static RedisAPI generateInstance(String uri, String password) {
53
54
throw new CouldNotConnectToRedisException ("Either invalid Redis URI passed through; '" + uri + "' OR invalid Redis Password passed through; '" + password + "'" );
54
55
}
55
56
56
- Jedis jedis = api .getPool ().getResource ();
57
- jedis .connect ();
58
-
59
- EventRegistry .pubSub = new JedisPubSub () {
60
- @ Override
61
- public void onMessage (String channel , String message ) {
62
- System .out .println ("Received " + message );
63
- EventRegistry .handleAll (channel , message );
64
- super .onMessage (channel , message );
65
- }
66
- };
67
-
68
57
instance = api ;
69
58
return api ;
70
59
}
71
60
61
+ /**
62
+ * Starts listeners for the Redis Pub/Sub channels
63
+ */
64
+ public void startListeners () {
65
+ new Thread (() -> {
66
+ try (Jedis jedis = getPool ().getResource ()) {
67
+ EventRegistry .pubSub = new JedisPubSub () {
68
+ @ Override
69
+ public void onMessage (String channel , String message ) {
70
+ EventRegistry .handleAll (channel , message );
71
+ }
72
+ };
73
+ jedis .subscribe (EventRegistry .pubSub , ChannelRegistry .registeredChannels .stream ().map ((e ) -> e .channelName ).toArray (String []::new ));
74
+ getPool ().returnResource (jedis );
75
+ } catch (Exception e ) {
76
+ e .printStackTrace ();
77
+ }
78
+ }).start ();
79
+ }
80
+
72
81
/**
73
82
* Creates a new main Redis pool instance, there will only ever be one at a time so #getInstance should be used after generation
74
83
* @param uri the URI used to connect to the Redis server running
@@ -94,9 +103,11 @@ public void setFilterID(String filterId) {
94
103
* @param message the message being sent across that channel
95
104
*/
96
105
public void publishMessage (RedisChannel channel , String message ) {
97
- Jedis jedis = pool .getResource ();
98
- jedis .connect ();
99
- jedis .publish (channel .channelName , "all" + ";" + message );
106
+ try (Jedis jedis = pool .getResource ()) {
107
+ jedis .publish (channel .channelName , "none" + ";" + message );
108
+ } catch (Exception ex ) {
109
+ throw new MessageFailureException ("Failed to send message to redis" , ex );
110
+ }
100
111
}
101
112
102
113
/**
@@ -107,9 +118,11 @@ public void publishMessage(RedisChannel channel, String message) {
107
118
* @param message the message being sent across that channel
108
119
*/
109
120
public void publishMessage (String filterId , RedisChannel channel , String message ) {
110
- Jedis jedis = pool .getResource ();
111
- jedis .connect ();
112
- jedis .publish (channel .channelName , filterId + ";" + message );
121
+ try (Jedis jedis = pool .getResource ()) {
122
+ jedis .publish (channel .channelName , filterId + ";" + message );
123
+ } catch (Exception ex ) {
124
+ throw new MessageFailureException ("Failed to send message to redis" , ex );
125
+ }
113
126
}
114
127
115
128
/**
@@ -139,4 +152,4 @@ public RedisChannel registerChannel(String channelName, @NonNull Consumer<RedisM
139
152
ChannelRegistry .registerChannel (channel );
140
153
return channel ;
141
154
}
142
- }
155
+ }
0 commit comments