|
16 | 16 | import redis.clients.jedis.JedisPoolConfig;
|
17 | 17 | import redis.clients.jedis.JedisPubSub;
|
18 | 18 |
|
| 19 | +import java.util.concurrent.CompletableFuture; |
| 20 | +import java.util.concurrent.ExecutorService; |
| 21 | +import java.util.concurrent.Executors; |
19 | 22 | import java.util.function.Consumer;
|
20 | 23 | import java.util.regex.Matcher;
|
21 | 24 | import java.util.regex.Pattern;
|
|
26 | 29 | public class RedisAPI {
|
27 | 30 | private static final String REDIS_FULL_URI_PATTERN = "rediss?:\\/\\/(?:(?<user>\\w+)?:(?<password>[\\w-]+)@)?(?<host>[\\w.-]+):(?<port>\\d+)";
|
28 | 31 | private static final String REDIS_URI_PATTERN = "rediss?:\\/\\/[\\w.-]+:\\d+";
|
| 32 | + private final ExecutorService executorService = Executors.newCachedThreadPool(); |
29 | 33 |
|
30 | 34 | @Getter
|
31 | 35 | private static RedisAPI instance = null;
|
@@ -163,31 +167,37 @@ public void setFilterID(String filterId) {
|
163 | 167 | }
|
164 | 168 |
|
165 | 169 | /**
|
166 |
| - * Publishes a message to the generated instances redis pool |
| 170 | + * Asynchronously publishes a message to the generated instances redis pool |
167 | 171 | * @param channel the channel object being published to, this is what should be registered on your other instances
|
168 | 172 | * @param message the message being sent across that channel
|
| 173 | + * @return CompletableFuture<Void> representing the asynchronous operation |
169 | 174 | */
|
170 |
| - public void publishMessage(RedisChannel channel, String message) { |
171 |
| - try (Jedis jedis = pool.getResource()) { |
172 |
| - jedis.publish(channel.channelName, "none" + ";" + message); |
173 |
| - } catch (Exception ex) { |
174 |
| - throw new MessageFailureException("Failed to send message to redis", ex); |
175 |
| - } |
| 175 | + public CompletableFuture<Void> publishMessage(RedisChannel channel, String message) { |
| 176 | + return CompletableFuture.runAsync(() -> { |
| 177 | + try (Jedis jedis = pool.getResource()) { |
| 178 | + jedis.publish(channel.channelName, "none" + ";" + message); |
| 179 | + } catch (Exception ex) { |
| 180 | + throw new MessageFailureException("Failed to send message to redis", ex); |
| 181 | + } |
| 182 | + }, executorService); |
176 | 183 | }
|
177 | 184 |
|
178 | 185 | /**
|
179 |
| - * Publishes a message to the generated instances redis pool |
| 186 | + * Asynchronously publishes a message to the generated instances redis pool |
180 | 187 | * @param filterId the filter id for the message being sent, this filter id is checked by all the receiving pools
|
181 | 188 | * to ensure that only a specific jedis pool handles the message
|
182 | 189 | * @param channel the channel object being published to, this is what should be registered on your other instances
|
183 | 190 | * @param message the message being sent across that channel
|
| 191 | + * @return CompletableFuture<Void> representing the asynchronous operation |
184 | 192 | */
|
185 |
| - public void publishMessage(String filterId, RedisChannel channel, String message) { |
186 |
| - try (Jedis jedis = pool.getResource()) { |
187 |
| - jedis.publish(channel.channelName, filterId + ";" + message); |
188 |
| - } catch (Exception ex) { |
189 |
| - throw new MessageFailureException("Failed to send message to redis", ex); |
190 |
| - } |
| 193 | + public CompletableFuture<Void> publishMessage(String filterId, RedisChannel channel, String message) { |
| 194 | + return CompletableFuture.runAsync(() -> { |
| 195 | + try (Jedis jedis = pool.getResource()) { |
| 196 | + jedis.publish(channel.channelName, filterId + ";" + message); |
| 197 | + } catch (Exception ex) { |
| 198 | + throw new MessageFailureException("Failed to send message to redis", ex); |
| 199 | + } |
| 200 | + }, executorService); |
191 | 201 | }
|
192 | 202 |
|
193 | 203 | /**
|
|
0 commit comments