11package fr .rakambda .channelpointsminer .miner .api .hermes ;
22
3- import java .util .Collection ;
4- import java .util .Objects ;
5- import java .util .Queue ;
6- import java .util .concurrent .ConcurrentLinkedQueue ;
7- import static java .time .temporal .ChronoUnit .MINUTES ;
8- import static org .java_websocket .framing .CloseFrame .ABNORMAL_CLOSE ;
9- import static org .java_websocket .framing .CloseFrame .NORMAL ;
10- import fr .rakambda .channelpointsminer .miner .api .hermes .data .request .topic .Topic ;
11- import fr .rakambda .channelpointsminer .miner .api .hermes .data .request .topic .Topics ;
3+ import com .fasterxml .jackson .core .type .TypeReference ;
124import fr .rakambda .channelpointsminer .miner .api .hermes .data .response .ITwitchHermesWebSocketResponse ;
13- import fr .rakambda .channelpointsminer .miner .api .hermes .data .response .MessageResponseHermes ;
5+ import fr .rakambda .channelpointsminer .miner .api .hermes .data .response .NotificationResponse ;
6+ import fr .rakambda .channelpointsminer .miner .api .hermes .data .response .UnsubscribeResponse ;
7+ import fr .rakambda .channelpointsminer .miner .api .hermes .data .response .notification .PubSubNotificationType ;
8+ import fr .rakambda .channelpointsminer .miner .api .passport .TwitchClient ;
9+ import fr .rakambda .channelpointsminer .miner .api .passport .TwitchLogin ;
10+ import fr .rakambda .channelpointsminer .miner .api .pubsub .ITwitchPubSubMessageListener ;
11+ import fr .rakambda .channelpointsminer .miner .api .pubsub .data .message .IPubSubMessage ;
12+ import fr .rakambda .channelpointsminer .miner .api .pubsub .data .request .topic .Topic ;
1413import fr .rakambda .channelpointsminer .miner .factory .TimeFactory ;
1514import fr .rakambda .channelpointsminer .miner .factory .TwitchWebSocketClientFactory ;
15+ import fr .rakambda .channelpointsminer .miner .util .json .JacksonUtils ;
1616import lombok .extern .log4j .Log4j2 ;
1717import org .java_websocket .client .WebSocketClient ;
1818import org .jetbrains .annotations .NotNull ;
1919import org .jetbrains .annotations .Nullable ;
20+ import java .io .IOException ;
21+ import java .util .Collection ;
22+ import java .util .Map ;
23+ import java .util .Objects ;
24+ import java .util .Queue ;
25+ import java .util .concurrent .ConcurrentHashMap ;
26+ import java .util .concurrent .ConcurrentLinkedQueue ;
27+ import static java .time .temporal .ChronoUnit .MINUTES ;
28+ import static org .java_websocket .framing .CloseFrame .ABNORMAL_CLOSE ;
29+ import static org .java_websocket .framing .CloseFrame .NORMAL ;
2030
2131@ Log4j2
22- public class TwitchHermesWebSocketPool implements AutoCloseable , ITwitchHermesWebSocketListener {
32+ public class TwitchHermesWebSocketPool implements AutoCloseable , ITwitchHermesWebSocketListener {
2333 private static final int SOCKET_TIMEOUT_MINUTES = 5 ;
2434
35+ private final int maxSubscriptionPerClient ;
36+ private final TwitchLogin twitchLogin ;
37+
2538 private final Collection <TwitchHermesWebSocketClient > clients ;
2639 private final Collection <ITwitchHermesMessageListener > listeners ;
27- private final Queue <Topics > pendingTopics ;
28- private final int maxTopicPerClient ;
40+ private final Collection <ITwitchPubSubMessageListener > pubSubListeners ;
41+ private final Queue <Topic > pendingTopics ;
42+
43+ private final Map <String , fr .rakambda .channelpointsminer .miner .api .pubsub .data .request .topic .Topic > topics ;
2944
30- public TwitchHermesWebSocketPool (int maxTopicPerClient ){
31- this .maxTopicPerClient = maxTopicPerClient ;
45+ public TwitchHermesWebSocketPool (int maxSubscriptionPerClient , @ NotNull TwitchLogin twitchLogin ){
46+ this .maxSubscriptionPerClient = maxSubscriptionPerClient ;
47+ this .twitchLogin = twitchLogin ;
48+
3249 clients = new ConcurrentLinkedQueue <>();
3350 listeners = new ConcurrentLinkedQueue <>();
51+ pubSubListeners = new ConcurrentLinkedQueue <>();
3452 pendingTopics = new ConcurrentLinkedQueue <>();
53+ topics = new ConcurrentHashMap <>();
3554 }
3655
3756 public void ping (){
3857 checkStaleConnection ();
39-
40- clients .stream ()
41- .filter (WebSocketClient ::isOpen )
42- .filter (client -> !client .isClosing ())
43- .forEach (TwitchHermesWebSocketClient ::ping );
4458 }
4559
4660 public void checkStaleConnection (){
@@ -49,78 +63,99 @@ public void checkStaleConnection(){
4963 .forEach (client -> client .close (ABNORMAL_CLOSE , "Timeout reached" ));
5064 }
5165
52- public void removeTopic (@ NotNull Topic topic ){
66+ public void removePubSubTopic (@ NotNull Topic topic ){
67+ var subscriptionId = topics .entrySet ().stream ().filter (e -> Objects .equals (e .getValue (), topic )).findFirst ();
68+ if (subscriptionId .isEmpty ()){
69+ return ;
70+ }
5371 clients .stream ()
54- .filter (client -> client .isTopicListened (topic ))
55- .forEach (client -> client .removeTopic ( topic ));
72+ .filter (client -> client .isPubSubTopicListened (topic ))
73+ .forEach (client -> client .removeSubscription ( subscriptionId . get (). getKey () ));
5674 }
5775
5876 public void addListener (@ NotNull ITwitchHermesMessageListener listener ){
5977 listeners .add (listener );
6078 }
6179
80+ public void addPubSubListener (@ NotNull ITwitchPubSubMessageListener listener ){
81+ pubSubListeners .add (listener );
82+ }
83+
6284 @ Override
6385 public void onWebSocketMessage (@ NotNull ITwitchHermesWebSocketResponse response ){
64- if (response instanceof MessageResponseHermes m ){
65- var topic = m .getData ().getTopic ();
66- var message = m .getData ().getMessage ();
67- listeners .forEach (l -> l .onTwitchMessage (topic , message ));
86+ if (response instanceof UnsubscribeResponse u ){
87+ topics .remove (u .getUnsubscribeResponse ().getSubscription ().getId ());
88+ }
89+ if (response instanceof NotificationResponse n ){
90+ if (n .getNotification () instanceof PubSubNotificationType t ){
91+ try {
92+ var topic = topics .get (n .getNotification ().getSubscription ().getId ());
93+ var message = JacksonUtils .read (t .getPubsub (), new TypeReference <IPubSubMessage >(){});
94+ pubSubListeners .forEach (l -> l .onTwitchMessage (topic , message ));
95+ }
96+ catch (IOException e ){
97+ log .error ("Failed to parse PubSub notification from Hermes {}" , t .getPubsub (), e );
98+ }
99+ }
68100 }
69101 }
70102
71103 @ Override
72104 public void onWebSocketClosed (@ NotNull TwitchHermesWebSocketClient client , int code , @ Nullable String reason , boolean remote ){
73105 clients .remove (client );
74106 if (code != NORMAL ){
75- pendingTopics .addAll (client .getTopics ());
107+ pendingTopics .addAll (client .getSubscribeRequests (). keySet (). stream (). map ( topics :: get ). filter ( Objects :: nonNull ). toList ());
76108 }
77109 }
78110
79- public void listenPendingTopics (){
111+ public void listenPendingPubSubTopics (){
80112 try {
81- Topics topic ;
113+ Topic topic ;
82114 while (Objects .nonNull (topic = pendingTopics .poll ())){
83115 listenTopic (topic );
84116 }
85117 }
86118 catch (RuntimeException e ){
87- log .error ("Failed to join pending chats " , e );
119+ log .error ("Failed to join pending subscriptions " , e );
88120 }
89121 }
90122
91- public void listenTopic (@ NotNull Topics topics ){
92- var isListened = topics .getTopics ().stream ().anyMatch (this ::isTopicListened );
93- if (isListened ){
123+ public void listenTopic (@ NotNull Topic topic ){
124+ if (isTopicListened (topic )){
94125 log .debug ("Topic {} is already being listened" , topics );
95126 return ;
96127 }
97128
98129 try {
99- getAvailableClient ().listenTopic ( topics );
130+ getAvailableClient ().listenPubSubTopic ( topic ). ifPresent ( subscriptionId -> topics . put ( subscriptionId , topic ) );
100131 }
101132 catch (RuntimeException e ){
102- pendingTopics .add (topics );
133+ pendingTopics .add (topic );
103134 throw e ;
104135 }
105136 }
106137
107138 private boolean isTopicListened (@ NotNull Topic topic ){
108- return clients .stream ().anyMatch (client -> client .isTopicListened (topic ));
139+ return clients .stream ().anyMatch (client -> client .isPubSubTopicListened (topic ));
109140 }
110141
111142 @ NotNull
112143 private TwitchHermesWebSocketClient getAvailableClient (){
113144 return clients .stream ()
114145 .filter (client -> !client .isClosing () && !client .isClosed ())
115- .filter (client -> client .getTopicCount () < maxTopicPerClient )
146+ .filter (client -> client .getSubscriptionCount () < maxSubscriptionPerClient )
116147 .findAny ()
117- .orElseGet (this ::createNewClient );
148+ .orElseGet (() -> {
149+ var client = createNewClient ();
150+ client .authenticate (twitchLogin );
151+ return client ;
152+ });
118153 }
119154
120155 @ NotNull
121156 public TwitchHermesWebSocketClient createNewClient (){
122157 try {
123- var client = TwitchWebSocketClientFactory .createHermesClient ();
158+ var client = TwitchWebSocketClientFactory .createHermesClient (TwitchClient . WEB );
124159 log .debug ("Created Hermes WebSocket client with uuid {}" , client .getUuid ());
125160 client .addListener (this );
126161 client .connectBlocking ();
0 commit comments